1 // ANNA - Anna is Not Nothingness Anymore
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
5 // http://redmine.teslayout.com/projects/anna-suite
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
11 // * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 // * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
17 // * Neither the name of the copyright holder nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 // Authors: eduardo.ramos.testillano@gmail.com
34 // cisco.tierra@gmail.com
42 #include <sys/ioctl.h>
46 #include <anna/core/tracing/Logger.hpp>
47 #include <anna/core/tracing/TraceMethod.hpp>
48 #include <anna/core/mt/Guard.hpp>
49 #include <anna/core/util/Average.hpp>
50 #include <anna/core/mt/ThreadManager.hpp>
51 #include <anna/core/mt/Thread.hpp>
52 #include <anna/core/util/Microsecond.hpp>
53 #include <anna/core/util/defines.hpp>
55 #include <anna/comm/comm.hpp>
56 #include <anna/comm/internal/sccs.hpp>
57 #include <anna/comm/internal/BinderSocket.hpp>
58 #include <anna/comm/internal/RemoteConnection.hpp>
59 #include <anna/comm/handler/Manager.hpp>
62 #include <anna/comm/internal/Poll.hpp>
65 #include <anna/core/mt/ThreadManager.hpp>
67 #include <anna/xml/Node.hpp>
68 #include <anna/xml/Attribute.hpp>
// Static members of comm::Communicator, each built from a literal millisecond count.
// The Min*/Max* pairs are the limits enforced by setRecoveryTime and setTryingConnectionTime;
// the Default* values are the ones the constructor uses to initialise its members.
74 const Millisecond comm::Communicator::MinRecoveryTime(1000);
75 const Millisecond comm::Communicator::DefaultRecoveryTime(5000);
76 const Millisecond comm::Communicator::MaxRecoveryTime(30000);
77 const Millisecond comm::Communicator::MinTryingConnectionTime(100);
78 const Millisecond comm::Communicator::DefaultTryingConnectionTime(200);
79 const Millisecond comm::Communicator::MaxTryingConnectionTime(1000);
80 const Millisecond comm::Communicator::DefaultTimeout(10 * 60 * 1000); // 10 minutes.
// Receiving chunk size shared by the whole class; overwritten by setReceivingChunkSize.
// NOTE(review): defined here as Millisecond although setReceivingChunkSize assigns it an int
// byte count — confirm the declared type against the header.
83 Millisecond comm::Communicator::st_ReceivingChunkSize(comm::Communicator::DefaultChunkSize);
// Builds the communicator with its default configuration.
// Members are initialised from the Default* constants; the body then creates the poller and
// the thread manager, hands this object to handler::Manager, creates the ConnectionRecover
// helper, sets the denial-of-service level and activates comm::sccs.
// NOTE(review): in the lines visible here a_workMode is always WorkMode::Single and the
// workMode argument is never read — confirm whether a conditionally compiled branch uses it.
85 Communicator::Communicator(const Communicator::WorkMode::_v workMode) :
86 Component(getClassName()),
88 a_workMode(WorkMode::Single),
92 a_recoveryTime(DefaultRecoveryTime),
93 a_requestedStop(false),
94 a_status(Status::Available),
97 a_threadManager(NULL),
98 a_timeout(DefaultTimeout),
100 a_tryingConnectionTime(DefaultTryingConnectionTime) {
// Poller creation goes through WHEN_SINGLETHREAD (presumably a single-thread-only build macro — confirm).
101 WHEN_SINGLETHREAD(a_poll = new Poll);
// Thread manager created in ThreadManager::Mode::Unlimit with 0 as its last argument.
103 a_threadManager = new ThreadManager("comm::Communicator::ThreadManager", ThreadManager::Mode::Unlimit, 0)
// Register this communicator with the handler::Manager singleton.
105 handler::Manager::instantiate().initialize(this);
106 a_connectionRecover = new comm::ConnectionRecover(this);
// Default denial-of-service level: one below CongestionController::MaxLevel.
107 a_levelOfDenialService = comm::CongestionController::MaxLevel - 1;
109 comm::sccs::activate();
// Destroys the communicator, deleting the thread manager and the ConnectionRecover helper
// that the constructor allocated with new.
113 Communicator::~Communicator() {
115 delete a_threadManager;
116 delete a_connectionRecover;
// Sets the recovery time of the communicator.
// @param recoveryTime New value; must not be below MinRecoveryTime nor above MaxRecoveryTime.
// @throws RuntimeException when the value is outside those limits.
119 void Communicator::setRecoveryTime(const Millisecond &recoveryTime)
120 throw(RuntimeException) {
// Out-of-range values are rejected with a message reporting the value and both limits.
121 if(recoveryTime < MinRecoveryTime || recoveryTime > MaxRecoveryTime) {
122 string msg("comm::Communicator::setRecoveryTime | ");
123 msg += functions::asString("RecoveryTime (%d ms) must be between %d ms and %d ms", recoveryTime.getValue(), MinRecoveryTime.getValue(), MaxRecoveryTime.getValue());
124 throw RuntimeException(msg, ANNA_FILE_LOCATION);
// Value accepted: store it.
127 a_recoveryTime = recoveryTime;
// Sets the trying-connection time of the communicator.
// @param tryingConnectionTime New value; must not be below MinTryingConnectionTime nor above
//        MaxTryingConnectionTime.
// @throws RuntimeException when the value is outside those limits.
130 void Communicator::setTryingConnectionTime(const Millisecond &tryingConnectionTime)
131 throw(RuntimeException) {
// Out-of-range values are rejected with a message reporting the value and both limits.
132 if(tryingConnectionTime < MinTryingConnectionTime || tryingConnectionTime > MaxTryingConnectionTime) {
133 string msg("comm::Communicator::setTryingConnectionTime | ");
134 msg += functions::asString("TryingConnectionTime (%d ms) must be between %d ms and %d ms", tryingConnectionTime.getValue(), MinTryingConnectionTime.getValue(), MaxTryingConnectionTime.getValue());
135 throw RuntimeException(msg, ANNA_FILE_LOCATION);
// Value accepted: store it.
138 a_tryingConnectionTime = tryingConnectionTime;
// Sets the class-wide receiving chunk size.
// @param receivingChunkSize Size in bytes; must not be below MinReceivingChunkSize nor above
//        MaxReceivingChunkSize.
// @throws RuntimeException when the value is outside those limits.
142 void Communicator::setReceivingChunkSize(const int receivingChunkSize)
143 throw(RuntimeException) {
// Out-of-range values are rejected with a message reporting the value and both limits.
144 if(receivingChunkSize < MinReceivingChunkSize || receivingChunkSize > MaxReceivingChunkSize) {
145 string msg("comm::Communicator::setReceivingChunkSize | ");
146 msg += functions::asString("ReceivingChunkSize (%d bytes) must be between %d bytes and %d bytes", receivingChunkSize, MinReceivingChunkSize, MaxReceivingChunkSize);
147 throw RuntimeException(msg, ANNA_FILE_LOCATION);
// Store the new size in the static member and pass the same value to the congestion
// controller as its maximum number of pending bytes.
150 st_ReceivingChunkSize = receivingChunkSize;
151 CongestionController::instantiate().setMaxPendingBytes(receivingChunkSize);
// Sets the level of denial of service, i.e. the congestion level that eventAcceptConnection
// compares the accumulated workload level against.
// @param levelOfDenialService New level; must lie between CongestionController::MaxLevel - 2
//        and CongestionController::MaxLevel, both included.
// @throws RuntimeException when the value is outside that range.
154 void Communicator::setLevelOfDenialService(const int levelOfDenialService)
155 throw(RuntimeException) {
156 const int min(comm::CongestionController::MaxLevel - 2);
157 const int max(comm::CongestionController::MaxLevel);
// Out-of-range values are rejected with a message reporting the value and both limits.
159 if(levelOfDenialService < min || levelOfDenialService > max) {
// The prefix names this method (it used to carry a copy-pasted "setTryingConnectionTime").
160 string msg("comm::Communicator::setLevelOfDenialService | ");
161 msg += functions::asString("LevelOfDenialService %d must be between %d and %d", levelOfDenialService, min, max);
162 throw RuntimeException(msg, ANNA_FILE_LOCATION);
// Value accepted: store it.
165 a_levelOfDenialService = levelOfDenialService;
// Attaches a server socket to the communicator.
// A handler for the socket is obtained from handler::Manager while a Guard is held on this
// communicator. When the socket reports a shared bind and has a binder socket, that binder
// socket is attached too.
// @param serverSocket Socket to attach.
// @throws RuntimeException if serverSocket is NULL.
168 void Communicator::attach(ServerSocket* serverSocket)
169 throw(RuntimeException) {
170 if(serverSocket == NULL)
171 throw RuntimeException("Cannot attach a NULL comm::ServerSocket", ANNA_FILE_LOCATION);
173 Guard guard(this, "comm::Communicator::attach (ServerSocket)");
174 Handler* handler = handler::Manager::instantiate().createHandler(serverSocket);
// Debug trace describing the handler that was created.
177 string msg("comm::Communicator::attach (ServerSocket) | ");
178 msg += handler->asString();
179 Logger::debug(msg, ANNA_FILE_LOCATION);
// A shared-bind server socket brings its binder socket along.
182 if(serverSocket->isSharedBind() == true && serverSocket->getBinderSocket() != NULL)
183 attach(serverSocket->getBinderSocket());
// Attaches a binder socket to the communicator.
// A handler for the socket is obtained from handler::Manager while a Guard is held on this
// communicator, and a debug trace describing it is written.
// @param binderSocket Socket to attach.
// @throws RuntimeException if binderSocket is NULL.
186 void Communicator::attach(BinderSocket* binderSocket)
187 throw(RuntimeException) {
188 if(binderSocket == NULL)
189 throw RuntimeException("Cannot attach a NULL comm::BinderSocket", ANNA_FILE_LOCATION);
191 Guard guard(this, "comm::Communicator::attach (BinderSocket)");
192 Handler* handler = handler::Manager::instantiate().createHandler(binderSocket);
195 string msg("comm::Communicator::attach (BinderSocket) | ");
196 msg += handler->asString();
197 Logger::debug(msg, ANNA_FILE_LOCATION);
202 * Starts handling a connection that some other process has made to one of this process's comm::ServerSocket.
204 * Se invoca desde comm::handler::ServerSocket::accept [Tx] -> <null>
// Attaches a local connection (a connection accepted by one of this process's server sockets).
// The client socket's transport is reserved before the Guard on the communicator is taken;
// then a handler is created, its timeout is configured and, in Clone work mode, the handler
// is recorded as the main handler and the application is cloned.
// @param localConnection Connection to attach.
// @throws RuntimeException if localConnection or its client socket is NULL.
206 void Communicator::attach(LocalConnection* localConnection)
207 throw(RuntimeException) {
208 if(localConnection == NULL)
209 throw RuntimeException("Cannot attach a NULL comm::LocalConnection", ANNA_FILE_LOCATION);
// English summary of the note below: the ClientSocket is queried/locked first so that the
// locking order is always the same; otherwise Communicator and ClientSocket could deadlock.
// handler::MetaClientSocket::apply locks only the ClientSocket it works on and, when it detects
// the close, unlocks it and locks the Communicator, so the transport has to be obtained from
// the ClientSocket before the Communicator is locked.
212 * Obtiene la información/bloquea al ClientSocket para asegurar el orden correcto, ya que de otro modo,
213 * se podría producir un interbloqueo Communicator & ClientSocket.
215 * Recordar que el handler::MetaClientSocket::apply sólo bloquea el ClientSocket sobre el que actúa y luego
216 * y si detecta el cierre, desbloquea el ClientSocket y bloquea el Communicator. Así que para mantener
217 * el orden correcto hay que invocar al comm::ClientSocket::getTransport (que establece una SCCS sobre él) antes
218 * de bloquea el Communicator.
220 ClientSocket* clientSocket = localConnection->getClientSocket();
222 if(clientSocket == NULL)
223 throw RuntimeException("comm::Communicator::attach (LocalConnection) | ClientSocket can not be NULL", ANNA_FILE_LOCATION);
225 // The thread that will take care of this comm::ClientSocket is not running yet => it can be accessed without a critical section.
226 const Transport* transport = clientSocket->unsafe_reserveTransport();
227 Guard guard(this, "comm::Communicator::attach (LocalConnection)");
228 Handler* handler = handler::Manager::instantiate().createHandler(localConnection);
// The communicator timeout is applied when there is no transport or the transport enables timeouts.
231 if(((transport == NULL) ? true : transport->enableTimeout()) == true)
232 handler->setTimeout(a_timeout);
// In Single work mode, handlers that support timeouts are also tracked in a_timedouts.
236 if(a_workMode == WorkMode::Single && handler->supportTimeout() == true)
237 a_timedouts.add(handler)
241 string msg("comm::Communicator::attach | ");
242 msg += handler->asString();
243 Logger::debug(msg, ANNA_FILE_LOCATION);
// Clone work mode: remember this handler as the main one and clone the application.
246 if(a_workMode == WorkMode::Clone) {
247 a_mainHandler = handler;
248 app::Application& app = app::functions::getApp().clone();
253 * This method is only invoked from comm::Server::connect, and the first thing that method does
254 * is lock access to the communicator, to guarantee the correct locking order when
255 * handling the disconnection.
257 * That is why there is no need to establish the critical section that is normally established in all
258 * the Communicator::attach methods.
// Attaches a remote connection to the communicator.
// Unlike the other attach overloads no Guard is taken here (the Guard line is commented out).
// A handler is obtained from handler::Manager and a debug trace describing it is written.
// @param remoteConnection Connection to attach.
// @throws RuntimeException if remoteConnection is NULL.
260 void Communicator::attach(RemoteConnection* remoteConnection)
261 throw(RuntimeException) {
262 if(remoteConnection == NULL)
263 throw RuntimeException("Cannot attach a NULL comm::RemoteConnection", ANNA_FILE_LOCATION);
265 // Guard guard (this, "comm::Communicator::attach (RemoteConnection)");
266 Handler* handler = handler::Manager::instantiate().createHandler(remoteConnection);
269 string msg("comm::Communicator::attach | ");
270 msg += handler->asString();
271 Logger::debug(msg, ANNA_FILE_LOCATION);
// Attaches a client socket to the communicator.
// A handler for the socket is obtained from handler::Manager while a Guard is held on this
// communicator, and a debug trace describing it is written.
// @param socket Socket to attach.
// @throws RuntimeException with a "NULL comm::ClientSocket" message.
// NOTE(review): the condition guarding the throw is not visible in this listing; the message
// indicates a NULL-argument check — confirm.
275 void Communicator::attach(ClientSocket* socket)
276 throw(RuntimeException) {
278 throw RuntimeException("Cannot attach a NULL comm::ClientSocket", ANNA_FILE_LOCATION);
280 Guard guard(this, "comm::Communicator::attach (ClientSocket)");
281 Handler* handler = handler::Manager::instantiate().createHandler(socket);
284 string msg("comm::Communicator::attach | ");
285 msg += handler->asString();
286 Logger::debug(msg, ANNA_FILE_LOCATION);
// Attaches a datagram socket to the communicator.
// A handler for the socket is obtained from handler::Manager while a Guard is held on this
// communicator, and a debug trace describing it is written.
// @param socket Socket to attach.
// @throws RuntimeException with a "NULL comm::DatagramSocket" message.
// NOTE(review): the condition guarding the throw is not visible in this listing; the message
// indicates a NULL-argument check — confirm.
290 void Communicator::attach(DatagramSocket* socket)
291 throw(RuntimeException) {
293 throw RuntimeException("Cannot attach a NULL comm::DatagramSocket", ANNA_FILE_LOCATION);
295 Guard guard(this, "comm::Communicator::attach (DatagramSocket)");
296 Handler* handler = handler::Manager::instantiate().createHandler(socket);
299 string msg("comm::Communicator::attach | ");
300 msg += handler->asString();
301 Logger::debug(msg, ANNA_FILE_LOCATION);
// Attaches a user-supplied handler to the communicator.
// Only handlers whose type is Handler::Type::Custom are accepted; the work is done while a
// Guard is held on this communicator.
// @param handler Handler to attach.
// @throws RuntimeException with a "NULL comm::Handler" message, or when the handler is not Custom.
// NOTE(review): the condition guarding the first throw is not visible in this listing — confirm.
305 void Communicator::attach(Handler* handler)
306 throw(RuntimeException) {
308 throw RuntimeException("Cannot attach a NULL comm::Handler", ANNA_FILE_LOCATION);
310 if(handler->getType() != Handler::Type::Custom)
311 throw RuntimeException("Communicator::attach only accept 'Custom' Handlers", ANNA_FILE_LOCATION);
313 Guard guard(this, "comm::Communicator::attach (Handler)");
316 string msg("comm::Communicator::attach (Handler) | ");
317 msg += handler->asString();
318 Logger::debug(msg, ANNA_FILE_LOCATION);
// Attaches a service to the communicator.
// The service is initialised, traced and appended to a_services; if it is critical and not
// available the communicator status becomes Unavailable.
// @param service Service to attach.
// @throws RuntimeException with a "NULL comm::Service" message.
// NOTE(review): the condition guarding the throw is not visible in this listing — confirm.
322 void Communicator::attach(Service* service)
323 throw(RuntimeException) {
325 throw RuntimeException("Cannot attach a NULL comm::Service", ANNA_FILE_LOCATION);
// Looks for the service among those already attached (the statement executed when it is
// found is not visible here — presumably an early return; confirm).
327 if(std::find(service_begin(), service_end(), service) != service_end())
330 service->initialize();
332 string msg("comm::Communicator::attach | ");
333 msg += service->asString();
334 Logger::debug(msg, ANNA_FILE_LOCATION);
336 a_services.push_back(service);
// A critical service that is down makes the whole communicator unavailable.
338 if(service->isCritical() == true && service->isAvailable() == false)
339 setStatus(Status::Unavailable);
// Registers an initialised handler with the communicator.
// The handler is initialised, checked for a valid file descriptor and for a collision with a
// handler already registered under the same descriptor, then added to a_handlers and to the
// poller. When the communicator is already serving, a thread is started for the handler.
// @param handler Handler to register.
// @throws RuntimeException if the handler's fd is negative, or with a collision message
//         describing both the new handler and the one already registered.
342 void Communicator::insert(Handler* handler)
343 throw(RuntimeException) {
344 handler->initialize();
346 if(handler->getfd() < 0) {
347 string msg(handler->asString());
348 msg += " | Cannot attach a Handler with fd < 0";
349 throw RuntimeException(msg, ANNA_FILE_LOCATION);
// Look for a handler already registered under the same descriptor.
352 const Handler* other = find(handler->getfd());
// Collision report; the prefix now spells the class name like every other message in this
// file (it used to read "commm::Comunicator::insert").
355 string msg("comm::Communicator::insert | New: ");
356 msg += handler->asString();
357 msg += " | Collision: ";
358 msg += other->asString();
359 throw RuntimeException(msg, ANNA_FILE_LOCATION);
362 a_handlers.add(handler);
// Handlers with timeout support get their first beat stamped with the current hardware clock.
364 if(handler->supportTimeout())
365 handler->beat(anna::functions::hardwareClock());
367 WHEN_SINGLETHREAD(a_poll->insert(handler->getfd()));
// Already serving: start a thread for the new handler right away.
370 if(a_isServing == true)
371 a_threadManager->createThread()->start(*handler);
// Detaches the handler registered for a server socket and, when the socket has a binder
// socket, detaches that one too.
// @param serverSocket Socket to detach (the statement executed for NULL is not visible here).
375 void Communicator::detach(ServerSocket* serverSocket)
377 if(serverSocket == NULL)
// The binder socket is read before the server socket's handler is detached.
380 comm::BinderSocket* binderSocket = serverSocket->getBinderSocket();
381 detach(find(serverSocket->getfd()));
383 if(binderSocket != NULL)
384 detach(binderSocket);
// Detaches the handler registered under the client socket's file descriptor.
// @param clientSocket Socket to detach (the statement executed for NULL is not visible here).
387 void Communicator::detach(ClientSocket* clientSocket)
389 if(clientSocket == NULL)
392 detach(find(clientSocket->getfd()));
// Detaches the handler registered under the binder socket's file descriptor.
// @param binderSocket Socket to detach (the statement executed for NULL is not visible here).
395 void Communicator::detach(BinderSocket* binderSocket)
397 if(binderSocket == NULL)
400 detach(find(binderSocket->getfd()));
404 * Finishes the work of the received handler; at this point there is NO critical section established.
405 * (1) If the connection with the client served by this cloned process is closed => execution must end.
// Removes a handler from the communicator and gives it back to handler::Manager.
// Runs under a Guard on this communicator: the handler is asked to stop (multithread) or
// removed from a_timedouts (otherwise), erased from the poller and from a_handlers, and
// finally released through handler::Manager.
// @param handler Handler to detach.
407 void Communicator::detach(Handler* handler)
412 Guard guard(this, "comm::Communicator::detach");
// The fd is captured up front; it is used later for the poller and for the warning text.
413 const int fd = handler->getfd();
415 string msg("comm::Communicator::detach (Handler) | ");
416 msg += handler->asString();
417 Logger::debug(msg, ANNA_FILE_LOCATION);
420 if(anna::functions::supportMultithread() == true)
421 handler->requestStop();
422 else if(handler->supportTimeout() == true)
423 a_timedouts.erase(handler);
426 WHEN_SINGLETHREAD(a_poll->erase(fd););
// A handler that was not registered only produces a warning.
428 if(a_handlers.erase(handler) == false) {
430 string msg(functions::asText("Handler: ", fd));
431 msg += " | Not found";
432 Logger::warning(msg, ANNA_FILE_LOCATION);
// Clone work mode and main handler detached: see note (1) in the comment preceding this
// method (the statement executed here is not visible in this listing).
436 if(a_workMode == WorkMode::Clone && handler == a_mainHandler) // (1)
439 handler::Manager::instantiate().releaseHandler(handler);
// Returns the handler associated with a client socket.
// The lookup is done by file descriptor while a Guard is held on this communicator; the
// result is then classified by handler type.
// NOTE(review): only the ServerSocket / BinderSocket / Custom case labels are visible in
// this listing; what is done for them and what is finally returned must be confirmed
// against the full source.
442 const Handler* Communicator::getHandler(const ClientSocket& clientSocket)
443 throw(RuntimeException) {
444 Guard guard(this, "comm::Communicator::getHandler");
445 Handler* result = find(clientSocket.getfd());
448 switch(result->getType()) {
449 case Handler::Type::ServerSocket:
450 case Handler::Type::BinderSocket:
451 case Handler::Type::Custom:
460 //----------------------------------------------------------------------------------------------
461 // (1) The stop request is checked both before and after processing the message, to speed up
462 // acceptance of the request. Otherwise up to 30 seconds could pass between the stop being requested
463 // and it being accepted.
464 // (2) Invoking this method will cause fd's to be added to and removed from the list of
465 // streams, but the way the loop is traversed shields us (somewhat) from anomalies.
466 //----------------------------------------------------------------------------------------------
// Entry point of the service loop.
// Refuses to run twice, clears the stop flag, requires at least one registered handler and
// then delegates to singlethreadedAccept or multithreadedAccept through the
// WHEN_SINGLETHREAD / WHEN_MULTITHREAD macros.
// @throws RuntimeException if the communicator is already serving or has no handlers.
467 void Communicator::accept()
468 throw(RuntimeException) {
469 LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "accept", ANNA_FILE_LOCATION));
471 if(isServing() == true)
472 throw RuntimeException("Communicator::accept is already invoked", ANNA_FILE_LOCATION);
474 a_requestedStop = false;
476 if(handler_size() == 0)
477 throw RuntimeException("No socket has been established for sending and/or reception", ANNA_FILE_LOCATION);
481 LOGINFORMATION(Logger::write(Logger::Information, Component::asString(), "Polling network", ANNA_FILE_LOCATION));
// Exactly one of the two macros is expected to expand to a call (presumably selected by the
// threading build mode — confirm).
484 WHEN_SINGLETHREAD(singlethreadedAccept());
485 WHEN_MULTITHREAD(multithreadedAccept());
// Handling of a RuntimeException raised by the loop is not visible in this listing.
486 } catch(RuntimeException& ex) {
// Service loop used when the communicator runs single-threaded.
// While no stop has been requested it retries connection recovery when the recoverer is
// running, waits on the poller, looks up the handler of every descriptor the poller returns,
// performs the pending-close pass and scans a_timedouts for handlers whose timeout expired.
// NOTE(review): several statements of this loop (the handler processing call, the try
// opener, the action taken on timed-out handlers) are not visible in this listing — confirm
// against the full source before relying on this description.
494 void Communicator::singlethreadedAccept()
495 throw(RuntimeException) {
496 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "singlethreadedAccept", ANNA_FILE_LOCATION));
499 Microsecond now(anna::functions::hardwareClock());
// Poll timeout: the recovery time while a recovery is in progress, 5000 ms otherwise.
501 a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
505 while(a_requestedStop == false) {
506 if(a_connectionRecover->isRunning() == true) {
507 a_connectionRecover->tryRecover();
// The recovery attempt may have finished the recovery, so the timeout is recomputed.
508 a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
511 a_poll->waitMessage();
// The clock is only refreshed here when there are handlers subject to timeout.
513 if(a_timedouts.size() > 0)
514 now = anna::functions::hardwareClock();
// Drain every descriptor the poller reports; -1 marks the end.
516 while((fd = a_poll->fetch()) != -1) {
517 if((handler = find(fd)) == NULL)
521 if(handler->supportTimeout())
525 } catch(RuntimeException& ex) {
// Pending-close pass: the flag is cleared and every handler gets a testClose call.
530 if(a_pendingClose == true) {
531 a_pendingClose = false;
533 // @Eduardo (multiconnection to same address/port); On st, is considered to have one socket pending to be closed
534 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
535 // if (Communicator::handler (ii)->testClose () == true)
537 Communicator::handler(ii)->testClose();
540 now = anna::functions::hardwareClock();
542 if(a_timedouts.size() == 0)
// Timeout scan over a_timedouts.
546 handler_iterator ii = a_timedouts.begin();
548 while(ii != a_timedouts.end()) {
549 handler = Communicator::handler(ii);
551 if(handler->isTimeout(now) == true) {
553 string msg(handler->asString());
554 msg += " | Closed due to inactivity";
555 Logger::warning(msg, ANNA_FILE_LOCATION);
// After a timed-out handler has been processed the scan restarts from the beginning.
558 ii = a_timedouts.begin();
// Service loop used when the communicator runs multi-threaded.
// Under a Guard on this communicator it starts one thread per registered handler; afterwards
// it sleeps in 500 ms steps until a stop is requested, retrying connection recovery under a
// Guard on each step.
569 void Communicator::multithreadedAccept()
570 throw(RuntimeException) {
571 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "multithreadedAccept", ANNA_FILE_LOCATION));
573 Guard guard(this, "comm::Communicator::multithreadedAccept");
// One thread from the thread manager per handler already registered.
576 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
577 handler = Communicator::handler(ii);
579 string msg("Starting | ");
580 msg += handler->asString();
581 Logger::debug(msg, ANNA_FILE_LOCATION)
583 a_threadManager->createThread()->start(*handler);
586 Millisecond delay(500);
588 while(a_requestedStop == false) {
589 anna::functions::sleep(delay);
// The flag is tested again right after waking up (the statement executed is not visible here).
591 if(a_requestedStop == true)
594 if(a_connectionRecover->isRunning() == false)
// Recovery attempts are made while holding a Guard on the communicator.
598 Guard guard(this, "comm::Communicator::multithreadedAccept (ConnectionRecover)");
599 a_connectionRecover->tryRecover();
// Asks the communicator and every registered handler to stop.
// The stop flag is tested once before and once after taking the Guard on this communicator;
// then it is set and requestStop is forwarded to each handler. A warning is logged.
604 void Communicator::requestStop()
606 if(a_requestedStop == true)
609 Guard guard(this, "comm::Communicator::requestStop");
// Tested again now that the Guard is held.
611 if(a_requestedStop == true)
614 a_requestedStop = true;
617 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
618 handler(ii)->requestStop();
// Handling of a RuntimeException raised by a handler is not visible in this listing.
619 } catch(RuntimeException& ex) {
624 string msg("comm::Communicator::requestStop | ");
626 Logger::warning(msg, ANNA_FILE_LOCATION);
// Tells whether a client socket is managed by a connection handler.
// The handler is looked up by file descriptor under a Guard on this communicator.
// @param clientSocket Socket to test (the statement executed for NULL is not visible here).
// @return true when the handler found is of type LocalConnection or RemoteConnection.
630 bool Communicator::isUsable(const ClientSocket* clientSocket)
632 if(clientSocket == NULL)
635 Guard guard(this, "comm::Communicator::isUsable");
636 Handler* handler = find(clientSocket->getfd());
641 const Handler::Type::_v type = handler->getType();
642 return (type == Handler::Type::LocalConnection || type == Handler::Type::RemoteConnection);
// Changes the status of the communicator under a Guard and logs the current status at
// information level.
// @param status Requested status; compared against the current one first (the statement
//        executed when they differ is not visible in this listing).
645 void Communicator::setStatus(const Status& status)
647 Guard guard(this, "comm::Communicator::setStatus");
649 if(a_status != status)
653 string msg("comm::Communicator::setStatus | ");
654 msg += a_status.asString();
655 Logger::information(msg, ANNA_FILE_LOCATION);
// Reacts to a network address going down.
// The device owning the address is looked up in Network and marked Down under a Guard on
// this communicator; then every ServerSocket and RemoteConnection handler is collected into
// a separate vector and told about the broken address.
// @param address Address that stopped working.
659 void Communicator::eventBreakAddress(const in_addr_t& address)
661 Device* device = Network::instantiate().find(address);
// Tests whether the device is already Down (the statement executed is not visible here).
663 if(device->getStatus() == Device::Status::Down)
667 string msg("comm::Communicator::eventBreakAddress | ");
668 msg += device->asString();
669 Logger::warning(msg, ANNA_FILE_LOCATION);
671 Guard guard(this, "comm::Communicator::eventBreakAddress");
672 device->setStatus(Device::Status::Down);
// English summary of the note below: work on a copy so the iteration is not lost when a
// member of the original list is removed. Only handler::ServerSocket and
// handler::RemoteConnection go into the copy, because the handler::LocalConnection ones are
// released recursively when the former are released.
674 * Trabaja sobre una copia para no perder la referencia cuando se elimine un miembro de la lista original
676 * En la lista a borrar sólo mete los handler::ServerSocket y los handler::RemoteConnection, ya que los handler::LocalConnection
677 * será liberados recursivamente cuando liberemos los primeros.
679 typedef vector <comm::Handler*> work_container;
680 typedef work_container::iterator work_iterator;
683 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
684 comm::Handler* handler = comm::Communicator::handler(ii);
686 if(dynamic_cast <comm::handler::ServerSocket*>(handler) != NULL) {
687 ww.push_back(handler);
688 } else if(dynamic_cast <comm::handler::RemoteConnection*>(handler) != NULL) {
689 ww.push_back(handler);
693 for(work_iterator ii = ww.begin(), maxii = ww.end(); ii != maxii; ii ++) {
695 std::string msg("Communicator::eventBreakAddress | ");
// Append instead of assign, so the trace keeps the method-name prefix built on the line above.
696 msg += (*ii)->asString();
697 Logger::debug(msg, ANNA_FILE_LOCATION)
699 (*ii)->breakAddress(address);
// Reacts to a network address coming back.
// The device owning the address is looked up in Network and marked Up under a Guard on this
// communicator; then recoverAddress is invoked on every handler, iterating over a copy of
// a_handlers.
// @param address Address that works again.
703 void Communicator::eventRecoverAddress(const in_addr_t& address)
705 Device* device = Network::instantiate().find(address);
// Tests whether the device is already Up (the statement executed is not visible here).
707 if(device->getStatus() == Device::Status::Up)
711 string msg("comm::Communicator::eventRecoverAddress | ");
712 msg += device->asString();
713 Logger::warning(msg, ANNA_FILE_LOCATION);
715 Guard guard(this, "comm::Communicator::eventRecoverAddress");
716 device->setStatus(Device::Status::Up);
// The loop walks a copy of the handler list rather than a_handlers itself.
717 Handlers backup(a_handlers);
719 for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
720 handler(ii)->recoverAddress(address);
// Decides whether an incoming connection should be accepted.
// Two conditions are examined: a pending stop request, and the congestion level computed
// from the accumulated workload compared against a_levelOfDenialService. Both produce a
// warning trace.
// @param clientSocket Socket of the incoming connection; used in the warning text.
// NOTE(review): the return statements are not visible in this listing; the warning texts
// ("Connection rejected due to stop request", "Result: false") indicate rejection — confirm.
723 bool Communicator::eventAcceptConnection(const ClientSocket& clientSocket)
724 throw(RuntimeException) {
725 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventAcceptConnection", ANNA_FILE_LOCATION));
727 if(a_requestedStop == true) {
729 string msg(Component::asString());
731 msg += clientSocket.asString();
732 msg += " | Connection rejected due to stop request";
733 Logger::warning(msg, ANNA_FILE_LOCATION);
738 CongestionController::Workload ww = CongestionController::instantiate().getAccumulatedWorkload();
// Congestion at or above the configured denial-of-service level.
740 if(CongestionController::getLevel(ww) >= a_levelOfDenialService) {
742 string msg("comm::Communicator::eventAcceptConnection | Level: ");
743 msg += functions::asString(CongestionController::getLevel(ww));
744 msg += functions::asString(" | Load: %d%% ", CongestionController::getLoad(ww));
745 msg += " | Result: false";
746 Logger::warning(msg, ANNA_FILE_LOCATION);
754 //--------------------------------------------------------------------------------------------------------
755 // (1) Some of the comm::Server associated with the service may have become active, but we
756 // care about the state the service was in before this activation.
757 // (2) If the service is "non-critical" => it does not affect the status of the process.
758 // (3) Only if all the "critical" services are available will the status become "Active".
759 //--------------------------------------------------------------------------------------------------------
// Reacts to a connection with a server having been established.
// Every service the server belongs to is told to recover the server; services that were not
// available before are additionally notified through eventCreateConnection(service). If
// some service was recovered and the communicator is not Available, the status is
// recomputed from the critical services under a Guard.
// @param server Server whose connection was created.
760 void Communicator::eventCreateConnection(const Server* server)
765 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventCreateConnection", ANNA_FILE_LOCATION));
767 string msg("comm::Communicator::eventCreateConnection | ");
768 msg += server->asString();
769 Logger::notice(msg, ANNA_FILE_LOCATION);
771 bool recoverService = false;
773 for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
774 Service* service = const_cast <Service*>(Server::service(ii));
// Service that was already available: only the server is recovered.
776 if(service->wasAvailable() == true) { // (1)
777 service->recover(server);
// Service that was down: recover the server and notify that the service is back.
781 service->recover(server);
782 eventCreateConnection(service);
783 recoverService = true;
// Tests whether there is anything to recompute (the statement executed is not visible here).
786 if(recoverService == false || a_status == Status::Available)
789 Guard guard(this, "comm::Communicator::eventCreateConnection");
790 Status status(Status::Available);
793 for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++) {
793 const Service* service = Communicator::service(ii);
795 if(service->isCritical() == false) // (2)
798 if(service->isAvailable() == false) {
799 status = Status::Unavailable;
804 setStatus(status); // (3)
// Notification that a service has become available; the visible body only writes a notice
// trace describing the service.
// @param service Service that was recovered.
807 void Communicator::eventCreateConnection(const Service* service)
813 string msg("comm::Communicator::eventCreateConnection | ");
814 msg += service->asString();
815 Logger::notice(msg, ANNA_FILE_LOCATION);
820 * Invoked from handler::RemoteConnection:[Tx] -> Communicator
// Reacts to the connection with a server having been lost.
// Every service the server belongs to is told about the fault; for services examined past
// the availability test, eventBreakConnection(service) is raised and, if the service is
// critical, the locally computed status becomes Unavailable.
// No Guard is taken here (the Guard line is commented out).
// @param server Server whose connection was broken.
// NOTE(review): the statement executed when the service is still available and the final
// use of the computed status are not visible in this listing — confirm.
822 void Communicator::eventBreakConnection(const Server* server)
827 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (server)", ANNA_FILE_LOCATION));
829 string msg("comm::Communicator::eventBreakConnection | ");
830 msg += server->asString();
831 Logger::warning(msg, ANNA_FILE_LOCATION);
833 //Guard guard (this, "comm::Communicator::eventBreakConnection");
// Start from the current status and only ever downgrade it.
834 Status status(a_status);
836 for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
837 Service* service = const_cast <Service*>(Server::service(ii));
838 service->fault(server);
840 if(service->isAvailable() == true)
843 eventBreakConnection(service);
845 if(status == Status::Available && service->isCritical() == true)
846 status = Status::Unavailable;
// Notification that a service is no longer available; the visible body only writes a
// warning trace describing the service.
// @param service Service that was lost.
852 void Communicator::eventBreakConnection(const Service* service)
857 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (service)", ANNA_FILE_LOCATION));
859 string msg("comm::Communicator::eventBreakConnection | ");
860 msg += service->asString();
861 Logger::warning(msg, ANNA_FILE_LOCATION);
// Shutdown notification.
// Logs a warning, marks the communicator Unavailable, walks a copy of the handler list and
// joins the thread manager.
// NOTE(review): the body of the handler loop and the catch handling are not visible in this
// listing — confirm against the full source.
865 void Communicator::eventShutdown()
868 string msg("comm::Communicator::eventShutdown | ");
870 Logger::warning(msg, ANNA_FILE_LOCATION);
872 setStatus(Status::Unavailable);
// The loop walks a copy of the handler list rather than a_handlers itself.
874 Handlers backup(a_handlers);
876 for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
882 a_threadManager->join();
883 } catch(RuntimeException& ex) {
// Builds a one-line textual description of the communicator: the Component description, the
// stop flag, the status, the number of handlers and the recovery time in milliseconds.
// @return The description, enclosed in "comm::Communicator { ... }".
890 std::string Communicator::asString() const
892 string result("comm::Communicator { ");
893 result += Component::asString();
894 result += " | RequestedStop: ";
895 result += anna::functions::asString(a_requestedStop);
897 result += a_status.asString();
898 result += anna::functions::asString(" | Handlers: %d", a_handlers.size());
899 result += anna::functions::asString(" | RecoveryTime: %d ms", a_recoveryTime.getValue());
900 return result += " }";
// Writes the state of the communicator as an XML subtree.
// A "comm.Communicator" node is created under the node returned by app::Component::asXML;
// it receives the configuration attributes and child nodes for the network, the handlers,
// the services, the connection recoverer and the congestion controller.
// @param parent Node under which the description is created.
903 xml::Node* Communicator::asXML(xml::Node* parent) const
// The Component part is written first and its node becomes the parent of ours.
905 parent = app::Component::asXML(parent);
906 xml::Node* result = parent->createChild("comm.Communicator");
907 result->createAttribute("RequestedStop", anna::functions::asString(a_requestedStop));
908 result->createAttribute("RecoveryTime", a_recoveryTime);
909 result->createAttribute("TryingConnectionTime", a_tryingConnectionTime);
910 result->createAttribute("Timeout", a_timeout);
911 result->createAttribute("Status", a_status.asString());
// Any work mode other than Single is reported as "Clone".
912 result->createAttribute("Mode", (a_workMode == WorkMode::Single) ? "Single" : "Clone");
913 result->createAttribute("LevelOfDenialService", a_levelOfDenialService);
914 Network::instantiate().asXML(result);
// One child per registered handler under "comm.Handlers".
915 xml::Node* node = result->createChild("comm.Handlers");
917 for(const_handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
918 handler(ii)->asXML(node);
// One child per attached service under "comm.Services".
920 node = result->createChild("comm.Services");
922 for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++)
923 service(ii)->asXML(node);
925 a_connectionRecover->asXML(result);
926 CongestionController& congestionController = CongestionController::instantiate();
927 congestionController.asXML(result);
932 * Invoked from app::Application::clone -> app::Component::do_cloneParent (note: IN THE ORIGINAL PROCESS).
933 * Gives the component of the original process the chance to release the resources it will not use.
936 * Every time it clones, it has to remove from its polling the connections it has made, since
937 * otherwise it could end up processing replies to requests issued by the child processes => those
938 * would never receive the answers to their requests.
940 * These connections cannot be closed because they must stay open so that the child processes
941 * inherit them; it is just that this process must not service them.
943 * The server code (the original process) does nothing with that socket; it closes it because the child process
944 * has already duplicated it, and carries on servicing connection requests.
// Clone hook executed in the original (parent) process.
// Removes from the poller the descriptors of every RemoteConnection and ClientSocket handler,
// then takes the main handler out of a_mainHandler.
// NOTE(review): what is done with the former main handler and the catch handling are not
// visible in this listing — confirm against the full source.
946 void Communicator::do_cloneParent()
947 throw(RuntimeException) {
948 LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneParent", ANNA_FILE_LOCATION));
950 Handler::Type::_v type;
952 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
953 handler = Communicator::handler(ii);
954 type = handler->getType();
// Only the poller entry is erased; the handler itself stays registered.
956 if(type == Handler::Type::RemoteConnection || type == Handler::Type::ClientSocket)
957 a_poll->erase(handler->getfd());
// Keep a local pointer to the main handler and clear the member.
961 handler = a_mainHandler;
962 a_mainHandler = NULL;
964 } catch(RuntimeException& ex) {
970 * Invoked from app::Application::clone -> app::Component::do_cloneChild (note: IN THE NEW PROCESS).
971 * Gives the components that have just been duplicated the chance to release the resources they will not
972 * use, since the copy will only handle the messages arriving through its associated connection and
973 * the replies to the requests originated while handling them.
975 * (1) Remember that the handler associated with the clone is registered to prevent it from being closed.
976 * (3) Clears the list because otherwise, if the clone process made new connections, it could find
977 * that the fd is already stored in the list. detach cannot be invoked directly, because that
978 * method modifies the a_handlers list and would make us lose track of the iterator.
979 * (4) Makes a physical duplicate of the connections made by the original process and removes the fd from
980 * the list to poll; this fd will be closed by the Handler::clone method.
981 * (5) Registers the Handler for which this new process was created, so that when detach is invoked
982 * with that handler => the program will end its execution.
// Clone hook executed in the new (child) process.
// Logs the main handler, walks every registered handler except the main one, adjusting the
// poller for Custom / RemoteConnection / ClientSocket handlers and collecting handlers into
// toDetach; then processes the collected handlers and enters singlethreadedAccept.
// NOTE(review): the default branch of the switch, the body of the toDetach loop and the
// catch handling are not visible in this listing — confirm against the full source.
984 void Communicator::do_cloneChild()
986 LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneChild", ANNA_FILE_LOCATION));
988 string msg("comm::Communicator::do_cloneChild | MainHandler: ");
989 msg += a_mainHandler->asString();
990 Logger::information(msg, ANNA_FILE_LOCATION);
992 Handler* handler(NULL);
// Handlers are collected here instead of being detached inside the loop over a_handlers.
993 vector <Handler*> toDetach;
995 for(handler_iterator ii = handler_begin(); ii != handler_end(); ii ++) { // (1)
996 handler = Communicator::handler(ii);
// The main handler gets separate treatment (the statement executed is not visible here).
998 if(handler == a_mainHandler)
1001 switch(handler->getType()) {
1002 case Handler::Type::Custom:
1003 case Handler::Type::RemoteConnection:
1004 case Handler::Type::ClientSocket:
// The old descriptor leaves the poller and, after the trace, the handler's descriptor is inserted again.
1005 a_poll->erase(handler->getfd()); // (4)
1007 string msg("comm::Communicator::do_cloneChild | ");
1008 msg += handler->asString();
1009 Logger::debug(msg, ANNA_FILE_LOCATION);
1012 a_poll->insert(handler->getfd());
1015 toDetach.push_back(handler); // (3)
1020 for(vector <Handler*>::iterator ii = toDetach.begin(), maxii = toDetach.end(); ii != maxii; ii ++)
// The child then runs the single-threaded service loop.
1026 singlethreadedAccept();
1027 } catch(RuntimeException& ex) {
// Sort key of a handler: its file descriptor.
// @param handler Handler to inspect; dereferenced without a NULL check.
// @return The value of handler->getfd().
1034 int Communicator::SortByFileDescriptor::value(const Handler* handler)
1036 return handler->getfd();