be0d4ffb8cc2f77ef3cb63cc55586247818dd0e1
[anna.git] / example / comm / rServer / main.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 /*
10   Ejemplo de programa servidor. Atiende peticiones aritmeticas, el protocolo de transporte
11   sera el comm::Transport y el contenido del mensaje sera el resutaldo de un comm::Codec con los
12   (x, y, op) -> El resultado sera estos tres componente mas result.
13   
14   Ejemplo de uso del sistema de receiveres, que son capaces de tratar N peticiones de forma
15   totalmetne simultanea en caso de estar en un entorno MT.
16
17   Para poder probar el sistema de congestion se puede indicar un numero de milisegundos de 
18   retardo aplicados a cada contestacion.
19
20   Los clientes pueden ser: client.p o kclient.p
21 */
22 #include <iostream>
23
24 #include <anna/core/core.hpp>
25 #include <anna/comm/comm.hpp>
26
27 #include <anna/xml/Node.hpp>
28 #include <anna/xml/Attribute.hpp>
29
30 #include <anna/app/functions.hpp>
31
32 #include <anna/test/Request.hpp>
33 #include <anna/test/Response.hpp>
34 #include <anna/test/Communicator.hpp>
35
36 using namespace std;
37 using namespace test;
38
39 class MyCommunicator : public test::Communicator {
40 public:
41    MyCommunicator (Communicator::WorkMode::_v workMode) :  test::Communicator () {;}
42 };
43
44 class MyReceiver : public Receiver {
45 public:
46    static const char* className () throw () { return "MyReceiver"; }
47    
48 private:
49    Request a_request;
50    Response a_response;
51    MyCommunicator* a_communicator;
52    
53    MyReceiver () : Receiver ("MyReceiver") { ; }
54    void initialize () throw (RuntimeException);
55    void apply (comm::ClientSocket &, const Message&) throw (RuntimeException);
56    
57    friend class Allocator <MyReceiver>;      
58 };
59
60 class ArithmeticServer : public comm::Application {
61 public:
62    ArithmeticServer ();
63    ~ArithmeticServer () { delete a_communicator; }
64       
65 private:
66    MyCommunicator* a_communicator;
67    ReceiverFactoryImpl <MyReceiver> a_receiverFactory;
68    comm::ServerSocket* a_serverSocket;
69
70    void initialize () throw (RuntimeException);
71    void run () throw (RuntimeException);
72    xml::Node* asXML (xml::Node* app) const throw ();
73    void signalTerminate () throw (RuntimeException);
74 };
75
76 using namespace std;
77 using namespace anna::comm;
78
79 int main (int argc, const char** argv)
80 {
81    CommandLine& commandLine (CommandLine::instantiate ());
82    ArithmeticServer app;
83
84    srand (time (NULL));
85    
86    try {
87       commandLine.initialize (argv, argc);
88       commandLine.verify ();
89
90       Logger::setLevel (Logger::Debug); 
91       string traceFile ("server.");
92       traceFile += anna::functions::asString ((int) getpid ());
93       traceFile += ".trace";      
94       Logger::initialize ("arithmeticServer", new TraceWriter (traceFile.c_str (),4096000));
95  
96       app.start ();
97    }
98    catch (Exception& ex) {
99       cout << ex.asString () << endl;
100    }
101    
102    return 0;
103 }
104
105 ArithmeticServer::ArithmeticServer () : 
106    Application ("arithmeticServer", "Servidor de operaciones (iRS)", "1.0"),
107    a_communicator (NULL)
108 {
109    CommandLine& commandLine (CommandLine::instantiate ());
110       
111    commandLine.add ("p", CommandLine::Argument::Mandatory, "Puerto en el que atender peticiones");
112    commandLine.add ("a", CommandLine::Argument::Mandatory, "Direccin IP en la que atender");
113    commandLine.add ("d", CommandLine::Argument::Mandatory, "Retardo aplicado a la contestacio");
114    commandLine.add ("maxpending", CommandLine::Argument::Optional, "Numero maximo de bytes en la cola de entrada");
115    commandLine.add ("limit", CommandLine::Argument::Mandatory, "% de ocupacion que permitimos");
116    commandLine.add ("r", CommandLine::Argument::Optional, "Indicador de reuso de direccion", false);
117    commandLine.add ("n", CommandLine::Argument::Optional, "Numero de mensajes a servir", true);
118    commandLine.add ("trace", CommandLine::Argument::Optional, "Nivel de trazas (debug,warning, error,...)");
119    commandLine.add ("clone", CommandLine::Argument::Optional, "Aplica el metodo de clonado en el tratamiento de mensajes", false);
120    commandLine.add ("chunksize", CommandLine::Argument::Optional, "Tamano del chunk de lectura");
121 }
122
123 //-----------------------------------------------------------------------------------------
124 // Inicializa el servidor de sockets.
125 //-----------------------------------------------------------------------------------------
126 void ArithmeticServer::initialize () 
127    throw (RuntimeException)
128 {
129    LOGMETHOD (TraceMethod tm ("ArithmeticServer", "initialize", ANNA_FILE_LOCATION));
130
131    CommandLine& cl (CommandLine::instantiate ());
132
133    int port = cl.getIntegerValue ("p");
134    const comm::Device* device = Network::instantiate ().find (Device::asAddress (cl.getValue ("a")));
135
136    if (cl.exists ("trace"))
137       Logger::setLevel (Logger::asLevel (cl.getValue ("trace")));
138
139    if (cl.exists ("chunksize"))
140       comm::Communicator::setReceivingChunkSize (cl.getIntegerValue ("chunksize"));
141
142    a_serverSocket = new ServerSocket (INetAddress (device, port), cl.exists ("r"));
143    a_serverSocket->setCategory (777);
144    a_serverSocket->setReceiverFactory (a_receiverFactory);
145
146    comm::Communicator::WorkMode::_v workMode = (cl.exists ("clone")) ? comm::Communicator::WorkMode::Clone: comm::Communicator::WorkMode::Single;
147
148    a_communicator = new MyCommunicator (workMode);
149 }
150
151 //-----------------------------------------------------------------------------------------
152 // Atiende las peticiones.
153 // Cuando hay un nuevo mensaje invocar�a Communicator::eventReceiveMessage
154 //-----------------------------------------------------------------------------------------
155 void ArithmeticServer::run ()
156    throw (RuntimeException)
157 {
158    LOGMETHOD (TraceMethod tm ("ArithmeticServer", "run", ANNA_FILE_LOCATION));
159
160    CommandLine& cl (CommandLine::instantiate ());
161
162    a_communicator->attach (a_serverSocket);
163    a_communicator->setDelay ((Millisecond)cl.getIntegerValue ("d"));
164
165    if (cl.exists ("n") == true)
166       a_communicator->setMaxMessage (cl.getIntegerValue ("n"));
167
168    CongestionController& ccgg = CongestionController::instantiate ();
169
170    ccgg.setLimit (cl.getIntegerValue ("limit"));
171
172    if (cl.exists ("maxpending"))
173       ccgg.setMaxPendingBytes (cl.getIntegerValue ("maxpending"));
174
175    a_communicator->accept ();
176 }
177
178 xml::Node* ArithmeticServer::asXML (xml::Node* app) const 
179    throw ()
180 {
181    xml::Node* node = app::Application::asXML (app);
182    
183    node->createAttribute ("MaxMessage", a_communicator->getMaxMessage ());
184    node->createAttribute ("Message", a_communicator->getMessage ());
185    
186    return node;
187 }
188
189 void ArithmeticServer::signalTerminate () 
190    throw (RuntimeException)
191 {
192    a_communicator->terminate ();
193    comm::Application::signalTerminate ();
194 }
195
196 void MyReceiver::initialize ()
197    throw (RuntimeException)
198 {
199    a_communicator = app::functions::component <MyCommunicator> (ANNA_FILE_LOCATION);
200 }
201
202 void MyReceiver::apply (ClientSocket& clientSocket, const Message& message) 
203    throw (RuntimeException)
204 {
205    LOGMETHOD (TraceMethod tm ("MyReceiver", "apply", ANNA_FILE_LOCATION));
206
207    if (a_communicator->canContinue (clientSocket) == false)
208       return;      
209
210    a_request.decode (message.getBody ());
211
212    a_communicator->delay ();
213
214    a_response.x = a_request.x;
215    a_response.y = a_request.y;
216    a_response.initTime = a_request.initTime;
217
218    switch (a_response.op = a_request.op) {
219       case '+':
220          a_response.result = a_request.x + a_request.y;
221          break;
222       case '-':
223          a_response.result = a_request.x - a_request.y;
224          break;
225       case '*':
226          a_response.result = a_request.x * a_request.y;
227          break;
228       case '/':
229          a_response.result = (a_request.y != 0) ? (a_request.x / a_request.y): 0;
230          break;
231    }
232
233    LOGINFORMATION (
234       string msg = anna::functions::asString ("%d %c %d = %d", a_request.x, a_request.op, a_request.y, a_response.result);
235       msg += anna::functions::asText (" | InitTime: ",  a_response.initTime);
236       Logger::information (msg, ANNA_FILE_LOCATION);
237    )
238
239    try {
240       clientSocket.send (a_response);
241    }
242    catch (Exception& ex) {
243       ex.trace ();
244    }
245 }