First commit
[anna.git] / source / comm / Communicator.cpp
1 // ANNA - Anna is Not 'N' Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // https://bitbucket.org/testillano/anna
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 #include <poll.h>
38 #include <time.h>
39 #include <errno.h>
40 #include <signal.h>
41 #include <string.h>
42 #include <sys/ioctl.h>
43
44 #include <algorithm>
45
46 #include <anna/core/tracing/Logger.hpp>
47 #include <anna/core/tracing/TraceMethod.hpp>
48 #include <anna/core/mt/Guard.hpp>
49 #include <anna/core/util/Average.hpp>
50 #include <anna/core/mt/ThreadManager.hpp>
51 #include <anna/core/mt/Thread.hpp>
52 #include <anna/core/util/Microsecond.hpp>
53
54 #include <anna/comm/comm.hpp>
55 #include <anna/comm/internal/sccs.hpp>
56 #include <anna/comm/internal/BinderSocket.hpp>
57 #include <anna/comm/internal/RemoteConnection.hpp>
58 #include <anna/comm/handler/Manager.hpp>
59
60 // ST
61 #include <anna/comm/internal/Poll.hpp>
62
63 // MT
64 #include <anna/core/mt/ThreadManager.hpp>
65
66 #include <anna/xml/Node.hpp>
67 #include <anna/xml/Attribute.hpp>
68
69 using namespace std;
70 using namespace anna;
71
72 // static
73 const Millisecond comm::Communicator::MinRecoveryTime(1000);
74 const Millisecond comm::Communicator::DefaultRecoveryTime(5000);
75 const Millisecond comm::Communicator::MaxRecoveryTime(30000);
76 const Millisecond comm::Communicator::MinTryingConnectionTime(100);
77 const Millisecond comm::Communicator::DefaultTryingConnectionTime(200);
78 const Millisecond comm::Communicator::MaxTryingConnectionTime(1000);
79 const Millisecond comm::Communicator::DefaultTimeout(10 * 60 * 1000); // 10 minutos.
80
81 //static
82 Millisecond comm::Communicator::st_ReceivingChunkSize(comm::Communicator::DefaultChunkSize);
83
84 Communicator::Communicator(const Communicator::WorkMode::_v workMode) :
85   Component(getClassName()),
86 #ifdef _MT
87   a_workMode(WorkMode::Single),
88 #else
89   a_workMode(workMode),
90 #endif
91   a_recoveryTime(DefaultRecoveryTime),
92   a_requestedStop(false),
93   a_status(Status::Available),
94   a_isServing(false),
95   a_poll(NULL),
96   a_threadManager(NULL),
97   a_timeout(DefaultTimeout),
98   a_mainHandler(NULL),
99   a_tryingConnectionTime(DefaultTryingConnectionTime) {
100   WHEN_SINGLETHREAD(a_poll = new Poll);
101   WHEN_MULTITHREAD(
102     a_threadManager = new ThreadManager("comm::Communicator::ThreadManager", ThreadManager::Mode::Unlimit, 0)
103   );
104   handler::Manager::instantiate().initialize(this);
105   a_connectionRecover = new comm::ConnectionRecover(this);
106   a_levelOfDenialService = comm::CongestionController::MaxLevel - 1;
107   sigignore(SIGPIPE);
108   comm::sccs::activate();
109 }
110
111 /*virtual*/
112 Communicator::~Communicator() {
113   delete a_poll;
114   delete a_threadManager;
115   delete a_connectionRecover;
116 }
117
118 void Communicator::setRecoveryTime(const Millisecond &recoveryTime)
119 throw(RuntimeException) {
120   if(recoveryTime < MinRecoveryTime || recoveryTime > MaxRecoveryTime) {
121     string msg("comm::Communicator::setRecoveryTime | ");
122     msg += functions::asString("RecoveryTime (%d ms) must be between %d ms and %d ms", recoveryTime.getValue(), MinRecoveryTime.getValue(), MaxRecoveryTime.getValue());
123     throw RuntimeException(msg, ANNA_FILE_LOCATION);
124   }
125
126   a_recoveryTime = recoveryTime;
127 }
128
129 void Communicator::setTryingConnectionTime(const Millisecond &tryingConnectionTime)
130 throw(RuntimeException) {
131   if(tryingConnectionTime < MinTryingConnectionTime || tryingConnectionTime > MaxTryingConnectionTime) {
132     string msg("comm::Communicator::setTryingConnectionTime | ");
133     msg += functions::asString("TryingConnectionTime (%d ms) must be between %d ms and %d ms", tryingConnectionTime.getValue(), MinTryingConnectionTime.getValue(), MaxTryingConnectionTime.getValue());
134     throw RuntimeException(msg, ANNA_FILE_LOCATION);
135   }
136
137   a_tryingConnectionTime = tryingConnectionTime;
138 }
139
140 //static
141 void Communicator::setReceivingChunkSize(const int receivingChunkSize)
142 throw(RuntimeException) {
143   if(receivingChunkSize < MinReceivingChunkSize || receivingChunkSize > MaxReceivingChunkSize) {
144     string msg("comm::Communicator::setReceivingChunkSize | ");
145     msg += functions::asString("ReceivingChunkSize (%d bytes) must be between %d bytes and %d bytes", receivingChunkSize, MinReceivingChunkSize, MaxReceivingChunkSize);
146     throw RuntimeException(msg, ANNA_FILE_LOCATION);
147   }
148
149   st_ReceivingChunkSize = receivingChunkSize;
150   CongestionController::instantiate().setMaxPendingBytes(receivingChunkSize);
151 }
152
153 void Communicator::setLevelOfDenialService(const int levelOfDenialService)
154 throw(RuntimeException) {
155   const int min(comm::CongestionController::MaxLevel - 2);
156   const int max(comm::CongestionController::MaxLevel);
157
158   if(levelOfDenialService < min || levelOfDenialService > max) {
159     string msg("comm::Communicator::setTryingConnectionTime | ");
160     msg += functions::asString("LevelOfDenialService %d must be between %d and %d", levelOfDenialService, min, max);
161     throw RuntimeException(msg, ANNA_FILE_LOCATION);
162   }
163
164   a_levelOfDenialService = levelOfDenialService;
165 }
166
167 void Communicator::attach(ServerSocket* serverSocket)
168 throw(RuntimeException) {
169   if(serverSocket == NULL)
170     throw RuntimeException("Cannot attach a NULL comm::ServerSocket", ANNA_FILE_LOCATION);
171
172   Guard guard(this, "comm::Communicator::attach (ServerSocket)");
173   Handler* handler = handler::Manager::instantiate().createHandler(serverSocket);
174   insert(handler);
175   LOGDEBUG(
176     string msg("comm::Communicator::attach (ServerSocket) | ");
177     msg += handler->asString();
178     Logger::debug(msg, ANNA_FILE_LOCATION);
179   );
180
181   if(serverSocket->isSharedBind() == true && serverSocket->getBinderSocket() != NULL)
182     attach(serverSocket->getBinderSocket());
183 }
184
185 void Communicator::attach(BinderSocket* binderSocket)
186 throw(RuntimeException) {
187   if(binderSocket == NULL)
188     throw RuntimeException("Cannot attach a NULL comm::BinderSocket", ANNA_FILE_LOCATION);
189
190   Guard guard(this, "comm::Communicator::attach (BinderSocket)");
191   Handler* handler = handler::Manager::instantiate().createHandler(binderSocket);
192   insert(handler);
193   LOGDEBUG(
194     string msg("comm::Communicator::attach (BinderSocket) | ");
195     msg += handler->asString();
196     Logger::debug(msg, ANNA_FILE_LOCATION);
197   );
198 }
199
200 /**
201  * Comienza a tratar la conexion que hacen a un comm::ServerSocket de este proceso, desde algun otro proceso.
202  *
203  * Se invoca desde  comm::handler::ServerSocket::accept [Tx] -> <null>
204  */
205 void Communicator::attach(LocalConnection* localConnection)
206 throw(RuntimeException) {
207   if(localConnection == NULL)
208     throw RuntimeException("Cannot attach a NULL comm::LocalConnection", ANNA_FILE_LOCATION);
209
210   /*
211    * Obtiene la información/bloquea al ClientSocket para asegurar el orden correcto, ya que de otro modo,
212    * se podría producir un interbloqueo Communicator & ClientSocket.
213    *
214    * Recordar que el handler::MetaClientSocket::apply sólo bloquea el ClientSocket sobre el que actúa y luego
215    * y si detecta el cierre,  desbloquea el ClientSocket y bloquea el Communicator. Así que para mantener
216    * el orden correcto hay que invocar al comm::ClientSocket::getTransport (que establece una SCCS sobre él) antes
217    * de bloquea el Communicator.
218    */
219   ClientSocket* clientSocket = localConnection->getClientSocket();
220
221   if(clientSocket == NULL)
222     throw RuntimeException("comm::Communicator::attach (LocalConnection) | ClientSocket can not be NULL", ANNA_FILE_LOCATION);
223
224   // Todavía no está corriendo el thread que se encargará de éste comm::ClientSocket => podemos acceder a él sin SSCC.
225   const Transport* transport = clientSocket->unsafe_reserveTransport();
226   Guard guard(this, "comm::Communicator::attach (LocalConnection)");
227   Handler* handler = handler::Manager::instantiate().createHandler(localConnection);
228   insert(handler);
229
230   if(((transport == NULL) ? true : transport->enableTimeout()) == true)
231     handler->setTimeout(a_timeout);
232
233   WHEN_SINGLETHREAD(
234
235     if(a_workMode == WorkMode::Single && handler->supportTimeout() == true)
236     a_timedouts.add(handler)
237   );
238
239   LOGDEBUG(
240     string msg("comm::Communicator::attach | ");
241     msg += handler->asString();
242     Logger::debug(msg, ANNA_FILE_LOCATION);
243   );
244
245   if(a_workMode == WorkMode::Clone) {
246     a_mainHandler = handler;
247     app::Application& app = app::functions::getApp().clone();
248   }
249 }
250
251 /*
252  * Este método sólo se invoca desde comm::Server::connect y este método los primero que hace
253  * es bloquear el acceso al comunicador, para asegurar el orden correcto de bloqueo a l hora
254  * de tratar la desconexión.
255  *
256  * Por eso no hace falta establecer la sección crítica que habitualmente se establece en todos
257  * los métodos Communicator::attach.
258  */
259 void Communicator::attach(RemoteConnection* remoteConnection)
260 throw(RuntimeException) {
261   if(remoteConnection == NULL)
262     throw RuntimeException("Cannot attach a NULL comm::RemoteConnection", ANNA_FILE_LOCATION);
263
264 //   Guard guard (this, "comm::Communicator::attach (RemoteConnection)");
265   Handler* handler = handler::Manager::instantiate().createHandler(remoteConnection);
266   insert(handler);
267   LOGDEBUG(
268     string msg("comm::Communicator::attach | ");
269     msg += handler->asString();
270     Logger::debug(msg, ANNA_FILE_LOCATION);
271   );
272 }
273
274 void Communicator::attach(ClientSocket* socket)
275 throw(RuntimeException) {
276   if(socket == NULL)
277     throw RuntimeException("Cannot attach a NULL comm::ClientSocket", ANNA_FILE_LOCATION);
278
279   Guard guard(this, "comm::Communicator::attach (ClientSocket)");
280   Handler* handler = handler::Manager::instantiate().createHandler(socket);
281   insert(handler);
282   LOGDEBUG(
283     string msg("comm::Communicator::attach | ");
284     msg += handler->asString();
285     Logger::debug(msg, ANNA_FILE_LOCATION);
286   );
287 }
288
289 void Communicator::attach(DatagramSocket* socket)
290 throw(RuntimeException) {
291   if(socket == NULL)
292     throw RuntimeException("Cannot attach a NULL comm::DatagramSocket", ANNA_FILE_LOCATION);
293
294   Guard guard(this, "comm::Communicator::attach (DatagramSocket)");
295   Handler* handler = handler::Manager::instantiate().createHandler(socket);
296   insert(handler);
297   LOGDEBUG(
298     string msg("comm::Communicator::attach | ");
299     msg += handler->asString();
300     Logger::debug(msg, ANNA_FILE_LOCATION);
301   );
302 }
303
304 void Communicator::attach(Handler* handler)
305 throw(RuntimeException) {
306   if(handler == NULL)
307     throw RuntimeException("Cannot attach a NULL comm::Handler", ANNA_FILE_LOCATION);
308
309   if(handler->getType() != Handler::Type::Custom)
310     throw RuntimeException("Communicator::attach only accept 'Custom' Handlers", ANNA_FILE_LOCATION);
311
312   Guard guard(this, "comm::Communicator::attach (Handler)");
313   insert(handler);
314   LOGDEBUG(
315     string msg("comm::Communicator::attach (Handler) | ");
316     msg += handler->asString();
317     Logger::debug(msg, ANNA_FILE_LOCATION);
318   );
319 }
320
321 void Communicator::attach(Service* service)
322 throw(RuntimeException) {
323   if(service == NULL)
324     throw RuntimeException("Cannot attach a NULL comm::Service", ANNA_FILE_LOCATION);
325
326   if(std::find(service_begin(), service_end(), service) != service_end())
327     return;
328
329   service->initialize();
330   LOGDEBUG(
331     string msg("comm::Communicator::attach | ");
332     msg += service->asString();
333     Logger::debug(msg, ANNA_FILE_LOCATION);
334   );
335   a_services.push_back(service);
336
337   if(service->isCritical() == true && service->isAvailable() == false)
338     setStatus(Status::Unavailable);
339 }
340
341 void Communicator::insert(Handler* handler)
342 throw(RuntimeException) {
343   handler->initialize();
344
345   if(handler->getfd() < 0) {
346     string msg(handler->asString());
347     msg += " | Cannot attach a Handler with fd < 0";
348     throw RuntimeException(msg, ANNA_FILE_LOCATION);
349   }
350
351   const Handler* other = find(handler->getfd());
352
353   if(other != NULL) {
354     string msg("commm::Comunicator::insert | New: ");
355     msg += handler->asString();
356     msg += " | Collision: ";
357     msg += other->asString();
358     throw RuntimeException(msg, ANNA_FILE_LOCATION);
359   }
360
361   a_handlers.add(handler);
362
363   if(handler->supportTimeout())
364     handler->beat(anna::functions::hardwareClock());
365
366   WHEN_SINGLETHREAD(a_poll->insert(handler->getfd()));
367   WHEN_MULTITHREAD(
368
369     if(a_isServing == true)
370     a_threadManager->createThread()->start(*handler);
371   );
372 }
373
374 void Communicator::detach(ServerSocket* serverSocket)
375 throw() {
376   if(serverSocket == NULL)
377     return;
378
379   comm::BinderSocket* binderSocket = serverSocket->getBinderSocket();
380   detach(find(serverSocket->getfd()));
381
382   if(binderSocket != NULL)
383     detach(binderSocket);
384 }
385
386 void Communicator::detach(ClientSocket* clientSocket)
387 throw() {
388   if(clientSocket == NULL)
389     return;
390
391   detach(find(clientSocket->getfd()));
392 }
393
394 void Communicator::detach(BinderSocket* binderSocket)
395 throw() {
396   if(binderSocket == NULL)
397     return;
398
399   detach(find(binderSocket->getfd()));
400 }
401
402 /*
403  * Finaliza el trabajo del handler recibido, en este momento NO hay ninguna SSCC establecida.
404  * (1) Si se cierra la conexion con el cliente que al que atencia este proceso clonado => debe terminar la ejecucion.
405  */
406 void Communicator::detach(Handler* handler)
407 throw() {
408   if(handler == NULL)
409     return;
410
411   Guard guard(this, "comm::Communicator::detach");
412   const int fd = handler->getfd();
413   LOGDEBUG(
414     string msg("comm::Communicator::detach (Handler) | ");
415     msg += handler->asString();
416     Logger::debug(msg, ANNA_FILE_LOCATION);
417   );
418
419   if(anna::functions::supportMultithread() == true)
420     handler->requestStop();
421   else if(handler->supportTimeout() == true)
422     a_timedouts.erase(handler);
423
424   handler->finalize();
425   WHEN_SINGLETHREAD(a_poll->erase(fd););
426
427   if(a_handlers.erase(handler) == false) {
428     LOGWARNING(
429       string msg(functions::asText("Handler: ", fd));
430       msg += " | Not found";
431       Logger::warning(msg, ANNA_FILE_LOCATION);
432     );
433   }
434
435   if(a_workMode == WorkMode::Clone && handler == a_mainHandler)                                   // (1)
436     requestStop();
437
438   handler::Manager::instantiate().releaseHandler(handler);
439 }
440
441 const Handler* Communicator::getHandler(const ClientSocket& clientSocket)
442 throw(RuntimeException) {
443   Guard guard(this, "comm::Communicator::getHandler");
444   Handler* result = find(clientSocket.getfd());
445
446   if(result != NULL) {
447     switch(result->getType()) {
448     case Handler::Type::ServerSocket:
449     case Handler::Type::BinderSocket:
450     case Handler::Type::Custom:
451       result = NULL;
452       break;
453     }
454   }
455
456   return result;
457 }
458
459 //----------------------------------------------------------------------------------------------
460 // (1) Compruebo la solicitud de parada antes y despues de procesar el mensaje para acelar
461 //     la solicitud de la peticion. En otro caso podrian pasar 30 seg desde que se solicita
462 //     hasta que se acepta.
463 // (2) La invocacion de este metodo originara que se aniadan y borren fd's a la lista de
464 // streams pero la forma de recorrer el bucle nos blinda (un poco) de anomalias.
465 //----------------------------------------------------------------------------------------------
466 void Communicator::accept()
467 throw(RuntimeException) {
468   LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "accept", ANNA_FILE_LOCATION));
469
470   if(isServing() == true)
471     throw RuntimeException("Communicator::accept is already invoked", ANNA_FILE_LOCATION);
472
473   a_requestedStop = false;
474
475   if(handler_size() == 0)
476     throw RuntimeException("No socket has been established for sending and/or reception", ANNA_FILE_LOCATION);
477
478   a_isServing = true;
479   eventStartup();
480   LOGINFORMATION(Logger::write(Logger::Information, Component::asString(), "Polling network", ANNA_FILE_LOCATION));
481
482   try {
483     WHEN_SINGLETHREAD(singlethreadedAccept());
484     WHEN_MULTITHREAD(multithreadedAccept());
485   } catch(RuntimeException& ex) {
486     ex.trace();
487   }
488
489   a_isServing = false;
490   eventShutdown();
491 }
492
493 void Communicator::singlethreadedAccept()
494 throw(RuntimeException) {
495   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "singlethreadedAccept", ANNA_FILE_LOCATION));
496   Handler* handler;
497   Microsecond maxTime;
498   Microsecond now(anna::functions::hardwareClock());
499   int fd;
500   a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
501   maxTime = a_timeout;
502   maxTime += now;
503
504   while(a_requestedStop == false) {
505     if(a_connectionRecover->isRunning() == true) {
506       a_connectionRecover->tryRecover();
507       a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
508     }
509
510     a_poll->waitMessage();
511
512     if(a_timedouts.size() > 0)
513       now = anna::functions::hardwareClock();
514
515     while((fd = a_poll->fetch()) != -1) {
516       if((handler = find(fd)) == NULL)
517         continue;
518
519       try {
520         if(handler->supportTimeout())
521           handler->beat(now);
522
523         handler->apply();
524       } catch(RuntimeException& ex) {
525         ex.trace();
526       }
527     }
528
529     if(a_pendingClose == true) {
530       a_pendingClose = false;
531
532       // @Eduardo (multiconnection to same address/port); On st, is considered to have one socket pending to be closed
533       for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
534 //            if (Communicator::handler (ii)->testClose () == true)
535 //               break;
536         Communicator::handler(ii)->testClose();
537     }
538
539     now = anna::functions::hardwareClock();
540
541     if(a_timedouts.size() == 0)
542       continue;
543
544     if(now > maxTime) {
545       handler_iterator ii = a_timedouts.begin();
546
547       while(ii != a_timedouts.end()) {
548         handler = Communicator::handler(ii);
549
550         if(handler->isTimeout(now) == true) {
551           LOGWARNING(
552             string msg(handler->asString());
553             msg += " | Closed due to inactivity";
554             Logger::warning(msg, ANNA_FILE_LOCATION);
555           );
556           detach(handler);
557           ii = a_timedouts.begin();
558         } else
559           ii ++;
560       }
561
562       maxTime = a_timeout;
563       maxTime += now;
564     }
565   }
566 }
567
568 void Communicator::multithreadedAccept()
569 throw(RuntimeException) {
570   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "multithreadedAccept", ANNA_FILE_LOCATION));
571   {
572     Guard guard(this, "comm::Communicator::multithreadedAccept");
573     Handler* handler;
574
575     for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
576       handler = Communicator::handler(ii);
577       LOGDEBUG(
578         string msg("Starting | ");
579         msg += handler->asString();
580         Logger::debug(msg, ANNA_FILE_LOCATION)
581       );
582       a_threadManager->createThread()->start(*handler);
583     }
584   }
585   Millisecond delay(500);
586
587   while(a_requestedStop == false) {
588     anna::functions::sleep(delay);
589
590     if(a_requestedStop == true)
591       break;
592
593     if(a_connectionRecover->isRunning() == false)
594       continue;
595
596     {
597       Guard guard(this, "comm::Communicator::multithreadedAccept (ConnectionRecover)");
598       a_connectionRecover->tryRecover();
599     }
600   }
601 }
602
603 void Communicator::requestStop()
604 throw() {
605   if(a_requestedStop == true)
606     return;
607
608   Guard guard(this, "comm::Communicator::requestStop");
609
610   if(a_requestedStop == true)
611     return;
612
613   a_requestedStop = true;
614
615   try  {
616     for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
617       handler(ii)->requestStop();
618   } catch(RuntimeException& ex) {
619     ex.trace();
620   }
621
622   LOGWARNING(
623     string msg("comm::Communicator::requestStop | ");
624     msg += asString();
625     Logger::warning(msg, ANNA_FILE_LOCATION);
626   );
627 }
628
629 bool Communicator::isUsable(const ClientSocket* clientSocket)
630 throw() {
631   if(clientSocket == NULL)
632     return false;
633
634   Guard guard(this, "comm::Communicator::isUsable");
635   Handler* handler = find(clientSocket->getfd());
636
637   if(handler == NULL)
638     return false;
639
640   const Handler::Type::_v type = handler->getType();
641   return (type == Handler::Type::LocalConnection || type == Handler::Type::RemoteConnection);
642 }
643
644 void Communicator::setStatus(const Status& status)
645 throw() {
646   Guard guard(this, "comm::Communicator::setStatus");
647
648   if(a_status != status)
649     a_status = status;
650
651   LOGINFORMATION(
652     string msg("comm::Communicator::setStatus | ");
653     msg += a_status.asString();
654     Logger::information(msg, ANNA_FILE_LOCATION);
655   );
656 }
657
658 void Communicator::eventBreakAddress(const in_addr_t& address)
659 throw() {
660   Device* device = Network::instantiate().find(address);
661
662   if(device->getStatus() == Device::Status::Down)
663     return;
664
665   LOGWARNING(
666     string msg("comm::Communicator::eventBreakAddress | ");
667     msg += device->asString();
668     Logger::warning(msg, ANNA_FILE_LOCATION);
669   );
670   Guard guard(this, "comm::Communicator::eventBreakAddress");
671   device->setStatus(Device::Status::Down);
672   /**
673    * Trabaja sobre una copia para no perder la referencia cuando se elimine un miembro de la lista original
674    *
675    * En la lista a borrar sólo mete los handler::ServerSocket y los handler::RemoteConnection, ya que los handler::LocalConnection
676    * será liberados recursivamente cuando liberemos los primeros.
677    */
678   typedef vector <comm::Handler*> work_container;
679   typedef work_container::iterator work_iterator;
680   work_container ww;
681
682   for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
683     comm::Handler* handler = comm::Communicator::handler(ii);
684
685     if(dynamic_cast <comm::handler::ServerSocket*>(handler) != NULL) {
686       ww.push_back(handler);
687     } else if(dynamic_cast <comm::handler::RemoteConnection*>(handler) != NULL) {
688       ww.push_back(handler);
689     }
690   }
691
692   for(work_iterator ii = ww.begin(), maxii = ww.end(); ii != maxii; ii ++) {
693     LOGDEBUG(
694       std::string msg("Communicator::eventBreakAddress | ");
695       msg = (*ii)->asString();
696       Logger::debug(msg, ANNA_FILE_LOCATION)
697     );
698     (*ii)->breakAddress(address);
699   }
700 }
701
702 void Communicator::eventRecoverAddress(const in_addr_t& address)
703 throw() {
704   Device* device = Network::instantiate().find(address);
705
706   if(device->getStatus() == Device::Status::Up)
707     return;
708
709   LOGWARNING(
710     string msg("comm::Communicator::eventRecoverAddress | ");
711     msg += device->asString();
712     Logger::warning(msg, ANNA_FILE_LOCATION);
713   );
714   Guard guard(this, "comm::Communicator::eventRecoverAddress");
715   device->setStatus(Device::Status::Up);
716   Handlers backup(a_handlers);
717
718   for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
719     handler(ii)->recoverAddress(address);
720 }
721
722 bool Communicator::eventAcceptConnection(const ClientSocket& clientSocket)
723 throw(RuntimeException) {
724   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventAcceptConnection", ANNA_FILE_LOCATION));
725
726   if(a_requestedStop == true) {
727     LOGWARNING(
728       string msg(Component::asString());
729       msg += " | ";
730       msg += clientSocket.asString();
731       msg += " | Connection rejected due to stop request";
732       Logger::warning(msg, ANNA_FILE_LOCATION);
733     );
734     return false;
735   }
736
737   CongestionController::Workload ww = CongestionController::instantiate().getAccumulatedWorkload();
738
739   if(CongestionController::getLevel(ww) >= a_levelOfDenialService) {
740     LOGWARNING(
741       string msg("comm::Communicator::eventAcceptConnection | Level: ");
742       msg += functions::asString(CongestionController::getLevel(ww));
743       msg += functions::asString(" | Load: %d%% ", CongestionController::getLoad(ww));
744       msg += " | Result: false";
745       Logger::warning(msg, ANNA_FILE_LOCATION);
746     );
747     return false;
748   }
749
750   return true;
751 }
752
753 //--------------------------------------------------------------------------------------------------------
754 // (1) Alguno de los comm::Server asociados al servicio puede haber pasado a activo, pero nos
755 //     interesa como estaba el servicio antes de esta activacion.
756 // (2) Si el servicio es "No critico" => no afecta al estado del proceso.
757 // (3) Solo si todos los servicios "Criticos" estan disponibles pasara a estar "Activo".
758 //--------------------------------------------------------------------------------------------------------
759 void Communicator::eventCreateConnection(const Server* server)
760 throw() {
761   if(server == NULL)
762     return;
763
764   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventCreateConnection", ANNA_FILE_LOCATION));
765   LOGNOTICE(
766     string msg("comm::Communicator::eventCreateConnection | ");
767     msg += server->asString();
768     Logger::notice(msg, ANNA_FILE_LOCATION);
769   );
770   bool recoverService = false;
771
772   for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
773     Service* service = const_cast <Service*>(Server::service(ii));
774
775     if(service->wasAvailable() == true) {                                             // (1)
776       service->recover(server);
777       continue;
778     }
779
780     service->recover(server);
781     eventCreateConnection(service);
782     recoverService = true;
783   }
784
785   if(recoverService == false || a_status == Status::Available)
786     return;
787
788   Guard guard(this, "comm::Communicator::eventCreateConnection");
789   Status status(Status::Available);
790
791   for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++) {
792     const Service* service = Communicator::service(ii);
793
794     if(service->isCritical() == false)                                               // (2)
795       continue;
796
797     if(service->isAvailable() == false) {
798       status = Status::Unavailable;
799       break;
800     }
801   }
802
803   setStatus(status);                                                                  // (3)
804 }
805
806 void Communicator::eventCreateConnection(const Service* service)
807 throw() {
808   if(service == NULL)
809     return;
810
811   LOGNOTICE(
812     string msg("comm::Communicator::eventCreateConnection | ");
813     msg += service->asString();
814     Logger::notice(msg, ANNA_FILE_LOCATION);
815   );
816 }
817
818 /*
819  * Se invoca desde handler::RemoteConnection:[Tx] -> Communicator
820  */
821 void Communicator::eventBreakConnection(const Server* server)
822 throw() {
823   if(server == NULL)
824     return;
825
826   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (server)", ANNA_FILE_LOCATION));
827   LOGWARNING(
828     string msg("comm::Communicator::eventBreakConnection | ");
829     msg += server->asString();
830     Logger::warning(msg, ANNA_FILE_LOCATION);
831   );
832   //Guard guard (this, "comm::Communicator::eventBreakConnection");
833   Status status(a_status);
834
835   for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
836     Service* service = const_cast <Service*>(Server::service(ii));
837     service->fault(server);
838
839     if(service->isAvailable() == true)
840       continue;
841
842     eventBreakConnection(service);
843
844     if(status == Status::Available && service->isCritical() == true)
845       status = Status::Unavailable;
846   }
847
848   setStatus(status);
849 }
850
851 void Communicator::eventBreakConnection(const Service* service)
852 throw() {
853   if(service == NULL)
854     return;
855
856   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (service)", ANNA_FILE_LOCATION));
857   LOGWARNING(
858     string msg("comm::Communicator::eventBreakConnection | ");
859     msg += service->asString();
860     Logger::warning(msg, ANNA_FILE_LOCATION);
861   );
862 }
863
864 void Communicator::eventShutdown()
865 throw() {
866   LOGWARNING(
867     string msg("comm::Communicator::eventShutdown | ");
868     msg += asString();
869     Logger::warning(msg, ANNA_FILE_LOCATION);
870   );
871   setStatus(Status::Unavailable);
872 #ifndef _MT
873   Handlers backup(a_handlers);
874
875   for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
876     detach(handler(ii));
877
878 #else
879
880   try {
881     a_threadManager->join();
882   } catch(RuntimeException& ex) {
883     ex.trace();
884   }
885
886 #endif
887 }
888
889 std::string Communicator::asString() const
890 throw() {
891   string result("comm::Communicator { ");
892   result += Component::asString();
893   result += " | RequestedStop: ";
894   result += anna::functions::asString(a_requestedStop);
895   result += " | ";
896   result += a_status.asString();
897   result += anna::functions::asString(" | Handlers: %d", a_handlers.size());
898   result += anna::functions::asString(" | RecoveryTime: %d ms", a_recoveryTime.getValue());
899   return result += " }";
900 }
901
902 xml::Node* Communicator::asXML(xml::Node* parent) const
903 throw() {
904   parent = app::Component::asXML(parent);
905   xml::Node* result = parent->createChild("comm.Communicator");
906   result->createAttribute("RequestedStop", anna::functions::asString(a_requestedStop));
907   result->createAttribute("RecoveryTime", a_recoveryTime);
908   result->createAttribute("TryingConnectionTime", a_tryingConnectionTime);
909   result->createAttribute("Timeout", a_timeout);
910   result->createAttribute("Status", a_status.asString());
911   result->createAttribute("Mode", (a_workMode == WorkMode::Single) ? "Single" : "Clone");
912   result->createAttribute("LevelOfDenialService", a_levelOfDenialService);
913   Network::instantiate().asXML(result);
914   xml::Node* node = result->createChild("comm.Handlers");
915
916   for(const_handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
917     handler(ii)->asXML(node);
918
919   node = result->createChild("comm.Services");
920
921   for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++)
922     service(ii)->asXML(node);
923
924   a_connectionRecover->asXML(result);
925   CongestionController& congestionController = CongestionController::instantiate();
926   congestionController.asXML(result);
927   return result;
928 }
929
930 /*
931  * Se invoca desde app::Application::clone -> app::Component::do_cloneParent (ojo EN EL PROCESO ORIGINAL).
932  * Ofrece la posiblidad de que el componente del proceso original liberen los recursos que no va
933  * a usar.
934  *
935  * Cada vez que se clona tiene que sacar de su comprobacion las conexiones que ha sufrido. Ya que de otro
936  * modo podria estar tratantado contestaciones a peticiones realizadas desde los procesos hijos => estos
937  * nunca recibirian las respuestas a sus peticiones.
938  *
939  * Estas conexiones no se pueden cerrar porque deben mantenerse abiertas para que los procesos hijos las
940  * hereden, solo es que este proceso no debe atenderlas.
941  *
942  * El codigo del servidor (el proceso original) no hace nada con ese Socket, lo cierra porque ya lo ha duplicado
943  * el proceso hijo y sigue atendiendo peticiones de conexion.
944  */
945 void Communicator::do_cloneParent()
946 throw(RuntimeException) {
947   LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneParent", ANNA_FILE_LOCATION));
948   Handler* handler;
949   Handler::Type::_v type;
950
951   for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
952     handler = Communicator::handler(ii);
953     type = handler->getType();
954
955     if(type == Handler::Type::RemoteConnection || type == Handler::Type::ClientSocket)
956       a_poll->erase(handler->getfd());
957   }
958
959   try {
960     handler = a_mainHandler;
961     a_mainHandler = NULL;
962     detach(handler);
963   } catch(RuntimeException& ex) {
964     ex.trace();
965   }
966 }
967
968 /*
969  * Se invoca desde app::Application::clone -> app::Component::do_cloneChild (ojo EN EL NUEVO PROCESO).
970  * Ofrece la posiblidad de que los componentes que acaban de ser duplicados liberen los recursos que no van
971  * a usar, ya que la copia solo se dedicara a tratar los mensajes que entren por su conexion asociada y
972  * las respuestas a las peticiones originadas en el tratamiento del mismo.
973  *
974  * (1) Recordar que el handler que tiene asociado el clon esta registrado para evitar su cierre.
975  * (3) Limpia la lista porque de otro modo si el proceso clon hiciera nuevas conexiones podria encontrar
976  *     que el fd ya esta guardado en la lista. No se puede invocar directamente al detach, porque este
977  *     metodo modifica la lista a_handlers, y nos haria perder la cuenta del iterator.
978  * (4) Realiza un duplicado fisico de las conexiones realizadas por el proceso original, elimina el fd de
979  *     la lista a comprobar, este fd sera cerrado por el metodo Handler::clone.
980  * (5) Registra el Handler por el que ha sido creado este nuevo proceso, asi que cuando se invoque al detach
981  *     con ese handler => el programa terminara la ejecucion.
982  */
983 void Communicator::do_cloneChild()
984 throw() {
985   LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneChild", ANNA_FILE_LOCATION));
986   LOGINFORMATION(
987     string msg("comm::Communicator::do_cloneChild | MainHandler: ");
988     msg += a_mainHandler->asString();
989     Logger::information(msg, ANNA_FILE_LOCATION);
990   );
991   Handler* handler(NULL);
992   vector <Handler*> toDetach;
993
994   for(handler_iterator ii = handler_begin(); ii != handler_end(); ii ++) {                        // (1)
995     handler = Communicator::handler(ii);
996
997     if(handler == a_mainHandler)
998       continue;
999
1000     switch(handler->getType()) {
1001     case Handler::Type::Custom:
1002     case Handler::Type::RemoteConnection:
1003     case Handler::Type::ClientSocket:
1004       a_poll->erase(handler->getfd());                                                         // (4)
1005       LOGDEBUG(
1006         string msg("comm::Communicator::do_cloneChild | ");
1007         msg += handler->asString();
1008         Logger::debug(msg, ANNA_FILE_LOCATION);
1009       );
1010       handler->clone();
1011       a_poll->insert(handler->getfd());
1012       break;
1013     default:
1014       toDetach.push_back(handler);                                                             // (3)
1015       break;
1016     }
1017   }
1018
1019   for(vector <Handler*>::iterator ii = toDetach.begin(), maxii = toDetach.end(); ii != maxii; ii ++)
1020     detach(*ii);
1021
1022   toDetach.clear();
1023
1024   try {
1025     singlethreadedAccept();
1026   } catch(RuntimeException& ex) {
1027     ex.trace();
1028   }
1029
1030   exit(0);                                                                                       // (5)
1031 }
1032
1033 int Communicator::SortByFileDescriptor::value(const Handler* handler)
1034 throw() {
1035   return handler->getfd();
1036 }
1037
1038