Fix local server for multiple applications
[anna.git] / source / diameter.comm / Entity.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #include <anna/diameter.comm/Server.hpp>
10 #include <anna/diameter.comm/Entity.hpp>
11 #include <anna/diameter.comm/Engine.hpp>
12 #include <anna/diameter.comm/OamModule.hpp>
13 #include <anna/diameter/codec/functions.hpp>
14 #include <anna/diameter/helpers/base/defines.hpp>
15 #include <anna/core/functions.hpp>
16 #include <anna/time/functions.hpp>
17 #include <anna/diameter/helpers/base/functions.hpp>
18 #include <anna/diameter/helpers/dcca/functions.hpp>
19
20 #include <anna/core/tracing/Logger.hpp>
21 #include <anna/core/functions.hpp>
22 #include <anna/core/tracing/TraceMethod.hpp>
23
24
25 using namespace anna;
26 using namespace anna::diameter;
27 using namespace anna::diameter::comm;
28
29
30
31 void Entity::initialize() {
32   a_engine = NULL;
33   a_servers.clear(); // importante (el recycler creo que no lo tocaba)
34   a_available = false;
35   a_deprecated = false;
36   a_socketListLiteral = "";
37   a_primarySocketLiteral = "";
38   a_secondarySocketLiteral = "";
39   a_description = "";
40   a_category = 0;
41   a_lastUsedResource = NULL;
42   a_balance = false;
43   a_sessionBasedModelsType = SessionBasedModelsType::SessionIdLowPart;
44 }
45
46
47 void Entity::assertReady() noexcept(false) {
48   if(a_servers.size() != a_maxServers) {
49     std::string msg(asString());
50     msg += " | Non-configured entity: you must add the remaining servers before any operation (bind, send, etc.)";
51     throw anna::RuntimeException(msg, ANNA_FILE_LOCATION);
52   }
53 }
54
55
56 void Entity::addServer(const socket_t & serverId)
57 noexcept(false) {
58   if(a_servers.size() == a_maxServers) {
59     LOGDEBUG
60     (
61       std::string msg = "Maximum number of servers reached for this entity (";
62       msg += anna::functions::asString(a_maxServers);
63       msg += "). Please use clear()/close() primitives";
64       anna::Logger::debug(msg, ANNA_FILE_LOCATION);
65     );
66     return;
67   }
68
69   if(!a_engine)
70     throw anna::RuntimeException("Invalid engine reference (NULL)", ANNA_FILE_LOCATION);
71
72   Server *s = a_engine->createServer(this, serverId);
73   a_servers.push_back(s);
74   a_deliveryIterator = begin();
75 }
76
77
78 int Entity::readSocketId(const Message* message, int maxClientSessions) const {
79
80   if(a_sessionBasedModelsType == SessionBasedModelsType::RoundRobin) return -1;  // IEC also would return -1
81
82   try {
83     // Service-Context-Id:
84     anna::diameter::helpers::dcca::ChargingContext::_v chargingContext;
85     std::string scid = anna::diameter::helpers::dcca::functions::getServiceContextId(message->getBody(), chargingContext);
86
87     switch(chargingContext) {
88     case anna::diameter::helpers::dcca::ChargingContext::Data:
89     case anna::diameter::helpers::dcca::ChargingContext::Voice:
90     case anna::diameter::helpers::dcca::ChargingContext::Content: {
91       // Session-Id: '<DiameterIdentity>;<high 32 bits>;<low 32 bits>[;<optional value>="">]'
92       std::string sid = anna::diameter::helpers::base::functions::getSessionId(message->getBody());
93       std::string diameterIdentity, optional;
94       anna::U32 high, low;
95       anna::diameter::helpers::base::functions::decodeSessionId(sid, diameterIdentity, high, low /* context-teid */, optional);
96
97       if(a_sessionBasedModelsType == SessionBasedModelsType::SessionIdLowPart) return (low % maxClientSessions);
98
99       if(a_sessionBasedModelsType == SessionBasedModelsType::SessionIdHighPart) return (high % maxClientSessions);
100
101       if(a_sessionBasedModelsType == SessionBasedModelsType::SessionIdOptionalPart) return (atoi(optional.c_str()) % maxClientSessions);
102     }
103     case anna::diameter::helpers::dcca::ChargingContext::SMS:
104     case anna::diameter::helpers::dcca::ChargingContext::MMS:
105     case anna::diameter::helpers::dcca::ChargingContext::Unknown:
106     default:
107        return -1; // IEC model and Unknown traffic types
108     }
109   } catch(anna::RuntimeException &ex) {
110     LOGDEBUG(
111       std::string msg = ex.getText();
112       msg += " | Round-robin between sessions will be used to send";
113       anna::Logger::debug(msg, ANNA_FILE_LOCATION);
114     );
115   }
116
117   return -1;
118 }
119
120
121 bool Entity::send(const Message* message) noexcept(false) {
122   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Entity", "send", ANNA_FILE_LOCATION));
123   assertReady();
124   // Carried socket id (forwarding/proxy features):
125   std::string carriedSocketId = message->getRequestClientSessionKey();
126
127   if(carriedSocketId != "") {
128     // Send (it was a request, we forward the answer):
129     try {
130       ClientSession * fixedClientSession = a_engine->findClientSession(carriedSocketId); // exception if not found
131       fixedClientSession->send(message);
132       return true;
133     } catch(anna::RuntimeException &ex) {
134       std::string msg = "Cannot deliver answer through a fixed client session (";
135       msg += carriedSocketId;
136       msg += "). Perhaps it existed but not now. Ignore";
137       anna::Logger::error(msg, ANNA_FILE_LOCATION);
138       ex.trace();
139     }
140   }
141
142   ////////////////////////////////////////////////////////////////////////////////////////
143   // BALANCE vs STANDARD
144   // Balance algorythm remember last delivery resource used, balancing from start to end,
145   // understanding start as next resource to last used, and end as last used. Standard
146   // algorithm always starts at primary (first defined) server.
147   ////////////////////////////////////////////////////////////////////////////////////////
148
149   // Balance
150   if(a_balance) {
151     for(int k = 0; k < getMaxServers(); k++) {   // try round-robin only over one cycle,
152       // no matter where you are: don't repeat same server
153       if(a_deliveryIterator == end()) a_deliveryIterator = begin();
154
155       a_lastUsedResource = (*a_deliveryIterator);
156       a_deliveryIterator++;
157       // At 'readSocketId' documentation:
158       //      If server is configured as single session (max client sessions equal to 1), entity will ignore
159       //      this method because it won't affect the session selection.
160       int serverSessions = a_lastUsedResource->getMaxClientSessions();
161       int socketId = (serverSessions > 1) ? readSocketId(message, serverSessions) : -1; // optimization
162
163       if(a_lastUsedResource->send(message, socketId))  // exception only possible at findClientSession within server send procedure
164         return true;
165     }
166   } else {
167     // Standard (no balance) //   start at begining, try secondary, and so on until end.
168     std::vector<Server*>::iterator it = begin();
169
170     while(it != end()) {
171       a_lastUsedResource = (*it);
172       it++;
173       // At 'readSocketId' documentation:
174       //      If server is configured as single session (max client sessions equal to 1), entity will ignore
175       //      this method because it won't affect the session selection.
176       int serverSessions = a_lastUsedResource->getMaxClientSessions();
177       int socketId = (serverSessions > 1) ? readSocketId(message, serverSessions) : -1; // optimization
178
179       if(a_lastUsedResource->send(message, socketId))  // exception only possible at findClientSession within server send procedure
180         return true;
181     }
182   }
183
184   // END BALANCE AND TRY ALGORITHM or STANDARD ///////////////////////////////////////////
185   ////////////////////////////////////////////////////////////////////////////////////////
186   // Here, sent has failed:
187   // OAM
188   OamModule &oamModule = OamModule::instantiate();
189
190   if(a_maxServers != 2) {
191     oamModule.activateAlarm(OamModule::Alarm::UnableToDeliverDiameterMessageToEntityDefinedAs__s__, a_socketListLiteral.c_str());
192     oamModule.count(OamModule::Counter::UnableToDeliverOverEntity);
193   } else {
194     OamModule &oamModule = OamModule::instantiate();
195     oamModule.activateAlarm(OamModule::Alarm::UnableToDeliverDiameterMessageToEntityDefinedAsPrimary__s__AndSecondary__s__,
196                             a_primarySocketLiteral.c_str(),
197                             a_secondarySocketLiteral.c_str());
198     oamModule.count(OamModule::Counter::UnableToDeliverOverEntity);
199   }
200
201   return false;
202 }
203
204 bool Entity::broadcast(const Message* message) noexcept(false) {
205   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Entity", "broadcast", ANNA_FILE_LOCATION));
206   assertReady();
207   bool allok = true;
208   bool ok;
209
210   for(std::vector<Server*>::iterator it = begin(); it != end(); it++) {
211     try {
212       ok = (*it)->broadcast(message);
213
214       if(!ok) allok = false;
215     } catch(anna::RuntimeException &ex) {
216       ex.trace();
217       allok = false;
218     }
219   }
220
221   return allok;
222 }
223
224 bool Entity::bind() noexcept(false) {
225   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Entity", "bind", ANNA_FILE_LOCATION));
226   assertReady();
227   bool result = true; // all OK return
228
229   for(std::vector<Server*>::iterator it = begin(); it != end(); it++) {
230     try {
231       (*it)->bind();
232     } catch(anna::RuntimeException &ex) {
233       ex.trace();
234       result = false;
235     }
236   }
237
238   return result;
239 }
240
241 void Entity::raiseAutoRecovery(bool autoRecovery) noexcept(false) {
242   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Entity", "raiseAutoRecovery", ANNA_FILE_LOCATION));
243   assertReady();
244
245   for(std::vector<Server*>::iterator it = begin(); it != end(); it++)
246     (*it)->raiseAutoRecovery(autoRecovery);
247 }
248
249 void Entity::setClassCodeTimeout(const ClassCode::_v v, const anna::Millisecond & millisecond) {
250   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Entity", "setClassCodeTimeout", ANNA_FILE_LOCATION));
251   assertReady();
252
253   for(std::vector<Server*>::iterator it = begin(); it != end(); it++) {
254     try {
255       (*it)->setClassCodeTimeout(v, millisecond);
256     } catch(anna::RuntimeException &ex) {
257       ex.trace();
258     }
259   }
260 }
261
262
263 // Private close/destroy method
264 void Entity::close(bool destroy) noexcept(false) {
265   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Entity", "close", ANNA_FILE_LOCATION));
266
267   if(!a_engine)
268     throw anna::RuntimeException("Invalid engine reference (NULL)", ANNA_FILE_LOCATION);
269
270   assertReady();
271
272   for(std::vector<Server*>::iterator it = begin(); it != end(); it++)
273     a_engine->closeServer(*it, destroy);
274 }
275
276 const char* Entity::asText(const SessionBasedModelsType::_v sbmt)
277 {
278   static const char* text [] = { "RoundRobin", "SessionIdOptionalPart", "SessionIdHighPart", "SessionIdLowPart" };
279   return text [sbmt];
280 }
281
282 socket_v Entity::getAddressPortList() const {
283   socket_v result;
284
285   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++) {
286     socket_t address((*it)->getAddress(), (*it)->getPort());
287     result.push_back(address);
288   }
289
290   return result;
291 }
292
293 int Entity::getOTARequests() const {
294   int result = 0;
295
296   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++)
297     result += (*it)->getOTARequests();
298
299   return result;
300 }
301
302 void Entity::childIdle() const {
303   // Check father engine idleness:
304   if(idle()) a_engine->eraseDeprecatedIdleEntities();
305 }
306
307
308 void Entity::hide() {
309   for(std::vector<Server*>::iterator it = begin(); it != end(); it++)
310     (*it)->hide();
311 }
312
313 void Entity::show() {
314   for(std::vector<Server*>::iterator it = begin(); it != end(); it++)
315     (*it)->show();
316 }
317
318 bool Entity::hidden() const {
319   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++)
320     if((*it)->shown()) return false;
321
322   return true;
323 }
324 bool Entity::shown() const {
325   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++)
326     if((*it)->hidden()) return false;
327
328   return true;
329 }
330
331 void Entity::eventPeerShutdown(const ClientSession* clientSession) {
332   LOGWARNING(
333     std::string msg(clientSession->asString());
334     msg += " | eventPeerShutdown";
335     anna::Logger::warning(msg, ANNA_FILE_LOCATION);
336   );
337 }
338
339 void Entity::eventRequestRetransmission(const ClientSession* clientSession, Message *request) {
340   LOGWARNING(
341     std::string msg(clientSession->asString());
342
343     HopByHop hopByHop = codec::functions::getHopByHop(request->getBody()); // context identification
344     int retries = request->getRetries();
345
346     msg += anna::functions::asString(" | eventRequestRetransmission: request with application HopByHop: %u; remaining %d retries", hopByHop, retries);
347     anna::Logger::warning(msg, ANNA_FILE_LOCATION);
348   );
349 }
350
351 std::string Entity::asString() const {
352   std::string result("diameter::comm::Entity { ");
353   std::string originRealm = a_engine->getOriginRealmName();
354   std::string originHost = a_engine->getOriginHostName();
355
356   result += "Parent Engine Origin-Realm: ";
357   result += (originRealm != "") ? originRealm:"[not configured]";
358   result += " | Parent Engine Origin-Host: ";
359   result += (originHost != "") ? originHost:"[not configured]";
360
361   result += " | Category: ";
362   result += anna::functions::asString(a_category);
363
364   if(a_description != "") {
365     result += " | Description: '";
366     result += a_description;
367   }
368
369   result += " | Available: ";
370   result += a_available ? "yes" : "no";
371   result += " | Deprecated: ";
372   result += a_deprecated ? "yes" : "no";
373   result += " | Max servers supported: ";
374   result += anna::functions::asString(a_maxServers);
375   result += " | Currently configured servers: ";
376   result += anna::functions::asString(a_servers.size());
377   result += anna::functions::asString(" | OTA requests: %d%s", getOTARequests(), idle() ? " (idle)" : "");
378   result += " | Last Incoming Activity Time: ";
379   result += a_lastIncomingActivityTime.asString();
380   result += " | Last Outgoing Activity Time: ";
381   result += a_lastOutgoingActivityTime.asString();
382   result += " | Hidden: ";
383   result += (hidden() ? "yes" : "no");
384   result += " | SessionBasedModelsType: ";
385   result += asText(a_sessionBasedModelsType);
386   result += "\n";
387
388   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++) {
389     result += "\n";
390     result += (*it)->asString();
391   }
392
393   return result;
394 }
395
396 anna::xml::Node* Entity::asXML(anna::xml::Node* parent) const {
397   anna::xml::Node* result = parent->createChild("diameter.Entity");
398   std::string originRealm = a_engine->getOriginRealmName();
399   std::string originHost = a_engine->getOriginHostName();
400
401   if(originRealm != "") result->createAttribute("ParentEngineOriginRealm", originRealm);
402   if(originHost != "") result->createAttribute("ParentEngineOriginHost", originHost);
403
404   result->createAttribute("Category", anna::functions::asString(a_category));
405
406   if(a_description != "") result->createAttribute("Description", a_description);
407
408   result->createAttribute("Available", a_available ? "yes" : "no");
409   result->createAttribute("Deprecated", a_deprecated ? "yes" : "no");
410   result->createAttribute("MaxServersSupported", anna::functions::asString(a_maxServers));
411   result->createAttribute("CurrentlyConfiguredServers", anna::functions::asString(a_servers.size()));
412   result->createAttribute("OTArequests", anna::functions::asString("%d%s", getOTARequests(), idle() ? " (idle)" : ""));
413   result->createAttribute("LastIncomingActivityTime", a_lastIncomingActivityTime.asString());
414   result->createAttribute("LastOutgoingActivityTime", a_lastOutgoingActivityTime.asString());
415   result->createAttribute("Hidden", hidden() ? "yes" : "no");
416   result->createAttribute("SessionBasedModelsType:", asText(a_sessionBasedModelsType));
417   anna::xml::Node* servers = result->createChild("Entity.Servers");
418
419   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++)
420     (*it)->asXML(servers);
421
422   return result;
423 }
424
425 void Entity::availabilityLost() {
426   a_available = false;
427   LOGDEBUG(
428     std::string msg = "diameter::comm::Entity { Description: ";
429     msg += getDescription();
430     msg += " } has lost its availability";
431     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
432   );
433   OamModule &oamModule = OamModule::instantiate();
434
435   if(a_maxServers != 2) {
436     oamModule.activateAlarm(OamModule::Alarm::c_LostAvailabilityOverEntityDefinedAs__s__, a_socketListLiteral.c_str());
437     oamModule.count(OamModule::Counter::LostAvailabilityOverEntity);
438   } else {
439     oamModule.activateAlarm(OamModule::Alarm::c_LostAvailabilityOverEntityDefinedAsPrimary__s__AndSecondary__s__,
440                             a_primarySocketLiteral.c_str(),
441                             a_secondarySocketLiteral.c_str());
442     oamModule.count(OamModule::Counter::LostAvailabilityOverEntity);
443   }
444
445   a_engine->refreshAvailabilityForEntities();
446 }
447
448
449 void Entity::availabilityRecovered() {
450   a_available = true;
451   LOGDEBUG(
452     std::string msg = "diameter::comm::Entity { Description: ";
453     msg += getDescription();
454     msg += " } has recovered its availability";
455     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
456   );
457   OamModule &oamModule = OamModule::instantiate();
458
459   if(a_maxServers != 2) {
460     oamModule.cancelAlarm(OamModule::Alarm::c_LostAvailabilityOverEntityDefinedAs__s__, a_socketListLiteral.c_str());
461     oamModule.count(OamModule::Counter::RecoveredAvailabilityOverEntity);
462   } else {
463     oamModule.cancelAlarm(OamModule::Alarm::c_LostAvailabilityOverEntityDefinedAsPrimary__s__AndSecondary__s__,
464                           a_primarySocketLiteral.c_str(),
465                           a_secondarySocketLiteral.c_str());
466     oamModule.count(OamModule::Counter::RecoveredAvailabilityOverEntity);
467   }
468
469   a_engine->refreshAvailabilityForEntities();
470 }
471
472
473 bool Entity::refreshAvailability() {
474   // Here available
475   if(a_available) {  // check not-bound state for all servers:
476     bool isolate = true;
477
478     for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++)
479       if((*it)->isAvailable()) { isolate = false; break; }
480
481     if(isolate) {
482       availabilityLost();
483       return true;
484     }
485
486     return false;
487   }
488
489   // Here not available
490   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++)
491     if((*it)->isAvailable()) {
492       availabilityRecovered();
493       return true;
494     }
495
496   return false;
497 }
498
499 //------------------------------------------------------------------------------
500 //----------------------------------------- Entity::updateIncomingActivityTime()
501 //------------------------------------------------------------------------------
502 void Entity::updateIncomingActivityTime() {
503   a_lastIncomingActivityTime = anna::functions::millisecond();
504   LOGDEBUG
505   (
506     std::string msg = "Updated INCOMING activity on entity (milliseconds unix): ";
507     msg += anna::functions::asString(a_lastIncomingActivityTime.getValue());
508     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
509   );
510 }
511
512
513 //------------------------------------------------------------------------------
514 //----------------------------------------- Entity::updateOutgoingActivityTime()
515 //------------------------------------------------------------------------------
516 void Entity::updateOutgoingActivityTime(void) {
517   a_lastOutgoingActivityTime = anna::functions::millisecond();
518   LOGDEBUG
519   (
520     std::string msg = "Updated OUTGOING activity on entity (milliseconds unix): ";
521     msg += anna::functions::asString(a_lastOutgoingActivityTime.getValue());
522     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
523   );
524 }
525