Update date in Doxyfile
[anna.git] / source / comm / CongestionController.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 #include <sys/ioctl.h>
10 #include <fcntl.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <stdlib.h>
14
15 #include <anna/core/tracing/Logger.hpp>
16 #include <anna/core/functions.hpp>
17 #include <anna/core/RuntimeException.hpp>
18
19 #include <anna/xml/Node.hpp>
20 #include <anna/xml/Attribute.hpp>
21
22 #include <anna/app/functions.hpp>
23
24 #include <anna/comm/CongestionController.hpp>
25 #include <anna/comm/ClientSocket.hpp>
26 #include <anna/comm/Communicator.hpp>
27
28 using namespace std;
29 using namespace anna;
30
31 // static
32 const int comm::CongestionController::MaxPendingBytes = 64 * 1024;
33 const Millisecond comm::CongestionController::DelayTrace(5000);
34
35 // Si pasa este tiempo sin recibir peticiones de consejo => entiende que el proceso ha dejado de recibir carga => inicializará las estadísticas
36 //static
37 const Millisecond comm::CongestionController::HeartBeat(1000);
38
39 comm::CongestionController::CongestionController() :
40   a_avgWorkload("comm::CongestionController::AvgWorkload"),
41   a_timeTrace(0),
42   a_maxPendingBytes(UnusedPendingBytes),
43   a_incomingSocketCounter(0),
44   a_tickTime(0) {
45   srand(time(NULL));
46   a_percentage [0] = 25;
47   a_percentage [1] = 50;
48   a_percentage [2] = 85;
49   a_percentage [3] = 99;
50   a_limit = 0;
51   setLimit(DefaultLimit);
52   setMode(Mode::Auto);
53 }
54
55 void comm::CongestionController::setLimit(const int limit)
56 throw() {
57   Guard guard(a_mutex, "comm::CongestionController (setLimit)");
58
59   if(limit < 0  || limit > 100) {
60     LOGWARNING(
61       Logger::warning(
62         functions::asString("comm::CongestionController::setLimit | Limit %d out of range [0,100]", limit),
63         ANNA_FILE_LOCATION
64       );
65     );
66     return;
67   }
68
69   if(a_limit == limit)
70     return;
71
72   int congestionArea = 100 - (a_limit = limit);
73   a_discardLevel [0] = (congestionArea * a_percentage [0] / 100) + a_limit;
74   a_discardLevel [1] = (congestionArea * a_percentage [1] / 100) + a_limit;
75   a_discardLevel [2] = (congestionArea * a_percentage [2] / 100) + a_limit;
76   a_discardLevel [3] = 100;
77   a_messageCounter = a_discardedCounter = 0;
78   LOGINFORMATION(
79     string msg = functions::asString(
80                    "comm::CongestionController::setLimit | Limit: %d | ", limit
81                  );
82
83     for(int i = 0; i < MaxLevel; i ++)
84     msg += functions::asString("(Level %d: %d%%) ", i + 1, a_discardLevel [i]);
85     Logger::information(msg, ANNA_FILE_LOCATION);
86   );
87 }
88
89 void comm::CongestionController::setMaxPendingBytes(const int maxPendingBytes)
90 throw(RuntimeException) {
91   if(maxPendingBytes == UnusedPendingBytes) {
92     a_maxPendingBytes = UnusedPendingBytes;
93     return;
94   }
95
96 //   const int minimum (Communicator::getReceivingChunkSize ());
97 //   const int maximum (min (minimum << 1, CongestionController::MaxPendingBytes));
98   const int minimum(0);
99   const int maximum(CongestionController::MaxPendingBytes);
100
101   if(maxPendingBytes < minimum || maxPendingBytes > maximum) {
102     string msg("comm::CongestionController::setMaxPendingBytes | ");
103     msg += functions::asString("MaxPendingBytes (%d bytes) must be between %d bytes and %d bytes", maxPendingBytes, minimum, maximum);
104     throw RuntimeException(msg, ANNA_FILE_LOCATION);
105   }
106
107   a_maxPendingBytes = maxPendingBytes;
108 }
109
110 /*
111  * Se invoca desde [Tzzz] seguramente que con el clientSocket protegido por la sección crítica establecida
112  * por su manejador asociado.
113  */
114 comm::CongestionController::Advice::_v comm::CongestionController::getAdvice(const ClientSocket& clientSocket)
115 throw() {
116   Guard guard(a_mutex, "comm::CongestionController::getAdvice");
117
118   if(a_limit == 0) {
119     a_messageCounter ++;
120     return Advice::Process;
121   }
122
123   Advice::_v result = Advice::Process;
124   int workload = calculeWorkload(clientSocket);
125   int discard = 0;
126
127   // Si le da la vuelta al marcador
128   if(++ a_messageCounter == 0) {
129     a_messageCounter = 1;
130     a_discardedCounter = 0;
131     a_avgWorkload.clear();
132   }
133
134   const Millisecond now = functions::millisecond();
135
136   if((now - a_tickTime) > HeartBeat)
137     a_avgWorkload.clear();
138
139   a_tickTime = now;
140
141   if(workload != -1) {
142     a_avgWorkload += workload;
143
144     if(a_effectiveMode == Mode::Global)
145       workload = a_avgWorkload;
146
147     LOGDEBUG(
148       std::string msg("comm::CongestionController::getAdvice | ");
149       msg += a_avgWorkload.asString();
150       msg += functions::asText(" | inmediate workload: ", workload);
151       Logger::debug(msg, ANNA_FILE_LOCATION)
152     );
153
154     if(workload  < a_limit) {
155       result = Advice::Process;
156       discard = 0;
157     } else {
158       result = Advice::Discard;
159       discard = 100;
160
161       for(int i = 0; i < MaxLevel; i ++) {
162         if(workload > a_discardLevel [i])
163           continue;
164
165         result = ((rand() % 101) > (discard = a_percentage [i])) ? Advice::Process : Advice::Discard;
166         break;
167       }
168     }
169   }
170
171   if(result == Advice::Process) {
172     LOGDEBUG(
173       const int ratio = (a_messageCounter - a_discardedCounter) * 100 / a_messageCounter;
174       string msg = functions::asString(
175                      "Mode: %s | Limit: %d%% | Workload: %d%% | Filter: %d%% | Ratio: %d%% | Result: Process",
176                      ((a_effectiveMode == Mode::Local) ? "Local" : "Global"), a_limit, workload, discard, ratio
177                    );
178       Logger::debug(msg, ANNA_FILE_LOCATION);
179     );
180   } else {
181     a_discardedCounter ++;
182     const int ratio = (a_messageCounter - a_discardedCounter) * 100 / a_messageCounter;
183
184     if(Logger::isActive(Logger::Debug) == true) {
185       string msg = functions::asString(
186                      "Mode: %s | Limit: %d%% | Workload: %d%% | Filter: %d%% | Ratio (%u/%u): %d%% | Result: Discard",
187                      ((a_effectiveMode == Mode::Local) ? "Local" : "Global"), a_limit, workload, discard, a_messageCounter, a_discardedCounter, ratio
188                    );
189       Logger::debug(msg, ANNA_FILE_LOCATION);
190     } else if(Logger::isActive(Logger::Warning)) {
191       Millisecond now = functions::millisecond();
192
193       if((now - a_timeTrace) >= DelayTrace) {
194         string msg = functions::asString(
195                        "Mode: %s | Limit: %d%% | Workload: %d%% | Filter: %d%% | Ratio (%u/%u): %d%% | Result: Discard",
196                        ((a_effectiveMode == Mode::Local) ? "Local" : "Global"), a_limit, workload, discard, a_messageCounter, a_discardedCounter, ratio
197                      );
198         Logger::warning(msg, ANNA_FILE_LOCATION);
199         a_timeTrace = now;
200       }
201     }
202   }
203
204   return result;
205 }
206
207 void comm::CongestionController::incrementIncomingSocket()
208 throw(RuntimeException) {
209   Guard guard(a_mutex, "comm::CongestionController::incrementIncomingSocket");
210   a_incomingSocketCounter ++;
211 }
212
213 void comm::CongestionController::decrementIncomingSocket()
214 throw(RuntimeException) {
215   if(a_incomingSocketCounter ==  0)
216     return;
217
218   Guard guard(a_mutex, "comm::CongestionController::incrementIncomingSocket");
219   a_timeTrace = 0;
220
221   if(a_incomingSocketCounter == 0) {
222     a_avgWorkload.clear();
223     return;
224   }
225
226   const int average = a_avgWorkload;
227
228   unsigned int individualWorkload = average / a_incomingSocketCounter;
229
230   if(-- a_incomingSocketCounter < 0)
231     a_incomingSocketCounter = 0;
232
233   if(a_incomingSocketCounter > 0)
234     a_avgWorkload.setValue(individualWorkload * a_incomingSocketCounter * a_incomingSocketCounter, a_incomingSocketCounter);
235   else
236     a_avgWorkload.clear();
237 }
238
239 int comm::CongestionController::calculeWorkload(const ClientSocket& clientSocket) const
240 throw() {
241   int maxSize;
242
243   if(a_maxPendingBytes == UnusedPendingBytes)
244     maxSize = clientSocket.getReceiveBufferSize();
245   else
246     maxSize = a_maxPendingBytes;
247
248   // El tamaño actual tiene en cuenta lo que ha cargado en memoria, más lo que hay en el I/O pendiente de cargar
249   const int size = clientSocket.getTotalPendingBytes();
250   const int result = (maxSize == 0) ? -1 : (size * 100) / maxSize;
251   LOGDEBUG(
252     string msg = functions::asString("fd: %d | Size (%d%%): %d/%d", clientSocket.getfd(), result, size, maxSize);
253     Logger::debug(msg, ANNA_FILE_LOCATION);
254   );
255   return result;
256 }
257
258 xml::Node* comm::CongestionController::asXML(xml::Node* parent) const
259 throw() {
260   static const char* modetxt [] = { "Auto", "Local", "Global" };
261   xml::Node* result = parent->createChild("comm.CongestionController");
262   result->createAttribute("Limit", a_limit);
263   result->createAttribute("Mode", modetxt [a_mode]);
264   result->createAttribute("EffectiveMode", modetxt [a_effectiveMode]);
265
266   if(a_maxPendingBytes != UnusedPendingBytes)
267     result->createAttribute("MaxPendingBytes", a_maxPendingBytes);
268
269   xml::Node* w;
270   Workload current = getAccumulatedWorkload();
271   w = result->createChild("CurrentStatus");
272   w->createAttribute("Level", getLevel(current));
273   w->createAttribute("Workload", getLoad(current));
274   w->createAttribute("Process", a_messageCounter);
275   w->createAttribute("Discard", a_discardedCounter);
276   w->createAttribute("IncominSockets", a_incomingSocketCounter);
277   w = result->createChild("Levels");
278   w->createAttribute(
279     "Ratio", ((a_messageCounter == 0) ? 0 : (a_messageCounter - a_discardedCounter) * 100 / a_messageCounter)
280   );
281   xml::Node* node;
282   node = w->createChild("Level");
283   node->createAttribute("Id", 0);
284   node->createAttribute("Limit", a_limit - 1);
285   node->createAttribute("Discarding", 0);
286
287   for(int i = 0; i < MaxLevel; i ++) {
288     node = w->createChild("Level");
289     node->createAttribute("Id", i + 1);
290     node->createAttribute("Limit", a_discardLevel [i]);
291     node->createAttribute("Discarding", a_percentage [i]);
292   }
293
294   return result;
295 }
296
297 comm::CongestionController::Workload comm::CongestionController::getAccumulatedWorkload() const
298 throw() {
299   Workload result;
300   const Millisecond now = functions::millisecond();
301
302   if((now - a_tickTime) > HeartBeat) {
303     CongestionController* _this = const_cast <CongestionController*>(this);
304     _this->a_avgWorkload.clear();
305     _this->a_tickTime = now;
306   }
307
308   result.second = a_avgWorkload;
309
310   if(result.second < a_limit)
311     result.first = 0;
312   else {
313     result.first = 4;
314
315     for(int ii = 0; ii < MaxLevel; ii ++) {
316       if(result.second > a_discardLevel [ii])
317         continue;
318
319       result.first = ii + 1;
320       break;
321     }
322   }
323
324   return result;
325 }
326
327
328 comm::CongestionController::Workload comm::CongestionController::getCurrentWorkload(const comm::ClientSocket& clientSocket) const
329 throw() {
330   Workload result;
331   result.second = calculeWorkload(clientSocket);
332
333   if(result.second < a_limit)
334     result.first = 0;
335   else {
336     result.first = 4;
337
338     for(int ii = 0; ii < MaxLevel; ii ++) {
339       if(result.second > a_discardLevel [ii])
340         continue;
341
342       result.first = ii + 1;
343       break;
344     }
345   }
346
347   return result;
348 }