bug in RC
[anna.git] / include / anna / comm / comm.hpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #ifndef anna_comm_comm_hpp
10 #define anna_comm_comm_hpp
11
12 namespace anna {
13 /**
14 Proporciona las clases necesarias para la comunicacion entre procesos.
15
16 A continuacion presentamos el codigo de ejemplo de un servidor aritmetico. Recibe dos operadores y una
17 operacion (+,-,*,/) y devuelve el resultado de la operacion. Para estudiar los sistemas de control de
18 congestion hemos incorporado la posibilidad de que el servidor espere un numero de segundo indeterminado
19 antes de dar la respuesta.
20
21 \code
22
23    #include <iostream>
24
25    #include <anna.h>
26    #include <anna/comm/h>
27
28    #include <test.Request.h>
29    #include <test.Response.h>
30
31    using namespace std;
32    using namespace test;
33
34    //-----------------------------------------------------------------------------------------
35    // Define el comunicador de nuestra aplicacin.
36    //
37    // Las peticiones y respuestas van codificadas mediante un comm::Codec pero podriamos
38    // haber utilizado cualquier otro medio de codificacin ya que la capa de transporte
39    // es totalmente independiente del contenido del mensaje.
40    //
41    // De cara a la capa de transporte lo unico que importa es el cliente y el servidor
42    // codifiquen/decodifiquen de la misma forma.
43    //-----------------------------------------------------------------------------------------
44    class MyCommunicator : public comm::Communicator {
45    public:
46       MyCommunicator () {;}
47
48       void setDelay (const Millisecond &delay) throw () { a_delay = delay; }
49
50    private:
51       Request a_request;
52       Response a_response;
53       Millisecond a_delay;
54
55       void eventReceiveMessage (comm::ClientSocket &, const DataBlock& data)
56          throw (RuntimeException);
57    };
58
59    class ArithmeticServer : public comm::Application {
60    public:
61       ArithmeticServer ();
62
63    private:
64       MyCommunicator a_communicator;
65       comm::ServerSocket* a_serverSocket;
66
67       void initialize () throw (RuntimeException);
68       void run () throw (RuntimeException);
69    };
70
71    using namespace std;
72    using namespace anna::comm;
73
74    int main (int argc, const char** argv)
75    {
76       CommandLine& commandLine (CommandLine::instantiate ());
77       ArithmeticServer app;
78
79       srand (time (NULL));
80
81       try {
82          commandLine.initialize (argv, argc);
83          commandLine.verify ();
84
85          Logger::setLevel (Logger::Debug);
86          Logger::initialize ("arithmeticServer", new anna::TraceWriter ("file.trace", 4048000));
87
88          app.start ();
89       }
90       catch (Exception& ex) {
91          cout << ex.asString () << endl;
92       }
93
94       return 0;
95    }
96
97    ArithmeticServer::ArithmeticServer () :
98       Application ("arithmeticServer", "Servidor de operaciones aritmeticas", "1.0")
99    {
100       CommandLine& commandLine (CommandLine::instantiate ());
101
102       commandLine.add ("p", CommandLine::Argument::Mandatory, "Puerto en el que atender peticiones");
103       commandLine.add ("a", CommandLine::Argument::Mandatory, "Direccin IP en la que atender");
104       commandLine.add ("d", CommandLine::Argument::Mandatory, "Retardo aplicado a la contestacio");
105       commandLine.add ("r", CommandLine::Argument::Optional, "Indicador de reuso de direccin", false);
106       commandLine.add ("limit", CommandLine::Argument::Mandatory, "% de ocupacion que permitimos");
107    }
108
109    //-----------------------------------------------------------------------------------------
110    // Inicializa el servidor de sockets.
111    //-----------------------------------------------------------------------------------------
112    void ArithmeticServer::initialize ()
113       throw (RuntimeException)
114    {
115       CommandLine& cl (CommandLine::instantiate ());
116
117       int port = cl.getIntegerValue ("p");
118       const comm::Device* device = Network::instantiate ().find (Device::asAddress (cl.getValue ("a")));
119
120       a_serverSocket = new ServerSocket (INetAddress (device, port), cl.exists ("r"));
121    }
122
123    //-----------------------------------------------------------------------------------------
124    // Atiende las peticiones.
125    // Cuando hay un nuevo mensaje invocar�a Communicator::eventReceiveMessage
126    //-----------------------------------------------------------------------------------------
127    void ArithmeticServer::run ()
128       throw (RuntimeException)
129    {
130       CommandLine& cl (CommandLine::instantiate ());
131
132       a_communicator.attach (a_serverSocket);
133       a_communicator.setDelay (cl.getIntegerValue ("d"));
134
135       CongestionController::instantiate ().setLimit (cl.getIntegerValue ("limit"));
136
137       a_communicator.accept ();
138    }
139
140    //-----------------------------------------------------------------------------------------
141    // Manejador de peticiones.
142    // Calcular�la operacin solicitada y devolver�el resultado.
143    //
144    // clientSocket: Socket cliente por el que podemos responder a la peticin.
145    // transport: Instancia del transporto que ha interpretado el mensaje (getMessage).
146    //-----------------------------------------------------------------------------------------
147    void MyCommunicator::eventReceiveMessage (ClientSocket& clientSocket, const DataBlock& data)
148       throw (RuntimeException)
149    {
150       LOGMETHOD (TraceMethod tm ("MyCommunicator", "eventReceiveMessage", ANNA_FILE_LOCATION));
151
152       static int messageCounter = 0;
153       static int successCounter = 0;
154
155       int value;
156
157       CongestionController& congestionController = CongestionController::instantiate ();
158
159       messageCounter ++;
160
161       if (congestionController.getAdvice (clientSocket) == CongestionController::Advice::Discard)
162          return;
163
164       successCounter ++;
165
166       int random = rand () % (a_delay / 10);
167       int sign = rand () % 2;
168
169       if (sign == 0)
170          random *= -1;
171
172       anna::functions::sleep (a_delay + random);
173
174       a_request.decode (data);
175
176       a_response.x = a_request.x;
177       a_response.y = a_request.y;
178
179       switch (a_response.op = a_request.op) {
180          case '+':
181             a_response.result = a_request.x + a_request.y;
182             break;
183          case '-':
184             a_response.result = a_request.x - a_request.y;
185             break;
186          case '*':
187             a_response.result = a_request.x * a_request.y;
188             break;
189          case '/':
190             a_response.result = (a_request.y != 0) ? (a_request.x / a_request.y): 0;
191             break;
192       }
193
194       LOGDEBUG (
195          string msg = anna::functions::asString (
196             "%d %c %d = %d", a_request.x, a_request.op, a_request.y, a_response.result
197          );
198          Logger::debug (msg, ANNA_FILE_LOCATION);
199       );
200
201       try {
202          clientSocket.send (a_response.code ());
203       }
204       catch (Exception& ex) {
205          ex.trace ();
206       }
207    }
208 \endcode
209
210 El siguiente ejemplo muestra un cliente correspondiente al servidor anterior, que lanza un numero
211 determinado de peticiones por segundo.
212
213 \code
214    #include <iostream>
215
216    #include <string.h>
217
218    #include <anna.h>
219    #include <anna/comm/h>
220
221    #include <anna.timex.Engine.h>
222    #include <anna.timex.Clock.h>
223
224    #include <test.Response.h>
225    #include <test.Request.h>
226
227    class Sender : public timex::Clock {
228    public:
229       Sender () : Clock ("Sender", 250), a_messageBySecond (0), a_nquarter (0) {;}
230
231       void setMessageBySecond (const int messageBySecond) throw () { a_messageBySecond = messageBySecond; }
232
233    private:
234       int a_messageBySecond;
235       int a_nquarter;
236       test::Request a_request;
237
238       void tick () throw (RuntimeException);
239    };
240
241    //-----------------------------------------------------------------------------------------
242    // Define el comunicador de nuestra aplicacin.
243    //
244    // Las peticiones y respuestas van codificadas mediante un comm::Codec pero podriamos
245    // haber utilizado cualquier otro medio de codificacin ya que la capa de transporte
246    // es totalmente independiente del contenido del mensaje.
247    //-----------------------------------------------------------------------------------------
248    class MyCommunicator : public Communicator {
249    public:
250       MyCommunicator () : Communicator () {;}
251
252    private:
253       test::Response a_response;
254
255       void eventReceiveMessage (ClientSocket &, const DataBlock&)
256          throw (RuntimeException);
257    };
258
259    class HeavyClient : public anna::comm::Application {
260    public:
261       HeavyClient ();
262
263       Server* getServer () const throw () { return a_server; }
264
265    private:
266       MyCommunicator a_communicator;
267       timex::Engine a_timeController;
268       Sender a_sender;
269       Server* a_server;
270
271       void initialize () throw (RuntimeException);
272       void run () throw (RuntimeException);
273    };
274
275    using namespace std;
276
277    int main (int argc, const char** argv)
278    {
279       CommandLine& commandLine (CommandLine::instantiate ());
280       HeavyClient app;
281
282       srand (time (NULL));
283
284       try {
285          commandLine.initialize (argv, argc);
286          commandLine.verify ();
287
288          Logger::setLevel (Logger::Debug);
289          Logger::initialize ("arithmeticClient", new TraceWriter ("file.trace", 1048000));
290
291          app.start ();
292       }
293       catch (Exception& ex) {
294          cout << ex.asString () << endl;
295       }
296
297       return 0;
298    }
299
300    HeavyClient::HeavyClient () :
301       Application ("arithmeticClient", "Cliente de operaciones aritmeticas", "1.0"),
302       a_communicator (),
303       a_timeController ((Millisecond)10000, (Millisecond)250)
304    {
305       CommandLine& commandLine (CommandLine::instantiate ());
306
307       commandLine.add ("p", CommandLine::Argument::Mandatory, "Puerto en el que el servidor atiende respuestas.");
308       commandLine.add ("a", CommandLine::Argument::Mandatory, "Direccin IP Puerto en el que el servidor atiende respuestas.");
309       commandLine.add ("n", CommandLine::Argument::Mandatory, "Numero de mensajes por segundo");
310
311    }
312
313    //-----------------------------------------------------------------------------------------
314    // Inicializa las conexiones usadas por la aplicacin.
315    //
316    // Primero establece los datos para la conexin con el servidor aritm�ico y luego
317    // establece los datos del socket servidor necesario para atender respuestas y aceptar
318    // nuevas conexiones de procesos clientes (que no ser�el caso).
319    // Configura el men para que trabaje con el comunicador de esta aplicacin.
320    //-----------------------------------------------------------------------------------------
321    void HeavyClient::initialize ()
322       throw (RuntimeException)
323    {
324       CommandLine& cl (CommandLine::instantiate ());
325
326       Network& network = Network::instantiate ();
327
328       Host* host = network.find ("host000");
329       host->assign (network.find (Device::asAddress (cl.getValue ("a"))));
330       a_server = host->createServer ("rawServer", cl.getIntegerValue ("p"), true);
331       a_sender.setMessageBySecond (cl.getIntegerValue ("n"));
332    }
333
334    //-----------------------------------------------------------------------------------------
335    // Activa el reloj que dara el pulso para enviar los mensajes al servidor y comienza a
336    // atender las peticiones.
337    //-----------------------------------------------------------------------------------------
338    void HeavyClient::run ()
339       throw (RuntimeException)
340    {
341       a_timeController.activate (a_sender);
342
343       a_communicator.accept ();
344    }
345
346    //-----------------------------------------------------------------------------------------
347    // Manejador de respuesta.
348    //
349    // clientSocket: Socket cliente por el que podemos responder a la peticin.
350    // transport: Instancia del transporto que ha interpretado el mensaje (getMessage).
351    //-----------------------------------------------------------------------------------------
352    void MyCommunicator::eventReceiveMessage (ClientSocket&, const DataBlock& data)
353       throw (RuntimeException)
354    {
355       a_response.decode (data);
356
357       string msg = anna::functions::asString (
358          "%d %c %d = %d", a_response.x, a_response.op, a_response.y, a_response.result
359       );
360       Logger::information (msg, ANNA_FILE_LOCATION);
361    }
362
363    void Sender::tick ()
364       throw (RuntimeException)
365    {
366       Server* server = static_cast <HeavyClient&> (anna::app::functions::getApp ()).getServer ();
367       Communicator* communicator = anna::app::functions::component <Communicator> (ANNA_FILE_LOCATION);
368
369       int maxn = a_messageBySecond / 4;
370
371       if (++ a_nquarter == 4) {
372          maxn += a_messageBySecond % 4;
373          a_nquarter = 0;
374       }
375
376       if (maxn == 0)
377          return;
378
379       maxn = rand () % maxn;
380
381       for (int n = 0; n < maxn; n ++) {
382          a_request.op = '+';
383          a_request.x = rand () % 1000;
384          a_request.y = rand () % 1000;
385
386          try {
387             server->send (a_request.code ());
388          }
389          catch (RuntimeException& ex) {
390             ex.trace ();
391             break;
392          }
393       }
394    }
395
396 \endcode
397
398 El ejecutable debera enlazarse con las librerias:
399    \li libanna.core.a
400    \li libanna.xml.a
401    \li libanna.app.a
402    \li libanna.comm.a
403
404 El <b>Packet Header</b> es anna/comm/h
405 */
406 namespace comm {
407 }
408 }
409
410 #include <anna/comm/Application.hpp>
411 #include <anna/comm/Buffer.hpp>
412 #include <anna/comm/ClientSocket.hpp>
413 #include <anna/comm/Codec.hpp>
414 #include <anna/comm/Communicator.hpp>
415 #include <anna/comm/CongestionController.hpp>
416 #include <anna/comm/DatagramSocket.hpp>
417 #include <anna/comm/Device.hpp>
418 #include <anna/comm/Delivery.hpp>
419 #include <anna/comm/DirectTransport.hpp>
420 #include <anna/comm/Host.hpp>
421 #include <anna/comm/LargeBinaryCodec.hpp>
422 #include <anna/comm/LiteTransport.hpp>
423 #include <anna/comm/Message.hpp>
424 #include <anna/comm/Network.hpp>
425 #include <anna/comm/INetAddress.hpp>
426 #include <anna/comm/Transport.hpp>
427 #include <anna/comm/Server.hpp>
428 #include <anna/comm/Service.hpp>
429 #include <anna/comm/ServerAllocator.hpp>
430 #include <anna/comm/ServerSocket.hpp>
431 #include <anna/comm/handler/BinderSocket.hpp>
432 #include <anna/comm/Service.hpp>
433 #include <anna/comm/Status.hpp>
434 #include <anna/comm/Variable.hpp>
435 #include <anna/comm/Receiver.hpp>
436 #include <anna/comm/ReceiverFactory.hpp>
437 #include <anna/comm/ReceiverFactoryImpl.hpp>
438 #include <anna/comm/RoundRobinDelivery.hpp>
439 #include <anna/comm/ByRangeDelivery.hpp>
440 #include <anna/comm/IndexedDelivery.hpp>
441
442 using namespace anna::comm;
443
444 #endif
445