First commit
[anna.git] / source / comm / CongestionController.cpp
1 // ANNA - Anna is Not 'N' Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // https://bitbucket.org/testillano/anna
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 #include <sys/ioctl.h>
38 #include <fcntl.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #include <stdlib.h>
42
43 #include <anna/core/tracing/Logger.hpp>
44 #include <anna/core/functions.hpp>
45 #include <anna/core/RuntimeException.hpp>
46
47 #include <anna/xml/Node.hpp>
48 #include <anna/xml/Attribute.hpp>
49
50 #include <anna/app/functions.hpp>
51
52 #include <anna/comm/CongestionController.hpp>
53 #include <anna/comm/ClientSocket.hpp>
54 #include <anna/comm/Communicator.hpp>
55
56 using namespace std;
57 using namespace anna;
58
59 // static
60 const int comm::CongestionController::MaxPendingBytes = 64 * 1024;
61 const Millisecond comm::CongestionController::DelayTrace(5000);
62
63 // Si pasa este tiempo sin recibir peticiones de consejo => entiende que el proceso ha dejado de recibir carga => inicializará las estadísticas
64 //static
65 const Millisecond comm::CongestionController::HeartBeat(1000);
66
67 comm::CongestionController::CongestionController() :
68   a_avgWorkload("comm::CongestionController::AvgWorkload"),
69   a_timeTrace(0),
70   a_maxPendingBytes(UnusedPendingBytes),
71   a_incomingSocketCounter(0),
72   a_tickTime(0) {
73   srand(time(NULL));
74   a_percentage [0] = 25;
75   a_percentage [1] = 50;
76   a_percentage [2] = 85;
77   a_percentage [3] = 99;
78   a_limit = 0;
79   setLimit(DefaultLimit);
80   setMode(Mode::Auto);
81 }
82
83 void comm::CongestionController::setLimit(const int limit)
84 throw() {
85   Guard guard(a_mutex, "comm::CongestionController (setLimit)");
86
87   if(limit < 0  || limit > 100) {
88     LOGWARNING(
89       Logger::warning(
90         functions::asString("comm::CongestionController::setLimit | Limit %d out of range [0,100]", limit),
91         ANNA_FILE_LOCATION
92       );
93     );
94     return;
95   }
96
97   if(a_limit == limit)
98     return;
99
100   int congestionArea = 100 - (a_limit = limit);
101   a_discardLevel [0] = (congestionArea * a_percentage [0] / 100) + a_limit;
102   a_discardLevel [1] = (congestionArea * a_percentage [1] / 100) + a_limit;
103   a_discardLevel [2] = (congestionArea * a_percentage [2] / 100) + a_limit;
104   a_discardLevel [3] = 100;
105   a_messageCounter = a_discardedCounter = 0;
106   LOGINFORMATION(
107     string msg = functions::asString(
108                    "comm::CongestionController::setLimit | Limit: %d | ", limit
109                  );
110
111     for(int i = 0; i < MaxLevel; i ++)
112     msg += functions::asString("(Level %d: %d%%) ", i + 1, a_discardLevel [i]);
113     Logger::information(msg, ANNA_FILE_LOCATION);
114   );
115 }
116
117 void comm::CongestionController::setMaxPendingBytes(const int maxPendingBytes)
118 throw(RuntimeException) {
119   if(maxPendingBytes == UnusedPendingBytes) {
120     a_maxPendingBytes = UnusedPendingBytes;
121     return;
122   }
123
124 //   const int minimum (Communicator::getReceivingChunkSize ());
125 //   const int maximum (min (minimum << 1, CongestionController::MaxPendingBytes));
126   const int minimum(0);
127   const int maximum(CongestionController::MaxPendingBytes);
128
129   if(maxPendingBytes < minimum || maxPendingBytes > maximum) {
130     string msg("comm::CongestionController::setMaxPendingBytes | ");
131     msg += functions::asString("MaxPendingBytes (%d bytes) must be between %d bytes and %d bytes", maxPendingBytes, minimum, maximum);
132     throw RuntimeException(msg, ANNA_FILE_LOCATION);
133   }
134
135   a_maxPendingBytes = maxPendingBytes;
136 }
137
138 /*
139  * Se invoca desde [Tzzz] seguramente que con el clientSocket protegido por la sección crítica establecida
140  * por su manejador asociado.
141  */
142 comm::CongestionController::Advice::_v comm::CongestionController::getAdvice(const ClientSocket& clientSocket)
143 throw() {
144   Guard guard(a_mutex, "comm::CongestionController::getAdvice");
145
146   if(a_limit == 0) {
147     a_messageCounter ++;
148     return Advice::Process;
149   }
150
151   Advice::_v result = Advice::Process;
152   int workload = calculeWorkload(clientSocket);
153   int discard = 0;
154
155   // Si le da la vuelta al marcador
156   if(++ a_messageCounter == 0) {
157     a_messageCounter = 1;
158     a_discardedCounter = 0;
159     a_avgWorkload.clear();
160   }
161
162   const Millisecond now = functions::millisecond();
163
164   if((now - a_tickTime) > HeartBeat)
165     a_avgWorkload.clear();
166
167   a_tickTime = now;
168
169   if(workload != -1) {
170     a_avgWorkload += workload;
171
172     if(a_effectiveMode == Mode::Global)
173       workload = a_avgWorkload;
174
175     LOGDEBUG(
176       std::string msg("comm::CongestionController::getAdvice | ");
177       msg += a_avgWorkload.asString();
178       msg += functions::asText(" | inmediate workload: ", workload);
179       Logger::debug(msg, ANNA_FILE_LOCATION)
180     );
181
182     if(workload  < a_limit) {
183       result = Advice::Process;
184       discard = 0;
185     } else {
186       result = Advice::Discard;
187       discard = 100;
188
189       for(int i = 0; i < MaxLevel; i ++) {
190         if(workload > a_discardLevel [i])
191           continue;
192
193         result = ((rand() % 101) > (discard = a_percentage [i])) ? Advice::Process : Advice::Discard;
194         break;
195       }
196     }
197   }
198
199   if(result == Advice::Process) {
200     LOGDEBUG(
201       const int ratio = (a_messageCounter - a_discardedCounter) * 100 / a_messageCounter;
202       string msg = functions::asString(
203                      "Mode: %s | Limit: %d%% | Workload: %d%% | Filter: %d%% | Ratio: %d%% | Result: Process",
204                      ((a_effectiveMode == Mode::Local) ? "Local" : "Global"), a_limit, workload, discard, ratio
205                    );
206       Logger::debug(msg, ANNA_FILE_LOCATION);
207     );
208   } else {
209     a_discardedCounter ++;
210     const int ratio = (a_messageCounter - a_discardedCounter) * 100 / a_messageCounter;
211
212     if(Logger::isActive(Logger::Debug) == true) {
213       string msg = functions::asString(
214                      "Mode: %s | Limit: %d%% | Workload: %d%% | Filter: %d%% | Ratio (%u/%u): %d%% | Result: Discard",
215                      ((a_effectiveMode == Mode::Local) ? "Local" : "Global"), a_limit, workload, discard, a_messageCounter, a_discardedCounter, ratio
216                    );
217       Logger::debug(msg, ANNA_FILE_LOCATION);
218     } else if(Logger::isActive(Logger::Warning)) {
219       Millisecond now = functions::millisecond();
220
221       if((now - a_timeTrace) >= DelayTrace) {
222         string msg = functions::asString(
223                        "Mode: %s | Limit: %d%% | Workload: %d%% | Filter: %d%% | Ratio (%u/%u): %d%% | Result: Discard",
224                        ((a_effectiveMode == Mode::Local) ? "Local" : "Global"), a_limit, workload, discard, a_messageCounter, a_discardedCounter, ratio
225                      );
226         Logger::warning(msg, ANNA_FILE_LOCATION);
227         a_timeTrace = now;
228       }
229     }
230   }
231
232   return result;
233 }
234
235 void comm::CongestionController::incrementIncomingSocket()
236 throw(RuntimeException) {
237   Guard guard(a_mutex, "comm::CongestionController::incrementIncomingSocket");
238   a_incomingSocketCounter ++;
239 }
240
241 void comm::CongestionController::decrementIncomingSocket()
242 throw(RuntimeException) {
243   if(a_incomingSocketCounter ==  0)
244     return;
245
246   Guard guard(a_mutex, "comm::CongestionController::incrementIncomingSocket");
247   a_timeTrace = 0;
248
249   if(a_incomingSocketCounter == 0) {
250     a_avgWorkload.clear();
251     return;
252   }
253
254   const int average = a_avgWorkload;
255
256   unsigned int individualWorkload = average / a_incomingSocketCounter;
257
258   if(-- a_incomingSocketCounter < 0)
259     a_incomingSocketCounter = 0;
260
261   if(a_incomingSocketCounter > 0)
262     a_avgWorkload.setValue(individualWorkload * a_incomingSocketCounter * a_incomingSocketCounter, a_incomingSocketCounter);
263   else
264     a_avgWorkload.clear();
265 }
266
267 int comm::CongestionController::calculeWorkload(const ClientSocket& clientSocket) const
268 throw() {
269   int maxSize;
270
271   if(a_maxPendingBytes == UnusedPendingBytes)
272     maxSize = clientSocket.getReceiveBufferSize();
273   else
274     maxSize = a_maxPendingBytes;
275
276   // El tamaño actual tiene en cuenta lo que ha cargado en memoria, más lo que hay en el I/O pendiente de cargar
277   const int size = clientSocket.getTotalPendingBytes();
278   const int result = (maxSize == 0) ? -1 : (size * 100) / maxSize;
279   LOGDEBUG(
280     string msg = functions::asString("fd: %d | Size (%d%%): %d/%d", clientSocket.getfd(), result, size, maxSize);
281     Logger::debug(msg, ANNA_FILE_LOCATION);
282   );
283   return result;
284 }
285
286 xml::Node* comm::CongestionController::asXML(xml::Node* parent) const
287 throw() {
288   static const char* modetxt [] = { "Auto", "Local", "Global" };
289   xml::Node* result = parent->createChild("comm.CongestionController");
290   result->createAttribute("Limit", a_limit);
291   result->createAttribute("Mode", modetxt [a_mode]);
292   result->createAttribute("EffectiveMode", modetxt [a_effectiveMode]);
293
294   if(a_maxPendingBytes != UnusedPendingBytes)
295     result->createAttribute("MaxPendingBytes", a_maxPendingBytes);
296
297   xml::Node* w;
298   Workload current = getAccumulatedWorkload();
299   w = result->createChild("CurrentStatus");
300   w->createAttribute("Level", getLevel(current));
301   w->createAttribute("Workload", getLoad(current));
302   w->createAttribute("Process", a_messageCounter);
303   w->createAttribute("Discard", a_discardedCounter);
304   w->createAttribute("IncominSockets", a_incomingSocketCounter);
305   w = result->createChild("Levels");
306   w->createAttribute(
307     "Ratio", ((a_messageCounter == 0) ? 0 : (a_messageCounter - a_discardedCounter) * 100 / a_messageCounter)
308   );
309   xml::Node* node;
310   node = w->createChild("Level");
311   node->createAttribute("Id", 0);
312   node->createAttribute("Limit", a_limit - 1);
313   node->createAttribute("Discarding", 0);
314
315   for(int i = 0; i < MaxLevel; i ++) {
316     node = w->createChild("Level");
317     node->createAttribute("Id", i + 1);
318     node->createAttribute("Limit", a_discardLevel [i]);
319     node->createAttribute("Discarding", a_percentage [i]);
320   }
321
322   return result;
323 }
324
325 comm::CongestionController::Workload comm::CongestionController::getAccumulatedWorkload() const
326 throw() {
327   Workload result;
328   const Millisecond now = functions::millisecond();
329
330   if((now - a_tickTime) > HeartBeat) {
331     CongestionController* _this = const_cast <CongestionController*>(this);
332     _this->a_avgWorkload.clear();
333     _this->a_tickTime = now;
334   }
335
336   result.second = a_avgWorkload;
337
338   if(result.second < a_limit)
339     result.first = 0;
340   else {
341     result.first = 4;
342
343     for(int ii = 0; ii < MaxLevel; ii ++) {
344       if(result.second > a_discardLevel [ii])
345         continue;
346
347       result.first = ii + 1;
348       break;
349     }
350   }
351
352   return result;
353 }
354
355
356 comm::CongestionController::Workload comm::CongestionController::getCurrentWorkload(const comm::ClientSocket& clientSocket) const
357 throw() {
358   Workload result;
359   result.second = calculeWorkload(clientSocket);
360
361   if(result.second < a_limit)
362     result.first = 0;
363   else {
364     result.first = 4;
365
366     for(int ii = 0; ii < MaxLevel; ii ++) {
367       if(result.second > a_discardLevel [ii])
368         continue;
369
370       result.first = ii + 1;
371       break;
372     }
373   }
374
375   return result;
376 }