Fix local server for multiple applications
[anna.git] / source / core / mt / ThreadManager.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #include <unistd.h>
10
11 #include <iostream>
12
13 #include <anna/core/functions.hpp>
14
15 #include <anna/core/mt/ThreadManager.hpp>
16 #include <anna/core/mt/Semaphore.hpp>
17 #include <anna/core/mt/Thread.hpp>
18 #include <anna/core/tracing/TraceMethod.hpp>
19
20 using namespace std;
21 using namespace anna;
22
23 anna_assign_enum(ThreadManager::Mode) = { "None", "Unlimit", "ExceptionWhenFull", "LockWhenFull", NULL };
24
25 ThreadManager::ThreadManager(const char* name, const Mode::_v mode, const int maxSize, const int flags) :
26   a_name(name),
27   a_mode(mode),
28   a_maxSize(maxSize),
29   a_semaphore(NULL),
30   a_threadFlags(flags) {
31   if(a_mode == Mode::LockWhenFull)
32     a_semaphore = new Semaphore(0);
33 }
34
35 ThreadManager::ThreadManager(const char* name, const int flags) :
36   a_name(name),
37   a_mode(Mode::Unlimit),
38   a_maxSize(-1),
39   a_semaphore(NULL),
40   a_threadFlags(flags) {
41   if(a_mode == Mode::LockWhenFull)
42     a_semaphore = new Semaphore(0);
43 }
44
45 ThreadManager::~ThreadManager() {
46 #ifdef _MT
47
48   try {
49     // Para que no intenten sacar el nombre del ThreadManager que los creĆ³
50     for(thread_iterator ii = thread_begin(), maxii = thread_end(); ii != maxii; ii ++) {
51       thread(ii)->a_manager = NULL;
52     }
53
54     if(a_semaphore) {
55       a_semaphore->signal();
56       delete a_semaphore;
57     }
58   } catch(RuntimeException& ex) {
59     ex.trace();
60   }
61
62 #else
63
64   if(a_semaphore)
65     delete a_semaphore;
66
67 #endif
68 }
69
70 Thread* ThreadManager::createThread()
71 noexcept(false) {
72   Thread* result;
73
74   if(a_mode == Mode::None) {
75     string msg(asString());
76     msg += " | Invalid mode";
77     throw RuntimeException(msg, ANNA_FILE_LOCATION);
78   }
79
80   if(a_mode != Mode::Unlimit && a_maxSize <= 0) {
81     string msg(asString());
82     msg += " | Invalid max thread number";
83     throw RuntimeException(msg, ANNA_FILE_LOCATION);
84   }
85
86   Guard guard(this, "anna::ThreadManager::createThread");
87   const int size = Recycler <Thread>::size();
88
89   switch(a_mode) {
90   case Mode::ExceptionWhenFull:
91
92     if(a_maxSize == size) {
93       string msg(asString());
94       msg += " | No available threads";
95       throw RuntimeException(msg, ANNA_FILE_LOCATION);
96     }
97
98     // No hay "break" para que siga procesando
99   case Mode::Unlimit:
100     result = Recycler <Thread>::create();
101     break;
102   case Mode::LockWhenFull:
103
104     if(a_maxSize == size) {
105       guard.deactivate();
106       LOGDEBUG(
107         string msg(asString());
108         msg += " | Waiting for thread release";
109         Logger::debug(msg, ANNA_FILE_LOCATION);
110       );
111       a_semaphore->wait();
112       LOGDEBUG(
113         string msg(asString());
114         msg += " | Achieve thread release";
115         Logger::debug(msg, ANNA_FILE_LOCATION);
116       );
117       Guard reopen(this, "anna::ThreadManager::createThread (after signal)");
118       result = Recycler <Thread>::create();
119     } else
120       result = Recycler <Thread>::create();
121
122     break;
123   default: break;
124   }
125
126   result->a_manager = this;
127   result->setFlags(a_threadFlags);
128   LOGDEBUG(
129     string msg("ThreadManager::createThread | ");
130     msg += asString();
131     msg += " | ";
132     msg += result->asString();
133     Logger::debug(msg, ANNA_FILE_LOCATION);
134   );
135   return result;
136 }
137
138 void ThreadManager::join()
139 noexcept(false) {
140   LOGDEBUG(
141     string msg("ThreadManager::join (init) | ");
142     msg += asString();
143     Logger::debug(msg, ANNA_FILE_LOCATION);
144   );
145 #ifdef _MT
146   lock();
147   const pthread_t self(pthread_self());
148   pthread_t* threads = new pthread_t [Recycler <Thread>::size()];
149   int index = 0;
150
151   for(thread_iterator ii = thread_begin(), maxii = thread_end(); ii != maxii; ii ++) {
152     threads [index] = thread(ii)->getId();
153
154     if(pthread_equal(threads [index], self) != 0) {
155       string msg(asString());
156       msg += " | ";
157       thread(ii)->asString();
158       msg += " | Threads owns to this Manager";
159       unlock();
160       throw RuntimeException(msg, ANNA_FILE_LOCATION);
161     }
162
163     index ++;
164   }
165
166   unlock();
167   int errorCode;
168
169   for(int ii = 0; ii < index; ii ++) {
170     if((errorCode = pthread_join(threads [ii], NULL)) != 0) {
171       string msg(asString());
172       msg += " | Bad join";
173       throw RuntimeException(msg, errorCode, ANNA_FILE_LOCATION);
174     }
175   }
176
177   delete [] threads;
178 #endif
179   LOGDEBUG(
180     string msg("ThreadManager::join (final) | ");
181     msg += asString();
182     Logger::debug(msg, ANNA_FILE_LOCATION);
183   );
184 }
185
186 void ThreadManager::releaseThread(Thread* thread)
187 noexcept(false) {
188   if(thread->isRunning() == true) {
189     string msg(thread->asString());
190     msg += " | Still activated";
191     throw RuntimeException(msg, ANNA_FILE_LOCATION);
192   }
193
194   if(a_mode == Mode::LockWhenFull) {
195     Guard guard(this, "anna::ThreadManager::releaseThread");
196     // No hace falta acceder mediate SafeRecycler porque ya tenemos una SSCC establecida.
197     const int size = Recycler <Thread>::getSize();
198     Recycler <Thread>::release(thread);
199
200     if(size == a_maxSize)
201       a_semaphore->signal();
202   } else
203     SafeRecycler <Thread>::release(thread);
204
205   LOGDEBUG(
206     string msg("ThreadManager::releaseThread | ");
207     msg += asString();
208     Logger::debug(msg, ANNA_FILE_LOCATION);
209   )
210 }
211
212 string ThreadManager::asString() const
213 {
214   string result("anna::ThreadManager { Name: ");
215   result += a_name;
216   result += " | Mode: ";
217   result += Mode::asCString(a_mode);
218   result += " | MaxSize: ";
219
220   if(a_maxSize > 0)
221     result += functions::asString(a_maxSize);
222   else
223     result += "<unlimited>";
224
225   result += functions::asText(" | Size: ", Recycler <Thread>::size());
226   return result += " }";
227 }
228