bug in RC
[anna.git] / include / anna / comm / ClientSocket.hpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #ifndef anna_comm_ClientSocket_hpp
10 #define anna_comm_ClientSocket_hpp
11
12 #include <anna/comm/Socket.hpp>
13 #include <anna/comm/Buffer.hpp>
14 #include <anna/comm/Transport.hpp>
15 #include <anna/core/util/Millisecond.hpp>
16
17 namespace anna {
18
19 namespace comm {
20
21 class Communicator;
22 class DatagramSocket;
23 class Transport;
24 class Message;
25 class Server;
26 class CongestionController;
27 class Receiver;
28
29 namespace handler {
30 class MetaClientSocket;
31 class DatagramSocket;
32 }
33
34 /**
35    Implements client sockets (aka @ref Socket) established by our application with a remote endpoint
36    where a ServerSocket is accepting connection requests.
37 */
38 class ClientSocket : public Socket {
39 public:
40   /**
41      Numero de milisegundos por defecto que espera antes de dar por fallida una conexion con
42      otro proceso servidor.
43   */
44   static const Millisecond DefaultMaxConnectionDelay;
45
46   /**
47      Numero de milisegundos por defecto que queda bloqueado un proceso a la espera de poder
48      escribir en un socket cuyo buffer de salida esta lleno.
49   */
50   static const Millisecond DefaultMaxWriteDelay;
51
52   /**
53      Crea un socket cliente liberado.
54      \param transportFactory factoria de protocolos de transporte a usar por este sockets.
55      \param domain Dominio del socket.
56      \param type Tipo de socket.
57      \warning La factoria de protocolos debe estar disponible mientras el Socket esta activo.
58   */
59   ClientSocket(TransportFactory* transportFactory = NULL, Domain::_v domain = Socket::Domain::Inet, Type::_v type = Socket::Type::Stream) :
60     Socket(domain, type, transportFactory),
61     a_data(true),
62     a_reader(true),
63     a_ignoreIncomingMessages(false) {
64     initialize();
65   }
66
67   /**
68      Crea un socket cliente que sera conectado remotamente a la direccion y puerto indicado.
69
70      \param remoteAddress direccion de red remota a la que conectar.
71      \param transportFactory factoria de protocolos de transporte a usar por este sockets.
72      \param type Tipo de socket.
73      \warning La factoria de protocolos debe estar disponible mientras el Socket esta activo.
74   */
75   ClientSocket(const INetAddress& remoteAddress, TransportFactory* transportFactory = NULL, const Type::_v type = Socket::Type::Stream) :
76     Socket(Socket::Domain::Inet, type, transportFactory),
77     a_remoteAccessPoint(remoteAddress),
78     a_data(true),
79     a_reader(true),
80     a_ignoreIncomingMessages(false) {
81     initialize();
82   }
83
84   /**
85      Crea un socket cliente que sera conectado al archivo indicado.
86
87      \param path Ruta del archivo que vamos a usar para transferir datos ataves de este socket.
88      \param type Tipo de socket.
89   */
90   ClientSocket(const std::string& path, const Type::_v type = Socket::Type::Stream) :
91     Socket(path, type),
92     a_remoteAccessPoint(path),
93     a_data(true),
94     a_reader(true),
95     a_ignoreIncomingMessages(false) {
96     initialize();
97   }
98
99   /**
100      Crea un socket cliente conectado al servidor indicado por la direccion y puerto remoto. Ademas el socket cliente
101      seria conectado localmente (ver @ref Socket::bind) a la direccion y puerto locales indicados.
102
103      \param transportFactory factoria de protocolos de transporte a usar por este sockets.
104      \param remoteAddress direccion del servidor.
105      \param localAddress Puede ser usado para limitar la direccion por la que atendiende peticiones un servidor de socket
106      instalado en una maquina con mas de una direccion.
107      compartido por mas de un proceso activo.
108      \param type Tipo de socket.
109      \warning La factoria de protocolos debe estar disponible mientras el Socket esta activo.
110   */
111   ClientSocket(const INetAddress& remoteAddress,  const INetAddress& localAddress, TransportFactory* transportFactory = NULL, const Type::_v type = Socket::Type::Stream) :
112     Socket(localAddress, type, transportFactory),
113     a_remoteAccessPoint(remoteAddress),
114     a_data(true),
115     a_reader(true),
116     a_ignoreIncomingMessages(false) {
117     initialize();
118   }
119
120   /**
121      Destructor.
122   */
123   virtual ~ClientSocket() { close(); }
124
125   /**
126      Devuelve la direccion remota del socket.
127      \return La direccion remota del socket.
128   */
129   const AccessPoint& getRemoteAccessPoint() const throw() { return a_remoteAccessPoint; }
130
131   /**
132      Devuelve el estado de conexion de este socket.
133      \return \em true si el socket mantiene la conexion realizada mediante el metodo #connect o
134      \em false en otro caso.
135   */
136   bool isConnected() const throw() { return (a_status & Status::Connected) != 0; }
137
138   /**
139      Devuelve el estado del contenido del socket.
140      \return \em true si el socket mantiene el sincronismo o \em false en otro caso.
141   */
142   bool isSynchronized() const throw() { return (a_status & Status::Corrupt) == 0; }
143
144   /**
145      Devuelve el estado del contenido del socket.
146      \return \em true si el socket NO mantiene el sincronismo o \em false en otro caso.
147   */
148   bool isCorrupt() const throw() { return (a_status & Status::Corrupt); }
149
150   /**
151    * Devuelve el estado de cierre del socket.
152    * \return \em true si el socket está a la espera de ser cerrado o \em false en otro caso.
153    */
154   bool isClosedPending() const throw() { return (a_status & Status::ClosePending) != 0; }
155
156   /**
157      Devuelve la instancia de anna::comm::Server asociado a una conexion remota iniciada por
158      nuestra aplicacion.
159      \return la instancia de anna::comm::Server asociado a una conexion remota iniciada por
160      nuestra aplicacion.
161      \warning Puede ser NULL en caso de que este ClientSocket corresponda a una conexion local
162      establecida por un proceso remoto contra un ServerSocket de nuestra aplicacion.
163   */
164   Server* getServer() throw(RuntimeException);
165
166   /**
167      Devuelve el numero maximo de milisegundos esperados para obtener conexion al invocar
168      al metodo #connect.
169      \return el numero maximo de milisegundos esperados para obtener conexion al invocar
170      al metodo #connect.
171   */
172   const Millisecond &getMaxConnectionDelay() const throw() { return a_msMaxConnectionDelay; }
173
174   /**
175      Devuelve el numero maximo de milisegundos que queda bloqueado el proceso/thread a la espera
176      de escribir en un socket cuyo buffer de salida esta lleno.
177      \return Devuelve el numero maximo de milisegundos que queda bloqueado el proceso/thread a la espera
178      de escribir en un socket cuyo buffer de salida esta lleno.
179   */
180   const Millisecond &getMaxWriteDelay() const throw() { return a_msMaxWriteDelay; }
181
182   /**
183      Obtiene toda la informacion referente a este socket a partir de la conexion realizada atraves del \em fd
184      recibido como parametro.
185      \param fd File descriptor correspondiente a una conexion local.
186      \warning Exclusivamente uso interno.
187   */
188   virtual void setfd(const int fd) throw(RuntimeException);
189
190   /**
191      Establece el numero maximo de milisegundos esperados para obtener la conexion al
192      invocar al metodo #connect.
193      \param msMaxConnectionDelay Numero de milisegundos esperados para obtener conexion.
194
195      \see anna::comm::Server::setMaxConnectionDelay.
196   */
197   void setMaxConnectionDelay(const Millisecond &msMaxConnectionDelay)
198   throw() {
199     a_msMaxConnectionDelay = msMaxConnectionDelay;
200   }
201
202   /**
203      Establece el numero maximo de milisegundos que queda bloqueado el proceso/thread a la espera
204      de escribir en un socket cuyo buffer de salida esta lleno.
205
206      \param msMaxWriteDelay Numero de milisegundos esperados en caso de que el buffer del socket se llene.
207   */
208   void setMaxWriteDelay(const Millisecond &msMaxWriteDelay) throw() { a_msMaxWriteDelay = msMaxWriteDelay; }
209
210   /**
211    * Devuelve \em true si el indicador que ignora los mensajes entrantes está activo, o \em false en otro caso.
212    * \return \em true si el indicador que ignora los mensajes entrantes está activo, o \em false en otro caso.
213    */
214   bool getIgnoreIncomingMessages() const throw() { return a_ignoreIncomingMessages; }
215
216   /**
217    * Establece el indicador que provoca ignorar los mensajes entrantes.
218    * \param ignoreIncomingMessages \em true si el indicador que ignora los mensajes entrantes está activo, o \em false en otro caso.
219    */
220   void setIgnoreIncomingMessages(const bool ignoreIncomingMessages) throw() { a_ignoreIncomingMessages = ignoreIncomingMessages; }
221
222   /**
223      Intenta la conexion remota con la direccion indicada en el constructor.
224   */
225   virtual void connect() throw(RuntimeException);
226
227   /**
228      Envia el mensaje recibido como parametro. Al bloque de datos correspondiente a la
229      codificacion del mensaje se incorpora la informacion necesaria para el protocolo
230      de la capa de transporte indicado en el constuctor.
231
232      \param message Mensaje que vamos codificar para enviar a la capa de transporte.
233   */
234   void send(Message& message) throw(RuntimeException);
235
236   /**
237      Envia el mensaje recibido como parametro. Al bloque de datos correspondiente a la
238      codificacion del mensaje se incorpora la informacion necesaria para el protocolo
239      de la capa de transporte indicado en el constuctor.
240
241      \param message Mensaje que vamos codificar para enviar a la capa de transporte.
242   */
243   void send(Message* message) throw(RuntimeException);
244
245   /**
246      Espera la llegada de mensajes por este ClientSocket durante un numero de milisegundos
247      recibido como parametro o hasta que llegue un mensaje. Un ejemplo de uso, que deberia
248      ser completado con el control de condiciones de error podria ser:
249
250      \code
251
252      void f (anna::ClientSocket* clientSocket)
253         throw (anna::RuntimeException)
254      {
255         anna::Guard (clientSocket);
256
257         if (clientSocket->wait (2000) == ClientSocket::Notify::ReceiveData) {
258            const Message* data;
259
260            while ((data = clientSocket->fetch ()) != NULL) {
261               ...
262               ..... procesa los datos ....
263               ...
264            }
265         }
266      }
267
268      \endcode
269
270      \param timeout Numero de milisegundos en que va a quedar bloqueado a la espera de
271      obtener el primer mensaje.
272      \param receive Un valor \em true indica que se tratara el mensaje detectado, un \em false
273      indica que solo esperara la llegada de un bloque de datos, pero no se procesara en absoluto, solo
274      devolvera Notify::ReceiveData.
275
276      \return El resultado de la espera. En caso de no haber recibido ningun mensaje
277      devolvera comm::Socket::Notify::None.
278
279      \warning \li El hilo de ejecucion que invoque a este metodo queda bloqueado durante el
280      tiempo que indica el 'timeout' o hasta que llegue un mensaje.
281      \li La invocacion al metodo #receive y al #fetch deben estar protegidas por la misma
282      seccion critica.
283   */
284   Notify::_v wait(const Millisecond &timeout, const bool receive = true) throw(RuntimeException);
285
286   /**
287      Elimina el contenido del buffer de recepcion.
288   */
289   void forgot() throw();
290
291   /**
292      Obtiene la capa de transporte asociada a este ClientSocket.
293      \return la capa de transporte asociada a este ClientSocket.
294   */
295   Transport* getTransport() throw(RuntimeException) {
296     return (a_transport != NULL) ? a_transport : reserveTransport();
297   }
298
299   /**
300      Obtiene el receptor asociado a este ClientSocket. Puede ser NULL
301      \return el receptor asociado a este ClientSocket.Puede ser NULL
302      \warning Exclusivamente uso interno.
303   */
304   Receiver* getReceiver() throw(RuntimeException) {
305     return (a_receiver != NULL) ? a_receiver : ((a_receiverFactory == NULL) ? NULL : reserveReceiver());
306   }
307
308   /**
309    * Establece el receptor externo que tratara los mensajes recibidos por este socket.
310    * Si el receptor se establece directamente por el usuario, sin tener una factoria de receptores
311    * intermedia, invoca directamente al metodo anna::comm::Receiver::initialize.
312    * \param receive La instancia del receptor externo que tratar los mensajes de este socket.
313    */
314   void setReceiver(Receiver* receive) throw(RuntimeException);
315
316   /**
317      Activa la solicitud de cierre del socket, que se llevara a cabo cuando el nucleo considere
318      que se puede realizar de forma segura, desde el punto de vista de la informacion bloqueada
319      por secciones criticas en los distintos componentes, que hacen uso del socket.
320   */
321   void requestClose() throw();
322
323   /**
324      Devuelve el estado de la solicitud de cierre del socket.
325      \return el estado de la solicitud de cierre del socket.
326   */
327   bool hasRequestedClose() const throw() { return (a_status & Status::ClosePending) != 0; }
328
329   /**
330      Devuelve una cadena con la informacin referente a este socket.
331      \return Una cadena con la informacin referente a este socket.
332   */
333   virtual std::string asString() const throw();
334
335   /**
336      Devuelve un nodo XML con la informacin referente a este objeto.
337      \param parent Nodo XML a partir del cual introducir la informacin.
338      \return Un nodo XML con la informacin referente a este objeto.
339   */
340   virtual xml::Node* asXML(xml::Node* parent) const throw(RuntimeException);
341
342   /**
343      Devuelve el nombre logico de esta clase.
344      \return el nombre logico de esta clase.
345   */
346   static const char* className() throw() { return "anna::comm::ClientSocket"; }
347
348 protected:
349   struct Status {
350     enum _v { None = 0, Connected = 1, Corrupt = 2, ClosePending = 4, Working = 8 };
351
352     static std::string asString(const int status) throw();
353   };
354
355   Transport* a_transport;
356   AccessPoint a_remoteAccessPoint;
357
358   /**
359      Devuelve el numero de bytes contenido en el buffer intermedio de recepcion.
360      \return El numero de bytes contenido en el buffer intermedio de recepcion.
361   */
362   int getBufferSize() const throw() { return a_data.getSize() - a_offset; }
363
364   /**
365      Devuelve el numero maximo de bytes que puede tener en el buffer intermedio de recepcion.
366      \return el numero maximo de bytes que puede tener en el buffer intermedio de recepcion.
367   */
368   int getReceiveBufferSize() const throw() { return a_rcvBufferSize; }
369
370   /**
371      Obtiene los parametros de funcionamiento del Socket.
372      \warning Exclusivamente uso interno.
373   */
374   void getSocketOptions() throw(RuntimeException);
375
376   /**
377      Recupera el ultimo mensaje recibido.
378      El invocador debera haber establecido la forma de asegurar que hay que hay datos
379       disponibles. Ademas debera establecer el tratamiento adecuado para cada uno
380      de los posibles resultados.
381      \return El resultado de la operacion.
382      \warning La invocacion al metodo #receive y al #fetch deben estar protegidas por la
383      misma seccion critica. Por ejemplo:
384
385      \code
386
387      void f (anna::ClientSocket* clientSocket)
388         throw (anna::RuntimeException)
389      {
390         anna::Guard (clientSocket);
391
392         if (clientSocket->receive () == ClientSocket::Notify::ReceiveData) {
393            const Message* data;
394
395            while ((data = clientSocket->fetch ()) != NULL) {
396               ...
397               ..... procesa los datos ....
398               ...
399            }
400         }
401      }
402
403      \endcode
404   */
405   Notify::_v receive() throw(RuntimeException);
406
407   /**
408      Recupera el ultimo bloque de datos recuperado mediante el metodo #receive o #wait.
409      El bloque de datos deberia ser interpretado segun las reglas de la capa de transporte
410      asociada a este ClientSocket.
411
412      \return El ultimo bloque recuperado mediante el metodo #receive o #wait. Puede ser NULL si no hay
413      disponible ningun bloque.
414
415      \warning Exclusivamente uso interno. Debe invocarse con una seccion
416      critica activa sobre el ClientSocket.
417   */
418   const DataBlock* fetch() throw(RuntimeException);
419
420   void activate(const Status::_v v) throw() { a_status |= v; }
421   void deactivate(const Status::_v v) throw() { a_status &= ~v; }
422   void deactivate(const int v) throw() { a_status &= ~v; }
423
424   virtual int do_connect(const sockaddr*, const int len) throw(RuntimeException);
425   virtual void do_write(const DataBlock&) throw(RuntimeException);
426   virtual int do_read(const char* data, const int size) throw(RuntimeException);
427   virtual void do_close() throw();
428
429 private:
430   struct PendingBytes {
431     Millisecond validUntil;
432     int bytesToRead;
433
434     PendingBytes() { validUntil = 0; bytesToRead = 0; }
435   };
436
437   //-----------------------------------------------------------------------------------------
438   // a_status : Combinacion de bit's que indica el estado de la instancia.
439   // a_expectedSize: Numero de bytes esperados. valdra -1 si todavia no hay informacion
440   //                 para calcularlo.
441   // a_data: Ultimo bloque de datos leido. Puede contener un numero indeterminado de mensajes
442   // a_buffer: Referencia al espacio que ocupa el mensaje que estamos tratanto. Apunta
443   // a un espacio dentro de 'a_data'-
444   // a_reader: Buffer usado para leer los datos del Socket.
445   // a_offset: Desplazamiento que hay que aplicar sobre los datos contenidos en 'a_data'.
446   // a_cachedServer: Ultimo server calculado en el metodo getServer.
447   //-----------------------------------------------------------------------------------------
448   int a_status;
449   int a_expectedSize;
450   DataBlock a_data;
451   Buffer a_buffer;
452   DataBlock a_reader;
453   int a_offset;
454   comm::Server* a_cachedServer;
455   Millisecond a_msMaxConnectionDelay;
456   Millisecond a_msMaxWriteDelay;
457   int a_rcvBufferSize;
458   Receiver* a_receiver;
459   mutable PendingBytes a_pendingBytes;
460   bool a_ignoreIncomingMessages;
461
462   void initialize() throw();
463   void calculeExpectedSize(const DataBlock&) throw();
464   Transport* reserveTransport() throw(RuntimeException);
465   Transport* unsafe_reserveTransport() throw(RuntimeException);
466   Receiver* reserveReceiver() throw(RuntimeException);
467   int getTotalPendingBytes() const throw();
468
469   friend class Communicator;
470   // unsafe_reserveTransport
471
472
473   friend class CongestionController;
474   friend class handler::MetaClientSocket;
475   friend class handler::DatagramSocket;
476 };
477
478 }
479 }
480
481 #endif
482