Avoid service overhead about checking netstat ports
[anna.git] / source / comm / ClientSocket.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #include <sys/ioctl.h>
10 #include <sys/time.h>
11
12 #include <poll.h>
13
14 //#include <asm/ioctls.h>
15
16 #include <algorithm>
17
18 #include <anna/core/tracing/TraceMethod.hpp>
19 #include <anna/core/functions.hpp>
20
21 #include <anna/xml/Node.hpp>
22 #include <anna/xml/Attribute.hpp>
23
24 #include <anna/app/functions.hpp>
25
26 #include <anna/comm/ClientSocket.hpp>
27 #include <anna/comm/INetAddress.hpp>
28 #include <anna/comm/TransportFactory.hpp>
29 #include <anna/comm/Transport.hpp>
30 #include <anna/comm/Message.hpp>
31 #include <anna/comm/Network.hpp>
32 #include <anna/comm/Host.hpp>
33 #include <anna/comm/Server.hpp>
34 #include <anna/comm/Communicator.hpp>
35 #include <anna/comm/ReceiverFactory.hpp>
36 #include <anna/comm/Receiver.hpp>
37 #include <anna/comm/CongestionController.hpp>
38
39 #include <anna/comm/internal/Poll.hpp>
40
41 using namespace std;
42 using namespace anna;
43
44 // static
45 const Millisecond comm::ClientSocket::DefaultMaxConnectionDelay(200);
46 const Millisecond comm::ClientSocket::DefaultMaxWriteDelay(500);
47
48
49 void comm::ClientSocket::initialize()
50 throw() {
51    a_status = Status::None;
52    a_transport = NULL;
53    a_expectedSize = -1;
54    a_offset = 0;
55    a_cachedServer = NULL;
56    a_msMaxConnectionDelay = DefaultMaxConnectionDelay;
57    a_msMaxWriteDelay = DefaultMaxWriteDelay;
58    a_rcvBufferSize = -1;
59    a_receiver = NULL;
60 }
61
62 /*
63  * Se invoca desde el handler::ServerSocket y no hace falta protegerlo con una SCC
64  * porque ya está protegido por éste.
65  */
66 void comm::ClientSocket::setfd(const int fd)
67 throw(RuntimeException) {
68    Socket::a_fd = fd;
69    Socket::a_isBound = true;
70    activate(Status::Connected);
71    a_offset = 0;
72
73    if (Socket::a_type == Socket::Type::Stream) {
74       sockaddr_in addr;
75       socklen_t len(sizeof(addr));
76       Network& network(Network::instantiate());
77       anna_comm_socket_check(
78          getsockname(fd, (struct sockaddr *) &addr, &len),
79          "Cannot obtain socket name"
80       );
81       Socket::a_localAccessPoint = INetAddress(network.find(addr.sin_addr.s_addr), ntohs(addr.sin_port));
82       anna_comm_socket_check(
83          getpeername(fd, (struct sockaddr *) &addr, &len),
84          "Cannot obtain peer name"
85       );
86       a_remoteAccessPoint = INetAddress(network.find(addr.sin_addr.s_addr), ntohs(addr.sin_port));
87    }
88
89    setBlockingMode(false);
90    getSocketOptions();
91    LOGDEBUG(
92       string msg("comm::ClientSocket::setfd | ");
93       msg += asString();
94       Logger::debug(msg, ANNA_FILE_LOCATION);
95    );
96 }
97
98 /*
99  * Si se establece el receptor directamente, sin tener una factoria intermedia, hay que
100  * invocar al metodo comm::Receiver::initialize.
101  */
102 void comm::ClientSocket::setReceiver(comm::Receiver* receiver)
103 throw(RuntimeException) {
104    if (a_receiver && Socket::a_receiverFactory)
105       Socket::a_receiverFactory->release(a_receiver);
106
107    a_receiver = receiver;
108
109    if (a_receiverFactory == NULL)
110       a_receiver->initialize();
111 }
112
113 // Protegido desde el Socket::close
114 void comm::ClientSocket::do_close()
115 throw() {
116    Socket::do_close();
117
118    if (a_transport != NULL) {
119       a_transport->clear();
120       Guard guard(Socket::a_transportFactory, typeid(Socket::a_transportFactory).name());
121       Socket::a_transportFactory->release(a_transport);
122       a_transport = NULL;
123    }
124
125    a_status = Status::None;
126    a_expectedSize = -1;
127    a_data.clear();
128    a_cachedServer = NULL;
129    a_offset = 0;
130
131    if (a_receiver) {
132       if (Socket::a_receiverFactory)
133          Socket::a_receiverFactory->release(a_receiver);
134
135       a_receiver = NULL;
136    }
137 }
138
139 comm::Server* comm::ClientSocket::getServer()
140 throw(RuntimeException) {
141    Guard guard(*this, "comm::ClientSocket (getServer)");
142    const INetAddress& remoteAddress = getRemoteAccessPoint().getINetAddress();
143    const Device* device = remoteAddress.getDevice(false);
144    const int remotePort = remoteAddress.getPort();
145
146    if (device == NULL)
147       return NULL;
148
149    if (a_cachedServer != NULL)
150       if (a_cachedServer->getRemotePort() == remotePort && a_cachedServer->getHost()->contains(device))
151          return a_cachedServer;
152
153    Network& network = Network::instantiate();
154    comm::Server* result = NULL;
155    comm::Server* aux;
156
157    for (Network::host_iterator ii = network.host_begin(), maxii = network.host_end(); ii != maxii; ii ++) {
158       if (Network::host(ii)->contains(device) == false)
159          continue;
160
161       if ((aux = Network::host(ii)->find_server(remotePort)) == NULL)
162          continue;
163
164       result = aux;
165       break;
166    }
167
168    return (a_cachedServer = result);
169 }
170
171 //_________________________________________________________________________________
172 // (1) Si termina el periodo de espera sin devolver ninguna actividad => podemos considerar que
173 // se ha conectado. Es posible que haya algún componente hardware que no permita hacer
174 // la conexión.
175 //_________________________________________________________________________________
176 void comm::ClientSocket::connect()
177 throw(RuntimeException) {
178    Guard guard(*this, "comm::ClientSocket (connect)");
179    anna_socket_assert(isConnected() == true, "Already connected");
180
181    if (Socket::isOpened() == false) {
182       Socket::open();
183       getSocketOptions();
184       setBlockingMode(false);
185    }
186
187    if (Socket::a_localAccessPoint.isNull() == false && Socket::isBound() == false)
188       Socket::bind();
189
190    sockaddr* s(NULL);
191    int len(0);
192    a_remoteAccessPoint.translate(*this, s, len);
193    a_cachedServer = NULL;
194    const Millisecond msinit = functions::millisecond();
195    Millisecond msnow;
196    int xerrno;
197    int r;
198    int tryCounter(0);
199    pollfd waiting;
200    msnow = msinit;
201    waiting.fd = Socket::a_fd;
202    waiting.events = POLLOUT;
203
204    while ((msnow - msinit) <= a_msMaxConnectionDelay) {
205       tryCounter ++;
206
207       try {
208          if ((r = do_connect(s, len)) != -1)
209             break;
210       } catch (RuntimeException&) {
211          close();
212          throw;
213       }
214
215       if ((xerrno = errno) == EISCONN) {
216          r = 0;
217          break;
218       }
219
220       if (xerrno != EINPROGRESS && xerrno != EALREADY)
221          break;
222
223       anna_signal_shield(r, poll(&waiting, 1, a_msMaxConnectionDelay));
224
225       if (r == 0) {                                                                           // (1)
226          r = -1;
227          xerrno = ETIMEDOUT;
228       }
229
230       msnow = functions::millisecond();
231    }
232
233    if (r == -1) {
234       std::string msg("comm::ClientSocket::connect | ");
235       msg += asString();
236       msg += " | N-loop: ";
237       msg += functions::asString(tryCounter);
238       msg += " | Wait: ";
239       msg += functions::asString(msnow - msinit);
240       msg += " ms";
241       close();
242       throw RuntimeException(msg, xerrno, ANNA_FILE_LOCATION);
243    }
244
245    activate(Status::Connected);
246    a_offset = 0;
247    LOGDEBUG(
248       string msg("comm::ClientSocket::connect | ");
249       msg += asString();
250       Logger::debug(msg, ANNA_FILE_LOCATION);
251    );
252 }
253
254 void comm::ClientSocket::send(comm::Message& message)
255 throw(RuntimeException) {
256    Guard guard(*this, "comm::ClientSocket (send)");
257    do_write(getTransport()->code(message));
258 }
259
260 void comm::ClientSocket::send(comm::Message* message)
261 throw(RuntimeException) {
262    if (message == NULL)
263       throw RuntimeException("comm::ClientSocket::send | Cannot send a NULL message", ANNA_FILE_LOCATION);
264
265    send(*message);
266 }
267
268 //_______________________________________________________________________
269 // Si el clientsocket no esta marcado como "en proceso", es decir, todavia estamos
270 // recogiendo mensajes de el, se cierra directamente, en otro cpero aso se marca
271 // como pendiente de cerrar y cuando se recogan todos los mensajes que contiene
272 // su buffer, se cerrará.
273 //_______________________________________________________________________
274 void comm::ClientSocket::requestClose()
275 throw() {
276    Communicator* communicator = app::functions::component <Communicator> (ANNA_FILE_LOCATION);
277    a_status |= Status::ClosePending;
278    // Sólo tiene efecto real en modo ST
279    communicator->notifyPendingClose();
280    /*
281       else {
282          communicator->detach (this);
283       }
284    */
285 }
286
287 //---------------------------------------------------------------------------------------
288 // Cuando el communicator detecta actividad en el socket invoca a este metodo.
289 // (1) Si devuelve 0 es porque han borrado el extremo remoto.
290 // (2) Si el buffer intermedio tiene mas de 4 veces el numero de bytes planificado =>
291 // lo cerramos independientemente del tamanio esperado.
292 // (3) Si el buffer tiene un numero de bytes mayor al planificiado y todavia no ha
293 // detectado ningun mensaje => lo cerramos.
294 //
295 // Se invoca desde un entorno protegido, por lo que no hace falta SSCC.
296 //---------------------------------------------------------------------------------------
297 comm::Socket::Notify::_v comm::ClientSocket::receive()
298 throw(RuntimeException) {
299    Notify::_v result;
300
301    if (a_reader.getMaxSize() == 0) {
302       string msg(asString());
303       msg += " | I/O buffer length was not established";
304       throw RuntimeException(msg, ANNA_FILE_LOCATION);
305    }
306
307    const char* buffer(a_reader.getData());
308
309    const int r(do_read(buffer, a_reader.getMaxSize()));
310
311    a_buffer.setup(buffer, r);
312
313    // Si ha leido el máximo permitido, entonces obtiene los bytes que quedan por leer en el sistema I/O.
314    // Si ha leído menos del máximo, entonces no queda nada en el sistema I/O
315    if (r == a_reader.getMaxSize()) {
316       if (ioctl(a_fd, FIONREAD, &a_pendingBytes.bytesToRead) != 0)
317          a_pendingBytes.bytesToRead = 0;
318    } else
319       a_pendingBytes.bytesToRead = 0;
320
321    LOGDEBUG(
322       string msg("comm::ClientSocket::receive | ");
323       msg += asString();
324       msg += " | Pending: ";
325       msg += anna::functions::asString(a_data, 24);
326       msg += functions::asText(" | Received N-bytes: ", a_buffer.getSize());
327       msg += ", Buffer: ";
328       msg += anna::functions::asString(a_buffer, 24);
329       Logger::debug(msg, ANNA_FILE_LOCATION);
330    );
331
332    if (r > 0) {                                                              // (1)
333       if (getIgnoreIncomingMessages() == true) {
334          app::functions::component <Communicator> (ANNA_FILE_LOCATION)->eventIgnoreBurst(*this, a_buffer);
335          forgot();
336          LOGDEBUG(
337             string msg("comm::ClientSocket::receive | ");
338             msg += asString();
339             msg += " | Ignoring incoming messages";
340             Logger::debug(msg, ANNA_FILE_LOCATION);
341          );
342          return Notify::None;
343       }
344
345       result = Notify::ReceiveData;
346       a_data += a_buffer;
347       const int overQuotaSize(getTransport()->getOverQuotaSize());
348
349       if (a_expectedSize == -1)
350          calculeExpectedSize(a_data);
351
352       if (a_expectedSize == -1 && getBufferSize() > overQuotaSize) {                         // (3)
353          string msg(asString());
354          msg += functions::asText(" | Closed local point due to excessive memory consumption: N-Bytes: ", r);
355          msg += functions::asText("/BufferSize: ", getBufferSize());
356          msg += functions::asText("/OverQuotaSize: ", overQuotaSize);
357          msg += ')';
358          Logger::error(msg, ANNA_FILE_LOCATION);
359          app::functions::component <Communicator> (ANNA_FILE_LOCATION)->eventOverQuota(*this);
360          a_data.clear();
361          a_offset = 0;
362          result = Notify::Close;
363       } else if (isCorrupt() == true)
364          result = Notify::Corrupt;
365    } else
366       result = Notify::Close;
367
368    return result;
369 }
370
371 comm::Socket::Notify::_v comm::ClientSocket::wait(const Millisecond &timeout, const bool _receive)
372 throw(RuntimeException) {
373    Guard guard(*this, "comm::ClientSocket (wait)");
374
375    if (Socket::isOpened() == false) {
376       string msg("comm::ClientSocket::wait | ");
377       msg += asString();
378       msg += " | Is not opened";
379       throw RuntimeException(msg, ANNA_FILE_LOCATION);
380    }
381
382    Poll poll;
383    poll.setTimeout(timeout);
384    poll.insert(a_fd);
385    poll.waitMessage();
386    Notify::_v result = (poll.fetch() != -1) ? ((_receive == true) ? receive() : Notify::ReceiveData) : Notify::None;
387    LOGDEBUG(
388       string msg("comm::ClientSocket::wait | Timeout: ");
389       msg += functions::asString(timeout);
390       msg += functions::asText(" ms | Receive: ", _receive);
391       msg += functions::asText(" | fd: ", getfd());
392       msg += " | Result: ";
393       msg += Socket::asText(result);
394       Logger::debug(msg, ANNA_FILE_LOCATION);
395    );
396    return result;
397 }
398
399 //---------------------------------------------------------------------------------------
400 // Cuando el ClientSocket notifica que ha recibido datos, el communicator invoca a este
401 // metodo por si hubiera algn mensaje completo.
402 //
403 // (1) Si todavia no conoce el tamao esperado del mensaje => intenta calcularlo.
404 // (2) Si ya conoce la longitud y en el buffer recibido quedan , al menos, ese nmero
405 // de bytes => hay al menos un mensaje completo. Lo procesa y deja todo preparado
406 // para la proxima invocacion de este metodo.
407 // (3) Todavia no tenemos un mensajes completo, lo guardamos y esperamos a que se vuelva
408 // a leer lo que falta desde 'ClientSocket::receive'.
409 // (4) Si nos quedan bytes por analizar pero no tenemos suficientes para calcular
410 // la longitud esperada => prepara el a_data para recibir el siguiente bloque.
411 //---------------------------------------------------------------------------------------
412 const DataBlock* comm::ClientSocket::fetch()
413 throw(RuntimeException) {
414    if (a_status & Status::ClosePending)
415       return NULL;
416
417    const int remainingSize(a_data.getSize() - a_offset);
418
419    if (remainingSize <= 0) {
420       a_data.clear();
421       a_offset = 0;
422       return NULL;
423    }
424
425    const DataBlock* result(NULL);
426
427    a_buffer.setup(a_data.getData() + a_offset, remainingSize);
428
429    if (a_expectedSize == -1)                                               // (1)
430       calculeExpectedSize(a_buffer);
431
432    if (isCorrupt() == true)
433       return NULL;
434
435    if (a_expectedSize != -1) {
436       if (a_expectedSize <= remainingSize) {                               // (2)
437          a_buffer.resize(a_expectedSize);
438          result = &a_buffer;
439          a_offset += a_expectedSize;
440          a_expectedSize = -1;
441       }
442
443       // (3)
444    }
445
446    // (4)
447
448    if (result == NULL) {
449       if (a_offset > 0) {
450          if (a_offset >= a_data.getSize())
451             a_data.clear();
452          else
453             a_data.remove(a_offset);
454
455          a_offset = 0;
456       }
457
458       LOGDEBUG(
459          string msg("comm::ClientSocket::fetch | ");
460          msg += asString();
461          msg += functions::asText(" | N-bytes: ", a_data.getSize());
462          msg += " | Pending: ";
463          msg += anna::functions::asString(a_data, 24);
464          Logger::debug(msg, ANNA_FILE_LOCATION);
465       );
466    }
467
468    return result;
469 }
470
471 void comm::ClientSocket::calculeExpectedSize(const DataBlock& data)
472 throw() {
473    try {
474       if ((a_expectedSize = getTransport()->calculeSize(data)) != -1) {
475          LOGDEBUG(
476             string msg("comm::ClientSocket::calculeExpectedSize | ");
477             msg += asString();
478             Logger::debug(msg, ANNA_FILE_LOCATION)
479          );
480       }
481
482       if (a_expectedSize < -1) {
483          activate(Status::Corrupt);
484          string msg(asString());
485          msg += " | Invalid expected size";
486          Logger::error(msg, ANNA_FILE_LOCATION);
487       }
488    } catch (RuntimeException& ex) {
489       ex.trace();
490       activate(Status::Corrupt);
491    }
492 }
493
494 comm::Transport* comm::ClientSocket::reserveTransport()
495 throw(RuntimeException) {
496    if (a_transport == NULL) {
497       if (Socket::a_transportFactory == NULL) {
498          string msg(asString());
499          msg += " | Transport factory was not especified";
500          throw RuntimeException(msg, ANNA_FILE_LOCATION);
501       }
502
503       Guard guard(this, "comm::ClientSocket::reserveTransport");
504
505       if (a_transport == NULL) {
506          Guard guard(Socket::a_transportFactory, typeid(Socket::a_transportFactory).name());
507          a_transport = Socket::a_transportFactory->create();
508          a_transport->clear();
509       }
510    }
511
512    return a_transport;
513 }
514
515 comm::Transport* comm::ClientSocket::unsafe_reserveTransport()
516 throw(RuntimeException) {
517    if (a_transport == NULL) {
518       if (Socket::a_transportFactory == NULL) {
519          string msg(asString());
520          msg += " | Transport factory was not especified";
521          throw RuntimeException(msg, ANNA_FILE_LOCATION);
522       }
523
524       a_transport = Socket::a_transportFactory->create();
525       a_transport->clear();
526    }
527
528    return a_transport;
529 }
530
531 comm::Receiver* comm::ClientSocket::reserveReceiver()
532 throw(RuntimeException) {
533    if (a_receiver == NULL && Socket::a_receiverFactory != NULL) {
534       Guard guard(this, "comm::ClientSocket::reserveReceiver");
535
536       // NO hace falta proteger porque ya lo hace su 'create'
537       if (a_receiver == NULL)
538          a_receiver = Socket::a_receiverFactory->create();
539    }
540
541    return a_receiver;
542 }
543
544 // Este metodo se sobre-escribira en commsec::ClientSocket para establecer la conexion mediante las
545 // funciones SSH
546 int comm::ClientSocket::do_connect(const sockaddr* s, const  int len)
547 throw(RuntimeException) {
548    int r;
549    anna_signal_shield(r, ::connect(a_fd, s, len));
550    return r;
551 }
552
553 //----------------------------------------------------------------------------------------------------
554 // Este metodo se sobre-escribe en comm::DatagramSocket
555 //
556 // (1) si no ha sido capaz de escribir todo el mensaje tenemos que esperar a que el bloque anterior
557 // haya salido, ya que si intentamos escribir de forma contienua habra un momento que obtengamos
558 // un error 'Resource temporarily unavailable'.
559 // (2) La cola de salida esta llena momentaneamente, esperamos unos milisegundos para volver a
560 // tratar de enviar.
561 // (3) Obtenmos el error "Broken Pipe" ya solo se puede cerrar el socket y tratar de comenzar
562 // (4) Si envió un trozo, pero no pudo enviar el mensaje completo => cierra el socket para evitar
563 //     problemas de interpretación en el extremo remoto.
564 //----------------------------------------------------------------------------------------------------
565 void comm::ClientSocket::do_write(const DataBlock& message)
566 throw(RuntimeException) {
567    int size = message.getSize();
568    const char* data =  message.getData();
569    int r(0);
570    int nloop(0);
571    int cx(0);
572    bool retry = true;
573    bool isok = false;
574    int xerrno;
575    bool sendSomething = false;
576
577    // Si ha enviado parte del mensaje => tiene que enviarlo completo
578    while (retry == true && cx < 5) {
579       anna_signal_shield(r, write(Socket::a_fd, data, size));
580
581       if (r == size) {
582          isok = true;
583          break;
584       }
585
586       xerrno = errno;
587
588       if (r < 0) {
589          if (xerrno == EAGAIN) {                              // (2)
590             cx ++;
591             pollfd waiting;
592             waiting.fd = Socket::a_fd;
593             waiting.events = POLLOUT;
594             anna_signal_shield(r, poll(&waiting, 1, a_msMaxWriteDelay));
595          } else {
596             // Si hay un error definitivo deja de intentarlo y lanza la excepción.
597             // Si se ha enviado algo supondremos que el extremo remoto es capaz de recuperarse
598             retry = false;
599          }
600       } else  { /* (r < left) */
601          nloop ++;
602          data += r;
603          size -= r;
604          cx = 0;
605          sendSomething = true;
606       }
607    }
608
609    if (isok == false) {
610       if (xerrno == EPIPE || sendSomething == true) {                               // (3)(4)
611 //         activate (Status::ClosePending);
612          requestClose();
613          r = shutdown(getfd(), SHUT_WR);
614       }
615
616       string msg("comm::ClientSocket::do_write | fd: ");
617       msg += functions::asString(getfd());
618       throw RuntimeException(msg, xerrno, ANNA_FILE_LOCATION);
619    }
620
621    LOGDEBUG(
622       string msg("comm::ClientSocket::do_write | ");
623       msg += asString();
624       msg += functions::asText(" | N-Loop: ", nloop);
625       msg += functions::asText(" | Sent: ", message);
626       Logger::debug(msg, ANNA_FILE_LOCATION);
627    );
628 }
629
630 //----------------------------------------------------------------------------
631 // (1) Cuando intentamos leer del socket asociado a un servidor que ha caido
632 //     no devuelve 0 (como cuando se cae el socket de un cliente) sino que
633 //     devuelve el error (ECONNRESET = 131).
634 //
635 // Devuelve 0 para indicar que el Socket se ha cerrado en el extremo remoto
636 //----------------------------------------------------------------------------
637 int comm::ClientSocket::do_read(const char* data, const int maxSize)
638 throw(RuntimeException) {
639    int result;
640
641    do {
642       if ((result = ::read(Socket::a_fd, (void*) data, maxSize)) < 0) {
643          if (errno == EINTR)
644             continue;
645
646          if (errno == ECONNRESET) { // (1)
647             result = 0;
648             break;
649          }
650
651          string msg(asString());
652          msg += " | Cannot receive";
653          throw RuntimeException(msg, errno, ANNA_FILE_LOCATION);
654       }
655    } while (errno == EINTR && result < 0);
656
657    return result;
658 }
659
660 void comm::ClientSocket::forgot()
661 throw() {
662    if (a_transport != NULL)
663       a_transport->clear();
664
665    deactivate(Status::Corrupt);
666    a_expectedSize = -1;
667    a_data.clear();
668    a_offset = 0;
669 }
670
671 //--------------------------------------------------------------------------------------
672 // (1) Recupera el tamano para verificar que se ha podido establecer la solicitud.
673 //--------------------------------------------------------------------------------------
674 void comm::ClientSocket::getSocketOptions()
675 throw(RuntimeException) {
676    if ((a_rcvBufferSize = comm::Communicator::getReceivingChunkSize()) == -1) {
677       socklen_t l = sizeof(int);
678       int fd = Socket::getfd();
679       anna_comm_socket_check(
680          getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &a_rcvBufferSize, &l),
681          "Cannot obtain SO_RCVBUF"
682       );
683    }
684
685    a_reader.allocate(a_rcvBufferSize);
686 }
687
688 int comm::ClientSocket::getTotalPendingBytes() const
689 throw() {
690    // El nº de bytes pendientes de procesar será lo que queda en la cola I/O + los bytes que tenemos cargados en memoria
691    // los bytes pendientes se obtiene al leer el chunk
692    return a_pendingBytes.bytesToRead + getBufferSize();
693 }
694
695
696 string comm::ClientSocket::asString() const
697 throw() {
698    string msg("comm::ClientSocket { ");
699
700    if (this == NULL)
701       return msg += " <freed> }";
702
703    msg += Socket::asString();
704    msg += functions::asText(" | RcvBufferSize: ", a_rcvBufferSize);
705    msg += " bytes | Status: ";
706    msg += Status::asString(a_status);
707    msg += functions::asString("| MaxConDelay: %d ms", a_msMaxConnectionDelay.getValue());
708
709    if (a_transport)
710       msg += functions::asText(" | OverQuotaSize: ", a_transport->getOverQuotaSize());
711
712    msg += functions::asString(" | Reserved: %d | Pending: %d | Offset: %d | ExpectedSize: %d", a_data.getSize(), getBufferSize(), a_offset, a_expectedSize);
713    msg += " | Remote access point: ";
714    a_remoteAccessPoint.asString(msg);
715    return msg += " }";
716 }
717
718 xml::Node* comm::ClientSocket::asXML(xml::Node* parent) const
719 throw(RuntimeException) {
720    xml::Node* clientSocket = parent->createChild("comm.ClientSocket");
721
722    if (this == NULL) {
723       clientSocket->createAttribute("Freed", "yes");
724       return clientSocket;
725    }
726
727    Socket::asXML(clientSocket);
728    clientSocket->createAttribute("Status", Status::asString(a_status));
729    clientSocket->createAttribute("RcvBufferSize", a_rcvBufferSize);
730    clientSocket->createAttribute("MaxConnDelay", a_msMaxConnectionDelay);
731    clientSocket->createAttribute("IgnoreIncomingMessages", functions::asString(a_ignoreIncomingMessages));
732
733    if (a_transport)
734       clientSocket->createAttribute("OverQuotaSize", a_transport->getOverQuotaSize());
735
736    xml::Node* buffer = clientSocket->createChild("Buffer");
737    buffer->createAttribute("Reserved", a_data.getSize());
738    buffer->createAttribute("Pending", getTotalPendingBytes());
739    a_remoteAccessPoint.asXML("comm.RemotePoint", clientSocket);
740
741    if (a_receiver)
742       a_receiver->asXML(clientSocket);
743
744    return clientSocket;
745 }
746
747 string comm::ClientSocket::Status::asString(const int status)
748 throw() {
749    string result;
750
751    if (status == 0)
752       result = "None ";
753    else {
754       if (status & Status::Connected)
755          result = "Connected ";
756
757       if (status & Status::Corrupt)
758          result += "Corrupt ";
759
760       if (status & Status::ClosePending)
761          result += "ClosePending ";
762
763       if (status & Status::Working)
764          result += "Working ";
765    }
766
767    return result;
768 }
769
770