Base protocol codec for comm::Engine. Supported retransmissions
[anna.git] / source / diameter.comm / Entity.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #include <anna/diameter.comm/Server.hpp>
10 #include <anna/diameter.comm/Entity.hpp>
11 #include <anna/diameter.comm/Engine.hpp>
12 #include <anna/diameter.comm/OamModule.hpp>
13 #include <anna/diameter/codec/functions.hpp>
14 #include <anna/diameter/helpers/base/defines.hpp>
15 #include <anna/core/functions.hpp>
16 #include <anna/time/functions.hpp>
17 #include <anna/diameter/helpers/base/functions.hpp>
18 #include <anna/diameter/helpers/dcca/functions.hpp>
19
20 #include <anna/core/tracing/Logger.hpp>
21 #include <anna/core/functions.hpp>
22 #include <anna/core/tracing/TraceMethod.hpp>
23
24
25 using namespace anna;
26 using namespace anna::diameter;
27 using namespace anna::diameter::comm;
28
29
30
31 void Entity::initialize() throw() {
32   a_engine = NULL;
33   a_servers.clear(); // importante (el recycler creo que no lo tocaba)
34   a_available = false;
35   a_deprecated = false;
36   a_socketListLiteral = "";
37   a_primarySocketLiteral = "";
38   a_secondarySocketLiteral = "";
39   a_description = "";
40   a_category = 0;
41   a_lastUsedResource = NULL;
42 }
43
44
45 void Entity::assertReady() throw(anna::RuntimeException) {
46   if(a_servers.size() != a_maxServers) {
47     std::string msg(asString());
48     msg += " | Non-configured entity: you must add the remaining servers before any operation (bind, send, etc.)";
49     throw anna::RuntimeException(msg, ANNA_FILE_LOCATION);
50   }
51 }
52
53
54 void Entity::addServer(const socket_t & serverId)
55 throw(anna::RuntimeException) {
56   if(a_servers.size() == a_maxServers) {
57     LOGDEBUG
58     (
59       std::string msg = "Maximum number of servers reached for this entity (";
60       msg += anna::functions::asString(a_maxServers);
61       msg += "). Please use clear()/close() primitives";
62       anna::Logger::debug(msg, ANNA_FILE_LOCATION);
63     );
64     return;
65   }
66
67   if(!a_engine)
68     throw anna::RuntimeException("Invalid engine reference (NULL)", ANNA_FILE_LOCATION);
69
70   Server *s = a_engine->createServer(this, serverId);
71   a_servers.push_back(s);
72   a_deliveryIterator = begin();
73 }
74
75
76 int Entity::readSocketId(const Message* message, int maxClientSessions) const throw() {
77   try {
78     // Service-Context-Id:
79     anna::diameter::helpers::dcca::ChargingContext::_v chargingContext;
80     std::string scid = anna::diameter::helpers::dcca::functions::getServiceContextId(message->getBody(), chargingContext);
81
82     switch(chargingContext) {
83     case anna::diameter::helpers::dcca::ChargingContext::Data:
84     case anna::diameter::helpers::dcca::ChargingContext::Voice:
85     case anna::diameter::helpers::dcca::ChargingContext::Content: {
86       // Session-Id: '<DiameterIdentity>;<high 32 bits>;<low 32 bits>[;<optional value>="">]'
87       std::string sid = anna::diameter::helpers::base::functions::getSessionId(message->getBody());
88       std::string diameterIdentity, optional;
89       anna::U32 high, low;
90       anna::diameter::helpers::base::functions::decodeSessionId(sid, diameterIdentity, high, low /* context-teid */, optional);
91       return (low % maxClientSessions);
92     }
93     //case anna::diameter::helpers::dcca::ChargingContext::SMS:
94     //case anna::diameter::helpers::dcca::ChargingContext::MMS:
95     //default:
96     //   return -1; // IEC model and Unknown traffic types
97     }
98   } catch(anna::RuntimeException &ex) {
99     LOGDEBUG(
100       std::string msg = ex.getText();
101       msg += " | Round-robin between sessions will be used to send";
102       anna::Logger::debug(msg, ANNA_FILE_LOCATION);
103     );
104   }
105
106   return -1;
107 }
108
109
110 bool Entity::send(const Message* message, bool balance) throw(anna::RuntimeException) {
111   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Entity", "send", ANNA_FILE_LOCATION));
112   assertReady();
113   // Carried socket id (forwarding/proxy features):
114   std::string carriedSocketId = message->getRequestClientSessionKey();
115
116   if(carriedSocketId != "") {
117     // Send (it was a request, we forward the answer):
118     try {
119       ClientSession * fixedClientSession = a_engine->findClientSession(carriedSocketId); // exception if not found
120       fixedClientSession->send(message);
121       return true;
122     } catch(anna::RuntimeException &ex) {
123       std::string msg = "Cannot deliver answer through a fixed client session (";
124       msg += carriedSocketId;
125       msg += "). Perhaps it existed but not now. Ignore";
126       anna::Logger::error(msg, ANNA_FILE_LOCATION);
127       ex.trace();
128     }
129   }
130
131   ////////////////////////////////////////////////////////////////////////////////////////
132   // BALANCE vs STANDARD
133   // Balance algorythm remember last delivery resource used, balancing from start to end,
134   // understanding start as next resource to last used, and end as last used. Standard
135   // algorithm always starts at primary (first defined) server.
136   ////////////////////////////////////////////////////////////////////////////////////////
137
138   // Balance
139   if(balance) {
140     for(int k = 0; k < getMaxServers(); k++) {   // try round-robin only over one cycle,
141       // no matter where you are: don't repeat same server
142       if(a_deliveryIterator == end()) a_deliveryIterator = begin();
143
144       a_lastUsedResource = (*a_deliveryIterator);
145       a_deliveryIterator++;
146       // At 'readSocketId' documentation:
147       //      If server is configured as single session (max client sessions equal to 1), entity will ignore
148       //      this method because it won't affect the session selection.
149       int serverSessions = a_lastUsedResource->getMaxClientSessions();
150       int socketId = (serverSessions > 1) ? readSocketId(message, serverSessions) : -1; // optimization
151
152       if(a_lastUsedResource->send(message, socketId))  // exception only possible at findClientSession within server send procedure
153         return true;
154     }
155   } else {
156     // Standard (no balance) //   start at begining, try secondary, and so on until end.
157     std::vector<Server*>::iterator it = begin();
158
159     while(it != end()) {
160       a_lastUsedResource = (*it);
161       it++;
162       // At 'readSocketId' documentation:
163       //      If server is configured as single session (max client sessions equal to 1), entity will ignore
164       //      this method because it won't affect the session selection.
165       int serverSessions = a_lastUsedResource->getMaxClientSessions();
166       int socketId = (serverSessions > 1) ? readSocketId(message, serverSessions) : -1; // optimization
167
168       if(a_lastUsedResource->send(message, socketId))  // exception only possible at findClientSession within server send procedure
169         return true;
170     }
171   }
172
173   // END BALANCE AND TRY ALGORITHM or STANDARD ///////////////////////////////////////////
174   ////////////////////////////////////////////////////////////////////////////////////////
175   // Here, sent has failed:
176   // OAM
177   OamModule &oamModule = OamModule::instantiate();
178
179   if(a_maxServers != 2) {
180     oamModule.activateAlarm(OamModule::Alarm::UnableToDeliverDiameterMessageToEntityDefinedAs__s__, a_socketListLiteral.c_str());
181     oamModule.count(OamModule::Counter::UnableToDeliverOverEntity);
182   } else {
183     OamModule &oamModule = OamModule::instantiate();
184     oamModule.activateAlarm(OamModule::Alarm::UnableToDeliverDiameterMessageToEntityDefinedAsPrimary__s__AndSecondary__s__,
185                             a_primarySocketLiteral.c_str(),
186                             a_secondarySocketLiteral.c_str());
187     oamModule.count(OamModule::Counter::UnableToDeliverOverEntity);
188   }
189
190   return false;
191 }
192
193 bool Entity::broadcast(const Message* message) throw(anna::RuntimeException) {
194   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Entity", "broadcast", ANNA_FILE_LOCATION));
195   assertReady();
196   bool allok = true;
197   bool ok;
198
199   for(std::vector<Server*>::iterator it = begin(); it != end(); it++) {
200     try {
201       ok = (*it)->broadcast(message);
202
203       if(!ok) allok = false;
204     } catch(anna::RuntimeException &ex) {
205       ex.trace();
206       allok = false;
207     }
208   }
209
210   return allok;
211 }
212
213 bool Entity::bind() throw(anna::RuntimeException) {
214   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Entity", "bind", ANNA_FILE_LOCATION));
215   assertReady();
216   bool result = true; // all OK return
217
218   for(std::vector<Server*>::iterator it = begin(); it != end(); it++) {
219     try {
220       (*it)->bind();
221     } catch(anna::RuntimeException &ex) {
222       ex.trace();
223       result = false;
224     }
225   }
226
227   return result;
228 }
229
230 void Entity::raiseAutoRecovery(bool autoRecovery) throw(anna::RuntimeException) {
231   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Entity", "raiseAutoRecovery", ANNA_FILE_LOCATION));
232   assertReady();
233
234   for(std::vector<Server*>::iterator it = begin(); it != end(); it++)
235     (*it)->raiseAutoRecovery(autoRecovery);
236 }
237
238 void Entity::setClassCodeTimeout(const ClassCode::_v v, const anna::Millisecond & millisecond) throw() {
239   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Entity", "setClassCodeTimeout", ANNA_FILE_LOCATION));
240   assertReady();
241
242   for(std::vector<Server*>::iterator it = begin(); it != end(); it++) {
243     try {
244       (*it)->setClassCodeTimeout(v, millisecond);
245     } catch(anna::RuntimeException &ex) {
246       ex.trace();
247     }
248   }
249 }
250
251
252 // Private close/destroy method
253 void Entity::close(bool destroy) throw(anna::RuntimeException) {
254   LOGMETHOD(anna::TraceMethod tttm("diameter::comm::Entity", "close", ANNA_FILE_LOCATION));
255
256   if(!a_engine)
257     throw anna::RuntimeException("Invalid engine reference (NULL)", ANNA_FILE_LOCATION);
258
259   assertReady();
260
261   for(std::vector<Server*>::iterator it = begin(); it != end(); it++)
262     a_engine->closeServer(*it, destroy);
263 }
264
265
266 socket_v Entity::getAddressPortList() const throw() {
267   socket_v result;
268
269   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++) {
270     socket_t address((*it)->getAddress(), (*it)->getPort());
271     result.push_back(address);
272   }
273
274   return result;
275 }
276
277 int Entity::getOTARequests() const throw() {
278   int result = 0;
279
280   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++)
281     result += (*it)->getOTARequests();
282
283   return result;
284 }
285
286 void Entity::childIdle() const throw() {
287   // Check father engine idleness:
288   if(idle()) a_engine->eraseDeprecatedIdleEntities();
289 }
290
291
292 void Entity::hide() throw() {
293   for(std::vector<Server*>::iterator it = begin(); it != end(); it++)
294     (*it)->hide();
295 }
296
297 void Entity::show() throw() {
298   for(std::vector<Server*>::iterator it = begin(); it != end(); it++)
299     (*it)->show();
300 }
301
302 bool Entity::hidden() const throw() {
303   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++)
304     if((*it)->shown()) return false;
305
306   return true;
307 }
308 bool Entity::shown() const throw() {
309   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++)
310     if((*it)->hidden()) return false;
311
312   return true;
313 }
314
315 void Entity::eventPeerShutdown(const ClientSession* clientSession) throw() {
316   LOGWARNING(
317     std::string msg(clientSession->asString());
318     msg += " | eventPeerShutdown";
319     anna::Logger::warning(msg, ANNA_FILE_LOCATION);
320   );
321 }
322
323 void Entity::eventRequestRetransmission(const ClientSession* clientSession, Message *request) throw() {
324   LOGWARNING(
325     std::string msg(clientSession->asString());
326     msg += " | eventRequestRetransmission";
327     anna::Logger::warning(msg, ANNA_FILE_LOCATION);
328   );
329 }
330
331 std::string Entity::asString() const throw() {
332   std::string result("diameter::comm::Entity { ");
333   std::string realm = a_engine->getRealm();
334
335   if(realm != "") {
336     result += "Parent Engine (realm): ";
337     result += realm;
338   }
339
340   result += " | Category: ";
341   result += anna::functions::asString(a_category);
342
343   if(a_description != "") {
344     result += " | Description: '";
345     result += a_description;
346   }
347
348   result += " | Available: ";
349   result += a_available ? "yes" : "no";
350   result += " | Deprecated: ";
351   result += a_deprecated ? "yes" : "no";
352   result += " | Max servers supported: ";
353   result += anna::functions::asString(a_maxServers);
354   result += " | Currently configured servers: ";
355   result += anna::functions::asString(a_servers.size());
356   result += anna::functions::asString(" | OTA requests: %d%s", getOTARequests(), idle() ? " (idle)" : "");
357   result += " | Last Incoming Activity Time: ";
358   result += a_lastIncomingActivityTime.asString();
359   result += " | Last Outgoing Activity Time: ";
360   result += a_lastOutgoingActivityTime.asString();
361   result += " | Hidden: ";
362   result += (hidden() ? "yes" : "no");
363   result += "\n";
364
365   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++) {
366     result += "\n";
367     result += (*it)->asString();
368   }
369
370   return result;
371 }
372
373 anna::xml::Node* Entity::asXML(anna::xml::Node* parent) const throw() {
374   anna::xml::Node* result = parent->createChild("diameter.Entity");
375   std::string realm = a_engine->getRealm();
376
377   if(realm != "") result->createAttribute("ParentEngineRealm", realm);
378
379   result->createAttribute("Category", anna::functions::asString(a_category));
380
381   if(a_description != "") result->createAttribute("Description", a_description);
382
383   result->createAttribute("Available", a_available ? "yes" : "no");
384   result->createAttribute("Deprecated", a_deprecated ? "yes" : "no");
385   result->createAttribute("MaxServersSupported", anna::functions::asString(a_maxServers));
386   result->createAttribute("CurrentlyConfiguredServers", anna::functions::asString(a_servers.size()));
387   result->createAttribute("OTArequests", anna::functions::asString("%d%s", getOTARequests(), idle() ? " (idle)" : ""));
388   result->createAttribute("LastIncomingActivityTime", a_lastIncomingActivityTime.asString());
389   result->createAttribute("LastOutgoingActivityTime", a_lastOutgoingActivityTime.asString());
390   result->createAttribute("Hidden", hidden() ? "yes" : "no");
391   anna::xml::Node* servers = result->createChild("Entity.Servers");
392
393   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++)
394     (*it)->asXML(servers);
395
396   return result;
397 }
398
399 void Entity::availabilityLost() throw() {
400   a_available = false;
401   LOGDEBUG(
402     std::string msg = "diameter::comm::Entity { Description: ";
403     msg += getDescription();
404     msg += " } has lost its availability";
405     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
406   );
407   OamModule &oamModule = OamModule::instantiate();
408
409   if(a_maxServers != 2) {
410     oamModule.activateAlarm(OamModule::Alarm::c_LostAvailabilityOverEntityDefinedAs__s__, a_socketListLiteral.c_str());
411     oamModule.count(OamModule::Counter::LostAvailabilityOverEntity);
412   } else {
413     oamModule.activateAlarm(OamModule::Alarm::c_LostAvailabilityOverEntityDefinedAsPrimary__s__AndSecondary__s__,
414                             a_primarySocketLiteral.c_str(),
415                             a_secondarySocketLiteral.c_str());
416     oamModule.count(OamModule::Counter::LostAvailabilityOverEntity);
417   }
418
419   a_engine->availabilityLost(this);
420   a_engine->refreshAvailabilityForEntities();
421 }
422
423
424 void Entity::availabilityRecovered() throw() {
425   a_available = true;
426   LOGDEBUG(
427     std::string msg = "diameter::comm::Entity { Description: ";
428     msg += getDescription();
429     msg += " } has recovered its availability";
430     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
431   );
432   OamModule &oamModule = OamModule::instantiate();
433
434   if(a_maxServers != 2) {
435     oamModule.cancelAlarm(OamModule::Alarm::c_LostAvailabilityOverEntityDefinedAs__s__, a_socketListLiteral.c_str());
436     oamModule.count(OamModule::Counter::RecoveredAvailabilityOverEntity);
437   } else {
438     oamModule.cancelAlarm(OamModule::Alarm::c_LostAvailabilityOverEntityDefinedAsPrimary__s__AndSecondary__s__,
439                           a_primarySocketLiteral.c_str(),
440                           a_secondarySocketLiteral.c_str());
441     oamModule.count(OamModule::Counter::RecoveredAvailabilityOverEntity);
442   }
443
444   a_engine->availabilityRecovered(this);
445   a_engine->refreshAvailabilityForEntities();
446 }
447
448
449 bool Entity::refreshAvailability() throw() {
450   // Here available
451   if(a_available) {  // check not-bound state for all servers:
452     bool isolate = true;
453
454     for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++)
455       if((*it)->isAvailable()) { isolate = false; break; }
456
457     if(isolate) {
458       availabilityLost();
459       return true;
460     }
461
462     return false;
463   }
464
465   // Here not available
466   for(std::vector<Server*>::const_iterator it = begin(); it != end(); it++)
467     if((*it)->isAvailable()) {
468       availabilityRecovered();
469       return true;
470     }
471
472   return false;
473 }
474
475 //------------------------------------------------------------------------------
476 //----------------------------------------- Entity::updateIncomingActivityTime()
477 //------------------------------------------------------------------------------
478 void Entity::updateIncomingActivityTime() throw() {
479   a_lastIncomingActivityTime = anna::functions::millisecond();
480   LOGDEBUG
481   (
482     std::string msg = "Updated INCOMING activity on entity (milliseconds unix): ";
483     msg += anna::functions::asString(a_lastIncomingActivityTime.getValue());
484     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
485   );
486 }
487
488
489 //------------------------------------------------------------------------------
490 //----------------------------------------- Entity::updateOutgoingActivityTime()
491 //------------------------------------------------------------------------------
492 void Entity::updateOutgoingActivityTime(void) throw() {
493   a_lastOutgoingActivityTime = anna::functions::millisecond();
494   LOGDEBUG
495   (
496     std::string msg = "Updated OUTGOING activity on entity (milliseconds unix): ";
497     msg += anna::functions::asString(a_lastOutgoingActivityTime.getValue());
498     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
499   );
500 }
501