1 // ANNA - Anna is Not Nothingness Anymore
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
5 // http://redmine.teslayout.com/projects/anna-suite
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
11 // * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 // * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
17 // * Neither the name of the copyright holder nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 // Authors: eduardo.ramos.testillano@gmail.com
34 // cisco.tierra@gmail.com
37 #include <sys/ioctl.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
43 #include <anna/core/tracing/Logger.hpp>
44 #include <anna/core/functions.hpp>
45 #include <anna/core/RuntimeException.hpp>
47 #include <anna/xml/Node.hpp>
48 #include <anna/xml/Attribute.hpp>
50 #include <anna/app/functions.hpp>
52 #include <anna/comm/CongestionController.hpp>
53 #include <anna/comm/ClientSocket.hpp>
54 #include <anna/comm/Communicator.hpp>
// Largest value setMaxPendingBytes() accepts as an explicit pending-bytes cap (64 * 1024).
60 const int comm::CongestionController::MaxPendingBytes = 64 * 1024;
// Minimum spacing between two warning-level "Discard" traces written by getAdvice() (5000, as a Millisecond).
61 const Millisecond comm::CongestionController::DelayTrace(5000);
63 // If this much time elapses without any request for advice => the process is assumed to have stopped receiving load => the statistics will be reset
// (1000, as a Millisecond; compared against "now - a_tickTime" in getAdvice() and getAccumulatedWorkload()).
65 const Millisecond comm::CongestionController::HeartBeat(1000);
// Constructor: names the average-workload accumulator, starts with no explicit pending-bytes cap
// (UnusedPendingBytes) and zero incoming sockets, loads the per-level discard percentages and
// derives the discard levels by calling setLimit(DefaultLimit).
// NOTE(review): the embedded line numbering of this listing has gaps (69, 72-73, 78, 80-82), so
// further member initializers and the braces of the body are not visible here.
67 comm::CongestionController::CongestionController() :
68 a_avgWorkload("comm::CongestionController::AvgWorkload"),
70 a_maxPendingBytes(UnusedPendingBytes),
71 a_incomingSocketCounter(0),
// Percentage compared against "rand() % 101" in getAdvice() for level indexes 0..3;
// asXML() publishes the same values as the "Discarding" attribute of each level.
74 a_percentage [0] = 25;
75 a_percentage [1] = 50;
76 a_percentage [2] = 85;
77 a_percentage [3] = 99;
// Computes a_discardLevel[] from the percentages above and resets the message counters.
79 setLimit(DefaultLimit);
// Sets the workload limit (a percentage in [0,100]) and recomputes the discard levels.
// Runs under a_mutex through the Guard below.
// @param limit  new limit; values outside [0,100] produce the "out of range" message below.
83 void comm::CongestionController::setLimit(const int limit)
85 Guard guard(a_mutex, "comm::CongestionController (setLimit)");
87 if(limit < 0 || limit > 100) {
// NOTE(review): the statement that consumes this message (embedded lines 88-89 and 91-99) is not
// visible in this listing; presumably it is logged and the call is abandoned - confirm.
90 functions::asString("comm::CongestionController::setLimit | Limit %d out of range [0,100]", limit),
// congestionArea is the span between the limit and 100%. The first three levels sit at
// limit + congestionArea * percentage / 100 (integer arithmetic); the last one is fixed at 100.
100 int congestionArea = 100 - (a_limit = limit);
101 a_discardLevel [0] = (congestionArea * a_percentage [0] / 100) + a_limit;
102 a_discardLevel [1] = (congestionArea * a_percentage [1] / 100) + a_limit;
103 a_discardLevel [2] = (congestionArea * a_percentage [2] / 100) + a_limit;
104 a_discardLevel [3] = 100;
// Changing the limit restarts the processed/discarded statistics.
105 a_messageCounter = a_discardedCounter = 0;
// Information trace listing the new limit and the threshold of every level (numbered from 1).
107 string msg = functions::asString(
108 "comm::CongestionController::setLimit | Limit: %d | ", limit
111 for(int i = 0; i < MaxLevel; i ++)
112 msg += functions::asString("(Level %d: %d%%) ", i + 1, a_discardLevel [i]);
113 Logger::information(msg, ANNA_FILE_LOCATION);
// Sets the pending-bytes cap that calculeWorkload() uses as the 100% reference.
// @param maxPendingBytes  UnusedPendingBytes (no explicit cap) or a value in [0, MaxPendingBytes].
// @throws RuntimeException when the value is outside that range.
// NOTE(review): no Guard on a_mutex is visible in this function, unlike setLimit(); confirm
// whether that is intended.
// NOTE(review): dynamic exception specifications such as throw(RuntimeException) were removed
// from the language in C++17; this definition must stay in sync with the header declaration.
117 void comm::CongestionController::setMaxPendingBytes(const int maxPendingBytes)
118 throw(RuntimeException) {
// The sentinel is stored as is; calculeWorkload() then falls back to the socket receive buffer size.
119 if(maxPendingBytes == UnusedPendingBytes) {
120 a_maxPendingBytes = UnusedPendingBytes;
// Earlier bounds, derived from the communicator receiving chunk size, kept for reference only.
124 // const int minimum (Communicator::getReceivingChunkSize ());
125 // const int maximum (min (minimum << 1, CongestionController::MaxPendingBytes));
126 const int minimum(0);
127 const int maximum(CongestionController::MaxPendingBytes);
// Reject anything outside [minimum, maximum] with a descriptive exception.
129 if(maxPendingBytes < minimum || maxPendingBytes > maximum) {
130 string msg("comm::CongestionController::setMaxPendingBytes | ");
131 msg += functions::asString("MaxPendingBytes (%d bytes) must be between %d bytes and %d bytes", maxPendingBytes, minimum, maximum);
132 throw RuntimeException(msg, ANNA_FILE_LOCATION);
135 a_maxPendingBytes = maxPendingBytes;
139 * Invoked from [Tzzz], most likely with the clientSocket already protected by the critical section established
140 * by its associated handler.
// Decides whether the next message of clientSocket should be processed or discarded.
// Runs under a_mutex. Updates the message/discard counters and the average workload, and
// traces the decision (debug level, or rate-limited warning level for discards).
// @param clientSocket  socket whose pending bytes feed the workload computation.
// @return Advice::Process or Advice::Discard.
// NOTE(review): several lines of this function (braces, else branches, the declaration of
// "discard") are missing from this listing; comments below describe only what is visible.
142 comm::CongestionController::Advice::_v comm::CongestionController::getAdvice(const ClientSocket& clientSocket)
144 Guard guard(a_mutex, "comm::CongestionController::getAdvice");
// Early exit with Advice::Process; the condition guarding it is not visible in this listing.
148 return Advice::Process;
151 Advice::_v result = Advice::Process;
152 int workload = calculeWorkload(clientSocket);
155 // If the message counter wraps around
// (the pre-increment yields 0): restart the statistics, counting the current message as the first.
156 if(++ a_messageCounter == 0) {
157 a_messageCounter = 1;
158 a_discardedCounter = 0;
159 a_avgWorkload.clear();
162 const Millisecond now = functions::millisecond();
// More than HeartBeat since a_tickTime: the accumulated average is considered stale and dropped.
164 if((now - a_tickTime) > HeartBeat)
165 a_avgWorkload.clear();
// Fold the immediate workload of this socket into the running average.
170 a_avgWorkload += workload;
// In Global mode the decision is taken on the average instead of on this socket's own workload.
172 if(a_effectiveMode == Mode::Global)
173 workload = a_avgWorkload;
176 std::string msg("comm::CongestionController::getAdvice | ");
177 msg += a_avgWorkload.asString();
178 msg += functions::asText(" | inmediate workload: ", workload);
179 Logger::debug(msg, ANNA_FILE_LOCATION)
// Below the limit the message is processed.
182 if(workload < a_limit) {
183 result = Advice::Process;
186 result = Advice::Discard;
// Level scan: the visible assignment draws rand() % 101 (0..100) and processes the message only
// when the draw is strictly greater than the level percentage, which is also stored in "discard".
// NOTE(review): with 101 possible draws, a percentage p lets through (100 - p) draws out of 101.
// NOTE(review): the statements between the test on a_discardLevel and the assignment are not
// visible here, so the exact level-selection flow cannot be confirmed from this listing.
189 for(int i = 0; i < MaxLevel; i ++) {
190 if(workload > a_discardLevel [i])
193 result = ((rand() % 101) > (discard = a_percentage [i])) ? Advice::Process : Advice::Discard;
// Processed: debug trace with the share of non-discarded messages so far.
199 if(result == Advice::Process) {
201 const int ratio = (a_messageCounter - a_discardedCounter) * 100 / a_messageCounter;
202 string msg = functions::asString(
203 "Mode: %s | Limit: %d%% | Workload: %d%% | Filter: %d%% | Ratio: %d%% | Result: Process",
204 ((a_effectiveMode == Mode::Local) ? "Local" : "Global"), a_limit, workload, discard, ratio
206 Logger::debug(msg, ANNA_FILE_LOCATION);
// Discarded: account for it, then trace at debug level when active; otherwise at warning level,
// and then only when at least DelayTrace has passed since a_timeTrace.
209 a_discardedCounter ++;
210 const int ratio = (a_messageCounter - a_discardedCounter) * 100 / a_messageCounter;
212 if(Logger::isActive(Logger::Debug) == true) {
213 string msg = functions::asString(
214 "Mode: %s | Limit: %d%% | Workload: %d%% | Filter: %d%% | Ratio (%u/%u): %d%% | Result: Discard",
215 ((a_effectiveMode == Mode::Local) ? "Local" : "Global"), a_limit, workload, discard, a_messageCounter, a_discardedCounter, ratio
217 Logger::debug(msg, ANNA_FILE_LOCATION);
218 } else if(Logger::isActive(Logger::Warning)) {
219 Millisecond now = functions::millisecond();
221 if((now - a_timeTrace) >= DelayTrace) {
222 string msg = functions::asString(
223 "Mode: %s | Limit: %d%% | Workload: %d%% | Filter: %d%% | Ratio (%u/%u): %d%% | Result: Discard",
224 ((a_effectiveMode == Mode::Local) ? "Local" : "Global"), a_limit, workload, discard, a_messageCounter, a_discardedCounter, ratio
226 Logger::warning(msg, ANNA_FILE_LOCATION);
// Registers one more incoming socket: increments a_incomingSocketCounter under a_mutex.
235 void comm::CongestionController::incrementIncomingSocket()
236 throw(RuntimeException) {
237 Guard guard(a_mutex, "comm::CongestionController::incrementIncomingSocket");
238 a_incomingSocketCounter ++;
// Unregisters one incoming socket and rescales the average workload to the remaining sockets.
// NOTE(review): some lines (returns, else, braces) are missing from this listing.
241 void comm::CongestionController::decrementIncomingSocket()
242 throw(RuntimeException) {
// Pre-check of the counter made before the Guard below is constructed (i.e. without the lock);
// the statement it controls is not visible here - presumably an early return.
243 if(a_incomingSocketCounter == 0)
// NOTE(review): the Guard label names "incrementIncomingSocket" although this is
// decrementIncomingSocket(); it looks like a copy/paste slip in a diagnostic string - confirm.
246 Guard guard(a_mutex, "comm::CongestionController::incrementIncomingSocket");
// Same check repeated under the lock: with no sockets registered the average is simply cleared.
249 if(a_incomingSocketCounter == 0) {
250 a_avgWorkload.clear();
// Share of the current average attributed to each socket (integer division).
254 const int average = a_avgWorkload;
256 unsigned int individualWorkload = average / a_incomingSocketCounter;
// NOTE(review): the "< 0" test can only be true if a_incomingSocketCounter is a signed type;
// its declaration is not visible in this file - confirm.
258 if(-- a_incomingSocketCounter < 0)
259 a_incomingSocketCounter = 0;
// Sockets remain: reseed the average from the per-socket share; otherwise clear it.
// NOTE(review): the meaning of the two setValue() arguments is not visible here - presumably
// (accumulated value, number of samples); confirm against the average class.
261 if(a_incomingSocketCounter > 0)
262 a_avgWorkload.setValue(individualWorkload * a_incomingSocketCounter * a_incomingSocketCounter, a_incomingSocketCounter);
264 a_avgWorkload.clear();
// Computes the workload of a socket as a percentage: pending bytes * 100 / reference size.
// The reference size is the socket receive buffer size when no explicit cap was configured
// (a_maxPendingBytes == UnusedPendingBytes), otherwise a_maxPendingBytes.
// The computed value is -1 when the reference size is 0; it is also written to the debug log.
// NOTE(review): the declaration of maxSize and the return statement are not visible in this
// listing; presumably "result" is what gets returned - confirm.
// NOTE(review): the value is not clamped, so it exceeds 100 whenever size > maxSize, and
// "size * 100" is evaluated in int, which would overflow for sizes above INT_MAX / 100.
267 int comm::CongestionController::calculeWorkload(const ClientSocket& clientSocket) const
271 if(a_maxPendingBytes == UnusedPendingBytes)
272 maxSize = clientSocket.getReceiveBufferSize();
274 maxSize = a_maxPendingBytes;
276 // The current size accounts for what has already been loaded into memory plus what is still pending to be read from I/O
277 const int size = clientSocket.getTotalPendingBytes();
278 const int result = (maxSize == 0) ? -1 : (size * 100) / maxSize;
280 string msg = functions::asString("fd: %d | Size (%d%%): %d/%d", clientSocket.getfd(), result, size, maxSize);
281 Logger::debug(msg, ANNA_FILE_LOCATION);
// Dumps the controller state as a "comm.CongestionController" child of parent: configuration
// attributes, a "CurrentStatus" node, and a "Levels" node with one "Level" child per level.
// @param parent  node under which the new child is created.
// NOTE(review): the declarations of "w" and "node", the receiver of the "Ratio" attribute and
// the return statement are not visible in this listing.
286 xml::Node* comm::CongestionController::asXML(xml::Node* parent) const
// Text for each mode, indexed directly with a_mode / a_effectiveMode below.
// NOTE(review): assumes the Mode values are 0..2 in the order Auto, Local, Global - confirm.
288 static const char* modetxt [] = { "Auto", "Local", "Global" };
289 xml::Node* result = parent->createChild("comm.CongestionController");
290 result->createAttribute("Limit", a_limit);
291 result->createAttribute("Mode", modetxt [a_mode]);
292 result->createAttribute("EffectiveMode", modetxt [a_effectiveMode]);
// The cap is only published when one was explicitly configured.
294 if(a_maxPendingBytes != UnusedPendingBytes)
295 result->createAttribute("MaxPendingBytes", a_maxPendingBytes);
// Snapshot of the accumulated workload; getAccumulatedWorkload() may clear stale statistics.
298 Workload current = getAccumulatedWorkload();
299 w = result->createChild("CurrentStatus");
300 w->createAttribute("Level", getLevel(current));
301 w->createAttribute("Workload", getLoad(current));
// NOTE(review): "Process" is fed with a_messageCounter, which getAdvice() increments before
// deciding between Process and Discard - confirm that a total count is what is intended.
302 w->createAttribute("Process", a_messageCounter);
303 w->createAttribute("Discard", a_discardedCounter);
// NOTE(review): the attribute name "IncominSockets" looks misspelled; it is emitted at runtime,
// so renaming it would change the XML output - check consumers before touching it.
304 w->createAttribute("IncominSockets", a_incomingSocketCounter);
305 w = result->createChild("Levels");
// Percentage of non-discarded messages; 0 when no message has been counted yet.
307 "Ratio", ((a_messageCounter == 0) ? 0 : (a_messageCounter - a_discardedCounter) * 100 / a_messageCounter)
// Level 0 entry: limit a_limit - 1, nothing discarded.
310 node = w->createChild("Level");
311 node->createAttribute("Id", 0);
312 node->createAttribute("Limit", a_limit - 1);
313 node->createAttribute("Discarding", 0);
// One entry per discard level, numbered from 1, with its threshold and its percentage.
315 for(int i = 0; i < MaxLevel; i ++) {
316 node = w->createChild("Level");
317 node->createAttribute("Id", i + 1);
318 node->createAttribute("Limit", a_discardLevel [i]);
319 node->createAttribute("Discarding", a_percentage [i]);
// Returns the accumulated (average) workload together with the level it falls into:
// result.second receives the average, result.first the level.
// NOTE(review): the declaration of "result", the statement taken when the load is below
// a_limit, part of the loop body and the return are not visible in this listing.
325 comm::CongestionController::Workload comm::CongestionController::getAccumulatedWorkload() const
328 const Millisecond now = functions::millisecond();
// More than HeartBeat since a_tickTime: the statistics are reset and the tick time refreshed.
// NOTE(review): this const method mutates members through const_cast, which is undefined
// behaviour if the object itself was declared const; no Guard on a_mutex is visible here
// although getAdvice() touches a_avgWorkload under that mutex - confirm.
330 if((now - a_tickTime) > HeartBeat) {
331 CongestionController* _this = const_cast <CongestionController*>(this);
332 _this->a_avgWorkload.clear();
333 _this->a_tickTime = now;
336 result.second = a_avgWorkload;
// Below the limit; the statement controlled by this test is not visible here.
338 if(result.second < a_limit)
// Level scan against a_discardLevel[]; levels are reported 1-based (ii + 1).
343 for(int ii = 0; ii < MaxLevel; ii ++) {
344 if(result.second > a_discardLevel [ii])
347 result.first = ii + 1;
// Returns the immediate workload of clientSocket (as computed by calculeWorkload()) in
// result.second, together with the level it falls into in result.first.
// @param clientSocket  socket whose workload is evaluated.
// NOTE(review): the declaration of "result", the statement taken when the load is below
// a_limit, part of the loop body and the end of the function are not visible in this listing.
356 comm::CongestionController::Workload comm::CongestionController::getCurrentWorkload(const comm::ClientSocket& clientSocket) const
359 result.second = calculeWorkload(clientSocket);
// Below the limit; the statement controlled by this test is not visible here.
361 if(result.second < a_limit)
// Level scan against a_discardLevel[]; levels are reported 1-based (ii + 1).
366 for(int ii = 0; ii < MaxLevel; ii ++) {
367 if(result.second > a_discardLevel [ii])
370 result.first = ii + 1;