First commit
[anna.git] / include / anna / comm / ClientSocket.hpp
1 // ANNA - Anna is Not 'N' Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // https://bitbucket.org/testillano/anna
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 #ifndef anna_comm_ClientSocket_hpp
38 #define anna_comm_ClientSocket_hpp
39
40 #include <anna/comm/Socket.hpp>
41 #include <anna/comm/Buffer.hpp>
42 #include <anna/comm/Transport.hpp>
43 #include <anna/core/util/Millisecond.hpp>
44
45 namespace anna {
46
47 namespace comm {
48
49 class Communicator;
50 class DatagramSocket;
51 class Transport;
52 class Message;
53 class Server;
54 class CongestionController;
55 class Receiver;
56
57 namespace handler {
58 class MetaClientSocket;
59 class DatagramSocket;
60 }
61
62 /**
63    Implementa los socket cliente (tambien conocidos como @ref Socket) que nuestra
64    aplicacion tiene establecidos con un punto remoto, donde habra un ServerSocket
65    aceptando peticiones de conexion.
66 */
67 class ClientSocket : public Socket {
68 public:
69   /**
70      Numero de milisegundos por defecto que espera antes de dar por fallida una conexion con
71      otro proceso servidor.
72   */
73   static const Millisecond DefaultMaxConnectionDelay;
74
75   /**
76      Numero de milisegundos por defecto que queda bloqueado un proceso a la espera de poder
77      escribir en un socket cuyo buffer de salida esta lleno.
78   */
79   static const Millisecond DefaultMaxWriteDelay;
80
81   /**
82      Crea un socket cliente liberado.
83      \param transportFactory factoria de protocolos de transporte a usar por este sockets.
84      \param domain Dominio del socket.
85      \param type Tipo de socket.
86      \warning La factoria de protocolos debe estar disponible mientras el Socket esta activo.
87   */
88   ClientSocket(TransportFactory* transportFactory = NULL, Domain::_v domain = Socket::Domain::Inet, Type::_v type = Socket::Type::Stream) :
89     Socket(domain, type, transportFactory),
90     a_data(true),
91     a_reader(true),
92     a_ignoreIncomingMessages(false) {
93     initialize();
94   }
95
96   /**
97      Crea un socket cliente que sera conectado remotamente a la direccion y puerto indicado.
98
99      \param remoteAddress direccion de red remota a la que conectar.
100      \param transportFactory factoria de protocolos de transporte a usar por este sockets.
101      \param type Tipo de socket.
102      \warning La factoria de protocolos debe estar disponible mientras el Socket esta activo.
103   */
104   ClientSocket(const INetAddress& remoteAddress, TransportFactory* transportFactory = NULL, const Type::_v type = Socket::Type::Stream) :
105     Socket(Socket::Domain::Inet, type, transportFactory),
106     a_remoteAccessPoint(remoteAddress),
107     a_data(true),
108     a_reader(true),
109     a_ignoreIncomingMessages(false) {
110     initialize();
111   }
112
113   /**
114      Crea un socket cliente que sera conectado al archivo indicado.
115
116      \param path Ruta del archivo que vamos a usar para transferir datos ataves de este socket.
117      \param type Tipo de socket.
118   */
119   ClientSocket(const std::string& path, const Type::_v type = Socket::Type::Stream) :
120     Socket(path, type),
121     a_remoteAccessPoint(path),
122     a_data(true),
123     a_reader(true),
124     a_ignoreIncomingMessages(false) {
125     initialize();
126   }
127
128   /**
129      Crea un socket cliente conectado al servidor indicado por la direccion y puerto remoto. Ademas el socket cliente
130      seria conectado localmente (ver @ref Socket::bind) a la direccion y puerto locales indicados.
131
132      \param transportFactory factoria de protocolos de transporte a usar por este sockets.
133      \param remoteAddress direccion del servidor.
134      \param localAddress Puede ser usado para limitar la direccion por la que atendiende peticiones un servidor de socket
135      instalado en una maquina con mas de una direccion.
136      compartido por mas de un proceso activo.
137      \param type Tipo de socket.
138      \warning La factoria de protocolos debe estar disponible mientras el Socket esta activo.
139   */
140   ClientSocket(const INetAddress& remoteAddress,  const INetAddress& localAddress, TransportFactory* transportFactory = NULL, const Type::_v type = Socket::Type::Stream) :
141     Socket(localAddress, type, transportFactory),
142     a_remoteAccessPoint(remoteAddress),
143     a_data(true),
144     a_reader(true),
145     a_ignoreIncomingMessages(false) {
146     initialize();
147   }
148
149   /**
150      Destructor.
151   */
152   virtual ~ClientSocket() { close(); }
153
154   /**
155      Devuelve la direccion remota del socket.
156      \return La direccion remota del socket.
157   */
158   const AccessPoint& getRemoteAccessPoint() const throw() { return a_remoteAccessPoint; }
159
160   /**
161      Devuelve el estado de conexion de este socket.
162      \return \em true si el socket mantiene la conexion realizada mediante el metodo #connect o
163      \em false en otro caso.
164   */
165   bool isConnected() const throw() { return (a_status & Status::Connected) != 0; }
166
167   /**
168      Devuelve el estado del contenido del socket.
169      \return \em true si el socket mantiene el sincronismo o \em false en otro caso.
170   */
171   bool isSynchronized() const throw() { return (a_status & Status::Corrupt) == 0; }
172
173   /**
174      Devuelve el estado del contenido del socket.
175      \return \em true si el socket NO mantiene el sincronismo o \em false en otro caso.
176   */
177   bool isCorrupt() const throw() { return (a_status & Status::Corrupt); }
178
179   /**
180    * Devuelve el estado de cierre del socket.
181    * \return \em true si el socket está a la espera de ser cerrado o \em false en otro caso.
182    */
183   bool isClosedPending() const throw() { return (a_status & Status::ClosePending) != 0; }
184
185   /**
186      Devuelve la instancia de anna::comm::Server asociado a una conexion remota iniciada por
187      nuestra aplicacion.
188      \return la instancia de anna::comm::Server asociado a una conexion remota iniciada por
189      nuestra aplicacion.
190      \warning Puede ser NULL en caso de que este ClientSocket corresponda a una conexion local
191      establecida por un proceso remoto contra un ServerSocket de nuestra aplicacion.
192   */
193   Server* getServer() throw(RuntimeException);
194
195   /**
196      Devuelve el numero maximo de milisegundos esperados para obtener conexion al invocar
197      al metodo #connect.
198      \return el numero maximo de milisegundos esperados para obtener conexion al invocar
199      al metodo #connect.
200   */
201   const Millisecond &getMaxConnectionDelay() const throw() { return a_msMaxConnectionDelay; }
202
203   /**
204      Devuelve el numero maximo de milisegundos que queda bloqueado el proceso/thread a la espera
205      de escribir en un socket cuyo buffer de salida esta lleno.
206      \return Devuelve el numero maximo de milisegundos que queda bloqueado el proceso/thread a la espera
207      de escribir en un socket cuyo buffer de salida esta lleno.
208   */
209   const Millisecond &getMaxWriteDelay() const throw() { return a_msMaxWriteDelay; }
210
211   /**
212      Obtiene toda la informacion referente a este socket a partir de la conexion realizada atraves del \em fd
213      recibido como parametro.
214      \param fd File descriptor correspondiente a una conexion local.
215      \warning Exclusivamente uso interno.
216   */
217   virtual void setfd(const int fd) throw(RuntimeException);
218
219   /**
220      Establece el numero maximo de milisegundos esperados para obtener la conexion al
221      invocar al metodo #connect.
222      \param msMaxConnectionDelay Numero de milisegundos esperados para obtener conexion.
223
224      \see anna::comm::Server::setMaxConnectionDelay.
225   */
226   void setMaxConnectionDelay(const Millisecond &msMaxConnectionDelay)
227   throw() {
228     a_msMaxConnectionDelay = msMaxConnectionDelay;
229   }
230
231   /**
232      Establece el numero maximo de milisegundos que queda bloqueado el proceso/thread a la espera
233      de escribir en un socket cuyo buffer de salida esta lleno.
234
235      \param msMaxWriteDelay Numero de milisegundos esperados en caso de que el buffer del socket se llene.
236   */
237   void setMaxWriteDelay(const Millisecond &msMaxWriteDelay) throw() { a_msMaxWriteDelay = msMaxWriteDelay; }
238
239   /**
240    * Devuelve \em true si el indicador que ignora los mensajes entrantes está activo, o \em false en otro caso.
241    * \return \em true si el indicador que ignora los mensajes entrantes está activo, o \em false en otro caso.
242    */
243   bool getIgnoreIncomingMessages() const throw() { return a_ignoreIncomingMessages; }
244
245   /**
246    * Establece el indicador que provoca ignorar los mensajes entrantes.
247    * \param ignoreIncomingMessages \em true si el indicador que ignora los mensajes entrantes está activo, o \em false en otro caso.
248    */
249   void setIgnoreIncomingMessages(const bool ignoreIncomingMessages) throw() { a_ignoreIncomingMessages = ignoreIncomingMessages; }
250
251   /**
252      Intenta la conexion remota con la direccion indicada en el constructor.
253   */
254   virtual void connect() throw(RuntimeException);
255
256   /**
257      Envia el mensaje recibido como parametro. Al bloque de datos correspondiente a la
258      codificacion del mensaje se incorpora la informacion necesaria para el protocolo
259      de la capa de transporte indicado en el constuctor.
260
261      \param message Mensaje que vamos codificar para enviar a la capa de transporte.
262   */
263   void send(Message& message) throw(RuntimeException);
264
265   /**
266      Envia el mensaje recibido como parametro. Al bloque de datos correspondiente a la
267      codificacion del mensaje se incorpora la informacion necesaria para el protocolo
268      de la capa de transporte indicado en el constuctor.
269
270      \param message Mensaje que vamos codificar para enviar a la capa de transporte.
271   */
272   void send(Message* message) throw(RuntimeException);
273
274   /**
275      Espera la llegada de mensajes por este ClientSocket durante un numero de milisegundos
276      recibido como parametro o hasta que llegue un mensaje. Un ejemplo de uso, que deberia
277      ser completado con el control de condiciones de error podria ser:
278
279      \code
280
281      void f (anna::ClientSocket* clientSocket)
282         throw (anna::RuntimeException)
283      {
284         anna::Guard (clientSocket);
285
286         if (clientSocket->wait (2000) == ClientSocket::Notify::ReceiveData) {
287            const Message* data;
288
289            while ((data = clientSocket->fetch ()) != NULL) {
290               ...
291               ..... procesa los datos ....
292               ...
293            }
294         }
295      }
296
297      \endcode
298
299      \param timeout Numero de milisegundos en que va a quedar bloqueado a la espera de
300      obtener el primer mensaje.
301      \param receive Un valor \em true indica que se tratara el mensaje detectado, un \em false
302      indica que solo esperara la llegada de un bloque de datos, pero no se procesara en absoluto, solo
303      devolvera Notify::ReceiveData.
304
305      \return El resultado de la espera. En caso de no haber recibido ningun mensaje
306      devolvera comm::Socket::Notify::None.
307
308      \warning \li El hilo de ejecucion que invoque a este metodo queda bloqueado durante el
309      tiempo que indica el 'timeout' o hasta que llegue un mensaje.
310      \li La invocacion al metodo #receive y al #fetch deben estar protegidas por la misma
311      seccion critica.
312   */
313   Notify::_v wait(const Millisecond &timeout, const bool receive = true) throw(RuntimeException);
314
315   /**
316      Elimina el contenido del buffer de recepcion.
317   */
318   void forgot() throw();
319
320   /**
321      Obtiene la capa de transporte asociada a este ClientSocket.
322      \return la capa de transporte asociada a este ClientSocket.
323   */
324   Transport* getTransport() throw(RuntimeException) {
325     return (a_transport != NULL) ? a_transport : reserveTransport();
326   }
327
328   /**
329      Obtiene el receptor asociado a este ClientSocket. Puede ser NULL
330      \return el receptor asociado a este ClientSocket.Puede ser NULL
331      \warning Exclusivamente uso interno.
332   */
333   Receiver* getReceiver() throw(RuntimeException) {
334     return (a_receiver != NULL) ? a_receiver : ((a_receiverFactory == NULL) ? NULL : reserveReceiver());
335   }
336
337   /**
338    * Establece el receptor externo que tratara los mensajes recibidos por este socket.
339    * Si el receptor se establece directamente por el usuario, sin tener una factoria de receptores
340    * intermedia, invoca directamente al metodo anna::comm::Receiver::initialize.
341    * \param receive La instancia del receptor externo que tratar los mensajes de este socket.
342    */
343   void setReceiver(Receiver* receive) throw(RuntimeException);
344
345   /**
346      Activa la solicitud de cierre del socket, que se llevara a cabo cuando el nucleo considere
347      que se puede realizar de forma segura, desde el punto de vista de la informacion bloqueada
348      por secciones criticas en los distintos componentes, que hacen uso del socket.
349   */
350   void requestClose() throw();
351
352   /**
353      Devuelve el estado de la solicitud de cierre del socket.
354      \return el estado de la solicitud de cierre del socket.
355   */
356   bool hasRequestedClose() const throw() { return (a_status & Status::ClosePending) != 0; }
357
358   /**
359      Devuelve una cadena con la informacin referente a este socket.
360      \return Una cadena con la informacin referente a este socket.
361   */
362   virtual std::string asString() const throw();
363
364   /**
365      Devuelve un nodo XML con la informacin referente a este objeto.
366      \param parent Nodo XML a partir del cual introducir la informacin.
367      \return Un nodo XML con la informacin referente a este objeto.
368   */
369   virtual xml::Node* asXML(xml::Node* parent) const throw(RuntimeException);
370
371   /**
372      Devuelve el nombre logico de esta clase.
373      \return el nombre logico de esta clase.
374   */
375   static const char* className() throw() { return "anna::comm::ClientSocket"; }
376
377 protected:
378   struct Status {
379     enum _v { None = 0, Connected = 1, Corrupt = 2, ClosePending = 4, Working = 8 };
380
381     static std::string asString(const int status) throw();
382   };
383
384   Transport* a_transport;
385   AccessPoint a_remoteAccessPoint;
386
387   /**
388      Devuelve el numero de bytes contenido en el buffer intermedio de recepcion.
389      \return El numero de bytes contenido en el buffer intermedio de recepcion.
390   */
391   int getBufferSize() const throw() { return a_data.getSize() - a_offset; }
392
393   /**
394      Devuelve el numero maximo de bytes que puede tener en el buffer intermedio de recepcion.
395      \return el numero maximo de bytes que puede tener en el buffer intermedio de recepcion.
396   */
397   int getReceiveBufferSize() const throw() { return a_rcvBufferSize; }
398
399   /**
400      Obtiene los parametros de funcionamiento del Socket.
401      \warning Exclusivamente uso interno.
402   */
403   void getSocketOptions() throw(RuntimeException);
404
405   /**
406      Recupera el ultimo mensaje recibido.
407      El invocador debera haber establecido la forma de asegurar que hay que hay datos
408       disponibles. Ademas debera establecer el tratamiento adecuado para cada uno
409      de los posibles resultados.
410      \return El resultado de la operacion.
411      \warning La invocacion al metodo #receive y al #fetch deben estar protegidas por la
412      misma seccion critica. Por ejemplo:
413
414      \code
415
416      void f (anna::ClientSocket* clientSocket)
417         throw (anna::RuntimeException)
418      {
419         anna::Guard (clientSocket);
420
421         if (clientSocket->receive () == ClientSocket::Notify::ReceiveData) {
422            const Message* data;
423
424            while ((data = clientSocket->fetch ()) != NULL) {
425               ...
426               ..... procesa los datos ....
427               ...
428            }
429         }
430      }
431
432      \endcode
433   */
434   Notify::_v receive() throw(RuntimeException);
435
436   /**
437      Recupera el ultimo bloque de datos recuperado mediante el metodo #receive o #wait.
438      El bloque de datos deberia ser interpretado segun las reglas de la capa de transporte
439      asociada a este ClientSocket.
440
441      \return El ultimo bloque recuperado mediante el metodo #receive o #wait. Puede ser NULL si no hay
442      disponible ningun bloque.
443
444      \warning Exclusivamente uso interno. Debe invocarse con una seccion
445      critica activa sobre el ClientSocket.
446   */
447   const DataBlock* fetch() throw(RuntimeException);
448
449   void activate(const Status::_v v) throw() { a_status |= v; }
450   void deactivate(const Status::_v v) throw() { a_status &= ~v; }
451   void deactivate(const int v) throw() { a_status &= ~v; }
452
453   virtual int do_connect(const sockaddr*, const int len) throw(RuntimeException);
454   virtual void do_write(const DataBlock&) throw(RuntimeException);
455   virtual int do_read(const char* data, const int size) throw(RuntimeException);
456   virtual void do_close() throw();
457
458 private:
459   struct PendingBytes {
460     Millisecond validUntil;
461     int bytesToRead;
462
463     PendingBytes() { validUntil = 0; bytesToRead = 0; }
464   };
465
466   //-----------------------------------------------------------------------------------------
467   // a_status : Combinacion de bit's que indica el estado de la instancia.
468   // a_expectedSize: Numero de bytes esperados. valdra -1 si todavia no hay informacion
469   //                 para calcularlo.
470   // a_data: Ultimo bloque de datos leido. Puede contener un numero indeterminado de mensajes
471   // a_buffer: Referencia al espacio que ocupa el mensaje que estamos tratanto. Apunta
472   // a un espacio dentro de 'a_data'-
473   // a_reader: Buffer usado para leer los datos del Socket.
474   // a_offset: Desplazamiento que hay que aplicar sobre los datos contenidos en 'a_data'.
475   // a_cachedServer: Ultimo server calculado en el metodo getServer.
476   //-----------------------------------------------------------------------------------------
477   int a_status;
478   int a_expectedSize;
479   DataBlock a_data;
480   Buffer a_buffer;
481   DataBlock a_reader;
482   int a_offset;
483   comm::Server* a_cachedServer;
484   Millisecond a_msMaxConnectionDelay;
485   Millisecond a_msMaxWriteDelay;
486   int a_rcvBufferSize;
487   Receiver* a_receiver;
488   mutable PendingBytes a_pendingBytes;
489   bool a_ignoreIncomingMessages;
490
491   void initialize() throw();
492   void calculeExpectedSize(const DataBlock&) throw();
493   Transport* reserveTransport() throw(RuntimeException);
494   Transport* unsafe_reserveTransport() throw(RuntimeException);
495   Receiver* reserveReceiver() throw(RuntimeException);
496   int getTotalPendingBytes() const throw();
497
498   friend class Communicator;
499   // unsafe_reserveTransport
500
501
502   friend class CongestionController;
503   friend class handler::MetaClientSocket;
504   friend class handler::DatagramSocket;
505 };
506
507 }
508 }
509
510 #endif
511