9a1ba5f4dffdb7797e8c221525071ff0355d5b60
[anna.git] / example / comm / rrClient / main.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 /**
10    Realiza peticiones automaticas sobre el servidor de operaciones aritmeticas.
11    
12    Mediante un servicio de reparto por RoundRobin.
13
14    El servidor de este cliente: server.p rserver.p 
15 */
16
17 #include <iostream>
18
19 #include <string.h>
20 #include <unistd.h>
21
22 #include <anna/core/core.hpp>
23 #include <anna/app/functions.hpp>
24 #include <anna/comm/comm.hpp>
25
26 #include <anna/timex/Engine.hpp>
27 #include <anna/timex/Clock.hpp>
28
29 #include <anna/test/Response.hpp>
30 #include <anna/test/Request.hpp>
31
32 class Sender : public anna::timex::Clock {
33 public:
34    Sender () : Clock ("Sender", (Millisecond)1000), 
35       a_messageBySecond (0), 
36       a_nquarter (0), 
37       a_requests ("Request"), 
38       a_errorCounter (0),
39       a_txMessageCounter (0)
40    {;}
41
42    void setMessageBySecond (const int messageBySecond) { a_messageBySecond = messageBySecond; }
43
44    int getTxMessageCounter () const { return a_txMessageCounter; }
45
46 private:
47    int a_messageBySecond;
48    int a_nquarter;
49    int a_errorCounter;
50    int a_txMessageCounter;
51    ThreadData <test::Request> a_requests;
52
53    bool tick () noexcept(false);
54 };
55
56 class MyCommunicator : public Communicator {
57 public:
58    MyCommunicator () : Communicator (), a_avgResponseTime (0), a_rxMessageCounter (0), a_responses ("Response")  {;}
59
60 private:
61    using Communicator::eventBreakConnection;
62
63    ThreadData <test::Response> a_responses;
64    int a_avgResponseTime;
65    int a_rxMessageCounter;
66
67    void eventReceiveMessage (ClientSocket &, const Message&) noexcept(false);
68 /*
69    void eventBreakConnection (const ClientSocket&) ;
70
71    void eventBreakConnection (Server* server) {
72       comm::Communicator::eventBreakConnection (server);
73    }
74 */
75    void eventBreakConnection (const Service* service) ;
76    
77    static bool isOk (const test::Response& response) ;
78 };
79
80 class RRClient : public anna::comm::Application {
81 public:
82    RRClient ();
83
84    Service* getService () const { return a_service; }
85    const Sender* getSender () const { return &a_sender; }
86
87 private:
88    MyCommunicator a_communicator;
89    anna::timex::Engine a_timeController;
90    Sender a_sender;
91    Service* a_service;
92
93    void initialize () noexcept(false);
94    void run () noexcept(false);
95 };
96
97 using namespace std;
98
99 int main (int argc, const char** argv)
100 {
101    CommandLine& commandLine (CommandLine::instantiate ());
102    RRClient app;
103
104    srand (time (NULL));
105
106    try {
107       commandLine.initialize (argv, argc);
108       commandLine.verify ();
109
110       Logger::setLevel (Logger::Information);
111       string traceFile ("client.");
112       traceFile += anna::functions::asString ((int) getpid ());
113       traceFile += ".trace";
114       Logger::initialize ("arithmeticClient", new TraceWriter (traceFile.c_str (),4096000));
115
116       app.start ();
117    }
118    catch (Exception& ex) {
119       cout << ex.asString () << endl;
120    }
121
122    return 0;
123 }
124
125 RRClient::RRClient () :
126    Application ("arithmeticClient", "Cliente de operaciones aritmeticas", "1.0"),
127    a_communicator (),
128    a_timeController ((Millisecond)1000, (Millisecond)250)
129 {
130    CommandLine& commandLine (CommandLine::instantiate ());
131
132    commandLine.add ("p", CommandLine::Argument::Mandatory, "Puertos de los servidores (separados por comas)");
133    commandLine.add ("a", CommandLine::Argument::Mandatory, "Direccin IP Puerto en el que el servidor atiende respuestas.");
134    commandLine.add ("n", CommandLine::Argument::Mandatory, "Numero de mensajes por segundo");
135    commandLine.add ("trace", CommandLine::Argument::Optional, "Nivel de trazas (debug,warning, error,...)");
136 }
137
138 void RRClient::initialize ()
139    noexcept(false)
140 {
141    CommandLine& cl (CommandLine::instantiate ());
142
143    if (cl.exists ("trace"))
144       Logger::setLevel (Logger::asLevel (cl.getValue ("trace")));
145
146    a_sender.setMessageBySecond (cl.getIntegerValue ("n"));
147       
148    Network& network = Network::instantiate ();
149    Tokenizer ports (cl.getValue ("p"), ",");
150    int port;
151
152    a_service = new comm::RoundRobinDelivery ("Service_Arithmetic", true);
153
154    for (Tokenizer::const_iterator ii = ports.begin (), maxii = ports.end (); ii != maxii; ii ++) {
155       port = atoi (Tokenizer::data (ii));
156       a_service->attach (network.createServer (cl.getValue ("a"), port, true));
157    }
158
159    a_communicator.attach (a_service);      
160 }
161
162 void RRClient::run ()
163    noexcept(false)
164 {
165    a_timeController.activate (a_sender);
166
167    a_communicator.accept ();
168 }
169
170 void MyCommunicator::eventReceiveMessage (ClientSocket&, const Message& message)
171    noexcept(false)
172 {
173   LOGMETHOD (TraceMethod tm ("MyCommunicator", "eventReceiveMessage", ANNA_FILE_LOCATION));
174
175    test::Response& response = a_responses.get ();
176    response.decode (message.getBody ());
177
178    const anna::Millisecond now = anna::functions::millisecond ();
179    const int delay =  now - (Millisecond) response.initTime;
180
181    if (delay > 0 && isOk (response) == true) {
182       a_rxMessageCounter ++;
183       a_avgResponseTime += delay;
184
185       LOGINFORMATION (
186          string msg = anna::functions::asString (
187             "%d %c %d = %d", response.x, response.op, response.y, response.result
188          );
189          msg += anna::functions::asText (" | Delay: ", delay);
190          Logger::information (msg, ANNA_FILE_LOCATION);
191       );
192    }
193    else {
194       LOGWARNING (
195          string msg = anna::functions::asString (
196             "Flip: %d %c %d = %d", response.x, response.op, response.y, response.result
197          );
198          msg += anna::functions::asText (" | Message: ", message.getBody ());
199          msg += anna::functions::asText (" | Delay: ", delay);
200          Logger::warning (msg, ANNA_FILE_LOCATION);
201       );
202    }
203 }
204
205 void MyCommunicator::eventBreakConnection (const Service* service)
206    
207 {
208   LOGMETHOD (TraceMethod tm ("MyCommunicator", "eventBreakConnection", ANNA_FILE_LOCATION));
209
210    if (a_rxMessageCounter == 0)
211       return;
212
213    comm::Communicator::eventBreakConnection (service);
214
215    if (service->isAvailable () == true) {
216       LOGNOTICE (Logger::notice (service->asString (), ANNA_FILE_LOCATION));
217       return;
218    }
219
220    LOGNOTICE (
221       RRClient& app = static_cast <RRClient&> (anna::app::functions::getApp ());   
222       string msg ("Tiempo medio respuesta: ");
223       msg += anna::functions::asString (a_avgResponseTime / a_rxMessageCounter);
224       msg += " ms";
225       msg += anna::functions::asText (" | Rx: ", a_rxMessageCounter);
226       msg += anna::functions::asText (" | Tx: ", app.getSender ()->getTxMessageCounter ());
227       Logger::notice (msg, ANNA_FILE_LOCATION);
228
229       cout << msg << endl << endl;
230    );
231    requestStop ();
232 }
233
234 bool MyCommunicator::isOk (const test::Response& response) 
235    
236 {
237    if (response.op != '+' && response.op != '-' && response.op != '*' && response.op != '/') 
238       return false;
239       
240    int result = 0;
241    
242    switch (response.op) {
243       case '+':
244          result = response.x + response.y;
245          break;
246       case '-':
247          result = response.x - response.y;
248          break;
249       case '*':
250          result = response.x * response.y;
251          break;
252       case '/':
253          result = (response.y != 0) ? (response.x / response.y): 0;
254          break;
255    }
256     
257    return result == response.result;   
258 }
259
260 bool Sender::tick ()
261    noexcept(false)
262 {
263   LOGMETHOD (TraceMethod tm (Logger::Local7, "Sender", "tick", ANNA_FILE_LOCATION));
264
265    Service* service = static_cast <RRClient&> (anna::comm::functions::getApp ()).getService ();
266    Communicator* communicator = anna::app::functions::component <Communicator> (ANNA_FILE_LOCATION);
267
268    if (a_messageBySecond == 0) 
269       throw RuntimeException ("Hay que indicar el numero de mensajes por segundo", ANNA_FILE_LOCATION);
270
271
272    if (a_errorCounter > 100) {
273       communicator->requestStop ();
274       Logger::warning ("Terminado por errores continuos en la conexion", ANNA_FILE_LOCATION);
275       return false;
276    }
277
278    test::Request& request = a_requests.get ();
279
280    for (int n = 0; n < a_messageBySecond && communicator->hasRequestedStop () == false; n ++) {
281       request.op = '+';
282       request.x = rand () % 1000;
283       request.y = rand () % 1000;
284       request.initTime = anna::functions::millisecond ();
285
286       try {
287          service->send (request);
288          a_txMessageCounter ++;
289       }
290       catch (RuntimeException& ex) {
291          a_errorCounter ++;
292          ex.trace ();
293          break;
294       }
295    }
296    
297    return true;
298 }
299