Testing library separation: now not in launcher but isolated
[anna.git] / source / diameter.comm / OriginHost.cpp
diff --git a/source/diameter.comm/OriginHost.cpp b/source/diameter.comm/OriginHost.cpp
new file mode 100644 (file)
index 0000000..ed181bd
--- /dev/null
@@ -0,0 +1,415 @@
+// ANNA - Anna is Not Nothingness Anymore                                                         //
+//                                                                                                //
+// (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
+//                                                                                                //
+// See project site at http://redmine.teslayout.com/projects/anna-suite                           //
+// See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
+
+
+// Standard
+#include <string>
+
+// Project
+#include <anna/diameter.comm/OriginHost.hpp>
+
+#include <anna/diameter.comm/Engine.hpp>
+#include <anna/diameter.comm/Message.hpp>
+#include <anna/diameter.comm/Entity.hpp>
+#include <anna/diameter.comm/LocalServer.hpp>
+#include <anna/diameter/codec/EngineManager.hpp>
+#include <anna/core/core.hpp>
+#include <anna/time/Date.hpp>
+#include <anna/xml/Compiler.hpp>
+
+using namespace anna::diameter::comm;
+
+
+OriginHost::OriginHost(anna::diameter::comm::Engine* commEngine, unsigned int applicationId) :
+                       a_commEngine(commEngine), a_applicationId(applicationId) {
+
+  a_codecEngine = anna::diameter::codec::EngineManager::instantiate().getCodecEngine(applicationId); // i know, this is going to exist (getCodecEngine is not null)
+
+  a_logFile = "";
+  a_burstLogFile = "";
+  a_splitLog = false;
+  a_detailedLog = false;
+  a_dumpLog = false;
+  a_entity = NULL;
+  a_diameterServer = NULL;
+
+  // Comm resources:
+  a_requestRetransmissions = 0;
+
+  // Burst
+  a_burstCycle = 1;
+  a_burstRepeat = false;
+  a_burstActive = false;
+  a_burstLoadIndx = 0;
+  a_burstDeliveryIt = a_burstMessages.begin();
+  a_otaRequest = 0;
+  a_burstPopCounter = 0;
+}
+
+const std::string &OriginHost::getName() const throw() {
+  return a_commEngine->getOriginHostName();
+}
+
+void OriginHost::createEntity(const std::string &entityRepresentation, const anna::Millisecond &bindTimeout, const anna::Millisecond &applicationTimeout) throw(anna::RuntimeException) {
+
+  anna::socket_v servers = anna::functions::getSocketVectorFromString(entityRepresentation);
+  std::string entityDescription = "Launcher diameter entity for "; entityDescription += getName();
+  a_entity = (anna::diameter::comm::Entity*)a_commEngine->createEntity(servers, entityDescription);
+  a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::Bind, bindTimeout);
+  a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
+}
+
+void OriginHost::createDiameterServer(const std::string &serverRepresentation, int sessions, const anna::Millisecond &inactivityTimeout, const anna::Millisecond &applicationTimeout, const std::string &ceaPathfile) throw(anna::RuntimeException) {
+
+  //if(sessions <= 0) return; negative implies no limit for accepted connections
+
+  std::string address; int port;
+  anna::functions::getAddressAndPortFromSocketLiteral(serverRepresentation, address, port);
+  std::string serverDescription = "Launcher diameter local server for "; serverDescription += getName();
+  a_commEngine->setCEA(ceaPathfile);
+  a_diameterServer = (anna::diameter::comm::LocalServer*)(a_commEngine->createLocalServer(address, port, sessions));
+          // we could set sessions = 0, and after application run(), use setMaxConnections(real sessions)
+          // over the local server in order to start it.
+
+  a_diameterServer->setDescription(serverDescription);
+  a_diameterServer->setAllowedInactivityTime(inactivityTimeout);
+  a_diameterServer->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
+}
+
+anna::diameter::comm::Message *OriginHost::createCommMessage() throw(anna::RuntimeException) {
+  anna::diameter::comm::Message *result = a_commMessages.create();
+  result->setRetries(a_requestRetransmissions);
+  if (a_requestRetransmissions > 0) result->setOnExpiry(anna::diameter::comm::Message::OnExpiry::Retransmit);
+  return result;
+}
+
+
+void OriginHost::releaseCommMessage(anna::diameter::comm::Message *msg) throw() {
+  a_commMessages.release(msg);
+}
+
+
+void OriginHost::writeLogFile(const anna::DataBlock & db, const std::string &logExtension, const std::string &detail) const throw() {
+  anna::diameter::codec::Message codecMsg;
+  try { codecMsg.decode(db); } catch(anna::RuntimeException &ex) { ex.trace(); }
+  writeLogFile(codecMsg, logExtension, detail);
+}
+
+// Already decoded:
+void OriginHost::writeLogFile(const anna::diameter::codec::Message &decodedMessage, const std::string &logExtension, const std::string &detail) const throw() {
+  // Open target file:
+  std::string targetFile = a_logFile;
+
+  if(a_splitLog) {
+    targetFile += ".";
+    targetFile += logExtension;
+  }
+
+  std::ofstream out(targetFile.c_str(), std::ifstream::out | std::ifstream::app);
+  // Set text to dump:
+  std::string title = "[";
+  title += logExtension;
+  title += "]";
+  // Build complete log:
+  std::string log = "\n";
+  std::string xml = decodedMessage.asXMLString();
+
+
+  if(a_detailedLog) {
+    anna::time::Date now;
+    now.setNow();
+    title += " ";
+    title += now.asString();
+    log += anna::functions::highlight(title, anna::functions::TextHighlightMode::OverAndUnderline);
+    log += xml;
+    log += "\n";
+    log += anna::functions::highlight("Used resource");
+    log += detail;
+    log += "\n";
+  } else {
+    log += title;
+    log += "\n";
+    log += xml;
+    log += "\n";
+  }
+
+  if(a_dumpLog) {
+    // <unix ms timestamp>.<originHost>.<hop by hop>.<end to end>.<message code>.<request|answer>.<type of event>.xml
+    std::string name = anna::functions::asString((anna::Millisecond)anna::functions::millisecond());
+    name += ".";
+    name += getCommEngine()->getOriginHostName();
+    name += ".";
+    name += anna::functions::asString(decodedMessage.getHopByHop());
+    name += ".";
+    name += anna::functions::asString(decodedMessage.getEndToEnd());
+    name += ".";
+    name += anna::functions::asString(decodedMessage.getId().first);
+    name += ".";
+    name += ((decodedMessage.getId().second) ? "request.":"answer.");
+    name += logExtension;
+    name += ".xml";
+    std::ofstream outMsg(name.c_str(), std::ifstream::out | std::ifstream::app);
+    outMsg.write(xml.c_str(), xml.size());
+    outMsg.close();
+  }
+
+  // Write and close
+  out.write(log.c_str(), log.size());
+  out.close();
+}
+
+void OriginHost::writeBurstLogFile(const std::string &buffer) throw() {
+  std::ofstream out(a_burstLogFile.c_str(), std::ifstream::out | std::ifstream::app);
+  out.write(buffer.c_str(), buffer.size());
+  out.close();    // close() will be called when the object is destructed (i.e., when it goes out of scope).
+  // you'd call close() only if you indeed for some reason wanted to close the filestream
+  // earlier than it goes out of scope.
+}
+
+int OriginHost::clearBurst() throw() {
+  int size = a_burstMessages.size();
+
+  if(size) {
+    std::map<int, anna::diameter::comm::Message*>::const_iterator it;
+    std::map<int, anna::diameter::comm::Message*>::const_iterator it_min(a_burstMessages.begin());
+    std::map<int, anna::diameter::comm::Message*>::const_iterator it_max(a_burstMessages.end());
+
+    for(it = it_min; it != it_max; it++) releaseCommMessage((*it).second);
+
+    a_burstMessages.clear();
+  } else {
+    std::string msg = "Burst list already empty. Nothing done";
+    std::cout << msg << std::endl;
+    LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
+  }
+
+  a_burstActive = false;
+  a_burstLoadIndx = 0;
+  a_burstDeliveryIt = a_burstMessages.begin();
+  return size;
+}
+
+int OriginHost::loadBurstMessage(const anna::DataBlock & db) throw(anna::RuntimeException) {
+  anna::diameter::comm::Message *msg = createCommMessage();
+  msg->setBody(db);
+  a_burstMessages[a_burstLoadIndx++] = msg;
+  return (a_burstLoadIndx - 1);
+}
+
+int OriginHost::stopBurst() throw() {
+  if(!a_burstActive) {
+    std::string msg = "Burst launch is already stopped. Nothing done";
+    std::cout << msg << std::endl;
+    LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
+    return -1;
+  }
+
+  a_burstActive = false;
+  // Remaining on cycle:
+  return (a_burstMessages.size() - (*a_burstDeliveryIt).first);
+}
+
+int OriginHost::popBurst(int releaseAmount) throw() {
+  if(!a_burstActive) {
+    std::string msg = "Burst launch is stopped. Nothing done";
+    std::cout << msg << std::endl;
+    LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
+    return -1;
+  }
+
+  if(releaseAmount < 1) {
+    std::string msg = "No valid release amount is specified. Ignoring burst pop";
+    std::cout << msg << std::endl;
+    LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
+    return -2;
+  }
+
+  int currentOTArequests = a_entity->getOTARequests();
+  a_burstPopCounter = (releaseAmount > currentOTArequests) ? currentOTArequests : releaseAmount;
+  return a_burstPopCounter;
+}
+
+int OriginHost::pushBurst(int loadAmount) throw() {
+  if(a_burstMessages.size() == 0) {
+    std::string msg = "Burst data not found (empty list). Ignoring burst launch";
+    std::cout << msg << std::endl;
+    LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
+    return -1;
+  }
+
+  if(loadAmount < 1) {
+    std::string msg = "No valid load amount is specified. Ignoring burst push";
+    std::cout << msg << std::endl;
+    LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
+    return -2;
+  }
+
+  a_burstActive = true;
+  int count;
+
+  for(count = 0; count < loadAmount; count++)
+    if(!sendBurstMessage()) break;
+
+  return count;
+}
+
+int OriginHost::sendBurst(int loadAmount) throw() {
+  if(a_burstMessages.size() == 0) {
+    std::string msg = "Burst data not found (empty list). Ignoring burst launch";
+    std::cout << msg << std::endl;
+    LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
+    return -1;
+  }
+
+  if(loadAmount < 1) {
+    std::string msg = "No valid load amount is specified. Ignoring burst send";
+    std::cout << msg << std::endl;
+    LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
+    return -2;
+  }
+
+  int count;
+
+  for(count = 0; count < loadAmount; count++)
+    if(!sendBurstMessage(true /* anyway */)) break;
+
+  return count;
+}
+
+int OriginHost::startBurst(int initialLoad) throw() {
+  if(initialLoad < 1) {
+    std::string msg = "No initial load is specified. Ignoring burst start";
+    std::cout << msg << std::endl;
+    LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
+    return -2;
+  }
+
+  a_burstActive = true;
+  a_burstCycle = 1;
+  a_burstDeliveryIt = a_burstMessages.begin();
+  return (pushBurst(initialLoad));
+}
+
+bool OriginHost::sendBurstMessage(bool anyway) throw() {
+  if(!anyway && !burstActive()) return false;
+
+  if(a_burstPopCounter > 0) {
+    if(burstLogEnabled()) writeBurstLogFile("x");
+
+    a_burstPopCounter--;
+    return false;
+  }
+
+  if(a_burstDeliveryIt == a_burstMessages.end()) {
+    a_burstDeliveryIt = a_burstMessages.begin();
+
+    if(!anyway) {
+      if(a_burstRepeat) {
+        a_burstCycle++;
+
+        if(burstLogEnabled()) writeBurstLogFile(anna::functions::asString("\nCompleted burst cycle. Starting again (repeat mode) on cycle %d.\n", a_burstCycle));
+      } else {
+        if(burstLogEnabled()) writeBurstLogFile("\nCompleted burst cycle. Burst finished (repeat mode disabled).\n");
+
+        stopBurst();
+        return false;
+      }
+    }
+  }
+
+  anna::diameter::comm::Message *msg = (*a_burstDeliveryIt).second;
+  int order = (*a_burstDeliveryIt).first + 1;
+  a_burstDeliveryIt++;
+  bool dot = true;
+  // sending
+  bool result = a_entity->send(msg);
+
+  if(burstLogEnabled()) {
+    if(a_burstMessages.size() >= 100)
+      dot = (order  % (a_burstMessages.size() / 100));
+
+    if(dot) {
+      writeBurstLogFile(".");
+    } else {
+      writeBurstLogFile(anna::functions::asString(" %d", order));
+      int otaReqs  = a_entity->getOTARequests();
+
+      if(result && (otaReqs != a_otaRequest)) {
+        // false if was a sending after an answer received (no OTA change in this case)
+        // true after push and pop operations
+        a_otaRequest = otaReqs;
+        writeBurstLogFile(anna::functions::asString("[OTA %d]", a_otaRequest));
+      }
+    }
+  }
+
+  // Detailed log:
+  if(logEnabled()) {
+    anna::diameter::comm::Server *usedServer = a_entity->getLastUsedResource();
+    anna::diameter::comm::ClientSession *usedClientSession = usedServer ? usedServer->getLastUsedResource() : NULL;
+    std::string detail = usedClientSession ? usedClientSession->asString() : "<null client session>"; // esto no deberia ocurrir
+    writeLogFile(msg->getBody(), (result ? "sent2e" : "send2eError"), detail); // el del nodo de trabajo
+  }
+
+  return result;
+}
+
+std::string OriginHost::lookBurst(int order) const throw() {
+
+  if (order == -1) order = a_burstDeliveryIt->first;
+
+  std::string result = "No message found for order provided (";
+  result += anna::functions::asString(order);
+  result += ")";
+  std::map<int, anna::diameter::comm::Message*>::const_iterator it = a_burstMessages.find(order - 1);
+
+  if(it != a_burstMessages.end()) {
+    anna::diameter::codec::Message codecMsg;
+    try { codecMsg.decode((*it).second->getBody()); result = codecMsg.asXMLString(); } catch(anna::RuntimeException &ex) { ex.trace(); }
+  }
+
+  return result;
+}
+
+std::string OriginHost::gotoBurst(int order) throw() {
+  std::string result = "Position not found for order provided (";
+  std::map<int, anna::diameter::comm::Message*>::iterator it = a_burstMessages.find(order - 1);
+
+  if(it != a_burstMessages.end()) {
+    a_burstDeliveryIt = it;
+    result = "Position updated for order provided (";
+  }
+
+  result += anna::functions::asString(order);
+  result += ")";
+  return result;
+}
+
+anna::xml::Node* OriginHost::asXML(anna::xml::Node* parent) const
+throw() {
+  anna::xml::Node* result = parent->createChild("OriginHost");
+
+  result->createAttribute("originHost", getName());
+  result->createAttribute("ApplicationId", a_applicationId);
+  result->createAttribute("originRealm", a_commEngine->getOriginRealmName());
+  result->createAttribute("LogFile", a_logFile);
+  result->createAttribute("SplitLog", a_splitLog ? "yes" : "no");
+  result->createAttribute("DetailedLog", a_detailedLog ? "yes" : "no");
+  result->createAttribute("DumpLog", a_dumpLog ? "yes" : "no");
+  result->createAttribute("BurstLogFile", a_burstLogFile);
+  result->createAttribute("RequestRetransmissions", a_requestRetransmissions);
+
+  anna::xml::Node* commEngine = result->createChild("CommEngine");
+  a_commEngine->asXML(commEngine);
+
+  return result;
+}
+
+std::string OriginHost::asXMLString() const throw() {
+  anna::xml::Node root("root");
+  return anna::xml::Compiler().apply(asXML(&root));
+}