Avoid service overhead about checking netstat ports
[anna.git] / source / comm / Communicator.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #include <poll.h>
10 #include <time.h>
11 #include <errno.h>
12 #include <signal.h>
13 #include <string.h>
14 #include <sys/ioctl.h>
15
16 #include <algorithm>
17
18 #include <anna/core/tracing/Logger.hpp>
19 #include <anna/core/tracing/TraceMethod.hpp>
20 #include <anna/core/mt/Guard.hpp>
21 #include <anna/core/util/Average.hpp>
22 #include <anna/core/mt/ThreadManager.hpp>
23 #include <anna/core/mt/Thread.hpp>
24 #include <anna/core/util/Microsecond.hpp>
25 #include <anna/core/util/defines.hpp>
26
27 #include <anna/comm/comm.hpp>
28 #include <anna/comm/internal/sccs.hpp>
29 #include <anna/comm/internal/BinderSocket.hpp>
30 #include <anna/comm/internal/RemoteConnection.hpp>
31 #include <anna/comm/handler/Manager.hpp>
32
33 // ST
34 #include <anna/comm/internal/Poll.hpp>
35
36 // MT
37 #include <anna/core/mt/ThreadManager.hpp>
38
39 #include <anna/xml/Node.hpp>
40 #include <anna/xml/Attribute.hpp>
41
42 using namespace std;
43 using namespace anna;
44
45 // static
46 const Millisecond comm::Communicator::MinRecoveryTime(1000);
47 const Millisecond comm::Communicator::DefaultRecoveryTime(5000);
48 const Millisecond comm::Communicator::MaxRecoveryTime(30000);
49 const Millisecond comm::Communicator::MinTryingConnectionTime(100);
50 const Millisecond comm::Communicator::DefaultTryingConnectionTime(200);
51 const Millisecond comm::Communicator::MaxTryingConnectionTime(1000);
52 const Millisecond comm::Communicator::DefaultTimeout(10 * 60 * 1000); // 10 minutos.
53
54 //static
55 Millisecond comm::Communicator::st_ReceivingChunkSize(comm::Communicator::DefaultChunkSize);
56
57 Communicator::Communicator(const Communicator::WorkMode::_v workMode) :
58   Component(getClassName()),
59 #ifdef _MT
60   a_workMode(WorkMode::Single),
61 #else
62   a_workMode(workMode),
63 #endif
64   a_recoveryTime(DefaultRecoveryTime),
65   a_requestedStop(false),
66   a_status(Status::Available),
67   a_isServing(false),
68   a_poll(NULL),
69   a_threadManager(NULL),
70   a_timeout(DefaultTimeout),
71   a_mainHandler(NULL),
72   a_tryingConnectionTime(DefaultTryingConnectionTime) {
73   WHEN_SINGLETHREAD(a_poll = new Poll);
74   WHEN_MULTITHREAD(
75     a_threadManager = new ThreadManager("comm::Communicator::ThreadManager", ThreadManager::Mode::Unlimit, 0)
76   );
77   handler::Manager::instantiate().initialize(this);
78   a_connectionRecover = new comm::ConnectionRecover(this);
79   a_levelOfDenialService = comm::CongestionController::MaxLevel - 1;
80   sigignore(SIGPIPE);
81   comm::sccs::activate();
82 }
83
84 /*virtual*/
85 Communicator::~Communicator() {
86   delete a_poll;
87   delete a_threadManager;
88   delete a_connectionRecover;
89 }
90
91 void Communicator::setRecoveryTime(const Millisecond &recoveryTime)
92 throw(RuntimeException) {
93   if(recoveryTime < MinRecoveryTime || recoveryTime > MaxRecoveryTime) {
94     string msg("comm::Communicator::setRecoveryTime | ");
95     msg += functions::asString("RecoveryTime (%d ms) must be between %d ms and %d ms", recoveryTime.getValue(), MinRecoveryTime.getValue(), MaxRecoveryTime.getValue());
96     throw RuntimeException(msg, ANNA_FILE_LOCATION);
97   }
98
99   a_recoveryTime = recoveryTime;
100 }
101
102 void Communicator::setTryingConnectionTime(const Millisecond &tryingConnectionTime)
103 throw(RuntimeException) {
104   if(tryingConnectionTime < MinTryingConnectionTime || tryingConnectionTime > MaxTryingConnectionTime) {
105     string msg("comm::Communicator::setTryingConnectionTime | ");
106     msg += functions::asString("TryingConnectionTime (%d ms) must be between %d ms and %d ms", tryingConnectionTime.getValue(), MinTryingConnectionTime.getValue(), MaxTryingConnectionTime.getValue());
107     throw RuntimeException(msg, ANNA_FILE_LOCATION);
108   }
109
110   a_tryingConnectionTime = tryingConnectionTime;
111 }
112
113 //static
114 void Communicator::setReceivingChunkSize(const int receivingChunkSize)
115 throw(RuntimeException) {
116   if(receivingChunkSize < MinReceivingChunkSize || receivingChunkSize > MaxReceivingChunkSize) {
117     string msg("comm::Communicator::setReceivingChunkSize | ");
118     msg += functions::asString("ReceivingChunkSize (%d bytes) must be between %d bytes and %d bytes", receivingChunkSize, MinReceivingChunkSize, MaxReceivingChunkSize);
119     throw RuntimeException(msg, ANNA_FILE_LOCATION);
120   }
121
122   st_ReceivingChunkSize = receivingChunkSize;
123   CongestionController::instantiate().setMaxPendingBytes(receivingChunkSize);
124 }
125
126 void Communicator::setLevelOfDenialService(const int levelOfDenialService)
127 throw(RuntimeException) {
128   const int min(comm::CongestionController::MaxLevel - 2);
129   const int max(comm::CongestionController::MaxLevel);
130
131   if(levelOfDenialService < min || levelOfDenialService > max) {
132     string msg("comm::Communicator::setTryingConnectionTime | ");
133     msg += functions::asString("LevelOfDenialService %d must be between %d and %d", levelOfDenialService, min, max);
134     throw RuntimeException(msg, ANNA_FILE_LOCATION);
135   }
136
137   a_levelOfDenialService = levelOfDenialService;
138 }
139
140 void Communicator::attach(ServerSocket* serverSocket)
141 throw(RuntimeException) {
142   if(serverSocket == NULL)
143     throw RuntimeException("Cannot attach a NULL comm::ServerSocket", ANNA_FILE_LOCATION);
144
145   Guard guard(this, "comm::Communicator::attach (ServerSocket)");
146   Handler* handler = handler::Manager::instantiate().createHandler(serverSocket);
147   insert(handler);
148   LOGDEBUG(
149     string msg("comm::Communicator::attach (ServerSocket) | ");
150     msg += handler->asString();
151     Logger::debug(msg, ANNA_FILE_LOCATION);
152   );
153
154   if(serverSocket->isSharedBind() == true && serverSocket->getBinderSocket() != NULL)
155     attach(serverSocket->getBinderSocket());
156 }
157
158 void Communicator::attach(BinderSocket* binderSocket)
159 throw(RuntimeException) {
160   if(binderSocket == NULL)
161     throw RuntimeException("Cannot attach a NULL comm::BinderSocket", ANNA_FILE_LOCATION);
162
163   Guard guard(this, "comm::Communicator::attach (BinderSocket)");
164   Handler* handler = handler::Manager::instantiate().createHandler(binderSocket);
165   insert(handler);
166   LOGDEBUG(
167     string msg("comm::Communicator::attach (BinderSocket) | ");
168     msg += handler->asString();
169     Logger::debug(msg, ANNA_FILE_LOCATION);
170   );
171 }
172
173 /**
174  * Comienza a tratar la conexion que hacen a un comm::ServerSocket de este proceso, desde algun otro proceso.
175  *
176  * Se invoca desde  comm::handler::ServerSocket::accept [Tx] -> <null>
177  */
178 void Communicator::attach(LocalConnection* localConnection)
179 throw(RuntimeException) {
180   if(localConnection == NULL)
181     throw RuntimeException("Cannot attach a NULL comm::LocalConnection", ANNA_FILE_LOCATION);
182
183   /*
184    * Obtiene la información/bloquea al ClientSocket para asegurar el orden correcto, ya que de otro modo,
185    * se podría producir un interbloqueo Communicator & ClientSocket.
186    *
187    * Recordar que el handler::MetaClientSocket::apply sólo bloquea el ClientSocket sobre el que actúa y luego
188    * y si detecta el cierre,  desbloquea el ClientSocket y bloquea el Communicator. Así que para mantener
189    * el orden correcto hay que invocar al comm::ClientSocket::getTransport (que establece una SCCS sobre él) antes
190    * de bloquea el Communicator.
191    */
192   ClientSocket* clientSocket = localConnection->getClientSocket();
193
194   if(clientSocket == NULL)
195     throw RuntimeException("comm::Communicator::attach (LocalConnection) | ClientSocket can not be NULL", ANNA_FILE_LOCATION);
196
197   // Todavía no está corriendo el thread que se encargará de éste comm::ClientSocket => podemos acceder a él sin SSCC.
198   const Transport* transport = clientSocket->unsafe_reserveTransport();
199   Guard guard(this, "comm::Communicator::attach (LocalConnection)");
200   Handler* handler = handler::Manager::instantiate().createHandler(localConnection);
201   insert(handler);
202
203   if(((transport == NULL) ? true : transport->enableTimeout()) == true)
204     handler->setTimeout(a_timeout);
205
206   WHEN_SINGLETHREAD(
207
208     if(a_workMode == WorkMode::Single && handler->supportTimeout() == true)
209     a_timedouts.add(handler)
210   );
211
212   LOGDEBUG(
213     string msg("comm::Communicator::attach | ");
214     msg += handler->asString();
215     Logger::debug(msg, ANNA_FILE_LOCATION);
216   );
217
218   if(a_workMode == WorkMode::Clone) {
219     a_mainHandler = handler;
220     app::Application& app = app::functions::getApp().clone();
221   }
222 }
223
224 /*
225  * Este método sólo se invoca desde comm::Server::connect y este método los primero que hace
226  * es bloquear el acceso al comunicador, para asegurar el orden correcto de bloqueo a l hora
227  * de tratar la desconexión.
228  *
229  * Por eso no hace falta establecer la sección crítica que habitualmente se establece en todos
230  * los métodos Communicator::attach.
231  */
232 void Communicator::attach(RemoteConnection* remoteConnection)
233 throw(RuntimeException) {
234   if(remoteConnection == NULL)
235     throw RuntimeException("Cannot attach a NULL comm::RemoteConnection", ANNA_FILE_LOCATION);
236
237 //   Guard guard (this, "comm::Communicator::attach (RemoteConnection)");
238   Handler* handler = handler::Manager::instantiate().createHandler(remoteConnection);
239   insert(handler);
240   LOGDEBUG(
241     string msg("comm::Communicator::attach | ");
242     msg += handler->asString();
243     Logger::debug(msg, ANNA_FILE_LOCATION);
244   );
245 }
246
247 void Communicator::attach(ClientSocket* socket)
248 throw(RuntimeException) {
249   if(socket == NULL)
250     throw RuntimeException("Cannot attach a NULL comm::ClientSocket", ANNA_FILE_LOCATION);
251
252   Guard guard(this, "comm::Communicator::attach (ClientSocket)");
253   Handler* handler = handler::Manager::instantiate().createHandler(socket);
254   insert(handler);
255   LOGDEBUG(
256     string msg("comm::Communicator::attach | ");
257     msg += handler->asString();
258     Logger::debug(msg, ANNA_FILE_LOCATION);
259   );
260 }
261
262 void Communicator::attach(DatagramSocket* socket)
263 throw(RuntimeException) {
264   if(socket == NULL)
265     throw RuntimeException("Cannot attach a NULL comm::DatagramSocket", ANNA_FILE_LOCATION);
266
267   Guard guard(this, "comm::Communicator::attach (DatagramSocket)");
268   Handler* handler = handler::Manager::instantiate().createHandler(socket);
269   insert(handler);
270   LOGDEBUG(
271     string msg("comm::Communicator::attach | ");
272     msg += handler->asString();
273     Logger::debug(msg, ANNA_FILE_LOCATION);
274   );
275 }
276
277 void Communicator::attach(Handler* handler)
278 throw(RuntimeException) {
279   if(handler == NULL)
280     throw RuntimeException("Cannot attach a NULL comm::Handler", ANNA_FILE_LOCATION);
281
282   if(handler->getType() != Handler::Type::Custom)
283     throw RuntimeException("Communicator::attach only accept 'Custom' Handlers", ANNA_FILE_LOCATION);
284
285   Guard guard(this, "comm::Communicator::attach (Handler)");
286   insert(handler);
287   LOGDEBUG(
288     string msg("comm::Communicator::attach (Handler) | ");
289     msg += handler->asString();
290     Logger::debug(msg, ANNA_FILE_LOCATION);
291   );
292 }
293
294 void Communicator::attach(Service* service)
295 throw(RuntimeException) {
296   if(service == NULL)
297     throw RuntimeException("Cannot attach a NULL comm::Service", ANNA_FILE_LOCATION);
298
299   if(std::find(service_begin(), service_end(), service) != service_end())
300     return;
301
302   service->initialize();
303   LOGDEBUG(
304     string msg("comm::Communicator::attach | ");
305     msg += service->asString();
306     Logger::debug(msg, ANNA_FILE_LOCATION);
307   );
308   a_services.push_back(service);
309
310   if(service->isCritical() == true && service->isAvailable() == false)
311     setStatus(Status::Unavailable);
312 }
313
314 void Communicator::insert(Handler* handler)
315 throw(RuntimeException) {
316   handler->initialize();
317
318   if(handler->getfd() < 0) {
319     string msg(handler->asString());
320     msg += " | Cannot attach a Handler with fd < 0";
321     throw RuntimeException(msg, ANNA_FILE_LOCATION);
322   }
323
324   const Handler* other = find(handler->getfd());
325
326   if(other != NULL) {
327     string msg("commm::Comunicator::insert | New: ");
328     msg += handler->asString();
329     msg += " | Collision: ";
330     msg += other->asString();
331     throw RuntimeException(msg, ANNA_FILE_LOCATION);
332   }
333
334   a_handlers.add(handler);
335
336   if(handler->supportTimeout())
337     handler->beat(anna::functions::hardwareClock());
338
339   WHEN_SINGLETHREAD(a_poll->insert(handler->getfd()));
340   WHEN_MULTITHREAD(
341
342     if(a_isServing == true)
343     a_threadManager->createThread()->start(*handler);
344   );
345 }
346
347 void Communicator::detach(ServerSocket* serverSocket)
348 throw() {
349   if(serverSocket == NULL)
350     return;
351
352   comm::BinderSocket* binderSocket = serverSocket->getBinderSocket();
353   detach(find(serverSocket->getfd()));
354
355   if(binderSocket != NULL)
356     detach(binderSocket);
357 }
358
359 void Communicator::detach(ClientSocket* clientSocket)
360 throw() {
361   if(clientSocket == NULL)
362     return;
363
364   detach(find(clientSocket->getfd()));
365 }
366
367 void Communicator::detach(BinderSocket* binderSocket)
368 throw() {
369   if(binderSocket == NULL)
370     return;
371
372   detach(find(binderSocket->getfd()));
373 }
374
375 /*
376  * Finaliza el trabajo del handler recibido, en este momento NO hay ninguna SSCC establecida.
377  * (1) Si se cierra la conexion con el cliente que al que atencia este proceso clonado => debe terminar la ejecucion.
378  */
379 void Communicator::detach(Handler* handler)
380 throw() {
381   if(handler == NULL)
382     return;
383
384   Guard guard(this, "comm::Communicator::detach");
385   const int fd = handler->getfd();
386   LOGDEBUG(
387     string msg("comm::Communicator::detach (Handler) | ");
388     msg += handler->asString();
389     Logger::debug(msg, ANNA_FILE_LOCATION);
390   );
391
392   if(anna::functions::supportMultithread() == true)
393     handler->requestStop();
394   else if(handler->supportTimeout() == true)
395     a_timedouts.erase(handler);
396
397   handler->finalize();
398   WHEN_SINGLETHREAD(a_poll->erase(fd););
399
400   if(a_handlers.erase(handler) == false) {
401     LOGWARNING(
402       string msg(functions::asText("Handler: ", fd));
403       msg += " | Not found";
404       Logger::warning(msg, ANNA_FILE_LOCATION);
405     );
406   }
407
408   if(a_workMode == WorkMode::Clone && handler == a_mainHandler)                                   // (1)
409     requestStop();
410
411   handler::Manager::instantiate().releaseHandler(handler);
412 }
413
414 const Handler* Communicator::getHandler(const ClientSocket& clientSocket)
415 throw(RuntimeException) {
416   Guard guard(this, "comm::Communicator::getHandler");
417   Handler* result = find(clientSocket.getfd());
418
419   if(result != NULL) {
420     switch(result->getType()) {
421     case Handler::Type::ServerSocket:
422     case Handler::Type::BinderSocket:
423     case Handler::Type::Custom:
424       result = NULL;
425       break;
426     default: break;
427     }
428   }
429
430   return result;
431 }
432
433 //----------------------------------------------------------------------------------------------
434 // (1) Compruebo la solicitud de parada antes y despues de procesar el mensaje para acelar
435 //     la solicitud de la peticion. En otro caso podrian pasar 30 seg desde que se solicita
436 //     hasta que se acepta.
437 // (2) La invocacion de este metodo originara que se aniadan y borren fd's a la lista de
438 // streams pero la forma de recorrer el bucle nos blinda (un poco) de anomalias.
439 //----------------------------------------------------------------------------------------------
440 void Communicator::accept()
441 throw(RuntimeException) {
442   LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "accept", ANNA_FILE_LOCATION));
443
444   if(isServing() == true)
445     throw RuntimeException("Communicator::accept is already invoked", ANNA_FILE_LOCATION);
446
447   a_requestedStop = false;
448
449   if(handler_size() == 0)
450     throw RuntimeException("No socket has been established for sending and/or reception", ANNA_FILE_LOCATION);
451
452   a_isServing = true;
453   eventStartup();
454   LOGINFORMATION(Logger::write(Logger::Information, Component::asString(), "Polling network", ANNA_FILE_LOCATION));
455
456   try {
457     WHEN_SINGLETHREAD(singlethreadedAccept());
458     WHEN_MULTITHREAD(multithreadedAccept());
459   } catch(RuntimeException& ex) {
460     ex.trace();
461   }
462
463   a_isServing = false;
464   eventShutdown();
465 }
466
467 void Communicator::singlethreadedAccept()
468 throw(RuntimeException) {
469   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "singlethreadedAccept", ANNA_FILE_LOCATION));
470   Handler* handler;
471   Microsecond maxTime;
472   Microsecond now(anna::functions::hardwareClock());
473   int fd;
474   a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
475   maxTime = a_timeout;
476   maxTime += now;
477
478   while(a_requestedStop == false) {
479     if(a_connectionRecover->isRunning() == true) {
480       a_connectionRecover->tryRecover();
481       a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
482     }
483
484     a_poll->waitMessage();
485
486     if(a_timedouts.size() > 0)
487       now = anna::functions::hardwareClock();
488
489     while((fd = a_poll->fetch()) != -1) {
490       if((handler = find(fd)) == NULL)
491         continue;
492
493       try {
494         if(handler->supportTimeout())
495           handler->beat(now);
496
497         handler->apply();
498       } catch(RuntimeException& ex) {
499         ex.trace();
500       }
501     }
502
503     if(a_pendingClose == true) {
504       a_pendingClose = false;
505
506       // @Eduardo (multiconnection to same address/port); On st, is considered to have one socket pending to be closed
507       for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
508 //            if (Communicator::handler (ii)->testClose () == true)
509 //               break;
510         Communicator::handler(ii)->testClose();
511     }
512
513     now = anna::functions::hardwareClock();
514
515     if(a_timedouts.size() == 0)
516       continue;
517
518     if(now > maxTime) {
519       handler_iterator ii = a_timedouts.begin();
520
521       while(ii != a_timedouts.end()) {
522         handler = Communicator::handler(ii);
523
524         if(handler->isTimeout(now) == true) {
525           LOGWARNING(
526             string msg(handler->asString());
527             msg += " | Closed due to inactivity";
528             Logger::warning(msg, ANNA_FILE_LOCATION);
529           );
530           detach(handler);
531           ii = a_timedouts.begin();
532         } else
533           ii ++;
534       }
535
536       maxTime = a_timeout;
537       maxTime += now;
538     }
539   }
540 }
541
542 void Communicator::multithreadedAccept()
543 throw(RuntimeException) {
544   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "multithreadedAccept", ANNA_FILE_LOCATION));
545   {
546     Guard guard(this, "comm::Communicator::multithreadedAccept");
547     Handler* handler;
548
549     for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
550       handler = Communicator::handler(ii);
551       LOGDEBUG(
552         string msg("Starting | ");
553         msg += handler->asString();
554         Logger::debug(msg, ANNA_FILE_LOCATION)
555       );
556       a_threadManager->createThread()->start(*handler);
557     }
558   }
559   Millisecond delay(500);
560
561   while(a_requestedStop == false) {
562     anna::functions::sleep(delay);
563
564     if(a_requestedStop == true)
565       break;
566
567     if(a_connectionRecover->isRunning() == false)
568       continue;
569
570     {
571       Guard guard(this, "comm::Communicator::multithreadedAccept (ConnectionRecover)");
572       a_connectionRecover->tryRecover();
573     }
574   }
575 }
576
577 void Communicator::requestStop()
578 throw() {
579   if(a_requestedStop == true)
580     return;
581
582   Guard guard(this, "comm::Communicator::requestStop");
583
584   if(a_requestedStop == true)
585     return;
586
587   a_requestedStop = true;
588
589   try  {
590     for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
591       handler(ii)->requestStop();
592   } catch(RuntimeException& ex) {
593     ex.trace();
594   }
595
596   LOGWARNING(
597     string msg("comm::Communicator::requestStop | ");
598     msg += asString();
599     Logger::warning(msg, ANNA_FILE_LOCATION);
600   );
601 }
602
603 bool Communicator::isUsable(const ClientSocket* clientSocket)
604 throw() {
605   if(clientSocket == NULL)
606     return false;
607
608   Guard guard(this, "comm::Communicator::isUsable");
609   Handler* handler = find(clientSocket->getfd());
610
611   if(handler == NULL)
612     return false;
613
614   const Handler::Type::_v type = handler->getType();
615   return (type == Handler::Type::LocalConnection || type == Handler::Type::RemoteConnection);
616 }
617
618 void Communicator::setStatus(const Status& status)
619 throw() {
620   Guard guard(this, "comm::Communicator::setStatus");
621
622   if(a_status != status)
623     a_status = status;
624
625   LOGINFORMATION(
626     string msg("comm::Communicator::setStatus | ");
627     msg += a_status.asString();
628     Logger::information(msg, ANNA_FILE_LOCATION);
629   );
630 }
631
632 void Communicator::eventBreakAddress(const in_addr_t& address)
633 throw() {
634   Device* device = Network::instantiate().find(address);
635
636   if(device->getStatus() == Device::Status::Down)
637     return;
638
639   LOGWARNING(
640     string msg("comm::Communicator::eventBreakAddress | ");
641     msg += device->asString();
642     Logger::warning(msg, ANNA_FILE_LOCATION);
643   );
644   Guard guard(this, "comm::Communicator::eventBreakAddress");
645   device->setStatus(Device::Status::Down);
646   /**
647    * Trabaja sobre una copia para no perder la referencia cuando se elimine un miembro de la lista original
648    *
649    * En la lista a borrar sólo mete los handler::ServerSocket y los handler::RemoteConnection, ya que los handler::LocalConnection
650    * será liberados recursivamente cuando liberemos los primeros.
651    */
652   typedef vector <comm::Handler*> work_container;
653   typedef work_container::iterator work_iterator;
654   work_container ww;
655
656   for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
657     comm::Handler* handler = comm::Communicator::handler(ii);
658
659     if(dynamic_cast <comm::handler::ServerSocket*>(handler) != NULL) {
660       ww.push_back(handler);
661     } else if(dynamic_cast <comm::handler::RemoteConnection*>(handler) != NULL) {
662       ww.push_back(handler);
663     }
664   }
665
666   for(work_iterator ii = ww.begin(), maxii = ww.end(); ii != maxii; ii ++) {
667     LOGDEBUG(
668       std::string msg("Communicator::eventBreakAddress | ");
669       msg = (*ii)->asString();
670       Logger::debug(msg, ANNA_FILE_LOCATION)
671     );
672     (*ii)->breakAddress(address);
673   }
674 }
675
676 void Communicator::eventRecoverAddress(const in_addr_t& address)
677 throw() {
678   Device* device = Network::instantiate().find(address);
679
680   if(device->getStatus() == Device::Status::Up)
681     return;
682
683   LOGWARNING(
684     string msg("comm::Communicator::eventRecoverAddress | ");
685     msg += device->asString();
686     Logger::warning(msg, ANNA_FILE_LOCATION);
687   );
688   Guard guard(this, "comm::Communicator::eventRecoverAddress");
689   device->setStatus(Device::Status::Up);
690   Handlers backup(a_handlers);
691
692   for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
693     handler(ii)->recoverAddress(address);
694 }
695
696 bool Communicator::eventAcceptConnection(const ClientSocket& clientSocket)
697 throw(RuntimeException) {
698   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventAcceptConnection", ANNA_FILE_LOCATION));
699
700   if(a_requestedStop == true) {
701     LOGWARNING(
702       string msg(Component::asString());
703       msg += " | ";
704       msg += clientSocket.asString();
705       msg += " | Connection rejected due to stop request";
706       Logger::warning(msg, ANNA_FILE_LOCATION);
707     );
708     return false;
709   }
710
711   CongestionController::Workload ww = CongestionController::instantiate().getAccumulatedWorkload();
712
713   if(CongestionController::getLevel(ww) >= a_levelOfDenialService) {
714     LOGWARNING(
715       string msg("comm::Communicator::eventAcceptConnection | Level: ");
716       msg += functions::asString(CongestionController::getLevel(ww));
717       msg += functions::asString(" | Load: %d%% ", CongestionController::getLoad(ww));
718       msg += " | Result: false";
719       Logger::warning(msg, ANNA_FILE_LOCATION);
720     );
721     return false;
722   }
723
724   return true;
725 }
726
727 //--------------------------------------------------------------------------------------------------------
728 // (1) Alguno de los comm::Server asociados al servicio puede haber pasado a activo, pero nos
729 //     interesa como estaba el servicio antes de esta activacion.
730 // (2) Si el servicio es "No critico" => no afecta al estado del proceso.
731 // (3) Solo si todos los servicios "Criticos" estan disponibles pasara a estar "Activo".
732 //--------------------------------------------------------------------------------------------------------
733 void Communicator::eventCreateConnection(const Server* server)
734 throw() {
735   if(server == NULL)
736     return;
737
738   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventCreateConnection", ANNA_FILE_LOCATION));
739   LOGNOTICE(
740     string msg("comm::Communicator::eventCreateConnection | ");
741     msg += server->asString();
742     Logger::notice(msg, ANNA_FILE_LOCATION);
743   );
744   bool recoverService = false;
745
746   for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
747     Service* service = const_cast <Service*>(Server::service(ii));
748
749     if(service->wasAvailable() == true) {                                             // (1)
750       service->recover(server);
751       continue;
752     }
753
754     service->recover(server);
755     eventCreateConnection(service);
756     recoverService = true;
757   }
758
759   if(recoverService == false || a_status == Status::Available)
760     return;
761
762   Guard guard(this, "comm::Communicator::eventCreateConnection");
763   Status status(Status::Available);
764
765   for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++) {
766     const Service* service = Communicator::service(ii);
767
768     if(service->isCritical() == false)                                               // (2)
769       continue;
770
771     if(service->isAvailable() == false) {
772       status = Status::Unavailable;
773       break;
774     }
775   }
776
777   setStatus(status);                                                                  // (3)
778 }
779
780 void Communicator::eventCreateConnection(const Service* service)
781 throw() {
782   if(service == NULL)
783     return;
784
785   LOGNOTICE(
786     string msg("comm::Communicator::eventCreateConnection | ");
787     msg += service->asString();
788     Logger::notice(msg, ANNA_FILE_LOCATION);
789   );
790 }
791
792 /*
793  * Se invoca desde handler::RemoteConnection:[Tx] -> Communicator
794  */
795 void Communicator::eventBreakConnection(const Server* server)
796 throw() {
797   if(server == NULL)
798     return;
799
800   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (server)", ANNA_FILE_LOCATION));
801   LOGWARNING(
802     string msg("comm::Communicator::eventBreakConnection | ");
803     msg += server->asString();
804     Logger::warning(msg, ANNA_FILE_LOCATION);
805   );
806   //Guard guard (this, "comm::Communicator::eventBreakConnection");
807   Status status(a_status);
808
809   for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
810     Service* service = const_cast <Service*>(Server::service(ii));
811     service->fault(server);
812
813     if(service->isAvailable() == true)
814       continue;
815
816     eventBreakConnection(service);
817
818     if(status == Status::Available && service->isCritical() == true)
819       status = Status::Unavailable;
820   }
821
822   setStatus(status);
823 }
824
825 void Communicator::eventBreakConnection(const Service* service)
826 throw() {
827   if(service == NULL)
828     return;
829
830   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (service)", ANNA_FILE_LOCATION));
831   LOGWARNING(
832     string msg("comm::Communicator::eventBreakConnection | ");
833     msg += service->asString();
834     Logger::warning(msg, ANNA_FILE_LOCATION);
835   );
836 }
837
838 void Communicator::eventShutdown()
839 throw() {
840   LOGWARNING(
841     string msg("comm::Communicator::eventShutdown | ");
842     msg += asString();
843     Logger::warning(msg, ANNA_FILE_LOCATION);
844   );
845   setStatus(Status::Unavailable);
846 #ifndef _MT
847   Handlers backup(a_handlers);
848
849   for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
850     detach(handler(ii));
851
852 #else
853
854   try {
855     a_threadManager->join();
856   } catch(RuntimeException& ex) {
857     ex.trace();
858   }
859
860 #endif
861 }
862
863 std::string Communicator::asString() const
864 throw() {
865   string result("comm::Communicator { ");
866   result += Component::asString();
867   result += " | RequestedStop: ";
868   result += anna::functions::asString(a_requestedStop);
869   result += " | ";
870   result += a_status.asString();
871   result += anna::functions::asString(" | Handlers: %d", a_handlers.size());
872   result += anna::functions::asString(" | RecoveryTime: %d ms", a_recoveryTime.getValue());
873   return result += " }";
874 }
875
876 xml::Node* Communicator::asXML(xml::Node* parent) const
877 throw() {
878   parent = app::Component::asXML(parent);
879   xml::Node* result = parent->createChild("comm.Communicator");
880   result->createAttribute("RequestedStop", anna::functions::asString(a_requestedStop));
881   result->createAttribute("RecoveryTime", a_recoveryTime);
882   result->createAttribute("TryingConnectionTime", a_tryingConnectionTime);
883   result->createAttribute("Timeout", a_timeout);
884   result->createAttribute("Status", a_status.asString());
885   result->createAttribute("Mode", (a_workMode == WorkMode::Single) ? "Single" : "Clone");
886   result->createAttribute("LevelOfDenialService", a_levelOfDenialService);
887   Network::instantiate().asXML(result);
888   xml::Node* node = result->createChild("comm.Handlers");
889
890   for(const_handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
891     handler(ii)->asXML(node);
892
893   node = result->createChild("comm.Services");
894
895   for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++)
896     service(ii)->asXML(node);
897
898   a_connectionRecover->asXML(result);
899   CongestionController& congestionController = CongestionController::instantiate();
900   congestionController.asXML(result);
901   return result;
902 }
903
904 /*
905  * Se invoca desde app::Application::clone -> app::Component::do_cloneParent (ojo EN EL PROCESO ORIGINAL).
906  * Ofrece la posiblidad de que el componente del proceso original liberen los recursos que no va
907  * a usar.
908  *
909  * Cada vez que se clona tiene que sacar de su comprobacion las conexiones que ha sufrido. Ya que de otro
910  * modo podria estar tratantado contestaciones a peticiones realizadas desde los procesos hijos => estos
911  * nunca recibirian las respuestas a sus peticiones.
912  *
913  * Estas conexiones no se pueden cerrar porque deben mantenerse abiertas para que los procesos hijos las
914  * hereden, solo es que este proceso no debe atenderlas.
915  *
916  * El codigo del servidor (el proceso original) no hace nada con ese Socket, lo cierra porque ya lo ha duplicado
917  * el proceso hijo y sigue atendiendo peticiones de conexion.
918  */
919 void Communicator::do_cloneParent()
920 throw(RuntimeException) {
921   LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneParent", ANNA_FILE_LOCATION));
922   Handler* handler;
923   Handler::Type::_v type;
924
925   for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
926     handler = Communicator::handler(ii);
927     type = handler->getType();
928
929     if(type == Handler::Type::RemoteConnection || type == Handler::Type::ClientSocket)
930       a_poll->erase(handler->getfd());
931   }
932
933   try {
934     handler = a_mainHandler;
935     a_mainHandler = NULL;
936     detach(handler);
937   } catch(RuntimeException& ex) {
938     ex.trace();
939   }
940 }
941
942 /*
943  * Se invoca desde app::Application::clone -> app::Component::do_cloneChild (ojo EN EL NUEVO PROCESO).
944  * Ofrece la posiblidad de que los componentes que acaban de ser duplicados liberen los recursos que no van
945  * a usar, ya que la copia solo se dedicara a tratar los mensajes que entren por su conexion asociada y
946  * las respuestas a las peticiones originadas en el tratamiento del mismo.
947  *
948  * (1) Recordar que el handler que tiene asociado el clon esta registrado para evitar su cierre.
949  * (3) Limpia la lista porque de otro modo si el proceso clon hiciera nuevas conexiones podria encontrar
950  *     que el fd ya esta guardado en la lista. No se puede invocar directamente al detach, porque este
951  *     metodo modifica la lista a_handlers, y nos haria perder la cuenta del iterator.
952  * (4) Realiza un duplicado fisico de las conexiones realizadas por el proceso original, elimina el fd de
953  *     la lista a comprobar, este fd sera cerrado por el metodo Handler::clone.
954  * (5) Registra el Handler por el que ha sido creado este nuevo proceso, asi que cuando se invoque al detach
955  *     con ese handler => el programa terminara la ejecucion.
956  */
957 void Communicator::do_cloneChild()
958 throw() {
959   LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneChild", ANNA_FILE_LOCATION));
960   LOGINFORMATION(
961     string msg("comm::Communicator::do_cloneChild | MainHandler: ");
962     msg += a_mainHandler->asString();
963     Logger::information(msg, ANNA_FILE_LOCATION);
964   );
965   Handler* handler(NULL);
966   vector <Handler*> toDetach;
967
968   for(handler_iterator ii = handler_begin(); ii != handler_end(); ii ++) {                        // (1)
969     handler = Communicator::handler(ii);
970
971     if(handler == a_mainHandler)
972       continue;
973
974     switch(handler->getType()) {
975     case Handler::Type::Custom:
976     case Handler::Type::RemoteConnection:
977     case Handler::Type::ClientSocket:
978       a_poll->erase(handler->getfd());                                                         // (4)
979       LOGDEBUG(
980         string msg("comm::Communicator::do_cloneChild | ");
981         msg += handler->asString();
982         Logger::debug(msg, ANNA_FILE_LOCATION);
983       );
984       handler->clone();
985       a_poll->insert(handler->getfd());
986       break;
987     default:
988       toDetach.push_back(handler);                                                             // (3)
989       break;
990     }
991   }
992
993   for(vector <Handler*>::iterator ii = toDetach.begin(), maxii = toDetach.end(); ii != maxii; ii ++)
994     detach(*ii);
995
996   toDetach.clear();
997
998   try {
999     singlethreadedAccept();
1000   } catch(RuntimeException& ex) {
1001     ex.trace();
1002   }
1003
1004   exit(0);                                                                                       // (5)
1005 }
1006
1007 int Communicator::SortByFileDescriptor::value(const Handler* handler)
1008 throw() {
1009   return handler->getfd();
1010 }
1011
1012