1 // ANNA - Anna is Not Nothingness Anymore //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
14 //#include <asm/ioctls.h>
18 #include <anna/core/tracing/TraceMethod.hpp>
19 #include <anna/core/functions.hpp>
21 #include <anna/xml/Node.hpp>
22 #include <anna/xml/Attribute.hpp>
24 #include <anna/app/functions.hpp>
26 #include <anna/comm/ClientSocket.hpp>
27 #include <anna/comm/INetAddress.hpp>
28 #include <anna/comm/TransportFactory.hpp>
29 #include <anna/comm/Transport.hpp>
30 #include <anna/comm/Message.hpp>
31 #include <anna/comm/Network.hpp>
32 #include <anna/comm/Host.hpp>
33 #include <anna/comm/Server.hpp>
34 #include <anna/comm/Communicator.hpp>
35 #include <anna/comm/ReceiverFactory.hpp>
36 #include <anna/comm/Receiver.hpp>
37 #include <anna/comm/CongestionController.hpp>
39 #include <anna/comm/internal/Poll.hpp>
// Class-wide default timeouts (Millisecond values). initialize() copies them into
// a_msMaxConnectionDelay and a_msMaxWriteDelay.
45 const Millisecond comm::ClientSocket::DefaultMaxConnectionDelay(200);
46 const Millisecond comm::ClientSocket::DefaultMaxWriteDelay(500);
// Puts the client socket back into its initial state: status set to Status::None,
// no cached server, and the default connection/write delays.
49 void comm::ClientSocket::initialize()
51 a_status = Status::None;
55 a_cachedServer = NULL;
// Restore the class-wide default timeouts.
56 a_msMaxConnectionDelay = DefaultMaxConnectionDelay;
57 a_msMaxWriteDelay = DefaultMaxWriteDelay;
63 * Se invoca desde el handler::ServerSocket y no hace falta protegerlo con una SCC
64 * porque ya está protegido por éste.
// Adopts an already-open descriptor `fd` for this client socket.
//
// Marks the socket as bound and activates Status::Connected. For stream sockets it
// also records the local and remote access points, resolved from getsockname() and
// getpeername() on `fd`. Finally it switches the socket to non-blocking mode and
// emits a debug trace.
//
// Declared as throwing RuntimeException; presumably anna_comm_socket_check raises it
// when the wrapped system call fails - TODO confirm against the macro definition.
66 void comm::ClientSocket::setfd(const int fd)
67 throw(RuntimeException) {
69 Socket::a_isBound = true;
70 activate(Status::Connected);
// Endpoint discovery is only done for stream sockets.
73 if (Socket::a_type == Socket::Type::Stream) {
75 socklen_t len(sizeof(addr));
76 Network& network(Network::instantiate());
77 anna_comm_socket_check(
78 getsockname(fd, (struct sockaddr *) &addr, &len),
79 "Cannot obtain socket name"
// Local endpoint: device looked up by IPv4 address, port converted to host byte order.
81 Socket::a_localAccessPoint = INetAddress(network.find(addr.sin_addr.s_addr), ntohs(addr.sin_port));
// NOTE(review): `len` is a value-result argument and is reused here without being
// reset to sizeof(addr) - confirm this is intended.
82 anna_comm_socket_check(
83 getpeername(fd, (struct sockaddr *) &addr, &len),
84 "Cannot obtain peer name"
// Remote endpoint, built the same way from the peer address.
86 a_remoteAccessPoint = INetAddress(network.find(addr.sin_addr.s_addr), ntohs(addr.sin_port));
89 setBlockingMode(false);
92 string msg("comm::ClientSocket::setfd | ");
94 Logger::debug(msg, ANNA_FILE_LOCATION);
99 * Si se establece el receptor directamente, sin tener una factoria intermedia, hay que
100 * invocar al metodo comm::Receiver::initialize.
// Installs `receiver` as this socket's receiver.
//
// If a receiver is already set and a receiver factory exists, the previous receiver
// is handed back to the factory first. When no receiver factory is configured, the
// new receiver is initialized here via Receiver::initialize().
102 void comm::ClientSocket::setReceiver(comm::Receiver* receiver)
103 throw(RuntimeException) {
// Return the previous receiver to the factory it came from.
104 if (a_receiver && Socket::a_receiverFactory)
105 Socket::a_receiverFactory->release(a_receiver);
107 a_receiver = receiver;
// NOTE(review): no null check on `receiver` is visible before this dereference -
// confirm callers never pass NULL when there is no factory.
109 if (a_receiverFactory == NULL)
110 a_receiver->initialize();
113 // Protegido desde el Socket::close
// Releases the per-connection resources held by this socket.
//
// Clears the transport (if any) and returns it to the transport factory under a
// Guard on that factory, resets the status to Status::None, drops the cached server,
// and returns the receiver to the receiver factory when one is configured.
114 void comm::ClientSocket::do_close()
118 if (a_transport != NULL) {
119 a_transport->clear();
// The factory is guarded while the transport is released.
120 Guard guard(Socket::a_transportFactory, typeid(Socket::a_transportFactory).name());
121 Socket::a_transportFactory->release(a_transport);
125 a_status = Status::None;
128 a_cachedServer = NULL;
// NOTE(review): a_receiver is passed to release() without a visible null check -
// presumably the factory tolerates NULL; confirm.
132 if (Socket::a_receiverFactory)
133 Socket::a_receiverFactory->release(a_receiver);
// Returns the comm::Server that corresponds to this socket's remote access point,
// caching the answer in a_cachedServer.
//
// Runs under a Guard on this socket. The cached server is reused when its remote
// port matches and its host contains the remote device; otherwise the hosts known
// to Network are scanned for one that contains the device and has a server on the
// remote port. The value assigned to a_cachedServer is returned; `result` starts as
// NULL, so NULL is presumably returned when nothing matches - TODO confirm.
139 comm::Server* comm::ClientSocket::getServer()
140 throw(RuntimeException) {
141 Guard guard(*this, "comm::ClientSocket (getServer)");
142 const INetAddress& remoteAddress = getRemoteAccessPoint().getINetAddress();
143 const Device* device = remoteAddress.getDevice(false);
144 const int remotePort = remoteAddress.getPort();
// Fast path: the cached server still matches the remote port and device.
149 if (a_cachedServer != NULL)
150 if (a_cachedServer->getRemotePort() == remotePort && a_cachedServer->getHost()->contains(device))
151 return a_cachedServer;
153 Network& network = Network::instantiate();
154 comm::Server* result = NULL;
// Slow path: walk every known host.
157 for (Network::host_iterator ii = network.host_begin(), maxii = network.host_end(); ii != maxii; ii ++) {
158 if (Network::host(ii)->contains(device) == false)
161 if ((aux = Network::host(ii)->find_server(remotePort)) == NULL)
// Remember the outcome for the next call.
168 return (a_cachedServer = result);
171 //_________________________________________________________________________________
172 // (1) Si termina el periodo de espera sin devolver ninguna actividad => podemos considerar que
173 // se ha conectado. Es posible que haya algún componente hardware que no permita hacer
175 //_________________________________________________________________________________
// Connects this socket to its remote access point, under a Guard on the socket.
//
// Visible flow: the socket is put in non-blocking mode, the remote access point is
// translated into a sockaddr (`s`, `len`), the cached server is invalidated, and
// do_connect() is retried while the elapsed time stays within
// a_msMaxConnectionDelay, polling the descriptor for POLLOUT between attempts.
// EISCONN, EINPROGRESS and EALREADY are tested explicitly after a failed attempt.
// On success Status::Connected is activated and a debug trace is written.
//
// Throws RuntimeException carrying `xerrno` together with the loop count and elapsed
// time; presumably this happens when the connection could not be established -
// TODO confirm the exact condition.
176 void comm::ClientSocket::connect()
177 throw(RuntimeException) {
178 Guard guard(*this, "comm::ClientSocket (connect)");
// NOTE(review): presumably the macro raises when the condition holds, i.e. when the
// socket is already connected - confirm against anna_socket_assert.
179 anna_socket_assert(isConnected() == true, "Already connected");
181 if (Socket::isOpened() == false) {
184 setBlockingMode(false);
187 if (Socket::a_localAccessPoint.isNull() == false && Socket::isBound() == false)
192 a_remoteAccessPoint.translate(*this, s, len);
// Any previously resolved server may no longer apply to the new connection.
193 a_cachedServer = NULL;
194 const Millisecond msinit = functions::millisecond();
// Poll descriptor used to wait until the socket becomes writable.
201 waiting.fd = Socket::a_fd;
202 waiting.events = POLLOUT;
// Keep trying until the configured connection delay has elapsed.
204 while ((msnow - msinit) <= a_msMaxConnectionDelay) {
208 if ((r = do_connect(s, len)) != -1)
210 } catch (RuntimeException&) {
// errno is captured immediately so later calls cannot overwrite it.
215 if ((xerrno = errno) == EISCONN) {
220 if (xerrno != EINPROGRESS && xerrno != EALREADY)
223 anna_signal_shield(r, poll(&waiting, 1, a_msMaxConnectionDelay));
230 msnow = functions::millisecond();
// Build the diagnostic text for the exception below.
234 std::string msg("comm::ClientSocket::connect | ");
236 msg += " | N-loop: ";
237 msg += functions::asString(tryCounter);
239 msg += functions::asString(msnow - msinit);
242 throw RuntimeException(msg, xerrno, ANNA_FILE_LOCATION);
245 activate(Status::Connected);
248 string msg("comm::ClientSocket::connect | ");
250 Logger::debug(msg, ANNA_FILE_LOCATION);
// Sends `message` through this socket: the transport encodes it via code() and the
// resulting block is written with do_write(), all under a Guard on the socket.
// Declared as throwing RuntimeException.
254 void comm::ClientSocket::send(comm::Message& message)
255 throw(RuntimeException) {
256 Guard guard(*this, "comm::ClientSocket (send)");
257 do_write(getTransport()->code(message));
// Pointer overload of send(). Throws RuntimeException with the message below;
// presumably this is the response to a NULL `message` - TODO confirm the guarding
// condition.
260 void comm::ClientSocket::send(comm::Message* message)
261 throw(RuntimeException) {
263 throw RuntimeException("comm::ClientSocket::send | Cannot send a NULL message", ANNA_FILE_LOCATION);
268 //_______________________________________________________________________
269 // Si el clientsocket no esta marcado como "en proceso", es decir, todavia estamos
270 // recogiendo mensajes de el, se cierra directamente, en otro caso se marca
271 // como pendiente de cerrar y cuando se recogan todos los mensajes que contiene
272 // su buffer, se cerrará.
273 //_______________________________________________________________________
// Asks for this socket to be closed.
//
// Sets the Status::ClosePending flag, notifies the application's Communicator
// component that a close is pending, and calls detach() on it for this socket.
// NOTE(review): detach() may be conditional in the full source - confirm.
274 void comm::ClientSocket::requestClose()
276 Communicator* communicator = app::functions::component <Communicator> (ANNA_FILE_LOCATION);
277 a_status |= Status::ClosePending;
278 // Only has a real effect in ST mode
279 communicator->notifyPendingClose();
282 communicator->detach (this);
287 //---------------------------------------------------------------------------------------
288 // Cuando el communicator detecta actividad en el socket invoca a este metodo.
289 // (1) Si devuelve 0 es porque han borrado el extremo remoto.
290 // (2) Si el buffer intermedio tiene mas de 4 veces el numero de bytes planificado =>
291 // lo cerramos independientemente del tamanio esperado.
292 // (3) Si el buffer tiene un numero de bytes mayor al planificado y todavia no ha
293 // detectado ningun mensaje => lo cerramos.
295 // Se invoca desde un entorno protegido, por lo que no hace falta SSCC.
296 //---------------------------------------------------------------------------------------
// Reads one chunk from the socket and reports what the caller should do next.
//
// Reads up to a_reader.getMaxSize() bytes with do_read() into the reader's storage
// and exposes them through a_buffer. When the read filled the whole chunk, FIONREAD
// is queried to learn how many bytes are still queued in the I/O system. The
// expected message size is computed if still unknown, and the over-quota limit of
// the transport is enforced.
//
// Values assigned to the result in the visible code: Notify::ReceiveData,
// Notify::Close and Notify::Corrupt.
// Throws RuntimeException when the reader's maximum size is 0, i.e. the I/O buffer
// length was never established.
297 comm::Socket::Notify::_v comm::ClientSocket::receive()
298 throw(RuntimeException) {
// A zero-sized reader means no I/O buffer length was configured.
301 if (a_reader.getMaxSize() == 0) {
302 string msg(asString());
303 msg += " | I/O buffer length was not established";
304 throw RuntimeException(msg, ANNA_FILE_LOCATION);
307 const char* buffer(a_reader.getData());
309 const int r(do_read(buffer, a_reader.getMaxSize()));
// a_buffer wraps exactly the `r` bytes that were just read.
311 a_buffer.setup(buffer, r);
313 // If the maximum allowed was read, query how many bytes remain to be read in the I/O system.
314 // If less than the maximum was read, nothing is left in the I/O system
315 if (r == a_reader.getMaxSize()) {
// A failing ioctl is treated as "nothing pending".
316 if (ioctl(a_fd, FIONREAD, &a_pendingBytes.bytesToRead) != 0)
317 a_pendingBytes.bytesToRead = 0;
319 a_pendingBytes.bytesToRead = 0;
// Debug trace showing the pending data and the bytes just received.
322 string msg("comm::ClientSocket::receive | ");
324 msg += " | Pending: ";
325 msg += anna::functions::asString(a_data, 24);
326 msg += functions::asText(" | Received N-bytes: ", a_buffer.getSize());
328 msg += anna::functions::asString(a_buffer, 24);
329 Logger::debug(msg, ANNA_FILE_LOCATION);
// When incoming messages are being ignored, the burst is reported to the Communicator.
333 if (getIgnoreIncomingMessages() == true) {
334 app::functions::component <Communicator> (ANNA_FILE_LOCATION)->eventIgnoreBurst(*this, a_buffer);
337 string msg("comm::ClientSocket::receive | ");
339 msg += " | Ignoring incoming messages";
340 Logger::debug(msg, ANNA_FILE_LOCATION);
345 result = Notify::ReceiveData;
347 const int overQuotaSize(getTransport()->getOverQuotaSize());
// -1 means the expected message size is not known yet.
349 if (a_expectedSize == -1)
350 calculeExpectedSize(a_data);
// Still no recognizable message and more buffered than the quota allows: give up.
352 if (a_expectedSize == -1 && getBufferSize() > overQuotaSize) { // (3)
353 string msg(asString());
354 msg += functions::asText(" | Closed local point due to excessive memory consumption: N-Bytes: ", r);
355 msg += functions::asText("/BufferSize: ", getBufferSize());
356 msg += functions::asText("/OverQuotaSize: ", overQuotaSize);
358 Logger::error(msg, ANNA_FILE_LOCATION);
359 app::functions::component <Communicator> (ANNA_FILE_LOCATION)->eventOverQuota(*this);
362 result = Notify::Close;
363 } else if (isCorrupt() == true)
364 result = Notify::Corrupt;
// NOTE(review): presumably the branch taken when nothing could be read because the
// remote end closed - confirm against the full source.
366 result = Notify::Close;
// Waits up to `timeout` for activity on this socket, under a Guard on the socket.
//
// timeout  - applied to the poll object through setTimeout().
// _receive - when true and the poll reports activity, receive() is invoked and its
//            result returned; when false, Notify::ReceiveData is returned instead.
//
// Returns Notify::None when poll.fetch() yields -1.
// Throws RuntimeException if the socket is not opened.
371 comm::Socket::Notify::_v comm::ClientSocket::wait(const Millisecond &timeout, const bool _receive)
372 throw(RuntimeException) {
373 Guard guard(*this, "comm::ClientSocket (wait)");
375 if (Socket::isOpened() == false) {
376 string msg("comm::ClientSocket::wait | ");
378 msg += " | Is not opened";
379 throw RuntimeException(msg, ANNA_FILE_LOCATION);
383 poll.setTimeout(timeout);
// fetch() == -1 is mapped to Notify::None; otherwise read or just report activity.
386 Notify::_v result = (poll.fetch() != -1) ? ((_receive == true) ? receive() : Notify::ReceiveData) : Notify::None;
388 string msg("comm::ClientSocket::wait | Timeout: ");
389 msg += functions::asString(timeout);
390 msg += functions::asText(" ms | Receive: ", _receive);
391 msg += functions::asText(" | fd: ", getfd());
392 msg += " | Result: ";
393 msg += Socket::asText(result);
394 Logger::debug(msg, ANNA_FILE_LOCATION);
399 //---------------------------------------------------------------------------------------
400 // Cuando el ClientSocket notifica que ha recibido datos, el communicator invoca a este
401 // metodo por si hubiera algún mensaje completo.
403 // (1) Si todavia no conoce el tamaño esperado del mensaje => intenta calcularlo.
404 // (2) Si ya conoce la longitud y en el buffer recibido quedan, al menos, ese número
405 // de bytes => hay al menos un mensaje completo. Lo procesa y deja todo preparado
406 // para la proxima invocacion de este metodo.
407 // (3) Todavia no tenemos un mensajes completo, lo guardamos y esperamos a que se vuelva
408 // a leer lo que falta desde 'ClientSocket::receive'.
409 // (4) Si nos quedan bytes por analizar pero no tenemos suficientes para calcular
410 // la longitud esperada => prepara el a_data para recibir el siguiente bloque.
411 //---------------------------------------------------------------------------------------
// Extracts the next complete message, if any, from the data accumulated in a_data.
//
// The unread region starts at a_offset. a_buffer is pointed at that region, the
// expected size is computed when still unknown (-1), and when the region holds at
// least a_expectedSize bytes, a_buffer is resized to that length and a_offset is
// advanced past it. When no message could be produced, the consumed prefix is
// removed from a_data (or handled separately once a_offset has reached its end).
//
// Returns a pointer to a DataBlock; `result` starts as NULL, so NULL presumably
// means "no complete message yet" - TODO confirm.
412 const DataBlock* comm::ClientSocket::fetch()
413 throw(RuntimeException) {
414 if (a_status & Status::ClosePending)
// Bytes in a_data that have not been consumed yet.
417 const int remainingSize(a_data.getSize() - a_offset);
419 if (remainingSize <= 0) {
425 const DataBlock* result(NULL);
// View over the unconsumed tail of a_data.
427 a_buffer.setup(a_data.getData() + a_offset, remainingSize);
429 if (a_expectedSize == -1) // (1)
430 calculeExpectedSize(a_buffer);
432 if (isCorrupt() == true)
435 if (a_expectedSize != -1) {
// Enough bytes for one whole message: trim the view to it and move past it.
436 if (a_expectedSize <= remainingSize) { // (2)
437 a_buffer.resize(a_expectedSize);
439 a_offset += a_expectedSize;
448 if (result == NULL) {
450 if (a_offset >= a_data.getSize())
// Drop the first a_offset bytes, which were already consumed.
453 a_data.remove(a_offset);
459 string msg("comm::ClientSocket::fetch | ");
461 msg += functions::asText(" | N-bytes: ", a_data.getSize());
462 msg += " | Pending: ";
463 msg += anna::functions::asString(a_data, 24);
464 Logger::debug(msg, ANNA_FILE_LOCATION);
// Asks the transport for the size of the message that starts in `data` and stores
// it in a_expectedSize (-1 when it cannot be determined yet).
//
// A value below -1 is treated as invalid: Status::Corrupt is activated and an error
// is logged. A RuntimeException raised during the calculation is caught here and
// also marks the socket as corrupt instead of propagating.
471 void comm::ClientSocket::calculeExpectedSize(const DataBlock& data)
474 if ((a_expectedSize = getTransport()->calculeSize(data)) != -1) {
476 string msg("comm::ClientSocket::calculeExpectedSize | ");
478 Logger::debug(msg, ANNA_FILE_LOCATION)
// Anything below -1 is not a legal size.
482 if (a_expectedSize < -1) {
483 activate(Status::Corrupt);
484 string msg(asString());
485 msg += " | Invalid expected size";
486 Logger::error(msg, ANNA_FILE_LOCATION);
488 } catch (RuntimeException& ex) {
490 activate(Status::Corrupt);
// Ensures this socket has a transport, creating one from the transport factory on
// first use.
//
// a_transport is checked once without holding a lock and again after taking a Guard
// on this socket. Creation itself runs under a second Guard on the factory, and the
// new transport is cleared before use.
// Throws RuntimeException when no transport factory has been specified.
494 comm::Transport* comm::ClientSocket::reserveTransport()
495 throw(RuntimeException) {
496 if (a_transport == NULL) {
497 if (Socket::a_transportFactory == NULL) {
498 string msg(asString());
499 msg += " | Transport factory was not especified";
500 throw RuntimeException(msg, ANNA_FILE_LOCATION);
503 Guard guard(this, "comm::ClientSocket::reserveTransport");
// Checked again now that the socket guard is held.
505 if (a_transport == NULL) {
506 Guard guard(Socket::a_transportFactory, typeid(Socket::a_transportFactory).name());
507 a_transport = Socket::a_transportFactory->create();
508 a_transport->clear();
// Variant of reserveTransport() that takes no Guard in the visible code: creates the
// transport from the factory on first use and clears it.
// NOTE(review): presumably meant for callers that already hold the needed locks -
// confirm against call sites.
// Throws RuntimeException when no transport factory has been specified.
515 comm::Transport* comm::ClientSocket::unsafe_reserveTransport()
516 throw(RuntimeException) {
517 if (a_transport == NULL) {
518 if (Socket::a_transportFactory == NULL) {
519 string msg(asString());
520 msg += " | Transport factory was not especified";
521 throw RuntimeException(msg, ANNA_FILE_LOCATION);
524 a_transport = Socket::a_transportFactory->create();
525 a_transport->clear();
// Ensures this socket has a receiver when a receiver factory is configured.
//
// a_receiver is checked once without a lock and again after taking a Guard on this
// socket; if it is still NULL, a receiver is obtained from the factory's create().
// Without a factory, nothing is created here.
531 comm::Receiver* comm::ClientSocket::reserveReceiver()
532 throw(RuntimeException) {
533 if (a_receiver == NULL && Socket::a_receiverFactory != NULL) {
534 Guard guard(this, "comm::ClientSocket::reserveReceiver");
536 // No need to guard the factory here because its 'create' already does so
537 if (a_receiver == NULL)
538 a_receiver = Socket::a_receiverFactory->create();
544 // Este metodo se sobre-escribira en commsec::ClientSocket para establecer la conexion mediante las
// Low-level connect step: issues ::connect() on a_fd towards the address `s` of
// length `len`, wrapped in anna_signal_shield, which stores the call's result in `r`.
// NOTE(review): presumably the shield retries the call when a signal interrupts it -
// confirm against the macro definition.
546 int comm::ClientSocket::do_connect(const sockaddr* s, const int len)
547 throw(RuntimeException) {
549 anna_signal_shield(r, ::connect(a_fd, s, len));
553 //----------------------------------------------------------------------------------------------------
554 // Este metodo se sobre-escribe en comm::DatagramSocket
556 // (1) si no ha sido capaz de escribir todo el mensaje tenemos que esperar a que el bloque anterior
557 // haya salido, ya que si intentamos escribir de forma continua habra un momento que obtengamos
558 // un error 'Resource temporarily unavailable'.
559 // (2) La cola de salida esta llena momentaneamente, esperamos unos milisegundos para volver a
561 // (3) Obtenemos el error "Broken Pipe" ya solo se puede cerrar el socket y tratar de comenzar
562 // (4) Si envió un trozo, pero no pudo enviar el mensaje completo => cierra el socket para evitar
563 // problemas de interpretación en el extremo remoto.
564 //----------------------------------------------------------------------------------------------------
// Writes the encoded `message` to the socket descriptor.
//
// Visible flow: write() is attempted in a loop bounded by `retry` and by `cx < 5`.
// When errno is EAGAIN the code polls the descriptor for POLLOUT with
// a_msMaxWriteDelay as the timeout before going on. On EPIPE, or when some data had
// already been sent, the write side is shut down with shutdown(SHUT_WR). A
// RuntimeException carrying `xerrno` and the descriptor number is thrown;
// presumably this happens on unrecoverable failure - TODO confirm the condition.
// On the normal path a debug trace with the loop count and the message is logged.
565 void comm::ClientSocket::do_write(const DataBlock& message)
566 throw(RuntimeException) {
567 int size = message.getSize();
568 const char* data = message.getData();
// Tracks whether any part of the message has already left.
575 bool sendSomething = false;
577 // If part of the message has been sent => it must be sent in full
578 while (retry == true && cx < 5) {
579 anna_signal_shield(r, write(Socket::a_fd, data, size));
// Output queue momentarily full: wait until the descriptor is writable again.
589 if (xerrno == EAGAIN) { // (2)
592 waiting.fd = Socket::a_fd;
593 waiting.events = POLLOUT;
594 anna_signal_shield(r, poll(&waiting, 1, a_msMaxWriteDelay));
596 // On a definitive error, stop retrying and throw the exception.
597 // If something was sent, we assume the remote end is able to recover
600 } else { /* (r < left) */
605 sendSomething = true;
// Broken pipe or a partially sent message: shut down the write side.
610 if (xerrno == EPIPE || sendSomething == true) { // (3)(4)
611 // activate (Status::ClosePending);
613 r = shutdown(getfd(), SHUT_WR);
616 string msg("comm::ClientSocket::do_write | fd: ");
617 msg += functions::asString(getfd());
618 throw RuntimeException(msg, xerrno, ANNA_FILE_LOCATION);
622 string msg("comm::ClientSocket::do_write | ");
624 msg += functions::asText(" | N-Loop: ", nloop);
625 msg += functions::asText(" | Sent: ", message);
626 Logger::debug(msg, ANNA_FILE_LOCATION);
630 //----------------------------------------------------------------------------
631 // (1) Cuando intentamos leer del socket asociado a un servidor que ha caido
632 // no devuelve 0 (como cuando se cae el socket de un cliente) sino que
633 // devuelve el error (ECONNRESET = 131).
635 // Devuelve 0 para indicar que el Socket se ha cerrado en el extremo remoto
636 //----------------------------------------------------------------------------
// Reads up to `maxSize` bytes from the socket descriptor into `data`.
//
// The read is repeated while it fails with EINTR. ECONNRESET is tested as a special
// case; presumably it is reported like a remote close - TODO confirm. A
// RuntimeException carrying errno and the text " | Cannot receive" is thrown in the
// failure path.
// NOTE(review): `data` is declared const char* but is written through the (void*)
// cast - callers must pass writable storage.
637 int comm::ClientSocket::do_read(const char* data, const int maxSize)
638 throw(RuntimeException) {
642 if ((result = ::read(Socket::a_fd, (void*) data, maxSize)) < 0) {
646 if (errno == ECONNRESET) { // (1)
651 string msg(asString());
652 msg += " | Cannot receive";
653 throw RuntimeException(msg, errno, ANNA_FILE_LOCATION);
// Retry only when the read was interrupted by a signal.
655 } while (errno == EINTR && result < 0);
// Discards transient parsing state: clears the transport (when present) and removes
// the Status::Corrupt flag.
660 void comm::ClientSocket::forgot()
662 if (a_transport != NULL)
663 a_transport->clear();
665 deactivate(Status::Corrupt);
671 //--------------------------------------------------------------------------------------
672 // (1) Recupera el tamano para verificar que se ha podido establecer la solicitud.
673 //--------------------------------------------------------------------------------------
// Determines the receive chunk size and allocates the reader buffer accordingly.
//
// The size comes from Communicator::getReceivingChunkSize(); when that returns -1,
// the kernel's SO_RCVBUF value for this descriptor is used instead. The reader is
// then allocated with the resulting a_rcvBufferSize.
// Declared as throwing RuntimeException; presumably anna_comm_socket_check raises it
// when getsockopt fails - TODO confirm.
674 void comm::ClientSocket::getSocketOptions()
675 throw(RuntimeException) {
676 if ((a_rcvBufferSize = comm::Communicator::getReceivingChunkSize()) == -1) {
// The option length is sizeof(int); assumes a_rcvBufferSize is an int - TODO confirm.
677 socklen_t l = sizeof(int);
678 int fd = Socket::getfd();
679 anna_comm_socket_check(
680 getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &a_rcvBufferSize, &l),
681 "Cannot obtain SO_RCVBUF"
685 a_reader.allocate(a_rcvBufferSize);
// Returns the total number of bytes still to be processed: a_pendingBytes.bytesToRead
// plus getBufferSize().
688 int comm::ClientSocket::getTotalPendingBytes() const
690 // The number of bytes pending processing is what remains in the I/O queue plus the bytes already held in memory;
691 // the pending-bytes figure is obtained when the chunk is read
692 return a_pendingBytes.bytesToRead + getBufferSize();
// Builds a one-line, human-readable description of this socket: base Socket data,
// receive buffer size, status flags, maximum connection delay, the transport's
// over-quota size, buffer counters and the remote access point.
// One visible path returns early with the text " <freed> }".
696 string comm::ClientSocket::asString() const
698 string msg("comm::ClientSocket { ");
701 return msg += " <freed> }";
703 msg += Socket::asString();
704 msg += functions::asText(" | RcvBufferSize: ", a_rcvBufferSize);
705 msg += " bytes | Status: ";
706 msg += Status::asString(a_status);
707 msg += functions::asString("| MaxConDelay: %d ms", a_msMaxConnectionDelay.getValue());
// NOTE(review): a_transport is dereferenced here; confirm a null check guards this
// line in the full source.
710 msg += functions::asText(" | OverQuotaSize: ", a_transport->getOverQuotaSize());
712 msg += functions::asString(" | Reserved: %d | Pending: %d | Offset: %d | ExpectedSize: %d", a_data.getSize(), getBufferSize(), a_offset, a_expectedSize);
713 msg += " | Remote access point: ";
// Appends the remote access point description to msg.
714 a_remoteAccessPoint.asString(msg);
// Serializes this socket as a "comm.ClientSocket" child node of `parent`.
//
// The node carries the base Socket data, status, receive buffer size, maximum
// connection delay, the ignore-incoming-messages flag, the transport's over-quota
// size, a "Buffer" child with reserved/pending byte counts, the remote access point
// and the receiver's own XML. One visible path marks the node with Freed="yes".
718 xml::Node* comm::ClientSocket::asXML(xml::Node* parent) const
719 throw(RuntimeException) {
720 xml::Node* clientSocket = parent->createChild("comm.ClientSocket");
723 clientSocket->createAttribute("Freed", "yes");
727 Socket::asXML(clientSocket);
728 clientSocket->createAttribute("Status", Status::asString(a_status));
729 clientSocket->createAttribute("RcvBufferSize", a_rcvBufferSize);
730 clientSocket->createAttribute("MaxConnDelay", a_msMaxConnectionDelay);
731 clientSocket->createAttribute("IgnoreIncomingMessages", functions::asString(a_ignoreIncomingMessages));
// NOTE(review): a_transport is dereferenced here; confirm a null check guards this
// line in the full source.
734 clientSocket->createAttribute("OverQuotaSize", a_transport->getOverQuotaSize());
// Buffer usage goes in a dedicated child node.
736 xml::Node* buffer = clientSocket->createChild("Buffer");
737 buffer->createAttribute("Reserved", a_data.getSize());
738 buffer->createAttribute("Pending", getTotalPendingBytes());
739 a_remoteAccessPoint.asXML("comm.RemotePoint", clientSocket);
// NOTE(review): a_receiver is dereferenced here; confirm a null check guards this
// line in the full source.
742 a_receiver->asXML(clientSocket);
// Renders the `status` bit mask as text: for each of the Connected, Corrupt,
// ClosePending and Working flags that is set, its name followed by a space is added
// to the result, in that order.
747 string comm::ClientSocket::Status::asString(const int status)
// Connected is tested first, so plain assignment is used here; the later flags append.
754 if (status & Status::Connected)
755 result = "Connected ";
757 if (status & Status::Corrupt)
758 result += "Corrupt ";
760 if (status & Status::ClosePending)
761 result += "ClosePending ";
763 if (status & Status::Working)
764 result += "Working ";