First commit
[anna.git] / source / core / mt / ThreadManager.cpp
1 // ANNA - Anna is Not 'N' Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // https://bitbucket.org/testillano/anna
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 #include <unistd.h>
38
39 #include <iostream>
40
41 #include <anna/core/functions.hpp>
42
43 #include <anna/core/mt/ThreadManager.hpp>
44 #include <anna/core/mt/Semaphore.hpp>
45 #include <anna/core/mt/Thread.hpp>
46 #include <anna/core/tracing/TraceMethod.hpp>
47
48 using namespace std;
49 using namespace anna;
50
51 anna_assign_enum(ThreadManager::Mode) = { "None", "Unlimit", "ExceptionWhenFull", "LockWhenFull", NULL };
52
53 ThreadManager::ThreadManager(const char* name, const Mode::_v mode, const int maxSize, const int flags) :
54   a_name(name),
55   a_mode(mode),
56   a_maxSize(maxSize),
57   a_semaphore(NULL),
58   a_threadFlags(flags) {
59   if(a_mode == Mode::LockWhenFull)
60     a_semaphore = new Semaphore(0);
61 }
62
63 ThreadManager::ThreadManager(const char* name, const int flags) :
64   a_name(name),
65   a_mode(Mode::Unlimit),
66   a_maxSize(-1),
67   a_semaphore(NULL),
68   a_threadFlags(flags) {
69   if(a_mode == Mode::LockWhenFull)
70     a_semaphore = new Semaphore(0);
71 }
72
73 ThreadManager::~ThreadManager() {
74 #ifdef _MT
75
76   try {
77     // Para que no intenten sacar el nombre del ThreadManager que los creĆ³
78     for(thread_iterator ii = thread_begin(), maxii = thread_end(); ii != maxii; ii ++) {
79       thread(ii)->a_manager = NULL;
80     }
81
82     if(a_semaphore) {
83       a_semaphore->signal();
84       delete a_semaphore;
85     }
86   } catch(RuntimeException& ex) {
87     ex.trace();
88   }
89
90 #else
91
92   if(a_semaphore)
93     delete a_semaphore;
94
95 #endif
96 }
97
98 Thread* ThreadManager::createThread()
99 throw(RuntimeException) {
100   Thread* result;
101
102   if(a_mode == Mode::None) {
103     string msg(asString());
104     msg += " | Invalid mode";
105     throw RuntimeException(msg, ANNA_FILE_LOCATION);
106   }
107
108   if(a_mode != Mode::Unlimit && a_maxSize <= 0) {
109     string msg(asString());
110     msg += " | Invalid max thread number";
111     throw RuntimeException(msg, ANNA_FILE_LOCATION);
112   }
113
114   Guard guard(this, "anna::ThreadManager::createThread");
115   const int size = Recycler <Thread>::size();
116
117   switch(a_mode) {
118   case Mode::ExceptionWhenFull:
119
120     if(a_maxSize == size) {
121       string msg(asString());
122       msg += " | No available threads";
123       throw RuntimeException(msg, ANNA_FILE_LOCATION);
124     }
125
126     // No hay "break" para que siga procesando
127   case Mode::Unlimit:
128     result = Recycler <Thread>::create();
129     break;
130   case Mode::LockWhenFull:
131
132     if(a_maxSize == size) {
133       guard.deactivate();
134       LOGDEBUG(
135         string msg(asString());
136         msg += " | Waiting for thread release";
137         Logger::debug(msg, ANNA_FILE_LOCATION);
138       );
139       a_semaphore->wait();
140       LOGDEBUG(
141         string msg(asString());
142         msg += " | Achieve thread release";
143         Logger::debug(msg, ANNA_FILE_LOCATION);
144       );
145       Guard reopen(this, "anna::ThreadManager::createThread (after signal)");
146       result = Recycler <Thread>::create();
147     } else
148       result = Recycler <Thread>::create();
149
150     break;
151   }
152
153   result->a_manager = this;
154   result->setFlags(a_threadFlags);
155   LOGDEBUG(
156     string msg("ThreadManager::createThread | ");
157     msg += asString();
158     msg += " | ";
159     msg += result->asString();
160     Logger::debug(msg, ANNA_FILE_LOCATION);
161   );
162   return result;
163 }
164
165 void ThreadManager::join()
166 throw(RuntimeException) {
167   LOGDEBUG(
168     string msg("ThreadManager::join (init) | ");
169     msg += asString();
170     Logger::debug(msg, ANNA_FILE_LOCATION);
171   );
172 #ifdef _MT
173   lock();
174   const pthread_t self(pthread_self());
175   pthread_t* threads = new pthread_t [Recycler <Thread>::size()];
176   int index = 0;
177
178   for(thread_iterator ii = thread_begin(), maxii = thread_end(); ii != maxii; ii ++) {
179     threads [index] = thread(ii)->getId();
180
181     if(pthread_equal(threads [index], self) != 0) {
182       string msg(asString());
183       msg += " | ";
184       thread(ii)->asString();
185       msg += " | Threads owns to this Manager";
186       unlock();
187       throw RuntimeException(msg, ANNA_FILE_LOCATION);
188     }
189
190     index ++;
191   }
192
193   unlock();
194   int errorCode;
195
196   for(int ii = 0; ii < index; ii ++) {
197     if((errorCode = pthread_join(threads [ii], NULL)) != 0) {
198       string msg(asString());
199       msg += " | Bad join";
200       throw RuntimeException(msg, errorCode, ANNA_FILE_LOCATION);
201     }
202   }
203
204   delete [] threads;
205 #endif
206   LOGDEBUG(
207     string msg("ThreadManager::join (final) | ");
208     msg += asString();
209     Logger::debug(msg, ANNA_FILE_LOCATION);
210   );
211 }
212
213 void ThreadManager::releaseThread(Thread* thread)
214 throw(RuntimeException) {
215   if(thread->isRunning() == true) {
216     string msg(thread->asString());
217     msg += " | Still activated";
218     throw RuntimeException(msg, ANNA_FILE_LOCATION);
219   }
220
221   if(a_mode == Mode::LockWhenFull) {
222     Guard guard(this, "anna::ThreadManager::releaseThread");
223     // No hace falta acceder mediate SafeRecycler porque ya tenemos una SSCC establecida.
224     const int size = Recycler <Thread>::getSize();
225     Recycler <Thread>::release(thread);
226
227     if(size == a_maxSize)
228       a_semaphore->signal();
229   } else
230     SafeRecycler <Thread>::release(thread);
231
232   LOGDEBUG(
233     string msg("ThreadManager::releaseThread | ");
234     msg += asString();
235     Logger::debug(msg, ANNA_FILE_LOCATION);
236   )
237 }
238
239 string ThreadManager::asString() const
240 throw() {
241   string result("anna::ThreadManager { Name: ");
242   result += a_name;
243   result += " | Mode: ";
244   result += Mode::asCString(a_mode);
245   result += " | MaxSize: ";
246
247   if(a_maxSize > 0)
248     result += functions::asString(a_maxSize);
249   else
250     result += "<unlimited>";
251
252   result += functions::asText(" | Size: ", Recycler <Thread>::size());
253   return result += " }";
254 }
255