Remove warnings
[anna.git] / example / core / thread / main.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #include <iostream>
10
11 #include <anna/core/mt/ThreadManager.hpp>
12 #include <anna/core/util/CommandLine.hpp>
13 #include <anna/core/mt/Mutex.hpp>
14 #include <anna/core/tracing/Logger.hpp>
15 #include <anna/core/mt/Runnable.hpp>
16 #include <anna/core/mt/Thread.hpp>
17 #include <anna/core/tracing/TraceWriter.hpp>
18 #include <anna/core/util/SortedVector.hpp>
19
20 #include <anna/io/functions.hpp>
21
22 using namespace std;
23 using namespace anna;
24
25 class Client : public Runnable {
26 public:
27    Client (const int id, const int value) : Runnable (), a_id (id), a_value (value) {;}
28
29 private:
30    const int a_id;
31    const int a_value;
32
33    void run () throw (RuntimeException);
34
35    anna_complete_runnable (Client);
36 };
37
38 class SOAP {
39 public:
40    static int theMethod (const int id, const int value) throw (RuntimeException);
41 };
42
43 class Receiver;
44
45 class Waiter {
46 public:
47    int getId () const throw () { return a_id; }
48    int getValue () const throw () { return a_value; }
49
50    void setId (const int id) throw () { a_id = id; }
51    void setValue (const int value) throw () { a_value = value; }
52
53    void sendRequest () throw (RuntimeException);
54    int waitResponse () throw (RuntimeException);
55    void notifyResponse (const int result) throw (RuntimeException);   
56
57    void initialize () throw (RuntimeException);
58    
59 private:
60    static const int ReadChannel = 0;
61    static const int WriteChannel = 1;
62    
63    static ThreadManager a_threadManager;
64
65    int a_pipe [2];
66    int a_id;
67    int a_value;
68    int a_result;
69    Receiver* a_receiver;
70 };
71
72 class Context : public SafeRecycler <Waiter> {
73 public:  
74    Waiter* createWaiter (const int id, const int value) throw (RuntimeException);
75    void destroyWaiter (Waiter* waiter) throw (RuntimeException);
76    Waiter* findWaiter (const int id) throw () { return a_waiters.find (id); }
77
78 private:
79    struct SortByID {
80       static int value (const Waiter* waiter) throw () { return waiter->getId (); }
81    };
82
83    typedef SortedVector <Waiter, SortByID, int> waiter_container;
84
85    waiter_container a_waiters;
86 };
87
88 class Receiver : public Runnable {
89 public:
90    Receiver (const int id, const int value) : a_id (id), a_value (value) {;}
91
92 private:
93    int a_id;
94    int a_value;
95
96    void run () throw (RuntimeException);
97
98    anna_complete_runnable (Waiter);
99 };
100
101 Context* g_context = NULL;
102 ThreadManager Waiter::a_threadManager ("WaiterThreadManager");
103
104 int main (const int argc, const char** argv)
105 {
106    CommandLine& ccll = CommandLine::instantiate ();
107    ThreadManager* threadManager;
108    
109    g_context = new Context;
110
111    try {
112       ccll.add ("maxthr", CommandLine::Argument::Mandatory, "Maximum number of threads");
113       ccll.add ("mode", CommandLine::Argument::Mandatory, "Manager instance mode");
114       ccll.add ("nthr", CommandLine::Argument::Mandatory, "Number of threads to create");
115       ccll.add ("trace", CommandLine::Argument::Optional, "Trace level");
116
117       ccll.initialize (argv, argc);
118       ccll.verify ();
119
120       Logger::initialize ("thread", new TraceWriter ("file.trace", 2048000));
121
122       if (ccll.exists ("trace"))
123          Logger::setLevel (Logger::asLevel (ccll.getValue ("trace")));
124          
125       ThreadManager::Mode::_v mode = ThreadManager::Mode::asEnum (ccll.getValue ("mode"));
126
127       if (mode == ThreadManager::Mode::None) {
128          string msg (ccll.getValue ("mode"));
129          msg += " mode not valid (";
130          msg +=  ThreadManager::Mode::asList ();
131          msg += ")";
132          throw RuntimeException (msg, ANNA_FILE_LOCATION);
133       }
134
135       const int maxThread = ccll.getIntegerValue ("maxthr");
136       int nthr = max (2, ccll.getIntegerValue ("nthr"));
137      
138       threadManager = new ThreadManager ("MyManager", mode, maxThread, Thread::Flag::Joinable);
139       Thread* thread = NULL;
140
141       // Primero genera los clientes que generan la primera petición de cáloulo. Todos
142       // terminan llamando al método SOAP::theMethod, que será el que monte la  espera
143       // y "visualize la respuesta"
144       for (int ii = 0, min = 0, max = 10; ii < nthr; ii ++) {
145           
146          Client* client = new Client (ii, min + rand () % max);
147          // Crear los threads Tx 
148          thread = threadManager->createThread ();
149          thread->start (*client);
150
151          min = max;
152          max += 10;
153       }
154       
155       threadManager->join ();
156    }
157    catch (RuntimeException& ex) {
158       cout << ex.asString () << endl << endl;
159       ex.trace ();
160    }
161 }
162
163 // Esto simula el proceso cliente que envía la petición al proceso gSOAP
164 /* Se ejecuta desde el thread Tx */
165 void Client::run () 
166    throw (RuntimeException)
167 {
168    int result = SOAP::theMethod (a_id, a_value);
169
170    LOGDEBUG (
171       string msg (asString ());
172       msg += functions::asText (" | Result: ", result);
173       Logger::debug (msg, ANNA_FILE_LOCATION);
174    );
175 }
176
177 // La implementación del método SOAP debe devolver un valor, pero no puede 
178 // calcular por sí misma la respuesta, sino que tiene que apoyarse en un tercero
179 // que será el que le dé la respuesta "real" en algún momento.
180 /* Tx */
181 int SOAP::theMethod (const int id, const int value)
182    throw (RuntimeException)
183 {
184    int result = -1;
185
186    Waiter* waiter = NULL;
187
188    try {
189       waiter = g_context->createWaiter (id, value);
190       waiter->initialize ();
191       waiter->sendRequest (); 
192       result = waiter->waitResponse ();
193       
194       cout << "Id: " << id << " | Value: " << value << " | Result: " << result << endl << endl;      
195    }
196    catch (RuntimeException& ex) {
197       cout << "Id: " << id << " | Value: " << value << " | " << ex.asString () << endl << endl;
198       ex.trace ();
199    }  
200
201    g_context->destroyWaiter (waiter);  // Sólo lo libera cuando ya no necesita usar sus datos.
202    
203    return result;
204 }
205
206 /* Tx */
207 void Waiter::initialize () 
208    throw (RuntimeException)
209 {
210    if (pipe (a_pipe) == -1) {
211       string msg (functions::asText ("Waiter::initialize | Id: ", a_id));
212       throw RuntimeException (msg, errno, ANNA_FILE_LOCATION); 
213    }     
214 }
215
216 /* Tx */
217 void Waiter::sendRequest () 
218    throw (RuntimeException)
219 {
220    try {
221       /* Ojo que esto generará memory-leaks, pero no es el objeto de éste ejemplo la buena gestión de la memoria */
222       Receiver* receiver = new Receiver (a_id, a_value);   
223       a_threadManager.createThread ()->start (*receiver);         
224    }
225    catch (RuntimeException& ex) {
226       string msg ("Waiter::sendRequest | ");
227       msg += ex.asString ();
228       throw RuntimeException (msg, ANNA_FILE_LOCATION);
229    }
230 }
231
232 /* Tx */
233 int Waiter::waitResponse () 
234    throw (RuntimeException)
235 {
236    bool receive = io::functions::waitInput (a_pipe [ReadChannel], (Millisecond)500);
237    
238    if (receive == false) {      
239       string msg (anna::functions::asText ("Id: ", a_id));
240       msg += anna::functions::asText (" | Value: ", a_value);
241       msg += " | Timeout";            
242       throw RuntimeException (msg, ANNA_FILE_LOCATION);     
243    }
244    
245    // Esta variable se ha establecido en el notifyResponse (Ty)
246    return a_result;
247 }
248
249 /* Ty */
250 void Waiter::notifyResponse (const int result) 
251    throw (RuntimeException)
252 {
253    int byte = 0;
254    
255    a_result = result;
256    
257    ssize_t r = write (a_pipe [WriteChannel], &byte, sizeof (byte));
258 }
259
260 // Modela al thread que recibe la respuesta correspondiente a la petición por la 
261 // que debería haber un Waiter esperando.
262 /* Ty */
263 void Receiver::run () 
264    throw (RuntimeException)
265 {
266    anna::functions::sleep ((Millisecond)(rand () % 1000));
267
268    int result = a_value + a_value;
269
270    // Protege el contexto para evitar condiciones de carrera, como que en el "mismo" instante
271    // en que el receptor decide responder, el waiter en Tx decida que ha terminado de esperar
272    // la respuesta.
273    Guard guard (g_context, "Receiver::run");
274    
275    Waiter* waiter = g_context->findWaiter (a_id);
276
277    // Si el Waiter se cansó de esperar la respuesta ...
278    if (waiter == NULL)
279       return;
280
281    waiter->notifyResponse (result);
282 }
283
284 Waiter* Context::createWaiter (const int id, const int value) 
285    throw (RuntimeException)
286 {
287    Guard guard (*this, "Context::createWaiter");
288    
289    Waiter* result = a_waiters.find (id);
290    
291    if (result != NULL) 
292       throw RuntimeException ("Waiter already used", ANNA_FILE_LOCATION);
293    
294    result = Recycler <Waiter>::create ();
295    
296    result->setId (id);
297    result->setValue (value);
298    
299    a_waiters.add (result);
300    
301    return result;   
302 }
303
304 void Context::destroyWaiter (Waiter* waiter) 
305    throw (RuntimeException)
306 {
307    Guard guard (this, "Context::destroyWaiter");
308
309    // Lo elimina de la lista ordenada
310    a_waiters.erase (waiter);   
311    
312    // Lo marca como disponible para usar
313    Recycler <Waiter>::release (waiter);   
314 }
315