Include accumulators on statistics engine in order to centralize and ease asXML....
[anna.git] / source / diameter.comm / Server.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #include <anna/diameter.comm/ClientSession.hpp>
10 #include <anna/diameter.comm/Entity.hpp>
11 #include <anna/diameter.comm/Engine.hpp>
12 #include <anna/diameter.comm/OamModule.hpp>
13 #include <anna/diameter/codec/functions.hpp>
14 #include <anna/diameter/helpers/base/defines.hpp>
15 #include <anna/core/functions.hpp>
16 #include <anna/statistics/Engine.hpp>
17 #include <anna/time/functions.hpp>
18
19 #include <anna/core/tracing/Logger.hpp>
20 #include <anna/core/functions.hpp>
21 #include <anna/core/tracing/TraceMethod.hpp>
22
23 // STL
24 #include <vector>
25
26 using namespace anna;
27 using namespace anna::diameter;
28 using namespace anna::diameter::comm;
29
30
31 void Server::initialize() throw() {
32   a_parent = NULL;
33   a_engine = NULL;
34   a_clientSessions.clear(); // importante (el recycler creo que no lo tocaba)
35   a_available = false;
36   a_maxClientSessions = 1; // mono client connection
37   a_lastIncomingActivityTime = (anna::Millisecond)0;
38   a_lastOutgoingActivityTime = (anna::Millisecond)0;
39   a_statisticsAccumulator = NULL;
40   a_lastUsedResource = NULL;
41 }
42
43 void Server::initializeStatisticResources() throw() {
44   std::string accName = "remote server '";
45   accName += anna::functions::socketLiteralAsString(a_socket.first, a_socket.second);
46   accName += "' from realm '";
47   accName += a_engine ? a_engine->getRealm() : "unknown"; // it should be known (createServer)
48   accName += "'";
49   a_statisticsAccumulator = anna::statistics::Engine::instantiate().createAccumulator(accName);
50   a_processing_time__StatisticConceptId = a_statisticsAccumulator->addConcept("Diameter requests processing time at", "ms", true/* integer values */);
51   a_received_message_size__StatisticConceptId = a_statisticsAccumulator->addConcept("Diameter message sizes received from", "bytes", true/* integer values */);
52 }
53
54 void Server::resetStatistics() throw() {
55   a_statisticsAccumulator->reset();
56 }
57
58 void Server::updateProcessingTimeStatisticConcept(const double &value) throw() {
59   a_statisticsAccumulator->process(a_processing_time__StatisticConceptId, value);
60   LOGDEBUG(anna::Logger::debug(a_statisticsAccumulator->asString(), ANNA_FILE_LOCATION));
61 }
62
63 void Server::updateReceivedMessageSizeStatisticConcept(const double &value) throw() {
64   a_statisticsAccumulator->process(a_received_message_size__StatisticConceptId, value);
65   //LOGDEBUG(anna::Logger::debug(a_statisticsAccumulator->asString(), ANNA_FILE_LOCATION));
66 }
67
68
69 void Server::assertReady() throw(anna::RuntimeException) {
70   if(a_clientSessions.size() != a_maxClientSessions) {
71     std::string msg(asString());
72     msg += " | Non-configured server: you must add the remaining client-sessions before any operation (bind, send, etc.)";
73     throw anna::RuntimeException(msg, ANNA_FILE_LOCATION);
74   }
75 }
76
77
78 void Server::addClientSession(int socketId)
79 throw(anna::RuntimeException) {
80   if(a_clientSessions.size() == a_maxClientSessions) {
81     LOGDEBUG
82     (
83       std::string msg = "Maximum number of client-sessions reached for this server (";
84       msg += anna::functions::asString(a_maxClientSessions);
85       msg += ").";
86       anna::Logger::debug(msg, ANNA_FILE_LOCATION);
87     );
88     return;
89   }
90
91   if(!a_engine)
92     throw anna::RuntimeException("Invalid engine reference (NULL)", ANNA_FILE_LOCATION);
93
94   ClientSession *s = a_engine->createClientSession(this, socketId);
95   a_clientSessions.push_back(s);
96 }
97
98 int Server::getOTARequests() const throw() {
99   int result = 0;
100
101   for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++)
102     result += (*it)->getOTARequests();
103
104   return result;
105 }
106
107
108 bool Server::send(const Message* message, int socketId) throw(anna::RuntimeException) {
109   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Server", "send", ANNA_FILE_LOCATION));
110   assertReady();
111   bool fixedSocket = (socketId != -1);
112   int clientSessions = getNumberOfClientSessions();
113
114   for(int k = 0; k < clientSessions; k++) {    // try round-robin only over one cycle,
115     // no matter where you are: don't repeat same socket
116     if(fixedSocket)
117       a_lastUsedResource = a_engine->findClientSession(a_socket.first /*ip*/, a_socket.second /*port*/, socketId); // exception if not found
118     else {
119       if(a_deliveryIterator == end()) a_deliveryIterator = begin();
120
121       a_lastUsedResource = (*a_deliveryIterator);
122
123       if(clientSessions > 1) a_deliveryIterator++;  // Round robin
124     }
125
126     try {
127       // Send:
128       const Response* response = a_lastUsedResource->send(message);
129       return true; // no matter if response is NULL (answers, i.e.) or not.
130     } catch(anna::RuntimeException &ex) {
131       LOGDEBUG(
132         std::string msg = ex.getText();
133         msg += " | Send failed on socket id ";
134         msg += anna::functions::asString(a_lastUsedResource->getSocketId());
135         anna::Logger::debug(msg, ANNA_FILE_LOCATION);
136       );
137     }
138
139     // Specific socket sending doesn't try
140     if(fixedSocket) return false;
141   }
142
143   return false;
144 }
145
146
147 bool Server::broadcast(const Message* message) throw(anna::RuntimeException) {
148   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Server", "broadcast", ANNA_FILE_LOCATION));
149   assertReady();
150   const Response* response;
151   bool allok = true;
152
153   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++) {
154     try {
155       response = (*it)->send(message);
156     } catch(anna::RuntimeException &ex) {
157       ex.trace();
158       allok = false;
159     }
160   }
161
162   return allok;
163 }
164
165
166 bool Server::bind() throw(anna::RuntimeException) {
167   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Server", "bind", ANNA_FILE_LOCATION));
168   assertReady();
169   a_deliveryIterator = begin();
170   bool result = true; // all OK return
171
172   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++) {
173     try {
174       (*it)->bind();
175     } catch(anna::RuntimeException &ex) {
176       ex.trace();
177       result = false;
178     }
179   }
180
181   return result;
182 }
183
184 void Server::raiseAutoRecovery(bool autoRecovery) throw(anna::RuntimeException) {
185   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Server", "raiseAutoRecovery", ANNA_FILE_LOCATION));
186   assertReady();
187   a_deliveryIterator = begin();
188
189   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++)
190     (*it)->setAutoRecovery(autoRecovery);
191 }
192
193 void Server::setClassCodeTimeout(const ClassCode::_v v, const anna::Millisecond & millisecond) throw() {
194   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Server", "setClassCodeTimeout", ANNA_FILE_LOCATION));
195   assertReady();
196
197   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++) {
198     try {
199       (*it)->setClassCodeTimeout(v, millisecond);
200     } catch(anna::RuntimeException &ex) {
201       ex.trace();
202     }
203   }
204 }
205
206
207 // Private close/destroy method
208 void Server::close(bool destroy) throw(anna::RuntimeException) {
209   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Server", "close", ANNA_FILE_LOCATION));
210
211   if(!a_engine)
212     throw anna::RuntimeException("Invalid engine reference (NULL)", ANNA_FILE_LOCATION);
213
214   assertReady();
215
216   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++)
217     a_engine->closeClientSession(*it, destroy);
218 }
219
220
221 void Server::childIdle() const throw() {
222   // Check father entity idleness:
223   if(idle()) a_parent->childIdle();
224 }
225
226
227 void Server::hide() throw() {
228   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++)
229     (*it)->hide();
230 }
231
232 void Server::show() throw() {
233   for(std::vector<ClientSession*>::iterator it = begin(); it != end(); it++)
234     (*it)->show();
235 }
236
237 bool Server::hidden() const throw() {
238   for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++)
239     if((*it)->shown()) return false;
240
241   return true;
242 }
243 bool Server::shown() const throw() {
244   for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++)
245     if((*it)->hidden()) return false;
246
247   return true;
248 }
249
250
251
252 std::string Server::socketAsString() const throw() {
253   std::string result = getAddress();
254   result += ":";
255   result += anna::functions::asString(getPort());
256   return result;
257 }
258
259
260 std::string Server::asString() const throw() {
261   std::string result("diameter::comm::Server { ");
262   result += " | Parent Entity: ";
263   result += a_parent->getDescription();
264   result += " | Server Address: ";
265   result += a_socket.first;
266   result += " | Server Port: ";
267   result += anna::functions::asString(a_socket.second);
268   result += " | Available: ";
269   result += a_available ? "yes" : "no";
270   result += " | Max client-sessions supported: ";
271   result += anna::functions::asString(a_maxClientSessions);
272   result += " | Currently configured client-sessions: ";
273   result += anna::functions::asString(a_clientSessions.size());
274   result += anna::functions::asString(" | OTA requests: %d%s", getOTARequests(), idle() ? " (idle)" : "");
275   result += " | Last Incoming Activity Time: ";
276   result += a_lastIncomingActivityTime.asString();
277   result += " | Last Outgoing Activity Time: ";
278   result += a_lastOutgoingActivityTime.asString();
279   result += " | Hidden: ";
280   result += (hidden() ? "yes" : "no");
281   result += "\n";
282   result += a_statisticsAccumulator->asString();
283
284   for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++) {
285     result += "\n";
286     result += (*it)->asString();
287   }
288
289   return result;
290 }
291
292 anna::xml::Node* Server::asXML(anna::xml::Node* parent) const throw() {
293   anna::xml::Node* result = parent->createChild("diameter.Server");
294   result->createAttribute("ParentEntity", a_parent->getDescription());
295   result->createAttribute("ServerAddress", a_socket.first);
296   result->createAttribute("ServerPort", anna::functions::asString(a_socket.second));
297   result->createAttribute("Available", a_available ? "yes" : "no");
298   result->createAttribute("MaxClientSessionsSupported", anna::functions::asString(a_maxClientSessions));
299   result->createAttribute("CurrentlyConfiguredClientSessions", anna::functions::asString(a_clientSessions.size()));
300   result->createAttribute("OTArequests", anna::functions::asString("%d%s", getOTARequests(), idle() ? " (idle)" : ""));
301   result->createAttribute("LastIncomingActivityTime", a_lastIncomingActivityTime.asString());
302   result->createAttribute("LastOutgoingActivityTime", a_lastOutgoingActivityTime.asString());
303   result->createAttribute("Hidden", hidden() ? "yes" : "no");
304   // Statistics
305   anna::xml::Node* stats = result->createChild("Statistics");
306   a_statisticsAccumulator->asXML(stats);
307   anna::xml::Node* clientSessions = result->createChild("Server.ClientSessions");
308
309   for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++)
310     (*it)->asXML(clientSessions);
311
312   return result;
313 }
314
315
316 void Server::eventPeerShutdown(const ClientSession *clientSession) throw() {
317   // Inform father entity:
318   a_parent->eventPeerShutdown(clientSession);
319 }
320
321 void Server::eventRequestRetransmission(const ClientSession* clientSession, Message *request) throw() {
322   // Inform father entity:
323   a_parent->eventRequestRetransmission(clientSession, request);
324 }
325
326 void Server::eventResponse(const Response& response) throw(anna::RuntimeException) {
327   // Inform father entity:
328   a_parent->eventResponse(response);
329 }
330
331 void Server::eventRequest(ClientSession *clientSession, const anna::DataBlock & request) throw(anna::RuntimeException) {
332   // Inform father entity:
333   a_parent->eventRequest(clientSession, request);
334 }
335
336 void Server::eventUnknownResponse(ClientSession *clientSession, const anna::DataBlock & response) throw(anna::RuntimeException) {
337   // Inform father entity:
338   a_parent->eventUnknownResponse(clientSession, response);
339 }
340
341 void Server::eventDPA(ClientSession *clientSession, const anna::DataBlock & response) throw(anna::RuntimeException) {
342   // Inform father entity:
343   a_parent->eventDPA(clientSession, response);
344 }
345
346
347 void Server::availabilityLost() throw() {
348   a_available = false;
349   std::string socket = anna::functions::socketLiteralAsString(a_socket.first, a_socket.second);
350   LOGDEBUG(
351     std::string msg = "diameter::comm::Server { Socket: ";
352     msg += socket;
353     msg += " } has lost its availability";
354     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
355   );
356   // OAM
357   OamModule &oamModule = OamModule::instantiate();
358   oamModule.activateAlarm(OamModule::Alarm::c_LostAvailabilityOverServerDefinedAs__s__, socket.c_str());
359   oamModule.count(OamModule::Counter::LostAvailabilityOverServer);
360   a_engine->availabilityLost(this);
361   a_parent->refreshAvailability();
362 }
363
364
365 void Server::availabilityRecovered() throw() {
366   a_available = true;
367   std::string socket = anna::functions::socketLiteralAsString(a_socket.first, a_socket.second);
368   LOGDEBUG(
369     std::string msg = "diameter::comm::Server { Socket: ";
370     msg += socket;
371     msg += " } has recovered its availability";
372     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
373   );
374   // OAM
375   OamModule &oamModule = OamModule::instantiate();
376   oamModule.cancelAlarm(OamModule::Alarm::c_LostAvailabilityOverServerDefinedAs__s__, socket.c_str());
377   oamModule.count(OamModule::Counter::RecoveredAvailabilityOverServer);
378   a_engine->availabilityRecovered(this);
379   a_parent->refreshAvailability();
380 }
381
382
383
384 bool Server::refreshAvailability() throw() {
385   // Here available
386   if(a_available) {  // check not-bound state for all client-sessions:
387     bool isolate = true;
388
389     for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++)
390
391       //if ((*it)->getState() == ClientSession::State::Bound) { isolate = false; break; }
392       if((*it)->getState() != ClientSession::State::Closed) { isolate = false; break; }
393
394     if(isolate) {
395       availabilityLost();
396       return true;
397     }
398
399     return false;
400   }
401
402   // Here not available
403   for(std::vector<ClientSession*>::const_iterator it = begin(); it != end(); it++)
404     if((*it)->getState() == ClientSession::State::Bound) {
405       availabilityRecovered();
406       return true;
407     }
408
409   return false;
410 }
411
412
413 //------------------------------------------------------------------------------
414 //---------------------------------------- Server::updateIncomingActivityTime()
415 //------------------------------------------------------------------------------
416 void Server::updateIncomingActivityTime() throw() {
417   a_lastIncomingActivityTime = anna::functions::millisecond();
418   LOGDEBUG
419   (
420     std::string msg = "Updated INCOMING activity on server (milliseconds unix): ";
421     msg += anna::functions::asString(a_lastIncomingActivityTime.getValue());
422     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
423   );
424   a_parent->updateIncomingActivityTime();
425 }
426
427
428 //------------------------------------------------------------------------------
429 //---------------------------------------- Server::updateOutgoingActivityTime()
430 //------------------------------------------------------------------------------
431 void Server::updateOutgoingActivityTime(void) throw() {
432   a_lastOutgoingActivityTime = anna::functions::millisecond();
433   LOGDEBUG
434   (
435     std::string msg = "Updated OUTGOING activity on server (milliseconds unix): "; msg += anna::functions::asString(a_lastOutgoingActivityTime.getValue());
436     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
437   );
438   a_parent->updateOutgoingActivityTime();
439 }