1 // ANNA - Anna is Not Nothingness Anymore
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
5 // http://redmine.teslayout.com/projects/anna-suite
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
11 // * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 // * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
17 // * Neither the name of the copyright holder nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 // Authors: eduardo.ramos.testillano@gmail.com
34 // cisco.tierra@gmail.com
42 #include <sys/ioctl.h>
46 #include <anna/core/tracing/Logger.hpp>
47 #include <anna/core/tracing/TraceMethod.hpp>
48 #include <anna/core/mt/Guard.hpp>
49 #include <anna/core/util/Average.hpp>
50 #include <anna/core/mt/ThreadManager.hpp>
51 #include <anna/core/mt/Thread.hpp>
52 #include <anna/core/util/Microsecond.hpp>
53 #include <anna/core/util/defines.hpp>
55 #include <anna/comm/comm.hpp>
56 #include <anna/comm/internal/sccs.hpp>
57 #include <anna/comm/internal/BinderSocket.hpp>
58 #include <anna/comm/internal/RemoteConnection.hpp>
59 #include <anna/comm/handler/Manager.hpp>
62 #include <anna/comm/internal/Poll.hpp>
65 #include <anna/core/mt/ThreadManager.hpp>
67 #include <anna/xml/Node.hpp>
68 #include <anna/xml/Attribute.hpp>
// Out-of-class definitions of the static limits declared by comm::Communicator.
// Recovery time: allowed range and default (checked by setRecoveryTime).
74 const Millisecond comm::Communicator::MinRecoveryTime(1000);
75 const Millisecond comm::Communicator::DefaultRecoveryTime(5000);
76 const Millisecond comm::Communicator::MaxRecoveryTime(30000);
// Trying-connection time: allowed range and default (checked by setTryingConnectionTime).
77 const Millisecond comm::Communicator::MinTryingConnectionTime(100);
78 const Millisecond comm::Communicator::DefaultTryingConnectionTime(200);
79 const Millisecond comm::Communicator::MaxTryingConnectionTime(1000);
// Initial value of a_timeout, which attach(LocalConnection*) hands to Handler::setTimeout.
80 const Millisecond comm::Communicator::DefaultTimeout(10 * 60 * 1000); // 10 minutes.
// Class-wide receiving chunk size, overwritten by setReceivingChunkSize.
// NOTE(review): defined here as Millisecond although setReceivingChunkSize treats the value as a
// byte count — confirm the declared type in the header.
83 Millisecond comm::Communicator::st_ReceivingChunkSize(comm::Communicator::DefaultChunkSize);
// Constructs the communicator component. Sets the defaults visible below (recovery time, status,
// timeout, trying-connection time), registers this instance with handler::Manager, creates the
// ConnectionRecover helper, defaults the denial-of-service level to
// CongestionController::MaxLevel - 1 and calls comm::sccs::activate().
// NOTE(review): as listed, the initializer stores WorkMode::Single instead of the workMode
// argument and the ThreadManager statement is unterminated; lines appear to be missing from this
// listing (possibly conditional-compilation or macro wrappers) — confirm against the full file.
85 Communicator::Communicator(const Communicator::WorkMode::_v workMode) :
86 Component(getClassName()),
88 a_workMode(WorkMode::Single),
92 a_recoveryTime(DefaultRecoveryTime),
93 a_requestedStop(false),
94 a_status(Status::Available),
97 a_threadManager(NULL),
98 a_timeout(DefaultTimeout),
100 a_tryingConnectionTime(DefaultTryingConnectionTime) {
// The poll object is only created in single-thread builds (WHEN_SINGLETHREAD).
101 WHEN_SINGLETHREAD(a_poll = new Poll);
103 a_threadManager = new ThreadManager("comm::Communicator::ThreadManager", ThreadManager::Mode::Unlimit, 0)
105 handler::Manager::instantiate().initialize(this);
106 a_connectionRecover = new comm::ConnectionRecover(this);
107 a_levelOfDenialService = comm::CongestionController::MaxLevel - 1;
109 comm::sccs::activate();
// Destroys the communicator, deleting the thread manager and the ConnectionRecover helper that
// the constructor allocated with new.
// NOTE(review): no release of a_poll is visible in this listing — confirm it is freed elsewhere.
113 Communicator::~Communicator() {
115 delete a_threadManager;
116 delete a_connectionRecover;
// Sets the recovery time (used by singlethreadedAccept as the poll timeout while the
// ConnectionRecover is running).
// Throws RuntimeException when recoveryTime is outside [MinRecoveryTime, MaxRecoveryTime];
// a_recoveryTime is left untouched in that case.
119 void Communicator::setRecoveryTime(const Millisecond &recoveryTime)
120 throw(RuntimeException) {
121 if(recoveryTime < MinRecoveryTime || recoveryTime > MaxRecoveryTime) {
122 string msg("comm::Communicator::setRecoveryTime | ");
123 msg += functions::asString("RecoveryTime (%d ms) must be between %d ms and %d ms", recoveryTime.getValue(), MinRecoveryTime.getValue(), MaxRecoveryTime.getValue());
124 throw RuntimeException(msg, ANNA_FILE_LOCATION);
127 a_recoveryTime = recoveryTime;
// Sets the trying-connection time.
// Throws RuntimeException when tryingConnectionTime is outside
// [MinTryingConnectionTime, MaxTryingConnectionTime]; the stored value is then left untouched.
130 void Communicator::setTryingConnectionTime(const Millisecond &tryingConnectionTime)
131 throw(RuntimeException) {
132 if(tryingConnectionTime < MinTryingConnectionTime || tryingConnectionTime > MaxTryingConnectionTime) {
133 string msg("comm::Communicator::setTryingConnectionTime | ");
134 msg += functions::asString("TryingConnectionTime (%d ms) must be between %d ms and %d ms", tryingConnectionTime.getValue(), MinTryingConnectionTime.getValue(), MaxTryingConnectionTime.getValue());
135 throw RuntimeException(msg, ANNA_FILE_LOCATION);
138 a_tryingConnectionTime = tryingConnectionTime;
// Sets the class-wide receiving chunk size, in bytes.
// Throws RuntimeException when the value is outside
// [MinReceivingChunkSize, MaxReceivingChunkSize]. On success the value is stored in the static
// st_ReceivingChunkSize and also passed to CongestionController::setMaxPendingBytes.
142 void Communicator::setReceivingChunkSize(const int receivingChunkSize)
143 throw(RuntimeException) {
144 if(receivingChunkSize < MinReceivingChunkSize || receivingChunkSize > MaxReceivingChunkSize) {
145 string msg("comm::Communicator::setReceivingChunkSize | ");
146 msg += functions::asString("ReceivingChunkSize (%d bytes) must be between %d bytes and %d bytes", receivingChunkSize, MinReceivingChunkSize, MaxReceivingChunkSize);
147 throw RuntimeException(msg, ANNA_FILE_LOCATION);
150 st_ReceivingChunkSize = receivingChunkSize;
151 CongestionController::instantiate().setMaxPendingBytes(receivingChunkSize);
// Sets the congestion level that eventAcceptConnection compares against the current
// CongestionController level when deciding whether to log a rejection.
// Throws RuntimeException when levelOfDenialService is outside
// [CongestionController::MaxLevel - 2, CongestionController::MaxLevel]; the stored level is then
// left untouched.
154 void Communicator::setLevelOfDenialService(const int levelOfDenialService)
155 throw(RuntimeException) {
156 const int min(comm::CongestionController::MaxLevel - 2);
157 const int max(comm::CongestionController::MaxLevel);
159 if(levelOfDenialService < min || levelOfDenialService > max) {
// The message prefix must name this method; it previously named setTryingConnectionTime.
160 string msg("comm::Communicator::setLevelOfDenialService | ");
161 msg += functions::asString("LevelOfDenialService %d must be between %d and %d", levelOfDenialService, min, max);
162 throw RuntimeException(msg, ANNA_FILE_LOCATION);
165 a_levelOfDenialService = levelOfDenialService;
// Attaches a server socket to this communicator.
// Throws RuntimeException for a NULL socket. Under a Guard on this communicator it obtains a
// handler from handler::Manager and logs it at debug level; when the socket uses a shared bind
// and has a BinderSocket, that BinderSocket is attached as well.
// NOTE(review): the statement that registers the handler is not visible in this listing
// (presumably a call to insert) — confirm against the full file.
168 void Communicator::attach(ServerSocket* serverSocket)
169 throw(RuntimeException) {
170 if(serverSocket == NULL)
171 throw RuntimeException("Cannot attach a NULL comm::ServerSocket", ANNA_FILE_LOCATION);
173 Guard guard(this, "comm::Communicator::attach (ServerSocket)");
174 Handler* handler = handler::Manager::instantiate().createHandler(serverSocket);
177 string msg("comm::Communicator::attach (ServerSocket) | ");
178 msg += handler->asString();
179 Logger::debug(msg, ANNA_FILE_LOCATION);
182 if(serverSocket->isSharedBind() == true && serverSocket->getBinderSocket() != NULL)
183 attach(serverSocket->getBinderSocket());
// Attaches a binder socket to this communicator.
// Throws RuntimeException for a NULL socket. Under a Guard on this communicator it obtains a
// handler from handler::Manager and logs it at debug level.
// NOTE(review): the statement that registers the handler is not visible in this listing.
186 void Communicator::attach(BinderSocket* binderSocket)
187 throw(RuntimeException) {
188 if(binderSocket == NULL)
189 throw RuntimeException("Cannot attach a NULL comm::BinderSocket", ANNA_FILE_LOCATION);
191 Guard guard(this, "comm::Communicator::attach (BinderSocket)");
192 Handler* handler = handler::Manager::instantiate().createHandler(binderSocket);
195 string msg("comm::Communicator::attach (BinderSocket) | ");
196 msg += handler->asString();
197 Logger::debug(msg, ANNA_FILE_LOCATION);
202 * Starts handling a connection made by some other process to one of this process's comm::ServerSocket instances.
204 * Invoked from comm::handler::ServerSocket::accept [Tx] -> <null>
// Attaches a local connection (a connection accepted by one of this process's server sockets).
// Throws RuntimeException when localConnection or its ClientSocket is NULL.
// The ClientSocket transport is reserved before the Guard on the communicator is taken; a handler
// is then created, given a_timeout when the transport is NULL or reports enableTimeout(), and
// (in Single work mode, if it supports timeouts) added to a_timedouts. In Clone work mode the
// handler is recorded as a_mainHandler and the application clone is requested.
// NOTE(review): several lines are missing from this listing (handler registration, macro
// wrappers); the summary covers only the visible statements.
206 void Communicator::attach(LocalConnection* localConnection)
207 throw(RuntimeException) {
208 if(localConnection == NULL)
209 throw RuntimeException("Cannot attach a NULL comm::LocalConnection", ANNA_FILE_LOCATION);
// English summary of the note below: the ClientSocket information is obtained/locked first to
// keep the lock order correct, since otherwise Communicator and ClientSocket could deadlock;
// handler::MetaClientSocket::apply locks only the ClientSocket and, on detecting a close,
// releases it and then locks the Communicator, so the transport must be fetched before the
// Communicator is locked.
212 * Obtiene la información/bloquea al ClientSocket para asegurar el orden correcto, ya que de otro modo,
213 * se podría producir un interbloqueo Communicator & ClientSocket.
215 * Recordar que el handler::MetaClientSocket::apply sólo bloquea el ClientSocket sobre el que actúa y luego
216 * y si detecta el cierre, desbloquea el ClientSocket y bloquea el Communicator. Así que para mantener
217 * el orden correcto hay que invocar al comm::ClientSocket::getTransport (que establece una SCCS sobre él) antes
218 * de bloquea el Communicator.
220 ClientSocket* clientSocket = localConnection->getClientSocket();
222 if(clientSocket == NULL)
223 throw RuntimeException("comm::Communicator::attach (LocalConnection) | ClientSocket can not be NULL", ANNA_FILE_LOCATION);
225 // The thread that will take care of this comm::ClientSocket is not running yet => it can be accessed without a critical section.
226 const Transport* transport = clientSocket->unsafe_reserveTransport();
227 Guard guard(this, "comm::Communicator::attach (LocalConnection)");
228 Handler* handler = handler::Manager::instantiate().createHandler(localConnection);
// A missing transport is treated as "timeout enabled".
231 if(((transport == NULL) ? true : transport->enableTimeout()) == true)
232 handler->setTimeout(a_timeout);
236 if(a_workMode == WorkMode::Single && handler->supportTimeout() == true)
237 a_timedouts.add(handler)
241 string msg("comm::Communicator::attach | ");
242 msg += handler->asString();
243 Logger::debug(msg, ANNA_FILE_LOCATION);
246 if(a_workMode == WorkMode::Clone) {
247 a_mainHandler = handler;
248 app::Application& app = app::functions::getApp().clone();
253 * This method is only invoked from comm::Server::connect, and the first thing that method does
254 * is lock access to the communicator, to guarantee the correct locking order when
255 * handling a disconnection.
257 * That is why there is no need to establish the critical section that is normally set up in all
258 * the Communicator::attach methods.
// Attaches a remote connection to this communicator.
// Throws RuntimeException for a NULL connection. Unlike the other attach overloads, no Guard is
// taken here: the Guard statement is left commented out on purpose (the comment preceding this
// method states that the caller already holds the communicator lock).
// NOTE(review): the statement that registers the handler is not visible in this listing.
260 void Communicator::attach(RemoteConnection* remoteConnection)
261 throw(RuntimeException) {
262 if(remoteConnection == NULL)
263 throw RuntimeException("Cannot attach a NULL comm::RemoteConnection", ANNA_FILE_LOCATION);
265 // Guard guard (this, "comm::Communicator::attach (RemoteConnection)");
266 Handler* handler = handler::Manager::instantiate().createHandler(remoteConnection);
269 string msg("comm::Communicator::attach | ");
270 msg += handler->asString();
271 Logger::debug(msg, ANNA_FILE_LOCATION);
// Attaches a client socket to this communicator.
// Throws RuntimeException ("Cannot attach a NULL comm::ClientSocket"); the condition guarding the
// throw is not visible in this listing (presumably a NULL test on socket). Under a Guard on this
// communicator it obtains a handler from handler::Manager and logs it at debug level.
// NOTE(review): the statement that registers the handler is not visible in this listing.
275 void Communicator::attach(ClientSocket* socket)
276 throw(RuntimeException) {
278 throw RuntimeException("Cannot attach a NULL comm::ClientSocket", ANNA_FILE_LOCATION);
280 Guard guard(this, "comm::Communicator::attach (ClientSocket)");
281 Handler* handler = handler::Manager::instantiate().createHandler(socket);
284 string msg("comm::Communicator::attach | ");
285 msg += handler->asString();
286 Logger::debug(msg, ANNA_FILE_LOCATION);
// Attaches a datagram socket to this communicator.
// Throws RuntimeException ("Cannot attach a NULL comm::DatagramSocket"); the condition guarding
// the throw is not visible in this listing (presumably a NULL test on socket). Under a Guard on
// this communicator it obtains a handler from handler::Manager and logs it at debug level.
// NOTE(review): the statement that registers the handler is not visible in this listing.
290 void Communicator::attach(DatagramSocket* socket)
291 throw(RuntimeException) {
293 throw RuntimeException("Cannot attach a NULL comm::DatagramSocket", ANNA_FILE_LOCATION);
295 Guard guard(this, "comm::Communicator::attach (DatagramSocket)");
296 Handler* handler = handler::Manager::instantiate().createHandler(socket);
299 string msg("comm::Communicator::attach | ");
300 msg += handler->asString();
301 Logger::debug(msg, ANNA_FILE_LOCATION);
// Attaches a user-supplied handler to this communicator.
// Throws RuntimeException for a NULL handler (the guarding condition is not visible in this
// listing) and for any handler whose type is not Handler::Type::Custom. The remaining work runs
// under a Guard on this communicator and logs the handler at debug level.
// NOTE(review): the statement that registers the handler is not visible in this listing.
305 void Communicator::attach(Handler* handler)
306 throw(RuntimeException) {
308 throw RuntimeException("Cannot attach a NULL comm::Handler", ANNA_FILE_LOCATION);
310 if(handler->getType() != Handler::Type::Custom)
311 throw RuntimeException("Communicator::attach only accept 'Custom' Handlers", ANNA_FILE_LOCATION);
313 Guard guard(this, "comm::Communicator::attach (Handler)");
316 string msg("comm::Communicator::attach (Handler) | ");
317 msg += handler->asString();
318 Logger::debug(msg, ANNA_FILE_LOCATION);
// Registers a service with this communicator.
// Throws RuntimeException for a NULL service (the guarding condition is not visible in this
// listing). The service list is searched for the same pointer first; the statement executed on
// a match is missing from this listing (presumably an early return). Otherwise the service is
// initialized, logged at debug level and appended to a_services. If the service is critical and
// not available, the communicator status becomes Status::Unavailable.
322 void Communicator::attach(Service* service)
323 throw(RuntimeException) {
325 throw RuntimeException("Cannot attach a NULL comm::Service", ANNA_FILE_LOCATION);
327 if(std::find(service_begin(), service_end(), service) != service_end())
330 service->initialize();
332 string msg("comm::Communicator::attach | ");
333 msg += service->asString();
334 Logger::debug(msg, ANNA_FILE_LOCATION);
336 a_services.push_back(service);
338 if(service->isCritical() == true && service->isAvailable() == false)
339 setStatus(Status::Unavailable);
// Initializes a handler and registers it with this communicator.
// Throws RuntimeException when the handler's file descriptor is negative, and when a handler
// with the same descriptor is already registered (the condition guarding the collision branch is
// not visible in this listing; presumably it tests `other` for non-NULL).
// On success the handler is added to a_handlers, its activity mark is refreshed when it supports
// timeouts, its descriptor is added to the poll in single-thread builds, and, when the
// communicator is already serving, a thread from a_threadManager is started on it.
342 void Communicator::insert(Handler* handler)
343 throw(RuntimeException) {
344 handler->initialize();
346 if(handler->getfd() < 0) {
347 string msg(handler->asString());
348 msg += " | Cannot attach a Handler with fd < 0";
349 throw RuntimeException(msg, ANNA_FILE_LOCATION);
352 const Handler* other = find(handler->getfd());
// Prefix spelled like every other message in this file (was "commm::Comunicator::insert").
355 string msg("comm::Communicator::insert | New: ");
356 msg += handler->asString();
357 msg += " | Collision: ";
358 msg += other->asString();
359 throw RuntimeException(msg, ANNA_FILE_LOCATION);
362 a_handlers.add(handler);
// Start the inactivity clock from the moment of registration.
364 if(handler->supportTimeout())
365 handler->beat(anna::functions::hardwareClock());
367 WHEN_SINGLETHREAD(a_poll->insert(handler->getfd()));
370 if(a_isServing == true)
371 a_threadManager->createThread()->start(*handler);
// Detaches the handler registered for the server socket's file descriptor and then, if the
// socket has a BinderSocket, detaches that too. The BinderSocket pointer is read before the
// server socket's handler is detached.
// NOTE(review): the statement executed for a NULL serverSocket is missing from this listing
// (presumably an early return).
375 void Communicator::detach(ServerSocket* serverSocket)
377 if(serverSocket == NULL)
380 comm::BinderSocket* binderSocket = serverSocket->getBinderSocket();
381 detach(find(serverSocket->getfd()));
383 if(binderSocket != NULL)
384 detach(binderSocket);
// Detaches the handler registered for the client socket's file descriptor.
// NOTE(review): the statement executed for a NULL clientSocket is missing from this listing
// (presumably an early return).
387 void Communicator::detach(ClientSocket* clientSocket)
389 if(clientSocket == NULL)
392 detach(find(clientSocket->getfd()));
// Detaches the handler registered for the binder socket's file descriptor.
// NOTE(review): the statement executed for a NULL binderSocket is missing from this listing
// (presumably an early return).
395 void Communicator::detach(BinderSocket* binderSocket)
397 if(binderSocket == NULL)
400 detach(find(binderSocket->getfd()));
404 * Ends the work of the received handler; at this point NO critical section is established.
405 * (1) If the connection with the client served by this cloned process is closed => the process must end its execution.
// Unregisters a handler from this communicator, under a Guard on the communicator.
// In multithread builds the handler is asked to stop; otherwise, if it supports timeouts, it is
// removed from a_timedouts. Its descriptor is erased from the poll in single-thread builds, it
// is erased from a_handlers (a warning is logged when it was not there) and it is finally handed
// back to handler::Manager::releaseHandler.
// NOTE(review): lines are missing from this listing, including any NULL check on `handler` and
// the statement executed for the Clone-mode main handler at marker (1).
407 void Communicator::detach(Handler* handler)
412 Guard guard(this, "comm::Communicator::detach");
// Cached before the handler is released so it can still be used below.
413 const int fd = handler->getfd();
415 string msg("comm::Communicator::detach (Handler) | ");
416 msg += handler->asString();
417 Logger::debug(msg, ANNA_FILE_LOCATION);
420 if(anna::functions::supportMultithread() == true)
421 handler->requestStop();
422 else if(handler->supportTimeout() == true)
423 a_timedouts.erase(handler);
426 WHEN_SINGLETHREAD(a_poll->erase(fd););
428 if(a_handlers.erase(handler) == false) {
430 string msg(functions::asText("Handler: ", fd));
431 msg += " | Not found";
432 Logger::warning(msg, ANNA_FILE_LOCATION);
436 if(a_workMode == WorkMode::Clone && handler == a_mainHandler) // (1)
439 handler::Manager::instantiate().releaseHandler(handler);
// Looks up, under a Guard on this communicator, the handler registered for the client socket's
// file descriptor and inspects its type.
// NOTE(review): most of the body is missing from this listing; only the lookup and the
// ServerSocket/BinderSocket/Custom case labels are visible, so the outcome for each type (and
// for a failed lookup) cannot be stated from here.
442 const Handler* Communicator::getHandler(const ClientSocket& clientSocket)
443 throw(RuntimeException) {
444 Guard guard(this, "comm::Communicator::getHandler");
445 Handler* result = find(clientSocket.getfd());
448 switch(result->getType()) {
449 case Handler::Type::ServerSocket:
450 case Handler::Type::BinderSocket:
451 case Handler::Type::Custom:
461 //----------------------------------------------------------------------------------------------
462 // (1) The stop request is checked both before and after processing a message to speed up
463 // the handling of that request. Otherwise up to 30 seconds could elapse between the
464 // moment the stop is requested and the moment it is honoured.
465 // (2) Invoking this method causes fds to be added to and removed from the list of
466 // streams, but the way the loop is traversed shields us (somewhat) from anomalies.
467 //----------------------------------------------------------------------------------------------
// Entry point of the serving loop. Throws RuntimeException when the communicator is already
// serving or when no handler has been registered. Clears a_requestedStop, logs "Polling network"
// and runs singlethreadedAccept or multithreadedAccept depending on the build.
// NOTE(review): the try block opening, the catch body and any update of the serving flag are
// missing from this listing.
468 void Communicator::accept()
469 throw(RuntimeException) {
470 LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "accept", ANNA_FILE_LOCATION));
472 if(isServing() == true)
473 throw RuntimeException("Communicator::accept is already invoked", ANNA_FILE_LOCATION);
475 a_requestedStop = false;
477 if(handler_size() == 0)
478 throw RuntimeException("No socket has been established for sending and/or reception", ANNA_FILE_LOCATION);
482 LOGINFORMATION(Logger::write(Logger::Information, Component::asString(), "Polling network", ANNA_FILE_LOCATION));
485 WHEN_SINGLETHREAD(singlethreadedAccept());
486 WHEN_MULTITHREAD(multithreadedAccept());
487 } catch(RuntimeException& ex) {
// Single-thread serving loop; runs until a_requestedStop becomes true.
// Each iteration: lets the ConnectionRecover retry while it is running, waits on the poll,
// fetches every ready descriptor and looks up its handler, runs testClose on all handlers when
// a_pendingClose was raised, and finally walks a_timedouts looking for handlers whose
// isTimeout(now) is true, logging "Closed due to inactivity" for them.
// The poll timeout is a_recoveryTime while the ConnectionRecover is running and 5000 ms
// otherwise.
// NOTE(review): many lines are missing from this listing (local declarations, the statements
// that dispatch to / detach a handler, try blocks, loop increments); the summary covers only
// what is visible.
495 void Communicator::singlethreadedAccept()
496 throw(RuntimeException) {
497 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "singlethreadedAccept", ANNA_FILE_LOCATION));
500 Microsecond now(anna::functions::hardwareClock());
502 a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
506 while(a_requestedStop == false) {
507 if(a_connectionRecover->isRunning() == true) {
508 a_connectionRecover->tryRecover();
// Recovery may have finished during tryRecover, so the timeout is re-evaluated.
509 a_poll->setTimeout((a_connectionRecover->isRunning() == true) ? a_recoveryTime : (Millisecond)5000);
512 a_poll->waitMessage();
// The clock is only refreshed when there are handlers subject to timeout.
514 if(a_timedouts.size() > 0)
515 now = anna::functions::hardwareClock();
// Poll::fetch returning -1 ends the batch of ready descriptors.
517 while((fd = a_poll->fetch()) != -1) {
518 if((handler = find(fd)) == NULL)
522 if(handler->supportTimeout())
526 } catch(RuntimeException& ex) {
531 if(a_pendingClose == true) {
532 a_pendingClose = false;
534 // @Eduardo (multiconnection to same address/port); On st, is considered to have one socket pending to be closed
535 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
536 // if (Communicator::handler (ii)->testClose () == true)
538 Communicator::handler(ii)->testClose();
541 now = anna::functions::hardwareClock();
543 if(a_timedouts.size() == 0)
547 handler_iterator ii = a_timedouts.begin();
549 while(ii != a_timedouts.end()) {
550 handler = Communicator::handler(ii);
552 if(handler->isTimeout(now) == true) {
554 string msg(handler->asString());
555 msg += " | Closed due to inactivity";
556 Logger::warning(msg, ANNA_FILE_LOCATION);
// The scan restarts from the beginning after a timed-out handler has been processed.
559 ii = a_timedouts.begin();
// Multithread serving loop. Under a Guard on this communicator it starts one thread from
// a_threadManager per registered handler, then sleeps in 500 ms steps until a_requestedStop
// becomes true; on each wake-up, when the ConnectionRecover is running, it takes a Guard and
// calls tryRecover.
// NOTE(review): lines are missing from this listing (scope braces that bound the first Guard,
// the statements taken on the two `if` tests inside the loop, macro wrappers around the log).
570 void Communicator::multithreadedAccept()
571 throw(RuntimeException) {
572 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "multithreadedAccept", ANNA_FILE_LOCATION));
574 Guard guard(this, "comm::Communicator::multithreadedAccept");
577 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
578 handler = Communicator::handler(ii);
580 string msg("Starting | ");
581 msg += handler->asString();
582 Logger::debug(msg, ANNA_FILE_LOCATION)
584 a_threadManager->createThread()->start(*handler);
587 Millisecond delay(500);
589 while(a_requestedStop == false) {
590 anna::functions::sleep(delay);
// Re-checked right after waking up so a stop requested during the sleep is seen promptly.
592 if(a_requestedStop == true)
595 if(a_connectionRecover->isRunning() == false)
599 Guard guard(this, "comm::Communicator::multithreadedAccept (ConnectionRecover)");
600 a_connectionRecover->tryRecover();
// Requests the serving loop to stop. Sets a_requestedStop and asks every registered handler to
// stop, logging a warning.
// The flag is tested once before the Guard is taken and once more after it.
// NOTE(review): the statements executed when the flag is already set (presumably early returns),
// the try opening and the catch body are missing from this listing.
605 void Communicator::requestStop()
607 if(a_requestedStop == true)
610 Guard guard(this, "comm::Communicator::requestStop");
612 if(a_requestedStop == true)
615 a_requestedStop = true;
618 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
619 handler(ii)->requestStop();
620 } catch(RuntimeException& ex) {
625 string msg("comm::Communicator::requestStop | ");
627 Logger::warning(msg, ANNA_FILE_LOCATION);
// Tells whether the handler registered for the client socket's descriptor is a LocalConnection
// or a RemoteConnection. The lookup runs under a Guard on this communicator.
// NOTE(review): the results for a NULL clientSocket and for a failed lookup are not visible in
// this listing (presumably both return false).
631 bool Communicator::isUsable(const ClientSocket* clientSocket)
633 if(clientSocket == NULL)
636 Guard guard(this, "comm::Communicator::isUsable");
637 Handler* handler = find(clientSocket->getfd());
642 const Handler::Type::_v type = handler->getType();
643 return (type == Handler::Type::LocalConnection || type == Handler::Type::RemoteConnection);
// Updates the communicator status under a Guard on this communicator and logs the current
// status at information level.
// NOTE(review): the statement executed when the status differs (presumably the assignment to
// a_status) is missing from this listing.
646 void Communicator::setStatus(const Status& status)
648 Guard guard(this, "comm::Communicator::setStatus");
650 if(a_status != status)
654 string msg("comm::Communicator::setStatus | ");
655 msg += a_status.asString();
656 Logger::information(msg, ANNA_FILE_LOCATION);
// Reacts to a network address going down. Looks up the Device for the address, logs a warning,
// marks the device Down under a Guard on this communicator and calls breakAddress(address) on
// every ServerSocket and RemoteConnection handler.
// The handlers are first copied into a work list so that iteration is not disturbed when
// breakAddress removes entries from the communicator's own handler list.
// NOTE(review): lines are missing from this listing, including the statement executed when the
// device is already Down (presumably an early return) and the declaration of `ww`.
660 void Communicator::eventBreakAddress(const in_addr_t& address)
662 Device* device = Network::instantiate().find(address);
664 if(device->getStatus() == Device::Status::Down)
668 string msg("comm::Communicator::eventBreakAddress | ");
669 msg += device->asString();
670 Logger::warning(msg, ANNA_FILE_LOCATION);
672 Guard guard(this, "comm::Communicator::eventBreakAddress");
673 device->setStatus(Device::Status::Down);
675 * Trabaja sobre una copia para no perder la referencia cuando se elimine un miembro de la lista original
677 * En la lista a borrar sólo mete los handler::ServerSocket y los handler::RemoteConnection, ya que los handler::LocalConnection
678 * será liberados recursivamente cuando liberemos los primeros.
680 typedef vector <comm::Handler*> work_container;
681 typedef work_container::iterator work_iterator;
// Only ServerSocket and RemoteConnection handlers are collected; per the note above, the
// LocalConnection handlers are released as a consequence of releasing those.
684 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
685 comm::Handler* handler = comm::Communicator::handler(ii);
687 if(dynamic_cast <comm::handler::ServerSocket*>(handler) != NULL) {
688 ww.push_back(handler);
689 } else if(dynamic_cast <comm::handler::RemoteConnection*>(handler) != NULL) {
690 ww.push_back(handler);
694 for(work_iterator ii = ww.begin(), maxii = ww.end(); ii != maxii; ii ++) {
696 std::string msg("Communicator::eventBreakAddress | ");
// Append (not assign) so the prefix built on the previous line is kept in the log entry.
697 msg += (*ii)->asString();
698 Logger::debug(msg, ANNA_FILE_LOCATION)
700 (*ii)->breakAddress(address);
// Reacts to a network address coming back up. Looks up the Device for the address, logs a
// warning, marks the device Up under a Guard on this communicator and calls
// recoverAddress(address) on every handler.
// The loop runs over a copy of a_handlers (`backup`) rather than over a_handlers itself.
// NOTE(review): the statement executed when the device is already Up is missing from this
// listing (presumably an early return).
704 void Communicator::eventRecoverAddress(const in_addr_t& address)
706 Device* device = Network::instantiate().find(address);
708 if(device->getStatus() == Device::Status::Up)
712 string msg("comm::Communicator::eventRecoverAddress | ");
713 msg += device->asString();
714 Logger::warning(msg, ANNA_FILE_LOCATION);
716 Guard guard(this, "comm::Communicator::eventRecoverAddress");
717 device->setStatus(Device::Status::Up);
718 Handlers backup(a_handlers);
720 for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
721 handler(ii)->recoverAddress(address);
// Decides whether an incoming connection should be accepted.
// Two rejection paths are visible: a stop has been requested ("Connection rejected due to stop
// request"), or the level derived from the accumulated CongestionController workload is at or
// above a_levelOfDenialService ("Result: false"). Both log a warning.
// NOTE(review): every return statement is missing from this listing; presumably the two branches
// return false and the fall-through returns true — confirm against the full file.
724 bool Communicator::eventAcceptConnection(const ClientSocket& clientSocket)
725 throw(RuntimeException) {
726 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventAcceptConnection", ANNA_FILE_LOCATION));
728 if(a_requestedStop == true) {
730 string msg(Component::asString());
732 msg += clientSocket.asString();
733 msg += " | Connection rejected due to stop request";
734 Logger::warning(msg, ANNA_FILE_LOCATION);
739 CongestionController::Workload ww = CongestionController::instantiate().getAccumulatedWorkload();
741 if(CongestionController::getLevel(ww) >= a_levelOfDenialService) {
743 string msg("comm::Communicator::eventAcceptConnection | Level: ");
744 msg += functions::asString(CongestionController::getLevel(ww));
745 msg += functions::asString(" | Load: %d%% ", CongestionController::getLoad(ww));
746 msg += " | Result: false";
747 Logger::warning(msg, ANNA_FILE_LOCATION);
755 //--------------------------------------------------------------------------------------------------------
756 // (1) Some of the comm::Server instances associated with the service may have become active, but
757 // what matters here is the state the service was in before this activation.
758 // (2) If the service is "non-critical" => it does not affect the status of the process.
759 // (3) Only if all "critical" services are available does the status become "Available".
760 //--------------------------------------------------------------------------------------------------------
// Notification that a connection to `server` has been created. Logs a notice, tells every
// service the server belongs to that the server has recovered and, for services that were not
// available before, raises eventCreateConnection(service). If at least one service was
// recovered and the communicator is not already Available, the status is recomputed under a
// Guard: Unavailable if any critical service is unavailable, Available otherwise.
// NOTE(review): lines are missing from this listing (any NULL check on `server`, the
// continue/else/return/break statements that follow the visible `if` tests).
761 void Communicator::eventCreateConnection(const Server* server)
766 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventCreateConnection", ANNA_FILE_LOCATION));
768 string msg("comm::Communicator::eventCreateConnection | ");
769 msg += server->asString();
770 Logger::notice(msg, ANNA_FILE_LOCATION);
772 bool recoverService = false;
774 for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
775 Service* service = const_cast <Service*>(Server::service(ii));
777 if(service->wasAvailable() == true) { // (1)
778 service->recover(server);
782 service->recover(server);
783 eventCreateConnection(service);
784 recoverService = true;
787 if(recoverService == false || a_status == Status::Available)
790 Guard guard(this, "comm::Communicator::eventCreateConnection");
791 Status status(Status::Available);
793 for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++) {
794 const Service* service = Communicator::service(ii);
796 if(service->isCritical() == false) // (2)
799 if(service->isAvailable() == false) {
800 status = Status::Unavailable;
805 setStatus(status); // (3)
// Notification that `service` has become available again; the visible code only logs the
// service at notice level.
// NOTE(review): lines are missing from this listing (any NULL check on `service`).
808 void Communicator::eventCreateConnection(const Service* service)
814 string msg("comm::Communicator::eventCreateConnection | ");
815 msg += service->asString();
816 Logger::notice(msg, ANNA_FILE_LOCATION);
821 * Invoked from handler::RemoteConnection:[Tx] -> Communicator
// Notification that the connection to `server` has been lost. Logs a warning, reports the fault
// to every service the server belongs to and, for services that are no longer available, raises
// eventBreakConnection(service); a critical service becoming unavailable turns the locally
// computed status into Status::Unavailable.
// No Guard is taken here: the Guard statement is left commented out.
// NOTE(review): lines are missing from this listing (any NULL check on `server`, the statement
// taken when the service is still available, and how `status` is applied at the end).
823 void Communicator::eventBreakConnection(const Server* server)
828 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (server)", ANNA_FILE_LOCATION));
830 string msg("comm::Communicator::eventBreakConnection | ");
831 msg += server->asString();
832 Logger::warning(msg, ANNA_FILE_LOCATION);
834 //Guard guard (this, "comm::Communicator::eventBreakConnection");
835 Status status(a_status);
837 for(Server::const_iterator ii = server->begin(), maxii = server->end(); ii != maxii; ii ++) {
838 Service* service = const_cast <Service*>(Server::service(ii));
839 service->fault(server);
841 if(service->isAvailable() == true)
844 eventBreakConnection(service);
846 if(status == Status::Available && service->isCritical() == true)
847 status = Status::Unavailable;
// Notification that `service` has become unavailable; the visible code only traces the call and
// logs the service at warning level.
// NOTE(review): lines are missing from this listing (any NULL check on `service`).
853 void Communicator::eventBreakConnection(const Service* service)
858 LOGMETHOD(TraceMethod traceMethod(Logger::Local7, "comm::Communicator", "eventBreakConnection (service)", ANNA_FILE_LOCATION));
860 string msg("comm::Communicator::eventBreakConnection | ");
861 msg += service->asString();
862 Logger::warning(msg, ANNA_FILE_LOCATION);
// Shutdown notification. Logs a warning, sets the status to Status::Unavailable, iterates over
// a copy of a_handlers (`backup`) and joins the thread manager.
// NOTE(review): the loop body, the try opening and the catch body are missing from this listing,
// so what is done to each handler cannot be stated from here.
866 void Communicator::eventShutdown()
869 string msg("comm::Communicator::eventShutdown | ");
871 Logger::warning(msg, ANNA_FILE_LOCATION);
873 setStatus(Status::Unavailable);
875 Handlers backup(a_handlers);
877 for(handler_iterator ii = backup.begin(), maxii = backup.end(); ii != maxii; ii ++)
883 a_threadManager->join();
884 } catch(RuntimeException& ex) {
// Returns a one-line description of the communicator: the Component description, the stop flag,
// the status, the number of handlers and the recovery time in milliseconds, wrapped in
// "comm::Communicator { ... }".
// NOTE(review): the line that appends the label preceding the status text is missing from this
// listing.
891 std::string Communicator::asString() const
893 string result("comm::Communicator { ");
894 result += Component::asString();
895 result += " | RequestedStop: ";
896 result += anna::functions::asString(a_requestedStop);
898 result += a_status.asString();
899 result += anna::functions::asString(" | Handlers: %d", a_handlers.size());
900 result += anna::functions::asString(" | RecoveryTime: %d ms", a_recoveryTime.getValue());
901 return result += " }";
// Appends an XML description of the communicator under `parent`.
// The Component base class emits its node first; below it a "comm.Communicator" node is created
// carrying the configuration attributes, followed by the Network description, a "comm.Handlers"
// child with every handler, a "comm.Services" child with every service, the ConnectionRecover
// description and the CongestionController description.
// NOTE(review): the return statement is missing from this listing (presumably returns `result`).
904 xml::Node* Communicator::asXML(xml::Node* parent) const
// `parent` is rebound to the node produced by the base class.
906 parent = app::Component::asXML(parent);
907 xml::Node* result = parent->createChild("comm.Communicator");
908 result->createAttribute("RequestedStop", anna::functions::asString(a_requestedStop));
909 result->createAttribute("RecoveryTime", a_recoveryTime);
910 result->createAttribute("TryingConnectionTime", a_tryingConnectionTime);
911 result->createAttribute("Timeout", a_timeout);
912 result->createAttribute("Status", a_status.asString());
// Any work mode other than Single is reported as "Clone".
913 result->createAttribute("Mode", (a_workMode == WorkMode::Single) ? "Single" : "Clone");
914 result->createAttribute("LevelOfDenialService", a_levelOfDenialService);
915 Network::instantiate().asXML(result);
916 xml::Node* node = result->createChild("comm.Handlers");
918 for(const_handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++)
919 handler(ii)->asXML(node);
921 node = result->createChild("comm.Services");
923 for(const_service_iterator ii = service_begin(), maxii = service_end(); ii != maxii; ii ++)
924 service(ii)->asXML(node);
926 a_connectionRecover->asXML(result);
927 CongestionController& congestionController = CongestionController::instantiate();
928 congestionController.asXML(result);
933 * Invoked from app::Application::clone -> app::Component::do_cloneParent (note: IN THE ORIGINAL PROCESS).
934 * Gives the component of the original process the chance to release the resources it is not going to use.
937 * Every time the process is cloned it must drop from its polling set the connections it has received, since
938 * otherwise it could end up handling replies to requests issued by the child processes => the children
939 * would never receive the responses to their requests.
941 * These connections cannot be closed, because they must stay open for the child processes to
942 * inherit them; it is only that this process must not service them.
944 * The server code (the original process) does nothing with that socket; it closes it because the child process
945 * has already duplicated it, and goes on serving connection requests.
// Clone hook for the original process. Removes from the poll the descriptors of every
// RemoteConnection and ClientSocket handler, then takes the handler stored in a_mainHandler and
// resets a_mainHandler to NULL.
// NOTE(review): lines are missing from this listing (the declaration of `handler`, the try
// opening, what is done with the former main handler, and the catch body).
947 void Communicator::do_cloneParent()
948 throw(RuntimeException) {
949 LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneParent", ANNA_FILE_LOCATION));
951 Handler::Type::_v type;
953 for(handler_iterator ii = handler_begin(), maxii = handler_end(); ii != maxii; ii ++) {
954 handler = Communicator::handler(ii);
955 type = handler->getType();
957 if(type == Handler::Type::RemoteConnection || type == Handler::Type::ClientSocket)
958 a_poll->erase(handler->getfd());
// Keep a local copy of the pointer before the member is cleared.
962 handler = a_mainHandler;
963 a_mainHandler = NULL;
965 } catch(RuntimeException& ex) {
971 * Invoked from app::Application::clone -> app::Component::do_cloneChild (note: IN THE NEW PROCESS).
972 * Gives the components that have just been duplicated the chance to release the resources they are not going
973 * to use, since the copy will only deal with the messages arriving through its associated connection and
974 * with the responses to the requests originated while handling them.
976 * (1) Remember that the handler associated with the clone is registered so that it is not closed.
977 * (3) Clears the list, because otherwise, if the cloned process opened new connections, it could find
978 * that the fd is already stored in the list. detach cannot be invoked directly, because that
979 * method modifies the a_handlers list and would make us lose track of the iterator.
980 * (4) Makes a physical duplicate of the connections opened by the original process and removes the fd from
981 * the list to be polled; this fd will be closed by the Handler::clone method.
982 * (5) Registers the Handler for which this new process was created, so that when detach is invoked
983 * with that handler => the program will end its execution.
// Clone hook for the new (child) process. Logs the main handler, walks every registered handler
// other than a_mainHandler, adjusts the poll set according to the handler type, collects
// handlers in `toDetach`, processes that list after the walk, and finally enters
// singlethreadedAccept.
// Handlers are collected first and handled afterwards so that a_handlers is not modified while
// it is being iterated.
// NOTE(review): many lines are missing from this listing (the statement taken for the main
// handler, break/default labels of the switch, the body of the toDetach loop, the try opening
// and the catch body); which case labels each visible statement belongs to cannot be stated
// from here.
985 void Communicator::do_cloneChild()
987 LOGMETHOD(TraceMethod traceMethod("comm::Communicator", "do_cloneChild", ANNA_FILE_LOCATION));
989 string msg("comm::Communicator::do_cloneChild | MainHandler: ");
990 msg += a_mainHandler->asString();
991 Logger::information(msg, ANNA_FILE_LOCATION);
993 Handler* handler(NULL);
994 vector <Handler*> toDetach;
// handler_end() is re-evaluated on every iteration in this loop.
996 for(handler_iterator ii = handler_begin(); ii != handler_end(); ii ++) { // (1)
997 handler = Communicator::handler(ii);
999 if(handler == a_mainHandler)
1002 switch(handler->getType()) {
1003 case Handler::Type::Custom:
1004 case Handler::Type::RemoteConnection:
1005 case Handler::Type::ClientSocket:
1006 a_poll->erase(handler->getfd()); // (4)
1008 string msg("comm::Communicator::do_cloneChild | ");
1009 msg += handler->asString();
1010 Logger::debug(msg, ANNA_FILE_LOCATION);
1013 a_poll->insert(handler->getfd());
1016 toDetach.push_back(handler); // (3)
1021 for(vector <Handler*>::iterator ii = toDetach.begin(), maxii = toDetach.end(); ii != maxii; ii ++)
1027 singlethreadedAccept();
1028 } catch(RuntimeException& ex) {
// Sort key for handler containers: the handler's file descriptor.
// `handler` is dereferenced without a NULL check.
1035 int Communicator::SortByFileDescriptor::value(const Handler* handler)
1037 return handler->getfd();