1 // ANNA - Anna is Not Nothingness Anymore //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
9 #include <anna/diameter.comm/OamModule.hpp>
10 #include <anna/diameter.comm/LocalServer.hpp>
11 #include <anna/diameter.comm/ServerSession.hpp>
12 #include <anna/diameter.comm/ServerSocket.hpp>
13 #include <anna/diameter.comm/Engine.hpp>
14 #include <anna/core/functions.hpp>
15 #include <anna/statistics/Engine.hpp>
16 #include <anna/diameter.comm/TimerManager.hpp>
18 #include <anna/core/tracing/Logger.hpp>
19 #include <anna/core/tracing/TraceMethod.hpp>
20 #include <anna/core/functions.hpp>
21 #include <anna/app/functions.hpp>
22 #include <anna/xml/Node.hpp>
23 #include <anna/comm/Communicator.hpp>
24 #include <anna/comm/Network.hpp>
25 #include <anna/comm/Host.hpp>
26 #include <anna/comm/ClientSocket.hpp>
33 using namespace anna::diameter::comm;
35 LocalServer::LocalServer() :
39 a_currentConnections(0),
40 a_allowedInactivityTime(ServerSession::DefaultAllowedInactivityTime),
45 a_lastUsedResource(NULL) {}
48 void LocalServer::initializeStatisticResources() {
49 std::string accName = "local server '";
50 accName += anna::functions::socketLiteralAsString(a_key.first, a_key.second);
52 a_messageStatistics.initialize(accName);
55 void LocalServer::resetStatistics() {
56 a_messageStatistics.getAccumulator()->reset();
59 void LocalServer::updateProcessingTimeStatisticConcept(const double &value, const anna::diameter::CommandId &cid) {
60 a_messageStatistics.process(MessageStatistics::ConceptType::SentRequestProcessingTime, cid, value);
61 LOGDEBUG(anna::Logger::debug(a_messageStatistics.getAccumulator()->asString(), ANNA_FILE_LOCATION));
64 void LocalServer::updateReceivedMessageSizeStatisticConcept(const double &value, const anna::diameter::CommandId &cid) {
65 a_messageStatistics.process(MessageStatistics::ConceptType::ReceivedMessageSize, cid, value);
66 LOGDEBUG(anna::Logger::debug(a_messageStatistics.getAccumulator()->asString(), ANNA_FILE_LOCATION));
69 ServerSession* LocalServer::allocateServerSession() { return a_serverSessionsRecycler.create(); }
70 void LocalServer::releaseServerSession(ServerSession *serverSession) { a_serverSessionsRecycler.release(serverSession); }
73 LocalServer::serverSession_iterator LocalServer::serverSession_find(const serverSession_key &key) {
74 return a_serverSessions.find(key);
78 LocalServer::serverSession_key LocalServer::getServerSessionKey(const anna::comm::ClientSocket &clientSocket) const {
79 return (anna::functions::hash(clientSocket.getRemoteAccessPoint().getINetAddress().serialize().c_str()));
83 void LocalServer::availabilityLost() {
85 std::string socket = anna::functions::socketLiteralAsString(a_key.first, a_key.second);
87 std::string msg = "diameter::comm::LocalServer { Socket: ";
89 msg += " } has lost its availability";
90 anna::Logger::debug(msg, ANNA_FILE_LOCATION);
93 OamModule &oamModule = OamModule::instantiate();
94 oamModule.activateAlarm(OamModule::Alarm::c_LostAvailabilityOverLocalServerDefinedAs__s__, socket.c_str());
95 oamModule.count(OamModule::Counter::LostAvailabilityOverLocalServer);
96 a_engine->refreshAvailabilityForLocalServers();
100 void LocalServer::availabilityRecovered() {
102 std::string socket = anna::functions::socketLiteralAsString(a_key.first, a_key.second);
104 std::string msg = "diameter::comm::LocalServer { Socket: ";
106 msg += " } has recovered its availability";
107 anna::Logger::debug(msg, ANNA_FILE_LOCATION);
110 OamModule &oamModule = OamModule::instantiate();
111 oamModule.cancelAlarm(OamModule::Alarm::c_LostAvailabilityOverLocalServerDefinedAs__s__, socket.c_str());
112 oamModule.count(OamModule::Counter::RecoveredAvailabilityOverLocalServer);
113 a_engine->refreshAvailabilityForLocalServers();
118 bool LocalServer::refreshAvailability() {
120 if(a_available) { // check not-bound state for all server-sessions:
121 // bool isolate = true;
123 // for (const_serverSession_iterator it = serverSession_begin(); it != serverSession_end(); it++)
124 // if (serverSession(it)->getState() != ServerSession::State::Closed) { isolate = false; break; }
127 // El problema de lo anterior, es que cuando se acepta una conexion, aun no ha llegado el CER (receive). Un server session
128 // esta en estado "Bound" cuando llega dicho CER y consecuentemente envio un CEA. Nos basaremos en 'a_currentConnections':
129 if(a_currentConnections == 0) {
137 // Here not available
138 // for (const_serverSession_iterator it = serverSession_begin(); it != serverSession_end(); it++)
139 // if (serverSession(it)->getState() == ServerSession::State::Bound) {
140 if(a_currentConnections > 0) { // really == 0
141 availabilityRecovered();
149 void LocalServer::enable(bool unlock) noexcept(false) {
151 if(unlock) a_lock = false;
155 if(a_serverSocket && a_serverSocket->isOpened()) return; // communicator attach twice gets poll bad file descriptor and application stops !
157 // Resolve local address:
158 anna::comm::Network& network = anna::comm::Network::instantiate();
160 anna::comm::Host *host = network.resolve(a_key.first /* addr */);
161 const anna::comm::Device *device = *(host->device_begin());
162 anna::comm::INetAddress localAddress(device, a_key.second /* port */);
163 // Create server socket and assign receiver factory
164 a_serverSocket = new ServerSocket(localAddress, this);
165 a_serverSocket->setCategory(a_category);
169 void LocalServer::attach() {
171 // Attach to communicator
172 anna::comm::Communicator * communicator = anna::app::functions::component <anna::comm::Communicator> (ANNA_FILE_LOCATION);
173 communicator->attach((anna::comm::ServerSocket*)a_serverSocket); // invokes handler insert and then initialize -> server socket bind (*)
175 OamModule &oamModule = OamModule::instantiate();
176 oamModule.count(OamModule::Counter::ServerSocketsOpened);
177 } catch(anna::RuntimeException& ex) {
178 ex.trace(); // fails on (*) (i.e. Address already in use), within communicator attach
180 a_serverSocket->close();
184 void LocalServer::attachPlanning() {
185 LOGMETHOD(anna::TraceMethod tttm("diameter::comm::LocalServer", "attachPlanning", ANNA_FILE_LOCATION));
188 TimerManager::instantiate().createTimer(this);
189 } catch(anna::RuntimeException& ex) {
191 anna::Logger::error("CAPTURED EXCEPTION activating attachPlanning timer", ANNA_FILE_LOCATION);
196 void LocalServer::disable(bool lock) noexcept(false) {
199 anna::comm::Communicator * communicator = anna::app::functions::component <anna::comm::Communicator> (ANNA_FILE_LOCATION);
200 communicator->detach((anna::comm::ServerSocket*)a_serverSocket);
201 //delete(a_serverSocket);
203 OamModule &oamModule = OamModule::instantiate();
204 oamModule.count(OamModule::Counter::ServerSocketsClosed);
208 void LocalServer::lostConnection() {
209 a_currentConnections--;
214 void LocalServer::newConnection() noexcept(false) {
215 a_currentConnections++;
218 if(a_currentConnections == a_maxConnections) {
219 LOGWARNING(anna::Logger::warning("The maximum number of connections allowed over diameter server socket have already been served", ANNA_FILE_LOCATION));
223 // Inform local server (availability changes):
224 refreshAvailability();
227 OamModule &oamModule = OamModule::instantiate();
228 oamModule.count(OamModule::Counter::CreatedConnectionForServerSession);
233 ServerSession *LocalServer::createServerSession(const anna::comm::ClientSocket &clientSocket) noexcept(false) {
234 LOGMETHOD(anna::TraceMethod tttm("diameter::comm::LocalServer", "createServerSession", ANNA_FILE_LOCATION));
235 ServerSession* result(NULL);
236 // First erase deprecated ones:
237 std::vector<const ServerSession*> deprecated_server_sessions;
238 const ServerSession* ss;
240 for(const_serverSession_iterator it = serverSession_begin(); it != serverSession_end(); it++) {
241 ss = serverSession(it);
244 deprecated_server_sessions.push_back(ss);
247 std::vector<const ServerSession*>::iterator dc_ncit;
248 std::vector<const ServerSession*>::iterator dc_min(deprecated_server_sessions.begin());
249 std::vector<const ServerSession*>::iterator dc_max(deprecated_server_sessions.end());
250 serverSession_iterator ii;
252 for(dc_ncit = dc_min; dc_ncit != dc_max; dc_ncit++) {
253 ii = serverSession_find((*dc_ncit)->getSocketId());
254 a_serverSessions.erase(ii);
257 // End erase deprecated server sessions
259 if((result = allocateServerSession()) == NULL)
260 throw anna::RuntimeException("diameter::comm::LocalServer::allocateServerSession returns NULL", ANNA_FILE_LOCATION);
263 result->initialize(); // warning: recycler does not initialize its objects and at least...
264 // Assignments (it could be done at allocate):
265 serverSession_key key = getServerSessionKey(clientSocket);
266 result->setAllowedInactivityTime(getAllowedInactivityTime());
267 result->setClientSocket((anna::comm::ClientSocket*)(&clientSocket));
268 result->a_parent = this;
269 result->a_socketId = key; // de momento...
270 result->initializeSequences(); // despues de asignar el LocalServer y el socketId (sequences are seed-based by mean exclusive hash)
271 a_serverSessions.insert(serverSession_value_type(key, result));
273 a_deliveryIterator = serverSession_begin();
279 void LocalServer::closeServerSession(ServerSession* serverSession)
281 if(serverSession == NULL)
285 std::string msg("diameter::comm::LocalServer::closeServerSession | ");
286 msg += serverSession->asString();
287 // msg += " | Destroy: ";
288 // msg += (destroy ? "yes" : "no");
289 anna::Logger::debug(msg, ANNA_FILE_LOCATION);
291 serverSession_iterator ii = serverSession_find(serverSession->getSocketId());
293 if(ii == serverSession_end())
296 // Remove origin-realm / origin-host for server session in delivery map
297 // This is related to http://redmine.teslayout.com/issues/41
298 a_engine->manageDrDhServerSession(serverSession, false /* desregister */);
301 //serverSession->setState(ServerSession::State::Closing); NOT MANAGED WITH SERVER SESSIONS
302 serverSession->unbind(true /* always forceDisconnect on server sessions ... */);
303 releaseServerSession(serverSession);
304 } catch(anna::RuntimeException& ex) {
308 //a_serverSessions.erase(ii); // IMPORTANTE: posible fuente de cores de este tipo, en relacion con ServerSession::finalize() => delete(this)
309 // #0 0x0000003ca1c2e26d in raise () from /lib64/tls/libc.so.6
311 // #0 0x0000003ca1c2e26d in raise () from /lib64/tls/libc.so.6
312 // #1 0x0000003ca1c2fa6e in abort () from /lib64/tls/libc.so.6
313 // #2 0x0000003ca8cb1148 in __gnu_cxx::__verbose_terminate_handler () from /usr/lib64/libstdc++.so.6
314 // #3 0x0000003ca8caf176 in __cxa_call_unexpected () from /usr/lib64/libstdc++.so.6
315 // #4 0x0000003ca8caf1a3 in std::terminate () from /usr/lib64/libstdc++.so.6
316 // #5 0x0000003ca8caf1b6 in std::terminate () from /usr/lib64/libstdc++.so.6
317 // #6 0x0000003ca8caf0c8 in __cxa_call_unexpected () from /usr/lib64/libstdc++.so.6
318 // #7 0x000000000047a4a7 in anna::diameter::comm::LocalServer::lostConnection (this=0x8aeb10) at comm.db/diameter.comm.LocalServer.cc:200
319 // #8 0x000000000047a9e6 in anna::diameter::comm::LocalServer::closeServerSession (this=0x8aeb10, serverSession=0xc37a00)
320 // at comm.db/diameter.comm.LocalServer.cc:275
321 // #9 0x000000000048d288 in anna::diameter::comm::ServerSession::finalize (this=0xc37a00) at comm.db/diameter.comm.ServerSession.cc:510
322 // #10 0x0000000000494e4f in anna::diameter::comm::ServerSessionReceiver::eventBreakLocalConnection (this=0xc119c0, clientSocket=@0xb0ea00)
323 // SOLUCION: no borrar aqui, marcar como "deprecated". Este estado no se necesita realmente puesto que nadie volvera a usar este recurso.
324 // Pero simplemente se podria usar para purgar mediante temporizacion (entonces s� se har�a el erase)
325 serverSession->a_deprecated = true;
326 // WE WILL ERASE AT createServerSession
327 a_deliveryIterator = serverSession_begin();
332 ServerSession* LocalServer::findServerSession(int socketId, anna::Exception::Mode::_v emode)
334 serverSession_iterator ii = serverSession_find(socketId);
336 if(ii != serverSession_end())
337 return serverSession(ii);
339 if(emode != anna::Exception::Mode::Ignore) {
340 std::string msg("diameter::comm::LocalServer::findServerSession | SocketId: ");
341 msg += anna::functions::asString(socketId);
342 msg += " | ServerSession not found";
343 anna::RuntimeException ex(msg, ANNA_FILE_LOCATION);
345 if(emode == anna::Exception::Mode::Throw)
354 ServerSession* LocalServer::findServerSession(const anna::comm::ClientSocket &clientSocket, anna::Exception::Mode::_v emode)
356 return findServerSession(getServerSessionKey(clientSocket), emode);
360 int LocalServer::getOTARequests() const {
363 for(const_serverSession_iterator it = serverSession_begin(); it != serverSession_end(); it++)
364 result += serverSession(it)->getOTARequests();
369 void LocalServer::close() noexcept(false) {
370 LOGMETHOD(anna::TraceMethod tttm("diameter::comm::LocalServer", "close", ANNA_FILE_LOCATION));
371 // Close listener (permanently to avoid reopening when local connections are being deleted):
372 disable(true /* lock */);
374 for(serverSession_iterator it = serverSession_begin(); it != serverSession_end(); it++)
375 closeServerSession(serverSession(it));
378 void LocalServer::setClassCodeTimeout(const ClassCode::_v v, const anna::Millisecond & millisecond) {
379 LOGMETHOD(anna::TraceMethod tttm("diameter::comm::LocalServer", "setClassCodeTimeout", ANNA_FILE_LOCATION));
381 for(serverSession_iterator it = serverSession_begin(); it != serverSession_end(); it++) {
383 serverSession(it)->setClassCodeTimeout(v, millisecond);
384 } catch(anna::RuntimeException &ex) {
390 void LocalServer::setMaxConnections(int maxConnections) noexcept(false) {
391 LOGMETHOD(anna::TraceMethod tttm("anna::diameter::comm::LocalServer", "setMaxConnections", ANNA_FILE_LOCATION));
393 // Negative & initial
394 if(maxConnections < 0) {
395 LOGDEBUG(anna::Logger::debug("Provided negative value means no limit accepting connections over server socket. Opening listen port (if closed)...", ANNA_FILE_LOCATION));
396 a_maxConnections = -1;
402 if(maxConnections == a_maxConnections) {
403 LOGDEBUG(anna::Logger::debug("Provided equal to current. Ignore operation", ANNA_FILE_LOCATION));
408 if(maxConnections < a_currentConnections) {
409 std::string msg = "There are more current connections (";
410 msg += anna::functions::entriesAsString(a_currentConnections);
411 msg += ") than provided maximum (";
412 msg += anna::functions::entriesAsString(maxConnections);
413 msg += "). Command rejected (you should release connections before logical limitation)";
414 throw anna::RuntimeException(msg, ANNA_FILE_LOCATION);
418 if(maxConnections > a_currentConnections) {
419 LOGDEBUG(anna::Logger::debug("Increasing connection margin (new limit is greater than current connections). Opening listen port (if closed)...", ANNA_FILE_LOCATION));
421 } else { // maxConnections == a_currentConnections: listen port must be closed if it is opened
424 if(maxConnections == 0)
425 anna::Logger::debug("Provided zero value means disabling diameter server", ANNA_FILE_LOCATION);
426 anna::Logger::debug("Zeroing connections margin (new limit is equal to current connections). Closing listen port (if opened)...", ANNA_FILE_LOCATION);
434 std::string msg("Updating max connections from ");
435 msg += (a_maxConnections == -1) ? "'no limit'" : anna::functions::asString(a_maxConnections);
437 msg += (maxConnections == -1) ? "'no limit'" : anna::functions::asString(maxConnections);
438 anna::Logger::debug(msg, ANNA_FILE_LOCATION);
441 a_maxConnections = maxConnections;
445 bool LocalServer::send(const Message* message, int socketId) noexcept(false) {
446 LOGMETHOD(anna::TraceMethod tttm("diameter::comm::LocalServer", "send", ANNA_FILE_LOCATION));
450 std::string msg = "The local server ";
452 if(a_description != "") {
454 msg += a_description;
457 msg += "is currently unavailable (no server sessions to send the message)";
458 anna::Logger::warning(msg, ANNA_FILE_LOCATION);
463 if(socketId != -1) { // socket id provided
464 // Send (it was a request because of key (socketId) != -1, we forward the answer):
466 ServerSession * fixedServerSession = a_engine->findServerSession(socketId); // exception if not found
467 fixedServerSession->send(message);
469 } catch(anna::RuntimeException &ex) {
470 std::string msg = "Cannot deliver answer through a fixed server session (socket id ";
471 msg += anna::functions::asString(socketId);
472 msg += "). Perhaps it existed but not now. Ignore";
473 anna::Logger::error(msg, ANNA_FILE_LOCATION);
479 // Socket is not provided: use readSocketId
480 socketId = (a_currentConnections > 1) ? readSocketId(message) : -1; // optimization
482 if(a_deliveryIterator == serverSession_end()) a_deliveryIterator = serverSession_begin();
484 a_lastUsedResource = (*a_deliveryIterator).second;
487 a_lastUsedResource = findServerSession(socketId); // exception if not found
488 } else { // Round Robin delivery between client-sessions
489 if(getCurrentConnections() != 1) { // optimize
490 // Next server-session:
491 a_deliveryIterator++;
497 a_lastUsedResource->send(message);
498 return true; // no matter if response is NULL (answers, i.e.) or not.
499 } catch(anna::RuntimeException &ex) {
503 // Here, sent has failed:
505 OamModule &oamModule = OamModule::instantiate();
506 std::string socket = anna::functions::socketLiteralAsString(getKey().first, getKey().second);
507 oamModule.activateAlarm(OamModule::Alarm::UnableToDeliverDiameterMessageToClientFromLocalServer__s__, socket.c_str());
508 oamModule.count(OamModule::Counter::UnableToDeliverToClient);
513 bool LocalServer::broadcast(const Message* message) noexcept(false) {
514 LOGMETHOD(anna::TraceMethod tttm("diameter::comm::LocalServer", "broadcast", ANNA_FILE_LOCATION));
517 for(serverSession_iterator it = serverSession_begin(); it != serverSession_end(); it++) {
519 serverSession(it)->send(message);
520 } catch(anna::RuntimeException &ex) {
529 void LocalServer::eventPeerShutdown(const ServerSession* serverSession) {
531 std::string msg(serverSession->asString());
532 msg += " | eventPeerShutdown";
533 anna::Logger::warning(msg, ANNA_FILE_LOCATION);
537 void LocalServer::eventRequestRetransmission(const ServerSession* serverSession, Message *request) {
539 std::string msg(serverSession->asString());
540 msg += " | eventRequestRetransmission";
541 anna::Logger::warning(msg, ANNA_FILE_LOCATION);
545 std::string LocalServer::asString() const {
546 std::string result("diameter::comm::LocalServer { ");
548 result += anna::functions::socketLiteralAsString(getKey().first, getKey().second);
549 result += " | Description: ";
550 result += (a_description != "") ? a_description : "undefined";
551 result += " | Available (any server session bound): ";
552 result += a_available ? "yes" : "no";
553 result += " | Max Connections: ";
554 result += anna::functions::asString(a_maxConnections);
555 result += " | Current Connections: ";
556 result += anna::functions::asString(a_currentConnections);
557 // Current connections ??
558 result += " | Allowed inactivity time for server sessions: ";
559 result += a_allowedInactivityTime.asString();
560 result += " | Server socket: ";
561 result += a_serverSocket ? a_serverSocket->asString() : "closed";
562 result += anna::functions::asString(" | OTA requests: %d%s", getOTARequests(), idle() ? " (idle)" : "");
563 result += " | Last Incoming Activity Time: ";
564 result += a_lastIncomingActivityTime.asString();
565 result += " | Last Outgoing Activity Time: ";
566 result += a_lastOutgoingActivityTime.asString();
568 // result += a_messageStatistics.getAccumulator()->asString();
569 // ServerSessions only in xml
570 return result += " }";
574 anna::xml::Node* LocalServer::asXML(anna::xml::Node* parent) const {
575 anna::xml::Node* result = parent->createChild("diameter.LocalServer");
576 result->createAttribute("Description", (a_description != "") ? a_description : "undefined");
577 result->createAttribute("Available", a_available ? "yes" : "no");
578 result->createAttribute("MaxConnections", a_maxConnections);
579 result->createAttribute("CurrentConnections", a_currentConnections);
580 // Current connections ??
581 result->createAttribute("AllowedInactivityTimeForServerSessions", a_allowedInactivityTime.asString());
584 a_serverSocket->asXML(result);
586 result->createAttribute("ServerSocket", "closed");
588 result->createAttribute("OTArequests", anna::functions::asString("%d%s", getOTARequests(), idle() ? " (idle)" : ""));
589 result->createAttribute("LastIncomingActivityTime", a_lastIncomingActivityTime.asString());
590 result->createAttribute("LastOutgoingActivityTime", a_lastOutgoingActivityTime.asString());
592 anna::xml::Node* stats = result->createChild("MessageStatistics");
593 a_messageStatistics.getAccumulator()->asXML(stats);
594 anna::xml::Node* serverSessions = result->createChild("ServerSessions"); // LocalServer.ServerSessions
596 for(const_serverSession_iterator it = serverSession_begin(); it != serverSession_end(); it++)
597 serverSession(it)->asXML(serverSessions);
602 //------------------------------------------------------------------------------
603 //------------------------------------ LocalServer::updateIncomingActivityTime()
604 //------------------------------------------------------------------------------
605 void LocalServer::updateIncomingActivityTime() {
606 a_lastIncomingActivityTime = anna::functions::millisecond();
609 std::string msg = "Updated INCOMING activity on local server (milliseconds unix): ";
610 msg += anna::functions::asString(a_lastIncomingActivityTime.getValue());
611 anna::Logger::debug(msg, ANNA_FILE_LOCATION);
616 //------------------------------------------------------------------------------
617 //------------------------------------ LocalServer::updateOutgoingActivityTime()
618 //------------------------------------------------------------------------------
619 void LocalServer::updateOutgoingActivityTime(void) {
620 a_lastOutgoingActivityTime = anna::functions::millisecond();
623 std::string msg = "Updated OUTGOING activity on local server (milliseconds unix): ";
624 msg += anna::functions::asString(a_lastOutgoingActivityTime.getValue());
625 anna::Logger::debug(msg, ANNA_FILE_LOCATION);