1 // ANNA - Anna is Not Nothingness Anymore //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
14 #include <sys/ioctl.h>
18 #include <anna/core/tracing/Logger.hpp>
19 #include <anna/core/tracing/TraceMethod.hpp>
20 #include <anna/core/mt/Guard.hpp>
21 #include <anna/core/util/Average.hpp>
22 #include <anna/core/mt/ThreadManager.hpp>
23 #include <anna/core/mt/Thread.hpp>
24 #include <anna/core/util/Microsecond.hpp>
25 #include <anna/core/util/defines.hpp>
27 #include <anna/comm/comm.hpp>
28 #include <anna/comm/internal/sccs.hpp>
29 #include <anna/comm/internal/BinderSocket.hpp>
30 #include <anna/comm/internal/RemoteConnection.hpp>
31 #include <anna/comm/handler/Manager.hpp>
34 #include <anna/comm/internal/Poll.hpp>
37 #include <anna/core/mt/ThreadManager.hpp>
39 #include <anna/xml/Node.hpp>
40 #include <anna/xml/Attribute.hpp>
46 const Millisecond comm::Communicator::MinRecoveryTime(1000);
47 const Millisecond comm::Communicator::DefaultRecoveryTime(5000);
48 const Millisecond comm::Communicator::MaxRecoveryTime(30000);
49 const Millisecond comm::Communicator::MinTryingConnectionTime(100);
50 const Millisecond comm::Communicator::DefaultTryingConnectionTime(200);
51 const Millisecond comm::Communicator::MaxTryingConnectionTime(1000);
52 const Millisecond comm::Communicator::DefaultTimeout(10 * 60 * 1000); // 10 minutos.
55 Millisecond comm::Communicator::st_ReceivingChunkSize(comm::Communicator::DefaultChunkSize);
57 Communicator::Communicator(const Communicator::WorkMode::_v workMode) :
58 Component(getClassName()),
60 a_workMode(WorkMode::Single),
64 a_recoveryTime(DefaultRecoveryTime),
65 a_requestedStop(false),
66 a_status(Status::Available),
69 a_threadManager(NULL),
70 a_timeout(DefaultTimeout),
72 a_tryingConnectionTime(DefaultTryingConnectionTime) {
73 WHEN_SINGLETHREAD(a_poll = new Poll);
75 a_threadManager = new ThreadManager("comm::Communicator::ThreadManager", ThreadManager::Mode::Unlimit, 0)
77 handler::Manager::instantiate().initialize(this);
78 a_connectionRecover = new comm::ConnectionRecover(this);
79 a_levelOfDenialService = comm::CongestionController::MaxLevel - 1;
81 comm::sccs::activate();
85 Communicator::~Communicator() {
87 delete a_threadManager;
88 delete a_connectionRecover;
91 void Communicator::setRecoveryTime(const Millisecond &recoveryTime)
92 throw(RuntimeException) {
93 if(recoveryTime < MinRecoveryTime || recoveryTime > MaxRecoveryTime) {
94 string msg("comm::Communicator::setRecoveryTime | ");
95 msg += functions::asString("RecoveryTime (%d ms) must be between %d ms and %d ms", recoveryTime.getValue(), MinRecoveryTime.getValue(), MaxRecoveryTime.getValue());
96 throw RuntimeException(msg, ANNA_FILE_LOCATION);
99 a_recoveryTime = recoveryTime;
102 void Communicator::setTryingConnectionTime(const Millisecond &tryingConnectionTime)
103 throw(RuntimeException) {
104 if(tryingConnectionTime < MinTryingConnectionTime || tryingConnectionTime > MaxTryingConnectionTime) {
105 string msg("comm::Communicator::setTryingConnectionTime | ");
106 msg += functions::asString("TryingConnectionTime (%d ms) must be between %d ms and %d ms", tryingConnectionTime.getValue(), MinTryingConnectionTime.getValue(), MaxTryingConnectionTime.getValue());
107 throw RuntimeException(msg, ANNA_FILE_LOCATION);
110 a_tryingConnectionTime = tryingConnectionTime;
114 void Communicator::setReceivingChunkSize(const int receivingChunkSize)
115 throw(RuntimeException) {
116 if(receivingChunkSize < MinReceivingChunkSize || receivingChunkSize > MaxReceivingChunkSize) {
117 string msg("comm::Communicator::setReceivingChunkSize | ");
118 msg += functions::asString("ReceivingChunkSize (%d bytes) must be between %d bytes and %d bytes", receivingChunkSize, MinReceivingChunkSize, MaxReceivingChunkSize);
119 throw RuntimeException(msg, ANNA_FILE_LOCATION);
122 st_ReceivingChunkSize = receivingChunkSize;
123 CongestionController::instantiate().setMaxPendingBytes(receivingChunkSize);
126 void Communicator::setLevelOfDenialService(const int levelOfDenialService)
127 throw(RuntimeException) {
128 const int min(comm::CongestionController::MaxLevel - 2);
129 const int max(comm::CongestionController::MaxLevel);
131 if(levelOfDenialService < min || levelOfDenialService > max) {
132 string msg("comm::Communicator::setTryingConnectionTime | ");
133 msg += functions::asString("LevelOfDenialService %d must be between %d and %d", levelOfDenialService, min, max);
134 throw RuntimeException(msg, ANNA_FILE_LOCATION);
137 a_levelOfDenialService = levelOfDenialService;
140 void Communicator::attach(ServerSocket* serverSocket)
141 throw(RuntimeException) {
142 if(serverSocket == NULL)
143 throw RuntimeException("Cannot attach a NULL comm::ServerSocket", ANNA_FILE_LOCATION);
145 Guard guard(this, "comm::Communicator::attach (ServerSocket)");
146 Handler* handler = handler::Manager::instantiate().createHandler(serverSocket);
149 string msg("comm::Communicator::attach (ServerSocket) | ");
150 msg += handler->asString();
151 Logger::debug(msg, ANNA_FILE_LOCATION);
154 if(serverSocket->isSharedBind() == true && serverSocket->getBinderSocket() != NULL)
155 attach(serverSocket->getBinderSocket());
158 void Communicator::attach(BinderSocket* binderSocket)
159 throw(RuntimeException) {
160 if(binderSocket == NULL)
161 throw RuntimeException("Cannot attach a NULL comm::BinderSocket", ANNA_FILE_LOCATION);
163 Guard guard(this, "comm::Communicator::attach (BinderSocket)");
164 Handler* handler = handler::Manager::instantiate().createHandler(binderSocket);
167 string msg("comm::Communicator::attach (BinderSocket) | ");
168 msg += handler->asString();
169 Logger::debug(msg, ANNA_FILE_LOCATION);
174 * Comienza a tratar la conexion que hacen a un comm::ServerSocket de este proceso, desde algun otro proceso.
176 * Se invoca desde comm::handler::ServerSocket::accept [Tx] -> <null>
178 void Communicator::attach(LocalConnection* localConnection)
179 throw(RuntimeException) {
180 if(localConnection == NULL)
181 throw RuntimeException("Cannot attach a NULL comm::LocalConnection", ANNA_FILE_LOCATION);
184 * Obtiene la información/bloquea al ClientSocket para asegurar el orden correcto, ya que de otro modo,
185 * se podría producir un interbloqueo Communicator & ClientSocket.
187 * Recordar que el handler::MetaClientSocket::apply sólo bloquea el ClientSocket sobre el que actúa y luego
188 * y si detecta el cierre, desbloquea el ClientSocket y bloquea el Communicator. Así que para mantener
189 * el orden correcto hay que invocar al comm::ClientSocket::getTransport (que establece una SCCS sobre él) antes
190 * de bloquea el Communicator.
192 ClientSocket* clientSocket = localConnection->getClientSocket();
194 if(clientSocket == NULL)
195 throw RuntimeException("comm::Communicator::attach (LocalConnection) | ClientSocket can not be NULL", ANNA_FILE_LOCATION);
197 // Todavía no está corriendo el thread que se encargará de éste comm::ClientSocket => podemos acceder a él sin SSCC.
198 const Transport* transport = clientSocket->unsafe_reserveTransport();
199 Guard guard(this, "comm::Communicator::attach (LocalConnection)");
200 Handler* handler = handler::Manager::instantiate().createHandler(localConnection);
203 if(((transport == NULL) ? true : transport->enableTimeout()) == true)
204 handler->setTimeout(a_timeout);
208 if(a_workMode == WorkMode::Single && handler->supportTimeout() == true)
209 a_timedouts.add(handler)
213 string msg("comm::Communicator::attach | ");
214 msg += handler->asString();
215 Logger::debug(msg, ANNA_FILE_LOCATION);
218 if(a_workMode == WorkMode::Clone) {
219 a_mainHandler = handler;
220 app::Application& app = app::functions::getApp().clone();
225 * Este método sólo se invoca desde comm::Server::connect y este método los primero que hace
226 * es bloquear el acceso al comunicador, para asegurar el orden correcto de bloqueo a l hora
227 * de tratar la desconexión.
229 * Por eso no hace falta establecer la sección crítica que habitualmente se establece en todos
230 * los métodos Communicator::attach.
232 void Communicator::attach(RemoteConnection* remoteConnection)
233 throw(RuntimeException) {
234 if(remoteConnection == NULL)
235 throw RuntimeException("Cannot attach a NULL comm::RemoteConnection", ANNA_FILE_LOCATION);
237 // Guard guard (this, "comm::Communicator::attach (RemoteConnection)");
238 Handler* handler = handler::Manager::instantiate().createHandler(remoteConnection);
241 string msg("comm::Communicator::attach | ");
242 msg += handler->asString();
243 Logger::debug(msg, ANNA_FILE_LOCATION);
247 void Communicator::attach(ClientSocket* socket)
248 throw(RuntimeException) {
250 throw RuntimeException("Cannot attach a NULL comm::ClientSocket", ANNA_FILE_LOCATION);
252 Guard guard(this, "comm::Communicator::attach (ClientSocket)");
253 Handler* handler = handler::Manager::instantiate().createHandler(socket);
256 string msg("comm::Communicator::attach | ");
257 msg += handler->asString();
258 Logger::debug(msg, ANNA_FILE_LOCATION);
262 void Communicator::attach(DatagramSocket* socket)
263 throw(RuntimeException) {
265 throw RuntimeException("Cannot attach a NULL comm::DatagramSocket", ANNA_FILE_LOCATION);
267 Guard guard(this, "comm::Communicator::attach (DatagramSocket)");
268 Handler* handler = handler::Manager::instantiate().createHandler(socket);
271 string msg("comm::Communicator::attach | ");
272 msg += handler->asString();
273 Logger::debug(msg, ANNA_FILE_LOCATION);
277 void Communicator::attach(Handler* handler)
278 throw(RuntimeException) {
280 throw RuntimeException("Cannot attach a NULL comm::Handler", ANNA_FILE_LOCATION);
282 if(handler->getType() != Handler::Type::Custom)
283 throw RuntimeException("Communicator::attach only accept 'Custom' Handlers", ANNA_FILE_LOCATION);
285 Guard guard(this, "comm::Communicator::attach (Handler)");
288 string msg("comm::Communicator::attach (Handler) | ");
289 msg += handler->asString();
290 Logger::debug(msg, ANNA_FILE_LOCATION);
294 void Communicator::attach(Service* service)
295 throw(RuntimeException) {
297 throw RuntimeException("Cannot attach a NULL comm::Service", ANNA_FILE_LOCATION);
299 if(std::find(service_begin(), service_end(), service) != service_end())
302 service->initialize();
304 string msg("comm::Communicator::attach | ");
305 msg += service->asString();
306 Logger::debug(msg, ANNA_FILE_LOCATION);
308 a_services.push_back(service);
310 if(service->isCritical() == true && service->isAvailable() == false)
311 setStatus(Status::Unavailable);
314 void Communicator::insert(Handler* handler)
315 throw(RuntimeException) {
316 handler->initialize();
318 if(handler->getfd() < 0) {
319 string msg(handler->asString());
320 msg += " | Cannot attach a Handler with fd < 0";
321 throw RuntimeException(msg, ANNA_FILE_LOCATION);
324 const Handler* other = find(handler->getfd());
327 string msg("commm::Comunicator::insert | New: ");
328 msg += handler->asString();
329 msg += " | Collision: ";
330 msg += other->asString();
331 throw RuntimeException(msg, ANNA_FILE_LOCATION);
334 a_handlers.add(handler);
336 if(handler->supportTimeout())
337 handler->beat(anna::functions::hardwareClock());
339 WHEN_SINGLETHREAD(a_poll->insert(handler->getfd()));
342 if(a_isServing == true)
343 a_threadManager->createThread()->start(*handler);
347 void Communicator::detach(ServerSocket* serverSocket)
349 if(serverSocket == NULL)
352 comm::BinderSocket* binderSocket = serverSocket->getBinderSocket();
353 detach(find(serverSocket->getfd()));
355 if(binderSocket != NULL)
356 detach(binderSocket);
359 void Communicator::detach(ClientSocket* clientSocket)
361 if(clientSocket == NULL)
364 detach(find(clientSocket->getfd()));
367 void Communicator::detach(BinderSocket* binderSocket)
369 if(binderSocket == NULL)
372 detach(find(binderSocket->getfd()));
376 * Finaliza el trabajo del handler recibido, en este momento NO hay ninguna SSCC establecida.
377 * (1) Si se cierra la conexion con el cliente que al que atencia este proceso clonado => debe terminar la ejecucion.
379 void Communicator::detach(Handler* handler)
384 Guard guard(this, "comm::Communicator::detach");
385 const int fd = handler->getfd();
387 string msg("comm::Communicator::detach (Handler) | ");
388 msg += handler->asString();
389 Logger::debug(msg, ANNA_FILE_LOCATION);
392 if(anna::functions::supportMultithread() == true)
393 handler->requestStop();
394 else if(handler->supportTimeout() == true)
395 a_timedouts.erase(handler);
398 WHEN_SINGLETHREAD(a_poll->erase(fd););
400 if(a_handlers.erase(handler) == false) {
402 string msg(functions::asText("Handler: ", fd));
403 msg += " | Not found";
404 Logger::warning(msg, ANNA_FILE_LOCATION);
408 if(a_workMode == WorkMode::Clone && handler == a_mainHandler) // (1)
411 handler::Manager::instantiate().releaseHandler(handler);
414 const Handler* Communicator::getHandler(const ClientSocket& clientSocket)
415 throw(RuntimeException) {
416 Guard guard(this, "comm::Communicator::getHandler");
417 Handler* result = find(clientSocket.getfd());
420 switch(result->getType()) {
421 case Handler::Type::ServerSocket:
422 case Handler::Type::BinderSocket:
423 case Handler::Type::Custom:
433 //----------------------------------------------------------------------------------------------
434 // (1) Compruebo la solicitud de parada antes y despues de procesar el mensaje para acelar
435 // la solicitud de la peticion. En otro caso podrian pasar 30 seg desde que se solicita
436 // hasta que se acepta.
437 // (2) La invocacion de este metodo originara que se aniadan y borren fd's a la lista de
438 // streams pero la forma de recorrer el bucle nos blinda (un poco) de anomalias.
439 //----------------------------------------------------------------------------------------------
440 void Communicator::accept()
441 throw(RuntimeException) {
442 LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "accept", ANNA_FILE_LOCATION));
444 if(isServing() == true)
445 throw RuntimeException("Communicator::accept is already invoked", ANNA_FILE_LOCATION);
447 a_requestedStop = false;
449 if(handler_size() == 0)
450 throw RuntimeException("No socket has been established for sending and/or reception", ANNA_FILE_LOCATION);
454 LOGINFORMATION(Logger::write(Logger::Information, Component::asString(), "Polling network", ANNA_FILE_LOCATION));
457 WHEN_SINGLETHREAD(singlethreadedAccept());
458 WHEN_MULTITHREAD(multithreadedAccept());
459 } catch(RuntimeException& ex) {
467 void Communicator::singlethreadedAccept()
468 throw(RuntimeException) {
469 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "singlethreadedAccept", ANNA_FILE_LOCATION));
472 Microsecond now(anna::functions::hardwareClock());
474 a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
478 while(a_requestedStop == false) {
479 if(a_connectionRecover->isRunning() == true) {
480 a_connectionRecover->tryRecover();
481 a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
484 a_poll->waitMessage();
486 if(a_timedouts.size() > 0)
487 now = anna::functions::hardwareClock();
489 while((fd = a_poll->fetch()) != -1) {
490 if((handler = find(fd)) == NULL)
494 if(handler->supportTimeout())
498 } catch(RuntimeException& ex) {
503 if(a_pendingClose == true) {
504 a_pendingClose = false;
506 // @Eduardo (multiconnection to same address/port); On st, is considered to have one socket pending to be closed
507 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
508 // if (Communicator::handler (ii)->testClose () == true)
510 Communicator::handler(ii)->testClose();
513 now = anna::functions::hardwareClock();
515 if(a_timedouts.size() == 0)
519 handler_iterator ii = a_timedouts.begin();
521 while(ii != a_timedouts.end()) {
522 handler = Communicator::handler(ii);
524 if(handler->isTimeout(now) == true) {
526 string msg(handler->asString());
527 msg += " | Closed due to inactivity";
528 Logger::warning(msg, ANNA_FILE_LOCATION);
531 ii = a_timedouts.begin();
542 void Communicator::multithreadedAccept()
543 throw(RuntimeException) {
544 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "multithreadedAccept", ANNA_FILE_LOCATION));
546 Guard guard(this, "comm::Communicator::multithreadedAccept");
549 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
550 handler = Communicator::handler(ii);
552 string msg("Starting | ");
553 msg += handler->asString();
554 Logger::debug(msg, ANNA_FILE_LOCATION)
556 a_threadManager->createThread()->start(*handler);
559 Millisecond delay(500);
561 while(a_requestedStop == false) {
562 anna::functions::sleep(delay);
564 if(a_requestedStop == true)
567 if(a_connectionRecover->isRunning() == false)
571 Guard guard(this, "comm::Communicator::multithreadedAccept (ConnectionRecover)");
572 a_connectionRecover->tryRecover();
577 void Communicator::requestStop()
579 if(a_requestedStop == true)
582 Guard guard(this, "comm::Communicator::requestStop");
584 if(a_requestedStop == true)
587 a_requestedStop = true;
590 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
591 handler(ii)->requestStop();
592 } catch(RuntimeException& ex) {
597 string msg("comm::Communicator::requestStop | ");
599 Logger::warning(msg, ANNA_FILE_LOCATION);
603 bool Communicator::isUsable(const ClientSocket* clientSocket)
605 if(clientSocket == NULL)
608 Guard guard(this, "comm::Communicator::isUsable");
609 Handler* handler = find(clientSocket->getfd());
614 const Handler::Type::_v type = handler->getType();
615 return (type == Handler::Type::LocalConnection || type == Handler::Type::RemoteConnection);
618 void Communicator::setStatus(const Status& status)
620 Guard guard(this, "comm::Communicator::setStatus");
622 if(a_status != status)
626 string msg("comm::Communicator::setStatus | ");
627 msg += a_status.asString();
628 Logger::information(msg, ANNA_FILE_LOCATION);
632 void Communicator::eventBreakAddress(const in_addr_t& address)
634 Device* device = Network::instantiate().find(address);
636 if(device->getStatus() == Device::Status::Down)
640 string msg("comm::Communicator::eventBreakAddress | ");
641 msg += device->asString();
642 Logger::warning(msg, ANNA_FILE_LOCATION);
644 Guard guard(this, "comm::Communicator::eventBreakAddress");
645 device->setStatus(Device::Status::Down);
647 * Trabaja sobre una copia para no perder la referencia cuando se elimine un miembro de la lista original
649 * En la lista a borrar sólo mete los handler::ServerSocket y los handler::RemoteConnection, ya que los handler::LocalConnection
650 * será liberados recursivamente cuando liberemos los primeros.
652 typedef vector <comm::Handler*> work_container;
653 typedef work_container::iterator work_iterator;
656 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
657 comm::Handler* handler = comm::Communicator::handler(ii);
659 if(dynamic_cast <comm::handler::ServerSocket*>(handler) != NULL) {
660 ww.push_back(handler);
661 } else if(dynamic_cast <comm::handler::RemoteConnection*>(handler) != NULL) {
662 ww.push_back(handler);
666 for(work_iterator ii = ww.begin(), maxii = ww.end(); ii != maxii; ii ++) {
668 std::string msg("Communicator::eventBreakAddress | ");
669 msg = (*ii)->asString();
670 Logger::debug(msg, ANNA_FILE_LOCATION)
672 (*ii)->breakAddress(address);
676 void Communicator::eventRecoverAddress(const in_addr_t& address)
678 Device* device = Network::instantiate().find(address);
680 if(device->getStatus() == Device::Status::Up)
684 string msg("comm::Communicator::eventRecoverAddress | ");
685 msg += device->asString();
686 Logger::warning(msg, ANNA_FILE_LOCATION);
688 Guard guard(this, "comm::Communicator::eventRecoverAddress");
689 device->setStatus(Device::Status::Up);
690 Handlers backup(a_handlers);
692 for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
693 handler(ii)->recoverAddress(address);
696 bool Communicator::eventAcceptConnection(const ClientSocket& clientSocket)
697 throw(RuntimeException) {
698 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventAcceptConnection", ANNA_FILE_LOCATION));
700 if(a_requestedStop == true) {
702 string msg(Component::asString());
704 msg += clientSocket.asString();
705 msg += " | Connection rejected due to stop request";
706 Logger::warning(msg, ANNA_FILE_LOCATION);
711 CongestionController::Workload ww = CongestionController::instantiate().getAccumulatedWorkload();
713 if(CongestionController::getLevel(ww) >= a_levelOfDenialService) {
715 string msg("comm::Communicator::eventAcceptConnection | Level: ");
716 msg += functions::asString(CongestionController::getLevel(ww));
717 msg += functions::asString(" | Load: %d%% ", CongestionController::getLoad(ww));
718 msg += " | Result: false";
719 Logger::warning(msg, ANNA_FILE_LOCATION);
727 //--------------------------------------------------------------------------------------------------------
728 // (1) Alguno de los comm::Server asociados al servicio puede haber pasado a activo, pero nos
729 // interesa como estaba el servicio antes de esta activacion.
730 // (2) Si el servicio es "No critico" => no afecta al estado del proceso.
731 // (3) Solo si todos los servicios "Criticos" estan disponibles pasara a estar "Activo".
732 //--------------------------------------------------------------------------------------------------------
733 void Communicator::eventCreateConnection(const Server* server)
738 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventCreateConnection", ANNA_FILE_LOCATION));
740 string msg("comm::Communicator::eventCreateConnection | ");
741 msg += server->asString();
742 Logger::notice(msg, ANNA_FILE_LOCATION);
744 bool recoverService = false;
746 for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
747 Service* service = const_cast <Service*>(Server::service(ii));
749 if(service->wasAvailable() == true) { // (1)
750 service->recover(server);
754 service->recover(server);
755 eventCreateConnection(service);
756 recoverService = true;
759 if(recoverService == false || a_status == Status::Available)
762 Guard guard(this, "comm::Communicator::eventCreateConnection");
763 Status status(Status::Available);
765 for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++) {
766 const Service* service = Communicator::service(ii);
768 if(service->isCritical() == false) // (2)
771 if(service->isAvailable() == false) {
772 status = Status::Unavailable;
777 setStatus(status); // (3)
780 void Communicator::eventCreateConnection(const Service* service)
786 string msg("comm::Communicator::eventCreateConnection | ");
787 msg += service->asString();
788 Logger::notice(msg, ANNA_FILE_LOCATION);
793 * Se invoca desde handler::RemoteConnection:[Tx] -> Communicator
795 void Communicator::eventBreakConnection(const Server* server)
800 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (server)", ANNA_FILE_LOCATION));
802 string msg("comm::Communicator::eventBreakConnection | ");
803 msg += server->asString();
804 Logger::warning(msg, ANNA_FILE_LOCATION);
806 //Guard guard (this, "comm::Communicator::eventBreakConnection");
807 Status status(a_status);
809 for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
810 Service* service = const_cast <Service*>(Server::service(ii));
811 service->fault(server);
813 if(service->isAvailable() == true)
816 eventBreakConnection(service);
818 if(status == Status::Available && service->isCritical() == true)
819 status = Status::Unavailable;
825 void Communicator::eventBreakConnection(const Service* service)
830 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (service)", ANNA_FILE_LOCATION));
832 string msg("comm::Communicator::eventBreakConnection | ");
833 msg += service->asString();
834 Logger::warning(msg, ANNA_FILE_LOCATION);
838 void Communicator::eventShutdown()
841 string msg("comm::Communicator::eventShutdown | ");
843 Logger::warning(msg, ANNA_FILE_LOCATION);
845 setStatus(Status::Unavailable);
847 Handlers backup(a_handlers);
849 for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
855 a_threadManager->join();
856 } catch(RuntimeException& ex) {
863 std::string Communicator::asString() const
865 string result("comm::Communicator { ");
866 result += Component::asString();
867 result += " | RequestedStop: ";
868 result += anna::functions::asString(a_requestedStop);
870 result += a_status.asString();
871 result += anna::functions::asString(" | Handlers: %d", a_handlers.size());
872 result += anna::functions::asString(" | RecoveryTime: %d ms", a_recoveryTime.getValue());
873 return result += " }";
876 xml::Node* Communicator::asXML(xml::Node* parent) const
878 parent = app::Component::asXML(parent);
879 xml::Node* result = parent->createChild("comm.Communicator");
880 result->createAttribute("RequestedStop", anna::functions::asString(a_requestedStop));
881 result->createAttribute("RecoveryTime", a_recoveryTime);
882 result->createAttribute("TryingConnectionTime", a_tryingConnectionTime);
883 result->createAttribute("Timeout", a_timeout);
884 result->createAttribute("Status", a_status.asString());
885 result->createAttribute("Mode", (a_workMode == WorkMode::Single) ? "Single" : "Clone");
886 result->createAttribute("LevelOfDenialService", a_levelOfDenialService);
887 Network::instantiate().asXML(result);
888 xml::Node* node = result->createChild("comm.Handlers");
890 for(const_handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
891 handler(ii)->asXML(node);
893 node = result->createChild("comm.Services");
895 for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++)
896 service(ii)->asXML(node);
898 a_connectionRecover->asXML(result);
899 CongestionController& congestionController = CongestionController::instantiate();
900 congestionController.asXML(result);
905 * Se invoca desde app::Application::clone -> app::Component::do_cloneParent (ojo EN EL PROCESO ORIGINAL).
906 * Ofrece la posiblidad de que el componente del proceso original liberen los recursos que no va
909 * Cada vez que se clona tiene que sacar de su comprobacion las conexiones que ha sufrido. Ya que de otro
910 * modo podria estar tratantado contestaciones a peticiones realizadas desde los procesos hijos => estos
911 * nunca recibirian las respuestas a sus peticiones.
913 * Estas conexiones no se pueden cerrar porque deben mantenerse abiertas para que los procesos hijos las
914 * hereden, solo es que este proceso no debe atenderlas.
916 * El codigo del servidor (el proceso original) no hace nada con ese Socket, lo cierra porque ya lo ha duplicado
917 * el proceso hijo y sigue atendiendo peticiones de conexion.
919 void Communicator::do_cloneParent()
920 throw(RuntimeException) {
921 LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneParent", ANNA_FILE_LOCATION));
923 Handler::Type::_v type;
925 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
926 handler = Communicator::handler(ii);
927 type = handler->getType();
929 if(type == Handler::Type::RemoteConnection || type == Handler::Type::ClientSocket)
930 a_poll->erase(handler->getfd());
934 handler = a_mainHandler;
935 a_mainHandler = NULL;
937 } catch(RuntimeException& ex) {
943 * Se invoca desde app::Application::clone -> app::Component::do_cloneChild (ojo EN EL NUEVO PROCESO).
944 * Ofrece la posiblidad de que los componentes que acaban de ser duplicados liberen los recursos que no van
945 * a usar, ya que la copia solo se dedicara a tratar los mensajes que entren por su conexion asociada y
946 * las respuestas a las peticiones originadas en el tratamiento del mismo.
948 * (1) Recordar que el handler que tiene asociado el clon esta registrado para evitar su cierre.
949 * (3) Limpia la lista porque de otro modo si el proceso clon hiciera nuevas conexiones podria encontrar
950 * que el fd ya esta guardado en la lista. No se puede invocar directamente al detach, porque este
951 * metodo modifica la lista a_handlers, y nos haria perder la cuenta del iterator.
952 * (4) Realiza un duplicado fisico de las conexiones realizadas por el proceso original, elimina el fd de
953 * la lista a comprobar, este fd sera cerrado por el metodo Handler::clone.
954 * (5) Registra el Handler por el que ha sido creado este nuevo proceso, asi que cuando se invoque al detach
955 * con ese handler => el programa terminara la ejecucion.
957 void Communicator::do_cloneChild()
959 LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneChild", ANNA_FILE_LOCATION));
961 string msg("comm::Communicator::do_cloneChild | MainHandler: ");
962 msg += a_mainHandler->asString();
963 Logger::information(msg, ANNA_FILE_LOCATION);
965 Handler* handler(NULL);
966 vector <Handler*> toDetach;
968 for(handler_iterator ii = handler_begin(); ii != handler_end(); ii ++) { // (1)
969 handler = Communicator::handler(ii);
971 if(handler == a_mainHandler)
974 switch(handler->getType()) {
975 case Handler::Type::Custom:
976 case Handler::Type::RemoteConnection:
977 case Handler::Type::ClientSocket:
978 a_poll->erase(handler->getfd()); // (4)
980 string msg("comm::Communicator::do_cloneChild | ");
981 msg += handler->asString();
982 Logger::debug(msg, ANNA_FILE_LOCATION);
985 a_poll->insert(handler->getfd());
988 toDetach.push_back(handler); // (3)
993 for(vector <Handler*>::iterator ii = toDetach.begin(), maxii = toDetach.end(); ii != maxii; ii ++)
999 singlethreadedAccept();
1000 } catch(RuntimeException& ex) {
1007 int Communicator::SortByFileDescriptor::value(const Handler* handler)
1009 return handler->getfd();