First commit
[anna.git] / example / core / thread / main.cpp
1 // ANNA - Anna is Not 'N' Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // https://bitbucket.org/testillano/anna
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 #include <iostream>
38
39 #include <anna/core/mt/ThreadManager.hpp>
40 #include <anna/core/util/CommandLine.hpp>
41 #include <anna/core/mt/Mutex.hpp>
42 #include <anna/core/tracing/Logger.hpp>
43 #include <anna/core/mt/Runnable.hpp>
44 #include <anna/core/mt/Thread.hpp>
45 #include <anna/core/tracing/TraceWriter.hpp>
46 #include <anna/core/util/SortedVector.hpp>
47
48 #include <anna/io/functions.hpp>
49
50 using namespace std;
51 using namespace anna;
52
53 class Client : public Runnable {
54 public:
55    Client (const int id, const int value) : Runnable (), a_id (id), a_value (value) {;}
56
57 private:
58    const int a_id;
59    const int a_value;
60
61    void run () throw (RuntimeException);
62
63    anna_complete_runnable (Client);
64 };
65
66 class SOAP {
67 public:
68    static int theMethod (const int id, const int value) throw (RuntimeException);
69 };
70
71 class Receiver;
72
73 class Waiter {
74 public:
75    int getId () const throw () { return a_id; }
76    int getValue () const throw () { return a_value; }
77
78    void setId (const int id) throw () { a_id = id; }
79    void setValue (const int value) throw () { a_value = value; }
80
81    void sendRequest () throw (RuntimeException);
82    int waitResponse () throw (RuntimeException);
83    void notifyResponse (const int result) throw (RuntimeException);   
84
85    void initialize () throw (RuntimeException);
86    
87 private:
88    static const int ReadChannel = 0;
89    static const int WriteChannel = 1;
90    
91    static ThreadManager a_threadManager;
92
93    int a_pipe [2];
94    int a_id;
95    int a_value;
96    int a_result;
97    Receiver* a_receiver;
98 };
99
100 class Context : public SafeRecycler <Waiter> {
101 public:  
102    Waiter* createWaiter (const int id, const int value) throw (RuntimeException);
103    void destroyWaiter (Waiter* waiter) throw (RuntimeException);
104    Waiter* findWaiter (const int id) throw () { return a_waiters.find (id); }
105
106 private:
107    struct SortByID {
108       static int value (const Waiter* waiter) throw () { return waiter->getId (); }
109    };
110
111    typedef SortedVector <Waiter, SortByID, int> waiter_container;
112
113    waiter_container a_waiters;
114 };
115
116 class Receiver : public Runnable {
117 public:
118    Receiver (const int id, const int value) : a_id (id), a_value (value) {;}
119
120 private:
121    int a_id;
122    int a_value;
123
124    void run () throw (RuntimeException);
125
126    anna_complete_runnable (Waiter);
127 };
128
129 Context* g_context = NULL;
130 ThreadManager Waiter::a_threadManager ("WaiterThreadManager");
131
132 int main (const int argc, const char** argv)
133 {
134    CommandLine& ccll = CommandLine::instantiate ();
135    ThreadManager* threadManager;
136    
137    g_context = new Context;
138
139    try {
140       ccll.add ("maxthr", CommandLine::Argument::Mandatory, "Maximum number of threads");
141       ccll.add ("mode", CommandLine::Argument::Mandatory, "Manager instance mode");
142       ccll.add ("nthr", CommandLine::Argument::Mandatory, "Number of threads to create");
143       ccll.add ("trace", CommandLine::Argument::Optional, "Trace level");
144
145       ccll.initialize (argv, argc);
146       ccll.verify ();
147
148       Logger::initialize ("thread", new TraceWriter ("file.trace", 2048000));
149
150       if (ccll.exists ("trace"))
151          Logger::setLevel (Logger::asLevel (ccll.getValue ("trace")));
152          
153       ThreadManager::Mode::_v mode = ThreadManager::Mode::asEnum (ccll.getValue ("mode"));
154
155       if (mode == ThreadManager::Mode::None) {
156          string msg (ccll.getValue ("mode"));
157          msg += " mode not valid (";
158          msg +=  ThreadManager::Mode::asList ();
159          msg += ")";
160          throw RuntimeException (msg, ANNA_FILE_LOCATION);
161       }
162
163       const int maxThread = ccll.getIntegerValue ("maxthr");
164       int nthr = max (2, ccll.getIntegerValue ("nthr"));
165      
166       threadManager = new ThreadManager ("MyManager", mode, maxThread, Thread::Flag::Joinable);
167       Thread* thread = NULL;
168
169       // Primero genera los clientes que generan la primera petición de cáloulo. Todos
170       // terminan llamando al método SOAP::theMethod, que será el que monte la  espera
171       // y "visualize la respuesta"
172       for (int ii = 0, min = 0, max = 10; ii < nthr; ii ++) {
173           
174          Client* client = new Client (ii, min + rand () % max);
175          // Crear los threads Tx 
176          thread = threadManager->createThread ();
177          thread->start (*client);
178
179          min = max;
180          max += 10;
181       }
182       
183       threadManager->join ();
184    }
185    catch (RuntimeException& ex) {
186       cout << ex.asString () << endl << endl;
187       ex.trace ();
188    }
189 }
190
191 // Esto simula el proceso cliente que envía la petición al proceso gSOAP
192 /* Se ejecuta desde el thread Tx */
193 void Client::run () 
194    throw (RuntimeException)
195 {
196    int result = SOAP::theMethod (a_id, a_value);
197
198    LOGDEBUG (
199       string msg (asString ());
200       msg += functions::asText (" | Result: ", result);
201       Logger::debug (msg, ANNA_FILE_LOCATION);
202    );
203 }
204
205 // La implementación del método SOAP debe devolver un valor, pero no puede 
206 // calcular por sí misma la respuesta, sino que tiene que apoyarse en un tercero
207 // que será el que le dé la respuesta "real" en algún momento.
208 /* Tx */
209 int SOAP::theMethod (const int id, const int value)
210    throw (RuntimeException)
211 {
212    int result = -1;
213
214    Waiter* waiter = NULL;
215
216    try {
217       waiter = g_context->createWaiter (id, value);
218       waiter->initialize ();
219       waiter->sendRequest (); 
220       result = waiter->waitResponse ();
221       
222       cout << "Id: " << id << " | Value: " << value << " | Result: " << result << endl << endl;      
223    }
224    catch (RuntimeException& ex) {
225       cout << "Id: " << id << " | Value: " << value << " | " << ex.asString () << endl << endl;
226       ex.trace ();
227    }  
228
229    g_context->destroyWaiter (waiter);  // Sólo lo libera cuando ya no necesita usar sus datos.
230    
231    return result;
232 }
233
234 /* Tx */
235 void Waiter::initialize () 
236    throw (RuntimeException)
237 {
238    if (pipe (a_pipe) == -1) {
239       string msg (functions::asText ("Waiter::initialize | Id: ", a_id));
240       throw RuntimeException (msg, errno, ANNA_FILE_LOCATION); 
241    }     
242 }
243
244 /* Tx */
245 void Waiter::sendRequest () 
246    throw (RuntimeException)
247 {
248    try {
249       /* Ojo que esto generará memory-leaks, pero no es el objeto de éste ejemplo la buena gestión de la memoria */
250       Receiver* receiver = new Receiver (a_id, a_value);   
251       a_threadManager.createThread ()->start (*receiver);         
252    }
253    catch (RuntimeException& ex) {
254       string msg ("Waiter::sendRequest | ");
255       msg += ex.asString ();
256       throw RuntimeException (msg, ANNA_FILE_LOCATION);
257    }
258 }
259
260 /* Tx */
261 int Waiter::waitResponse () 
262    throw (RuntimeException)
263 {
264    bool receive = io::functions::waitInput (a_pipe [ReadChannel], (Millisecond)500);
265    
266    if (receive == false) {      
267       string msg (anna::functions::asText ("Id: ", a_id));
268       msg += anna::functions::asText (" | Value: ", a_value);
269       msg += " | Timeout";            
270       throw RuntimeException (msg, ANNA_FILE_LOCATION);     
271    }
272    
273    // Esta variable se ha establecido en el notifyResponse (Ty)
274    return a_result;
275 }
276
277 /* Ty */
278 void Waiter::notifyResponse (const int result) 
279    throw (RuntimeException)
280 {
281    int byte = 0;
282    
283    a_result = result;
284    
285    write (a_pipe [WriteChannel], &byte, sizeof (byte));
286 }
287
288 // Modela al thread que recibe la respuesta correspondiente a la petición por la 
289 // que debería haber un Waiter esperando.
290 /* Ty */
291 void Receiver::run () 
292    throw (RuntimeException)
293 {
294    anna::functions::sleep ((Millisecond)(rand () % 1000));
295
296    int result = a_value + a_value;
297
298    // Protege el contexto para evitar condiciones de carrera, como que en el "mismo" instante
299    // en que el receptor decide responder, el waiter en Tx decida que ha terminado de esperar
300    // la respuesta.
301    Guard guard (g_context, "Receiver::run");
302    
303    Waiter* waiter = g_context->findWaiter (a_id);
304
305    // Si el Waiter se cansó de esperar la respuesta ...
306    if (waiter == NULL)
307       return;
308
309    waiter->notifyResponse (result);
310 }
311
312 Waiter* Context::createWaiter (const int id, const int value) 
313    throw (RuntimeException)
314 {
315    Guard guard (*this, "Context::createWaiter");
316    
317    Waiter* result = a_waiters.find (id);
318    
319    if (result != NULL) 
320       throw RuntimeException ("Waiter already used", ANNA_FILE_LOCATION);
321    
322    result = Recycler <Waiter>::create ();
323    
324    result->setId (id);
325    result->setValue (value);
326    
327    a_waiters.add (result);
328    
329    return result;   
330 }
331
332 void Context::destroyWaiter (Waiter* waiter) 
333    throw (RuntimeException)
334 {
335    Guard guard (this, "Context::destroyWaiter");
336
337    // Lo elimina de la lista ordenada
338    a_waiters.erase (waiter);   
339    
340    // Lo marca como disponible para usar
341    Recycler <Waiter>::release (waiter);   
342 }
343