5e47a25cda2fff518da8d31efb31b150690c4454
[anna.git] / example / comm / rrClient / main.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 /**
10    Realiza peticiones automaticas sobre el servidor de operaciones aritmeticas.
11    
12    Mediante un servicio de reparto por RoundRobin.
13
14    El servidor de este cliente: server.p rserver.p 
15 */
16
17 #include <iostream>
18
19 #include <string.h>
20 #include <unistd.h>
21
22 #include <anna/core/core.hpp>
23 #include <anna/app/functions.hpp>
24 #include <anna/comm/comm.hpp>
25
26 #include <anna/timex/Engine.hpp>
27 #include <anna/timex/Clock.hpp>
28
29 #include <anna/test/Response.hpp>
30 #include <anna/test/Request.hpp>
31
32 class Sender : public anna::timex::Clock {
33 public:
34    Sender () : Clock ("Sender", (Millisecond)1000), 
35       a_messageBySecond (0), 
36       a_nquarter (0), 
37       a_requests ("Request"), 
38       a_errorCounter (0),
39       a_txMessageCounter (0)
40    {;}
41
42    void setMessageBySecond (const int messageBySecond) throw () { a_messageBySecond = messageBySecond; }
43
44    int getTxMessageCounter () const throw () { return a_txMessageCounter; }
45
46 private:
47    int a_messageBySecond;
48    int a_nquarter;
49    int a_errorCounter;
50    int a_txMessageCounter;
51    ThreadData <test::Request> a_requests;
52
53    bool tick () throw (RuntimeException);
54 };
55
56 class MyCommunicator : public Communicator {
57 public:
58    MyCommunicator () : Communicator (), a_avgResponseTime (0), a_rxMessageCounter (0), a_responses ("Response")  {;}
59
60 private:
61    ThreadData <test::Response> a_responses;
62    int a_avgResponseTime;
63    int a_rxMessageCounter;
64
65    void eventReceiveMessage (ClientSocket &, const Message&) throw (RuntimeException);
66 /*
67    void eventBreakConnection (const ClientSocket&) throw ();
68
69    void eventBreakConnection (Server* server) throw () {
70       comm::Communicator::eventBreakConnection (server);
71    }
72 */
73    void eventBreakConnection (const Service* service) throw ();
74    
75    static bool isOk (const test::Response& response) throw ();
76 };
77
78 class RRClient : public anna::comm::Application {
79 public:
80    RRClient ();
81
82    Service* getService () const throw () { return a_service; }
83    const Sender* getSender () const throw () { return &a_sender; }
84
85 private:
86    MyCommunicator a_communicator;
87    anna::timex::Engine a_timeController;
88    Sender a_sender;
89    Service* a_service;
90
91    void initialize () throw (RuntimeException);
92    void run () throw (RuntimeException);
93 };
94
95 using namespace std;
96
97 int main (int argc, const char** argv)
98 {
99    CommandLine& commandLine (CommandLine::instantiate ());
100    RRClient app;
101
102    srand (time (NULL));
103
104    try {
105       commandLine.initialize (argv, argc);
106       commandLine.verify ();
107
108       Logger::setLevel (Logger::Information);
109       string traceFile ("client.");
110       traceFile += anna::functions::asString ((int) getpid ());
111       traceFile += ".trace";
112       Logger::initialize ("arithmeticClient", new TraceWriter (traceFile.c_str (),4096000));
113
114       app.start ();
115    }
116    catch (Exception& ex) {
117       cout << ex.asString () << endl;
118    }
119
120    return 0;
121 }
122
123 RRClient::RRClient () :
124    Application ("arithmeticClient", "Cliente de operaciones aritmeticas", "1.0"),
125    a_communicator (),
126    a_timeController ((Millisecond)1000, (Millisecond)250)
127 {
128    CommandLine& commandLine (CommandLine::instantiate ());
129
130    commandLine.add ("p", CommandLine::Argument::Mandatory, "Puertos de los servidores (separados por comas)");
131    commandLine.add ("a", CommandLine::Argument::Mandatory, "Direccin IP Puerto en el que el servidor atiende respuestas.");
132    commandLine.add ("n", CommandLine::Argument::Mandatory, "Numero de mensajes por segundo");
133    commandLine.add ("trace", CommandLine::Argument::Optional, "Nivel de trazas (debug,warning, error,...)");
134 }
135
136 void RRClient::initialize ()
137    throw (RuntimeException)
138 {
139    CommandLine& cl (CommandLine::instantiate ());
140
141    if (cl.exists ("trace"))
142       Logger::setLevel (Logger::asLevel (cl.getValue ("trace")));
143
144    a_sender.setMessageBySecond (cl.getIntegerValue ("n"));
145       
146    Network& network = Network::instantiate ();
147    Tokenizer ports (cl.getValue ("p"), ",");
148    int port;
149
150    a_service = new comm::RoundRobinDelivery ("Service_Arithmetic", true);
151
152    for (Tokenizer::const_iterator ii = ports.begin (), maxii = ports.end (); ii != maxii; ii ++) {
153       port = atoi (Tokenizer::data (ii));
154       a_service->attach (network.createServer (cl.getValue ("a"), port, true));
155    }
156
157    a_communicator.attach (a_service);      
158 }
159
160 void RRClient::run ()
161    throw (RuntimeException)
162 {
163    a_timeController.activate (a_sender);
164
165    a_communicator.accept ();
166 }
167
168 void MyCommunicator::eventReceiveMessage (ClientSocket&, const Message& message)
169    throw (RuntimeException)
170 {
171   LOGMETHOD (TraceMethod tm ("MyCommunicator", "eventReceiveMessage", ANNA_FILE_LOCATION));
172
173    test::Response& response = a_responses.get ();
174    response.decode (message.getBody ());
175
176    const anna::Millisecond now = anna::functions::millisecond ();
177    const int delay =  now - (Millisecond) response.initTime;
178
179    if (delay > 0 && isOk (response) == true) {
180       a_rxMessageCounter ++;
181       a_avgResponseTime += delay;
182
183       LOGINFORMATION (
184          string msg = anna::functions::asString (
185             "%d %c %d = %d", response.x, response.op, response.y, response.result
186          );
187          msg += anna::functions::asText (" | Delay: ", delay);
188          Logger::information (msg, ANNA_FILE_LOCATION);
189       );
190    }
191    else {
192       LOGWARNING (
193          string msg = anna::functions::asString (
194             "Flip: %d %c %d = %d", response.x, response.op, response.y, response.result
195          );
196          msg += anna::functions::asText (" | Message: ", message.getBody ());
197          msg += anna::functions::asText (" | Delay: ", delay);
198          Logger::warning (msg, ANNA_FILE_LOCATION);
199       );
200    }
201 }
202
203 void MyCommunicator::eventBreakConnection (const Service* service)
204    throw ()
205 {
206   LOGMETHOD (TraceMethod tm ("MyCommunicator", "eventBreakConnection", ANNA_FILE_LOCATION));
207
208    if (a_rxMessageCounter == 0)
209       return;
210
211    comm::Communicator::eventBreakConnection (service);
212
213    if (service->isAvailable () == true) {
214       LOGNOTICE (Logger::notice (service->asString (), ANNA_FILE_LOCATION));
215       return;
216    }
217
218    LOGNOTICE (
219       RRClient& app = static_cast <RRClient&> (anna::app::functions::getApp ());   
220       string msg ("Tiempo medio respuesta: ");
221       msg += anna::functions::asString (a_avgResponseTime / a_rxMessageCounter);
222       msg += " ms";
223       msg += anna::functions::asText (" | Rx: ", a_rxMessageCounter);
224       msg += anna::functions::asText (" | Tx: ", app.getSender ()->getTxMessageCounter ());
225       Logger::notice (msg, ANNA_FILE_LOCATION);
226
227       cout << msg << endl << endl;
228    );
229    requestStop ();
230 }
231
232 bool MyCommunicator::isOk (const test::Response& response) 
233    throw ()
234 {
235    if (response.op != '+' && response.op != '-' && response.op != '*' && response.op != '/') 
236       return false;
237       
238    int result = 0;
239    
240    switch (response.op) {
241       case '+':
242          result = response.x + response.y;
243          break;
244       case '-':
245          result = response.x - response.y;
246          break;
247       case '*':
248          result = response.x * response.y;
249          break;
250       case '/':
251          result = (response.y != 0) ? (response.x / response.y): 0;
252          break;
253    }
254     
255    return result == response.result;   
256 }
257
258 bool Sender::tick ()
259    throw (RuntimeException)
260 {
261   LOGMETHOD (TraceMethod tm (Logger::Local7, "Sender", "tick", ANNA_FILE_LOCATION));
262
263    Service* service = static_cast <RRClient&> (anna::comm::functions::getApp ()).getService ();
264    Communicator* communicator = anna::app::functions::component <Communicator> (ANNA_FILE_LOCATION);
265
266    if (a_messageBySecond == 0) 
267       throw RuntimeException ("Hay que indicar el numero de mensajes por segundo", ANNA_FILE_LOCATION);
268
269
270    if (a_errorCounter > 100) {
271       communicator->requestStop ();
272       Logger::warning ("Terminado por errores continuos en la conexion", ANNA_FILE_LOCATION);
273       return false;
274    }
275
276    test::Request& request = a_requests.get ();
277
278    for (int n = 0; n < a_messageBySecond && communicator->hasRequestedStop () == false; n ++) {
279       request.op = '+';
280       request.x = rand () % 1000;
281       request.y = rand () % 1000;
282       request.initTime = anna::functions::millisecond ();
283
284       try {
285          service->send (request);
286          a_txMessageCounter ++;
287       }
288       catch (RuntimeException& ex) {
289          a_errorCounter ++;
290          ex.trace ();
291          break;
292       }
293    }
294    
295    return true;
296 }
297