Updated license
[anna.git] / include / anna / comm / CongestionController.hpp
1 // ANNA - Anna is Not Nothingness Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // https://bitbucket.org/testillano/anna
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 #ifndef anna_comm_CongestionController_hpp
38 #define anna_comm_CongestionController_hpp
39
40 #include <utility>
41
42 #include <anna/config/defines.hpp>
43 #include <anna/core/Singleton.hpp>
44 #include <anna/core/mt/NRMutex.hpp>
45 #include <anna/core/util/Average.hpp>
46
47 namespace anna {
48
49 namespace xml {
50 class Node;
51 }
52
53 namespace comm {
54
55 class ClientSocket;
56
57 namespace handler {
58 class LocalConnection;
59 }
60
61 /**
62    Gestor de congestion autonomo.
63
64 Su funcionamiento a grandes rasgos es el siguiente:
65
66    -# El campo \em getsockopt (...SO_RCVBUF...) nos da el tamaño maximo del buffer de recepcion
67    para un socket dado. Segun hemos visto puede varias desde 32K hasta 80 K, dependiendo del sistema
68    operativo y la configuracion. Independientemente del valor particular este sera nuestro limite
69    maximo (100%), ya que si la longitud de la cola de espera alcanza este valor, el socket dejara de
70    estar disponible para todos los clientes.
71    -# El metodo \em ioctl (....FIONREAD...) nos da el numero de bytes pendientes de tratar en el socket.
72    -# La carga del socket estara definida por (2) * 100 / (1).
73    -# Con esta clase podemos limitar la carga maxima soportada por los sockets de nuestra
74    aplicacion. Por defecto sera 60% del valor (1), un valor mas conservador podria ser 40%.
75    -# La zona sin congestion estara entre 0% y el valor (4). La zona de congestion estara entre (valor (4) + 1)%  y el 100%.
76    -# La zona de congestion se divide, a su vez, en 4 zonas logicas: Si la carga del socket esta en el
77    primer 25% se descartan mensajes con una probabilidad del 25%. Si la carga esta en el segundo 25% se
78    descartan mensajes con una probabilidad del 50%. Si la carga esta entre el 51% y el 85% se descartan
79    paquetes con una probabilidad del 85% y si esta entre el 86% y el 100% se descartan los paquetes con
80    una probabilidad del 99%.
81
82 Ventajas:
83
84    \li Trata cada socket de forma independiente, con lo que ajustamos mucho mejor el rendimiento de
85    nuestra aplicacion.
86    \li No hay que imaginar los 4 niveles de a8_controlniveles. Solo tendremos que establecer el nivel
87    de carga maximo y el resto lo hace el proceso por si mismo. Sea como sea, es mucho mas facil ajustar
88    un unico valor que los 8 o 10 necesarios con el sistema anterior.
89    \li No requiere ninguna configuracion en base de datos.
90 */
91 class CongestionController : public Singleton <CongestionController> {
92 public:
93   /**
94    * Nivel máximo de congestión.
95    */
96   static const int MaxLevel = 4;
97
98   /**
99      Posibles resultados de CongestionController::getAdvice.
100   */
101   struct Advice {
102     enum _v {
103       None, /**< No hay datos para tomar una decision.  */
104       Process, /**< Se puede procesar el mensaje sin contribuir a la congestion */
105       Discard /**< Se aconseja descartar para limitar la congestion */
106     };
107   };
108
109   /**
110      Posibles formas de calcular la carga que hara aconsejar el descarte o aceptacion de un mensaje.
111   */
112   struct Mode {
113     enum _v {
114       Auto, /**< Escoge la forma de calculo que mejor se adapte al modo de compilacion. Normalmente
115                 El calculo de tipo Local sera mas util en modo MT, mientras que el Global puede ser
116                 mas aconsejable en modo ST.
117                 */
118       Local, /**< El calculo de la carga se calcula de forma independiente para cada Socket */
119       Global /**< El calculo de la carga se calcula de forma conjunta para todos los Sockets */
120     };
121   };
122
123   typedef std::pair <int, int> Workload;
124
125   static const int DefaultLimit = 60;
126
127   /**
128    * Valor máximo que se puede asignar al control de bytes pendientes de tratar.
129    */
130   static const int MaxPendingBytes;
131
132   /**
133      Devuelve el modo que estamos usando para calcular la carga.
134      \return El modo que estamos usando para calcular la carga.
135   */
136   Mode::_v getMode() const throw() { return a_mode; }
137
138   /**
139    * Devuelve el número total de mensajes recibididos.
140    * \return el número total de mensajes recibididos.
141    */
142   int getMessageCounter() const throw() { return a_messageCounter; }
143
144   /**
145    * Devuelve el número de mensajes tratados correctamente.
146    * \return el número de mensajes tratados correctamente.
147    */
148   int getSuccessCounter() const throw() { return a_messageCounter - a_discardedCounter; }
149
150   /**
151      Establece el limite de carga que vamos a imponer.
152      \param limit Limite de carga. Debe ser un valor entre 0 y 100.
153   */
154   void setLimit(const int limit) throw();
155
156   /**
157      Establece el modo en que vamos a calcular la carga.
158      \param mode Modo usado para calcular la carga.
159   */
160   void setMode(const Mode::_v mode) throw() {
161     if((a_mode = mode) == Mode::Auto) {
162       WHEN_MULTITHREAD(a_effectiveMode = Mode::Local);
163       WHEN_SINGLETHREAD(a_effectiveMode = Mode::Global);
164     } else
165       a_effectiveMode = mode;
166   }
167
168   /**
169      Devuelve \em true si esta instancia ha recibido datos o \em false en otro caso.
170      \return \em true si esta instancia ha recibido datos o \em false en otro caso.
171   */
172   bool isEmpty() const throw() { return a_avgWorkload.isEmpty(); }
173
174   /**
175    *  Establece el nº máximo de bytes que deberían tener los socket en la cola de entrada.
176    * \param maxPendingBytes Número máximo de bytes que debería tener los socket en la cola de entrada.
177    * \warning El valor indicado debe estar dentro del siguiente ámbito:
178    * \code
179    * [#comm::Communicator::getReadingChunkSize, min (#comm::Communicator::getReadingChunkSize * 4, #MaxPendingBytes)]
180    * \endcode
181    */
182   void setMaxPendingBytes(const int maxPendingBytes) throw(RuntimeException);
183
184   /**
185      Devuelve el consejo sobre si debemos tratar/o no el ultimo mensaje recibido
186      por el ClientSocket recibido como parametro.
187      \param clientSocket Socket cliente por el que hemos recibido el ultimo mensaje.
188      \return Advice::Process para indicar que debemos procesar el mensaje
189      , Advice::None para indicar que todavia no tiene datos para tomar una
190      decision clara o Advice::Discard para indicar que debemos descartar.
191   */
192   Advice::_v getAdvice(const ClientSocket& clientSocket) throw();
193
194   /**
195      Devuelve el consejo sobre si debemos tratar/o no el ultimo mensaje recibido
196      por el ClientSocket recibido como parametro.
197      \param clientSocket Socket cliente por el que hemos recibido el ultimo mensaje.
198      \return Advice::Process para indicar que debemos procesar el mensaje
199      , Advice::None para indicar que todavia no tiene datos para tomar una
200      decision clara o Advice::Discard para indicar que debemos descartar.
201   */
202   Advice::_v getAdvice(const ClientSocket* clientSocket) throw() {
203     return (clientSocket == NULL) ? Advice::Process : getAdvice(*clientSocket);
204   }
205
206   /**
207    * Devuelve información sobre las estadísticas de carga en las que se basó para hacer
208    * los cálculos.
209    *
210    * \warning Sólo debería invocarse a este método después de invocar a #getAdvice.
211    * \warning Este método no es MT-estricto, por lo que en un entorno MT los valores
212    * sobre los que se calculó el último #getAdvice y los datos obtenidos con este método
213    * pueden haber variado ligeramente.
214    *
215    * \see #getLoad
216    * \see #getLevel
217    */
218   Workload getAccumulatedWorkload() const throw();
219
220   /**
221    * Devuelve información sobre las estadísticas de carga del socket recibido como parámetro.
222    *
223    *
224    * \param clientSocket Socket del que se quiere obtener el estado actual de ocupación.
225    *
226    * \warning Sólo debería invocarse a este método después de invocar a #getAdvice.
227    * \warning Este método no es MT-estricto, por lo que en un entorno MT los valores
228    * sobre los que se calculó el último #getAdvice y los datos obtenidos con este método
229    * pueden haber variado ligeramente.
230    *
231    * \see #getLoad
232    * \see #getLevel
233    */
234   Workload getCurrentWorkload(const ClientSocket& clientSocket) const throw();
235
236   /**
237      Devuelve un documento XML con la informacion relevante sobre esta clase.
238      \return un documento XML con la informacion relevante sobre esta clase.
239   */
240   xml::Node* asXML(xml::Node* parent) const throw();
241
242   /**
243    * Extrae la carga media que soporta es proceso. La carga es un número en [0, 100].
244    * \return la carga media que soporta es proceso. La carga es un número en [0, 100].
245    */
246   static int getLoad(const Workload& workload) throw() { return workload.second; }
247
248   /**
249    * Extrae el nivel de carga en la que está el proceso. El nivel de carga es un número en [0, 4].
250    * \return el nivel de carga en la que está el proceso. El nivel de carga es un número en [0, 4].
251    */
252   static int getLevel(const Workload& workload) throw() { return workload.first; }
253
254 private:
255   static const Millisecond DelayTrace;
256   static const int UnusedPendingBytes = -1;
257   static const Millisecond HeartBeat;
258
259   //----------------------------------------------------------------------------------
260   // a_limit: % de ocupacion maxima al que vamos a limitar los canales.
261   // a_discardLevel: Valor maximo del % de cada nivel de descarte. 0 = 25, 1 = 50,
262   // 2 = 85, 3 = 99
263   //----------------------------------------------------------------------------------
264   int a_limit;
265   int a_discardLevel [MaxLevel];
266   int a_percentage [MaxLevel];
267   NRMutex a_mutex;
268   Mode::_v a_mode;
269   Mode::_v a_effectiveMode;
270   Average <unsigned int> a_avgWorkload;
271   unsigned int a_messageCounter;
272   unsigned int a_discardedCounter;
273   mutable Millisecond a_timeTrace;
274   int a_maxPendingBytes;
275   int a_incomingSocketCounter;
276   Millisecond a_tickTime;
277
278   void incrementIncomingSocket() throw(RuntimeException);
279   void decrementIncomingSocket() throw(RuntimeException);
280
281   CongestionController();
282   CongestionController(const CongestionController&);
283
284   int calculeWorkload(const ClientSocket&) const throw();
285
286   friend class Singleton <CongestionController>;
287
288   friend class ClientSocket;
289   // getUpdatePeriod
290
291   friend class handler::LocalConnection;
292   // incrementIncomingSocket, decrementIncomingSocket
293 };
294
295 }
296 }
297
298 #endif
299