X-Git-Url: https://git.teslayout.com/public/public/public/?a=blobdiff_plain;f=source%2Fdiameter.comm%2FOriginHost.cpp;fp=source%2Fdiameter.comm%2FOriginHost.cpp;h=ed181bdd4873bf04702c52be81368985494b2e5e;hb=d723d5bf571eb48c641b092058eaa38bb6c4fcc8;hp=0000000000000000000000000000000000000000;hpb=61f1340da3cae5159d2e3bc14fc47c6d4bf9453e;p=anna.git diff --git a/source/diameter.comm/OriginHost.cpp b/source/diameter.comm/OriginHost.cpp new file mode 100644 index 0000000..ed181bd --- /dev/null +++ b/source/diameter.comm/OriginHost.cpp @@ -0,0 +1,415 @@ +// ANNA - Anna is Not Nothingness Anymore // +// // +// (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo // +// // +// See project site at http://redmine.teslayout.com/projects/anna-suite // +// See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE // + + +// Standard +#include + +// Project +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace anna::diameter::comm; + + +OriginHost::OriginHost(anna::diameter::comm::Engine* commEngine, unsigned int applicationId) : + a_commEngine(commEngine), a_applicationId(applicationId) { + + a_codecEngine = anna::diameter::codec::EngineManager::instantiate().getCodecEngine(applicationId); // i know, this is going to exist (getCodecEngine is not null) + + a_logFile = ""; + a_burstLogFile = ""; + a_splitLog = false; + a_detailedLog = false; + a_dumpLog = false; + a_entity = NULL; + a_diameterServer = NULL; + + // Comm resources: + a_requestRetransmissions = 0; + + // Burst + a_burstCycle = 1; + a_burstRepeat = false; + a_burstActive = false; + a_burstLoadIndx = 0; + a_burstDeliveryIt = a_burstMessages.begin(); + a_otaRequest = 0; + a_burstPopCounter = 0; +} + +const std::string &OriginHost::getName() const throw() { + return a_commEngine->getOriginHostName(); +} + +void OriginHost::createEntity(const std::string &entityRepresentation, const anna::Millisecond &bindTimeout, const anna::Millisecond &applicationTimeout) throw(anna::RuntimeException) { + + anna::socket_v servers = anna::functions::getSocketVectorFromString(entityRepresentation); + std::string entityDescription = "Launcher diameter entity for "; entityDescription += getName(); + a_entity = (anna::diameter::comm::Entity*)a_commEngine->createEntity(servers, entityDescription); + a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::Bind, bindTimeout); + a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout); +} + +void OriginHost::createDiameterServer(const std::string &serverRepresentation, int sessions, const anna::Millisecond &inactivityTimeout, const anna::Millisecond &applicationTimeout, const std::string &ceaPathfile) throw(anna::RuntimeException) { + + //if(sessions <= 0) return; negative implies no limit for accepted connections + + std::string address; int port; + anna::functions::getAddressAndPortFromSocketLiteral(serverRepresentation, address, port); + std::string serverDescription = "Launcher diameter local server for "; serverDescription += getName(); + a_commEngine->setCEA(ceaPathfile); + a_diameterServer = (anna::diameter::comm::LocalServer*)(a_commEngine->createLocalServer(address, port, sessions)); + // we could set sessions = 0, and after application run(), use setMaxConnections(real sessions) + // over the local server in order to start it. + + a_diameterServer->setDescription(serverDescription); + a_diameterServer->setAllowedInactivityTime(inactivityTimeout); + a_diameterServer->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout); +} + +anna::diameter::comm::Message *OriginHost::createCommMessage() throw(anna::RuntimeException) { + anna::diameter::comm::Message *result = a_commMessages.create(); + result->setRetries(a_requestRetransmissions); + if (a_requestRetransmissions > 0) result->setOnExpiry(anna::diameter::comm::Message::OnExpiry::Retransmit); + return result; +} + + +void OriginHost::releaseCommMessage(anna::diameter::comm::Message *msg) throw() { + a_commMessages.release(msg); +} + + +void OriginHost::writeLogFile(const anna::DataBlock & db, const std::string &logExtension, const std::string &detail) const throw() { + anna::diameter::codec::Message codecMsg; + try { codecMsg.decode(db); } catch(anna::RuntimeException &ex) { ex.trace(); } + writeLogFile(codecMsg, logExtension, detail); +} + +// Already decoded: +void OriginHost::writeLogFile(const anna::diameter::codec::Message &decodedMessage, const std::string &logExtension, const std::string &detail) const throw() { + // Open target file: + std::string targetFile = a_logFile; + + if(a_splitLog) { + targetFile += "."; + targetFile += logExtension; + } + + std::ofstream out(targetFile.c_str(), std::ifstream::out | std::ifstream::app); + // Set text to dump: + std::string title = "["; + title += logExtension; + title += "]"; + // Build complete log: + std::string log = "\n"; + std::string xml = decodedMessage.asXMLString(); + + + if(a_detailedLog) { + anna::time::Date now; + now.setNow(); + title += " "; + title += now.asString(); + log += anna::functions::highlight(title, anna::functions::TextHighlightMode::OverAndUnderline); + log += xml; + log += "\n"; + log += anna::functions::highlight("Used resource"); + log += detail; + log += "\n"; + } else { + log += title; + log += "\n"; + log += xml; + log += "\n"; + } + + if(a_dumpLog) { + // .......xml + std::string name = anna::functions::asString((anna::Millisecond)anna::functions::millisecond()); + name += "."; + name += getCommEngine()->getOriginHostName(); + name += "."; + name += anna::functions::asString(decodedMessage.getHopByHop()); + name += "."; + name += anna::functions::asString(decodedMessage.getEndToEnd()); + name += "."; + name += anna::functions::asString(decodedMessage.getId().first); + name += "."; + name += ((decodedMessage.getId().second) ? "request.":"answer."); + name += logExtension; + name += ".xml"; + std::ofstream outMsg(name.c_str(), std::ifstream::out | std::ifstream::app); + outMsg.write(xml.c_str(), xml.size()); + outMsg.close(); + } + + // Write and close + out.write(log.c_str(), log.size()); + out.close(); +} + +void OriginHost::writeBurstLogFile(const std::string &buffer) throw() { + std::ofstream out(a_burstLogFile.c_str(), std::ifstream::out | std::ifstream::app); + out.write(buffer.c_str(), buffer.size()); + out.close(); // close() will be called when the object is destructed (i.e., when it goes out of scope). + // you'd call close() only if you indeed for some reason wanted to close the filestream + // earlier than it goes out of scope. +} + +int OriginHost::clearBurst() throw() { + int size = a_burstMessages.size(); + + if(size) { + std::map::const_iterator it; + std::map::const_iterator it_min(a_burstMessages.begin()); + std::map::const_iterator it_max(a_burstMessages.end()); + + for(it = it_min; it != it_max; it++) releaseCommMessage((*it).second); + + a_burstMessages.clear(); + } else { + std::string msg = "Burst list already empty. Nothing done"; + std::cout << msg << std::endl; + LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION)); + } + + a_burstActive = false; + a_burstLoadIndx = 0; + a_burstDeliveryIt = a_burstMessages.begin(); + return size; +} + +int OriginHost::loadBurstMessage(const anna::DataBlock & db) throw(anna::RuntimeException) { + anna::diameter::comm::Message *msg = createCommMessage(); + msg->setBody(db); + a_burstMessages[a_burstLoadIndx++] = msg; + return (a_burstLoadIndx - 1); +} + +int OriginHost::stopBurst() throw() { + if(!a_burstActive) { + std::string msg = "Burst launch is already stopped. Nothing done"; + std::cout << msg << std::endl; + LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION)); + return -1; + } + + a_burstActive = false; + // Remaining on cycle: + return (a_burstMessages.size() - (*a_burstDeliveryIt).first); +} + +int OriginHost::popBurst(int releaseAmount) throw() { + if(!a_burstActive) { + std::string msg = "Burst launch is stopped. Nothing done"; + std::cout << msg << std::endl; + LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION)); + return -1; + } + + if(releaseAmount < 1) { + std::string msg = "No valid release amount is specified. Ignoring burst pop"; + std::cout << msg << std::endl; + LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION)); + return -2; + } + + int currentOTArequests = a_entity->getOTARequests(); + a_burstPopCounter = (releaseAmount > currentOTArequests) ? currentOTArequests : releaseAmount; + return a_burstPopCounter; +} + +int OriginHost::pushBurst(int loadAmount) throw() { + if(a_burstMessages.size() == 0) { + std::string msg = "Burst data not found (empty list). Ignoring burst launch"; + std::cout << msg << std::endl; + LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION)); + return -1; + } + + if(loadAmount < 1) { + std::string msg = "No valid load amount is specified. Ignoring burst push"; + std::cout << msg << std::endl; + LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION)); + return -2; + } + + a_burstActive = true; + int count; + + for(count = 0; count < loadAmount; count++) + if(!sendBurstMessage()) break; + + return count; +} + +int OriginHost::sendBurst(int loadAmount) throw() { + if(a_burstMessages.size() == 0) { + std::string msg = "Burst data not found (empty list). Ignoring burst launch"; + std::cout << msg << std::endl; + LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION)); + return -1; + } + + if(loadAmount < 1) { + std::string msg = "No valid load amount is specified. Ignoring burst send"; + std::cout << msg << std::endl; + LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION)); + return -2; + } + + int count; + + for(count = 0; count < loadAmount; count++) + if(!sendBurstMessage(true /* anyway */)) break; + + return count; +} + +int OriginHost::startBurst(int initialLoad) throw() { + if(initialLoad < 1) { + std::string msg = "No initial load is specified. Ignoring burst start"; + std::cout << msg << std::endl; + LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION)); + return -2; + } + + a_burstActive = true; + a_burstCycle = 1; + a_burstDeliveryIt = a_burstMessages.begin(); + return (pushBurst(initialLoad)); +} + +bool OriginHost::sendBurstMessage(bool anyway) throw() { + if(!anyway && !burstActive()) return false; + + if(a_burstPopCounter > 0) { + if(burstLogEnabled()) writeBurstLogFile("x"); + + a_burstPopCounter--; + return false; + } + + if(a_burstDeliveryIt == a_burstMessages.end()) { + a_burstDeliveryIt = a_burstMessages.begin(); + + if(!anyway) { + if(a_burstRepeat) { + a_burstCycle++; + + if(burstLogEnabled()) writeBurstLogFile(anna::functions::asString("\nCompleted burst cycle. Starting again (repeat mode) on cycle %d.\n", a_burstCycle)); + } else { + if(burstLogEnabled()) writeBurstLogFile("\nCompleted burst cycle. Burst finished (repeat mode disabled).\n"); + + stopBurst(); + return false; + } + } + } + + anna::diameter::comm::Message *msg = (*a_burstDeliveryIt).second; + int order = (*a_burstDeliveryIt).first + 1; + a_burstDeliveryIt++; + bool dot = true; + // sending + bool result = a_entity->send(msg); + + if(burstLogEnabled()) { + if(a_burstMessages.size() >= 100) + dot = (order % (a_burstMessages.size() / 100)); + + if(dot) { + writeBurstLogFile("."); + } else { + writeBurstLogFile(anna::functions::asString(" %d", order)); + int otaReqs = a_entity->getOTARequests(); + + if(result && (otaReqs != a_otaRequest)) { + // false if was a sending after an answer received (no OTA change in this case) + // true after push and pop operations + a_otaRequest = otaReqs; + writeBurstLogFile(anna::functions::asString("[OTA %d]", a_otaRequest)); + } + } + } + + // Detailed log: + if(logEnabled()) { + anna::diameter::comm::Server *usedServer = a_entity->getLastUsedResource(); + anna::diameter::comm::ClientSession *usedClientSession = usedServer ? usedServer->getLastUsedResource() : NULL; + std::string detail = usedClientSession ? usedClientSession->asString() : ""; // esto no deberia ocurrir + writeLogFile(msg->getBody(), (result ? "sent2e" : "send2eError"), detail); // el del nodo de trabajo + } + + return result; +} + +std::string OriginHost::lookBurst(int order) const throw() { + + if (order == -1) order = a_burstDeliveryIt->first; + + std::string result = "No message found for order provided ("; + result += anna::functions::asString(order); + result += ")"; + std::map::const_iterator it = a_burstMessages.find(order - 1); + + if(it != a_burstMessages.end()) { + anna::diameter::codec::Message codecMsg; + try { codecMsg.decode((*it).second->getBody()); result = codecMsg.asXMLString(); } catch(anna::RuntimeException &ex) { ex.trace(); } + } + + return result; +} + +std::string OriginHost::gotoBurst(int order) throw() { + std::string result = "Position not found for order provided ("; + std::map::iterator it = a_burstMessages.find(order - 1); + + if(it != a_burstMessages.end()) { + a_burstDeliveryIt = it; + result = "Position updated for order provided ("; + } + + result += anna::functions::asString(order); + result += ")"; + return result; +} + +anna::xml::Node* OriginHost::asXML(anna::xml::Node* parent) const +throw() { + anna::xml::Node* result = parent->createChild("OriginHost"); + + result->createAttribute("originHost", getName()); + result->createAttribute("ApplicationId", a_applicationId); + result->createAttribute("originRealm", a_commEngine->getOriginRealmName()); + result->createAttribute("LogFile", a_logFile); + result->createAttribute("SplitLog", a_splitLog ? "yes" : "no"); + result->createAttribute("DetailedLog", a_detailedLog ? "yes" : "no"); + result->createAttribute("DumpLog", a_dumpLog ? "yes" : "no"); + result->createAttribute("BurstLogFile", a_burstLogFile); + result->createAttribute("RequestRetransmissions", a_requestRetransmissions); + + anna::xml::Node* commEngine = result->createChild("CommEngine"); + a_commEngine->asXML(commEngine); + + return result; +} + +std::string OriginHost::asXMLString() const throw() { + anna::xml::Node root("root"); + return anna::xml::Compiler().apply(asXML(&root)); +}