b140daa9e42315395f08965051310ecfbcc1acbc
[anna.git] / example / comm / rServer / main.cpp
1 // ANNA - Anna is Not Nothingness Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // http://redmine.teslayout.com/projects/anna-suite
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     *  Neither the name of the copyright holder nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 /*
38   Ejemplo de programa servidor. Atiende peticiones aritmeticas, el protocolo de transporte
39   sera el comm::Transport y el contenido del mensaje sera el resutaldo de un comm::Codec con los
40   (x, y, op) -> El resultado sera estos tres componente mas result.
41   
42   Ejemplo de uso del sistema de receiveres, que son capaces de tratar N peticiones de forma
43   totalmetne simultanea en caso de estar en un entorno MT.
44
45   Para poder probar el sistema de congestion se puede indicar un numero de milisegundos de 
46   retardo aplicados a cada contestacion.
47
48   Los clientes pueden ser: client.p o kclient.p
49 */
50 #include <iostream>
51
52 #include <anna/core/core.hpp>
53 #include <anna/comm/comm.hpp>
54
55 #include <anna/xml/Node.hpp>
56 #include <anna/xml/Attribute.hpp>
57
58 #include <anna/app/functions.hpp>
59
60 #include <anna/test/Request.hpp>
61 #include <anna/test/Response.hpp>
62 #include <anna/test/Communicator.hpp>
63
64 using namespace std;
65 using namespace test;
66
67 class MyCommunicator : public test::Communicator {
68 public:
69    MyCommunicator (Communicator::WorkMode::_v workMode) :  test::Communicator () {;}
70 };
71
72 class MyReceiver : public Receiver {
73 public:
74    static const char* className () throw () { return "MyReceiver"; }
75    
76 private:
77    Request a_request;
78    Response a_response;
79    MyCommunicator* a_communicator;
80    
81    MyReceiver () : Receiver ("MyReceiver") { ; }
82    void initialize () throw (RuntimeException);
83    void apply (comm::ClientSocket &, const Message&) throw (RuntimeException);
84    
85    friend class Allocator <MyReceiver>;      
86 };
87
88 class ArithmeticServer : public comm::Application {
89 public:
90    ArithmeticServer ();
91    ~ArithmeticServer () { delete a_communicator; }
92       
93 private:
94    MyCommunicator* a_communicator;
95    ReceiverFactoryImpl <MyReceiver> a_receiverFactory;
96    comm::ServerSocket* a_serverSocket;
97
98    void initialize () throw (RuntimeException);
99    void run () throw (RuntimeException);
100    xml::Node* asXML (xml::Node* app) const throw ();
101    void signalTerminate () throw (RuntimeException);
102 };
103
104 using namespace std;
105 using namespace anna::comm;
106
107 int main (int argc, const char** argv)
108 {
109    CommandLine& commandLine (CommandLine::instantiate ());
110    ArithmeticServer app;
111
112    srand (time (NULL));
113    
114    try {
115       commandLine.initialize (argv, argc);
116       commandLine.verify ();
117
118       Logger::setLevel (Logger::Debug); 
119       string traceFile ("server.");
120       traceFile += anna::functions::asString ((int) getpid ());
121       traceFile += ".trace";      
122       Logger::initialize ("arithmeticServer", new TraceWriter (traceFile.c_str (),4096000));
123  
124       app.start ();
125    }
126    catch (Exception& ex) {
127       cout << ex.asString () << endl;
128    }
129    
130    return 0;
131 }
132
133 ArithmeticServer::ArithmeticServer () : 
134    Application ("arithmeticServer", "Servidor de operaciones (iRS)", "1.0"),
135    a_communicator (NULL)
136 {
137    CommandLine& commandLine (CommandLine::instantiate ());
138       
139    commandLine.add ("p", CommandLine::Argument::Mandatory, "Puerto en el que atender peticiones");
140    commandLine.add ("a", CommandLine::Argument::Mandatory, "Direccin IP en la que atender");
141    commandLine.add ("d", CommandLine::Argument::Mandatory, "Retardo aplicado a la contestacio");
142    commandLine.add ("maxpending", CommandLine::Argument::Optional, "Nº máximo de bytes en la cola de entrada");
143    commandLine.add ("limit", CommandLine::Argument::Mandatory, "% de ocupacion que permitimos");
144    commandLine.add ("r", CommandLine::Argument::Optional, "Indicador de reuso de direccin", false);
145    commandLine.add ("n", CommandLine::Argument::Optional, "Numero de mensajes a servir", true);
146    commandLine.add ("trace", CommandLine::Argument::Optional, "Nivel de trazas (debug,warning, error,...)");
147    commandLine.add ("clone", CommandLine::Argument::Optional, "Aplica el metodo de clonado en el tratamiento de mensajes", false);
148    commandLine.add ("chunksize", CommandLine::Argument::Optional, "Tamaño del chunk de lectura");
149 }
150
151 //-----------------------------------------------------------------------------------------
152 // Inicializa el servidor de sockets.
153 //-----------------------------------------------------------------------------------------
154 void ArithmeticServer::initialize () 
155    throw (RuntimeException)
156 {
157    LOGMETHOD (TraceMethod tm ("ArithmeticServer", "initialize", ANNA_FILE_LOCATION));
158
159    CommandLine& cl (CommandLine::instantiate ());
160
161    int port = cl.getIntegerValue ("p");
162    const comm::Device* device = Network::instantiate ().find (Device::asAddress (cl.getValue ("a")));
163
164    if (cl.exists ("trace"))
165       Logger::setLevel (Logger::asLevel (cl.getValue ("trace")));
166
167    if (cl.exists ("chunksize"))
168       comm::Communicator::setReceivingChunkSize (cl.getIntegerValue ("chunksize"));
169
170    a_serverSocket = new ServerSocket (INetAddress (device, port), cl.exists ("r"));
171    a_serverSocket->setCategory (777);
172    a_serverSocket->setReceiverFactory (a_receiverFactory);
173
174    comm::Communicator::WorkMode::_v workMode = (cl.exists ("clone")) ? comm::Communicator::WorkMode::Clone: comm::Communicator::WorkMode::Single;
175
176    a_communicator = new MyCommunicator (workMode);
177 }
178
179 //-----------------------------------------------------------------------------------------
180 // Atiende las peticiones.
181 // Cuando hay un nuevo mensaje invocar�a Communicator::eventReceiveMessage
182 //-----------------------------------------------------------------------------------------
183 void ArithmeticServer::run ()
184    throw (RuntimeException)
185 {
186    LOGMETHOD (TraceMethod tm ("ArithmeticServer", "run", ANNA_FILE_LOCATION));
187
188    CommandLine& cl (CommandLine::instantiate ());
189
190    a_communicator->attach (a_serverSocket);
191    a_communicator->setDelay ((Millisecond)cl.getIntegerValue ("d"));
192
193    if (cl.exists ("n") == true)
194       a_communicator->setMaxMessage (cl.getIntegerValue ("n"));
195
196    CongestionController& ccgg = CongestionController::instantiate ();
197
198    ccgg.setLimit (cl.getIntegerValue ("limit"));
199
200    if (cl.exists ("maxpending"))
201       ccgg.setMaxPendingBytes (cl.getIntegerValue ("maxpending"));
202
203    a_communicator->accept ();
204 }
205
206 xml::Node* ArithmeticServer::asXML (xml::Node* app) const 
207    throw ()
208 {
209    xml::Node* node = app::Application::asXML (app);
210    
211    node->createAttribute ("MaxMessage", a_communicator->getMaxMessage ());
212    node->createAttribute ("Message", a_communicator->getMessage ());
213    
214    return node;
215 }
216
217 void ArithmeticServer::signalTerminate () 
218    throw (RuntimeException)
219 {
220    a_communicator->terminate ();
221    comm::Application::signalTerminate ();
222 }
223
224 void MyReceiver::initialize ()
225    throw (RuntimeException)
226 {
227    a_communicator = app::functions::component <MyCommunicator> (ANNA_FILE_LOCATION);
228 }
229
230 void MyReceiver::apply (ClientSocket& clientSocket, const Message& message) 
231    throw (RuntimeException)
232 {
233    LOGMETHOD (TraceMethod tm ("MyReceiver", "apply", ANNA_FILE_LOCATION));
234
235    if (a_communicator->canContinue (clientSocket) == false)
236       return;      
237
238    a_request.decode (message.getBody ());
239
240    a_communicator->delay ();
241
242    a_response.x = a_request.x;
243    a_response.y = a_request.y;
244    a_response.initTime = a_request.initTime;
245
246    switch (a_response.op = a_request.op) {
247       case '+':
248          a_response.result = a_request.x + a_request.y;
249          break;
250       case '-':
251          a_response.result = a_request.x - a_request.y;
252          break;
253       case '*':
254          a_response.result = a_request.x * a_request.y;
255          break;
256       case '/':
257          a_response.result = (a_request.y != 0) ? (a_request.x / a_request.y): 0;
258          break;
259    }
260
261    LOGINFORMATION (
262       string msg = anna::functions::asString ("%d %c %d = %d", a_request.x, a_request.op, a_request.y, a_response.result);
263       msg += anna::functions::asText (" | InitTime: ",  a_response.initTime);
264       Logger::information (msg, ANNA_FILE_LOCATION);
265    )
266
267    try {
268       clientSocket.send (a_response);
269    }
270    catch (Exception& ex) {
271       ex.trace ();
272    }
273 }