Fix README about oracle integration
[anna.git] / source / timex / Engine.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #include <algorithm>
10
11 #include <anna/core/functions.hpp>
12 #include <anna/core/tracing/TraceMethod.hpp>
13 #include <anna/core/tracing/TraceFunction.hpp>
14 #include <anna/core/mt/Thread.hpp>
15
16 #include <anna/app/functions.hpp>
17
18 #include <anna/xml/Node.hpp>
19 #include <anna/xml/Attribute.hpp>
20
21 #include <anna/comm/Communicator.hpp>
22
23 #include <anna/timex/Engine.hpp>
24 #include <anna/timex/internal/TickConsumer.hpp>
25 #include <anna/timex/internal/TickProducer.hpp>
26 #include <anna/timex/TimeEventObserver.hpp>
27 #include <anna/timex/internal/sccs.hpp>
28
29 using namespace std;
30 using namespace anna;
31 using namespace anna::comm;
32
33 /**
34    Minimum resolution (in milliseconds) supported by the time manager
35 */
36 //static
37 const Millisecond anna::timex::Engine::minResolution(10);
38
39
40 anna::timex::Engine::Engine(const Millisecond & maxTimeout, const Millisecond & resolution) :
41   app::Component(getClassName()),
42   a_currentQuantum(0),
43   a_maxQuantum(0),
44   a_maxTimeout(maxTimeout),
45   a_resolution(resolution),
46   a_isActive(false),
47   a_tickProducer(NULL),
48   a_tickConsumer(NULL),
49   a_expiredQuantum(NULL),
50   a_timeTable(NULL) {
51   timex::sccs::activate();
52 }
53
54 anna::timex::Engine::~Engine() {
55   delete a_tickConsumer;
56   delete [] a_timeTable;
57 }
58
59 //--------------------------------------------------------------------------------------------
60 // Inicializa la configuracin.
61 //
62 // (2) En Solaris el ualarm tiene una deriva constante que parece ser que dependen de la
63 //     arquitectura de la m�uina. Para calcular esta deriva y poder corregirla en los
64 //     posteriores ualarm vamos a calcular la diferencia entre el tiempo esperado y el momento
65 //     en que realmente llega la seal de ualarm.
66 //--------------------------------------------------------------------------------------------
67 void anna::timex::Engine::do_initialize()
68 throw(RuntimeException) {
69   LOGMETHOD(TraceMethod tm("timex::Engine", "do_initialize", ANNA_FILE_LOCATION));
70
71   if(a_maxQuantum > 0) {
72     Logger::write(Logger::Warning, "Time controller was previously started", ANNA_FILE_LOCATION);
73     return;
74   }
75
76   if(a_resolution < minResolution)
77     throw RuntimeException(functions::asString("Resolution must be greater than %d milliseconds", minResolution.getValue()), ANNA_FILE_LOCATION);
78
79   if(a_resolution < (Millisecond)100) {
80     LOGWARNING(Logger::warning(functions::asString("Resolutions under 100 milliseconds (%d in this case) slightly increase the CPU usage", a_resolution.getValue()), ANNA_FILE_LOCATION));
81   }
82
83   if(a_maxTimeout <= a_resolution)
84     throw RuntimeException(functions::asString("Max-Timeout must be greater than %d milliseconds", a_resolution.getValue()), ANNA_FILE_LOCATION);
85
86   a_maxQuantum = a_maxTimeout / a_resolution;
87
88   while((a_maxQuantum * a_resolution) <= a_maxTimeout) a_maxQuantum ++;
89
90   a_timeTable = new Quantum [a_maxQuantum];
91   Communicator* communicator = app::functions::component <Communicator> (ANNA_FILE_LOCATION);
92   communicator->attach(a_tickConsumer = new TickConsumer(this));
93   /**
94    * Esto siempre se ejecutará en un thread aparte aunque la librería haya sido generada en modo ST,
95    * porque así evitamos tener que generar el pulso de reloj mediante una alarma.
96    */
97   a_tickProducer = new TickProducer(this, a_tickConsumer->getfdWrite());
98   pthread_attr_t attr;
99   pthread_attr_init(&attr);
100   int errorCode;
101
102   if((errorCode = pthread_create(&a_threadProducer, &attr, TickProducer::exec, a_tickProducer)) != 0)
103     throw RuntimeException(std::string("timex::Engine::do_initialize"), errorCode, ANNA_FILE_LOCATION);
104
105   LOGDEBUG(
106     string msg("Time controller | Max Timeout: ");
107     msg += functions::asString(a_maxTimeout);
108     msg += " | Resolution: ";
109     msg += functions::asString(a_resolution);
110     msg += " | Max Quantum: ";
111     msg += functions::asString(a_maxQuantum);
112     msg += " quantums";
113     Logger::write(Logger::Debug, msg, ANNA_FILE_LOCATION);
114   );
115 }
116
117 // Reimplementado de app::Component
118 void anna::timex::Engine::do_cloneParent()
119 throw() {
120 }
121
122 /*
123  * Se invoca desde app::Application::clone -> app::Component::do_cloneChild (ojo EN EL NUEVO PROCESO).
124  * Instala la senhal de tick en el proceso, ya que la alarma no se hereda directamente.
125  */
126 void anna::timex::Engine::do_cloneChild()
127 throw(RuntimeException) {
128 }
129
130 //----------------------------------------------------------------------------
131 // No para los hilos de generacion, sino que evita que se escriban los bytes
132 // en el 'pipe'.
133 //----------------------------------------------------------------------------
134 void anna::timex::Engine::pause()
135 throw(RuntimeException) {
136   Guard guard(this, "timex::Engine (pause)");
137
138   if(a_tickProducer->isInPause() == false) {
139     LOGWARNING(
140       string msg("timex::Engine::pause | Pending TimeEvents: ");
141       msg += functions::asString((const int) a_directory.size());
142       Logger::warning(msg, ANNA_FILE_LOCATION);
143     );
144     a_tickProducer->setIsInPause(true);
145   }
146 }
147
148 void anna::timex::Engine::play()
149 throw(RuntimeException) {
150   Guard guard(this, "timex::Engine (play)");
151
152   if(a_tickProducer->isInPause() == true) {
153     LOGWARNING(
154       string msg("timex::Engine::play | Pending TimeEvents: ");
155       msg += functions::asString((const int) a_directory.size());
156       Logger::warning(msg, ANNA_FILE_LOCATION);
157     );
158     a_tickProducer->setIsInPause(false);
159   }
160 }
161
162 void anna::timex::Engine::activate(timex::TimeEvent* timeEvent)
163 throw(RuntimeException) {
164   LOGMETHOD(TraceMethod tm(Logger::Local7, "timex::Engine", "activate", ANNA_FILE_LOCATION));
165
166   if(a_maxQuantum == 0)
167     throw RuntimeException("Engine::initialize was not called", ANNA_FILE_LOCATION);
168
169   if(timeEvent == NULL)
170     throw RuntimeException("Cannot activate a NULL TimeEvent", ANNA_FILE_LOCATION);
171
172   if(timeEvent->a_controller != NULL) {
173     string msg(timeEvent->asString());
174     msg += " | Already activated";
175     throw RuntimeException(msg, ANNA_FILE_LOCATION);
176   }
177
178   const Millisecond & timeout(timeEvent->getTimeout());
179
180   if(timeout > a_maxTimeout || timeout < a_resolution) {
181     string msg("Invalid TimeEvent timeout | Max Timeout: ");
182     msg += functions::asString(a_maxTimeout);
183     msg += " | Min timeout (resolution): ";
184     msg += functions::asString(a_resolution);
185     msg += " | ";
186     msg += timeEvent->asString();
187     throw RuntimeException(msg, ANNA_FILE_LOCATION);
188   }
189
190   Guard guard(this, "timex::Engine (activate)");
191
192   if(a_tickProducer->isInPause() == true) {
193     string msg("Cannot activate TimeEvents with timex::Engine in pause | ");
194     msg += timeEvent->asString();
195     throw RuntimeException(msg, ANNA_FILE_LOCATION);
196   }
197
198   const TimeEvent::Id id(timeEvent->getId());
199
200   if(a_directory.find(id) != a_directory.end())
201     throw RuntimeException(functions::asString("Id %ld (0x%x) already in use", id, id), ANNA_FILE_LOCATION);
202
203   int iq = getQuantum(timeout);
204   Quantum* quantum = &a_timeTable [iq];
205   bool delayed;
206   timeEvent->a_controller = this;
207   a_directory.insert(Directory::value_type(id, quantum));
208
209   if(quantum == a_expiredQuantum) {
210     a_delayedQuantum.push_back(timeEvent);
211     delayed = true;
212   } else {
213     quantum->push_back(timeEvent);
214     delayed = false;
215   }
216
217   LOGDEBUG(
218     string msg("timex::Engine::activate | ");
219     msg += timeEvent->asString();
220     msg += " | Current quantum: ";
221     msg += functions::asString(a_currentQuantum);
222     msg += " | Quantum programmed: ";
223     msg += functions::asString(iq);
224     msg += " (";
225     msg += functions::asHexString(anna_ptrnumber_cast(quantum));
226     msg += functions::asText(") | Delayed: ", delayed);
227     Logger::debug(msg, ANNA_FILE_LOCATION);
228   );
229 }
230
231 anna::timex::TimeEvent* anna::timex::Engine::getTimeEvent(const timex::TimeEvent::Id eventTimeId)
232 throw() {
233   LOGMETHOD(TraceMethod tm(Logger::Local7, "timex::Engine", "getTimeEvent", ANNA_FILE_LOCATION));
234   Directory::iterator iid;
235   TimeEvent* result(NULL);
236   Quantum* quantum;
237   TimeEvent* aux;
238   Guard guard(this, "timex::Engine (getTimeEvent)");
239
240   if((iid = a_directory.find(eventTimeId)) == a_directory.end())
241     return NULL;
242
243   quantum = iid->second;
244
245   for(Quantum::iterator ii = quantum->begin(), maxii = quantum->end(); ii != maxii; ii ++) {
246     if((aux = *ii)->getId() == eventTimeId) {
247       result = aux;
248       break;
249     }
250   }
251
252   return result;
253 }
254
255 void anna::timex::Engine::cancel(timex::TimeEvent* timeEvent)
256 throw(RuntimeException) {
257   LOGMETHOD(TraceMethod tm(Logger::Local7, "timex::Engine", "cancel", ANNA_FILE_LOCATION));
258
259   if(timeEvent == NULL)
260     throw RuntimeException("Cannot cancel a NULL TimeEvent", ANNA_FILE_LOCATION);
261
262   if(timeEvent->a_controller == NULL) {
263     LOGDEBUG(
264       string msg(timeEvent->asString());
265       msg += " | Not activated";
266       Logger::write(Logger::Debug, msg, ANNA_FILE_LOCATION);
267     );
268     return;
269   }
270
271   Directory::iterator iid;
272   Quantum* quantum;
273   Guard guard(this, "timex::Engine (cancel)");
274
275   if((iid = a_directory.find(timeEvent->getId())) == a_directory.end())
276     return;
277
278   if((quantum = iid->second) == a_expiredQuantum) {
279     LOGWARNING(
280       string msg("timex::Engine::cancel | ");
281       msg += timeEvent->asString();
282       msg += " | Processing already programmed";
283       Logger::warning(msg, ANNA_FILE_LOCATION);
284     );
285     return;
286   }
287
288   a_directory.erase(iid);
289   quantum->erase(find(quantum->begin(), quantum->end(), timeEvent));
290   timeEvent->a_controller = NULL;
291   LOGDEBUG(
292     string msg("timex::Engine::cancel | ");
293     msg += timeEvent->asString();
294     msg += " | Current quantum: ";
295     msg += functions::asString(a_currentQuantum);
296     msg += " | Quantum programmed: ";
297     msg += functions::asHexString(anna_ptrnumber_cast(quantum));
298     Logger::debug(msg, ANNA_FILE_LOCATION);
299   );
300   notifyRelease(timeEvent);
301 }
302
303 void anna::timex::Engine::do_stop()
304 throw() {
305   LOGMETHOD(TraceMethod tm("timex::Engine", "do_stop", ANNA_FILE_LOCATION));
306   Quantum::iterator ii, maxii;
307   Guard guard(this, "timex::Engine (do_stop)");
308   app::functions::component <Communicator> (ANNA_FILE_LOCATION)->detach(a_tickConsumer);
309
310   if(a_tickProducer)
311     a_tickProducer->requestStop();
312
313   a_directory.clear();
314   TimeEvent* timeEvent;
315
316   for(int iq = 0; iq < a_maxQuantum; iq ++) {
317     Quantum& quantum(a_timeTable [iq]);
318
319     for(ii = quantum.begin(), maxii = quantum.end(); ii != maxii; ii ++) {
320       timeEvent = Engine::timeEvent(ii);
321
322       try {
323         timeEvent->a_controller = NULL;
324         timeEvent->stop();
325         notifyRelease(timeEvent);
326       } catch(Exception& ex) {
327         ex.trace();
328       }
329     }
330
331     quantum.clear();
332   }
333 }
334
335 void anna::timex::Engine::kill()
336 throw() {
337   Guard guard(this, "timex::Engine (kill)");
338   app::functions::component <Communicator> (ANNA_FILE_LOCATION)->detach(a_tickConsumer);
339
340   if(a_tickProducer)
341     a_tickProducer->requestStop();
342
343   string msg("timex::Engine::kill | ");
344   msg += functions::asString(" | Pending TimeEvents: %d", a_directory.size());
345   Logger::critical(msg, ANNA_FILE_LOCATION);
346
347   for(int iq = 0; iq < a_maxQuantum; iq ++)
348     a_timeTable [iq].clear();
349
350   a_delayedQuantum.clear();
351   a_directory.clear();
352 }
353
354 //----------------------------------------------------------------------------------
355 // (1) Podr� ser que al invocar al m�odo 'expire' de una evento se halla originado
356 //     la cancelacin de algn otro evento. As�que si el id de evento ya no esta en
357 //     en el directorio => da igual, se ignora ya que luego borraremos todos los
358 //     eventos del Quantum.
359 // (2) Si mientas estamos caducando este Quantum hay que activar transacciones que
360 //     debe ir sobre el => se guardan en un vector temporal y se copian al final
361 //     del proceso, con lo que evita totalmente la posiblidad de perdida.
362 // (3) Si el temporizador ha sido reactivado no tiene que liberarlo.
363 //----------------------------------------------------------------------------------
364 void anna::timex::Engine::tick()
365 throw(RuntimeException) {
366   LOGMETHOD(TraceMethod tm(Logger::Local7, "timex::Engine", "tick", ANNA_FILE_LOCATION));
367   /*
368      if (Logger::isActive (Logger::Local6) == true && a_directory.size () > 0) {
369         string msg ("Tick | Quantum: ");
370         msg += functions::asString (a_currentQuantum);
371         msg += " | Tx: ";
372         msg += functions::asString (functions::millisecond ());
373         msg += " | N: ";
374         msg += functions::asString (a_directory.size ());
375         Logger::write (Logger::Local6, msg, ANNA_FILE_LOCATION);
376      };
377   */
378   Guard guard(this, "timex::Engine (tick)");
379   int counter(0);
380   eventBeginQuantum();
381   a_expiredQuantum = &a_timeTable [a_currentQuantum];
382   TimeEvent* timeEvent;
383   Directory::iterator iid;
384   quantum_iterator ii, maxii;
385   bool observed;
386
387   for(ii = a_expiredQuantum->begin(), maxii = a_expiredQuantum->end(); ii != maxii; ii ++) {
388     timeEvent = Engine::timeEvent(ii);
389
390     if((iid = a_directory.find(timeEvent->getId())) == a_directory.end())            // (1)
391       continue;
392
393     a_directory.erase(iid);
394
395     if(Logger::isActive(Logger::Debug)) {
396       string msg(timeEvent->asString());
397       msg += " | Quantum programmed: ";
398       msg += functions::asHexString(anna_ptrnumber_cast(a_expiredQuantum));
399       msg += " | Expired";
400       Logger::write(Logger::Debug, msg, ANNA_FILE_LOCATION);
401     }
402
403     timeEvent->a_controller = NULL;
404     /*
405      * Si tiene NO tiene observer asociado, seguramente se auto-liberará en el expire
406      * Si tiene observer, se invocará al método correspondiente.
407      */
408     observed = (timeEvent->a_observer != NULL);
409
410     try {
411       timeEvent->expire(this);
412       counter ++;
413     } catch(Exception& ex) {
414       ex.trace();
415     }
416
417     // Si tiene observer => el expire NO puede haber liberado el TimeEvent
418     if(observed == true) {
419       if(timeEvent->a_controller == NULL)                                           // (3)
420         timeEvent->a_observer->release(timeEvent);
421     }
422   }
423
424   a_expiredQuantum->clear();
425
426   if(a_delayedQuantum.size() > 0) {                                                 // (2)
427     for(ii = a_delayedQuantum.begin(), maxii = a_delayedQuantum.end(); ii != maxii; ii ++)
428       a_expiredQuantum->push_back(Engine::timeEvent(ii));
429
430     a_delayedQuantum.clear();
431   }
432
433   a_expiredQuantum = NULL;
434
435   if(++ a_currentQuantum >= a_maxQuantum)
436     a_currentQuantum = 0;
437
438   eventEndQuantum(counter);
439 }
440
441 void anna::timex::Engine::notifyRelease(timex::TimeEvent* timeEvent)
442 throw() {
443   if(timeEvent->a_observer != NULL)
444     timeEvent->a_observer->release(timeEvent);
445 }
446
447 string anna::timex::Engine::asString() const
448 throw() {
449   string msg("timex::Engine { ");
450   msg += app::Component::asString();
451   msg += " | Max Timeout: ";
452   msg += functions::asString(a_maxTimeout);
453   msg += " ms | Resolution: ";
454   msg += functions::asString(a_resolution);
455   msg += " ms | Programming: ";
456   msg += functions::asString(a_maxQuantum);
457   msg += " quantums | Pending TimeEvents: ";
458   msg += functions::asString((const int) a_directory.size());
459   return msg += " }";
460 }
461
462 xml::Node* anna::timex::Engine::asXML(xml::Node* parent) const
463 throw() {
464   parent = Component::asXML(parent);
465   xml::Node* result = parent->createChild("timex.Engine");
466   result->createAttribute("MaxTimeout", a_maxTimeout);
467   result->createAttribute("Resolution", a_resolution);
468   result->createAttribute("N", a_directory.size());
469   result->createAttribute("Pause", functions::asString(a_tickProducer->isInPause()));
470   xml::Node* node = result->createChild("Quantum");
471   node->createAttribute("Current", a_currentQuantum);
472   node->createAttribute("Max", a_maxQuantum);
473   return result;
474 }