Updated license
[anna.git] / source / comm / ClientSocket.cpp
1 // ANNA - Anna is Not Nothingness Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // https://bitbucket.org/testillano/anna
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 #include <sys/ioctl.h>
38 #include <sys/time.h>
39
40 #include <poll.h>
41
42 //#include <asm/ioctls.h>
43
44 #include <algorithm>
45
46 #include <anna/core/tracing/TraceMethod.hpp>
47 #include <anna/core/functions.hpp>
48
49 #include <anna/xml/Node.hpp>
50 #include <anna/xml/Attribute.hpp>
51
52 #include <anna/app/functions.hpp>
53
54 #include <anna/comm/ClientSocket.hpp>
55 #include <anna/comm/INetAddress.hpp>
56 #include <anna/comm/TransportFactory.hpp>
57 #include <anna/comm/Transport.hpp>
58 #include <anna/comm/Message.hpp>
59 #include <anna/comm/Network.hpp>
60 #include <anna/comm/Host.hpp>
61 #include <anna/comm/Server.hpp>
62 #include <anna/comm/Communicator.hpp>
63 #include <anna/comm/ReceiverFactory.hpp>
64 #include <anna/comm/Receiver.hpp>
65 #include <anna/comm/CongestionController.hpp>
66
67 #include <anna/comm/internal/Poll.hpp>
68
69 using namespace std;
70 using namespace anna;
71
72 // static
73 const Millisecond comm::ClientSocket::DefaultMaxConnectionDelay(200);
74 const Millisecond comm::ClientSocket::DefaultMaxWriteDelay(500);
75
76
77 void comm::ClientSocket::initialize()
78 throw() {
79    a_status = Status::None;
80    a_transport = NULL;
81    a_expectedSize = -1;
82    a_offset = 0;
83    a_cachedServer = NULL;
84    a_msMaxConnectionDelay = DefaultMaxConnectionDelay;
85    a_msMaxWriteDelay = DefaultMaxWriteDelay;
86    a_rcvBufferSize = -1;
87    a_receiver = NULL;
88 }
89
90 /*
91  * Se invoca desde el handler::ServerSocket y no hace falta protegerlo con una SCC
92  * porque ya está protegido por éste.
93  */
94 void comm::ClientSocket::setfd(const int fd)
95 throw(RuntimeException) {
96    Socket::a_fd = fd;
97    Socket::a_isBound = true;
98    activate(Status::Connected);
99    a_offset = 0;
100
101    if (Socket::a_type == Socket::Type::Stream) {
102       sockaddr_in addr;
103       socklen_t len(sizeof(addr));
104       Network& network(Network::instantiate());
105       anna_comm_socket_check(
106          getsockname(fd, (struct sockaddr *) &addr, &len),
107          "Cannot obtain socket name"
108       );
109       Socket::a_localAccessPoint = INetAddress(network.find(addr.sin_addr.s_addr), ntohs(addr.sin_port));
110       anna_comm_socket_check(
111          getpeername(fd, (struct sockaddr *) &addr, &len),
112          "Cannot obtain peer name"
113       );
114       a_remoteAccessPoint = INetAddress(network.find(addr.sin_addr.s_addr), ntohs(addr.sin_port));
115    }
116
117    setBlockingMode(false);
118    getSocketOptions();
119    LOGDEBUG(
120       string msg("comm::ClientSocket::setfd | ");
121       msg += asString();
122       Logger::debug(msg, ANNA_FILE_LOCATION);
123    );
124 }
125
126 /*
127  * Si se establece el receptor directamente, sin tener una factoria intermedia, hay que
128  * invocar al metodo comm::Receiver::initialize.
129  */
130 void comm::ClientSocket::setReceiver(comm::Receiver* receiver)
131 throw(RuntimeException) {
132    if (a_receiver && Socket::a_receiverFactory)
133       Socket::a_receiverFactory->release(a_receiver);
134
135    a_receiver = receiver;
136
137    if (a_receiverFactory == NULL)
138       a_receiver->initialize();
139 }
140
141 // Protegido desde el Socket::close
142 void comm::ClientSocket::do_close()
143 throw() {
144    Socket::do_close();
145
146    if (a_transport != NULL) {
147       a_transport->clear();
148       Guard guard(Socket::a_transportFactory, typeid(Socket::a_transportFactory).name());
149       Socket::a_transportFactory->release(a_transport);
150       a_transport = NULL;
151    }
152
153    a_status = Status::None;
154    a_expectedSize = -1;
155    a_data.clear();
156    a_cachedServer = NULL;
157    a_offset = 0;
158
159    if (a_receiver) {
160       if (Socket::a_receiverFactory)
161          Socket::a_receiverFactory->release(a_receiver);
162
163       a_receiver = NULL;
164    }
165 }
166
167 comm::Server* comm::ClientSocket::getServer()
168 throw(RuntimeException) {
169    Guard guard(*this, "comm::ClientSocket (getServer)");
170    const INetAddress& remoteAddress = getRemoteAccessPoint().getINetAddress();
171    const Device* device = remoteAddress.getDevice(false);
172    const int remotePort = remoteAddress.getPort();
173
174    if (device == NULL)
175       return NULL;
176
177    if (a_cachedServer != NULL)
178       if (a_cachedServer->getRemotePort() == remotePort && a_cachedServer->getHost()->contains(device))
179          return a_cachedServer;
180
181    Network& network = Network::instantiate();
182    comm::Server* result = NULL;
183    comm::Server* aux;
184
185    for (Network::host_iterator ii = network.host_begin(), maxii = network.host_end(); ii != maxii; ii ++) {
186       if (Network::host(ii)->contains(device) == false)
187          continue;
188
189       if ((aux = Network::host(ii)->find_server(remotePort)) == NULL)
190          continue;
191
192       result = aux;
193       break;
194    }
195
196    return (a_cachedServer = result);
197 }
198
199 //_________________________________________________________________________________
200 // (1) Si termina el periodo de espera sin devolver ninguna actividad => podemos considerar que
201 // se ha conectado. Es posible que haya algún componente hardware que no permita hacer
202 // la conexión.
203 //_________________________________________________________________________________
204 void comm::ClientSocket::connect()
205 throw(RuntimeException) {
206    Guard guard(*this, "comm::ClientSocket (connect)");
207    anna_socket_assert(isConnected() == true, "Already connected");
208
209    if (Socket::isOpened() == false) {
210       Socket::open();
211       getSocketOptions();
212       setBlockingMode(false);
213    }
214
215    if (Socket::a_localAccessPoint.isNull() == false && Socket::isBound() == false)
216       Socket::bind();
217
218    sockaddr* s(NULL);
219    int len(0);
220    a_remoteAccessPoint.translate(*this, s, len);
221    a_cachedServer = NULL;
222    const Millisecond msinit = functions::millisecond();
223    Millisecond msnow;
224    int xerrno;
225    int r;
226    int tryCounter(0);
227    pollfd waiting;
228    msnow = msinit;
229    waiting.fd = Socket::a_fd;
230    waiting.events = POLLOUT;
231
232    while ((msnow - msinit) <= a_msMaxConnectionDelay) {
233       tryCounter ++;
234
235       try {
236          if ((r = do_connect(s, len)) != -1)
237             break;
238       } catch (RuntimeException&) {
239          close();
240          throw;
241       }
242
243       if ((xerrno = errno) == EISCONN) {
244          r = 0;
245          break;
246       }
247
248       if (xerrno != EINPROGRESS && xerrno != EALREADY)
249          break;
250
251       anna_signal_shield(r, poll(&waiting, 1, a_msMaxConnectionDelay));
252
253       if (r == 0) {                                                                           // (1)
254          r = -1;
255          xerrno = ETIMEDOUT;
256       }
257
258       msnow = functions::millisecond();
259    }
260
261    if (r == -1) {
262       std::string msg("comm::ClientSocket::connect | ");
263       msg += asString();
264       msg += " | N-loop: ";
265       msg += functions::asString(tryCounter);
266       msg += " | Wait: ";
267       msg += functions::asString(msnow - msinit);
268       msg += " ms";
269       close();
270       throw RuntimeException(msg, xerrno, ANNA_FILE_LOCATION);
271    }
272
273    activate(Status::Connected);
274    a_offset = 0;
275    LOGDEBUG(
276       string msg("comm::ClientSocket::connect | ");
277       msg += asString();
278       Logger::debug(msg, ANNA_FILE_LOCATION);
279    );
280 }
281
282 void comm::ClientSocket::send(comm::Message& message)
283 throw(RuntimeException) {
284    Guard guard(*this, "comm::ClientSocket (send)");
285    do_write(getTransport()->code(message));
286 }
287
288 void comm::ClientSocket::send(comm::Message* message)
289 throw(RuntimeException) {
290    if (message == NULL)
291       throw RuntimeException("comm::ClientSocket::send | Cannot send a NULL message", ANNA_FILE_LOCATION);
292
293    send(*message);
294 }
295
296 //_______________________________________________________________________
297 // Si el clientsocket no esta marcado como "en proceso", es decir, todavia estamos
298 // recogiendo mensajes de el, se cierra directamente, en otro cpero aso se marca
299 // como pendiente de cerrar y cuando se recogan todos los mensajes que contiene
300 // su buffer, se cerrará.
301 //_______________________________________________________________________
302 void comm::ClientSocket::requestClose()
303 throw() {
304    Communicator* communicator = app::functions::component <Communicator> (ANNA_FILE_LOCATION);
305    a_status |= Status::ClosePending;
306    // Sólo tiene efecto real en modo ST
307    communicator->notifyPendingClose();
308    /*
309       else {
310          communicator->detach (this);
311       }
312    */
313 }
314
315 //---------------------------------------------------------------------------------------
316 // Cuando el communicator detecta actividad en el socket invoca a este metodo.
317 // (1) Si devuelve 0 es porque han borrado el extremo remoto.
318 // (2) Si el buffer intermedio tiene mas de 4 veces el numero de bytes planificado =>
319 // lo cerramos independientemente del tamanio esperado.
320 // (3) Si el buffer tiene un numero de bytes mayor al planificiado y todavia no ha
321 // detectado ningun mensaje => lo cerramos.
322 //
323 // Se invoca desde un entorno protegido, por lo que no hace falta SSCC.
324 //---------------------------------------------------------------------------------------
325 comm::Socket::Notify::_v comm::ClientSocket::receive()
326 throw(RuntimeException) {
327    Notify::_v result;
328    int queueLength = 0;
329
330    if (a_reader.getMaxSize() == 0) {
331       string msg(asString());
332       msg += " | I/O buffer length was not established";
333       throw RuntimeException(msg, ANNA_FILE_LOCATION);
334    }
335
336    const char* buffer(a_reader.getData());
337
338    const int r(do_read(buffer, a_reader.getMaxSize()));
339
340    a_buffer.setup(buffer, r);
341
342    // Si ha leido el máximo permitido, entonces obtiene los bytes que quedan por leer en el sistema I/O.
343    // Si ha leído menos del máximo, entonces no queda nada en el sistema I/O
344    if (r == a_reader.getMaxSize()) {
345       if (ioctl(a_fd, FIONREAD, &a_pendingBytes.bytesToRead) != 0)
346          a_pendingBytes.bytesToRead = 0;
347    } else
348       a_pendingBytes.bytesToRead = 0;
349
350    LOGDEBUG(
351       string msg("comm::ClientSocket::receive | ");
352       msg += asString();
353       msg += " | Pending: ";
354       msg += anna::functions::asString(a_data, 24);
355       msg += functions::asText(" | Received N-bytes: ", a_buffer.getSize());
356       msg += ", Buffer: ";
357       msg += anna::functions::asString(a_buffer, 24);
358       Logger::debug(msg, ANNA_FILE_LOCATION);
359    );
360
361    if (r > 0) {                                                              // (1)
362       if (getIgnoreIncomingMessages() == true) {
363          app::functions::component <Communicator> (ANNA_FILE_LOCATION)->eventIgnoreBurst(*this, a_buffer);
364          forgot();
365          LOGDEBUG(
366             string msg("comm::ClientSocket::receive | ");
367             msg += asString();
368             msg += " | Ignoring incoming messages";
369             Logger::debug(msg, ANNA_FILE_LOCATION);
370          );
371          return Notify::None;
372       }
373
374       result = Notify::ReceiveData;
375       a_data += a_buffer;
376       const int overQuotaSize(getTransport()->getOverQuotaSize());
377
378       if (a_expectedSize == -1)
379          calculeExpectedSize(a_data);
380
381       if (a_expectedSize == -1 && getBufferSize() > overQuotaSize) {                         // (3)
382          string msg(asString());
383          msg += functions::asText(" | Closed local point due to excessive memory consumption: N-Bytes: ", r);
384          msg += functions::asText("/BufferSize: ", getBufferSize());
385          msg += functions::asText("/OverQuotaSize: ", overQuotaSize);
386          msg += ')';
387          Logger::error(msg, ANNA_FILE_LOCATION);
388          app::functions::component <Communicator> (ANNA_FILE_LOCATION)->eventOverQuota(*this);
389          a_data.clear();
390          a_offset = 0;
391          result = Notify::Close;
392       } else if (isCorrupt() == true)
393          result = Notify::Corrupt;
394    } else
395       result = Notify::Close;
396
397    return result;
398 }
399
400 comm::Socket::Notify::_v comm::ClientSocket::wait(const Millisecond &timeout, const bool _receive)
401 throw(RuntimeException) {
402    Guard guard(*this, "comm::ClientSocket (wait)");
403
404    if (Socket::isOpened() == false) {
405       string msg("comm::ClientSocket::wait | ");
406       msg += asString();
407       msg += " | Is not opened";
408       throw RuntimeException(msg, ANNA_FILE_LOCATION);
409    }
410
411    Poll poll;
412    poll.setTimeout(timeout);
413    poll.insert(a_fd);
414    poll.waitMessage();
415    Notify::_v result = (poll.fetch() != -1) ? ((_receive == true) ? receive() : Notify::ReceiveData) : Notify::None;
416    LOGDEBUG(
417       string msg("comm::ClientSocket::wait | Timeout: ");
418       msg += functions::asString(timeout);
419       msg += functions::asText(" ms | Receive: ", _receive);
420       msg += functions::asText(" | fd: ", getfd());
421       msg += " | Result: ";
422       msg += Socket::asText(result);
423       Logger::debug(msg, ANNA_FILE_LOCATION);
424    );
425    return result;
426 }
427
428 //---------------------------------------------------------------------------------------
429 // Cuando el ClientSocket notifica que ha recibido datos, el communicator invoca a este
430 // metodo por si hubiera algn mensaje completo.
431 //
432 // (1) Si todavia no conoce el tamao esperado del mensaje => intenta calcularlo.
433 // (2) Si ya conoce la longitud y en el buffer recibido quedan , al menos, ese nmero
434 // de bytes => hay al menos un mensaje completo. Lo procesa y deja todo preparado
435 // para la proxima invocacion de este metodo.
436 // (3) Todavia no tenemos un mensajes completo, lo guardamos y esperamos a que se vuelva
437 // a leer lo que falta desde 'ClientSocket::receive'.
438 // (4) Si nos quedan bytes por analizar pero no tenemos suficientes para calcular
439 // la longitud esperada => prepara el a_data para recibir el siguiente bloque.
440 //---------------------------------------------------------------------------------------
441 const DataBlock* comm::ClientSocket::fetch()
442 throw(RuntimeException) {
443    if (a_status & Status::ClosePending)
444       return NULL;
445
446    const int remainingSize(a_data.getSize() - a_offset);
447
448    if (remainingSize <= 0) {
449       a_data.clear();
450       a_offset = 0;
451       return NULL;
452    }
453
454    const DataBlock* result(NULL);
455
456    a_buffer.setup(a_data.getData() + a_offset, remainingSize);
457
458    if (a_expectedSize == -1)                                               // (1)
459       calculeExpectedSize(a_buffer);
460
461    if (isCorrupt() == true)
462       return NULL;
463
464    if (a_expectedSize != -1) {
465       if (a_expectedSize <= remainingSize) {                               // (2)
466          a_buffer.resize(a_expectedSize);
467          result = &a_buffer;
468          a_offset += a_expectedSize;
469          a_expectedSize = -1;
470       }
471
472       // (3)
473    }
474
475    // (4)
476
477    if (result == NULL) {
478       if (a_offset > 0) {
479          if (a_offset >= a_data.getSize())
480             a_data.clear();
481          else
482             a_data.remove(a_offset);
483
484          a_offset = 0;
485       }
486
487       LOGDEBUG(
488          string msg("comm::ClientSocket::fetch | ");
489          msg += asString();
490          msg += functions::asText(" | N-bytes: ", a_data.getSize());
491          msg += " | Pending: ";
492          msg += anna::functions::asString(a_data, 24);
493          Logger::debug(msg, ANNA_FILE_LOCATION);
494       );
495    }
496
497    return result;
498 }
499
500 void comm::ClientSocket::calculeExpectedSize(const DataBlock& data)
501 throw() {
502    try {
503       if ((a_expectedSize = getTransport()->calculeSize(data)) != -1) {
504          LOGDEBUG(
505             string msg("comm::ClientSocket::calculeExpectedSize | ");
506             msg += asString();
507             Logger::debug(msg, ANNA_FILE_LOCATION)
508          );
509       }
510
511       if (a_expectedSize < -1) {
512          activate(Status::Corrupt);
513          string msg(asString());
514          msg += " | Invalid expected size";
515          Logger::error(msg, ANNA_FILE_LOCATION);
516       }
517    } catch (RuntimeException& ex) {
518       ex.trace();
519       activate(Status::Corrupt);
520    }
521 }
522
523 comm::Transport* comm::ClientSocket::reserveTransport()
524 throw(RuntimeException) {
525    if (a_transport == NULL) {
526       if (Socket::a_transportFactory == NULL) {
527          string msg(asString());
528          msg += " | Transport factory was not especified";
529          throw RuntimeException(msg, ANNA_FILE_LOCATION);
530       }
531
532       Guard guard(this, "comm::ClientSocket::reserveTransport");
533
534       if (a_transport == NULL) {
535          Guard guard(Socket::a_transportFactory, typeid(Socket::a_transportFactory).name());
536          a_transport = Socket::a_transportFactory->create();
537          a_transport->clear();
538       }
539    }
540
541    return a_transport;
542 }
543
544 comm::Transport* comm::ClientSocket::unsafe_reserveTransport()
545 throw(RuntimeException) {
546    if (a_transport == NULL) {
547       if (Socket::a_transportFactory == NULL) {
548          string msg(asString());
549          msg += " | Transport factory was not especified";
550          throw RuntimeException(msg, ANNA_FILE_LOCATION);
551       }
552
553       a_transport = Socket::a_transportFactory->create();
554       a_transport->clear();
555    }
556
557    return a_transport;
558 }
559
560 comm::Receiver* comm::ClientSocket::reserveReceiver()
561 throw(RuntimeException) {
562    if (a_receiver == NULL && Socket::a_receiverFactory != NULL) {
563       Guard guard(this, "comm::ClientSocket::reserveReceiver");
564
565       // NO hace falta proteger porque ya lo hace su 'create'
566       if (a_receiver == NULL)
567          a_receiver = Socket::a_receiverFactory->create();
568    }
569
570    return a_receiver;
571 }
572
573 // Este metodo se sobre-escribira en commsec::ClientSocket para establecer la conexion mediante las
574 // funciones SSH
575 int comm::ClientSocket::do_connect(const sockaddr* s, const  int len)
576 throw(RuntimeException) {
577    int r;
578    anna_signal_shield(r, ::connect(a_fd, s, len));
579    return r;
580 }
581
582 //----------------------------------------------------------------------------------------------------
583 // Este metodo se sobre-escribe en comm::DatagramSocket
584 //
585 // (1) si no ha sido capaz de escribir todo el mensaje tenemos que esperar a que el bloque anterior
586 // haya salido, ya que si intentamos escribir de forma contienua habra un momento que obtengamos
587 // un error 'Resource temporarily unavailable'.
588 // (2) La cola de salida esta llena momentaneamente, esperamos unos milisegundos para volver a
589 // tratar de enviar.
590 // (3) Obtenmos el error "Broken Pipe" ya solo se puede cerrar el socket y tratar de comenzar
591 // (4) Si envió un trozo, pero no pudo enviar el mensaje completo => cierra el socket para evitar
592 //     problemas de interpretación en el extremo remoto.
593 //----------------------------------------------------------------------------------------------------
594 void comm::ClientSocket::do_write(const DataBlock& message)
595 throw(RuntimeException) {
596    int size = message.getSize();
597    const char* data =  message.getData();
598    int r(0);
599    int nloop(0);
600    int cx(0);
601    bool retry = true;
602    bool isok = false;
603    int xerrno;
604    bool sendSomething = false;
605
606    // Si ha enviado parte del mensaje => tiene que enviarlo completo
607    while (retry == true && cx < 5) {
608       anna_signal_shield(r, write(Socket::a_fd, data, size));
609
610       if (r == size) {
611          isok = true;
612          break;
613       }
614
615       xerrno = errno;
616
617       if (r < 0) {
618          if (xerrno == EAGAIN) {                              // (2)
619             cx ++;
620             pollfd waiting;
621             waiting.fd = Socket::a_fd;
622             waiting.events = POLLOUT;
623             anna_signal_shield(r, poll(&waiting, 1, a_msMaxWriteDelay));
624          } else {
625             // Si hay un error definitivo deja de intentarlo y lanza la excepción.
626             // Si se ha enviado algo supondremos que el extremo remoto es capaz de recuperarse
627             retry = false;
628          }
629       } else  { /* (r < left) */
630          nloop ++;
631          data += r;
632          size -= r;
633          cx = 0;
634          sendSomething = true;
635       }
636    }
637
638    if (isok == false) {
639       if (xerrno == EPIPE || sendSomething == true) {                               // (3)(4)
640 //         activate (Status::ClosePending);
641          requestClose();
642          r = shutdown(getfd(), SHUT_WR);
643       }
644
645       string msg("comm::ClientSocket::do_write | fd: ");
646       msg += functions::asString(getfd());
647       throw RuntimeException(msg, xerrno, ANNA_FILE_LOCATION);
648    }
649
650    LOGDEBUG(
651       string msg("comm::ClientSocket::do_write | ");
652       msg += asString();
653       msg += functions::asText(" | N-Loop: ", nloop);
654       msg += functions::asText(" | Sent: ", message);
655       Logger::debug(msg, ANNA_FILE_LOCATION);
656    );
657 }
658
659 //----------------------------------------------------------------------------
660 // (1) Cuando intentamos leer del socket asociado a un servidor que ha caido
661 //     no devuelve 0 (como cuando se cae el socket de un cliente) sino que
662 //     devuelve el error (ECONNRESET = 131).
663 //
664 // Devuelve 0 para indicar que el Socket se ha cerrado en el extremo remoto
665 //----------------------------------------------------------------------------
666 int comm::ClientSocket::do_read(const char* data, const int maxSize)
667 throw(RuntimeException) {
668    int result;
669
670    do {
671       if ((result = ::read(Socket::a_fd, (void*) data, maxSize)) < 0) {
672          if (errno == EINTR)
673             continue;
674
675          if (errno == ECONNRESET) { // (1)
676             result = 0;
677             break;
678          }
679
680          string msg(asString());
681          msg += " | Cannot receive";
682          throw RuntimeException(msg, errno, ANNA_FILE_LOCATION);
683       }
684    } while (errno == EINTR && result < 0);
685
686    return result;
687 }
688
689 void comm::ClientSocket::forgot()
690 throw() {
691    if (a_transport != NULL)
692       a_transport->clear();
693
694    deactivate(Status::Corrupt);
695    a_expectedSize = -1;
696    a_data.clear();
697    a_offset = 0;
698 }
699
700 //--------------------------------------------------------------------------------------
701 // (1) Recupera el tamano para verificar que se ha podido establecer la solicitud.
702 //--------------------------------------------------------------------------------------
703 void comm::ClientSocket::getSocketOptions()
704 throw(RuntimeException) {
705    if ((a_rcvBufferSize = comm::Communicator::getReceivingChunkSize()) == -1) {
706       socklen_t l = sizeof(int);
707       int fd = Socket::getfd();
708       anna_comm_socket_check(
709          getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &a_rcvBufferSize, &l),
710          "Cannot obtain SO_RCVBUF"
711       );
712    }
713
714    a_reader.allocate(a_rcvBufferSize);
715 }
716
717 int comm::ClientSocket::getTotalPendingBytes() const
718 throw() {
719    // El nº de bytes pendientes de procesar será lo que queda en la cola I/O + los bytes que tenemos cargados en memoria
720    // los bytes pendientes se obtiene al leer el chunk
721    return a_pendingBytes.bytesToRead + getBufferSize();
722 }
723
724
725 string comm::ClientSocket::asString() const
726 throw() {
727    string msg("comm::ClientSocket { ");
728
729    if (this == NULL)
730       return msg += " <freed> }";
731
732    msg += Socket::asString();
733    msg += functions::asText(" | RcvBufferSize: ", a_rcvBufferSize);
734    msg += " bytes | Status: ";
735    msg += Status::asString(a_status);
736    msg += functions::asString("| MaxConDelay: %d ms", a_msMaxConnectionDelay.getValue());
737
738    if (a_transport)
739       msg += functions::asText(" | OverQuotaSize: ", a_transport->getOverQuotaSize());
740
741    msg += functions::asString(" | Reserved: %d | Pending: %d | Offset: %d | ExpectedSize: %d", a_data.getSize(), getBufferSize(), a_offset, a_expectedSize);
742    msg += " | Remote access point: ";
743    a_remoteAccessPoint.asString(msg);
744    return msg += " }";
745 }
746
747 xml::Node* comm::ClientSocket::asXML(xml::Node* parent) const
748 throw(RuntimeException) {
749    xml::Node* clientSocket = parent->createChild("comm.ClientSocket");
750
751    if (this == NULL) {
752       clientSocket->createAttribute("Freed", "yes");
753       return clientSocket;
754    }
755
756    Socket::asXML(clientSocket);
757    clientSocket->createAttribute("Status", Status::asString(a_status));
758    clientSocket->createAttribute("RcvBufferSize", a_rcvBufferSize);
759    clientSocket->createAttribute("MaxConnDelay", a_msMaxConnectionDelay);
760    clientSocket->createAttribute("IgnoreIncomingMessages", functions::asString(a_ignoreIncomingMessages));
761
762    if (a_transport)
763       clientSocket->createAttribute("OverQuotaSize", a_transport->getOverQuotaSize());
764
765    xml::Node* buffer = clientSocket->createChild("Buffer");
766    buffer->createAttribute("Reserved", a_data.getSize());
767    buffer->createAttribute("Pending", getTotalPendingBytes());
768    a_remoteAccessPoint.asXML("comm.RemotePoint", clientSocket);
769
770    if (a_receiver)
771       a_receiver->asXML(clientSocket);
772
773    return clientSocket;
774 }
775
776 string comm::ClientSocket::Status::asString(const int status)
777 throw() {
778    string result;
779
780    if (status == 0)
781       result = "None ";
782    else {
783       if (status & Status::Connected)
784          result = "Connected ";
785
786       if (status & Status::Corrupt)
787          result += "Corrupt ";
788
789       if (status & Status::ClosePending)
790          result += "ClosePending ";
791
792       if (status & Status::Working)
793          result += "Working ";
794    }
795
796    return result;
797 }
798
799