1 // ANNA - Anna is Not 'N' Anymore
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
5 // https://bitbucket.org/testillano/anna
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
11 // * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 // * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
17 // * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 // Authors: eduardo.ramos.testillano@gmail.com
34 // cisco.tierra@gmail.com
39 #include <anna/core/mt/ThreadManager.hpp>
40 #include <anna/core/util/CommandLine.hpp>
41 #include <anna/core/mt/Mutex.hpp>
42 #include <anna/core/tracing/Logger.hpp>
43 #include <anna/core/mt/Runnable.hpp>
44 #include <anna/core/mt/Thread.hpp>
45 #include <anna/core/tracing/TraceWriter.hpp>
46 #include <anna/core/util/SortedVector.hpp>
48 #include <anna/io/functions.hpp>
53 class Client : public Runnable {
55 Client (const int id, const int value) : Runnable (), a_id (id), a_value (value) {;}
61 void run () throw (RuntimeException);
63 anna_complete_runnable (Client);
68 static int theMethod (const int id, const int value) throw (RuntimeException);
75 int getId () const throw () { return a_id; }
76 int getValue () const throw () { return a_value; }
78 void setId (const int id) throw () { a_id = id; }
79 void setValue (const int value) throw () { a_value = value; }
81 void sendRequest () throw (RuntimeException);
82 int waitResponse () throw (RuntimeException);
83 void notifyResponse (const int result) throw (RuntimeException);
85 void initialize () throw (RuntimeException);
88 static const int ReadChannel = 0;
89 static const int WriteChannel = 1;
91 static ThreadManager a_threadManager;
100 class Context : public SafeRecycler <Waiter> {
102 Waiter* createWaiter (const int id, const int value) throw (RuntimeException);
103 void destroyWaiter (Waiter* waiter) throw (RuntimeException);
104 Waiter* findWaiter (const int id) throw () { return a_waiters.find (id); }
108 static int value (const Waiter* waiter) throw () { return waiter->getId (); }
111 typedef SortedVector <Waiter, SortByID, int> waiter_container;
113 waiter_container a_waiters;
116 class Receiver : public Runnable {
118 Receiver (const int id, const int value) : a_id (id), a_value (value) {;}
124 void run () throw (RuntimeException);
126 anna_complete_runnable (Waiter);
129 Context* g_context = NULL;
130 ThreadManager Waiter::a_threadManager ("WaiterThreadManager");
132 int main (const int argc, const char** argv)
134 CommandLine& ccll = CommandLine::instantiate ();
135 ThreadManager* threadManager;
137 g_context = new Context;
140 ccll.add ("maxthr", CommandLine::Argument::Mandatory, "Maximum number of threads");
141 ccll.add ("mode", CommandLine::Argument::Mandatory, "Manager instance mode");
142 ccll.add ("nthr", CommandLine::Argument::Mandatory, "Number of threads to create");
143 ccll.add ("trace", CommandLine::Argument::Optional, "Trace level");
145 ccll.initialize (argv, argc);
148 Logger::initialize ("thread", new TraceWriter ("file.trace", 2048000));
150 if (ccll.exists ("trace"))
151 Logger::setLevel (Logger::asLevel (ccll.getValue ("trace")));
153 ThreadManager::Mode::_v mode = ThreadManager::Mode::asEnum (ccll.getValue ("mode"));
155 if (mode == ThreadManager::Mode::None) {
156 string msg (ccll.getValue ("mode"));
157 msg += " mode not valid (";
158 msg += ThreadManager::Mode::asList ();
160 throw RuntimeException (msg, ANNA_FILE_LOCATION);
163 const int maxThread = ccll.getIntegerValue ("maxthr");
164 int nthr = max (2, ccll.getIntegerValue ("nthr"));
166 threadManager = new ThreadManager ("MyManager", mode, maxThread, Thread::Flag::Joinable);
167 Thread* thread = NULL;
169 // Primero genera los clientes que generan la primera petición de cáloulo. Todos
170 // terminan llamando al método SOAP::theMethod, que será el que monte la espera
171 // y "visualize la respuesta"
172 for (int ii = 0, min = 0, max = 10; ii < nthr; ii ++) {
174 Client* client = new Client (ii, min + rand () % max);
175 // Crear los threads Tx
176 thread = threadManager->createThread ();
177 thread->start (*client);
183 threadManager->join ();
185 catch (RuntimeException& ex) {
186 cout << ex.asString () << endl << endl;
191 // Esto simula el proceso cliente que envía la petición al proceso gSOAP
192 /* Se ejecuta desde el thread Tx */
194 throw (RuntimeException)
196 int result = SOAP::theMethod (a_id, a_value);
199 string msg (asString ());
200 msg += functions::asText (" | Result: ", result);
201 Logger::debug (msg, ANNA_FILE_LOCATION);
205 // La implementación del método SOAP debe devolver un valor, pero no puede
206 // calcular por sí misma la respuesta, sino que tiene que apoyarse en un tercero
207 // que será el que le dé la respuesta "real" en algún momento.
209 int SOAP::theMethod (const int id, const int value)
210 throw (RuntimeException)
214 Waiter* waiter = NULL;
217 waiter = g_context->createWaiter (id, value);
218 waiter->initialize ();
219 waiter->sendRequest ();
220 result = waiter->waitResponse ();
222 cout << "Id: " << id << " | Value: " << value << " | Result: " << result << endl << endl;
224 catch (RuntimeException& ex) {
225 cout << "Id: " << id << " | Value: " << value << " | " << ex.asString () << endl << endl;
229 g_context->destroyWaiter (waiter); // Sólo lo libera cuando ya no necesita usar sus datos.
235 void Waiter::initialize ()
236 throw (RuntimeException)
238 if (pipe (a_pipe) == -1) {
239 string msg (functions::asText ("Waiter::initialize | Id: ", a_id));
240 throw RuntimeException (msg, errno, ANNA_FILE_LOCATION);
245 void Waiter::sendRequest ()
246 throw (RuntimeException)
249 /* Ojo que esto generará memory-leaks, pero no es el objeto de éste ejemplo la buena gestión de la memoria */
250 Receiver* receiver = new Receiver (a_id, a_value);
251 a_threadManager.createThread ()->start (*receiver);
253 catch (RuntimeException& ex) {
254 string msg ("Waiter::sendRequest | ");
255 msg += ex.asString ();
256 throw RuntimeException (msg, ANNA_FILE_LOCATION);
261 int Waiter::waitResponse ()
262 throw (RuntimeException)
264 bool receive = io::functions::waitInput (a_pipe [ReadChannel], (Millisecond)500);
266 if (receive == false) {
267 string msg (anna::functions::asText ("Id: ", a_id));
268 msg += anna::functions::asText (" | Value: ", a_value);
270 throw RuntimeException (msg, ANNA_FILE_LOCATION);
273 // Esta variable se ha establecido en el notifyResponse (Ty)
278 void Waiter::notifyResponse (const int result)
279 throw (RuntimeException)
285 write (a_pipe [WriteChannel], &byte, sizeof (byte));
288 // Modela al thread que recibe la respuesta correspondiente a la petición por la
289 // que debería haber un Waiter esperando.
291 void Receiver::run ()
292 throw (RuntimeException)
294 anna::functions::sleep ((Millisecond)(rand () % 1000));
296 int result = a_value + a_value;
298 // Protege el contexto para evitar condiciones de carrera, como que en el "mismo" instante
299 // en que el receptor decide responder, el waiter en Tx decida que ha terminado de esperar
301 Guard guard (g_context, "Receiver::run");
303 Waiter* waiter = g_context->findWaiter (a_id);
305 // Si el Waiter se cansó de esperar la respuesta ...
309 waiter->notifyResponse (result);
312 Waiter* Context::createWaiter (const int id, const int value)
313 throw (RuntimeException)
315 Guard guard (*this, "Context::createWaiter");
317 Waiter* result = a_waiters.find (id);
320 throw RuntimeException ("Waiter already used", ANNA_FILE_LOCATION);
322 result = Recycler <Waiter>::create ();
325 result->setValue (value);
327 a_waiters.add (result);
332 void Context::destroyWaiter (Waiter* waiter)
333 throw (RuntimeException)
335 Guard guard (this, "Context::destroyWaiter");
337 // Lo elimina de la lista ordenada
338 a_waiters.erase (waiter);
340 // Lo marca como disponible para usar
341 Recycler <Waiter>::release (waiter);