51c4f508505d2bffd955f2a08746ae7cfaf494eb
[anna.git] / source / timex / Engine.cpp
1 // ANNA - Anna is Not Nothingness Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // http://redmine.teslayout.com/projects/anna-suite
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     *  Neither the name of the copyright holder nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 #include <algorithm>
38
39 #include <anna/core/functions.hpp>
40 #include <anna/core/tracing/TraceMethod.hpp>
41 #include <anna/core/tracing/TraceFunction.hpp>
42 #include <anna/core/mt/Thread.hpp>
43
44 #include <anna/app/functions.hpp>
45
46 #include <anna/xml/Node.hpp>
47 #include <anna/xml/Attribute.hpp>
48
49 #include <anna/comm/Communicator.hpp>
50
51 #include <anna/timex/Engine.hpp>
52 #include <anna/timex/internal/TickConsumer.hpp>
53 #include <anna/timex/internal/TickProducer.hpp>
54 #include <anna/timex/TimeEventObserver.hpp>
55 #include <anna/timex/internal/sccs.hpp>
56
57 using namespace std;
58 using namespace anna;
59 using namespace anna::comm;
60
61 /**
62    Resolucion minima (en milisegundos) soportada por el controlador de tiempos.
63 */
64 //static
65 const Millisecond timex::Engine::minResolution(100);
66
67
68 timex::Engine::Engine(const Millisecond & maxTimeout, const Millisecond & resolution) :
69   app::Component(getClassName()),
70   a_currentQuantum(0),
71   a_maxQuantum(0),
72   a_maxTimeout(maxTimeout),
73   a_resolution(resolution),
74   a_isActive(false),
75   a_tickProducer(NULL),
76   a_tickConsumer(NULL),
77   a_expiredQuantum(NULL),
78   a_timeTable(NULL) {
79   timex::sccs::activate();
80 }
81
82 timex::Engine::~Engine() {
83   delete a_tickConsumer;
84   delete [] a_timeTable;
85 }
86
87 //--------------------------------------------------------------------------------------------
88 // Inicializa la configuracin.
89 //
90 // (2) En Solaris el ualarm tiene una deriva constante que parece ser que dependen de la
91 //     arquitectura de la m�uina. Para calcular esta deriva y poder corregirla en los
92 //     posteriores ualarm vamos a calcular la diferencia entre el tiempo esperado y el momento
93 //     en que realmente llega la seal de ualarm.
94 //--------------------------------------------------------------------------------------------
95 void timex::Engine::do_initialize()
96 throw(RuntimeException) {
97   LOGMETHOD(TraceMethod tm("timex::Engine", "do_initialize", ANNA_FILE_LOCATION));
98
99   if(a_maxQuantum > 0) {
100     Logger::write(Logger::Warning, "Time controller was previously started", ANNA_FILE_LOCATION);
101     return;
102   }
103
104   if(a_resolution < minResolution)
105     throw RuntimeException(functions::asString("Resolution must be greater than %d milliseconds", minResolution.getValue()), ANNA_FILE_LOCATION);
106
107   if(a_maxTimeout <= a_resolution)
108     throw RuntimeException(functions::asString("Max-Timeout must be greater than %d milliseconds", a_resolution.getValue()), ANNA_FILE_LOCATION);
109
110   a_maxQuantum = a_maxTimeout / a_resolution;
111
112   while((a_maxQuantum * a_resolution) <= a_maxTimeout) a_maxQuantum ++;
113
114   a_timeTable = new Quantum [a_maxQuantum];
115   Communicator* communicator = app::functions::component <Communicator> (ANNA_FILE_LOCATION);
116   communicator->attach(a_tickConsumer = new TickConsumer(this));
117   /**
118    * Esto siempre se ejecutará en un thread aparte aunque la librería haya sido generada en modo ST,
119    * porque así evitamos tener que generar el pulso de reloj mediante una alarma.
120    */
121   a_tickProducer = new TickProducer(this, a_tickConsumer->getfdWrite());
122   pthread_attr_t attr;
123   pthread_attr_init(&attr);
124   int errorCode;
125
126   if((errorCode = pthread_create(&a_threadProducer, &attr, TickProducer::exec, a_tickProducer)) != 0)
127     throw RuntimeException(std::string("timex::Engine::do_initialize"), errorCode, ANNA_FILE_LOCATION);
128
129   LOGDEBUG(
130     string msg("Time controller | Max Timeout: ");
131     msg += functions::asString(a_maxTimeout);
132     msg += " | Resolution: ";
133     msg += functions::asString(a_resolution);
134     msg += " | Max Quantum: ";
135     msg += functions::asString(a_maxQuantum);
136     msg += " quantums";
137     Logger::write(Logger::Debug, msg, ANNA_FILE_LOCATION);
138   );
139 }
140
141 // Reimplementado de app::Component
142 void timex::Engine::do_cloneParent()
143 throw() {
144 }
145
146 /*
147  * Se invoca desde app::Application::clone -> app::Component::do_cloneChild (ojo EN EL NUEVO PROCESO).
148  * Instala la senhal de tick en el proceso, ya que la alarma no se hereda directamente.
149  */
150 void timex::Engine::do_cloneChild()
151 throw(RuntimeException) {
152 }
153
154 //----------------------------------------------------------------------------
155 // No para los hilos de generacion, sino que evita que se escriban los bytes
156 // en el 'pipe'.
157 //----------------------------------------------------------------------------
158 void timex::Engine::pause()
159 throw(RuntimeException) {
160   Guard guard(this, "timex::Engine (pause)");
161
162   if(a_tickProducer->isInPause() == false) {
163     LOGWARNING(
164       string msg("timex::Engine::pause | Pending TimeEvents: ");
165       msg += functions::asString((const int) a_directory.size());
166       Logger::warning(msg, ANNA_FILE_LOCATION);
167     );
168     a_tickProducer->setIsInPause(true);
169   }
170 }
171
172 void timex::Engine::play()
173 throw(RuntimeException) {
174   Guard guard(this, "timex::Engine (play)");
175
176   if(a_tickProducer->isInPause() == true) {
177     LOGWARNING(
178       string msg("timex::Engine::play | Pending TimeEvents: ");
179       msg += functions::asString((const int) a_directory.size());
180       Logger::warning(msg, ANNA_FILE_LOCATION);
181     );
182     a_tickProducer->setIsInPause(false);
183   }
184 }
185
186 void timex::Engine::activate(timex::TimeEvent* timeEvent)
187 throw(RuntimeException) {
188   LOGMETHOD(TraceMethod tm(Logger::Local7, "timex::Engine", "activate", ANNA_FILE_LOCATION));
189
190   if(a_maxQuantum == 0)
191     throw RuntimeException("Engine::initialize was not called", ANNA_FILE_LOCATION);
192
193   if(timeEvent == NULL)
194     throw RuntimeException("Cannot activate a NULL TimeEvent", ANNA_FILE_LOCATION);
195
196   if(timeEvent->a_controller != NULL) {
197     string msg(timeEvent->asString());
198     msg += " | Already activated";
199     throw RuntimeException(msg, ANNA_FILE_LOCATION);
200   }
201
202   const Millisecond & timeout(timeEvent->getTimeout());
203
204   if(timeout > a_maxTimeout || timeout < a_resolution) {
205     string msg("Invalid TimeEvent timeout | Max Timeout: ");
206     msg += functions::asString(a_maxTimeout);
207     msg += " | Min timeout (resolution): ";
208     msg += functions::asString(a_resolution);
209     msg += " | ";
210     msg += timeEvent->asString();
211     throw RuntimeException(msg, ANNA_FILE_LOCATION);
212   }
213
214   Guard guard(this, "timex::Engine (activate)");
215
216   if(a_tickProducer->isInPause() == true) {
217     string msg("Cannot activate TimeEvents with timex::Engine in pause | ");
218     msg += timeEvent->asString();
219     throw RuntimeException(msg, ANNA_FILE_LOCATION);
220   }
221
222   const TimeEvent::Id id(timeEvent->getId());
223
224   if(a_directory.find(id) != a_directory.end())
225     throw RuntimeException(functions::asString("Id %ld (0x%x) already in use", id, id), ANNA_FILE_LOCATION);
226
227   int iq = getQuantum(timeout);
228   Quantum* quantum = &a_timeTable [iq];
229   bool delayed;
230   timeEvent->a_controller = this;
231   a_directory.insert(Directory::value_type(id, quantum));
232
233   if(quantum == a_expiredQuantum) {
234     a_delayedQuantum.push_back(timeEvent);
235     delayed = true;
236   } else {
237     quantum->push_back(timeEvent);
238     delayed = false;
239   }
240
241   LOGDEBUG(
242     string msg("timex::Engine::activate | ");
243     msg += timeEvent->asString();
244     msg += " | Current quantum: ";
245     msg += functions::asString(a_currentQuantum);
246     msg += " | Quantum programmed: ";
247     msg += functions::asString(iq);
248     msg += " (";
249     msg += functions::asHexString(anna_ptrnumber_cast(quantum));
250     msg += functions::asText(") | Delayed: ", delayed);
251     Logger::debug(msg, ANNA_FILE_LOCATION);
252   );
253 }
254
255 timex::TimeEvent* timex::Engine::getTimeEvent(const timex::TimeEvent::Id eventTimeId)
256 throw() {
257   LOGMETHOD(TraceMethod tm(Logger::Local7, "timex::Engine", "getTimeEvent", ANNA_FILE_LOCATION));
258   Directory::iterator iid;
259   TimeEvent* result(NULL);
260   Quantum* quantum;
261   TimeEvent* aux;
262   Guard guard(this, "timex::Engine (getTimeEvent)");
263
264   if((iid = a_directory.find(eventTimeId)) == a_directory.end())
265     return NULL;
266
267   quantum = iid->second;
268
269   for(Quantum::iterator ii = quantum->begin(), maxii = quantum->end(); ii != maxii; ii ++) {
270     if((aux = *ii)->getId() == eventTimeId) {
271       result = aux;
272       break;
273     }
274   }
275
276   return result;
277 }
278
279 void timex::Engine::cancel(timex::TimeEvent* timeEvent)
280 throw(RuntimeException) {
281   LOGMETHOD(TraceMethod tm(Logger::Local7, "timex::Engine", "cancel", ANNA_FILE_LOCATION));
282
283   if(timeEvent == NULL)
284     throw RuntimeException("Cannot cancel a NULL TimeEvent", ANNA_FILE_LOCATION);
285
286   if(timeEvent->a_controller == NULL) {
287     LOGDEBUG(
288       string msg(timeEvent->asString());
289       msg += " | Not activated";
290       Logger::write(Logger::Debug, msg, ANNA_FILE_LOCATION);
291     );
292     return;
293   }
294
295   Directory::iterator iid;
296   Quantum* quantum;
297   Guard guard(this, "timex::Engine (cancel)");
298
299   if((iid = a_directory.find(timeEvent->getId())) == a_directory.end())
300     return;
301
302   if((quantum = iid->second) == a_expiredQuantum) {
303     LOGWARNING(
304       string msg("timex::Engine::cancel | ");
305       msg += timeEvent->asString();
306       msg += " | Processing already programmed";
307       Logger::warning(msg, ANNA_FILE_LOCATION);
308     );
309     return;
310   }
311
312   a_directory.erase(iid);
313   quantum->erase(find(quantum->begin(), quantum->end(), timeEvent));
314   timeEvent->a_controller = NULL;
315   LOGDEBUG(
316     string msg("timex::Engine::cancel | ");
317     msg += timeEvent->asString();
318     msg += " | Current quantum: ";
319     msg += functions::asString(a_currentQuantum);
320     msg += " | Quantum programmed: ";
321     msg += functions::asHexString(anna_ptrnumber_cast(quantum));
322     Logger::debug(msg, ANNA_FILE_LOCATION);
323   );
324   notifyRelease(timeEvent);
325 }
326
327 void timex::Engine::do_stop()
328 throw() {
329   LOGMETHOD(TraceMethod tm("timex::Engine", "do_stop", ANNA_FILE_LOCATION));
330   Quantum::iterator ii, maxii;
331   Guard guard(this, "timex::Engine (do_stop)");
332   app::functions::component <Communicator> (ANNA_FILE_LOCATION)->detach(a_tickConsumer);
333
334   if(a_tickProducer)
335     a_tickProducer->requestStop();
336
337   a_directory.clear();
338   TimeEvent* timeEvent;
339
340   for(int iq = 0; iq < a_maxQuantum; iq ++) {
341     Quantum& quantum(a_timeTable [iq]);
342
343     for(ii = quantum.begin(), maxii = quantum.end(); ii != maxii; ii ++) {
344       timeEvent = Engine::timeEvent(ii);
345
346       try {
347         timeEvent->a_controller = NULL;
348         timeEvent->stop();
349         notifyRelease(timeEvent);
350       } catch(Exception& ex) {
351         ex.trace();
352       }
353     }
354
355     quantum.clear();
356   }
357 }
358
359 void timex::Engine::kill()
360 throw() {
361   Guard guard(this, "timex::Engine (kill)");
362   app::functions::component <Communicator> (ANNA_FILE_LOCATION)->detach(a_tickConsumer);
363
364   if(a_tickProducer)
365     a_tickProducer->requestStop();
366
367   string msg("timex::Engine::kill | ");
368   msg += functions::asString(" | Pending TimeEvents: %d", a_directory.size());
369   Logger::critical(msg, ANNA_FILE_LOCATION);
370
371   for(int iq = 0; iq < a_maxQuantum; iq ++)
372     a_timeTable [iq].clear();
373
374   a_delayedQuantum.clear();
375   a_directory.clear();
376 }
377
378 //----------------------------------------------------------------------------------
379 // (1) Podr� ser que al invocar al m�odo 'expire' de una evento se halla originado
380 //     la cancelacin de algn otro evento. As�que si el id de evento ya no esta en
381 //     en el directorio => da igual, se ignora ya que luego borraremos todos los
382 //     eventos del Quantum.
383 // (2) Si mientas estamos caducando este Quantum hay que activar transacciones que
384 //     debe ir sobre el => se guardan en un vector temporal y se copian al final
385 //     del proceso, con lo que evita totalmente la posiblidad de perdida.
386 // (3) Si el temporizador ha sido reactivado no tiene que liberarlo.
387 //----------------------------------------------------------------------------------
388 void timex::Engine::tick()
389 throw(RuntimeException) {
390   LOGMETHOD(TraceMethod tm(Logger::Local7, "timex::Engine", "tick", ANNA_FILE_LOCATION));
391   /*
392      if (Logger::isActive (Logger::Local6) == true && a_directory.size () > 0) {
393         string msg ("Tick | Quantum: ");
394         msg += functions::asString (a_currentQuantum);
395         msg += " | Tx: ";
396         msg += functions::asString (functions::millisecond ());
397         msg += " | N: ";
398         msg += functions::asString (a_directory.size ());
399         Logger::write (Logger::Local6, msg, ANNA_FILE_LOCATION);
400      };
401   */
402   Guard guard(this, "timex::Engine (tick)");
403   int counter(0);
404   eventBeginQuantum();
405   a_expiredQuantum = &a_timeTable [a_currentQuantum];
406   TimeEvent* timeEvent;
407   Directory::iterator iid;
408   quantum_iterator ii, maxii;
409   bool observed;
410
411   for(ii = a_expiredQuantum->begin(), maxii = a_expiredQuantum->end(); ii != maxii; ii ++) {
412     timeEvent = Engine::timeEvent(ii);
413
414     if((iid = a_directory.find(timeEvent->getId())) == a_directory.end())            // (1)
415       continue;
416
417     a_directory.erase(iid);
418
419     if(Logger::isActive(Logger::Debug)) {
420       string msg(timeEvent->asString());
421       msg += " | Quantum programmed: ";
422       msg += functions::asHexString(anna_ptrnumber_cast(a_expiredQuantum));
423       msg += " | Expired";
424       Logger::write(Logger::Debug, msg, ANNA_FILE_LOCATION);
425     }
426
427     timeEvent->a_controller = NULL;
428     /*
429      * Si tiene NO tiene observer asociado, seguramente se auto-liberará en el expire
430      * Si tiene observer, se invocará al método correspondiente.
431      */
432     observed = (timeEvent->a_observer != NULL);
433
434     try {
435       timeEvent->expire(this);
436       counter ++;
437     } catch(Exception& ex) {
438       ex.trace();
439     }
440
441     // Si tiene observer => el expire NO puede haber liberado el TimeEvent
442     if(observed == true) {
443       if(timeEvent->a_controller == NULL)                                           // (3)
444         timeEvent->a_observer->release(timeEvent);
445     }
446   }
447
448   a_expiredQuantum->clear();
449
450   if(a_delayedQuantum.size() > 0) {                                                 // (2)
451     for(ii = a_delayedQuantum.begin(), maxii = a_delayedQuantum.end(); ii != maxii; ii ++)
452       a_expiredQuantum->push_back(Engine::timeEvent(ii));
453
454     a_delayedQuantum.clear();
455   }
456
457   a_expiredQuantum = NULL;
458
459   if(++ a_currentQuantum >= a_maxQuantum)
460     a_currentQuantum = 0;
461
462   eventEndQuantum(counter);
463 }
464
465 void timex::Engine::notifyRelease(timex::TimeEvent* timeEvent)
466 throw() {
467   if(timeEvent->a_observer != NULL)
468     timeEvent->a_observer->release(timeEvent);
469 }
470
471 string timex::Engine::asString() const
472 throw() {
473   string msg("timex::Engine { ");
474   msg += app::Component::asString();
475   msg += " | Max Timeout: ";
476   msg += functions::asString(a_maxTimeout);
477   msg += " ms | Resolution: ";
478   msg += functions::asString(a_resolution);
479   msg += " ms | Programming: ";
480   msg += functions::asString(a_maxQuantum);
481   msg += " quantums | Pending TimeEvents: ";
482   msg += functions::asString((const int) a_directory.size());
483   return msg += " }";
484 }
485
486 xml::Node* timex::Engine::asXML(xml::Node* parent) const
487 throw() {
488   parent = Component::asXML(parent);
489   xml::Node* result = parent->createChild("timex.Engine");
490   result->createAttribute("MaxTimeout", a_maxTimeout);
491   result->createAttribute("Resolution", a_resolution);
492   result->createAttribute("N", a_directory.size());
493   result->createAttribute("Pause", functions::asString(a_tickProducer->isInPause()));
494   xml::Node* node = result->createChild("Quantum");
495   node->createAttribute("Current", a_currentQuantum);
496   node->createAttribute("Max", a_maxQuantum);
497   return result;
498 }