1 // ANNA - Anna is Not Nothingness Anymore //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
11 #include <sys/types.h>
12 #include <sys/socket.h>
15 #include <anna/core/tracing/Logger.hpp>
16 #include <anna/core/functions.hpp>
17 #include <anna/core/RuntimeException.hpp>
19 #include <anna/xml/Node.hpp>
20 #include <anna/xml/Attribute.hpp>
22 #include <anna/app/functions.hpp>
24 #include <anna/comm/CongestionController.hpp>
25 #include <anna/comm/ClientSocket.hpp>
26 #include <anna/comm/Communicator.hpp>
32 const int comm::CongestionController::MaxPendingBytes = 64 * 1024;
33 const Millisecond comm::CongestionController::DelayTrace(5000);
35 // Si pasa este tiempo sin recibir peticiones de consejo => entiende que el proceso ha dejado de recibir carga => inicializará las estadísticas
37 const Millisecond comm::CongestionController::HeartBeat(1000);
39 comm::CongestionController::CongestionController() :
40 a_avgWorkload("comm::CongestionController::AvgWorkload"),
42 a_maxPendingBytes(UnusedPendingBytes),
43 a_incomingSocketCounter(0),
46 a_percentage [0] = 25;
47 a_percentage [1] = 50;
48 a_percentage [2] = 85;
49 a_percentage [3] = 99;
51 setLimit(DefaultLimit);
55 void comm::CongestionController::setLimit(const int limit)
57 Guard guard(a_mutex, "comm::CongestionController (setLimit)");
59 if(limit < 0 || limit > 100) {
62 functions::asString("comm::CongestionController::setLimit | Limit %d out of range [0,100]", limit),
72 int congestionArea = 100 - (a_limit = limit);
73 a_discardLevel [0] = (congestionArea * a_percentage [0] / 100) + a_limit;
74 a_discardLevel [1] = (congestionArea * a_percentage [1] / 100) + a_limit;
75 a_discardLevel [2] = (congestionArea * a_percentage [2] / 100) + a_limit;
76 a_discardLevel [3] = 100;
77 a_messageCounter = a_discardedCounter = 0;
79 string msg = functions::asString(
80 "comm::CongestionController::setLimit | Limit: %d | ", limit
83 for(int i = 0; i < MaxLevel; i ++)
84 msg += functions::asString("(Level %d: %d%%) ", i + 1, a_discardLevel [i]);
85 Logger::information(msg, ANNA_FILE_LOCATION);
89 void comm::CongestionController::setMaxPendingBytes(const int maxPendingBytes)
90 throw(RuntimeException) {
91 if(maxPendingBytes == UnusedPendingBytes) {
92 a_maxPendingBytes = UnusedPendingBytes;
96 // const int minimum (Communicator::getReceivingChunkSize ());
97 // const int maximum (min (minimum << 1, CongestionController::MaxPendingBytes));
99 const int maximum(CongestionController::MaxPendingBytes);
101 if(maxPendingBytes < minimum || maxPendingBytes > maximum) {
102 string msg("comm::CongestionController::setMaxPendingBytes | ");
103 msg += functions::asString("MaxPendingBytes (%d bytes) must be between %d bytes and %d bytes", maxPendingBytes, minimum, maximum);
104 throw RuntimeException(msg, ANNA_FILE_LOCATION);
107 a_maxPendingBytes = maxPendingBytes;
111 * Se invoca desde [Tzzz] seguramente que con el clientSocket protegido por la sección crítica establecida
112 * por su manejador asociado.
114 comm::CongestionController::Advice::_v comm::CongestionController::getAdvice(const ClientSocket& clientSocket)
116 Guard guard(a_mutex, "comm::CongestionController::getAdvice");
120 return Advice::Process;
123 Advice::_v result = Advice::Process;
124 int workload = calculeWorkload(clientSocket);
127 // Si le da la vuelta al marcador
128 if(++ a_messageCounter == 0) {
129 a_messageCounter = 1;
130 a_discardedCounter = 0;
131 a_avgWorkload.clear();
134 const Millisecond now = functions::millisecond();
136 if((now - a_tickTime) > HeartBeat)
137 a_avgWorkload.clear();
142 a_avgWorkload += workload;
144 if(a_effectiveMode == Mode::Global)
145 workload = a_avgWorkload;
148 std::string msg("comm::CongestionController::getAdvice | ");
149 msg += a_avgWorkload.asString();
150 msg += functions::asText(" | inmediate workload: ", workload);
151 Logger::debug(msg, ANNA_FILE_LOCATION)
154 if(workload < a_limit) {
155 result = Advice::Process;
158 result = Advice::Discard;
161 for(int i = 0; i < MaxLevel; i ++) {
162 if(workload > a_discardLevel [i])
165 result = ((rand() % 101) > (discard = a_percentage [i])) ? Advice::Process : Advice::Discard;
171 if(result == Advice::Process) {
173 const int ratio = (a_messageCounter - a_discardedCounter) * 100 / a_messageCounter;
174 string msg = functions::asString(
175 "Mode: %s | Limit: %d%% | Workload: %d%% | Filter: %d%% | Ratio: %d%% | Result: Process",
176 ((a_effectiveMode == Mode::Local) ? "Local" : "Global"), a_limit, workload, discard, ratio
178 Logger::debug(msg, ANNA_FILE_LOCATION);
181 a_discardedCounter ++;
182 const int ratio = (a_messageCounter - a_discardedCounter) * 100 / a_messageCounter;
184 if(Logger::isActive(Logger::Debug) == true) {
185 string msg = functions::asString(
186 "Mode: %s | Limit: %d%% | Workload: %d%% | Filter: %d%% | Ratio (%u/%u): %d%% | Result: Discard",
187 ((a_effectiveMode == Mode::Local) ? "Local" : "Global"), a_limit, workload, discard, a_messageCounter, a_discardedCounter, ratio
189 Logger::debug(msg, ANNA_FILE_LOCATION);
190 } else if(Logger::isActive(Logger::Warning)) {
191 Millisecond now = functions::millisecond();
193 if((now - a_timeTrace) >= DelayTrace) {
194 string msg = functions::asString(
195 "Mode: %s | Limit: %d%% | Workload: %d%% | Filter: %d%% | Ratio (%u/%u): %d%% | Result: Discard",
196 ((a_effectiveMode == Mode::Local) ? "Local" : "Global"), a_limit, workload, discard, a_messageCounter, a_discardedCounter, ratio
198 Logger::warning(msg, ANNA_FILE_LOCATION);
207 void comm::CongestionController::incrementIncomingSocket()
208 throw(RuntimeException) {
209 Guard guard(a_mutex, "comm::CongestionController::incrementIncomingSocket");
210 a_incomingSocketCounter ++;
213 void comm::CongestionController::decrementIncomingSocket()
214 throw(RuntimeException) {
215 if(a_incomingSocketCounter == 0)
218 Guard guard(a_mutex, "comm::CongestionController::incrementIncomingSocket");
221 if(a_incomingSocketCounter == 0) {
222 a_avgWorkload.clear();
226 const int average = a_avgWorkload;
228 unsigned int individualWorkload = average / a_incomingSocketCounter;
230 if(-- a_incomingSocketCounter < 0)
231 a_incomingSocketCounter = 0;
233 if(a_incomingSocketCounter > 0)
234 a_avgWorkload.setValue(individualWorkload * a_incomingSocketCounter * a_incomingSocketCounter, a_incomingSocketCounter);
236 a_avgWorkload.clear();
239 int comm::CongestionController::calculeWorkload(const ClientSocket& clientSocket) const
243 if(a_maxPendingBytes == UnusedPendingBytes)
244 maxSize = clientSocket.getReceiveBufferSize();
246 maxSize = a_maxPendingBytes;
248 // El tamaño actual tiene en cuenta lo que ha cargado en memoria, más lo que hay en el I/O pendiente de cargar
249 const int size = clientSocket.getTotalPendingBytes();
250 const int result = (maxSize == 0) ? -1 : (size * 100) / maxSize;
252 string msg = functions::asString("fd: %d | Size (%d%%): %d/%d", clientSocket.getfd(), result, size, maxSize);
253 Logger::debug(msg, ANNA_FILE_LOCATION);
258 xml::Node* comm::CongestionController::asXML(xml::Node* parent) const
260 static const char* modetxt [] = { "Auto", "Local", "Global" };
261 xml::Node* result = parent->createChild("comm.CongestionController");
262 result->createAttribute("Limit", a_limit);
263 result->createAttribute("Mode", modetxt [a_mode]);
264 result->createAttribute("EffectiveMode", modetxt [a_effectiveMode]);
266 if(a_maxPendingBytes != UnusedPendingBytes)
267 result->createAttribute("MaxPendingBytes", a_maxPendingBytes);
270 Workload current = getAccumulatedWorkload();
271 w = result->createChild("CurrentStatus");
272 w->createAttribute("Level", getLevel(current));
273 w->createAttribute("Workload", getLoad(current));
274 w->createAttribute("Process", a_messageCounter);
275 w->createAttribute("Discard", a_discardedCounter);
276 w->createAttribute("IncominSockets", a_incomingSocketCounter);
277 w = result->createChild("Levels");
279 "Ratio", ((a_messageCounter == 0) ? 0 : (a_messageCounter - a_discardedCounter) * 100 / a_messageCounter)
282 node = w->createChild("Level");
283 node->createAttribute("Id", 0);
284 node->createAttribute("Limit", a_limit - 1);
285 node->createAttribute("Discarding", 0);
287 for(int i = 0; i < MaxLevel; i ++) {
288 node = w->createChild("Level");
289 node->createAttribute("Id", i + 1);
290 node->createAttribute("Limit", a_discardLevel [i]);
291 node->createAttribute("Discarding", a_percentage [i]);
297 comm::CongestionController::Workload comm::CongestionController::getAccumulatedWorkload() const
300 const Millisecond now = functions::millisecond();
302 if((now - a_tickTime) > HeartBeat) {
303 CongestionController* _this = const_cast <CongestionController*>(this);
304 _this->a_avgWorkload.clear();
305 _this->a_tickTime = now;
308 result.second = a_avgWorkload;
310 if(result.second < a_limit)
315 for(int ii = 0; ii < MaxLevel; ii ++) {
316 if(result.second > a_discardLevel [ii])
319 result.first = ii + 1;
328 comm::CongestionController::Workload comm::CongestionController::getCurrentWorkload(const comm::ClientSocket& clientSocket) const
331 result.second = calculeWorkload(clientSocket);
333 if(result.second < a_limit)
338 for(int ii = 0; ii < MaxLevel; ii ++) {
339 if(result.second > a_discardLevel [ii])
342 result.first = ii + 1;