Register accumulators on stat engine to centralize reports. TODO remove from engine...
[anna.git] / source / diameter.comm / Server.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #include <anna/diameter.comm/ClientSession.hpp>
10 #include <anna/diameter.comm/Entity.hpp>
11 #include <anna/diameter.comm/Engine.hpp>
12 #include <anna/diameter.comm/OamModule.hpp>
13 #include <anna/diameter/codec/functions.hpp>
14 #include <anna/diameter/helpers/base/defines.hpp>
15 #include <anna/core/functions.hpp>
16 #include <anna/statistics/Engine.hpp>
17 #include <anna/time/functions.hpp>
18
19 #include <anna/core/tracing/Logger.hpp>
20 #include <anna/core/functions.hpp>
21 #include <anna/core/tracing/TraceMethod.hpp>
22
23 // STL
24 #include <vector>
25
26 using namespace anna;
27 using namespace anna::diameter;
28 using namespace anna::diameter::comm;
29
30
31 void Server::initialize() throw() {
32   a_parent = NULL;
33   a_engine = NULL;
34   a_clientSessions.clear(); // importante (el recycler creo que no lo tocaba)
35   a_available = false;
36   a_maxClientSessions = 1; // mono client connection
37   a_lastIncomingActivityTime = (anna::Millisecond)0;
38   a_lastOutgoingActivityTime = (anna::Millisecond)0;
39   a_statisticsAccumulator = anna::statistics::Engine::instantiate().createAccumulator();
40   a_lastUsedResource = NULL;
41 }
42
43 void Server::initializeStatisticConcepts() throw() {
44   // Realm name
45   std::string realmName = a_engine ? a_engine->getRealm() : "unknown"; // it should be known (createServer)
46
47   // Statistics:
48   anna::statistics::Engine& statsEngine = anna::statistics::Engine::instantiate();
49   // Concepts descriptions:
50   std::string serverAsString = anna::functions::socketLiteralAsString(a_socket.first, a_socket.second);
51   std::string c1desc = "Diameter processing time (for requests) at servers on "; c1desc += serverAsString; c1desc += " for realm '"; c1desc += realmName; c1desc += "'";
52   std::string c2desc = "Diameter message sizes received from servers on "; c2desc += serverAsString; c2desc += " for realm '"; c2desc += realmName; c2desc += "'";
53   // Registering
54   a_processing_time__StatisticConceptId = statsEngine.addConcept(c1desc.c_str(), "ms", true/* integer values */);
55   a_received_message_size__StatisticConceptId = statsEngine.addConcept(c2desc.c_str(), "bytes", true/* integer values */);
56 }
57
58 void Server::resetStatistics() throw() {
59   a_statisticsAccumulator->reset();
60 }
61
62 void Server::updateProcessingTimeStatisticConcept(const double &value) throw() {
63   a_statisticsAccumulator->process(a_processing_time__StatisticConceptId, value);
64   LOGDEBUG(anna::Logger::debug(a_statisticsAccumulator->asString(), ANNA_FILE_LOCATION));
65 }
66
67 void Server::updateReceivedMessageSizeStatisticConcept(const double &value) throw() {
68   a_statisticsAccumulator->process(a_received_message_size__StatisticConceptId, value);
69   //LOGDEBUG(anna::Logger::debug(a_statisticsAccumulator->asString(), ANNA_FILE_LOCATION));
70 }
71
72
73 void Server::assertReady() throw(anna::RuntimeException) {
74   if(a_clientSessions.size() != a_maxClientSessions) {
75     std::string msg(asString());
76     msg += " | Non-configured server: you must add the remaining client-sessions before any operation (bind, send, etc.)";
77     throw anna::RuntimeException(msg, ANNA_FILE_LOCATION);
78   }
79 }
80
81
82 void Server::addClientSession(int socketId)
83 throw(anna::RuntimeException) {
84   if(a_clientSessions.size() == a_maxClientSessions) {
85     LOGDEBUG
86     (
87       std::string msg = "Maximum number of client-sessions reached for this server (";
88       msg += anna::functions::asString(a_maxClientSessions);
89       msg += ").";
90       anna::Logger::debug(msg, ANNA_FILE_LOCATION);
91     );
92     return;
93   }
94
95   if(!a_engine)
96     throw anna::RuntimeException("Invalid engine reference (NULL)", ANNA_FILE_LOCATION);
97
98   ClientSession *s = a_engine->createClientSession(this, socketId);
99   a_clientSessions.push_back(s);
100 }
101
102 int Server::getOTARequests() const throw() {
103   int result = 0;
104
105   for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++)
106     result += (*it)->getOTARequests();
107
108   return result;
109 }
110
111
112 bool Server::send(const Message* message, int socketId) throw(anna::RuntimeException) {
113   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Server", "send", ANNA_FILE_LOCATION));
114   assertReady();
115   bool fixedSocket = (socketId != -1);
116   int clientSessions = getNumberOfClientSessions();
117
118   for(int k = 0; k < clientSessions; k++) {    // try round-robin only over one cycle,
119     // no matter where you are: don't repeat same socket
120     if(fixedSocket)
121       a_lastUsedResource = a_engine->findClientSession(a_socket.first /*ip*/, a_socket.second /*port*/, socketId); // exception if not found
122     else {
123       if(a_deliveryIterator == end()) a_deliveryIterator = begin();
124
125       a_lastUsedResource = (*a_deliveryIterator);
126
127       if(clientSessions > 1) a_deliveryIterator++;  // Round robin
128     }
129
130     try {
131       // Send:
132       const Response* response = a_lastUsedResource->send(message);
133       return true; // no matter if response is NULL (answers, i.e.) or not.
134     } catch(anna::RuntimeException &ex) {
135       LOGDEBUG(
136         std::string msg = ex.getText();
137         msg += " | Send failed on socket id ";
138         msg += anna::functions::asString(a_lastUsedResource->getSocketId());
139         anna::Logger::debug(msg, ANNA_FILE_LOCATION);
140       );
141     }
142
143     // Specific socket sending doesn't try
144     if(fixedSocket) return false;
145   }
146
147   return false;
148 }
149
150
151 bool Server::broadcast(const Message* message) throw(anna::RuntimeException) {
152   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Server", "broadcast", ANNA_FILE_LOCATION));
153   assertReady();
154   const Response* response;
155   bool allok = true;
156
157   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++) {
158     try {
159       response = (*it)->send(message);
160     } catch(anna::RuntimeException &ex) {
161       ex.trace();
162       allok = false;
163     }
164   }
165
166   return allok;
167 }
168
169
170 bool Server::bind() throw(anna::RuntimeException) {
171   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Server", "bind", ANNA_FILE_LOCATION));
172   assertReady();
173   a_deliveryIterator = begin();
174   bool result = true; // all OK return
175
176   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++) {
177     try {
178       (*it)->bind();
179     } catch(anna::RuntimeException &ex) {
180       ex.trace();
181       result = false;
182     }
183   }
184
185   return result;
186 }
187
188 void Server::raiseAutoRecovery(bool autoRecovery) throw(anna::RuntimeException) {
189   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Server", "raiseAutoRecovery", ANNA_FILE_LOCATION));
190   assertReady();
191   a_deliveryIterator = begin();
192
193   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++)
194     (*it)->setAutoRecovery(autoRecovery);
195 }
196
197 void Server::setClassCodeTimeout(const ClassCode::_v v, const anna::Millisecond & millisecond) throw() {
198   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Server", "setClassCodeTimeout", ANNA_FILE_LOCATION));
199   assertReady();
200
201   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++) {
202     try {
203       (*it)->setClassCodeTimeout(v, millisecond);
204     } catch(anna::RuntimeException &ex) {
205       ex.trace();
206     }
207   }
208 }
209
210
211 // Private close/destroy method
212 void Server::close(bool destroy) throw(anna::RuntimeException) {
213   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Server", "close", ANNA_FILE_LOCATION));
214
215   if(!a_engine)
216     throw anna::RuntimeException("Invalid engine reference (NULL)", ANNA_FILE_LOCATION);
217
218   assertReady();
219
220   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++)
221     a_engine->closeClientSession(*it, destroy);
222 }
223
224
225 void Server::childIdle() const throw() {
226   // Check father entity idleness:
227   if(idle()) a_parent->childIdle();
228 }
229
230
231 void Server::hide() throw() {
232   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++)
233     (*it)->hide();
234 }
235
236 void Server::show() throw() {
237   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++)
238     (*it)->show();
239 }
240
241 bool Server::hidden() const throw() {
242   for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++)
243     if((*it)->shown()) return false;
244
245   return true;
246 }
247 bool Server::shown() const throw() {
248   for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++)
249     if((*it)->hidden()) return false;
250
251   return true;
252 }
253
254
255
256 std::string Server::socketAsString() const throw() {
257   std::string result = getAddress();
258   result += ":";
259   result += anna::functions::asString(getPort());
260   return result;
261 }
262
263
264 std::string Server::asString() const throw() {
265   std::string result("diameter::comm::Server { ");
266   result += " | Parent Entity: ";
267   result += a_parent->getDescription();
268   result += " | Server Address: ";
269   result += a_socket.first;
270   result += " | Server Port: ";
271   result += anna::functions::asString(a_socket.second);
272   result += " | Available: ";
273   result += a_available ? "yes" : "no";
274   result += " | Max client-sessions supported: ";
275   result += anna::functions::asString(a_maxClientSessions);
276   result += " | Currently configured client-sessions: ";
277   result += anna::functions::asString(a_clientSessions.size());
278   result += anna::functions::asString(" | OTA requests: %d%s", getOTARequests(), idle() ? " (idle)" : "");
279   result += " | Last Incoming Activity Time: ";
280   result += a_lastIncomingActivityTime.asString();
281   result += " | Last Outgoing Activity Time: ";
282   result += a_lastOutgoingActivityTime.asString();
283   result += " | Hidden: ";
284   result += (hidden() ? "yes" : "no");
285   result += "\n";
286   result += a_statisticsAccumulator->asString();
287
288   for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++) {
289     result += "\n";
290     result += (*it)->asString();
291   }
292
293   return result;
294 }
295
296 anna::xml::Node* Server::asXML(anna::xml::Node* parent) const throw() {
297   anna::xml::Node* result = parent->createChild("diameter.Server");
298   result->createAttribute("ParentEntity", a_parent->getDescription());
299   result->createAttribute("ServerAddress", a_socket.first);
300   result->createAttribute("ServerPort", anna::functions::asString(a_socket.second));
301   result->createAttribute("Available", a_available ? "yes" : "no");
302   result->createAttribute("MaxClientSessionsSupported", anna::functions::asString(a_maxClientSessions));
303   result->createAttribute("CurrentlyConfiguredClientSessions", anna::functions::asString(a_clientSessions.size()));
304   result->createAttribute("OTArequests", anna::functions::asString("%d%s", getOTARequests(), idle() ? " (idle)" : ""));
305   result->createAttribute("LastIncomingActivityTime", a_lastIncomingActivityTime.asString());
306   result->createAttribute("LastOutgoingActivityTime", a_lastOutgoingActivityTime.asString());
307   result->createAttribute("Hidden", hidden() ? "yes" : "no");
308   // Statistics
309   anna::xml::Node* stats = result->createChild("Statistics");
310   a_statisticsAccumulator->asXML(stats);
311   anna::xml::Node* clientSessions = result->createChild("Server.ClientSessions");
312
313   for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++)
314     (*it)->asXML(clientSessions);
315
316   return result;
317 }
318
319
320 void Server::eventPeerShutdown(const ClientSession *clientSession) throw() {
321   // Inform father entity:
322   a_parent->eventPeerShutdown(clientSession);
323 }
324
325 void Server::eventRequestRetransmission(const ClientSession* clientSession, Message *request) throw() {
326   // Inform father entity:
327   a_parent->eventRequestRetransmission(clientSession, request);
328 }
329
330 void Server::eventResponse(const Response& response) throw(anna::RuntimeException) {
331   // Inform father entity:
332   a_parent->eventResponse(response);
333 }
334
335 void Server::eventRequest(ClientSession *clientSession, const anna::DataBlock & request) throw(anna::RuntimeException) {
336   // Inform father entity:
337   a_parent->eventRequest(clientSession, request);
338 }
339
340 void Server::eventUnknownResponse(ClientSession *clientSession, const anna::DataBlock & response) throw(anna::RuntimeException) {
341   // Inform father entity:
342   a_parent->eventUnknownResponse(clientSession, response);
343 }
344
345 void Server::eventDPA(ClientSession *clientSession, const anna::DataBlock & response) throw(anna::RuntimeException) {
346   // Inform father entity:
347   a_parent->eventDPA(clientSession, response);
348 }
349
350
351 void Server::availabilityLost() throw() {
352   a_available = false;
353   std::string socket = anna::functions::socketLiteralAsString(a_socket.first, a_socket.second);
354   LOGDEBUG(
355     std::string msg = "diameter::comm::Server { Socket: ";
356     msg += socket;
357     msg += " } has lost its availability";
358     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
359   );
360   // OAM
361   OamModule &oamModule = OamModule::instantiate();
362   oamModule.activateAlarm(OamModule::Alarm::c_LostAvailabilityOverServerDefinedAs__s__, socket.c_str());
363   oamModule.count(OamModule::Counter::LostAvailabilityOverServer);
364   a_engine->availabilityLost(this);
365   a_parent->refreshAvailability();
366 }
367
368
369 void Server::availabilityRecovered() throw() {
370   a_available = true;
371   std::string socket = anna::functions::socketLiteralAsString(a_socket.first, a_socket.second);
372   LOGDEBUG(
373     std::string msg = "diameter::comm::Server { Socket: ";
374     msg += socket;
375     msg += " } has recovered its availability";
376     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
377   );
378   // OAM
379   OamModule &oamModule = OamModule::instantiate();
380   oamModule.cancelAlarm(OamModule::Alarm::c_LostAvailabilityOverServerDefinedAs__s__, socket.c_str());
381   oamModule.count(OamModule::Counter::RecoveredAvailabilityOverServer);
382   a_engine->availabilityRecovered(this);
383   a_parent->refreshAvailability();
384 }
385
386
387
388 bool Server::refreshAvailability() throw() {
389   // Here available
390   if(a_available) {  // check not-bound state for all client-sessions:
391     bool isolate = true;
392
393     for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++)
394
395       //if ((*it)->getState() == ClientSession::State::Bound) { isolate = false; break; }
396       if((*it)->getState() != ClientSession::State::Closed) { isolate = false; break; }
397
398     if(isolate) {
399       availabilityLost();
400       return true;
401     }
402
403     return false;
404   }
405
406   // Here not available
407   for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++)
408     if((*it)->getState() == ClientSession::State::Bound) {
409       availabilityRecovered();
410       return true;
411     }
412
413   return false;
414 }
415
416
417 //------------------------------------------------------------------------------
418 //---------------------------------------- Server::updateIncomingActivityTime()
419 //------------------------------------------------------------------------------
420 void Server::updateIncomingActivityTime() throw() {
421   a_lastIncomingActivityTime = anna::functions::millisecond();
422   LOGDEBUG
423   (
424     std::string msg = "Updated INCOMING activity on server (milliseconds unix): ";
425     msg += anna::functions::asString(a_lastIncomingActivityTime.getValue());
426     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
427   );
428   a_parent->updateIncomingActivityTime();
429 }
430
431
432 //------------------------------------------------------------------------------
433 //---------------------------------------- Server::updateOutgoingActivityTime()
434 //------------------------------------------------------------------------------
435 void Server::updateOutgoingActivityTime(void) throw() {
436   a_lastOutgoingActivityTime = anna::functions::millisecond();
437   LOGDEBUG
438   (
439     std::string msg = "Updated OUTGOING activity on server (milliseconds unix): "; msg += anna::functions::asString(a_lastOutgoingActivityTime.getValue());
440     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
441   );
442   a_parent->updateOutgoingActivityTime();
443 }