Updated license
[anna.git] / source / comm / Communicator.cpp
1 // ANNA - Anna is Not Nothingness Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // https://bitbucket.org/testillano/anna
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 #include <poll.h>
38 #include <time.h>
39 #include <errno.h>
40 #include <signal.h>
41 #include <string.h>
42 #include <sys/ioctl.h>
43
44 #include <algorithm>
45
46 #include <anna/core/tracing/Logger.hpp>
47 #include <anna/core/tracing/TraceMethod.hpp>
48 #include <anna/core/mt/Guard.hpp>
49 #include <anna/core/util/Average.hpp>
50 #include <anna/core/mt/ThreadManager.hpp>
51 #include <anna/core/mt/Thread.hpp>
52 #include <anna/core/util/Microsecond.hpp>
53 #include <anna/core/util/defines.hpp>
54
55 #include <anna/comm/comm.hpp>
56 #include <anna/comm/internal/sccs.hpp>
57 #include <anna/comm/internal/BinderSocket.hpp>
58 #include <anna/comm/internal/RemoteConnection.hpp>
59 #include <anna/comm/handler/Manager.hpp>
60
61 // ST
62 #include <anna/comm/internal/Poll.hpp>
63
64 // MT
65 #include <anna/core/mt/ThreadManager.hpp>
66
67 #include <anna/xml/Node.hpp>
68 #include <anna/xml/Attribute.hpp>
69
70 using namespace std;
71 using namespace anna;
72
73 // static
74 const Millisecond comm::Communicator::MinRecoveryTime(1000);
75 const Millisecond comm::Communicator::DefaultRecoveryTime(5000);
76 const Millisecond comm::Communicator::MaxRecoveryTime(30000);
77 const Millisecond comm::Communicator::MinTryingConnectionTime(100);
78 const Millisecond comm::Communicator::DefaultTryingConnectionTime(200);
79 const Millisecond comm::Communicator::MaxTryingConnectionTime(1000);
80 const Millisecond comm::Communicator::DefaultTimeout(10 * 60 * 1000); // 10 minutos.
81
82 //static
83 Millisecond comm::Communicator::st_ReceivingChunkSize(comm::Communicator::DefaultChunkSize);
84
85 Communicator::Communicator(const Communicator::WorkMode::_v workMode) :
86   Component(getClassName()),
87 #ifdef _MT
88   a_workMode(WorkMode::Single),
89 #else
90   a_workMode(workMode),
91 #endif
92   a_recoveryTime(DefaultRecoveryTime),
93   a_requestedStop(false),
94   a_status(Status::Available),
95   a_isServing(false),
96   a_poll(NULL),
97   a_threadManager(NULL),
98   a_timeout(DefaultTimeout),
99   a_mainHandler(NULL),
100   a_tryingConnectionTime(DefaultTryingConnectionTime) {
101   WHEN_SINGLETHREAD(a_poll = new Poll);
102   WHEN_MULTITHREAD(
103     a_threadManager = new ThreadManager("comm::Communicator::ThreadManager", ThreadManager::Mode::Unlimit, 0)
104   );
105   handler::Manager::instantiate().initialize(this);
106   a_connectionRecover = new comm::ConnectionRecover(this);
107   a_levelOfDenialService = comm::CongestionController::MaxLevel - 1;
108   sigignore(SIGPIPE);
109   comm::sccs::activate();
110 }
111
112 /*virtual*/
113 Communicator::~Communicator() {
114   delete a_poll;
115   delete a_threadManager;
116   delete a_connectionRecover;
117 }
118
119 void Communicator::setRecoveryTime(const Millisecond &recoveryTime)
120 throw(RuntimeException) {
121   if(recoveryTime < MinRecoveryTime || recoveryTime > MaxRecoveryTime) {
122     string msg("comm::Communicator::setRecoveryTime | ");
123     msg += functions::asString("RecoveryTime (%d ms) must be between %d ms and %d ms", recoveryTime.getValue(), MinRecoveryTime.getValue(), MaxRecoveryTime.getValue());
124     throw RuntimeException(msg, ANNA_FILE_LOCATION);
125   }
126
127   a_recoveryTime = recoveryTime;
128 }
129
130 void Communicator::setTryingConnectionTime(const Millisecond &tryingConnectionTime)
131 throw(RuntimeException) {
132   if(tryingConnectionTime < MinTryingConnectionTime || tryingConnectionTime > MaxTryingConnectionTime) {
133     string msg("comm::Communicator::setTryingConnectionTime | ");
134     msg += functions::asString("TryingConnectionTime (%d ms) must be between %d ms and %d ms", tryingConnectionTime.getValue(), MinTryingConnectionTime.getValue(), MaxTryingConnectionTime.getValue());
135     throw RuntimeException(msg, ANNA_FILE_LOCATION);
136   }
137
138   a_tryingConnectionTime = tryingConnectionTime;
139 }
140
141 //static
142 void Communicator::setReceivingChunkSize(const int receivingChunkSize)
143 throw(RuntimeException) {
144   if(receivingChunkSize < MinReceivingChunkSize || receivingChunkSize > MaxReceivingChunkSize) {
145     string msg("comm::Communicator::setReceivingChunkSize | ");
146     msg += functions::asString("ReceivingChunkSize (%d bytes) must be between %d bytes and %d bytes", receivingChunkSize, MinReceivingChunkSize, MaxReceivingChunkSize);
147     throw RuntimeException(msg, ANNA_FILE_LOCATION);
148   }
149
150   st_ReceivingChunkSize = receivingChunkSize;
151   CongestionController::instantiate().setMaxPendingBytes(receivingChunkSize);
152 }
153
154 void Communicator::setLevelOfDenialService(const int levelOfDenialService)
155 throw(RuntimeException) {
156   const int min(comm::CongestionController::MaxLevel - 2);
157   const int max(comm::CongestionController::MaxLevel);
158
159   if(levelOfDenialService < min || levelOfDenialService > max) {
160     string msg("comm::Communicator::setTryingConnectionTime | ");
161     msg += functions::asString("LevelOfDenialService %d must be between %d and %d", levelOfDenialService, min, max);
162     throw RuntimeException(msg, ANNA_FILE_LOCATION);
163   }
164
165   a_levelOfDenialService = levelOfDenialService;
166 }
167
168 void Communicator::attach(ServerSocket* serverSocket)
169 throw(RuntimeException) {
170   if(serverSocket == NULL)
171     throw RuntimeException("Cannot attach a NULL comm::ServerSocket", ANNA_FILE_LOCATION);
172
173   Guard guard(this, "comm::Communicator::attach (ServerSocket)");
174   Handler* handler = handler::Manager::instantiate().createHandler(serverSocket);
175   insert(handler);
176   LOGDEBUG(
177     string msg("comm::Communicator::attach (ServerSocket) | ");
178     msg += handler->asString();
179     Logger::debug(msg, ANNA_FILE_LOCATION);
180   );
181
182   if(serverSocket->isSharedBind() == true && serverSocket->getBinderSocket() != NULL)
183     attach(serverSocket->getBinderSocket());
184 }
185
186 void Communicator::attach(BinderSocket* binderSocket)
187 throw(RuntimeException) {
188   if(binderSocket == NULL)
189     throw RuntimeException("Cannot attach a NULL comm::BinderSocket", ANNA_FILE_LOCATION);
190
191   Guard guard(this, "comm::Communicator::attach (BinderSocket)");
192   Handler* handler = handler::Manager::instantiate().createHandler(binderSocket);
193   insert(handler);
194   LOGDEBUG(
195     string msg("comm::Communicator::attach (BinderSocket) | ");
196     msg += handler->asString();
197     Logger::debug(msg, ANNA_FILE_LOCATION);
198   );
199 }
200
201 /**
202  * Comienza a tratar la conexion que hacen a un comm::ServerSocket de este proceso, desde algun otro proceso.
203  *
204  * Se invoca desde  comm::handler::ServerSocket::accept [Tx] -> <null>
205  */
206 void Communicator::attach(LocalConnection* localConnection)
207 throw(RuntimeException) {
208   if(localConnection == NULL)
209     throw RuntimeException("Cannot attach a NULL comm::LocalConnection", ANNA_FILE_LOCATION);
210
211   /*
212    * Obtiene la información/bloquea al ClientSocket para asegurar el orden correcto, ya que de otro modo,
213    * se podría producir un interbloqueo Communicator & ClientSocket.
214    *
215    * Recordar que el handler::MetaClientSocket::apply sólo bloquea el ClientSocket sobre el que actúa y luego
216    * y si detecta el cierre,  desbloquea el ClientSocket y bloquea el Communicator. Así que para mantener
217    * el orden correcto hay que invocar al comm::ClientSocket::getTransport (que establece una SCCS sobre él) antes
218    * de bloquea el Communicator.
219    */
220   ClientSocket* clientSocket = localConnection->getClientSocket();
221
222   if(clientSocket == NULL)
223     throw RuntimeException("comm::Communicator::attach (LocalConnection) | ClientSocket can not be NULL", ANNA_FILE_LOCATION);
224
225   // Todavía no está corriendo el thread que se encargará de éste comm::ClientSocket => podemos acceder a él sin SSCC.
226   const Transport* transport = clientSocket->unsafe_reserveTransport();
227   Guard guard(this, "comm::Communicator::attach (LocalConnection)");
228   Handler* handler = handler::Manager::instantiate().createHandler(localConnection);
229   insert(handler);
230
231   if(((transport == NULL) ? true : transport->enableTimeout()) == true)
232     handler->setTimeout(a_timeout);
233
234   WHEN_SINGLETHREAD(
235
236     if(a_workMode == WorkMode::Single && handler->supportTimeout() == true)
237     a_timedouts.add(handler)
238   );
239
240   LOGDEBUG(
241     string msg("comm::Communicator::attach | ");
242     msg += handler->asString();
243     Logger::debug(msg, ANNA_FILE_LOCATION);
244   );
245
246   if(a_workMode == WorkMode::Clone) {
247     a_mainHandler = handler;
248     app::Application& app = app::functions::getApp().clone();
249   }
250 }
251
252 /*
253  * Este método sólo se invoca desde comm::Server::connect y este método los primero que hace
254  * es bloquear el acceso al comunicador, para asegurar el orden correcto de bloqueo a l hora
255  * de tratar la desconexión.
256  *
257  * Por eso no hace falta establecer la sección crítica que habitualmente se establece en todos
258  * los métodos Communicator::attach.
259  */
260 void Communicator::attach(RemoteConnection* remoteConnection)
261 throw(RuntimeException) {
262   if(remoteConnection == NULL)
263     throw RuntimeException("Cannot attach a NULL comm::RemoteConnection", ANNA_FILE_LOCATION);
264
265 //   Guard guard (this, "comm::Communicator::attach (RemoteConnection)");
266   Handler* handler = handler::Manager::instantiate().createHandler(remoteConnection);
267   insert(handler);
268   LOGDEBUG(
269     string msg("comm::Communicator::attach | ");
270     msg += handler->asString();
271     Logger::debug(msg, ANNA_FILE_LOCATION);
272   );
273 }
274
275 void Communicator::attach(ClientSocket* socket)
276 throw(RuntimeException) {
277   if(socket == NULL)
278     throw RuntimeException("Cannot attach a NULL comm::ClientSocket", ANNA_FILE_LOCATION);
279
280   Guard guard(this, "comm::Communicator::attach (ClientSocket)");
281   Handler* handler = handler::Manager::instantiate().createHandler(socket);
282   insert(handler);
283   LOGDEBUG(
284     string msg("comm::Communicator::attach | ");
285     msg += handler->asString();
286     Logger::debug(msg, ANNA_FILE_LOCATION);
287   );
288 }
289
290 void Communicator::attach(DatagramSocket* socket)
291 throw(RuntimeException) {
292   if(socket == NULL)
293     throw RuntimeException("Cannot attach a NULL comm::DatagramSocket", ANNA_FILE_LOCATION);
294
295   Guard guard(this, "comm::Communicator::attach (DatagramSocket)");
296   Handler* handler = handler::Manager::instantiate().createHandler(socket);
297   insert(handler);
298   LOGDEBUG(
299     string msg("comm::Communicator::attach | ");
300     msg += handler->asString();
301     Logger::debug(msg, ANNA_FILE_LOCATION);
302   );
303 }
304
305 void Communicator::attach(Handler* handler)
306 throw(RuntimeException) {
307   if(handler == NULL)
308     throw RuntimeException("Cannot attach a NULL comm::Handler", ANNA_FILE_LOCATION);
309
310   if(handler->getType() != Handler::Type::Custom)
311     throw RuntimeException("Communicator::attach only accept 'Custom' Handlers", ANNA_FILE_LOCATION);
312
313   Guard guard(this, "comm::Communicator::attach (Handler)");
314   insert(handler);
315   LOGDEBUG(
316     string msg("comm::Communicator::attach (Handler) | ");
317     msg += handler->asString();
318     Logger::debug(msg, ANNA_FILE_LOCATION);
319   );
320 }
321
322 void Communicator::attach(Service* service)
323 throw(RuntimeException) {
324   if(service == NULL)
325     throw RuntimeException("Cannot attach a NULL comm::Service", ANNA_FILE_LOCATION);
326
327   if(std::find(service_begin(), service_end(), service) != service_end())
328     return;
329
330   service->initialize();
331   LOGDEBUG(
332     string msg("comm::Communicator::attach | ");
333     msg += service->asString();
334     Logger::debug(msg, ANNA_FILE_LOCATION);
335   );
336   a_services.push_back(service);
337
338   if(service->isCritical() == true && service->isAvailable() == false)
339     setStatus(Status::Unavailable);
340 }
341
342 void Communicator::insert(Handler* handler)
343 throw(RuntimeException) {
344   handler->initialize();
345
346   if(handler->getfd() < 0) {
347     string msg(handler->asString());
348     msg += " | Cannot attach a Handler with fd < 0";
349     throw RuntimeException(msg, ANNA_FILE_LOCATION);
350   }
351
352   const Handler* other = find(handler->getfd());
353
354   if(other != NULL) {
355     string msg("commm::Comunicator::insert | New: ");
356     msg += handler->asString();
357     msg += " | Collision: ";
358     msg += other->asString();
359     throw RuntimeException(msg, ANNA_FILE_LOCATION);
360   }
361
362   a_handlers.add(handler);
363
364   if(handler->supportTimeout())
365     handler->beat(anna::functions::hardwareClock());
366
367   WHEN_SINGLETHREAD(a_poll->insert(handler->getfd()));
368   WHEN_MULTITHREAD(
369
370     if(a_isServing == true)
371     a_threadManager->createThread()->start(*handler);
372   );
373 }
374
375 void Communicator::detach(ServerSocket* serverSocket)
376 throw() {
377   if(serverSocket == NULL)
378     return;
379
380   comm::BinderSocket* binderSocket = serverSocket->getBinderSocket();
381   detach(find(serverSocket->getfd()));
382
383   if(binderSocket != NULL)
384     detach(binderSocket);
385 }
386
387 void Communicator::detach(ClientSocket* clientSocket)
388 throw() {
389   if(clientSocket == NULL)
390     return;
391
392   detach(find(clientSocket->getfd()));
393 }
394
395 void Communicator::detach(BinderSocket* binderSocket)
396 throw() {
397   if(binderSocket == NULL)
398     return;
399
400   detach(find(binderSocket->getfd()));
401 }
402
403 /*
404  * Finaliza el trabajo del handler recibido, en este momento NO hay ninguna SSCC establecida.
405  * (1) Si se cierra la conexion con el cliente que al que atencia este proceso clonado => debe terminar la ejecucion.
406  */
407 void Communicator::detach(Handler* handler)
408 throw() {
409   if(handler == NULL)
410     return;
411
412   Guard guard(this, "comm::Communicator::detach");
413   const int fd = handler->getfd();
414   LOGDEBUG(
415     string msg("comm::Communicator::detach (Handler) | ");
416     msg += handler->asString();
417     Logger::debug(msg, ANNA_FILE_LOCATION);
418   );
419
420   if(anna::functions::supportMultithread() == true)
421     handler->requestStop();
422   else if(handler->supportTimeout() == true)
423     a_timedouts.erase(handler);
424
425   handler->finalize();
426   WHEN_SINGLETHREAD(a_poll->erase(fd););
427
428   if(a_handlers.erase(handler) == false) {
429     LOGWARNING(
430       string msg(functions::asText("Handler: ", fd));
431       msg += " | Not found";
432       Logger::warning(msg, ANNA_FILE_LOCATION);
433     );
434   }
435
436   if(a_workMode == WorkMode::Clone && handler == a_mainHandler)                                   // (1)
437     requestStop();
438
439   handler::Manager::instantiate().releaseHandler(handler);
440 }
441
442 const Handler* Communicator::getHandler(const ClientSocket& clientSocket)
443 throw(RuntimeException) {
444   Guard guard(this, "comm::Communicator::getHandler");
445   Handler* result = find(clientSocket.getfd());
446
447   if(result != NULL) {
448     switch(result->getType()) {
449     case Handler::Type::ServerSocket:
450     case Handler::Type::BinderSocket:
451     case Handler::Type::Custom:
452       result = NULL;
453       break;
454     }
455   }
456
457   return result;
458 }
459
460 //----------------------------------------------------------------------------------------------
461 // (1) Compruebo la solicitud de parada antes y despues de procesar el mensaje para acelar
462 //     la solicitud de la peticion. En otro caso podrian pasar 30 seg desde que se solicita
463 //     hasta que se acepta.
464 // (2) La invocacion de este metodo originara que se aniadan y borren fd's a la lista de
465 // streams pero la forma de recorrer el bucle nos blinda (un poco) de anomalias.
466 //----------------------------------------------------------------------------------------------
467 void Communicator::accept()
468 throw(RuntimeException) {
469   LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "accept", ANNA_FILE_LOCATION));
470
471   if(isServing() == true)
472     throw RuntimeException("Communicator::accept is already invoked", ANNA_FILE_LOCATION);
473
474   a_requestedStop = false;
475
476   if(handler_size() == 0)
477     throw RuntimeException("No socket has been established for sending and/or reception", ANNA_FILE_LOCATION);
478
479   a_isServing = true;
480   eventStartup();
481   LOGINFORMATION(Logger::write(Logger::Information, Component::asString(), "Polling network", ANNA_FILE_LOCATION));
482
483   try {
484     WHEN_SINGLETHREAD(singlethreadedAccept());
485     WHEN_MULTITHREAD(multithreadedAccept());
486   } catch(RuntimeException& ex) {
487     ex.trace();
488   }
489
490   a_isServing = false;
491   eventShutdown();
492 }
493
494 void Communicator::singlethreadedAccept()
495 throw(RuntimeException) {
496   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "singlethreadedAccept", ANNA_FILE_LOCATION));
497   Handler* handler;
498   Microsecond maxTime;
499   Microsecond now(anna::functions::hardwareClock());
500   int fd;
501   a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
502   maxTime = a_timeout;
503   maxTime += now;
504
505   while(a_requestedStop == false) {
506     if(a_connectionRecover->isRunning() == true) {
507       a_connectionRecover->tryRecover();
508       a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
509     }
510
511     a_poll->waitMessage();
512
513     if(a_timedouts.size() > 0)
514       now = anna::functions::hardwareClock();
515
516     while((fd = a_poll->fetch()) != -1) {
517       if((handler = find(fd)) == NULL)
518         continue;
519
520       try {
521         if(handler->supportTimeout())
522           handler->beat(now);
523
524         handler->apply();
525       } catch(RuntimeException& ex) {
526         ex.trace();
527       }
528     }
529
530     if(a_pendingClose == true) {
531       a_pendingClose = false;
532
533       // @Eduardo (multiconnection to same address/port); On st, is considered to have one socket pending to be closed
534       for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
535 //            if (Communicator::handler (ii)->testClose () == true)
536 //               break;
537         Communicator::handler(ii)->testClose();
538     }
539
540     now = anna::functions::hardwareClock();
541
542     if(a_timedouts.size() == 0)
543       continue;
544
545     if(now > maxTime) {
546       handler_iterator ii = a_timedouts.begin();
547
548       while(ii != a_timedouts.end()) {
549         handler = Communicator::handler(ii);
550
551         if(handler->isTimeout(now) == true) {
552           LOGWARNING(
553             string msg(handler->asString());
554             msg += " | Closed due to inactivity";
555             Logger::warning(msg, ANNA_FILE_LOCATION);
556           );
557           detach(handler);
558           ii = a_timedouts.begin();
559         } else
560           ii ++;
561       }
562
563       maxTime = a_timeout;
564       maxTime += now;
565     }
566   }
567 }
568
569 void Communicator::multithreadedAccept()
570 throw(RuntimeException) {
571   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "multithreadedAccept", ANNA_FILE_LOCATION));
572   {
573     Guard guard(this, "comm::Communicator::multithreadedAccept");
574     Handler* handler;
575
576     for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
577       handler = Communicator::handler(ii);
578       LOGDEBUG(
579         string msg("Starting | ");
580         msg += handler->asString();
581         Logger::debug(msg, ANNA_FILE_LOCATION)
582       );
583       a_threadManager->createThread()->start(*handler);
584     }
585   }
586   Millisecond delay(500);
587
588   while(a_requestedStop == false) {
589     anna::functions::sleep(delay);
590
591     if(a_requestedStop == true)
592       break;
593
594     if(a_connectionRecover->isRunning() == false)
595       continue;
596
597     {
598       Guard guard(this, "comm::Communicator::multithreadedAccept (ConnectionRecover)");
599       a_connectionRecover->tryRecover();
600     }
601   }
602 }
603
604 void Communicator::requestStop()
605 throw() {
606   if(a_requestedStop == true)
607     return;
608
609   Guard guard(this, "comm::Communicator::requestStop");
610
611   if(a_requestedStop == true)
612     return;
613
614   a_requestedStop = true;
615
616   try  {
617     for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
618       handler(ii)->requestStop();
619   } catch(RuntimeException& ex) {
620     ex.trace();
621   }
622
623   LOGWARNING(
624     string msg("comm::Communicator::requestStop | ");
625     msg += asString();
626     Logger::warning(msg, ANNA_FILE_LOCATION);
627   );
628 }
629
630 bool Communicator::isUsable(const ClientSocket* clientSocket)
631 throw() {
632   if(clientSocket == NULL)
633     return false;
634
635   Guard guard(this, "comm::Communicator::isUsable");
636   Handler* handler = find(clientSocket->getfd());
637
638   if(handler == NULL)
639     return false;
640
641   const Handler::Type::_v type = handler->getType();
642   return (type == Handler::Type::LocalConnection || type == Handler::Type::RemoteConnection);
643 }
644
645 void Communicator::setStatus(const Status& status)
646 throw() {
647   Guard guard(this, "comm::Communicator::setStatus");
648
649   if(a_status != status)
650     a_status = status;
651
652   LOGINFORMATION(
653     string msg("comm::Communicator::setStatus | ");
654     msg += a_status.asString();
655     Logger::information(msg, ANNA_FILE_LOCATION);
656   );
657 }
658
659 void Communicator::eventBreakAddress(const in_addr_t& address)
660 throw() {
661   Device* device = Network::instantiate().find(address);
662
663   if(device->getStatus() == Device::Status::Down)
664     return;
665
666   LOGWARNING(
667     string msg("comm::Communicator::eventBreakAddress | ");
668     msg += device->asString();
669     Logger::warning(msg, ANNA_FILE_LOCATION);
670   );
671   Guard guard(this, "comm::Communicator::eventBreakAddress");
672   device->setStatus(Device::Status::Down);
673   /**
674    * Trabaja sobre una copia para no perder la referencia cuando se elimine un miembro de la lista original
675    *
676    * En la lista a borrar sólo mete los handler::ServerSocket y los handler::RemoteConnection, ya que los handler::LocalConnection
677    * será liberados recursivamente cuando liberemos los primeros.
678    */
679   typedef vector <comm::Handler*> work_container;
680   typedef work_container::iterator work_iterator;
681   work_container ww;
682
683   for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
684     comm::Handler* handler = comm::Communicator::handler(ii);
685
686     if(dynamic_cast <comm::handler::ServerSocket*>(handler) != NULL) {
687       ww.push_back(handler);
688     } else if(dynamic_cast <comm::handler::RemoteConnection*>(handler) != NULL) {
689       ww.push_back(handler);
690     }
691   }
692
693   for(work_iterator ii = ww.begin(), maxii = ww.end(); ii != maxii; ii ++) {
694     LOGDEBUG(
695       std::string msg("Communicator::eventBreakAddress | ");
696       msg = (*ii)->asString();
697       Logger::debug(msg, ANNA_FILE_LOCATION)
698     );
699     (*ii)->breakAddress(address);
700   }
701 }
702
703 void Communicator::eventRecoverAddress(const in_addr_t& address)
704 throw() {
705   Device* device = Network::instantiate().find(address);
706
707   if(device->getStatus() == Device::Status::Up)
708     return;
709
710   LOGWARNING(
711     string msg("comm::Communicator::eventRecoverAddress | ");
712     msg += device->asString();
713     Logger::warning(msg, ANNA_FILE_LOCATION);
714   );
715   Guard guard(this, "comm::Communicator::eventRecoverAddress");
716   device->setStatus(Device::Status::Up);
717   Handlers backup(a_handlers);
718
719   for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
720     handler(ii)->recoverAddress(address);
721 }
722
723 bool Communicator::eventAcceptConnection(const ClientSocket& clientSocket)
724 throw(RuntimeException) {
725   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventAcceptConnection", ANNA_FILE_LOCATION));
726
727   if(a_requestedStop == true) {
728     LOGWARNING(
729       string msg(Component::asString());
730       msg += " | ";
731       msg += clientSocket.asString();
732       msg += " | Connection rejected due to stop request";
733       Logger::warning(msg, ANNA_FILE_LOCATION);
734     );
735     return false;
736   }
737
738   CongestionController::Workload ww = CongestionController::instantiate().getAccumulatedWorkload();
739
740   if(CongestionController::getLevel(ww) >= a_levelOfDenialService) {
741     LOGWARNING(
742       string msg("comm::Communicator::eventAcceptConnection | Level: ");
743       msg += functions::asString(CongestionController::getLevel(ww));
744       msg += functions::asString(" | Load: %d%% ", CongestionController::getLoad(ww));
745       msg += " | Result: false";
746       Logger::warning(msg, ANNA_FILE_LOCATION);
747     );
748     return false;
749   }
750
751   return true;
752 }
753
754 //--------------------------------------------------------------------------------------------------------
755 // (1) Alguno de los comm::Server asociados al servicio puede haber pasado a activo, pero nos
756 //     interesa como estaba el servicio antes de esta activacion.
757 // (2) Si el servicio es "No critico" => no afecta al estado del proceso.
758 // (3) Solo si todos los servicios "Criticos" estan disponibles pasara a estar "Activo".
759 //--------------------------------------------------------------------------------------------------------
760 void Communicator::eventCreateConnection(const Server* server)
761 throw() {
762   if(server == NULL)
763     return;
764
765   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventCreateConnection", ANNA_FILE_LOCATION));
766   LOGNOTICE(
767     string msg("comm::Communicator::eventCreateConnection | ");
768     msg += server->asString();
769     Logger::notice(msg, ANNA_FILE_LOCATION);
770   );
771   bool recoverService = false;
772
773   for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
774     Service* service = const_cast <Service*>(Server::service(ii));
775
776     if(service->wasAvailable() == true) {                                             // (1)
777       service->recover(server);
778       continue;
779     }
780
781     service->recover(server);
782     eventCreateConnection(service);
783     recoverService = true;
784   }
785
786   if(recoverService == false || a_status == Status::Available)
787     return;
788
789   Guard guard(this, "comm::Communicator::eventCreateConnection");
790   Status status(Status::Available);
791
792   for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++) {
793     const Service* service = Communicator::service(ii);
794
795     if(service->isCritical() == false)                                               // (2)
796       continue;
797
798     if(service->isAvailable() == false) {
799       status = Status::Unavailable;
800       break;
801     }
802   }
803
804   setStatus(status);                                                                  // (3)
805 }
806
807 void Communicator::eventCreateConnection(const Service* service)
808 throw() {
809   if(service == NULL)
810     return;
811
812   LOGNOTICE(
813     string msg("comm::Communicator::eventCreateConnection | ");
814     msg += service->asString();
815     Logger::notice(msg, ANNA_FILE_LOCATION);
816   );
817 }
818
819 /*
820  * Se invoca desde handler::RemoteConnection:[Tx] -> Communicator
821  */
822 void Communicator::eventBreakConnection(const Server* server)
823 throw() {
824   if(server == NULL)
825     return;
826
827   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (server)", ANNA_FILE_LOCATION));
828   LOGWARNING(
829     string msg("comm::Communicator::eventBreakConnection | ");
830     msg += server->asString();
831     Logger::warning(msg, ANNA_FILE_LOCATION);
832   );
833   //Guard guard (this, "comm::Communicator::eventBreakConnection");
834   Status status(a_status);
835
836   for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
837     Service* service = const_cast <Service*>(Server::service(ii));
838     service->fault(server);
839
840     if(service->isAvailable() == true)
841       continue;
842
843     eventBreakConnection(service);
844
845     if(status == Status::Available && service->isCritical() == true)
846       status = Status::Unavailable;
847   }
848
849   setStatus(status);
850 }
851
852 void Communicator::eventBreakConnection(const Service* service)
853 throw() {
854   if(service == NULL)
855     return;
856
857   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (service)", ANNA_FILE_LOCATION));
858   LOGWARNING(
859     string msg("comm::Communicator::eventBreakConnection | ");
860     msg += service->asString();
861     Logger::warning(msg, ANNA_FILE_LOCATION);
862   );
863 }
864
865 void Communicator::eventShutdown()
866 throw() {
867   LOGWARNING(
868     string msg("comm::Communicator::eventShutdown | ");
869     msg += asString();
870     Logger::warning(msg, ANNA_FILE_LOCATION);
871   );
872   setStatus(Status::Unavailable);
873 #ifndef _MT
874   Handlers backup(a_handlers);
875
876   for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
877     detach(handler(ii));
878
879 #else
880
881   try {
882     a_threadManager->join();
883   } catch(RuntimeException& ex) {
884     ex.trace();
885   }
886
887 #endif
888 }
889
890 std::string Communicator::asString() const
891 throw() {
892   string result("comm::Communicator { ");
893   result += Component::asString();
894   result += " | RequestedStop: ";
895   result += anna::functions::asString(a_requestedStop);
896   result += " | ";
897   result += a_status.asString();
898   result += anna::functions::asString(" | Handlers: %d", a_handlers.size());
899   result += anna::functions::asString(" | RecoveryTime: %d ms", a_recoveryTime.getValue());
900   return result += " }";
901 }
902
903 xml::Node* Communicator::asXML(xml::Node* parent) const
904 throw() {
905   parent = app::Component::asXML(parent);
906   xml::Node* result = parent->createChild("comm.Communicator");
907   result->createAttribute("RequestedStop", anna::functions::asString(a_requestedStop));
908   result->createAttribute("RecoveryTime", a_recoveryTime);
909   result->createAttribute("TryingConnectionTime", a_tryingConnectionTime);
910   result->createAttribute("Timeout", a_timeout);
911   result->createAttribute("Status", a_status.asString());
912   result->createAttribute("Mode", (a_workMode == WorkMode::Single) ? "Single" : "Clone");
913   result->createAttribute("LevelOfDenialService", a_levelOfDenialService);
914   Network::instantiate().asXML(result);
915   xml::Node* node = result->createChild("comm.Handlers");
916
917   for(const_handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
918     handler(ii)->asXML(node);
919
920   node = result->createChild("comm.Services");
921
922   for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++)
923     service(ii)->asXML(node);
924
925   a_connectionRecover->asXML(result);
926   CongestionController& congestionController = CongestionController::instantiate();
927   congestionController.asXML(result);
928   return result;
929 }
930
931 /*
932  * Se invoca desde app::Application::clone -> app::Component::do_cloneParent (ojo EN EL PROCESO ORIGINAL).
933  * Ofrece la posiblidad de que el componente del proceso original liberen los recursos que no va
934  * a usar.
935  *
936  * Cada vez que se clona tiene que sacar de su comprobacion las conexiones que ha sufrido. Ya que de otro
937  * modo podria estar tratantado contestaciones a peticiones realizadas desde los procesos hijos => estos
938  * nunca recibirian las respuestas a sus peticiones.
939  *
940  * Estas conexiones no se pueden cerrar porque deben mantenerse abiertas para que los procesos hijos las
941  * hereden, solo es que este proceso no debe atenderlas.
942  *
943  * El codigo del servidor (el proceso original) no hace nada con ese Socket, lo cierra porque ya lo ha duplicado
944  * el proceso hijo y sigue atendiendo peticiones de conexion.
945  */
946 void Communicator::do_cloneParent()
947 throw(RuntimeException) {
948   LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneParent", ANNA_FILE_LOCATION));
949   Handler* handler;
950   Handler::Type::_v type;
951
952   for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
953     handler = Communicator::handler(ii);
954     type = handler->getType();
955
956     if(type == Handler::Type::RemoteConnection || type == Handler::Type::ClientSocket)
957       a_poll->erase(handler->getfd());
958   }
959
960   try {
961     handler = a_mainHandler;
962     a_mainHandler = NULL;
963     detach(handler);
964   } catch(RuntimeException& ex) {
965     ex.trace();
966   }
967 }
968
969 /*
970  * Se invoca desde app::Application::clone -> app::Component::do_cloneChild (ojo EN EL NUEVO PROCESO).
971  * Ofrece la posiblidad de que los componentes que acaban de ser duplicados liberen los recursos que no van
972  * a usar, ya que la copia solo se dedicara a tratar los mensajes que entren por su conexion asociada y
973  * las respuestas a las peticiones originadas en el tratamiento del mismo.
974  *
975  * (1) Recordar que el handler que tiene asociado el clon esta registrado para evitar su cierre.
976  * (3) Limpia la lista porque de otro modo si el proceso clon hiciera nuevas conexiones podria encontrar
977  *     que el fd ya esta guardado en la lista. No se puede invocar directamente al detach, porque este
978  *     metodo modifica la lista a_handlers, y nos haria perder la cuenta del iterator.
979  * (4) Realiza un duplicado fisico de las conexiones realizadas por el proceso original, elimina el fd de
980  *     la lista a comprobar, este fd sera cerrado por el metodo Handler::clone.
981  * (5) Registra el Handler por el que ha sido creado este nuevo proceso, asi que cuando se invoque al detach
982  *     con ese handler => el programa terminara la ejecucion.
983  */
984 void Communicator::do_cloneChild()
985 throw() {
986   LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneChild", ANNA_FILE_LOCATION));
987   LOGINFORMATION(
988     string msg("comm::Communicator::do_cloneChild | MainHandler: ");
989     msg += a_mainHandler->asString();
990     Logger::information(msg, ANNA_FILE_LOCATION);
991   );
992   Handler* handler(NULL);
993   vector <Handler*> toDetach;
994
995   for(handler_iterator ii = handler_begin(); ii != handler_end(); ii ++) {                        // (1)
996     handler = Communicator::handler(ii);
997
998     if(handler == a_mainHandler)
999       continue;
1000
1001     switch(handler->getType()) {
1002     case Handler::Type::Custom:
1003     case Handler::Type::RemoteConnection:
1004     case Handler::Type::ClientSocket:
1005       a_poll->erase(handler->getfd());                                                         // (4)
1006       LOGDEBUG(
1007         string msg("comm::Communicator::do_cloneChild | ");
1008         msg += handler->asString();
1009         Logger::debug(msg, ANNA_FILE_LOCATION);
1010       );
1011       handler->clone();
1012       a_poll->insert(handler->getfd());
1013       break;
1014     default:
1015       toDetach.push_back(handler);                                                             // (3)
1016       break;
1017     }
1018   }
1019
1020   for(vector <Handler*>::iterator ii = toDetach.begin(), maxii = toDetach.end(); ii != maxii; ii ++)
1021     detach(*ii);
1022
1023   toDetach.clear();
1024
1025   try {
1026     singlethreadedAccept();
1027   } catch(RuntimeException& ex) {
1028     ex.trace();
1029   }
1030
1031   exit(0);                                                                                       // (5)
1032 }
1033
1034 int Communicator::SortByFileDescriptor::value(const Handler* handler)
1035 throw() {
1036   return handler->getfd();
1037 }
1038
1039