1 // ANNA - Anna is Not 'N' Anymore
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
5 // https://bitbucket.org/testillano/anna
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
11 // * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 // * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
17 // * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 // Authors: eduardo.ramos.testillano@gmail.com
34 // cisco.tierra@gmail.com
42 #include <sys/ioctl.h>
46 #include <anna/core/tracing/Logger.hpp>
47 #include <anna/core/tracing/TraceMethod.hpp>
48 #include <anna/core/mt/Guard.hpp>
49 #include <anna/core/util/Average.hpp>
50 #include <anna/core/mt/ThreadManager.hpp>
51 #include <anna/core/mt/Thread.hpp>
52 #include <anna/core/util/Microsecond.hpp>
54 #include <anna/comm/comm.hpp>
55 #include <anna/comm/internal/sccs.hpp>
56 #include <anna/comm/internal/BinderSocket.hpp>
57 #include <anna/comm/internal/RemoteConnection.hpp>
58 #include <anna/comm/handler/Manager.hpp>
61 #include <anna/comm/internal/Poll.hpp>
64 #include <anna/core/mt/ThreadManager.hpp>
66 #include <anna/xml/Node.hpp>
67 #include <anna/xml/Attribute.hpp>
73 const Millisecond comm::Communicator::MinRecoveryTime(1000);
74 const Millisecond comm::Communicator::DefaultRecoveryTime(5000);
75 const Millisecond comm::Communicator::MaxRecoveryTime(30000);
76 const Millisecond comm::Communicator::MinTryingConnectionTime(100);
77 const Millisecond comm::Communicator::DefaultTryingConnectionTime(200);
78 const Millisecond comm::Communicator::MaxTryingConnectionTime(1000);
79 const Millisecond comm::Communicator::DefaultTimeout(10 * 60 * 1000); // 10 minutos.
82 Millisecond comm::Communicator::st_ReceivingChunkSize(comm::Communicator::DefaultChunkSize);
84 Communicator::Communicator(const Communicator::WorkMode::_v workMode) :
85 Component(getClassName()),
87 a_workMode(WorkMode::Single),
91 a_recoveryTime(DefaultRecoveryTime),
92 a_requestedStop(false),
93 a_status(Status::Available),
96 a_threadManager(NULL),
97 a_timeout(DefaultTimeout),
99 a_tryingConnectionTime(DefaultTryingConnectionTime) {
100 WHEN_SINGLETHREAD(a_poll = new Poll);
102 a_threadManager = new ThreadManager("comm::Communicator::ThreadManager", ThreadManager::Mode::Unlimit, 0)
104 handler::Manager::instantiate().initialize(this);
105 a_connectionRecover = new comm::ConnectionRecover(this);
106 a_levelOfDenialService = comm::CongestionController::MaxLevel - 1;
108 comm::sccs::activate();
112 Communicator::~Communicator() {
114 delete a_threadManager;
115 delete a_connectionRecover;
118 void Communicator::setRecoveryTime(const Millisecond &recoveryTime)
119 throw(RuntimeException) {
120 if(recoveryTime < MinRecoveryTime || recoveryTime > MaxRecoveryTime) {
121 string msg("comm::Communicator::setRecoveryTime | ");
122 msg += functions::asString("RecoveryTime (%d ms) must be between %d ms and %d ms", recoveryTime.getValue(), MinRecoveryTime.getValue(), MaxRecoveryTime.getValue());
123 throw RuntimeException(msg, ANNA_FILE_LOCATION);
126 a_recoveryTime = recoveryTime;
129 void Communicator::setTryingConnectionTime(const Millisecond &tryingConnectionTime)
130 throw(RuntimeException) {
131 if(tryingConnectionTime < MinTryingConnectionTime || tryingConnectionTime > MaxTryingConnectionTime) {
132 string msg("comm::Communicator::setTryingConnectionTime | ");
133 msg += functions::asString("TryingConnectionTime (%d ms) must be between %d ms and %d ms", tryingConnectionTime.getValue(), MinTryingConnectionTime.getValue(), MaxTryingConnectionTime.getValue());
134 throw RuntimeException(msg, ANNA_FILE_LOCATION);
137 a_tryingConnectionTime = tryingConnectionTime;
141 void Communicator::setReceivingChunkSize(const int receivingChunkSize)
142 throw(RuntimeException) {
143 if(receivingChunkSize < MinReceivingChunkSize || receivingChunkSize > MaxReceivingChunkSize) {
144 string msg("comm::Communicator::setReceivingChunkSize | ");
145 msg += functions::asString("ReceivingChunkSize (%d bytes) must be between %d bytes and %d bytes", receivingChunkSize, MinReceivingChunkSize, MaxReceivingChunkSize);
146 throw RuntimeException(msg, ANNA_FILE_LOCATION);
149 st_ReceivingChunkSize = receivingChunkSize;
150 CongestionController::instantiate().setMaxPendingBytes(receivingChunkSize);
153 void Communicator::setLevelOfDenialService(const int levelOfDenialService)
154 throw(RuntimeException) {
155 const int min(comm::CongestionController::MaxLevel - 2);
156 const int max(comm::CongestionController::MaxLevel);
158 if(levelOfDenialService < min || levelOfDenialService > max) {
159 string msg("comm::Communicator::setTryingConnectionTime | ");
160 msg += functions::asString("LevelOfDenialService %d must be between %d and %d", levelOfDenialService, min, max);
161 throw RuntimeException(msg, ANNA_FILE_LOCATION);
164 a_levelOfDenialService = levelOfDenialService;
167 void Communicator::attach(ServerSocket* serverSocket)
168 throw(RuntimeException) {
169 if(serverSocket == NULL)
170 throw RuntimeException("Cannot attach a NULL comm::ServerSocket", ANNA_FILE_LOCATION);
172 Guard guard(this, "comm::Communicator::attach (ServerSocket)");
173 Handler* handler = handler::Manager::instantiate().createHandler(serverSocket);
176 string msg("comm::Communicator::attach (ServerSocket) | ");
177 msg += handler->asString();
178 Logger::debug(msg, ANNA_FILE_LOCATION);
181 if(serverSocket->isSharedBind() == true && serverSocket->getBinderSocket() != NULL)
182 attach(serverSocket->getBinderSocket());
185 void Communicator::attach(BinderSocket* binderSocket)
186 throw(RuntimeException) {
187 if(binderSocket == NULL)
188 throw RuntimeException("Cannot attach a NULL comm::BinderSocket", ANNA_FILE_LOCATION);
190 Guard guard(this, "comm::Communicator::attach (BinderSocket)");
191 Handler* handler = handler::Manager::instantiate().createHandler(binderSocket);
194 string msg("comm::Communicator::attach (BinderSocket) | ");
195 msg += handler->asString();
196 Logger::debug(msg, ANNA_FILE_LOCATION);
201 * Comienza a tratar la conexion que hacen a un comm::ServerSocket de este proceso, desde algun otro proceso.
203 * Se invoca desde comm::handler::ServerSocket::accept [Tx] -> <null>
205 void Communicator::attach(LocalConnection* localConnection)
206 throw(RuntimeException) {
207 if(localConnection == NULL)
208 throw RuntimeException("Cannot attach a NULL comm::LocalConnection", ANNA_FILE_LOCATION);
211 * Obtiene la información/bloquea al ClientSocket para asegurar el orden correcto, ya que de otro modo,
212 * se podría producir un interbloqueo Communicator & ClientSocket.
214 * Recordar que el handler::MetaClientSocket::apply sólo bloquea el ClientSocket sobre el que actúa y luego
215 * y si detecta el cierre, desbloquea el ClientSocket y bloquea el Communicator. Así que para mantener
216 * el orden correcto hay que invocar al comm::ClientSocket::getTransport (que establece una SCCS sobre él) antes
217 * de bloquea el Communicator.
219 ClientSocket* clientSocket = localConnection->getClientSocket();
221 if(clientSocket == NULL)
222 throw RuntimeException("comm::Communicator::attach (LocalConnection) | ClientSocket can not be NULL", ANNA_FILE_LOCATION);
224 // Todavía no está corriendo el thread que se encargará de éste comm::ClientSocket => podemos acceder a él sin SSCC.
225 const Transport* transport = clientSocket->unsafe_reserveTransport();
226 Guard guard(this, "comm::Communicator::attach (LocalConnection)");
227 Handler* handler = handler::Manager::instantiate().createHandler(localConnection);
230 if(((transport == NULL) ? true : transport->enableTimeout()) == true)
231 handler->setTimeout(a_timeout);
235 if(a_workMode == WorkMode::Single && handler->supportTimeout() == true)
236 a_timedouts.add(handler)
240 string msg("comm::Communicator::attach | ");
241 msg += handler->asString();
242 Logger::debug(msg, ANNA_FILE_LOCATION);
245 if(a_workMode == WorkMode::Clone) {
246 a_mainHandler = handler;
247 app::Application& app = app::functions::getApp().clone();
252 * Este método sólo se invoca desde comm::Server::connect y este método los primero que hace
253 * es bloquear el acceso al comunicador, para asegurar el orden correcto de bloqueo a l hora
254 * de tratar la desconexión.
256 * Por eso no hace falta establecer la sección crítica que habitualmente se establece en todos
257 * los métodos Communicator::attach.
259 void Communicator::attach(RemoteConnection* remoteConnection)
260 throw(RuntimeException) {
261 if(remoteConnection == NULL)
262 throw RuntimeException("Cannot attach a NULL comm::RemoteConnection", ANNA_FILE_LOCATION);
264 // Guard guard (this, "comm::Communicator::attach (RemoteConnection)");
265 Handler* handler = handler::Manager::instantiate().createHandler(remoteConnection);
268 string msg("comm::Communicator::attach | ");
269 msg += handler->asString();
270 Logger::debug(msg, ANNA_FILE_LOCATION);
274 void Communicator::attach(ClientSocket* socket)
275 throw(RuntimeException) {
277 throw RuntimeException("Cannot attach a NULL comm::ClientSocket", ANNA_FILE_LOCATION);
279 Guard guard(this, "comm::Communicator::attach (ClientSocket)");
280 Handler* handler = handler::Manager::instantiate().createHandler(socket);
283 string msg("comm::Communicator::attach | ");
284 msg += handler->asString();
285 Logger::debug(msg, ANNA_FILE_LOCATION);
289 void Communicator::attach(DatagramSocket* socket)
290 throw(RuntimeException) {
292 throw RuntimeException("Cannot attach a NULL comm::DatagramSocket", ANNA_FILE_LOCATION);
294 Guard guard(this, "comm::Communicator::attach (DatagramSocket)");
295 Handler* handler = handler::Manager::instantiate().createHandler(socket);
298 string msg("comm::Communicator::attach | ");
299 msg += handler->asString();
300 Logger::debug(msg, ANNA_FILE_LOCATION);
304 void Communicator::attach(Handler* handler)
305 throw(RuntimeException) {
307 throw RuntimeException("Cannot attach a NULL comm::Handler", ANNA_FILE_LOCATION);
309 if(handler->getType() != Handler::Type::Custom)
310 throw RuntimeException("Communicator::attach only accept 'Custom' Handlers", ANNA_FILE_LOCATION);
312 Guard guard(this, "comm::Communicator::attach (Handler)");
315 string msg("comm::Communicator::attach (Handler) | ");
316 msg += handler->asString();
317 Logger::debug(msg, ANNA_FILE_LOCATION);
321 void Communicator::attach(Service* service)
322 throw(RuntimeException) {
324 throw RuntimeException("Cannot attach a NULL comm::Service", ANNA_FILE_LOCATION);
326 if(std::find(service_begin(), service_end(), service) != service_end())
329 service->initialize();
331 string msg("comm::Communicator::attach | ");
332 msg += service->asString();
333 Logger::debug(msg, ANNA_FILE_LOCATION);
335 a_services.push_back(service);
337 if(service->isCritical() == true && service->isAvailable() == false)
338 setStatus(Status::Unavailable);
341 void Communicator::insert(Handler* handler)
342 throw(RuntimeException) {
343 handler->initialize();
345 if(handler->getfd() < 0) {
346 string msg(handler->asString());
347 msg += " | Cannot attach a Handler with fd < 0";
348 throw RuntimeException(msg, ANNA_FILE_LOCATION);
351 const Handler* other = find(handler->getfd());
354 string msg("commm::Comunicator::insert | New: ");
355 msg += handler->asString();
356 msg += " | Collision: ";
357 msg += other->asString();
358 throw RuntimeException(msg, ANNA_FILE_LOCATION);
361 a_handlers.add(handler);
363 if(handler->supportTimeout())
364 handler->beat(anna::functions::hardwareClock());
366 WHEN_SINGLETHREAD(a_poll->insert(handler->getfd()));
369 if(a_isServing == true)
370 a_threadManager->createThread()->start(*handler);
374 void Communicator::detach(ServerSocket* serverSocket)
376 if(serverSocket == NULL)
379 comm::BinderSocket* binderSocket = serverSocket->getBinderSocket();
380 detach(find(serverSocket->getfd()));
382 if(binderSocket != NULL)
383 detach(binderSocket);
386 void Communicator::detach(ClientSocket* clientSocket)
388 if(clientSocket == NULL)
391 detach(find(clientSocket->getfd()));
394 void Communicator::detach(BinderSocket* binderSocket)
396 if(binderSocket == NULL)
399 detach(find(binderSocket->getfd()));
403 * Finaliza el trabajo del handler recibido, en este momento NO hay ninguna SSCC establecida.
404 * (1) Si se cierra la conexion con el cliente que al que atencia este proceso clonado => debe terminar la ejecucion.
406 void Communicator::detach(Handler* handler)
411 Guard guard(this, "comm::Communicator::detach");
412 const int fd = handler->getfd();
414 string msg("comm::Communicator::detach (Handler) | ");
415 msg += handler->asString();
416 Logger::debug(msg, ANNA_FILE_LOCATION);
419 if(anna::functions::supportMultithread() == true)
420 handler->requestStop();
421 else if(handler->supportTimeout() == true)
422 a_timedouts.erase(handler);
425 WHEN_SINGLETHREAD(a_poll->erase(fd););
427 if(a_handlers.erase(handler) == false) {
429 string msg(functions::asText("Handler: ", fd));
430 msg += " | Not found";
431 Logger::warning(msg, ANNA_FILE_LOCATION);
435 if(a_workMode == WorkMode::Clone && handler == a_mainHandler) // (1)
438 handler::Manager::instantiate().releaseHandler(handler);
441 const Handler* Communicator::getHandler(const ClientSocket& clientSocket)
442 throw(RuntimeException) {
443 Guard guard(this, "comm::Communicator::getHandler");
444 Handler* result = find(clientSocket.getfd());
447 switch(result->getType()) {
448 case Handler::Type::ServerSocket:
449 case Handler::Type::BinderSocket:
450 case Handler::Type::Custom:
459 //----------------------------------------------------------------------------------------------
460 // (1) Compruebo la solicitud de parada antes y despues de procesar el mensaje para acelar
461 // la solicitud de la peticion. En otro caso podrian pasar 30 seg desde que se solicita
462 // hasta que se acepta.
463 // (2) La invocacion de este metodo originara que se aniadan y borren fd's a la lista de
464 // streams pero la forma de recorrer el bucle nos blinda (un poco) de anomalias.
465 //----------------------------------------------------------------------------------------------
466 void Communicator::accept()
467 throw(RuntimeException) {
468 LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "accept", ANNA_FILE_LOCATION));
470 if(isServing() == true)
471 throw RuntimeException("Communicator::accept is already invoked", ANNA_FILE_LOCATION);
473 a_requestedStop = false;
475 if(handler_size() == 0)
476 throw RuntimeException("No socket has been established for sending and/or reception", ANNA_FILE_LOCATION);
480 LOGINFORMATION(Logger::write(Logger::Information, Component::asString(), "Polling network", ANNA_FILE_LOCATION));
483 WHEN_SINGLETHREAD(singlethreadedAccept());
484 WHEN_MULTITHREAD(multithreadedAccept());
485 } catch(RuntimeException& ex) {
493 void Communicator::singlethreadedAccept()
494 throw(RuntimeException) {
495 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "singlethreadedAccept", ANNA_FILE_LOCATION));
498 Microsecond now(anna::functions::hardwareClock());
500 a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
504 while(a_requestedStop == false) {
505 if(a_connectionRecover->isRunning() == true) {
506 a_connectionRecover->tryRecover();
507 a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
510 a_poll->waitMessage();
512 if(a_timedouts.size() > 0)
513 now = anna::functions::hardwareClock();
515 while((fd = a_poll->fetch()) != -1) {
516 if((handler = find(fd)) == NULL)
520 if(handler->supportTimeout())
524 } catch(RuntimeException& ex) {
529 if(a_pendingClose == true) {
530 a_pendingClose = false;
532 // @Eduardo (multiconnection to same address/port); On st, is considered to have one socket pending to be closed
533 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
534 // if (Communicator::handler (ii)->testClose () == true)
536 Communicator::handler(ii)->testClose();
539 now = anna::functions::hardwareClock();
541 if(a_timedouts.size() == 0)
545 handler_iterator ii = a_timedouts.begin();
547 while(ii != a_timedouts.end()) {
548 handler = Communicator::handler(ii);
550 if(handler->isTimeout(now) == true) {
552 string msg(handler->asString());
553 msg += " | Closed due to inactivity";
554 Logger::warning(msg, ANNA_FILE_LOCATION);
557 ii = a_timedouts.begin();
568 void Communicator::multithreadedAccept()
569 throw(RuntimeException) {
570 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "multithreadedAccept", ANNA_FILE_LOCATION));
572 Guard guard(this, "comm::Communicator::multithreadedAccept");
575 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
576 handler = Communicator::handler(ii);
578 string msg("Starting | ");
579 msg += handler->asString();
580 Logger::debug(msg, ANNA_FILE_LOCATION)
582 a_threadManager->createThread()->start(*handler);
585 Millisecond delay(500);
587 while(a_requestedStop == false) {
588 anna::functions::sleep(delay);
590 if(a_requestedStop == true)
593 if(a_connectionRecover->isRunning() == false)
597 Guard guard(this, "comm::Communicator::multithreadedAccept (ConnectionRecover)");
598 a_connectionRecover->tryRecover();
603 void Communicator::requestStop()
605 if(a_requestedStop == true)
608 Guard guard(this, "comm::Communicator::requestStop");
610 if(a_requestedStop == true)
613 a_requestedStop = true;
616 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
617 handler(ii)->requestStop();
618 } catch(RuntimeException& ex) {
623 string msg("comm::Communicator::requestStop | ");
625 Logger::warning(msg, ANNA_FILE_LOCATION);
629 bool Communicator::isUsable(const ClientSocket* clientSocket)
631 if(clientSocket == NULL)
634 Guard guard(this, "comm::Communicator::isUsable");
635 Handler* handler = find(clientSocket->getfd());
640 const Handler::Type::_v type = handler->getType();
641 return (type == Handler::Type::LocalConnection || type == Handler::Type::RemoteConnection);
644 void Communicator::setStatus(const Status& status)
646 Guard guard(this, "comm::Communicator::setStatus");
648 if(a_status != status)
652 string msg("comm::Communicator::setStatus | ");
653 msg += a_status.asString();
654 Logger::information(msg, ANNA_FILE_LOCATION);
658 void Communicator::eventBreakAddress(const in_addr_t& address)
660 Device* device = Network::instantiate().find(address);
662 if(device->getStatus() == Device::Status::Down)
666 string msg("comm::Communicator::eventBreakAddress | ");
667 msg += device->asString();
668 Logger::warning(msg, ANNA_FILE_LOCATION);
670 Guard guard(this, "comm::Communicator::eventBreakAddress");
671 device->setStatus(Device::Status::Down);
673 * Trabaja sobre una copia para no perder la referencia cuando se elimine un miembro de la lista original
675 * En la lista a borrar sólo mete los handler::ServerSocket y los handler::RemoteConnection, ya que los handler::LocalConnection
676 * será liberados recursivamente cuando liberemos los primeros.
678 typedef vector <comm::Handler*> work_container;
679 typedef work_container::iterator work_iterator;
682 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
683 comm::Handler* handler = comm::Communicator::handler(ii);
685 if(dynamic_cast <comm::handler::ServerSocket*>(handler) != NULL) {
686 ww.push_back(handler);
687 } else if(dynamic_cast <comm::handler::RemoteConnection*>(handler) != NULL) {
688 ww.push_back(handler);
692 for(work_iterator ii = ww.begin(), maxii = ww.end(); ii != maxii; ii ++) {
694 std::string msg("Communicator::eventBreakAddress | ");
695 msg = (*ii)->asString();
696 Logger::debug(msg, ANNA_FILE_LOCATION)
698 (*ii)->breakAddress(address);
702 void Communicator::eventRecoverAddress(const in_addr_t& address)
704 Device* device = Network::instantiate().find(address);
706 if(device->getStatus() == Device::Status::Up)
710 string msg("comm::Communicator::eventRecoverAddress | ");
711 msg += device->asString();
712 Logger::warning(msg, ANNA_FILE_LOCATION);
714 Guard guard(this, "comm::Communicator::eventRecoverAddress");
715 device->setStatus(Device::Status::Up);
716 Handlers backup(a_handlers);
718 for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
719 handler(ii)->recoverAddress(address);
722 bool Communicator::eventAcceptConnection(const ClientSocket& clientSocket)
723 throw(RuntimeException) {
724 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventAcceptConnection", ANNA_FILE_LOCATION));
726 if(a_requestedStop == true) {
728 string msg(Component::asString());
730 msg += clientSocket.asString();
731 msg += " | Connection rejected due to stop request";
732 Logger::warning(msg, ANNA_FILE_LOCATION);
737 CongestionController::Workload ww = CongestionController::instantiate().getAccumulatedWorkload();
739 if(CongestionController::getLevel(ww) >= a_levelOfDenialService) {
741 string msg("comm::Communicator::eventAcceptConnection | Level: ");
742 msg += functions::asString(CongestionController::getLevel(ww));
743 msg += functions::asString(" | Load: %d%% ", CongestionController::getLoad(ww));
744 msg += " | Result: false";
745 Logger::warning(msg, ANNA_FILE_LOCATION);
753 //--------------------------------------------------------------------------------------------------------
754 // (1) Alguno de los comm::Server asociados al servicio puede haber pasado a activo, pero nos
755 // interesa como estaba el servicio antes de esta activacion.
756 // (2) Si el servicio es "No critico" => no afecta al estado del proceso.
757 // (3) Solo si todos los servicios "Criticos" estan disponibles pasara a estar "Activo".
758 //--------------------------------------------------------------------------------------------------------
759 void Communicator::eventCreateConnection(const Server* server)
764 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventCreateConnection", ANNA_FILE_LOCATION));
766 string msg("comm::Communicator::eventCreateConnection | ");
767 msg += server->asString();
768 Logger::notice(msg, ANNA_FILE_LOCATION);
770 bool recoverService = false;
772 for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
773 Service* service = const_cast <Service*>(Server::service(ii));
775 if(service->wasAvailable() == true) { // (1)
776 service->recover(server);
780 service->recover(server);
781 eventCreateConnection(service);
782 recoverService = true;
785 if(recoverService == false || a_status == Status::Available)
788 Guard guard(this, "comm::Communicator::eventCreateConnection");
789 Status status(Status::Available);
791 for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++) {
792 const Service* service = Communicator::service(ii);
794 if(service->isCritical() == false) // (2)
797 if(service->isAvailable() == false) {
798 status = Status::Unavailable;
803 setStatus(status); // (3)
806 void Communicator::eventCreateConnection(const Service* service)
812 string msg("comm::Communicator::eventCreateConnection | ");
813 msg += service->asString();
814 Logger::notice(msg, ANNA_FILE_LOCATION);
819 * Se invoca desde handler::RemoteConnection:[Tx] -> Communicator
821 void Communicator::eventBreakConnection(const Server* server)
826 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (server)", ANNA_FILE_LOCATION));
828 string msg("comm::Communicator::eventBreakConnection | ");
829 msg += server->asString();
830 Logger::warning(msg, ANNA_FILE_LOCATION);
832 //Guard guard (this, "comm::Communicator::eventBreakConnection");
833 Status status(a_status);
835 for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
836 Service* service = const_cast <Service*>(Server::service(ii));
837 service->fault(server);
839 if(service->isAvailable() == true)
842 eventBreakConnection(service);
844 if(status == Status::Available && service->isCritical() == true)
845 status = Status::Unavailable;
851 void Communicator::eventBreakConnection(const Service* service)
856 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (service)", ANNA_FILE_LOCATION));
858 string msg("comm::Communicator::eventBreakConnection | ");
859 msg += service->asString();
860 Logger::warning(msg, ANNA_FILE_LOCATION);
864 void Communicator::eventShutdown()
867 string msg("comm::Communicator::eventShutdown | ");
869 Logger::warning(msg, ANNA_FILE_LOCATION);
871 setStatus(Status::Unavailable);
873 Handlers backup(a_handlers);
875 for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
881 a_threadManager->join();
882 } catch(RuntimeException& ex) {
889 std::string Communicator::asString() const
891 string result("comm::Communicator { ");
892 result += Component::asString();
893 result += " | RequestedStop: ";
894 result += anna::functions::asString(a_requestedStop);
896 result += a_status.asString();
897 result += anna::functions::asString(" | Handlers: %d", a_handlers.size());
898 result += anna::functions::asString(" | RecoveryTime: %d ms", a_recoveryTime.getValue());
899 return result += " }";
902 xml::Node* Communicator::asXML(xml::Node* parent) const
904 parent = app::Component::asXML(parent);
905 xml::Node* result = parent->createChild("comm.Communicator");
906 result->createAttribute("RequestedStop", anna::functions::asString(a_requestedStop));
907 result->createAttribute("RecoveryTime", a_recoveryTime);
908 result->createAttribute("TryingConnectionTime", a_tryingConnectionTime);
909 result->createAttribute("Timeout", a_timeout);
910 result->createAttribute("Status", a_status.asString());
911 result->createAttribute("Mode", (a_workMode == WorkMode::Single) ? "Single" : "Clone");
912 result->createAttribute("LevelOfDenialService", a_levelOfDenialService);
913 Network::instantiate().asXML(result);
914 xml::Node* node = result->createChild("comm.Handlers");
916 for(const_handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
917 handler(ii)->asXML(node);
919 node = result->createChild("comm.Services");
921 for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++)
922 service(ii)->asXML(node);
924 a_connectionRecover->asXML(result);
925 CongestionController& congestionController = CongestionController::instantiate();
926 congestionController.asXML(result);
931 * Se invoca desde app::Application::clone -> app::Component::do_cloneParent (ojo EN EL PROCESO ORIGINAL).
932 * Ofrece la posiblidad de que el componente del proceso original liberen los recursos que no va
935 * Cada vez que se clona tiene que sacar de su comprobacion las conexiones que ha sufrido. Ya que de otro
936 * modo podria estar tratantado contestaciones a peticiones realizadas desde los procesos hijos => estos
937 * nunca recibirian las respuestas a sus peticiones.
939 * Estas conexiones no se pueden cerrar porque deben mantenerse abiertas para que los procesos hijos las
940 * hereden, solo es que este proceso no debe atenderlas.
942 * El codigo del servidor (el proceso original) no hace nada con ese Socket, lo cierra porque ya lo ha duplicado
943 * el proceso hijo y sigue atendiendo peticiones de conexion.
945 void Communicator::do_cloneParent()
946 throw(RuntimeException) {
947 LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneParent", ANNA_FILE_LOCATION));
949 Handler::Type::_v type;
951 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
952 handler = Communicator::handler(ii);
953 type = handler->getType();
955 if(type == Handler::Type::RemoteConnection || type == Handler::Type::ClientSocket)
956 a_poll->erase(handler->getfd());
960 handler = a_mainHandler;
961 a_mainHandler = NULL;
963 } catch(RuntimeException& ex) {
969 * Se invoca desde app::Application::clone -> app::Component::do_cloneChild (ojo EN EL NUEVO PROCESO).
970 * Ofrece la posiblidad de que los componentes que acaban de ser duplicados liberen los recursos que no van
971 * a usar, ya que la copia solo se dedicara a tratar los mensajes que entren por su conexion asociada y
972 * las respuestas a las peticiones originadas en el tratamiento del mismo.
974 * (1) Recordar que el handler que tiene asociado el clon esta registrado para evitar su cierre.
975 * (3) Limpia la lista porque de otro modo si el proceso clon hiciera nuevas conexiones podria encontrar
976 * que el fd ya esta guardado en la lista. No se puede invocar directamente al detach, porque este
977 * metodo modifica la lista a_handlers, y nos haria perder la cuenta del iterator.
978 * (4) Realiza un duplicado fisico de las conexiones realizadas por el proceso original, elimina el fd de
979 * la lista a comprobar, este fd sera cerrado por el metodo Handler::clone.
980 * (5) Registra el Handler por el que ha sido creado este nuevo proceso, asi que cuando se invoque al detach
981 * con ese handler => el programa terminara la ejecucion.
983 void Communicator::do_cloneChild()
985 LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneChild", ANNA_FILE_LOCATION));
987 string msg("comm::Communicator::do_cloneChild | MainHandler: ");
988 msg += a_mainHandler->asString();
989 Logger::information(msg, ANNA_FILE_LOCATION);
991 Handler* handler(NULL);
992 vector <Handler*> toDetach;
994 for(handler_iterator ii = handler_begin(); ii != handler_end(); ii ++) { // (1)
995 handler = Communicator::handler(ii);
997 if(handler == a_mainHandler)
1000 switch(handler->getType()) {
1001 case Handler::Type::Custom:
1002 case Handler::Type::RemoteConnection:
1003 case Handler::Type::ClientSocket:
1004 a_poll->erase(handler->getfd()); // (4)
1006 string msg("comm::Communicator::do_cloneChild | ");
1007 msg += handler->asString();
1008 Logger::debug(msg, ANNA_FILE_LOCATION);
1011 a_poll->insert(handler->getfd());
1014 toDetach.push_back(handler); // (3)
1019 for(vector <Handler*>::iterator ii = toDetach.begin(), maxii = toDetach.end(); ii != maxii; ii ++)
1025 singlethreadedAccept();
1026 } catch(RuntimeException& ex) {
1033 int Communicator::SortByFileDescriptor::value(const Handler* handler)
1035 return handler->getfd();