First commit
[anna.git] / example / comm / rrClient / main.cpp
1 // ANNA - Anna is Not 'N' Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // https://bitbucket.org/testillano/anna
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 /**
38    Realiza peticiones automaticas sobre el servidor de operaciones aritmeticas.
39    
40    Mediante un servicio de reparto por RoundRobin.
41
42    El servidor de este cliente: server.p rserver.p 
43 */
44
45 #include <iostream>
46
47 #include <string.h>
48 #include <unistd.h>
49
50 #include <anna/core/core.hpp>
51 #include <anna/app/functions.hpp>
52 #include <anna/comm/comm.hpp>
53
54 #include <anna/timex/Engine.hpp>
55 #include <anna/timex/Clock.hpp>
56
57 #include <anna/test/Response.hpp>
58 #include <anna/test/Request.hpp>
59
60 class Sender : public timex::Clock {
61 public:
62    Sender () : Clock ("Sender", (Millisecond)1000), 
63       a_messageBySecond (0), 
64       a_nquarter (0), 
65       a_requests ("Request"), 
66       a_errorCounter (0),
67       a_txMessageCounter (0)
68    {;}
69
70    void setMessageBySecond (const int messageBySecond) throw () { a_messageBySecond = messageBySecond; }
71
72    int getTxMessageCounter () const throw () { return a_txMessageCounter; }
73
74 private:
75    int a_messageBySecond;
76    int a_nquarter;
77    int a_errorCounter;
78    int a_txMessageCounter;
79    ThreadData <test::Request> a_requests;
80
81    bool tick () throw (RuntimeException);
82 };
83
84 class MyCommunicator : public Communicator {
85 public:
86    MyCommunicator () : Communicator (), a_avgResponseTime (0), a_rxMessageCounter (0), a_responses ("Response")  {;}
87
88 private:
89    ThreadData <test::Response> a_responses;
90    int a_avgResponseTime;
91    int a_rxMessageCounter;
92
93    void eventReceiveMessage (ClientSocket &, const Message&) throw (RuntimeException);
94 /*
95    void eventBreakConnection (const ClientSocket&) throw ();
96
97    void eventBreakConnection (Server* server) throw () {
98       comm::Communicator::eventBreakConnection (server);
99    }
100 */
101    void eventBreakConnection (const Service* service) throw ();
102    
103    static bool isOk (const test::Response& response) throw ();
104 };
105
106 class RRClient : public anna::comm::Application {
107 public:
108    RRClient ();
109
110    Service* getService () const throw () { return a_service; }
111    const Sender* getSender () const throw () { return &a_sender; }
112
113 private:
114    MyCommunicator a_communicator;
115    timex::Engine a_timeController;
116    Sender a_sender;
117    Service* a_service;
118
119    void initialize () throw (RuntimeException);
120    void run () throw (RuntimeException);
121 };
122
123 using namespace std;
124
125 int main (int argc, const char** argv)
126 {
127    CommandLine& commandLine (CommandLine::instantiate ());
128    RRClient app;
129
130    srand (time (NULL));
131
132    try {
133       commandLine.initialize (argv, argc);
134       commandLine.verify ();
135
136       Logger::setLevel (Logger::Information);
137       string traceFile ("client.");
138       traceFile += anna::functions::asString ((int) getpid ());
139       traceFile += ".trace";
140       Logger::initialize ("arithmeticClient", new TraceWriter (traceFile.c_str (),4096000));
141
142       app.start ();
143    }
144    catch (Exception& ex) {
145       cout << ex.asString () << endl;
146    }
147
148    return 0;
149 }
150
151 RRClient::RRClient () :
152    Application ("arithmeticClient", "Cliente de operaciones aritmeticas", "1.0"),
153    a_communicator (),
154    a_timeController ((Millisecond)1000, (Millisecond)250)
155 {
156    CommandLine& commandLine (CommandLine::instantiate ());
157
158    commandLine.add ("p", CommandLine::Argument::Mandatory, "Puertos de los servidores (separados por comas)");
159    commandLine.add ("a", CommandLine::Argument::Mandatory, "Direccin IP Puerto en el que el servidor atiende respuestas.");
160    commandLine.add ("n", CommandLine::Argument::Mandatory, "Numero de mensajes por segundo");
161    commandLine.add ("trace", CommandLine::Argument::Optional, "Nivel de trazas (debug,warning, error,...)");
162 }
163
164 void RRClient::initialize ()
165    throw (RuntimeException)
166 {
167    CommandLine& cl (CommandLine::instantiate ());
168
169    if (cl.exists ("trace"))
170       Logger::setLevel (Logger::asLevel (cl.getValue ("trace")));
171
172    a_sender.setMessageBySecond (cl.getIntegerValue ("n"));
173       
174    Network& network = Network::instantiate ();
175    Tokenizer ports (cl.getValue ("p"), ",");
176    int port;
177
178    a_service = new comm::RoundRobinDelivery ("Service_Arithmetic", true);
179
180    for (Tokenizer::const_iterator ii = ports.begin (), maxii = ports.end (); ii != maxii; ii ++) {
181       port = atoi (Tokenizer::data (ii));
182       a_service->attach (network.createServer (cl.getValue ("a"), port, true));
183    }
184
185    a_communicator.attach (a_service);      
186 }
187
188 void RRClient::run ()
189    throw (RuntimeException)
190 {
191    a_timeController.activate (a_sender);
192
193    a_communicator.accept ();
194 }
195
196 void MyCommunicator::eventReceiveMessage (ClientSocket&, const Message& message)
197    throw (RuntimeException)
198 {
199   LOGMETHOD (TraceMethod tm ("MyCommunicator", "eventReceiveMessage", ANNA_FILE_LOCATION));
200
201    test::Response& response = a_responses.get ();
202    response.decode (message.getBody ());
203
204    const anna::Millisecond now = anna::functions::millisecond ();
205    const int delay =  now - (Millisecond) response.initTime;
206
207    if (delay > 0 && isOk (response) == true) {
208       a_rxMessageCounter ++;
209       a_avgResponseTime += delay;
210
211       LOGINFORMATION (
212          string msg = anna::functions::asString (
213             "%d %c %d = %d", response.x, response.op, response.y, response.result
214          );
215          msg += anna::functions::asText (" | Delay: ", delay);
216          Logger::information (msg, ANNA_FILE_LOCATION);
217       );
218    }
219    else {
220       LOGWARNING (
221          string msg = anna::functions::asString (
222             "Flip: %d %c %d = %d", response.x, response.op, response.y, response.result
223          );
224          msg += anna::functions::asText (" | Message: ", message.getBody ());
225          msg += anna::functions::asText (" | Delay: ", delay);
226          Logger::warning (msg, ANNA_FILE_LOCATION);
227       );
228    }
229 }
230
231 void MyCommunicator::eventBreakConnection (const Service* service)
232    throw ()
233 {
234   LOGMETHOD (TraceMethod tm ("MyCommunicator", "eventBreakConnection", ANNA_FILE_LOCATION));
235
236    if (a_rxMessageCounter == 0)
237       return;
238
239    comm::Communicator::eventBreakConnection (service);
240
241    if (service->isAvailable () == true) {
242       LOGNOTICE (Logger::notice (service->asString (), ANNA_FILE_LOCATION));
243       return;
244    }
245
246    LOGNOTICE (
247       RRClient& app = static_cast <RRClient&> (anna::app::functions::getApp ());   
248       string msg ("Tiempo medio respuesta: ");
249       msg += anna::functions::asString (a_avgResponseTime / a_rxMessageCounter);
250       msg += " ms";
251       msg += anna::functions::asText (" | Rx: ", a_rxMessageCounter);
252       msg += anna::functions::asText (" | Tx: ", app.getSender ()->getTxMessageCounter ());
253       Logger::notice (msg, ANNA_FILE_LOCATION);
254
255       cout << msg << endl << endl;
256    );
257    requestStop ();
258 }
259
260 bool MyCommunicator::isOk (const test::Response& response) 
261    throw ()
262 {
263    if (response.op != '+' && response.op != '-' && response.op != '*' && response.op != '/') 
264       return false;
265       
266    int result = 0;
267    
268    switch (response.op) {
269       case '+':
270          result = response.x + response.y;
271          break;
272       case '-':
273          result = response.x - response.y;
274          break;
275       case '*':
276          result = response.x * response.y;
277          break;
278       case '/':
279          result = (response.y != 0) ? (response.x / response.y): 0;
280          break;
281    }
282     
283    return result == response.result;   
284 }
285
286 bool Sender::tick ()
287    throw (RuntimeException)
288 {
289   LOGMETHOD (TraceMethod tm (Logger::Local7, "Sender", "tick", ANNA_FILE_LOCATION));
290
291    Service* service = static_cast <RRClient&> (anna::comm::functions::getApp ()).getService ();
292    Communicator* communicator = anna::app::functions::component <Communicator> (ANNA_FILE_LOCATION);
293
294    if (a_messageBySecond == 0) 
295       throw RuntimeException ("Hay que indicar el nÂș de mensajes por segundo", ANNA_FILE_LOCATION);
296
297
298    if (a_errorCounter > 100) {
299       communicator->requestStop ();
300       Logger::warning ("Terminado por errores continuos en la conexion", ANNA_FILE_LOCATION);
301       return false;
302    }
303
304    test::Request& request = a_requests.get ();
305
306    for (int n = 0; n < a_messageBySecond && communicator->hasRequestedStop () == false; n ++) {
307       request.op = '+';
308       request.x = rand () % 1000;
309       request.y = rand () % 1000;
310       request.initTime = anna::functions::millisecond ();
311
312       try {
313          service->send (request);
314          a_txMessageCounter ++;
315       }
316       catch (RuntimeException& ex) {
317          a_errorCounter ++;
318          ex.trace ();
319          break;
320       }
321    }
322    
323    return true;
324 }
325