acf2bd312639c2cf8671023f100001c619dd8863
[anna.git] / source / comm / ClientSocket.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #include <sys/ioctl.h>
10 #include <sys/time.h>
11
12 #include <poll.h>
13
14 //#include <asm/ioctls.h>
15
16 #include <algorithm>
17
18 #include <anna/core/tracing/TraceMethod.hpp>
19 #include <anna/core/functions.hpp>
20
21 #include <anna/xml/Node.hpp>
22 #include <anna/xml/Attribute.hpp>
23
24 #include <anna/app/functions.hpp>
25
26 #include <anna/comm/ClientSocket.hpp>
27 #include <anna/comm/INetAddress.hpp>
28 #include <anna/comm/TransportFactory.hpp>
29 #include <anna/comm/Transport.hpp>
30 #include <anna/comm/Message.hpp>
31 #include <anna/comm/Network.hpp>
32 #include <anna/comm/Host.hpp>
33 #include <anna/comm/Server.hpp>
34 #include <anna/comm/Communicator.hpp>
35 #include <anna/comm/ReceiverFactory.hpp>
36 #include <anna/comm/Receiver.hpp>
37 #include <anna/comm/CongestionController.hpp>
38
39 #include <anna/comm/internal/Poll.hpp>
40
41 using namespace std;
42 using namespace anna;
43
44 // static
45 const Millisecond comm::ClientSocket::DefaultMaxConnectionDelay(200);
46 const Millisecond comm::ClientSocket::DefaultMaxWriteDelay(500);
47
48
49 void comm::ClientSocket::initialize()
50 throw() {
51    a_status = Status::None;
52    a_transport = NULL;
53    a_expectedSize = -1;
54    a_offset = 0;
55    a_cachedServer = NULL;
56    a_msMaxConnectionDelay = DefaultMaxConnectionDelay;
57    a_msMaxWriteDelay = DefaultMaxWriteDelay;
58    a_rcvBufferSize = -1;
59    a_receiver = NULL;
60 }
61
62 /*
63  * Se invoca desde el handler::ServerSocket y no hace falta protegerlo con una SCC
64  * porque ya está protegido por éste.
65  */
66 void comm::ClientSocket::setfd(const int fd)
67 throw(RuntimeException) {
68    Socket::a_fd = fd;
69    Socket::a_isBound = true;
70    activate(Status::Connected);
71    a_offset = 0;
72
73    if (Socket::a_type == Socket::Type::Stream) {
74       sockaddr_in addr;
75       socklen_t len(sizeof(addr));
76       Network& network(Network::instantiate());
77       anna_comm_socket_check(
78          getsockname(fd, (struct sockaddr *) &addr, &len),
79          "Cannot obtain socket name"
80       );
81       Socket::a_localAccessPoint = INetAddress(network.find(addr.sin_addr.s_addr), ntohs(addr.sin_port));
82       anna_comm_socket_check(
83          getpeername(fd, (struct sockaddr *) &addr, &len),
84          "Cannot obtain peer name"
85       );
86       a_remoteAccessPoint = INetAddress(network.find(addr.sin_addr.s_addr), ntohs(addr.sin_port));
87    }
88
89    setBlockingMode(false);
90    getSocketOptions();
91    LOGDEBUG(
92       string msg("comm::ClientSocket::setfd | ");
93       msg += asString();
94       Logger::debug(msg, ANNA_FILE_LOCATION);
95    );
96 }
97
98 /*
99  * Si se establece el receptor directamente, sin tener una factoria intermedia, hay que
100  * invocar al metodo comm::Receiver::initialize.
101  */
102 void comm::ClientSocket::setReceiver(comm::Receiver* receiver)
103 throw(RuntimeException) {
104    if (a_receiver && Socket::a_receiverFactory)
105       Socket::a_receiverFactory->release(a_receiver);
106
107    a_receiver = receiver;
108
109    if (a_receiverFactory == NULL)
110       a_receiver->initialize();
111 }
112
113 // Protegido desde el Socket::close
114 void comm::ClientSocket::do_close()
115 throw() {
116    Socket::do_close();
117
118    if (a_transport != NULL) {
119       a_transport->clear();
120       Guard guard(Socket::a_transportFactory, typeid(Socket::a_transportFactory).name());
121       Socket::a_transportFactory->release(a_transport);
122       a_transport = NULL;
123    }
124
125    a_status = Status::None;
126    a_expectedSize = -1;
127    a_data.clear();
128    a_cachedServer = NULL;
129    a_offset = 0;
130
131    if (a_receiver) {
132       if (Socket::a_receiverFactory)
133          Socket::a_receiverFactory->release(a_receiver);
134
135       a_receiver = NULL;
136    }
137 }
138
139 comm::Server* comm::ClientSocket::getServer()
140 throw(RuntimeException) {
141    Guard guard(*this, "comm::ClientSocket (getServer)");
142    const INetAddress& remoteAddress = getRemoteAccessPoint().getINetAddress();
143    const Device* device = remoteAddress.getDevice(false);
144    const int remotePort = remoteAddress.getPort();
145
146    if (device == NULL)
147       return NULL;
148
149    if (a_cachedServer != NULL)
150       if (a_cachedServer->getRemotePort() == remotePort && a_cachedServer->getHost()->contains(device))
151          return a_cachedServer;
152
153    Network& network = Network::instantiate();
154    comm::Server* result = NULL;
155    comm::Server* aux;
156
157    for (Network::host_iterator ii = network.host_begin(), maxii = network.host_end(); ii != maxii; ii ++) {
158       if (Network::host(ii)->contains(device) == false)
159          continue;
160
161       if ((aux = Network::host(ii)->find_server(remotePort)) == NULL)
162          continue;
163
164       result = aux;
165       break;
166    }
167
168    return (a_cachedServer = result);
169 }
170
171 //_________________________________________________________________________________
172 // (1) Si termina el periodo de espera sin devolver ninguna actividad => podemos considerar que
173 // se ha conectado. Es posible que haya algún componente hardware que no permita hacer
174 // la conexión.
175 //_________________________________________________________________________________
176 void comm::ClientSocket::connect()
177 throw(RuntimeException) {
178    Guard guard(*this, "comm::ClientSocket (connect)");
179    anna_socket_assert(isConnected() == true, "Already connected");
180
181    if (Socket::isOpened() == false) {
182       Socket::open();
183       getSocketOptions();
184       setBlockingMode(false);
185    }
186
187    if (Socket::a_localAccessPoint.isNull() == false && Socket::isBound() == false)
188       Socket::bind();
189
190    sockaddr* s(NULL);
191    int len(0);
192    a_remoteAccessPoint.translate(*this, s, len);
193    a_cachedServer = NULL;
194    const Millisecond msinit = functions::millisecond();
195    Millisecond msnow;
196    int xerrno;
197    int r;
198    int tryCounter(0);
199    pollfd waiting;
200    msnow = msinit;
201    waiting.fd = Socket::a_fd;
202    waiting.events = POLLOUT;
203
204    while ((msnow - msinit) <= a_msMaxConnectionDelay) {
205       tryCounter ++;
206
207       try {
208          if ((r = do_connect(s, len)) != -1)
209             break;
210       } catch (RuntimeException&) {
211          close();
212          throw;
213       }
214
215       if ((xerrno = errno) == EISCONN) {
216          r = 0;
217          break;
218       }
219
220       if (xerrno != EINPROGRESS && xerrno != EALREADY)
221          break;
222
223       anna_signal_shield(r, poll(&waiting, 1, a_msMaxConnectionDelay));
224
225       if (r == 0) {                                                                           // (1)
226          r = -1;
227          xerrno = ETIMEDOUT;
228       }
229
230       msnow = functions::millisecond();
231    }
232
233    if (r == -1) {
234       std::string msg("comm::ClientSocket::connect | ");
235       msg += asString();
236       msg += " | N-loop: ";
237       msg += functions::asString(tryCounter);
238       msg += " | Wait: ";
239       msg += functions::asString(msnow - msinit);
240       msg += " ms";
241       close();
242       throw RuntimeException(msg, xerrno, ANNA_FILE_LOCATION);
243    }
244
245    activate(Status::Connected);
246    a_offset = 0;
247    LOGDEBUG(
248       string msg("comm::ClientSocket::connect | ");
249       msg += asString();
250       Logger::debug(msg, ANNA_FILE_LOCATION);
251    );
252 }
253
254 void comm::ClientSocket::send(comm::Message& message)
255 throw(RuntimeException) {
256    Guard guard(*this, "comm::ClientSocket (send)");
257    do_write(getTransport()->code(message));
258 }
259
260 void comm::ClientSocket::send(comm::Message* message)
261 throw(RuntimeException) {
262    if (message == NULL)
263       throw RuntimeException("comm::ClientSocket::send | Cannot send a NULL message", ANNA_FILE_LOCATION);
264
265    send(*message);
266 }
267
268 //_______________________________________________________________________
269 // Si el clientsocket no esta marcado como "en proceso", es decir, todavia estamos
270 // recogiendo mensajes de el, se cierra directamente, en otro cpero aso se marca
271 // como pendiente de cerrar y cuando se recogan todos los mensajes que contiene
272 // su buffer, se cerrará.
273 //_______________________________________________________________________
274 void comm::ClientSocket::requestClose()
275 throw() {
276    Communicator* communicator = app::functions::component <Communicator> (ANNA_FILE_LOCATION);
277    a_status |= Status::ClosePending;
278    // Sólo tiene efecto real en modo ST
279    communicator->notifyPendingClose();
280    /*
281       else {
282          communicator->detach (this);
283       }
284    */
285 }
286
287 //---------------------------------------------------------------------------------------
288 // Cuando el communicator detecta actividad en el socket invoca a este metodo.
289 // (1) Si devuelve 0 es porque han borrado el extremo remoto.
290 // (2) Si el buffer intermedio tiene mas de 4 veces el numero de bytes planificado =>
291 // lo cerramos independientemente del tamanio esperado.
292 // (3) Si el buffer tiene un numero de bytes mayor al planificiado y todavia no ha
293 // detectado ningun mensaje => lo cerramos.
294 //
295 // Se invoca desde un entorno protegido, por lo que no hace falta SSCC.
296 //---------------------------------------------------------------------------------------
297 comm::Socket::Notify::_v comm::ClientSocket::receive()
298 throw(RuntimeException) {
299    Notify::_v result;
300    int queueLength = 0;
301
302    if (a_reader.getMaxSize() == 0) {
303       string msg(asString());
304       msg += " | I/O buffer length was not established";
305       throw RuntimeException(msg, ANNA_FILE_LOCATION);
306    }
307
308    const char* buffer(a_reader.getData());
309
310    const int r(do_read(buffer, a_reader.getMaxSize()));
311
312    a_buffer.setup(buffer, r);
313
314    // Si ha leido el máximo permitido, entonces obtiene los bytes que quedan por leer en el sistema I/O.
315    // Si ha leído menos del máximo, entonces no queda nada en el sistema I/O
316    if (r == a_reader.getMaxSize()) {
317       if (ioctl(a_fd, FIONREAD, &a_pendingBytes.bytesToRead) != 0)
318          a_pendingBytes.bytesToRead = 0;
319    } else
320       a_pendingBytes.bytesToRead = 0;
321
322    LOGDEBUG(
323       string msg("comm::ClientSocket::receive | ");
324       msg += asString();
325       msg += " | Pending: ";
326       msg += anna::functions::asString(a_data, 24);
327       msg += functions::asText(" | Received N-bytes: ", a_buffer.getSize());
328       msg += ", Buffer: ";
329       msg += anna::functions::asString(a_buffer, 24);
330       Logger::debug(msg, ANNA_FILE_LOCATION);
331    );
332
333    if (r > 0) {                                                              // (1)
334       if (getIgnoreIncomingMessages() == true) {
335          app::functions::component <Communicator> (ANNA_FILE_LOCATION)->eventIgnoreBurst(*this, a_buffer);
336          forgot();
337          LOGDEBUG(
338             string msg("comm::ClientSocket::receive | ");
339             msg += asString();
340             msg += " | Ignoring incoming messages";
341             Logger::debug(msg, ANNA_FILE_LOCATION);
342          );
343          return Notify::None;
344       }
345
346       result = Notify::ReceiveData;
347       a_data += a_buffer;
348       const int overQuotaSize(getTransport()->getOverQuotaSize());
349
350       if (a_expectedSize == -1)
351          calculeExpectedSize(a_data);
352
353       if (a_expectedSize == -1 && getBufferSize() > overQuotaSize) {                         // (3)
354          string msg(asString());
355          msg += functions::asText(" | Closed local point due to excessive memory consumption: N-Bytes: ", r);
356          msg += functions::asText("/BufferSize: ", getBufferSize());
357          msg += functions::asText("/OverQuotaSize: ", overQuotaSize);
358          msg += ')';
359          Logger::error(msg, ANNA_FILE_LOCATION);
360          app::functions::component <Communicator> (ANNA_FILE_LOCATION)->eventOverQuota(*this);
361          a_data.clear();
362          a_offset = 0;
363          result = Notify::Close;
364       } else if (isCorrupt() == true)
365          result = Notify::Corrupt;
366    } else
367       result = Notify::Close;
368
369    return result;
370 }
371
372 comm::Socket::Notify::_v comm::ClientSocket::wait(const Millisecond &timeout, const bool _receive)
373 throw(RuntimeException) {
374    Guard guard(*this, "comm::ClientSocket (wait)");
375
376    if (Socket::isOpened() == false) {
377       string msg("comm::ClientSocket::wait | ");
378       msg += asString();
379       msg += " | Is not opened";
380       throw RuntimeException(msg, ANNA_FILE_LOCATION);
381    }
382
383    Poll poll;
384    poll.setTimeout(timeout);
385    poll.insert(a_fd);
386    poll.waitMessage();
387    Notify::_v result = (poll.fetch() != -1) ? ((_receive == true) ? receive() : Notify::ReceiveData) : Notify::None;
388    LOGDEBUG(
389       string msg("comm::ClientSocket::wait | Timeout: ");
390       msg += functions::asString(timeout);
391       msg += functions::asText(" ms | Receive: ", _receive);
392       msg += functions::asText(" | fd: ", getfd());
393       msg += " | Result: ";
394       msg += Socket::asText(result);
395       Logger::debug(msg, ANNA_FILE_LOCATION);
396    );
397    return result;
398 }
399
400 //---------------------------------------------------------------------------------------
401 // Cuando el ClientSocket notifica que ha recibido datos, el communicator invoca a este
402 // metodo por si hubiera algn mensaje completo.
403 //
404 // (1) Si todavia no conoce el tamao esperado del mensaje => intenta calcularlo.
405 // (2) Si ya conoce la longitud y en el buffer recibido quedan , al menos, ese nmero
406 // de bytes => hay al menos un mensaje completo. Lo procesa y deja todo preparado
407 // para la proxima invocacion de este metodo.
408 // (3) Todavia no tenemos un mensajes completo, lo guardamos y esperamos a que se vuelva
409 // a leer lo que falta desde 'ClientSocket::receive'.
410 // (4) Si nos quedan bytes por analizar pero no tenemos suficientes para calcular
411 // la longitud esperada => prepara el a_data para recibir el siguiente bloque.
412 //---------------------------------------------------------------------------------------
413 const DataBlock* comm::ClientSocket::fetch()
414 throw(RuntimeException) {
415    if (a_status & Status::ClosePending)
416       return NULL;
417
418    const int remainingSize(a_data.getSize() - a_offset);
419
420    if (remainingSize <= 0) {
421       a_data.clear();
422       a_offset = 0;
423       return NULL;
424    }
425
426    const DataBlock* result(NULL);
427
428    a_buffer.setup(a_data.getData() + a_offset, remainingSize);
429
430    if (a_expectedSize == -1)                                               // (1)
431       calculeExpectedSize(a_buffer);
432
433    if (isCorrupt() == true)
434       return NULL;
435
436    if (a_expectedSize != -1) {
437       if (a_expectedSize <= remainingSize) {                               // (2)
438          a_buffer.resize(a_expectedSize);
439          result = &a_buffer;
440          a_offset += a_expectedSize;
441          a_expectedSize = -1;
442       }
443
444       // (3)
445    }
446
447    // (4)
448
449    if (result == NULL) {
450       if (a_offset > 0) {
451          if (a_offset >= a_data.getSize())
452             a_data.clear();
453          else
454             a_data.remove(a_offset);
455
456          a_offset = 0;
457       }
458
459       LOGDEBUG(
460          string msg("comm::ClientSocket::fetch | ");
461          msg += asString();
462          msg += functions::asText(" | N-bytes: ", a_data.getSize());
463          msg += " | Pending: ";
464          msg += anna::functions::asString(a_data, 24);
465          Logger::debug(msg, ANNA_FILE_LOCATION);
466       );
467    }
468
469    return result;
470 }
471
472 void comm::ClientSocket::calculeExpectedSize(const DataBlock& data)
473 throw() {
474    try {
475       if ((a_expectedSize = getTransport()->calculeSize(data)) != -1) {
476          LOGDEBUG(
477             string msg("comm::ClientSocket::calculeExpectedSize | ");
478             msg += asString();
479             Logger::debug(msg, ANNA_FILE_LOCATION)
480          );
481       }
482
483       if (a_expectedSize < -1) {
484          activate(Status::Corrupt);
485          string msg(asString());
486          msg += " | Invalid expected size";
487          Logger::error(msg, ANNA_FILE_LOCATION);
488       }
489    } catch (RuntimeException& ex) {
490       ex.trace();
491       activate(Status::Corrupt);
492    }
493 }
494
495 comm::Transport* comm::ClientSocket::reserveTransport()
496 throw(RuntimeException) {
497    if (a_transport == NULL) {
498       if (Socket::a_transportFactory == NULL) {
499          string msg(asString());
500          msg += " | Transport factory was not especified";
501          throw RuntimeException(msg, ANNA_FILE_LOCATION);
502       }
503
504       Guard guard(this, "comm::ClientSocket::reserveTransport");
505
506       if (a_transport == NULL) {
507          Guard guard(Socket::a_transportFactory, typeid(Socket::a_transportFactory).name());
508          a_transport = Socket::a_transportFactory->create();
509          a_transport->clear();
510       }
511    }
512
513    return a_transport;
514 }
515
516 comm::Transport* comm::ClientSocket::unsafe_reserveTransport()
517 throw(RuntimeException) {
518    if (a_transport == NULL) {
519       if (Socket::a_transportFactory == NULL) {
520          string msg(asString());
521          msg += " | Transport factory was not especified";
522          throw RuntimeException(msg, ANNA_FILE_LOCATION);
523       }
524
525       a_transport = Socket::a_transportFactory->create();
526       a_transport->clear();
527    }
528
529    return a_transport;
530 }
531
532 comm::Receiver* comm::ClientSocket::reserveReceiver()
533 throw(RuntimeException) {
534    if (a_receiver == NULL && Socket::a_receiverFactory != NULL) {
535       Guard guard(this, "comm::ClientSocket::reserveReceiver");
536
537       // NO hace falta proteger porque ya lo hace su 'create'
538       if (a_receiver == NULL)
539          a_receiver = Socket::a_receiverFactory->create();
540    }
541
542    return a_receiver;
543 }
544
545 // Este metodo se sobre-escribira en commsec::ClientSocket para establecer la conexion mediante las
546 // funciones SSH
547 int comm::ClientSocket::do_connect(const sockaddr* s, const  int len)
548 throw(RuntimeException) {
549    int r;
550    anna_signal_shield(r, ::connect(a_fd, s, len));
551    return r;
552 }
553
554 //----------------------------------------------------------------------------------------------------
555 // Este metodo se sobre-escribe en comm::DatagramSocket
556 //
557 // (1) si no ha sido capaz de escribir todo el mensaje tenemos que esperar a que el bloque anterior
558 // haya salido, ya que si intentamos escribir de forma contienua habra un momento que obtengamos
559 // un error 'Resource temporarily unavailable'.
560 // (2) La cola de salida esta llena momentaneamente, esperamos unos milisegundos para volver a
561 // tratar de enviar.
562 // (3) Obtenmos el error "Broken Pipe" ya solo se puede cerrar el socket y tratar de comenzar
563 // (4) Si envió un trozo, pero no pudo enviar el mensaje completo => cierra el socket para evitar
564 //     problemas de interpretación en el extremo remoto.
565 //----------------------------------------------------------------------------------------------------
566 void comm::ClientSocket::do_write(const DataBlock& message)
567 throw(RuntimeException) {
568    int size = message.getSize();
569    const char* data =  message.getData();
570    int r(0);
571    int nloop(0);
572    int cx(0);
573    bool retry = true;
574    bool isok = false;
575    int xerrno;
576    bool sendSomething = false;
577
578    // Si ha enviado parte del mensaje => tiene que enviarlo completo
579    while (retry == true && cx < 5) {
580       anna_signal_shield(r, write(Socket::a_fd, data, size));
581
582       if (r == size) {
583          isok = true;
584          break;
585       }
586
587       xerrno = errno;
588
589       if (r < 0) {
590          if (xerrno == EAGAIN) {                              // (2)
591             cx ++;
592             pollfd waiting;
593             waiting.fd = Socket::a_fd;
594             waiting.events = POLLOUT;
595             anna_signal_shield(r, poll(&waiting, 1, a_msMaxWriteDelay));
596          } else {
597             // Si hay un error definitivo deja de intentarlo y lanza la excepción.
598             // Si se ha enviado algo supondremos que el extremo remoto es capaz de recuperarse
599             retry = false;
600          }
601       } else  { /* (r < left) */
602          nloop ++;
603          data += r;
604          size -= r;
605          cx = 0;
606          sendSomething = true;
607       }
608    }
609
610    if (isok == false) {
611       if (xerrno == EPIPE || sendSomething == true) {                               // (3)(4)
612 //         activate (Status::ClosePending);
613          requestClose();
614          r = shutdown(getfd(), SHUT_WR);
615       }
616
617       string msg("comm::ClientSocket::do_write | fd: ");
618       msg += functions::asString(getfd());
619       throw RuntimeException(msg, xerrno, ANNA_FILE_LOCATION);
620    }
621
622    LOGDEBUG(
623       string msg("comm::ClientSocket::do_write | ");
624       msg += asString();
625       msg += functions::asText(" | N-Loop: ", nloop);
626       msg += functions::asText(" | Sent: ", message);
627       Logger::debug(msg, ANNA_FILE_LOCATION);
628    );
629 }
630
631 //----------------------------------------------------------------------------
632 // (1) Cuando intentamos leer del socket asociado a un servidor que ha caido
633 //     no devuelve 0 (como cuando se cae el socket de un cliente) sino que
634 //     devuelve el error (ECONNRESET = 131).
635 //
636 // Devuelve 0 para indicar que el Socket se ha cerrado en el extremo remoto
637 //----------------------------------------------------------------------------
638 int comm::ClientSocket::do_read(const char* data, const int maxSize)
639 throw(RuntimeException) {
640    int result;
641
642    do {
643       if ((result = ::read(Socket::a_fd, (void*) data, maxSize)) < 0) {
644          if (errno == EINTR)
645             continue;
646
647          if (errno == ECONNRESET) { // (1)
648             result = 0;
649             break;
650          }
651
652          string msg(asString());
653          msg += " | Cannot receive";
654          throw RuntimeException(msg, errno, ANNA_FILE_LOCATION);
655       }
656    } while (errno == EINTR && result < 0);
657
658    return result;
659 }
660
661 void comm::ClientSocket::forgot()
662 throw() {
663    if (a_transport != NULL)
664       a_transport->clear();
665
666    deactivate(Status::Corrupt);
667    a_expectedSize = -1;
668    a_data.clear();
669    a_offset = 0;
670 }
671
672 //--------------------------------------------------------------------------------------
673 // (1) Recupera el tamano para verificar que se ha podido establecer la solicitud.
674 //--------------------------------------------------------------------------------------
675 void comm::ClientSocket::getSocketOptions()
676 throw(RuntimeException) {
677    if ((a_rcvBufferSize = comm::Communicator::getReceivingChunkSize()) == -1) {
678       socklen_t l = sizeof(int);
679       int fd = Socket::getfd();
680       anna_comm_socket_check(
681          getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &a_rcvBufferSize, &l),
682          "Cannot obtain SO_RCVBUF"
683       );
684    }
685
686    a_reader.allocate(a_rcvBufferSize);
687 }
688
689 int comm::ClientSocket::getTotalPendingBytes() const
690 throw() {
691    // El nº de bytes pendientes de procesar será lo que queda en la cola I/O + los bytes que tenemos cargados en memoria
692    // los bytes pendientes se obtiene al leer el chunk
693    return a_pendingBytes.bytesToRead + getBufferSize();
694 }
695
696
697 string comm::ClientSocket::asString() const
698 throw() {
699    string msg("comm::ClientSocket { ");
700
701    if (this == NULL)
702       return msg += " <freed> }";
703
704    msg += Socket::asString();
705    msg += functions::asText(" | RcvBufferSize: ", a_rcvBufferSize);
706    msg += " bytes | Status: ";
707    msg += Status::asString(a_status);
708    msg += functions::asString("| MaxConDelay: %d ms", a_msMaxConnectionDelay.getValue());
709
710    if (a_transport)
711       msg += functions::asText(" | OverQuotaSize: ", a_transport->getOverQuotaSize());
712
713    msg += functions::asString(" | Reserved: %d | Pending: %d | Offset: %d | ExpectedSize: %d", a_data.getSize(), getBufferSize(), a_offset, a_expectedSize);
714    msg += " | Remote access point: ";
715    a_remoteAccessPoint.asString(msg);
716    return msg += " }";
717 }
718
719 xml::Node* comm::ClientSocket::asXML(xml::Node* parent) const
720 throw(RuntimeException) {
721    xml::Node* clientSocket = parent->createChild("comm.ClientSocket");
722
723    if (this == NULL) {
724       clientSocket->createAttribute("Freed", "yes");
725       return clientSocket;
726    }
727
728    Socket::asXML(clientSocket);
729    clientSocket->createAttribute("Status", Status::asString(a_status));
730    clientSocket->createAttribute("RcvBufferSize", a_rcvBufferSize);
731    clientSocket->createAttribute("MaxConnDelay", a_msMaxConnectionDelay);
732    clientSocket->createAttribute("IgnoreIncomingMessages", functions::asString(a_ignoreIncomingMessages));
733
734    if (a_transport)
735       clientSocket->createAttribute("OverQuotaSize", a_transport->getOverQuotaSize());
736
737    xml::Node* buffer = clientSocket->createChild("Buffer");
738    buffer->createAttribute("Reserved", a_data.getSize());
739    buffer->createAttribute("Pending", getTotalPendingBytes());
740    a_remoteAccessPoint.asXML("comm.RemotePoint", clientSocket);
741
742    if (a_receiver)
743       a_receiver->asXML(clientSocket);
744
745    return clientSocket;
746 }
747
748 string comm::ClientSocket::Status::asString(const int status)
749 throw() {
750    string result;
751
752    if (status == 0)
753       result = "None ";
754    else {
755       if (status & Status::Connected)
756          result = "Connected ";
757
758       if (status & Status::Corrupt)
759          result += "Corrupt ";
760
761       if (status & Status::ClosePending)
762          result += "ClosePending ";
763
764       if (status & Status::Working)
765          result += "Working ";
766    }
767
768    return result;
769 }
770
771