First commit
[anna.git] / include / anna / comm / comm.hpp
1 // ANNA - Anna is Not 'N' Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // https://bitbucket.org/testillano/anna
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 #ifndef anna_comm_comm_hpp
38 #define anna_comm_comm_hpp
39
40 namespace anna {
41 /**
42 Proporciona las clases necesarias para la comunicacion entre procesos.
43
44 A continuacion presentamos el codigo de ejemplo de un servidor aritmetico. Recibe dos operadores y una
45 operacion (+,-,*,/) y devuelve el resultado de la operacion. Para estudiar los sistemas de control de
46 congestion hemos incorporado la posibilidad de que el servidor espere un numero de segundo indeterminado
47 antes de dar la respuesta.
48
49 \code
50
51    #include <iostream>
52
53    #include <anna.h>
54    #include <anna/comm/h>
55
56    #include <test.Request.h>
57    #include <test.Response.h>
58
59    using namespace std;
60    using namespace test;
61
62    //-----------------------------------------------------------------------------------------
63    // Define el comunicador de nuestra aplicacin.
64    //
65    // Las peticiones y respuestas van codificadas mediante un comm::Codec pero podriamos
66    // haber utilizado cualquier otro medio de codificacin ya que la capa de transporte
67    // es totalmente independiente del contenido del mensaje.
68    //
69    // De cara a la capa de transporte lo unico que importa es el cliente y el servidor
70    // codifiquen/decodifiquen de la misma forma.
71    //-----------------------------------------------------------------------------------------
72    class MyCommunicator : public comm::Communicator {
73    public:
74       MyCommunicator () {;}
75
76       void setDelay (const Millisecond &delay) throw () { a_delay = delay; }
77
78    private:
79       Request a_request;
80       Response a_response;
81       Millisecond a_delay;
82
83       void eventReceiveMessage (comm::ClientSocket &, const DataBlock& data)
84          throw (RuntimeException);
85    };
86
87    class ArithmeticServer : public comm::Application {
88    public:
89       ArithmeticServer ();
90
91    private:
92       MyCommunicator a_communicator;
93       comm::ServerSocket* a_serverSocket;
94
95       void initialize () throw (RuntimeException);
96       void run () throw (RuntimeException);
97    };
98
99    using namespace std;
100    using namespace anna::comm;
101
102    int main (int argc, const char** argv)
103    {
104       CommandLine& commandLine (CommandLine::instantiate ());
105       ArithmeticServer app;
106
107       srand (time (NULL));
108
109       try {
110          commandLine.initialize (argv, argc);
111          commandLine.verify ();
112
113          Logger::setLevel (Logger::Debug);
114          Logger::initialize ("arithmeticServer", new anna::TraceWriter ("file.trace", 4048000));
115
116          app.start ();
117       }
118       catch (Exception& ex) {
119          cout << ex.asString () << endl;
120       }
121
122       return 0;
123    }
124
125    ArithmeticServer::ArithmeticServer () :
126       Application ("arithmeticServer", "Servidor de operaciones aritmeticas", "1.0")
127    {
128       CommandLine& commandLine (CommandLine::instantiate ());
129
130       commandLine.add ("p", CommandLine::Argument::Mandatory, "Puerto en el que atender peticiones");
131       commandLine.add ("a", CommandLine::Argument::Mandatory, "Direccin IP en la que atender");
132       commandLine.add ("d", CommandLine::Argument::Mandatory, "Retardo aplicado a la contestacio");
133       commandLine.add ("r", CommandLine::Argument::Optional, "Indicador de reuso de direccin", false);
134       commandLine.add ("limit", CommandLine::Argument::Mandatory, "% de ocupacion que permitimos");
135    }
136
137    //-----------------------------------------------------------------------------------------
138    // Inicializa el servidor de sockets.
139    //-----------------------------------------------------------------------------------------
140    void ArithmeticServer::initialize ()
141       throw (RuntimeException)
142    {
143       CommandLine& cl (CommandLine::instantiate ());
144
145       int port = cl.getIntegerValue ("p");
146       const comm::Device* device = Network::instantiate ().find (Device::asAddress (cl.getValue ("a")));
147
148       a_serverSocket = new ServerSocket (INetAddress (device, port), cl.exists ("r"));
149    }
150
151    //-----------------------------------------------------------------------------------------
152    // Atiende las peticiones.
153    // Cuando hay un nuevo mensaje invocar�a Communicator::eventReceiveMessage
154    //-----------------------------------------------------------------------------------------
155    void ArithmeticServer::run ()
156       throw (RuntimeException)
157    {
158       CommandLine& cl (CommandLine::instantiate ());
159
160       a_communicator.attach (a_serverSocket);
161       a_communicator.setDelay (cl.getIntegerValue ("d"));
162
163       CongestionController::instantiate ().setLimit (cl.getIntegerValue ("limit"));
164
165       a_communicator.accept ();
166    }
167
168    //-----------------------------------------------------------------------------------------
169    // Manejador de peticiones.
170    // Calcular�la operacin solicitada y devolver�el resultado.
171    //
172    // clientSocket: Socket cliente por el que podemos responder a la peticin.
173    // transport: Instancia del transporto que ha interpretado el mensaje (getMessage).
174    //-----------------------------------------------------------------------------------------
175    void MyCommunicator::eventReceiveMessage (ClientSocket& clientSocket, const DataBlock& data)
176       throw (RuntimeException)
177    {
178       LOGMETHOD (TraceMethod tm ("MyCommunicator", "eventReceiveMessage", ANNA_FILE_LOCATION));
179
180       static int messageCounter = 0;
181       static int successCounter = 0;
182
183       int value;
184
185       CongestionController& congestionController = CongestionController::instantiate ();
186
187       messageCounter ++;
188
189       if (congestionController.getAdvice (clientSocket) == CongestionController::Advice::Discard)
190          return;
191
192       successCounter ++;
193
194       int random = rand () % (a_delay / 10);
195       int sign = rand () % 2;
196
197       if (sign == 0)
198          random *= -1;
199
200       anna::functions::sleep (a_delay + random);
201
202       a_request.decode (data);
203
204       a_response.x = a_request.x;
205       a_response.y = a_request.y;
206
207       switch (a_response.op = a_request.op) {
208          case '+':
209             a_response.result = a_request.x + a_request.y;
210             break;
211          case '-':
212             a_response.result = a_request.x - a_request.y;
213             break;
214          case '*':
215             a_response.result = a_request.x * a_request.y;
216             break;
217          case '/':
218             a_response.result = (a_request.y != 0) ? (a_request.x / a_request.y): 0;
219             break;
220       }
221
222       LOGDEBUG (
223          string msg = anna::functions::asString (
224             "%d %c %d = %d", a_request.x, a_request.op, a_request.y, a_response.result
225          );
226          Logger::debug (msg, ANNA_FILE_LOCATION);
227       );
228
229       try {
230          clientSocket.send (a_response.code ());
231       }
232       catch (Exception& ex) {
233          ex.trace ();
234       }
235    }
236 \endcode
237
238 El siguiente ejemplo muestra un cliente correspondiente al servidor anterior, que lanza un numero
239 determinado de peticiones por segundo.
240
241 \code
242    #include <iostream>
243
244    #include <string.h>
245
246    #include <anna.h>
247    #include <anna/comm/h>
248
249    #include <anna.timex.Engine.h>
250    #include <anna.timex.Clock.h>
251
252    #include <test.Response.h>
253    #include <test.Request.h>
254
255    class Sender : public timex::Clock {
256    public:
257       Sender () : Clock ("Sender", 250), a_messageBySecond (0), a_nquarter (0) {;}
258
259       void setMessageBySecond (const int messageBySecond) throw () { a_messageBySecond = messageBySecond; }
260
261    private:
262       int a_messageBySecond;
263       int a_nquarter;
264       test::Request a_request;
265
266       void tick () throw (RuntimeException);
267    };
268
269    //-----------------------------------------------------------------------------------------
270    // Define el comunicador de nuestra aplicacin.
271    //
272    // Las peticiones y respuestas van codificadas mediante un comm::Codec pero podriamos
273    // haber utilizado cualquier otro medio de codificacin ya que la capa de transporte
274    // es totalmente independiente del contenido del mensaje.
275    //-----------------------------------------------------------------------------------------
276    class MyCommunicator : public Communicator {
277    public:
278       MyCommunicator () : Communicator () {;}
279
280    private:
281       test::Response a_response;
282
283       void eventReceiveMessage (ClientSocket &, const DataBlock&)
284          throw (RuntimeException);
285    };
286
287    class HeavyClient : public anna::comm::Application {
288    public:
289       HeavyClient ();
290
291       Server* getServer () const throw () { return a_server; }
292
293    private:
294       MyCommunicator a_communicator;
295       timex::Engine a_timeController;
296       Sender a_sender;
297       Server* a_server;
298
299       void initialize () throw (RuntimeException);
300       void run () throw (RuntimeException);
301    };
302
303    using namespace std;
304
305    int main (int argc, const char** argv)
306    {
307       CommandLine& commandLine (CommandLine::instantiate ());
308       HeavyClient app;
309
310       srand (time (NULL));
311
312       try {
313          commandLine.initialize (argv, argc);
314          commandLine.verify ();
315
316          Logger::setLevel (Logger::Debug);
317          Logger::initialize ("arithmeticClient", new TraceWriter ("file.trace", 1048000));
318
319          app.start ();
320       }
321       catch (Exception& ex) {
322          cout << ex.asString () << endl;
323       }
324
325       return 0;
326    }
327
328    HeavyClient::HeavyClient () :
329       Application ("arithmeticClient", "Cliente de operaciones aritmeticas", "1.0"),
330       a_communicator (),
331       a_timeController ((Millisecond)10000, (Millisecond)250)
332    {
333       CommandLine& commandLine (CommandLine::instantiate ());
334
335       commandLine.add ("p", CommandLine::Argument::Mandatory, "Puerto en el que el servidor atiende respuestas.");
336       commandLine.add ("a", CommandLine::Argument::Mandatory, "Direccin IP Puerto en el que el servidor atiende respuestas.");
337       commandLine.add ("n", CommandLine::Argument::Mandatory, "Numero de mensajes por segundo");
338
339    }
340
341    //-----------------------------------------------------------------------------------------
342    // Inicializa las conexiones usadas por la aplicacin.
343    //
344    // Primero establece los datos para la conexin con el servidor aritm�ico y luego
345    // establece los datos del socket servidor necesario para atender respuestas y aceptar
346    // nuevas conexiones de procesos clientes (que no ser�el caso).
347    // Configura el men para que trabaje con el comunicador de esta aplicacin.
348    //-----------------------------------------------------------------------------------------
349    void HeavyClient::initialize ()
350       throw (RuntimeException)
351    {
352       CommandLine& cl (CommandLine::instantiate ());
353
354       Network& network = Network::instantiate ();
355
356       Host* host = network.find ("host000");
357       host->assign (network.find (Device::asAddress (cl.getValue ("a"))));
358       a_server = host->createServer ("rawServer", cl.getIntegerValue ("p"), true);
359       a_sender.setMessageBySecond (cl.getIntegerValue ("n"));
360    }
361
362    //-----------------------------------------------------------------------------------------
363    // Activa el reloj que dara el pulso para enviar los mensajes al servidor y comienza a
364    // atender las peticiones.
365    //-----------------------------------------------------------------------------------------
366    void HeavyClient::run ()
367       throw (RuntimeException)
368    {
369       a_timeController.activate (a_sender);
370
371       a_communicator.accept ();
372    }
373
374    //-----------------------------------------------------------------------------------------
375    // Manejador de respuesta.
376    //
377    // clientSocket: Socket cliente por el que podemos responder a la peticin.
378    // transport: Instancia del transporto que ha interpretado el mensaje (getMessage).
379    //-----------------------------------------------------------------------------------------
380    void MyCommunicator::eventReceiveMessage (ClientSocket&, const DataBlock& data)
381       throw (RuntimeException)
382    {
383       a_response.decode (data);
384
385       string msg = anna::functions::asString (
386          "%d %c %d = %d", a_response.x, a_response.op, a_response.y, a_response.result
387       );
388       Logger::information (msg, ANNA_FILE_LOCATION);
389    }
390
391    void Sender::tick ()
392       throw (RuntimeException)
393    {
394       Server* server = static_cast <HeavyClient&> (anna::app::functions::getApp ()).getServer ();
395       Communicator* communicator = anna::app::functions::component <Communicator> (ANNA_FILE_LOCATION);
396
397       int maxn = a_messageBySecond / 4;
398
399       if (++ a_nquarter == 4) {
400          maxn += a_messageBySecond % 4;
401          a_nquarter = 0;
402       }
403
404       if (maxn == 0)
405          return;
406
407       maxn = rand () % maxn;
408
409       for (int n = 0; n < maxn; n ++) {
410          a_request.op = '+';
411          a_request.x = rand () % 1000;
412          a_request.y = rand () % 1000;
413
414          try {
415             server->send (a_request.code ());
416          }
417          catch (RuntimeException& ex) {
418             ex.trace ();
419             break;
420          }
421       }
422    }
423
424 \endcode
425
426 El ejecutable debera enlazarse con las librerias:
427    \li libanna.core.a
428    \li libanna.xml.a
429    \li libanna.app.a
430    \li libanna.comm.a
431
432 El <b>Packet Header</b> es anna/comm/h
433 */
434 namespace comm {
435 }
436 }
437
438 #include <anna/comm/Application.hpp>
439 #include <anna/comm/Buffer.hpp>
440 #include <anna/comm/ClientSocket.hpp>
441 #include <anna/comm/Codec.hpp>
442 #include <anna/comm/Communicator.hpp>
443 #include <anna/comm/CongestionController.hpp>
444 #include <anna/comm/DatagramSocket.hpp>
445 #include <anna/comm/Device.hpp>
446 #include <anna/comm/Delivery.hpp>
447 #include <anna/comm/DirectTransport.hpp>
448 #include <anna/comm/Host.hpp>
449 #include <anna/comm/LargeBinaryCodec.hpp>
450 #include <anna/comm/LiteTransport.hpp>
451 #include <anna/comm/Message.hpp>
452 #include <anna/comm/Network.hpp>
453 #include <anna/comm/INetAddress.hpp>
454 #include <anna/comm/Transport.hpp>
455 #include <anna/comm/Server.hpp>
456 #include <anna/comm/Service.hpp>
457 #include <anna/comm/ServerAllocator.hpp>
458 #include <anna/comm/ServerSocket.hpp>
459 #include <anna/comm/handler/BinderSocket.hpp>
460 #include <anna/comm/Service.hpp>
461 #include <anna/comm/Status.hpp>
462 #include <anna/comm/Variable.hpp>
463 #include <anna/comm/Receiver.hpp>
464 #include <anna/comm/ReceiverFactory.hpp>
465 #include <anna/comm/ReceiverFactoryImpl.hpp>
466 #include <anna/comm/RoundRobinDelivery.hpp>
467 #include <anna/comm/ByRangeDelivery.hpp>
468 #include <anna/comm/IndexedDelivery.hpp>
469
470 using namespace anna::comm;
471
472 #endif
473