1 // ANNA - Anna is Not Nothingness Anymore
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
5 // http://redmine.teslayout.com/projects/anna-suite
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
11 // * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 // * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
17 // * Neither the name of the copyright holder nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 // Authors: eduardo.ramos.testillano@gmail.com
34 // cisco.tierra@gmail.com
37 #include <sys/ioctl.h>
42 //#include <asm/ioctls.h>
46 #include <anna/core/tracing/TraceMethod.hpp>
47 #include <anna/core/functions.hpp>
49 #include <anna/xml/Node.hpp>
50 #include <anna/xml/Attribute.hpp>
52 #include <anna/app/functions.hpp>
54 #include <anna/comm/ClientSocket.hpp>
55 #include <anna/comm/INetAddress.hpp>
56 #include <anna/comm/TransportFactory.hpp>
57 #include <anna/comm/Transport.hpp>
58 #include <anna/comm/Message.hpp>
59 #include <anna/comm/Network.hpp>
60 #include <anna/comm/Host.hpp>
61 #include <anna/comm/Server.hpp>
62 #include <anna/comm/Communicator.hpp>
63 #include <anna/comm/ReceiverFactory.hpp>
64 #include <anna/comm/Receiver.hpp>
65 #include <anna/comm/CongestionController.hpp>
67 #include <anna/comm/internal/Poll.hpp>
73 const Millisecond comm::ClientSocket::DefaultMaxConnectionDelay(200);
74 const Millisecond comm::ClientSocket::DefaultMaxWriteDelay(500);
77 void comm::ClientSocket::initialize()
79 a_status = Status::None;
83 a_cachedServer = NULL;
84 a_msMaxConnectionDelay = DefaultMaxConnectionDelay;
85 a_msMaxWriteDelay = DefaultMaxWriteDelay;
91 * Se invoca desde el handler::ServerSocket y no hace falta protegerlo con una SCC
92 * porque ya está protegido por éste.
94 void comm::ClientSocket::setfd(const int fd)
95 throw(RuntimeException) {
97 Socket::a_isBound = true;
98 activate(Status::Connected);
101 if (Socket::a_type == Socket::Type::Stream) {
103 socklen_t len(sizeof(addr));
104 Network& network(Network::instantiate());
105 anna_comm_socket_check(
106 getsockname(fd, (struct sockaddr *) &addr, &len),
107 "Cannot obtain socket name"
109 Socket::a_localAccessPoint = INetAddress(network.find(addr.sin_addr.s_addr), ntohs(addr.sin_port));
110 anna_comm_socket_check(
111 getpeername(fd, (struct sockaddr *) &addr, &len),
112 "Cannot obtain peer name"
114 a_remoteAccessPoint = INetAddress(network.find(addr.sin_addr.s_addr), ntohs(addr.sin_port));
117 setBlockingMode(false);
120 string msg("comm::ClientSocket::setfd | ");
122 Logger::debug(msg, ANNA_FILE_LOCATION);
127 * Si se establece el receptor directamente, sin tener una factoria intermedia, hay que
128 * invocar al metodo comm::Receiver::initialize.
130 void comm::ClientSocket::setReceiver(comm::Receiver* receiver)
131 throw(RuntimeException) {
132 if (a_receiver && Socket::a_receiverFactory)
133 Socket::a_receiverFactory->release(a_receiver);
135 a_receiver = receiver;
137 if (a_receiverFactory == NULL)
138 a_receiver->initialize();
141 // Protegido desde el Socket::close
142 void comm::ClientSocket::do_close()
146 if (a_transport != NULL) {
147 a_transport->clear();
148 Guard guard(Socket::a_transportFactory, typeid(Socket::a_transportFactory).name());
149 Socket::a_transportFactory->release(a_transport);
153 a_status = Status::None;
156 a_cachedServer = NULL;
160 if (Socket::a_receiverFactory)
161 Socket::a_receiverFactory->release(a_receiver);
167 comm::Server* comm::ClientSocket::getServer()
168 throw(RuntimeException) {
169 Guard guard(*this, "comm::ClientSocket (getServer)");
170 const INetAddress& remoteAddress = getRemoteAccessPoint().getINetAddress();
171 const Device* device = remoteAddress.getDevice(false);
172 const int remotePort = remoteAddress.getPort();
177 if (a_cachedServer != NULL)
178 if (a_cachedServer->getRemotePort() == remotePort && a_cachedServer->getHost()->contains(device))
179 return a_cachedServer;
181 Network& network = Network::instantiate();
182 comm::Server* result = NULL;
185 for (Network::host_iterator ii = network.host_begin(), maxii = network.host_end(); ii != maxii; ii ++) {
186 if (Network::host(ii)->contains(device) == false)
189 if ((aux = Network::host(ii)->find_server(remotePort)) == NULL)
196 return (a_cachedServer = result);
199 //_________________________________________________________________________________
200 // (1) Si termina el periodo de espera sin devolver ninguna actividad => podemos considerar que
201 // se ha conectado. Es posible que haya algún componente hardware que no permita hacer
203 //_________________________________________________________________________________
204 void comm::ClientSocket::connect()
205 throw(RuntimeException) {
206 Guard guard(*this, "comm::ClientSocket (connect)");
207 anna_socket_assert(isConnected() == true, "Already connected");
209 if (Socket::isOpened() == false) {
212 setBlockingMode(false);
215 if (Socket::a_localAccessPoint.isNull() == false && Socket::isBound() == false)
220 a_remoteAccessPoint.translate(*this, s, len);
221 a_cachedServer = NULL;
222 const Millisecond msinit = functions::millisecond();
229 waiting.fd = Socket::a_fd;
230 waiting.events = POLLOUT;
232 while ((msnow - msinit) <= a_msMaxConnectionDelay) {
236 if ((r = do_connect(s, len)) != -1)
238 } catch (RuntimeException&) {
243 if ((xerrno = errno) == EISCONN) {
248 if (xerrno != EINPROGRESS && xerrno != EALREADY)
251 anna_signal_shield(r, poll(&waiting, 1, a_msMaxConnectionDelay));
258 msnow = functions::millisecond();
262 std::string msg("comm::ClientSocket::connect | ");
264 msg += " | N-loop: ";
265 msg += functions::asString(tryCounter);
267 msg += functions::asString(msnow - msinit);
270 throw RuntimeException(msg, xerrno, ANNA_FILE_LOCATION);
273 activate(Status::Connected);
276 string msg("comm::ClientSocket::connect | ");
278 Logger::debug(msg, ANNA_FILE_LOCATION);
282 void comm::ClientSocket::send(comm::Message& message)
283 throw(RuntimeException) {
284 Guard guard(*this, "comm::ClientSocket (send)");
285 do_write(getTransport()->code(message));
288 void comm::ClientSocket::send(comm::Message* message)
289 throw(RuntimeException) {
291 throw RuntimeException("comm::ClientSocket::send | Cannot send a NULL message", ANNA_FILE_LOCATION);
296 //_______________________________________________________________________
297 // Si el clientsocket no esta marcado como "en proceso", es decir, todavia estamos
298 // recogiendo mensajes de el, se cierra directamente, en otro cpero aso se marca
299 // como pendiente de cerrar y cuando se recogan todos los mensajes que contiene
300 // su buffer, se cerrará.
301 //_______________________________________________________________________
302 void comm::ClientSocket::requestClose()
304 Communicator* communicator = app::functions::component <Communicator> (ANNA_FILE_LOCATION);
305 a_status |= Status::ClosePending;
306 // Sólo tiene efecto real en modo ST
307 communicator->notifyPendingClose();
310 communicator->detach (this);
315 //---------------------------------------------------------------------------------------
316 // Cuando el communicator detecta actividad en el socket invoca a este metodo.
317 // (1) Si devuelve 0 es porque han borrado el extremo remoto.
318 // (2) Si el buffer intermedio tiene mas de 4 veces el numero de bytes planificado =>
319 // lo cerramos independientemente del tamanio esperado.
320 // (3) Si el buffer tiene un numero de bytes mayor al planificiado y todavia no ha
321 // detectado ningun mensaje => lo cerramos.
323 // Se invoca desde un entorno protegido, por lo que no hace falta SSCC.
324 //---------------------------------------------------------------------------------------
325 comm::Socket::Notify::_v comm::ClientSocket::receive()
326 throw(RuntimeException) {
330 if (a_reader.getMaxSize() == 0) {
331 string msg(asString());
332 msg += " | I/O buffer length was not established";
333 throw RuntimeException(msg, ANNA_FILE_LOCATION);
336 const char* buffer(a_reader.getData());
338 const int r(do_read(buffer, a_reader.getMaxSize()));
340 a_buffer.setup(buffer, r);
342 // Si ha leido el máximo permitido, entonces obtiene los bytes que quedan por leer en el sistema I/O.
343 // Si ha leído menos del máximo, entonces no queda nada en el sistema I/O
344 if (r == a_reader.getMaxSize()) {
345 if (ioctl(a_fd, FIONREAD, &a_pendingBytes.bytesToRead) != 0)
346 a_pendingBytes.bytesToRead = 0;
348 a_pendingBytes.bytesToRead = 0;
351 string msg("comm::ClientSocket::receive | ");
353 msg += " | Pending: ";
354 msg += anna::functions::asString(a_data, 24);
355 msg += functions::asText(" | Received N-bytes: ", a_buffer.getSize());
357 msg += anna::functions::asString(a_buffer, 24);
358 Logger::debug(msg, ANNA_FILE_LOCATION);
362 if (getIgnoreIncomingMessages() == true) {
363 app::functions::component <Communicator> (ANNA_FILE_LOCATION)->eventIgnoreBurst(*this, a_buffer);
366 string msg("comm::ClientSocket::receive | ");
368 msg += " | Ignoring incoming messages";
369 Logger::debug(msg, ANNA_FILE_LOCATION);
374 result = Notify::ReceiveData;
376 const int overQuotaSize(getTransport()->getOverQuotaSize());
378 if (a_expectedSize == -1)
379 calculeExpectedSize(a_data);
381 if (a_expectedSize == -1 && getBufferSize() > overQuotaSize) { // (3)
382 string msg(asString());
383 msg += functions::asText(" | Closed local point due to excessive memory consumption: N-Bytes: ", r);
384 msg += functions::asText("/BufferSize: ", getBufferSize());
385 msg += functions::asText("/OverQuotaSize: ", overQuotaSize);
387 Logger::error(msg, ANNA_FILE_LOCATION);
388 app::functions::component <Communicator> (ANNA_FILE_LOCATION)->eventOverQuota(*this);
391 result = Notify::Close;
392 } else if (isCorrupt() == true)
393 result = Notify::Corrupt;
395 result = Notify::Close;
400 comm::Socket::Notify::_v comm::ClientSocket::wait(const Millisecond &timeout, const bool _receive)
401 throw(RuntimeException) {
402 Guard guard(*this, "comm::ClientSocket (wait)");
404 if (Socket::isOpened() == false) {
405 string msg("comm::ClientSocket::wait | ");
407 msg += " | Is not opened";
408 throw RuntimeException(msg, ANNA_FILE_LOCATION);
412 poll.setTimeout(timeout);
415 Notify::_v result = (poll.fetch() != -1) ? ((_receive == true) ? receive() : Notify::ReceiveData) : Notify::None;
417 string msg("comm::ClientSocket::wait | Timeout: ");
418 msg += functions::asString(timeout);
419 msg += functions::asText(" ms | Receive: ", _receive);
420 msg += functions::asText(" | fd: ", getfd());
421 msg += " | Result: ";
422 msg += Socket::asText(result);
423 Logger::debug(msg, ANNA_FILE_LOCATION);
428 //---------------------------------------------------------------------------------------
429 // Cuando el ClientSocket notifica que ha recibido datos, el communicator invoca a este
430 // metodo por si hubiera algn mensaje completo.
432 // (1) Si todavia no conoce el tamao esperado del mensaje => intenta calcularlo.
433 // (2) Si ya conoce la longitud y en el buffer recibido quedan , al menos, ese nmero
434 // de bytes => hay al menos un mensaje completo. Lo procesa y deja todo preparado
435 // para la proxima invocacion de este metodo.
436 // (3) Todavia no tenemos un mensajes completo, lo guardamos y esperamos a que se vuelva
437 // a leer lo que falta desde 'ClientSocket::receive'.
438 // (4) Si nos quedan bytes por analizar pero no tenemos suficientes para calcular
439 // la longitud esperada => prepara el a_data para recibir el siguiente bloque.
440 //---------------------------------------------------------------------------------------
441 const DataBlock* comm::ClientSocket::fetch()
442 throw(RuntimeException) {
443 if (a_status & Status::ClosePending)
446 const int remainingSize(a_data.getSize() - a_offset);
448 if (remainingSize <= 0) {
454 const DataBlock* result(NULL);
456 a_buffer.setup(a_data.getData() + a_offset, remainingSize);
458 if (a_expectedSize == -1) // (1)
459 calculeExpectedSize(a_buffer);
461 if (isCorrupt() == true)
464 if (a_expectedSize != -1) {
465 if (a_expectedSize <= remainingSize) { // (2)
466 a_buffer.resize(a_expectedSize);
468 a_offset += a_expectedSize;
477 if (result == NULL) {
479 if (a_offset >= a_data.getSize())
482 a_data.remove(a_offset);
488 string msg("comm::ClientSocket::fetch | ");
490 msg += functions::asText(" | N-bytes: ", a_data.getSize());
491 msg += " | Pending: ";
492 msg += anna::functions::asString(a_data, 24);
493 Logger::debug(msg, ANNA_FILE_LOCATION);
500 void comm::ClientSocket::calculeExpectedSize(const DataBlock& data)
503 if ((a_expectedSize = getTransport()->calculeSize(data)) != -1) {
505 string msg("comm::ClientSocket::calculeExpectedSize | ");
507 Logger::debug(msg, ANNA_FILE_LOCATION)
511 if (a_expectedSize < -1) {
512 activate(Status::Corrupt);
513 string msg(asString());
514 msg += " | Invalid expected size";
515 Logger::error(msg, ANNA_FILE_LOCATION);
517 } catch (RuntimeException& ex) {
519 activate(Status::Corrupt);
523 comm::Transport* comm::ClientSocket::reserveTransport()
524 throw(RuntimeException) {
525 if (a_transport == NULL) {
526 if (Socket::a_transportFactory == NULL) {
527 string msg(asString());
528 msg += " | Transport factory was not especified";
529 throw RuntimeException(msg, ANNA_FILE_LOCATION);
532 Guard guard(this, "comm::ClientSocket::reserveTransport");
534 if (a_transport == NULL) {
535 Guard guard(Socket::a_transportFactory, typeid(Socket::a_transportFactory).name());
536 a_transport = Socket::a_transportFactory->create();
537 a_transport->clear();
544 comm::Transport* comm::ClientSocket::unsafe_reserveTransport()
545 throw(RuntimeException) {
546 if (a_transport == NULL) {
547 if (Socket::a_transportFactory == NULL) {
548 string msg(asString());
549 msg += " | Transport factory was not especified";
550 throw RuntimeException(msg, ANNA_FILE_LOCATION);
553 a_transport = Socket::a_transportFactory->create();
554 a_transport->clear();
560 comm::Receiver* comm::ClientSocket::reserveReceiver()
561 throw(RuntimeException) {
562 if (a_receiver == NULL && Socket::a_receiverFactory != NULL) {
563 Guard guard(this, "comm::ClientSocket::reserveReceiver");
565 // NO hace falta proteger porque ya lo hace su 'create'
566 if (a_receiver == NULL)
567 a_receiver = Socket::a_receiverFactory->create();
573 // Este metodo se sobre-escribira en commsec::ClientSocket para establecer la conexion mediante las
575 int comm::ClientSocket::do_connect(const sockaddr* s, const int len)
576 throw(RuntimeException) {
578 anna_signal_shield(r, ::connect(a_fd, s, len));
582 //----------------------------------------------------------------------------------------------------
583 // Este metodo se sobre-escribe en comm::DatagramSocket
585 // (1) si no ha sido capaz de escribir todo el mensaje tenemos que esperar a que el bloque anterior
586 // haya salido, ya que si intentamos escribir de forma contienua habra un momento que obtengamos
587 // un error 'Resource temporarily unavailable'.
588 // (2) La cola de salida esta llena momentaneamente, esperamos unos milisegundos para volver a
590 // (3) Obtenmos el error "Broken Pipe" ya solo se puede cerrar el socket y tratar de comenzar
591 // (4) Si envió un trozo, pero no pudo enviar el mensaje completo => cierra el socket para evitar
592 // problemas de interpretación en el extremo remoto.
593 //----------------------------------------------------------------------------------------------------
594 void comm::ClientSocket::do_write(const DataBlock& message)
595 throw(RuntimeException) {
596 int size = message.getSize();
597 const char* data = message.getData();
604 bool sendSomething = false;
606 // Si ha enviado parte del mensaje => tiene que enviarlo completo
607 while (retry == true && cx < 5) {
608 anna_signal_shield(r, write(Socket::a_fd, data, size));
618 if (xerrno == EAGAIN) { // (2)
621 waiting.fd = Socket::a_fd;
622 waiting.events = POLLOUT;
623 anna_signal_shield(r, poll(&waiting, 1, a_msMaxWriteDelay));
625 // Si hay un error definitivo deja de intentarlo y lanza la excepción.
626 // Si se ha enviado algo supondremos que el extremo remoto es capaz de recuperarse
629 } else { /* (r < left) */
634 sendSomething = true;
639 if (xerrno == EPIPE || sendSomething == true) { // (3)(4)
640 // activate (Status::ClosePending);
642 r = shutdown(getfd(), SHUT_WR);
645 string msg("comm::ClientSocket::do_write | fd: ");
646 msg += functions::asString(getfd());
647 throw RuntimeException(msg, xerrno, ANNA_FILE_LOCATION);
651 string msg("comm::ClientSocket::do_write | ");
653 msg += functions::asText(" | N-Loop: ", nloop);
654 msg += functions::asText(" | Sent: ", message);
655 Logger::debug(msg, ANNA_FILE_LOCATION);
659 //----------------------------------------------------------------------------
660 // (1) Cuando intentamos leer del socket asociado a un servidor que ha caido
661 // no devuelve 0 (como cuando se cae el socket de un cliente) sino que
662 // devuelve el error (ECONNRESET = 131).
664 // Devuelve 0 para indicar que el Socket se ha cerrado en el extremo remoto
665 //----------------------------------------------------------------------------
666 int comm::ClientSocket::do_read(const char* data, const int maxSize)
667 throw(RuntimeException) {
671 if ((result = ::read(Socket::a_fd, (void*) data, maxSize)) < 0) {
675 if (errno == ECONNRESET) { // (1)
680 string msg(asString());
681 msg += " | Cannot receive";
682 throw RuntimeException(msg, errno, ANNA_FILE_LOCATION);
684 } while (errno == EINTR && result < 0);
689 void comm::ClientSocket::forgot()
691 if (a_transport != NULL)
692 a_transport->clear();
694 deactivate(Status::Corrupt);
700 //--------------------------------------------------------------------------------------
701 // (1) Recupera el tamano para verificar que se ha podido establecer la solicitud.
702 //--------------------------------------------------------------------------------------
703 void comm::ClientSocket::getSocketOptions()
704 throw(RuntimeException) {
705 if ((a_rcvBufferSize = comm::Communicator::getReceivingChunkSize()) == -1) {
706 socklen_t l = sizeof(int);
707 int fd = Socket::getfd();
708 anna_comm_socket_check(
709 getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &a_rcvBufferSize, &l),
710 "Cannot obtain SO_RCVBUF"
714 a_reader.allocate(a_rcvBufferSize);
717 int comm::ClientSocket::getTotalPendingBytes() const
719 // El nº de bytes pendientes de procesar será lo que queda en la cola I/O + los bytes que tenemos cargados en memoria
720 // los bytes pendientes se obtiene al leer el chunk
721 return a_pendingBytes.bytesToRead + getBufferSize();
725 string comm::ClientSocket::asString() const
727 string msg("comm::ClientSocket { ");
730 return msg += " <freed> }";
732 msg += Socket::asString();
733 msg += functions::asText(" | RcvBufferSize: ", a_rcvBufferSize);
734 msg += " bytes | Status: ";
735 msg += Status::asString(a_status);
736 msg += functions::asString("| MaxConDelay: %d ms", a_msMaxConnectionDelay.getValue());
739 msg += functions::asText(" | OverQuotaSize: ", a_transport->getOverQuotaSize());
741 msg += functions::asString(" | Reserved: %d | Pending: %d | Offset: %d | ExpectedSize: %d", a_data.getSize(), getBufferSize(), a_offset, a_expectedSize);
742 msg += " | Remote access point: ";
743 a_remoteAccessPoint.asString(msg);
747 xml::Node* comm::ClientSocket::asXML(xml::Node* parent) const
748 throw(RuntimeException) {
749 xml::Node* clientSocket = parent->createChild("comm.ClientSocket");
752 clientSocket->createAttribute("Freed", "yes");
756 Socket::asXML(clientSocket);
757 clientSocket->createAttribute("Status", Status::asString(a_status));
758 clientSocket->createAttribute("RcvBufferSize", a_rcvBufferSize);
759 clientSocket->createAttribute("MaxConnDelay", a_msMaxConnectionDelay);
760 clientSocket->createAttribute("IgnoreIncomingMessages", functions::asString(a_ignoreIncomingMessages));
763 clientSocket->createAttribute("OverQuotaSize", a_transport->getOverQuotaSize());
765 xml::Node* buffer = clientSocket->createChild("Buffer");
766 buffer->createAttribute("Reserved", a_data.getSize());
767 buffer->createAttribute("Pending", getTotalPendingBytes());
768 a_remoteAccessPoint.asXML("comm.RemotePoint", clientSocket);
771 a_receiver->asXML(clientSocket);
776 string comm::ClientSocket::Status::asString(const int status)
783 if (status & Status::Connected)
784 result = "Connected ";
786 if (status & Status::Corrupt)
787 result += "Corrupt ";
789 if (status & Status::ClosePending)
790 result += "ClosePending ";
792 if (status & Status::Working)
793 result += "Working ";