1 // ANNA - Anna is Not Nothingness Anymore //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
11 #include <anna/core/mt/ThreadManager.hpp>
12 #include <anna/core/util/CommandLine.hpp>
13 #include <anna/core/mt/Mutex.hpp>
14 #include <anna/core/tracing/Logger.hpp>
15 #include <anna/core/mt/Runnable.hpp>
16 #include <anna/core/mt/Thread.hpp>
17 #include <anna/core/tracing/TraceWriter.hpp>
18 #include <anna/core/util/SortedVector.hpp>
20 #include <anna/io/functions.hpp>
// Runnable task that carries a request id and a value.
// NOTE(review): the member declarations (a_id, a_value) and the closing brace
// are not visible in this view.
25 class Client : public Runnable {
// Stores the given id and value in a_id and a_value.
27 Client (const int id, const int value) : Runnable (), a_id (id), a_value (value) {;}
// Task entry point; declared as possibly throwing RuntimeException.
33 void run () throw (RuntimeException);
// Project macro invoked with this class' own name (macro definition not visible here).
35 anna_complete_runnable (Client);
// Static entry point, defined later in this file as SOAP::theMethod.
// Takes a request id and a value, returns an int result, and is declared as
// possibly throwing RuntimeException.
40 static int theMethod (const int id, const int value) throw (RuntimeException);
// Members of Waiter (class header and data members are not visible in this view).
// Read accessors for the request id and value.
47 int getId () const throw () { return a_id; }
48 int getValue () const throw () { return a_value; }
// Write accessors for the request id and value.
50 void setId (const int id) throw () { a_id = id; }
51 void setValue (const int value) throw () { a_value = value; }
// Starts a Receiver for this request on a thread from a_threadManager.
53 void sendRequest () throw (RuntimeException);
// Waits for input on the read end of the pipe; throws RuntimeException on timeout.
54 int waitResponse () throw (RuntimeException);
// Writes to the write end of the pipe.
55 void notifyResponse (const int result) throw (RuntimeException);
// Creates the pipe stored in a_pipe; throws RuntimeException if pipe() fails.
57 void initialize () throw (RuntimeException);
// Indexes into a_pipe: [ReadChannel] is read from, [WriteChannel] is written to.
60 static const int ReadChannel = 0;
61 static const int WriteChannel = 1;
// Thread manager shared by all Waiter instances; used by sendRequest.
63 static ThreadManager a_threadManager;
// Registry of Waiter objects, looked up by id through a SortedVector.
// Derives from SafeRecycler <Waiter>; instances are passed to Guard elsewhere in this file.
72 class Context : public SafeRecycler <Waiter> {
// Obtains a Waiter, sets its value and adds it to a_waiters;
// throws RuntimeException ("Waiter already used").
74 Waiter* createWaiter (const int id, const int value) throw (RuntimeException);
// Erases the waiter from a_waiters and releases it through Recycler <Waiter>.
75 void destroyWaiter (Waiter* waiter) throw (RuntimeException);
// Returns whatever a_waiters.find (id) returns.
// NOTE(review): presumably NULL when no waiter has that id - confirm against SortedVector.
76 Waiter* findWaiter (const int id) throw () { return a_waiters.find (id); }
// Sort key extractor: returns the waiter's id.
// NOTE(review): presumably a member of the SortByID helper named in the typedef
// below; its enclosing declaration is not visible in this view.
80 static int value (const Waiter* waiter) throw () { return waiter->getId (); }
// Container type parameterised on Waiter, SortByID and an int key.
83 typedef SortedVector <Waiter, SortByID, int> waiter_container;
// Waiters currently registered in this context.
85 waiter_container a_waiters;
// Runnable task that produces the response for a request identified by id.
// NOTE(review): the member declarations and the closing brace are not visible in this view.
88 class Receiver : public Runnable {
// Stores the given id and value in a_id and a_value.
90 Receiver (const int id, const int value) : a_id (id), a_value (value) {;}
// Task entry point; declared as possibly throwing RuntimeException.
96 void run () throw (RuntimeException);
// NOTE(review): Client passes its own class name to this macro, but here the
// argument is Waiter rather than Receiver - confirm this is intended.
98 anna_complete_runnable (Waiter);
// Process-wide Context; allocated in main and used by SOAP::theMethod and Receiver::run.
101 Context* g_context = NULL;
// Definition of the thread manager shared by all Waiter instances.
102 ThreadManager Waiter::a_threadManager ("WaiterThreadManager");
// Program entry point: parses the command line, sets up tracing, creates a
// ThreadManager and starts one Client per requested thread, then joins them.
// NOTE(review): braces, the opening of the try block and the return statement
// are not visible in this view.
104 int main (const int argc, const char** argv)
106 CommandLine& ccll = CommandLine::instantiate ();
107 ThreadManager* threadManager;
// Global context shared with SOAP::theMethod and Receiver::run.
// NOTE(review): no matching delete is visible in this view.
109 g_context = new Context;
// Command-line arguments: three mandatory, "trace" optional.
112 ccll.add ("maxthr", CommandLine::Argument::Mandatory, "Maximum number of threads");
113 ccll.add ("mode", CommandLine::Argument::Mandatory, "Manager instance mode");
114 ccll.add ("nthr", CommandLine::Argument::Mandatory, "Number of threads to create");
115 ccll.add ("trace", CommandLine::Argument::Optional, "Trace level");
117 ccll.initialize (argv, argc);
// Traces go to "file.trace".
// NOTE(review): the meaning of 2048000 is not visible here - presumably a size limit; confirm.
120 Logger::initialize ("thread", new TraceWriter ("file.trace", 2048000));
// Apply the trace level only when the optional argument was supplied.
122 if (ccll.exists ("trace"))
123 Logger::setLevel (Logger::asLevel (ccll.getValue ("trace")));
125 ThreadManager::Mode::_v mode = ThreadManager::Mode::asEnum (ccll.getValue ("mode"));
// Mode::None is treated as an unrecognised mode: the error message is built
// from the offending text followed by the list of modes.
127 if (mode == ThreadManager::Mode::None) {
128 string msg (ccll.getValue ("mode"));
129 msg += " mode not valid (";
130 msg += ThreadManager::Mode::asList ();
132 throw RuntimeException (msg, ANNA_FILE_LOCATION);
135 const int maxThread = ccll.getIntegerValue ("maxthr");
// At least two client threads are always created.
136 int nthr = max (2, ccll.getIntegerValue ("nthr"));
// NOTE(review): no matching delete for threadManager is visible in this view.
138 threadManager = new ThreadManager ("MyManager", mode, maxThread, Thread::Flag::Joinable);
139 Thread* thread = NULL;
141 // First create the clients that issue the initial calculation request. They all
142 // end up calling SOAP::theMethod, which sets up the wait
143 // and "displays the response"
// The loop-local names min and max shadow the max () used above; with these
// values each client receives rand () % 10, i.e. a value in [0, 9].
144 for (int ii = 0, min = 0, max = 10; ii < nthr; ii ++) {
// NOTE(review): no matching delete for client is visible in this view.
146 Client* client = new Client (ii, min + rand () % max);
147 // Create the Tx threads
148 thread = threadManager->createThread ();
149 thread->start (*client);
// Wait on the thread manager before continuing.
155 threadManager->join ();
// Any RuntimeException caught here is printed to standard output.
157 catch (RuntimeException& ex) {
158 cout << ex.asString () << endl << endl;
163 // This simulates the client process that sends the request to the gSOAP process
164 /* Runs on thread Tx */
// NOTE(review): the signature line of this definition is not visible in this
// view; presumably Client::run - confirm.
166 throw (RuntimeException)
// Delegates the request to SOAP::theMethod, passing this object's id and value.
168 int result = SOAP::theMethod (a_id, a_value);
// Logs this object's asString () text followed by the result at debug level.
171 string msg (asString ());
172 msg += functions::asText (" | Result: ", result);
173 Logger::debug (msg, ANNA_FILE_LOCATION);
177 // The SOAP method implementation must return a value, but it cannot
178 // compute the response by itself; it has to rely on a third party
179 // that will supply the "real" response at some point.
// Creates a Waiter for (id, value) in the global context, sends the request,
// waits for the response, prints the outcome to standard output and finally
// hands the waiter back to the context.
// NOTE(review): the declaration of result, the try opening and the return
// statement are not visible in this view.
181 int SOAP::theMethod (const int id, const int value)
182 throw (RuntimeException)
186 Waiter* waiter = NULL;
189 waiter = g_context->createWaiter (id, value);
190 waiter->initialize ();
191 waiter->sendRequest ();
192 result = waiter->waitResponse ();
194 cout << "Id: " << id << " | Value: " << value << " | Result: " << result << endl << endl;
// Failures are reported on standard output with the same id and value.
196 catch (RuntimeException& ex) {
197 cout << "Id: " << id << " | Value: " << value << " | " << ex.asString () << endl << endl;
// NOTE(review): if createWaiter threw, waiter is still NULL at this point -
// confirm that destroyWaiter tolerates it or that a guard exists in the lines
// missing from this view.
201 g_context->destroyWaiter (waiter); // Only releases it once its data is no longer needed.
// Creates the pipe stored in a_pipe.
// Throws RuntimeException carrying errno and this waiter's id when pipe () fails.
207 void Waiter::initialize ()
208 throw (RuntimeException)
// pipe () returns -1 on failure.
210 if (pipe (a_pipe) == -1) {
211 string msg (functions::asText ("Waiter::initialize | Id: ", a_id));
212 throw RuntimeException (msg, errno, ANNA_FILE_LOCATION);
// Starts a Receiver for this waiter's id and value on a thread obtained from
// the shared a_threadManager.
// A RuntimeException raised while doing so is rethrown with the text
// "Waiter::sendRequest | " prepended.
217 void Waiter::sendRequest ()
218 throw (RuntimeException)
221 /* Note: this leaks memory, but good memory management is not the point of this example */
222 Receiver* receiver = new Receiver (a_id, a_value);
223 a_threadManager.createThread ()->start (*receiver);
225 catch (RuntimeException& ex) {
226 string msg ("Waiter::sendRequest | ");
227 msg += ex.asString ();
228 throw RuntimeException (msg, ANNA_FILE_LOCATION);
// Waits for input on the read end of the pipe with a 500 ms limit.
// Throws RuntimeException carrying the id and value when waitInput reports no input.
// NOTE(review): the statements that build the rest of the message and return
// the result are not visible in this view.
233 int Waiter::waitResponse ()
234 throw (RuntimeException)
236 bool receive = io::functions::waitInput (a_pipe [ReadChannel], (Millisecond)500);
238 if (receive == false) {
239 string msg (anna::functions::asText ("Id: ", a_id));
240 msg += anna::functions::asText (" | Value: ", a_value);
242 throw RuntimeException (msg, ANNA_FILE_LOCATION);
245 // This variable was set in notifyResponse (Ty)
// Writes sizeof (byte) bytes to the write end of the pipe, which is the
// descriptor pair waitResponse reads from.
// NOTE(review): the declaration of byte, the use of the result parameter and
// any check on r are not visible in this view.
250 void Waiter::notifyResponse (const int result)
251 throw (RuntimeException)
257 ssize_t r = write (a_pipe [WriteChannel], &byte, sizeof (byte));
260 // Models the thread that receives the response for the request
261 // a Waiter should be waiting on.
263 void Receiver::run ()
264 throw (RuntimeException)
// Sleeps a random 0-999 ms, so the delay can exceed the 500 ms limit used in
// Waiter::waitResponse.
266 anna::functions::sleep ((Millisecond)(rand () % 1000));
// The computed response is twice the request value.
268 int result = a_value + a_value;
270 // Guards the context against race conditions, e.g. the receiver deciding to respond at the "same" instant
271 // that the waiter on Tx decides it has finished waiting
273 Guard guard (g_context, "Receiver::run");
275 Waiter* waiter = g_context->findWaiter (a_id);
277 // If the Waiter got tired of waiting for the response ...
// NOTE(review): the check on waiter that belongs to the comment above is not
// visible in this view.
281 waiter->notifyResponse (result);
// Registers a Waiter for the given id: looks the id up in a_waiters, obtains an
// instance through Recycler <Waiter>::create, sets its value and adds it to a_waiters.
// Throws RuntimeException ("Waiter already used").
// NOTE(review): the condition guarding the throw, the assignment of the id and
// the return statement are not visible in this view.
284 Waiter* Context::createWaiter (const int id, const int value)
285 throw (RuntimeException)
// NOTE(review): Guard receives *this here but this in destroyWaiter - confirm
// both forms lock the same object.
287 Guard guard (*this, "Context::createWaiter");
289 Waiter* result = a_waiters.find (id);
292 throw RuntimeException ("Waiter already used", ANNA_FILE_LOCATION);
294 result = Recycler <Waiter>::create ();
297 result->setValue (value);
299 a_waiters.add (result);
// Removes the waiter from a_waiters and releases it through
// Recycler <Waiter>::release, both while a Guard on this context is held.
304 void Context::destroyWaiter (Waiter* waiter)
305 throw (RuntimeException)
307 Guard guard (this, "Context::destroyWaiter");
309 // Removes it from the sorted list
310 a_waiters.erase (waiter);
312 // Marks it as available for reuse
313 Recycler <Waiter>::release (waiter);