Remove warnings
[anna.git] / example / comm / rServer / main.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 /*
10   Ejemplo de programa servidor. Atiende peticiones aritmeticas, el protocolo de transporte
11   sera el comm::Transport y el contenido del mensaje sera el resutaldo de un comm::Codec con los
12   (x, y, op) -> El resultado sera estos tres componente mas result.
13   
14   Ejemplo de uso del sistema de receiveres, que son capaces de tratar N peticiones de forma
15   totalmetne simultanea en caso de estar en un entorno MT.
16
17   Para poder probar el sistema de congestion se puede indicar un numero de milisegundos de 
18   retardo aplicados a cada contestacion.
19
20   Los clientes pueden ser: client.p o kclient.p
21 */
22 #include <iostream>
23
24 #include <anna/core/core.hpp>
25 #include <anna/comm/comm.hpp>
26
27 #include <anna/xml/Node.hpp>
28 #include <anna/xml/Attribute.hpp>
29
30 #include <anna/app/functions.hpp>
31
32 #include <anna/test/Request.hpp>
33 #include <anna/test/Response.hpp>
34 #include <anna/test/Communicator.hpp>
35
36 using namespace std;
37 using namespace test;
38
39 class MyCommunicator : public test::Communicator {
40 public:
41    MyCommunicator (Communicator::WorkMode::_v workMode) :  test::Communicator () {;}
42 };
43
44 class MyReceiver : public Receiver {
45 public:
46    virtual ~MyReceiver() {;}
47    static const char* className () throw () { return "MyReceiver"; }
48    
49 private:
50    Request a_request;
51    Response a_response;
52    MyCommunicator* a_communicator;
53    
54    MyReceiver () : Receiver ("MyReceiver") { ; }
55    void initialize () throw (RuntimeException);
56    void apply (comm::ClientSocket &, const Message&) throw (RuntimeException);
57    
58    friend class Allocator <MyReceiver>;      
59 };
60
61 class ArithmeticServer : public comm::Application {
62 public:
63    ArithmeticServer ();
64    ~ArithmeticServer () { delete a_communicator; }
65       
66 private:
67    MyCommunicator* a_communicator;
68    ReceiverFactoryImpl <MyReceiver> a_receiverFactory;
69    comm::ServerSocket* a_serverSocket;
70
71    void initialize () throw (RuntimeException);
72    void run () throw (RuntimeException);
73    xml::Node* asXML (xml::Node* app) const throw ();
74    void signalTerminate () throw (RuntimeException);
75 };
76
77 using namespace std;
78 using namespace anna::comm;
79
80 int main (int argc, const char** argv)
81 {
82    CommandLine& commandLine (CommandLine::instantiate ());
83    ArithmeticServer app;
84
85    srand (time (NULL));
86    
87    try {
88       commandLine.initialize (argv, argc);
89       commandLine.verify ();
90
91       Logger::setLevel (Logger::Debug); 
92       string traceFile ("server.");
93       traceFile += anna::functions::asString ((int) getpid ());
94       traceFile += ".trace";      
95       Logger::initialize ("arithmeticServer", new TraceWriter (traceFile.c_str (),4096000));
96  
97       app.start ();
98    }
99    catch (Exception& ex) {
100       cout << ex.asString () << endl;
101    }
102    
103    return 0;
104 }
105
106 ArithmeticServer::ArithmeticServer () : 
107    Application ("arithmeticServer", "Servidor de operaciones (iRS)", "1.0"),
108    a_communicator (NULL)
109 {
110    CommandLine& commandLine (CommandLine::instantiate ());
111       
112    commandLine.add ("p", CommandLine::Argument::Mandatory, "Puerto en el que atender peticiones");
113    commandLine.add ("a", CommandLine::Argument::Mandatory, "Direccin IP en la que atender");
114    commandLine.add ("d", CommandLine::Argument::Mandatory, "Retardo aplicado a la contestacio");
115    commandLine.add ("maxpending", CommandLine::Argument::Optional, "Numero maximo de bytes en la cola de entrada");
116    commandLine.add ("limit", CommandLine::Argument::Mandatory, "% de ocupacion que permitimos");
117    commandLine.add ("r", CommandLine::Argument::Optional, "Indicador de reuso de direccion", false);
118    commandLine.add ("n", CommandLine::Argument::Optional, "Numero de mensajes a servir", true);
119    commandLine.add ("trace", CommandLine::Argument::Optional, "Nivel de trazas (debug,warning, error,...)");
120    commandLine.add ("clone", CommandLine::Argument::Optional, "Aplica el metodo de clonado en el tratamiento de mensajes", false);
121    commandLine.add ("chunksize", CommandLine::Argument::Optional, "Tamano del chunk de lectura");
122 }
123
124 //-----------------------------------------------------------------------------------------
125 // Inicializa el servidor de sockets.
126 //-----------------------------------------------------------------------------------------
127 void ArithmeticServer::initialize () 
128    throw (RuntimeException)
129 {
130    LOGMETHOD (TraceMethod tm ("ArithmeticServer", "initialize", ANNA_FILE_LOCATION));
131
132    CommandLine& cl (CommandLine::instantiate ());
133
134    int port = cl.getIntegerValue ("p");
135    const comm::Device* device = Network::instantiate ().find (Device::asAddress (cl.getValue ("a")));
136
137    if (cl.exists ("trace"))
138       Logger::setLevel (Logger::asLevel (cl.getValue ("trace")));
139
140    if (cl.exists ("chunksize"))
141       comm::Communicator::setReceivingChunkSize (cl.getIntegerValue ("chunksize"));
142
143    a_serverSocket = new ServerSocket (INetAddress (device, port), cl.exists ("r"));
144    a_serverSocket->setCategory (777);
145    a_serverSocket->setReceiverFactory (a_receiverFactory);
146
147    comm::Communicator::WorkMode::_v workMode = (cl.exists ("clone")) ? comm::Communicator::WorkMode::Clone: comm::Communicator::WorkMode::Single;
148
149    a_communicator = new MyCommunicator (workMode);
150 }
151
152 //-----------------------------------------------------------------------------------------
153 // Atiende las peticiones.
154 // Cuando hay un nuevo mensaje invocar�a Communicator::eventReceiveMessage
155 //-----------------------------------------------------------------------------------------
156 void ArithmeticServer::run ()
157    throw (RuntimeException)
158 {
159    LOGMETHOD (TraceMethod tm ("ArithmeticServer", "run", ANNA_FILE_LOCATION));
160
161    CommandLine& cl (CommandLine::instantiate ());
162
163    a_communicator->attach (a_serverSocket);
164    a_communicator->setDelay ((Millisecond)cl.getIntegerValue ("d"));
165
166    if (cl.exists ("n") == true)
167       a_communicator->setMaxMessage (cl.getIntegerValue ("n"));
168
169    CongestionController& ccgg = CongestionController::instantiate ();
170
171    ccgg.setLimit (cl.getIntegerValue ("limit"));
172
173    if (cl.exists ("maxpending"))
174       ccgg.setMaxPendingBytes (cl.getIntegerValue ("maxpending"));
175
176    a_communicator->accept ();
177 }
178
179 xml::Node* ArithmeticServer::asXML (xml::Node* app) const 
180    throw ()
181 {
182    xml::Node* node = app::Application::asXML (app);
183    
184    node->createAttribute ("MaxMessage", a_communicator->getMaxMessage ());
185    node->createAttribute ("Message", a_communicator->getMessage ());
186    
187    return node;
188 }
189
190 void ArithmeticServer::signalTerminate () 
191    throw (RuntimeException)
192 {
193    a_communicator->terminate ();
194    comm::Application::signalTerminate ();
195 }
196
197 void MyReceiver::initialize ()
198    throw (RuntimeException)
199 {
200    a_communicator = app::functions::component <MyCommunicator> (ANNA_FILE_LOCATION);
201 }
202
203 void MyReceiver::apply (ClientSocket& clientSocket, const Message& message) 
204    throw (RuntimeException)
205 {
206    LOGMETHOD (TraceMethod tm ("MyReceiver", "apply", ANNA_FILE_LOCATION));
207
208    if (a_communicator->canContinue (clientSocket) == false)
209       return;      
210
211    a_request.decode (message.getBody ());
212
213    a_communicator->delay ();
214
215    a_response.x = a_request.x;
216    a_response.y = a_request.y;
217    a_response.initTime = a_request.initTime;
218
219    switch (a_response.op = a_request.op) {
220       case '+':
221          a_response.result = a_request.x + a_request.y;
222          break;
223       case '-':
224          a_response.result = a_request.x - a_request.y;
225          break;
226       case '*':
227          a_response.result = a_request.x * a_request.y;
228          break;
229       case '/':
230          a_response.result = (a_request.y != 0) ? (a_request.x / a_request.y): 0;
231          break;
232    }
233
234    LOGINFORMATION (
235       string msg = anna::functions::asString ("%d %c %d = %d", a_request.x, a_request.op, a_request.y, a_response.result);
236       msg += anna::functions::asText (" | InitTime: ",  a_response.initTime);
237       Logger::information (msg, ANNA_FILE_LOCATION);
238    )
239
240    try {
241       clientSocket.send (a_response);
242    }
243    catch (Exception& ex) {
244       ex.trace ();
245    }
246 }