test
[anna.git] / source / diameter.comm / Session.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #include <anna/core/functions.hpp>
10 #include <anna/diameter/defines.hpp>
11 #include <anna/diameter/functions.hpp>
12 #include <anna/diameter/helpers/helpers.hpp>
13 #include <anna/diameter/codec/functions.hpp>
14 #include <anna/diameter/helpers/base/functions.hpp>
15 #include <anna/time/functions.hpp>
16
17 // Local
18 #include <anna/diameter.comm/Session.hpp>
19 #include <anna/diameter.comm/Engine.hpp>
20 #include <anna/diameter.comm/Entity.hpp>
21 #include <anna/diameter.comm/Server.hpp>
22 #include <anna/diameter.comm/Transport.hpp>
23 #include <anna/diameter.comm/Response.hpp>
24 #include <anna/diameter.comm/Message.hpp>
25 #include <anna/diameter.comm/TimerManager.hpp>
26 #include <anna/diameter.comm/Timer.hpp>
27
28 #include <anna/comm/Network.hpp>
29 #include <anna/comm/ClientSocket.hpp>
30 #include <anna/core/functions.hpp>
31 #include <anna/core/DataBlock.hpp>
32 #include <anna/core/tracing/Logger.hpp>
33 #include <anna/core/tracing/TraceMethod.hpp>
34 #include <anna/xml/Node.hpp>
35 #include <anna/timex/Engine.hpp>
36 #include <anna/app/functions.hpp>
37
38 // Standard
39 #include <stdlib.h> // rand()
40 #include <time.h>
41
42
43
44 using namespace std;
45 using namespace anna::diameter;
46 using namespace anna::diameter::comm;
47
48 //static
49 const anna::Millisecond Session::DefaultTimeout(10000); // Application messages timeout
50 const int Session::DefaultPort(3868);
51
52
53 Session::Session(const char *className, const char *timerName) : anna::timex::Timer(timerName, (anna::Millisecond)0) /* not assigned */,
54   a_className(className),
55   a_timeController(NULL),
56   a_engine(NULL),
57   a_notifyOrphansOnExpiration(true),
58   a_actionTimer(NULL),
59   a_dpr(ClassCode::ApplicationMessage) { // realmente no es necesario, los Message son por defecto de aplicacion
60   initialize();
61 }
62
63 void Session::initialize() throw() {
64   a_state = State::Closed;
65   a_socketId = 0;
66   a_lastIncomingActivityTime = (anna::Millisecond)0;
67   a_lastOutgoingActivityTime = (anna::Millisecond)0;
68   a_onDisconnect = OnDisconnect::WaitPendings;
69
70   for(int i = ClassCode::Min; i < ClassCode::Max; i ++)
71     a_timeouts [i] = DefaultTimeout;
72 }
73
74 //Session::~Session() {;}
75
76
77 void Session::initializeSequences() throw() {
78   // Sequences
79   //
80   //   Hop-by-Hop Identifier
81   //      The Hop-by-Hop Identifier is an unsigned 32-bit integer field (in
82   //      network byte order) and aids in matching requests and replies.
83   //      The sender MUST ensure that the Hop-by-Hop identifier in a request
84   //      is unique on a given connection at any given time, and MAY attempt
85   //      to ensure that the number is unique across reboots.  The sender of
86   //      an Answer message MUST ensure that the Hop-by-Hop Identifier field
87   //      contains the same value that was found in the corresponding
88   //      request.  The Hop-by-Hop identifier is normally a monotonically
89   //      increasing number, whose start value was randomly generated.  An
90   //      answer message that is received with an unknown Hop-by-Hop
91   //      Identifier MUST be discarded.
92   //
93   //   End-to-End Identifier
94   //      The End-to-End Identifier is an unsigned 32-bit integer field (in
95   //      network byte order) and is used to detect duplicate messages.
96   //      Upon reboot implementations MAY set the high order 12 bits to
97   //      contain the low order 12 bits of current time, and the low order
98   //      20 bits to a random value.  Senders of request messages MUST
99   //      insert a unique identifier on each message.  The identifier MUST
100   //      remain locally unique for a period of at least 4 minutes, even
101   //      across reboots.  The originator of an Answer message MUST ensure
102   //      that the End-to-End Identifier field contains the same value that
103   //      was found in the corresponding request.  The End-to-End Identifier
104   //      MUST NOT be modified by Diameter agents of any kind.  The
105   //      combination of the Origin-Host (see Section 6.3) and this field is
106   //      used to detect duplicates.  Duplicate requests SHOULD cause the
107   //      same answer to be transmitted (modulo the hop-by-hop Identifier
108   //      field and any routing AVPs that may be present), and MUST NOT
109   //      affect any state that was set when the original request was
110   //      processed.  Duplicate answer messages that are to be locally
111   //      consumed (see Section 6.2) SHOULD be silently discarded.
112   srand(::time(NULL) + anna::functions::hash(anna::functions::asString("%s:%d|%d", getAddress().c_str(), getPort(), a_socketId).c_str()));
113   a_nextHopByHop = rand();
114   a_nextEndToEnd = ((::time(NULL) & 0xFFF) << 20) + (rand() & 0xFFFFF);
115 }
116
117 void Session::sendDPA()
118 throw(anna::RuntimeException) {
119   LOGMETHOD(anna::TraceMethod traceMethod("anna::diameter::comm::Session", "sendDPA", ANNA_FILE_LOCATION));
120   anna::DataBlock dpa(true);
121   a_engine->readDPA(dpa, a_dpr.getBody()); // Asume that DPA is valid ...
122
123   if(dpa.isEmpty()) {
124     LOGWARNING(anna::Logger::warning("This diameter agent defines an empty DPA message. Remote disconnection DPR will be ignored going to the Bound state", ANNA_FILE_LOCATION));
125     setState(State::Bound);
126     return;
127   }
128
129   Message msgDpa;
130   msgDpa.setBody(dpa);
131   send(&msgDpa);
132   LOGWARNING(anna::Logger::warning("DPA has been sent to the peer", ANNA_FILE_LOCATION));
133   // Temporizador de proteccion por si el servidor no cierra:
134   //    state            event              action           next state
135   //    ---------------------------------------------------------------
136   //    R-Open           R-Rcv-DPR          R-Snd-DPA        Closing
137   //    Closing          Timeout            Error            Closed
138   activateActionTimer(anna::diameter::comm::Timer::Type::SessionUnbind);
139 }
140
141 void Session::setState(State::_v state) throw() {
142   LOGDEBUG(
143
144   if(state != a_state) {
145   std::string msg("Session state change: ");
146     msg += asText(a_state);
147     msg += " -> ";
148     msg += asText(state);
149     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
150   }
151   );
152   a_state = state;
153 }
154
155
156 void Session::activateActionTimer(const anna::diameter::comm::Timer::Type::_v type) throw() {
157   LOGMETHOD(anna::TraceMethod traceMethod("anna::diameter::comm::Session", "activateActionTimer", ANNA_FILE_LOCATION));
158   cancelTimer(); // Session timer
159
160   try {
161     if(a_actionTimer && a_actionTimer->isActive()) cancelActionTimer();  // no ocurrira
162
163     a_actionTimer = TimerManager::instantiate().createTimer(this, type);
164   } catch(anna::RuntimeException& ex) {
165     std::string msg = "CAPTURED EXCEPTION during action timer activation (activateActionTimer): ";
166     msg += ex.getText();
167     anna::Logger::error(msg, ANNA_FILE_LOCATION);
168   }
169 }
170
171
172 void Session::cancelActionTimer() throw() {
173   LOGMETHOD(anna::TraceMethod traceMethod("anna::diameter::comm::Session", "cancelActionTimer", ANNA_FILE_LOCATION));
174
175   if(a_actionTimer) {
176     if(a_actionTimer->isActive()) {
177       try {
178         TimerManager::instantiate().cancelTimer(a_actionTimer);
179       } catch(anna::RuntimeException& ex) {
180         ex.trace();
181       }
182     } else { // por aqui no deberia pasar ...
183       LOGDEBUG(anna::Logger::debug("Timer not activated!", ANNA_FILE_LOCATION));
184     }
185
186     a_actionTimer = NULL;
187   }
188 }
189
190
191 void Session::activateTimer() throw() {
192   LOGMETHOD(anna::TraceMethod traceMethod("anna::diameter::comm::Session", "activateTimer", ANNA_FILE_LOCATION));
193   cancelActionTimer();
194
195   try {
196     if(a_timeController == NULL)  // Application must created a timex engine
197       a_timeController = anna::app::functions::component <anna::timex::Engine> (ANNA_FILE_LOCATION);
198
199     if(isActive()) cancelTimer();
200
201     a_timeController->activate(this);
202   } catch(anna::RuntimeException& ex) {
203     std::string msg = "CAPTURED EXCEPTION during session timer activation (activateTimer): ";
204     msg += ex.getText();
205     anna::Logger::error(msg, ANNA_FILE_LOCATION);
206   }
207
208   timerStarted();
209 }
210
211
212 void Session::cancelTimer() throw() {
213   LOGMETHOD(anna::TraceMethod traceMethod("anna::diameter::comm::Session", "cancelTimer", ANNA_FILE_LOCATION));
214
215   if(isActive()) {
216     try {
217       if(a_timeController == NULL)  // Application must created a timex engine
218         a_timeController = anna::app::functions::component <anna::timex::Engine> (ANNA_FILE_LOCATION);
219
220       a_timeController->cancel(this);
221     } catch(anna::RuntimeException& ex) {
222       std::string msg = "CAPTURED EXCEPTION during session timer cancellation (cancelTimer): ";
223       msg += ex.getText();
224       anna::Logger::error(msg, ANNA_FILE_LOCATION);
225     }
226
227     timerStopped();
228   } else {
229     LOGDEBUG(anna::Logger::debug("Timer not activated!", ANNA_FILE_LOCATION));
230   }
231 }
232
233
234 //-------------------------------------------------------------------------
235 // Se invoca desde diameter::comm::Timer
236 //-------------------------------------------------------------------------
237 void Session::expireResponse(diameter::comm::Response* response)
238 throw() {
239   LOGMETHOD(anna::TraceMethod traceMethod("anna::diameter::comm::Session", "expireResponse", ANNA_FILE_LOCATION));
240   bool doUnbind = false;
241   bool doRetransmission = false;
242
243   if(response->getClassCode() != ClassCode::Bind) {
244     if(response->getRequest()->getOnExpiry() == Message::OnExpiry::Abandon) {
245       a_onDisconnect = OnDisconnect::IgnorePendings; // Abandon is not graceful
246       doUnbind = true;
247     }
248     else if(response->getRequest()->getOnExpiry() == Message::OnExpiry::Retransmit) {
249       doRetransmission = true;
250     }
251   } else
252     doUnbind = true; // (*)
253
254
255   try {
256     response->setMessage(NULL);
257     eventResponse(*response);
258   } catch(anna::RuntimeException& ex) {
259     ex.trace();
260   }
261
262   // DPR special case:
263   diameter::CommandId cid = response->getRequest()->getCommandId();
264
265   if((cid == helpers::base::COMMANDID__Disconnect_Peer_Request) && (a_state == State::WaitingDPA)) {
266     LOGDEBUG(anna::Logger::debug("Expired DPR sent to remote diameter point: local DPR procedure will be ignored going to the Bound state", ANNA_FILE_LOCATION));
267     setState(State::Bound);
268   }
269
270   if(doRetransmission) {
271     diameter::comm::Message *request = const_cast<Message*>(response->getRequest());
272     int retries = request->getRetries();
273     if (retries > 0) {
274       retries--;
275       request->setRetries(retries);
276       LOGDEBUG
277       (
278         std::string msg(asString());
279         msg += anna::functions::asString(" | Retransmission initiated for request with HopByHop: %u; remaining %d retries", response->getHopByHop(), retries);
280         anna::Logger::debug(msg, ANNA_FILE_LOCATION);
281       );
282       diameter::codec::functions::setPotentiallyReTransmittedMessageBit(*request);
283       eventRequestRetransmission(request);
284       send(request);
285     }
286   }
287
288   response_erase(response);
289
290   if(doUnbind) unbind();
291 }
292
293 void Session::finalize() throw() {
294   LOGMETHOD(anna::TraceMethod traceMethod("anna::diameter::comm::Session", "finalize", ANNA_FILE_LOCATION));
295   setState(State::Closed);
296   cancelTimer(); // Session timer
297   cancelActionTimer(); // Action timer
298   eventPeerShutdown();
299 ///////////////////////////////////////////////////////////////////////
300 // Notificar la finalizaci�n de las respuestas pendientes de recibir //
301 ///////////////////////////////////////////////////////////////////////
302 // RFC 3588 - 5.5.4.  Failover and Failback Procedures
303 //
304 //      In the event that a transport failure is detected with a peer, it is
305 //      necessary for all pending request messages to be forwarded to an
306 //      alternate agent, if possible.  This is commonly referred to as
307 //      failover.
308 //
309 //      In order for a Diameter node to perform failover procedures, it is
310 //      necessary for the node to maintain a pending message queue for a
311 //      given peer.  When an answer message is received, the corresponding
312 //      request is removed from the queue.  The Hop-by-Hop Identifier field
313 //      is used to match the answer with the queued request.
314 //
315 //      When a transport failure is detected, if possible all messages in the
316 //      queue are sent to an alternate agent with the T flag set.  On booting
317 //      a Diameter client or agent, the T flag is also set on any records
318 //      still remaining to be transmitted in non-volatile storage.  An
319 //      example of a case where it is not possible to forward the message to
320 //      an alternate server is when the message has a fixed destination, and
321 //      the unavailable peer is the message's final destination (see
322 //      Destination-Host AVP).  Such an error requires that the agent return
323 //      an answer message with the 'E' bit set and the Result-Code AVP set to
324 //      DIAMETER_UNABLE_TO_DELIVER.
325 //
326 //      It is important to note that multiple identical requests or answers
327 //      MAY be received as a result of a failover.  The End-to-End Identifier
328 //      field in the Diameter header along with the Origin-Host AVP MUST be
329 //      used to identify duplicate messages.
330   Response* response;
331
332   for(response_iterator ii = response_begin(), maxii = response_end(); ii != maxii; ii ++) {
333     response = Session::response(ii);
334     response->setResultCode(Response::ResultCode::DiameterUnavailable);
335
336     if(!a_notifyOrphansOnExpiration) {  // to avoid message bursts (to alternate servers for client-session context), we will manage at expireResponse
337       response->cancelTimer();
338
339       try {
340         response->setMessage(NULL);
341         eventResponse(*response);
342       } catch(anna::RuntimeException& ex) {
343         ex.trace();
344       }
345
346       Response::release(response);
347     }
348   }
349
350   if(a_notifyOrphansOnExpiration) return;
351
352   a_responses.clear();
353 }
354
355 void Session::response_add(Response* response)
356 throw() {
357   a_responses.add(response);
358   response->setSession(this);
359
360   try {
361     response->activateTimer();
362   } catch(anna::Exception& ex) {
363     ex.trace();
364   }
365 }
366
367 void Session::response_erase(Response* response)
368 throw() {
369   a_responses.erase(response);
370   Response::release(response);
371
372   if(a_state == State::Disconnecting)  // only OnDisconnect::WaitPendings arrives here (the other disconnect suddently)
373     if(getOTARequests() == 0) sendDPA();
374
375   if(a_state == State::Closing)  // only OnDisconnect::WaitPendings arrives here (the other disconnect suddently)
376     if(getOTARequests() == 0) unbind();
377 }
378
379 Response* Session::response_find(const HopByHop hopByHop)
380 throw(anna::RuntimeException) {
381   diameter::comm::Response* result = a_responses.find(hopByHop);
382 //   if (result == NULL) {
383 //      string msg(asString());
384 //      msg += anna::functions::asString(" | Response received for non registered context (HopByHop: %u)", hopByHop);
385 //      throw anna::RuntimeException(msg, ANNA_FILE_LOCATION);
386 //   }
387   //Message * message = const_cast<Message*>(result->getRequest());
388   return result;
389 }
390
391 std::string Session::asString() const
392 throw() {
393   string result(a_className);
394   result += " { ";
395   result += anna::timex::Timer::asString();
396   result += " | Socket Id: ";
397   result += anna::functions::asString(a_socketId);
398   result += " | State: ";
399   result += asText(a_state);
400   result += " | OnDisconnect: ";
401   result += asText(a_onDisconnect);
402   result += " | Next Hop by hop: ";
403   result += anna::functions::asString(a_nextHopByHop);
404   result += " | Next End to end: ";
405   result += anna::functions::asString(a_nextEndToEnd);
406   result += anna::functions::asString(" | OTA requests: %d%s", getOTARequests(), idle() ? " (idle)" : "");
407   result += " | Last Incoming Activity Time: ";
408   result += a_lastIncomingActivityTime.asString();
409   result += " | Last Outgoing Activity Time: ";
410   result += a_lastOutgoingActivityTime.asString();
411
412   for(int i = ClassCode::Bind; i < ClassCode::Max; i ++) {
413     result += " | Timeout for ClassCode '";
414     result += ClassCode::asText((ClassCode::_v)i);
415     result += "': ";
416     result += a_timeouts[i].asString();
417   }
418
419   return result;
420 }
421
422 anna::xml::Node* Session::asXML(anna::xml::Node* parent) const
423 throw() {
424   //parent = anna::timex::Timer::asXML(parent);
425   anna::xml::Node* result = parent->createChild("diameter.comm.Session");
426   result->createAttribute("SocketId", anna::functions::asString(a_socketId));
427   result->createAttribute("State", asText(a_state));
428   result->createAttribute("OnDisconnect", asText(a_onDisconnect));
429   result->createAttribute("NextHopByHop", anna::functions::asString(a_nextHopByHop));
430   result->createAttribute("NextEndToEnd", anna::functions::asString(a_nextEndToEnd));
431   result->createAttribute("OTArequests", anna::functions::asString("%d%s", getOTARequests(), idle() ? " (idle)" : ""));
432   result->createAttribute("LastIncomingActivityTime", a_lastIncomingActivityTime.asString());
433   result->createAttribute("LastOutgoingActivityTime", a_lastOutgoingActivityTime.asString());
434
435   for(int i = ClassCode::Bind; i < ClassCode::Max; i ++) {
436     std::string name = "TimeoutFor"; name += ClassCode::asText((ClassCode::_v)i);
437     result->createAttribute(name.c_str(), a_timeouts[i].asString());
438   }
439
440   // Messages
441   anna::xml::Node* messages = result->createChild("diameter.comm.Messages");
442   const Message* message;
443
444   for(const_response_iterator ii = response_begin(), maxii = response_end(); ii != maxii; ii ++) {
445     if((message = Session::response(ii)->getRequest()) != NULL)
446       message->asXML(messages);
447   }
448
449   return result;
450 }
451
452 const char* Session::asText(const State::_v state)
453 throw() {
454   static const char* text [] = { "Closed", "WaitingBind", "Bound", "Failover", "Suspect", "WaitingDPA", "Disconnecting", "Closing" };
455   return text [state];
456 }
457
458 const char* Session::asText(const OnDisconnect::_v onDisconnect)
459 throw() {
460   static const char* text [] = { "IgnorePendings", "WaitPendings" };
461   return text [onDisconnect];
462 }
463
464
465
466 HopByHop Session::SortById::value(const Response* response)
467 throw() {
468   return response->getHopByHop();
469 }
470
471
472 //------------------------------------------------------------------------------
473 //---------------------------------------- Session::updateIncomingActivityTime()
474 //------------------------------------------------------------------------------
475 void Session::updateIncomingActivityTime() throw() {
476   a_lastIncomingActivityTime = anna::functions::millisecond();
477   LOGDEBUG
478   (
479     std::string msg = "Updated INCOMING activity on session (milliseconds unix): ";
480     msg += anna::functions::asString(a_lastIncomingActivityTime.getValue());
481     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
482   );
483 }
484
485
486 //------------------------------------------------------------------------------
487 //---------------------------------------- Session::updateOutgoingActivityTime()
488 //------------------------------------------------------------------------------
489 void Session::updateOutgoingActivityTime(void) throw() {
490   a_lastOutgoingActivityTime = anna::functions::millisecond();
491   LOGDEBUG
492   (
493     std::string msg = "Updated OUTGOING activity on session (milliseconds unix): ";
494     msg += anna::functions::asString(a_lastOutgoingActivityTime.getValue());
495     anna::Logger::debug(msg, ANNA_FILE_LOCATION);
496   );
497 }