Suuports clang compiler
[anna.git] / source / comm / Communicator.cpp
1 // ANNA - Anna is Not Nothingness Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // http://redmine.teslayout.com/projects/anna-suite
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     *  Neither the name of the copyright holder nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 #include <poll.h>
38 #include <time.h>
39 #include <errno.h>
40 #include <signal.h>
41 #include <string.h>
42 #include <sys/ioctl.h>
43
44 #include <algorithm>
45
46 #include <anna/core/tracing/Logger.hpp>
47 #include <anna/core/tracing/TraceMethod.hpp>
48 #include <anna/core/mt/Guard.hpp>
49 #include <anna/core/util/Average.hpp>
50 #include <anna/core/mt/ThreadManager.hpp>
51 #include <anna/core/mt/Thread.hpp>
52 #include <anna/core/util/Microsecond.hpp>
53 #include <anna/core/util/defines.hpp>
54
55 #include <anna/comm/comm.hpp>
56 #include <anna/comm/internal/sccs.hpp>
57 #include <anna/comm/internal/BinderSocket.hpp>
58 #include <anna/comm/internal/RemoteConnection.hpp>
59 #include <anna/comm/handler/Manager.hpp>
60
61 // ST
62 #include <anna/comm/internal/Poll.hpp>
63
64 // MT
65 #include <anna/core/mt/ThreadManager.hpp>
66
67 #include <anna/xml/Node.hpp>
68 #include <anna/xml/Attribute.hpp>
69
70 using namespace std;
71 using namespace anna;
72
73 // static
74 const Millisecond comm::Communicator::MinRecoveryTime(1000);
75 const Millisecond comm::Communicator::DefaultRecoveryTime(5000);
76 const Millisecond comm::Communicator::MaxRecoveryTime(30000);
77 const Millisecond comm::Communicator::MinTryingConnectionTime(100);
78 const Millisecond comm::Communicator::DefaultTryingConnectionTime(200);
79 const Millisecond comm::Communicator::MaxTryingConnectionTime(1000);
80 const Millisecond comm::Communicator::DefaultTimeout(10 * 60 * 1000); // 10 minutos.
81
82 //static
83 Millisecond comm::Communicator::st_ReceivingChunkSize(comm::Communicator::DefaultChunkSize);
84
85 Communicator::Communicator(const Communicator::WorkMode::_v workMode) :
86   Component(getClassName()),
87 #ifdef _MT
88   a_workMode(WorkMode::Single),
89 #else
90   a_workMode(workMode),
91 #endif
92   a_recoveryTime(DefaultRecoveryTime),
93   a_requestedStop(false),
94   a_status(Status::Available),
95   a_isServing(false),
96   a_poll(NULL),
97   a_threadManager(NULL),
98   a_timeout(DefaultTimeout),
99   a_mainHandler(NULL),
100   a_tryingConnectionTime(DefaultTryingConnectionTime) {
101   WHEN_SINGLETHREAD(a_poll = new Poll);
102   WHEN_MULTITHREAD(
103     a_threadManager = new ThreadManager("comm::Communicator::ThreadManager", ThreadManager::Mode::Unlimit, 0)
104   );
105   handler::Manager::instantiate().initialize(this);
106   a_connectionRecover = new comm::ConnectionRecover(this);
107   a_levelOfDenialService = comm::CongestionController::MaxLevel - 1;
108   sigignore(SIGPIPE);
109   comm::sccs::activate();
110 }
111
112 /*virtual*/
113 Communicator::~Communicator() {
114   delete a_poll;
115   delete a_threadManager;
116   delete a_connectionRecover;
117 }
118
119 void Communicator::setRecoveryTime(const Millisecond &recoveryTime)
120 throw(RuntimeException) {
121   if(recoveryTime < MinRecoveryTime || recoveryTime > MaxRecoveryTime) {
122     string msg("comm::Communicator::setRecoveryTime | ");
123     msg += functions::asString("RecoveryTime (%d ms) must be between %d ms and %d ms", recoveryTime.getValue(), MinRecoveryTime.getValue(), MaxRecoveryTime.getValue());
124     throw RuntimeException(msg, ANNA_FILE_LOCATION);
125   }
126
127   a_recoveryTime = recoveryTime;
128 }
129
130 void Communicator::setTryingConnectionTime(const Millisecond &tryingConnectionTime)
131 throw(RuntimeException) {
132   if(tryingConnectionTime < MinTryingConnectionTime || tryingConnectionTime > MaxTryingConnectionTime) {
133     string msg("comm::Communicator::setTryingConnectionTime | ");
134     msg += functions::asString("TryingConnectionTime (%d ms) must be between %d ms and %d ms", tryingConnectionTime.getValue(), MinTryingConnectionTime.getValue(), MaxTryingConnectionTime.getValue());
135     throw RuntimeException(msg, ANNA_FILE_LOCATION);
136   }
137
138   a_tryingConnectionTime = tryingConnectionTime;
139 }
140
141 //static
142 void Communicator::setReceivingChunkSize(const int receivingChunkSize)
143 throw(RuntimeException) {
144   if(receivingChunkSize < MinReceivingChunkSize || receivingChunkSize > MaxReceivingChunkSize) {
145     string msg("comm::Communicator::setReceivingChunkSize | ");
146     msg += functions::asString("ReceivingChunkSize (%d bytes) must be between %d bytes and %d bytes", receivingChunkSize, MinReceivingChunkSize, MaxReceivingChunkSize);
147     throw RuntimeException(msg, ANNA_FILE_LOCATION);
148   }
149
150   st_ReceivingChunkSize = receivingChunkSize;
151   CongestionController::instantiate().setMaxPendingBytes(receivingChunkSize);
152 }
153
154 void Communicator::setLevelOfDenialService(const int levelOfDenialService)
155 throw(RuntimeException) {
156   const int min(comm::CongestionController::MaxLevel - 2);
157   const int max(comm::CongestionController::MaxLevel);
158
159   if(levelOfDenialService < min || levelOfDenialService > max) {
160     string msg("comm::Communicator::setTryingConnectionTime | ");
161     msg += functions::asString("LevelOfDenialService %d must be between %d and %d", levelOfDenialService, min, max);
162     throw RuntimeException(msg, ANNA_FILE_LOCATION);
163   }
164
165   a_levelOfDenialService = levelOfDenialService;
166 }
167
168 void Communicator::attach(ServerSocket* serverSocket)
169 throw(RuntimeException) {
170   if(serverSocket == NULL)
171     throw RuntimeException("Cannot attach a NULL comm::ServerSocket", ANNA_FILE_LOCATION);
172
173   Guard guard(this, "comm::Communicator::attach (ServerSocket)");
174   Handler* handler = handler::Manager::instantiate().createHandler(serverSocket);
175   insert(handler);
176   LOGDEBUG(
177     string msg("comm::Communicator::attach (ServerSocket) | ");
178     msg += handler->asString();
179     Logger::debug(msg, ANNA_FILE_LOCATION);
180   );
181
182   if(serverSocket->isSharedBind() == true && serverSocket->getBinderSocket() != NULL)
183     attach(serverSocket->getBinderSocket());
184 }
185
186 void Communicator::attach(BinderSocket* binderSocket)
187 throw(RuntimeException) {
188   if(binderSocket == NULL)
189     throw RuntimeException("Cannot attach a NULL comm::BinderSocket", ANNA_FILE_LOCATION);
190
191   Guard guard(this, "comm::Communicator::attach (BinderSocket)");
192   Handler* handler = handler::Manager::instantiate().createHandler(binderSocket);
193   insert(handler);
194   LOGDEBUG(
195     string msg("comm::Communicator::attach (BinderSocket) | ");
196     msg += handler->asString();
197     Logger::debug(msg, ANNA_FILE_LOCATION);
198   );
199 }
200
201 /**
202  * Comienza a tratar la conexion que hacen a un comm::ServerSocket de este proceso, desde algun otro proceso.
203  *
204  * Se invoca desde  comm::handler::ServerSocket::accept [Tx] -> <null>
205  */
206 void Communicator::attach(LocalConnection* localConnection)
207 throw(RuntimeException) {
208   if(localConnection == NULL)
209     throw RuntimeException("Cannot attach a NULL comm::LocalConnection", ANNA_FILE_LOCATION);
210
211   /*
212    * Obtiene la información/bloquea al ClientSocket para asegurar el orden correcto, ya que de otro modo,
213    * se podría producir un interbloqueo Communicator & ClientSocket.
214    *
215    * Recordar que el handler::MetaClientSocket::apply sólo bloquea el ClientSocket sobre el que actúa y luego
216    * y si detecta el cierre,  desbloquea el ClientSocket y bloquea el Communicator. Así que para mantener
217    * el orden correcto hay que invocar al comm::ClientSocket::getTransport (que establece una SCCS sobre él) antes
218    * de bloquea el Communicator.
219    */
220   ClientSocket* clientSocket = localConnection->getClientSocket();
221
222   if(clientSocket == NULL)
223     throw RuntimeException("comm::Communicator::attach (LocalConnection) | ClientSocket can not be NULL", ANNA_FILE_LOCATION);
224
225   // Todavía no está corriendo el thread que se encargará de éste comm::ClientSocket => podemos acceder a él sin SSCC.
226   const Transport* transport = clientSocket->unsafe_reserveTransport();
227   Guard guard(this, "comm::Communicator::attach (LocalConnection)");
228   Handler* handler = handler::Manager::instantiate().createHandler(localConnection);
229   insert(handler);
230
231   if(((transport == NULL) ? true : transport->enableTimeout()) == true)
232     handler->setTimeout(a_timeout);
233
234   WHEN_SINGLETHREAD(
235
236     if(a_workMode == WorkMode::Single && handler->supportTimeout() == true)
237     a_timedouts.add(handler)
238   );
239
240   LOGDEBUG(
241     string msg("comm::Communicator::attach | ");
242     msg += handler->asString();
243     Logger::debug(msg, ANNA_FILE_LOCATION);
244   );
245
246   if(a_workMode == WorkMode::Clone) {
247     a_mainHandler = handler;
248     app::Application& app = app::functions::getApp().clone();
249   }
250 }
251
252 /*
253  * Este método sólo se invoca desde comm::Server::connect y este método los primero que hace
254  * es bloquear el acceso al comunicador, para asegurar el orden correcto de bloqueo a l hora
255  * de tratar la desconexión.
256  *
257  * Por eso no hace falta establecer la sección crítica que habitualmente se establece en todos
258  * los métodos Communicator::attach.
259  */
260 void Communicator::attach(RemoteConnection* remoteConnection)
261 throw(RuntimeException) {
262   if(remoteConnection == NULL)
263     throw RuntimeException("Cannot attach a NULL comm::RemoteConnection", ANNA_FILE_LOCATION);
264
265 //   Guard guard (this, "comm::Communicator::attach (RemoteConnection)");
266   Handler* handler = handler::Manager::instantiate().createHandler(remoteConnection);
267   insert(handler);
268   LOGDEBUG(
269     string msg("comm::Communicator::attach | ");
270     msg += handler->asString();
271     Logger::debug(msg, ANNA_FILE_LOCATION);
272   );
273 }
274
275 void Communicator::attach(ClientSocket* socket)
276 throw(RuntimeException) {
277   if(socket == NULL)
278     throw RuntimeException("Cannot attach a NULL comm::ClientSocket", ANNA_FILE_LOCATION);
279
280   Guard guard(this, "comm::Communicator::attach (ClientSocket)");
281   Handler* handler = handler::Manager::instantiate().createHandler(socket);
282   insert(handler);
283   LOGDEBUG(
284     string msg("comm::Communicator::attach | ");
285     msg += handler->asString();
286     Logger::debug(msg, ANNA_FILE_LOCATION);
287   );
288 }
289
290 void Communicator::attach(DatagramSocket* socket)
291 throw(RuntimeException) {
292   if(socket == NULL)
293     throw RuntimeException("Cannot attach a NULL comm::DatagramSocket", ANNA_FILE_LOCATION);
294
295   Guard guard(this, "comm::Communicator::attach (DatagramSocket)");
296   Handler* handler = handler::Manager::instantiate().createHandler(socket);
297   insert(handler);
298   LOGDEBUG(
299     string msg("comm::Communicator::attach | ");
300     msg += handler->asString();
301     Logger::debug(msg, ANNA_FILE_LOCATION);
302   );
303 }
304
305 void Communicator::attach(Handler* handler)
306 throw(RuntimeException) {
307   if(handler == NULL)
308     throw RuntimeException("Cannot attach a NULL comm::Handler", ANNA_FILE_LOCATION);
309
310   if(handler->getType() != Handler::Type::Custom)
311     throw RuntimeException("Communicator::attach only accept 'Custom' Handlers", ANNA_FILE_LOCATION);
312
313   Guard guard(this, "comm::Communicator::attach (Handler)");
314   insert(handler);
315   LOGDEBUG(
316     string msg("comm::Communicator::attach (Handler) | ");
317     msg += handler->asString();
318     Logger::debug(msg, ANNA_FILE_LOCATION);
319   );
320 }
321
322 void Communicator::attach(Service* service)
323 throw(RuntimeException) {
324   if(service == NULL)
325     throw RuntimeException("Cannot attach a NULL comm::Service", ANNA_FILE_LOCATION);
326
327   if(std::find(service_begin(), service_end(), service) != service_end())
328     return;
329
330   service->initialize();
331   LOGDEBUG(
332     string msg("comm::Communicator::attach | ");
333     msg += service->asString();
334     Logger::debug(msg, ANNA_FILE_LOCATION);
335   );
336   a_services.push_back(service);
337
338   if(service->isCritical() == true && service->isAvailable() == false)
339     setStatus(Status::Unavailable);
340 }
341
342 void Communicator::insert(Handler* handler)
343 throw(RuntimeException) {
344   handler->initialize();
345
346   if(handler->getfd() < 0) {
347     string msg(handler->asString());
348     msg += " | Cannot attach a Handler with fd < 0";
349     throw RuntimeException(msg, ANNA_FILE_LOCATION);
350   }
351
352   const Handler* other = find(handler->getfd());
353
354   if(other != NULL) {
355     string msg("commm::Comunicator::insert | New: ");
356     msg += handler->asString();
357     msg += " | Collision: ";
358     msg += other->asString();
359     throw RuntimeException(msg, ANNA_FILE_LOCATION);
360   }
361
362   a_handlers.add(handler);
363
364   if(handler->supportTimeout())
365     handler->beat(anna::functions::hardwareClock());
366
367   WHEN_SINGLETHREAD(a_poll->insert(handler->getfd()));
368   WHEN_MULTITHREAD(
369
370     if(a_isServing == true)
371     a_threadManager->createThread()->start(*handler);
372   );
373 }
374
375 void Communicator::detach(ServerSocket* serverSocket)
376 throw() {
377   if(serverSocket == NULL)
378     return;
379
380   comm::BinderSocket* binderSocket = serverSocket->getBinderSocket();
381   detach(find(serverSocket->getfd()));
382
383   if(binderSocket != NULL)
384     detach(binderSocket);
385 }
386
387 void Communicator::detach(ClientSocket* clientSocket)
388 throw() {
389   if(clientSocket == NULL)
390     return;
391
392   detach(find(clientSocket->getfd()));
393 }
394
395 void Communicator::detach(BinderSocket* binderSocket)
396 throw() {
397   if(binderSocket == NULL)
398     return;
399
400   detach(find(binderSocket->getfd()));
401 }
402
403 /*
404  * Finaliza el trabajo del handler recibido, en este momento NO hay ninguna SSCC establecida.
405  * (1) Si se cierra la conexion con el cliente que al que atencia este proceso clonado => debe terminar la ejecucion.
406  */
407 void Communicator::detach(Handler* handler)
408 throw() {
409   if(handler == NULL)
410     return;
411
412   Guard guard(this, "comm::Communicator::detach");
413   const int fd = handler->getfd();
414   LOGDEBUG(
415     string msg("comm::Communicator::detach (Handler) | ");
416     msg += handler->asString();
417     Logger::debug(msg, ANNA_FILE_LOCATION);
418   );
419
420   if(anna::functions::supportMultithread() == true)
421     handler->requestStop();
422   else if(handler->supportTimeout() == true)
423     a_timedouts.erase(handler);
424
425   handler->finalize();
426   WHEN_SINGLETHREAD(a_poll->erase(fd););
427
428   if(a_handlers.erase(handler) == false) {
429     LOGWARNING(
430       string msg(functions::asText("Handler: ", fd));
431       msg += " | Not found";
432       Logger::warning(msg, ANNA_FILE_LOCATION);
433     );
434   }
435
436   if(a_workMode == WorkMode::Clone && handler == a_mainHandler)                                   // (1)
437     requestStop();
438
439   handler::Manager::instantiate().releaseHandler(handler);
440 }
441
442 const Handler* Communicator::getHandler(const ClientSocket& clientSocket)
443 throw(RuntimeException) {
444   Guard guard(this, "comm::Communicator::getHandler");
445   Handler* result = find(clientSocket.getfd());
446
447   if(result != NULL) {
448     switch(result->getType()) {
449     case Handler::Type::ServerSocket:
450     case Handler::Type::BinderSocket:
451     case Handler::Type::Custom:
452       result = NULL;
453       break;
454     default: break;
455     }
456   }
457
458   return result;
459 }
460
461 //----------------------------------------------------------------------------------------------
462 // (1) Compruebo la solicitud de parada antes y despues de procesar el mensaje para acelar
463 //     la solicitud de la peticion. En otro caso podrian pasar 30 seg desde que se solicita
464 //     hasta que se acepta.
465 // (2) La invocacion de este metodo originara que se aniadan y borren fd's a la lista de
466 // streams pero la forma de recorrer el bucle nos blinda (un poco) de anomalias.
467 //----------------------------------------------------------------------------------------------
468 void Communicator::accept()
469 throw(RuntimeException) {
470   LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "accept", ANNA_FILE_LOCATION));
471
472   if(isServing() == true)
473     throw RuntimeException("Communicator::accept is already invoked", ANNA_FILE_LOCATION);
474
475   a_requestedStop = false;
476
477   if(handler_size() == 0)
478     throw RuntimeException("No socket has been established for sending and/or reception", ANNA_FILE_LOCATION);
479
480   a_isServing = true;
481   eventStartup();
482   LOGINFORMATION(Logger::write(Logger::Information, Component::asString(), "Polling network", ANNA_FILE_LOCATION));
483
484   try {
485     WHEN_SINGLETHREAD(singlethreadedAccept());
486     WHEN_MULTITHREAD(multithreadedAccept());
487   } catch(RuntimeException& ex) {
488     ex.trace();
489   }
490
491   a_isServing = false;
492   eventShutdown();
493 }
494
495 void Communicator::singlethreadedAccept()
496 throw(RuntimeException) {
497   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "singlethreadedAccept", ANNA_FILE_LOCATION));
498   Handler* handler;
499   Microsecond maxTime;
500   Microsecond now(anna::functions::hardwareClock());
501   int fd;
502   a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
503   maxTime = a_timeout;
504   maxTime += now;
505
506   while(a_requestedStop == false) {
507     if(a_connectionRecover->isRunning() == true) {
508       a_connectionRecover->tryRecover();
509       a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
510     }
511
512     a_poll->waitMessage();
513
514     if(a_timedouts.size() > 0)
515       now = anna::functions::hardwareClock();
516
517     while((fd = a_poll->fetch()) != -1) {
518       if((handler = find(fd)) == NULL)
519         continue;
520
521       try {
522         if(handler->supportTimeout())
523           handler->beat(now);
524
525         handler->apply();
526       } catch(RuntimeException& ex) {
527         ex.trace();
528       }
529     }
530
531     if(a_pendingClose == true) {
532       a_pendingClose = false;
533
534       // @Eduardo (multiconnection to same address/port); On st, is considered to have one socket pending to be closed
535       for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
536 //            if (Communicator::handler (ii)->testClose () == true)
537 //               break;
538         Communicator::handler(ii)->testClose();
539     }
540
541     now = anna::functions::hardwareClock();
542
543     if(a_timedouts.size() == 0)
544       continue;
545
546     if(now > maxTime) {
547       handler_iterator ii = a_timedouts.begin();
548
549       while(ii != a_timedouts.end()) {
550         handler = Communicator::handler(ii);
551
552         if(handler->isTimeout(now) == true) {
553           LOGWARNING(
554             string msg(handler->asString());
555             msg += " | Closed due to inactivity";
556             Logger::warning(msg, ANNA_FILE_LOCATION);
557           );
558           detach(handler);
559           ii = a_timedouts.begin();
560         } else
561           ii ++;
562       }
563
564       maxTime = a_timeout;
565       maxTime += now;
566     }
567   }
568 }
569
570 void Communicator::multithreadedAccept()
571 throw(RuntimeException) {
572   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "multithreadedAccept", ANNA_FILE_LOCATION));
573   {
574     Guard guard(this, "comm::Communicator::multithreadedAccept");
575     Handler* handler;
576
577     for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
578       handler = Communicator::handler(ii);
579       LOGDEBUG(
580         string msg("Starting | ");
581         msg += handler->asString();
582         Logger::debug(msg, ANNA_FILE_LOCATION)
583       );
584       a_threadManager->createThread()->start(*handler);
585     }
586   }
587   Millisecond delay(500);
588
589   while(a_requestedStop == false) {
590     anna::functions::sleep(delay);
591
592     if(a_requestedStop == true)
593       break;
594
595     if(a_connectionRecover->isRunning() == false)
596       continue;
597
598     {
599       Guard guard(this, "comm::Communicator::multithreadedAccept (ConnectionRecover)");
600       a_connectionRecover->tryRecover();
601     }
602   }
603 }
604
605 void Communicator::requestStop()
606 throw() {
607   if(a_requestedStop == true)
608     return;
609
610   Guard guard(this, "comm::Communicator::requestStop");
611
612   if(a_requestedStop == true)
613     return;
614
615   a_requestedStop = true;
616
617   try  {
618     for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
619       handler(ii)->requestStop();
620   } catch(RuntimeException& ex) {
621     ex.trace();
622   }
623
624   LOGWARNING(
625     string msg("comm::Communicator::requestStop | ");
626     msg += asString();
627     Logger::warning(msg, ANNA_FILE_LOCATION);
628   );
629 }
630
631 bool Communicator::isUsable(const ClientSocket* clientSocket)
632 throw() {
633   if(clientSocket == NULL)
634     return false;
635
636   Guard guard(this, "comm::Communicator::isUsable");
637   Handler* handler = find(clientSocket->getfd());
638
639   if(handler == NULL)
640     return false;
641
642   const Handler::Type::_v type = handler->getType();
643   return (type == Handler::Type::LocalConnection || type == Handler::Type::RemoteConnection);
644 }
645
646 void Communicator::setStatus(const Status& status)
647 throw() {
648   Guard guard(this, "comm::Communicator::setStatus");
649
650   if(a_status != status)
651     a_status = status;
652
653   LOGINFORMATION(
654     string msg("comm::Communicator::setStatus | ");
655     msg += a_status.asString();
656     Logger::information(msg, ANNA_FILE_LOCATION);
657   );
658 }
659
660 void Communicator::eventBreakAddress(const in_addr_t& address)
661 throw() {
662   Device* device = Network::instantiate().find(address);
663
664   if(device->getStatus() == Device::Status::Down)
665     return;
666
667   LOGWARNING(
668     string msg("comm::Communicator::eventBreakAddress | ");
669     msg += device->asString();
670     Logger::warning(msg, ANNA_FILE_LOCATION);
671   );
672   Guard guard(this, "comm::Communicator::eventBreakAddress");
673   device->setStatus(Device::Status::Down);
674   /**
675    * Trabaja sobre una copia para no perder la referencia cuando se elimine un miembro de la lista original
676    *
677    * En la lista a borrar sólo mete los handler::ServerSocket y los handler::RemoteConnection, ya que los handler::LocalConnection
678    * será liberados recursivamente cuando liberemos los primeros.
679    */
680   typedef vector <comm::Handler*> work_container;
681   typedef work_container::iterator work_iterator;
682   work_container ww;
683
684   for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
685     comm::Handler* handler = comm::Communicator::handler(ii);
686
687     if(dynamic_cast <comm::handler::ServerSocket*>(handler) != NULL) {
688       ww.push_back(handler);
689     } else if(dynamic_cast <comm::handler::RemoteConnection*>(handler) != NULL) {
690       ww.push_back(handler);
691     }
692   }
693
694   for(work_iterator ii = ww.begin(), maxii = ww.end(); ii != maxii; ii ++) {
695     LOGDEBUG(
696       std::string msg("Communicator::eventBreakAddress | ");
697       msg = (*ii)->asString();
698       Logger::debug(msg, ANNA_FILE_LOCATION)
699     );
700     (*ii)->breakAddress(address);
701   }
702 }
703
704 void Communicator::eventRecoverAddress(const in_addr_t& address)
705 throw() {
706   Device* device = Network::instantiate().find(address);
707
708   if(device->getStatus() == Device::Status::Up)
709     return;
710
711   LOGWARNING(
712     string msg("comm::Communicator::eventRecoverAddress | ");
713     msg += device->asString();
714     Logger::warning(msg, ANNA_FILE_LOCATION);
715   );
716   Guard guard(this, "comm::Communicator::eventRecoverAddress");
717   device->setStatus(Device::Status::Up);
718   Handlers backup(a_handlers);
719
720   for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
721     handler(ii)->recoverAddress(address);
722 }
723
724 bool Communicator::eventAcceptConnection(const ClientSocket& clientSocket)
725 throw(RuntimeException) {
726   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventAcceptConnection", ANNA_FILE_LOCATION));
727
728   if(a_requestedStop == true) {
729     LOGWARNING(
730       string msg(Component::asString());
731       msg += " | ";
732       msg += clientSocket.asString();
733       msg += " | Connection rejected due to stop request";
734       Logger::warning(msg, ANNA_FILE_LOCATION);
735     );
736     return false;
737   }
738
739   CongestionController::Workload ww = CongestionController::instantiate().getAccumulatedWorkload();
740
741   if(CongestionController::getLevel(ww) >= a_levelOfDenialService) {
742     LOGWARNING(
743       string msg("comm::Communicator::eventAcceptConnection | Level: ");
744       msg += functions::asString(CongestionController::getLevel(ww));
745       msg += functions::asString(" | Load: %d%% ", CongestionController::getLoad(ww));
746       msg += " | Result: false";
747       Logger::warning(msg, ANNA_FILE_LOCATION);
748     );
749     return false;
750   }
751
752   return true;
753 }
754
755 //--------------------------------------------------------------------------------------------------------
756 // (1) Alguno de los comm::Server asociados al servicio puede haber pasado a activo, pero nos
757 //     interesa como estaba el servicio antes de esta activacion.
758 // (2) Si el servicio es "No critico" => no afecta al estado del proceso.
759 // (3) Solo si todos los servicios "Criticos" estan disponibles pasara a estar "Activo".
760 //--------------------------------------------------------------------------------------------------------
761 void Communicator::eventCreateConnection(const Server* server)
762 throw() {
763   if(server == NULL)
764     return;
765
766   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventCreateConnection", ANNA_FILE_LOCATION));
767   LOGNOTICE(
768     string msg("comm::Communicator::eventCreateConnection | ");
769     msg += server->asString();
770     Logger::notice(msg, ANNA_FILE_LOCATION);
771   );
772   bool recoverService = false;
773
774   for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
775     Service* service = const_cast <Service*>(Server::service(ii));
776
777     if(service->wasAvailable() == true) {                                             // (1)
778       service->recover(server);
779       continue;
780     }
781
782     service->recover(server);
783     eventCreateConnection(service);
784     recoverService = true;
785   }
786
787   if(recoverService == false || a_status == Status::Available)
788     return;
789
790   Guard guard(this, "comm::Communicator::eventCreateConnection");
791   Status status(Status::Available);
792
793   for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++) {
794     const Service* service = Communicator::service(ii);
795
796     if(service->isCritical() == false)                                               // (2)
797       continue;
798
799     if(service->isAvailable() == false) {
800       status = Status::Unavailable;
801       break;
802     }
803   }
804
805   setStatus(status);                                                                  // (3)
806 }
807
808 void Communicator::eventCreateConnection(const Service* service)
809 throw() {
810   if(service == NULL)
811     return;
812
813   LOGNOTICE(
814     string msg("comm::Communicator::eventCreateConnection | ");
815     msg += service->asString();
816     Logger::notice(msg, ANNA_FILE_LOCATION);
817   );
818 }
819
820 /*
821  * Se invoca desde handler::RemoteConnection:[Tx] -> Communicator
822  */
823 void Communicator::eventBreakConnection(const Server* server)
824 throw() {
825   if(server == NULL)
826     return;
827
828   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (server)", ANNA_FILE_LOCATION));
829   LOGWARNING(
830     string msg("comm::Communicator::eventBreakConnection | ");
831     msg += server->asString();
832     Logger::warning(msg, ANNA_FILE_LOCATION);
833   );
834   //Guard guard (this, "comm::Communicator::eventBreakConnection");
835   Status status(a_status);
836
837   for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
838     Service* service = const_cast <Service*>(Server::service(ii));
839     service->fault(server);
840
841     if(service->isAvailable() == true)
842       continue;
843
844     eventBreakConnection(service);
845
846     if(status == Status::Available && service->isCritical() == true)
847       status = Status::Unavailable;
848   }
849
850   setStatus(status);
851 }
852
853 void Communicator::eventBreakConnection(const Service* service)
854 throw() {
855   if(service == NULL)
856     return;
857
858   LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (service)", ANNA_FILE_LOCATION));
859   LOGWARNING(
860     string msg("comm::Communicator::eventBreakConnection | ");
861     msg += service->asString();
862     Logger::warning(msg, ANNA_FILE_LOCATION);
863   );
864 }
865
866 void Communicator::eventShutdown()
867 throw() {
868   LOGWARNING(
869     string msg("comm::Communicator::eventShutdown | ");
870     msg += asString();
871     Logger::warning(msg, ANNA_FILE_LOCATION);
872   );
873   setStatus(Status::Unavailable);
874 #ifndef _MT
875   Handlers backup(a_handlers);
876
877   for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
878     detach(handler(ii));
879
880 #else
881
882   try {
883     a_threadManager->join();
884   } catch(RuntimeException& ex) {
885     ex.trace();
886   }
887
888 #endif
889 }
890
891 std::string Communicator::asString() const
892 throw() {
893   string result("comm::Communicator { ");
894   result += Component::asString();
895   result += " | RequestedStop: ";
896   result += anna::functions::asString(a_requestedStop);
897   result += " | ";
898   result += a_status.asString();
899   result += anna::functions::asString(" | Handlers: %d", a_handlers.size());
900   result += anna::functions::asString(" | RecoveryTime: %d ms", a_recoveryTime.getValue());
901   return result += " }";
902 }
903
904 xml::Node* Communicator::asXML(xml::Node* parent) const
905 throw() {
906   parent = app::Component::asXML(parent);
907   xml::Node* result = parent->createChild("comm.Communicator");
908   result->createAttribute("RequestedStop", anna::functions::asString(a_requestedStop));
909   result->createAttribute("RecoveryTime", a_recoveryTime);
910   result->createAttribute("TryingConnectionTime", a_tryingConnectionTime);
911   result->createAttribute("Timeout", a_timeout);
912   result->createAttribute("Status", a_status.asString());
913   result->createAttribute("Mode", (a_workMode == WorkMode::Single) ? "Single" : "Clone");
914   result->createAttribute("LevelOfDenialService", a_levelOfDenialService);
915   Network::instantiate().asXML(result);
916   xml::Node* node = result->createChild("comm.Handlers");
917
918   for(const_handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
919     handler(ii)->asXML(node);
920
921   node = result->createChild("comm.Services");
922
923   for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++)
924     service(ii)->asXML(node);
925
926   a_connectionRecover->asXML(result);
927   CongestionController& congestionController = CongestionController::instantiate();
928   congestionController.asXML(result);
929   return result;
930 }
931
932 /*
933  * Se invoca desde app::Application::clone -> app::Component::do_cloneParent (ojo EN EL PROCESO ORIGINAL).
934  * Ofrece la posiblidad de que el componente del proceso original liberen los recursos que no va
935  * a usar.
936  *
937  * Cada vez que se clona tiene que sacar de su comprobacion las conexiones que ha sufrido. Ya que de otro
938  * modo podria estar tratantado contestaciones a peticiones realizadas desde los procesos hijos => estos
939  * nunca recibirian las respuestas a sus peticiones.
940  *
941  * Estas conexiones no se pueden cerrar porque deben mantenerse abiertas para que los procesos hijos las
942  * hereden, solo es que este proceso no debe atenderlas.
943  *
944  * El codigo del servidor (el proceso original) no hace nada con ese Socket, lo cierra porque ya lo ha duplicado
945  * el proceso hijo y sigue atendiendo peticiones de conexion.
946  */
947 void Communicator::do_cloneParent()
948 throw(RuntimeException) {
949   LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneParent", ANNA_FILE_LOCATION));
950   Handler* handler;
951   Handler::Type::_v type;
952
953   for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
954     handler = Communicator::handler(ii);
955     type = handler->getType();
956
957     if(type == Handler::Type::RemoteConnection || type == Handler::Type::ClientSocket)
958       a_poll->erase(handler->getfd());
959   }
960
961   try {
962     handler = a_mainHandler;
963     a_mainHandler = NULL;
964     detach(handler);
965   } catch(RuntimeException& ex) {
966     ex.trace();
967   }
968 }
969
970 /*
971  * Se invoca desde app::Application::clone -> app::Component::do_cloneChild (ojo EN EL NUEVO PROCESO).
972  * Ofrece la posiblidad de que los componentes que acaban de ser duplicados liberen los recursos que no van
973  * a usar, ya que la copia solo se dedicara a tratar los mensajes que entren por su conexion asociada y
974  * las respuestas a las peticiones originadas en el tratamiento del mismo.
975  *
976  * (1) Recordar que el handler que tiene asociado el clon esta registrado para evitar su cierre.
977  * (3) Limpia la lista porque de otro modo si el proceso clon hiciera nuevas conexiones podria encontrar
978  *     que el fd ya esta guardado en la lista. No se puede invocar directamente al detach, porque este
979  *     metodo modifica la lista a_handlers, y nos haria perder la cuenta del iterator.
980  * (4) Realiza un duplicado fisico de las conexiones realizadas por el proceso original, elimina el fd de
981  *     la lista a comprobar, este fd sera cerrado por el metodo Handler::clone.
982  * (5) Registra el Handler por el que ha sido creado este nuevo proceso, asi que cuando se invoque al detach
983  *     con ese handler => el programa terminara la ejecucion.
984  */
985 void Communicator::do_cloneChild()
986 throw() {
987   LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneChild", ANNA_FILE_LOCATION));
988   LOGINFORMATION(
989     string msg("comm::Communicator::do_cloneChild | MainHandler: ");
990     msg += a_mainHandler->asString();
991     Logger::information(msg, ANNA_FILE_LOCATION);
992   );
993   Handler* handler(NULL);
994   vector <Handler*> toDetach;
995
996   for(handler_iterator ii = handler_begin(); ii != handler_end(); ii ++) {                        // (1)
997     handler = Communicator::handler(ii);
998
999     if(handler == a_mainHandler)
1000       continue;
1001
1002     switch(handler->getType()) {
1003     case Handler::Type::Custom:
1004     case Handler::Type::RemoteConnection:
1005     case Handler::Type::ClientSocket:
1006       a_poll->erase(handler->getfd());                                                         // (4)
1007       LOGDEBUG(
1008         string msg("comm::Communicator::do_cloneChild | ");
1009         msg += handler->asString();
1010         Logger::debug(msg, ANNA_FILE_LOCATION);
1011       );
1012       handler->clone();
1013       a_poll->insert(handler->getfd());
1014       break;
1015     default:
1016       toDetach.push_back(handler);                                                             // (3)
1017       break;
1018     }
1019   }
1020
1021   for(vector <Handler*>::iterator ii = toDetach.begin(), maxii = toDetach.end(); ii != maxii; ii ++)
1022     detach(*ii);
1023
1024   toDetach.clear();
1025
1026   try {
1027     singlethreadedAccept();
1028   } catch(RuntimeException& ex) {
1029     ex.trace();
1030   }
1031
1032   exit(0);                                                                                       // (5)
1033 }
1034
1035 int Communicator::SortByFileDescriptor::value(const Handler* handler)
1036 throw() {
1037   return handler->getfd();
1038 }
1039
1040