1 // ANNA - Anna is Not Nothingness Anymore //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
13 #include <anna/diameter.comm/Message.hpp>
14 #include <anna/diameter/stack/Dictionary.hpp>
15 #include <anna/diameter/codec/EngineManager.hpp>
16 #include <anna/core/core.hpp>
17 #include <anna/time/Date.hpp>
18 #include <anna/xml/Compiler.hpp>
21 #include <OriginHost.hpp>
22 #include <MyDiameterEngine.hpp>
// Builds the context for one origin host: creates the diameter comm engine named
// "<originHost>_DiameterCommEngine", fetches the codec engine for applicationId from the
// codec EngineManager, and initialises timers and burst state with default values.
//
// originHost: stored in a_originHost and used as prefix of the comm engine name.
// applicationId: stored in a_applicationId and used to select the codec engine.
// baseProtocolDictionary: forwarded to the MyDiameterEngine constructor.
33 OriginHost::OriginHost(const std::string &originHost, unsigned int applicationId, const anna::diameter::stack::Dictionary *baseProtocolDictionary) :
34 a_originHost(originHost), a_applicationId(applicationId) {
36 std::string commEngineName = a_originHost + "_DiameterCommEngine";
// NOTE(review): the engine is allocated with a raw `new`; confirm who owns and releases it.
37 a_commEngine = new MyDiameterEngine(commEngineName.c_str(), baseProtocolDictionary);
38 a_commEngine->setAutoBind(false); // allows client sessions to be created without binding them, in order to set timeouts.
39 a_codecEngine = anna::diameter::codec::EngineManager::instantiate().getCodecEngine(applicationId);
44 a_detailedLog = false;
// No local diameter server until one is assigned to a_diameterServer.
47 a_diameterServer = NULL;
// Default timers, expressed as anna::Millisecond values.
50 a_allowedInactivityTime = (anna::Millisecond)90000;
51 a_tcpConnectDelay = (anna::Millisecond)200;
52 a_answersTimeout = (anna::Millisecond)10000;
53 a_ceaTimeout = (anna::Millisecond)10000;
54 a_watchdogPeriod = (anna::Millisecond)30000;
// Zero retransmissions by default.
55 a_requestRetransmissions = 0;
// Burst state starts inactive, without repetition, positioned at the beginning of the burst list.
59 a_burstRepeat = false;
60 a_burstActive = false;
62 a_burstDeliveryIt = a_burstMessages.begin();
64 a_burstPopCounter = 0;
// Creates the diameter entity for this origin host and applies its timeouts.
//
// entityRepresentation: textual server list, parsed with anna::functions::getSocketVectorFromString.
// bindTimeout: timeout applied to ClassCode::Bind.
// applicationTimeout: timeout applied to ClassCode::ApplicationMessage.
// Declared to throw anna::RuntimeException.
68 void OriginHost::createEntity(const std::string &entityRepresentation, const anna::Millisecond &bindTimeout, const anna::Millisecond &applicationTimeout) throw(anna::RuntimeException) {
70 anna::socket_v servers = anna::functions::getSocketVectorFromString(entityRepresentation);
71 std::string entityDescription = "Launcher diameter entity for "; entityDescription += a_originHost;
// NOTE(review): unchecked C-style downcast; assumes the comm engine creates MyDiameterEntity objects - confirm.
72 a_entity = (MyDiameterEntity*)(a_commEngine->createEntity(servers, entityDescription));
73 a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::Bind, bindTimeout);
74 a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
// Creates the local diameter server for this origin host and configures it.
//
// serverRepresentation: socket literal split into address and port by
//                       anna::functions::getAddressAndPortFromSocketLiteral.
// sessions: passed to createLocalServer as the connection limit (see the note below about negative values).
// inactivityTimeout: allowed inactivity time set on the server.
// applicationTimeout: timeout applied to ClassCode::ApplicationMessage on the server.
// ceaPathfile: handed to the comm engine through setCEA() before the server is created.
// Declared to throw anna::RuntimeException.
77 void OriginHost::startDiameterServer(const std::string &serverRepresentation, int sessions, const anna::Millisecond &inactivityTimeout, const anna::Millisecond &applicationTimeout, const std::string &ceaPathfile) throw(anna::RuntimeException) {
79 //if(sessions <= 0) return; negative implies no limit for accepted connections
81 std::string address; int port;
82 anna::functions::getAddressAndPortFromSocketLiteral(serverRepresentation, address, port);
83 std::string serverDescription = "Launcher diameter local server for "; serverDescription += a_originHost;
84 a_commEngine->setCEA(ceaPathfile);
// NOTE(review): unchecked C-style downcast; assumes the comm engine creates MyLocalServer objects - confirm.
85 a_diameterServer = (MyLocalServer*)(a_commEngine->createLocalServer(address, port, sessions));
86 // we could set sessions = 0, and after application run(), use setMaxConnections(real sessions)
87 // over the local server in order to start it.
89 a_diameterServer->setDescription(serverDescription);
90 a_diameterServer->setAllowedInactivityTime(inactivityTimeout);
91 a_diameterServer->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
// Obtains a comm message from a_commMessages and configures its retransmission policy
// from a_requestRetransmissions.
// Declared to throw anna::RuntimeException.
94 anna::diameter::comm::Message *OriginHost::createCommMessage() throw(anna::RuntimeException) {
95 anna::diameter::comm::Message *result = a_commMessages.create();
96 result->setRetries(a_requestRetransmissions);
// The expiry action is switched to Retransmit only when at least one retransmission is configured.
97 if (a_requestRetransmissions > 0) result->setOnExpiry(anna::diameter::comm::Message::OnExpiry::Retransmit);
// Hands a comm message back to a_commMessages, the same container createCommMessage() takes messages from.
//
// msg: message to release.
102 void OriginHost::releaseCommMessage(anna::diameter::comm::Message *msg) throw() {
103 a_commMessages.release(msg);
// Logs a raw diameter message: decodes db into a codec message and delegates to the
// codec-message overload of writeLogFile().
//
// db: encoded message.
// logExtension, detail: forwarded unchanged to the codec-message overload.
107 void OriginHost::writeLogFile(const anna::DataBlock & db, const std::string &logExtension, const std::string &detail) const throw() {
108 anna::diameter::codec::Message codecMsg;
// A decoding failure is only traced; the overload below is still called with codecMsg as it stands.
109 try { codecMsg.decode(db); } catch(anna::RuntimeException &ex) { ex.trace(); }
110 writeLogFile(codecMsg, logExtension, detail);
// Writes a log entry for an already decoded message.
//
// decodedMessage: message whose XML representation (asXMLString) is logged.
// logExtension: appended to the a_logFile-based target name; also used in the entry title and
//               as the last component of the per-message file name.
// detail: NOTE(review): presumably the text printed under the "Used resource" heading - confirm.
// NOTE(review): confirm which of a_splitLog / a_detailedLog / a_dumpLog guards each section below.
114 void OriginHost::writeLogFile(const anna::diameter::codec::Message &decodedMessage, const std::string &logExtension, const std::string &detail) const throw() {
116 std::string targetFile = a_logFile;
120 targetFile += logExtension;
// Opened for appending. The openmode flags are inherited from std::ios_base, so the
// std::ifstream:: spelling is equivalent to std::ios::out | std::ios::app.
123 std::ofstream out(targetFile.c_str(), std::ifstream::out | std::ifstream::app);
125 std::string title = "[";
126 title += logExtension;
128 // Build complete log:
129 std::string log = "\n";
130 std::string xml = decodedMessage.asXMLString();
// The entry title carries the current date.
134 anna::time::Date now;
137 title += now.asString();
138 log += anna::functions::highlight(title, anna::functions::TextHighlightMode::OverAndUnderline);
141 log += anna::functions::highlight("Used resource");
// Per-message XML file. Its name follows this pattern:
152 // <unix ms timestamp>.<originHost>.<hop by hop>.<end to end>.<message code>.<request|answer>.<type of event>.xml
153 std::string name = anna::functions::asString((anna::Millisecond)anna::functions::millisecond());
155 name += getMyDiameterEngine()->getOriginHost();
157 name += anna::functions::asString(decodedMessage.getHopByHop());
159 name += anna::functions::asString(decodedMessage.getEndToEnd());
// getId() is a pair: .first is appended as the message code, .second selects "request." or "answer.".
161 name += anna::functions::asString(decodedMessage.getId().first);
163 name += ((decodedMessage.getId().second) ? "request.":"answer.");
164 name += logExtension;
166 std::ofstream outMsg(name.c_str(), std::ifstream::out | std::ifstream::app);
167 outMsg.write(xml.c_str(), xml.size());
// Finally the assembled entry is appended to the target log file.
172 out.write(log.c_str(), log.size());
// Appends buffer to the burst log file (a_burstLogFile). The file is opened and closed on every call.
//
// buffer: text to append, written as-is.
176 void OriginHost::writeBurstLogFile(const std::string &buffer) throw() {
177 std::ofstream out(a_burstLogFile.c_str(), std::ifstream::out | std::ifstream::app);
178 out.write(buffer.c_str(), buffer.size());
179 out.close(); // close() will be called when the object is destructed (i.e., when it goes out of scope).
180 // you'd call close() only if you indeed for some reason wanted to close the filestream
181 // earlier than it goes out of scope.
// Empties the burst list: releases every stored comm message, clears a_burstMessages and
// leaves the burst inactive with the delivery position at the beginning of the (now empty) list.
// A warning is printed to stdout and sent to the logger for the "already empty" case.
// NOTE(review): presumably returns the number of messages held before clearing (`size`) - confirm.
184 int OriginHost::clearBurst() throw() {
185 int size = a_burstMessages.size();
188 std::map<int, anna::diameter::comm::Message*>::const_iterator it;
189 std::map<int, anna::diameter::comm::Message*>::const_iterator it_min(a_burstMessages.begin());
190 std::map<int, anna::diameter::comm::Message*>::const_iterator it_max(a_burstMessages.end());
// Every message goes back through releaseCommMessage() before the map is cleared.
192 for(it = it_min; it != it_max; it++) releaseCommMessage((*it).second);
194 a_burstMessages.clear();
196 std::string msg = "Burst list already empty. Nothing done";
197 std::cout << msg << std::endl;
198 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
201 a_burstActive = false;
// The iterator is re-seated because clear() invalidated any previous position.
203 a_burstDeliveryIt = a_burstMessages.begin();
// Adds a new comm message to the burst list under the next load index.
//
// db: NOTE(review): presumably becomes the body of the new message - confirm.
// Returns the key under which the message was stored in a_burstMessages.
// Declared to throw anna::RuntimeException.
207 int OriginHost::loadBurstMessage(const anna::DataBlock & db) throw(anna::RuntimeException) {
208 anna::diameter::comm::Message *msg = createCommMessage();
// The index is post-incremented, so the key used here is the value returned below.
210 a_burstMessages[a_burstLoadIndx++] = msg;
211 return (a_burstLoadIndx - 1);
// Stops the burst launch.
// A warning is printed to stdout and sent to the logger for the "already stopped" case.
// Returns the messages remaining on the current cycle: list size minus the key at the delivery position.
214 int OriginHost::stopBurst() throw() {
216 std::string msg = "Burst launch is already stopped. Nothing done";
217 std::cout << msg << std::endl;
218 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
222 a_burstActive = false;
223 // Remaining on cycle:
// NOTE(review): a_burstDeliveryIt is dereferenced here; if it can equal a_burstMessages.end()
// (e.g. empty list) this is undefined behaviour - confirm it is guarded.
224 return (a_burstMessages.size() - (*a_burstDeliveryIt).first);
// Requests that a number of burst messages be popped.
//
// releaseAmount: requested amount; values below 1 are rejected with a warning.
// Returns the value stored in a_burstPopCounter: releaseAmount capped to the entity's
// current OTA requests count.
// A warning is also emitted for the "burst launch is stopped" case.
227 int OriginHost::popBurst(int releaseAmount) throw() {
229 std::string msg = "Burst launch is stopped. Nothing done";
230 std::cout << msg << std::endl;
231 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
235 if(releaseAmount < 1) {
236 std::string msg = "No valid release amount is specified. Ignoring burst pop";
237 std::cout << msg << std::endl;
238 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
// The pop counter can never exceed the entity's OTA requests count.
242 int currentOTArequests = a_entity->getOTARequests();
243 a_burstPopCounter = (releaseAmount > currentOTArequests) ? currentOTArequests : releaseAmount;
244 return a_burstPopCounter;
// Activates the burst and sends up to loadAmount burst messages.
//
// loadAmount: maximum number of messages to send in this call.
// An empty burst list or a rejected load amount produces a warning on stdout and in the logger.
// NOTE(review): presumably returns the number of messages actually sent (`count`) - confirm.
247 int OriginHost::pushBurst(int loadAmount) throw() {
248 if(a_burstMessages.size() == 0) {
249 std::string msg = "Burst data not found (empty list). Ignoring burst launch";
250 std::cout << msg << std::endl;
251 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
256 std::string msg = "No valid load amount is specified. Ignoring burst push";
257 std::cout << msg << std::endl;
258 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
262 a_burstActive = true;
// Sending stops at the first sendBurstMessage() call that returns false.
265 for(count = 0; count < loadAmount; count++)
266 if(!sendBurstMessage()) break;
// Sends up to loadAmount burst messages with the burst-active check bypassed
// (sendBurstMessage is called with anyway = true).
//
// loadAmount: maximum number of messages to send in this call.
// An empty burst list or a rejected load amount produces a warning on stdout and in the logger.
// NOTE(review): presumably returns the number of messages actually sent (`count`) - confirm.
271 int OriginHost::sendBurst(int loadAmount) throw() {
272 if(a_burstMessages.size() == 0) {
273 std::string msg = "Burst data not found (empty list). Ignoring burst launch";
274 std::cout << msg << std::endl;
275 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
280 std::string msg = "No valid load amount is specified. Ignoring burst send";
281 std::cout << msg << std::endl;
282 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
// Sending stops at the first sendBurstMessage() call that returns false.
288 for(count = 0; count < loadAmount; count++)
289 if(!sendBurstMessage(true /* anyway */)) break;
// Starts the burst from the first message of the list.
//
// initialLoad: number of messages to push at start; values below 1 are rejected with a warning.
// Returns the result of pushBurst(initialLoad).
294 int OriginHost::startBurst(int initialLoad) throw() {
295 if(initialLoad < 1) {
296 std::string msg = "No initial load is specified. Ignoring burst start";
297 std::cout << msg << std::endl;
298 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
302 a_burstActive = true;
// Rewind the delivery position before pushing the initial load.
304 a_burstDeliveryIt = a_burstMessages.begin();
305 return (pushBurst(initialLoad));
// Sends the burst message at the current delivery position through a_entity and records the
// attempt in the burst log (when enabled) and in the message log.
//
// anyway: when true, the burst-active check is bypassed.
// Returns false immediately when the burst is not active and `anyway` is false.
// NOTE(review): on the normal path the result of a_entity->send() is presumably returned - confirm.
308 bool OriginHost::sendBurstMessage(bool anyway) throw() {
309 if(!anyway && !burstActive()) return false;
// While a pop request is pending (a_burstPopCounter > 0), an "x" mark is written to the burst log.
311 if(a_burstPopCounter > 0) {
312 if(burstLogEnabled()) writeBurstLogFile("x");
// Reaching the end of the list rewinds the delivery position to the first message.
318 if(a_burstDeliveryIt == a_burstMessages.end()) {
319 a_burstDeliveryIt = a_burstMessages.begin();
325 if(burstLogEnabled()) writeBurstLogFile(anna::functions::asString("\nCompleted burst cycle. Starting again (repeat mode) on cycle %d.\n", a_burstCycle));
327 if(burstLogEnabled()) writeBurstLogFile("\nCompleted burst cycle. Burst finished (repeat mode disabled).\n");
335 anna::diameter::comm::Message *msg = (*a_burstDeliveryIt).second;
// `order` is the map key plus one; it is the value printed in the burst log.
336 int order = (*a_burstDeliveryIt).first + 1;
// The second argument of send() is whether the "balance" command-line option is present.
340 bool result = a_entity->send(msg, anna::CommandLine::instantiate().exists("balance"));
342 if(burstLogEnabled()) {
// With 100 or more messages, `dot` is computed as order modulo (size / 100).
343 if(a_burstMessages.size() >= 100)
344 dot = (order % (a_burstMessages.size() / 100));
347 writeBurstLogFile(".");
349 writeBurstLogFile(anna::functions::asString(" %d", order));
350 int otaReqs = a_entity->getOTARequests();
352 if(result && (otaReqs != a_otaRequest)) {
353 // The condition is false when sending right after an answer was received (no OTA change in that case)
354 // and true after push and pop operations.
355 a_otaRequest = otaReqs;
356 writeBurstLogFile(anna::functions::asString("[OTA %d]", a_otaRequest));
// Describe the client session used for this send, for the message log.
363 anna::diameter::comm::Server *usedServer = a_entity->getLastUsedResource();
364 anna::diameter::comm::ClientSession *usedClientSession = usedServer ? usedServer->getLastUsedResource() : NULL;
365 std::string detail = usedClientSession ? usedClientSession->asString() : "<null client session>"; // this should not happen
366 writeLogFile(msg->getBody(), (result ? "sent2e" : "send2eError"), detail); // the one for the working node
// Returns the XML representation of a burst message selected by order.
//
// order: the message is looked up under key (order - 1) in a_burstMessages.
//        The special value -1 is first replaced by the key at the current delivery position.
// When no message is found, or decoding fails (the exception is only traced), the result
// keeps the "No message found for order provided (...)" text built below.
372 std::string OriginHost::lookBurst(int order) const throw() {
// NOTE(review): a_burstDeliveryIt is dereferenced here; if it can equal a_burstMessages.end()
// (e.g. empty list) this is undefined behaviour - confirm it is guarded.
374 if (order == -1) order = a_burstDeliveryIt->first;
376 std::string result = "No message found for order provided (";
377 result += anna::functions::asString(order);
379 std::map<int, anna::diameter::comm::Message*>::const_iterator it = a_burstMessages.find(order - 1);
381 if(it != a_burstMessages.end()) {
382 anna::diameter::codec::Message codecMsg;
383 try { codecMsg.decode((*it).second->getBody()); result = codecMsg.asXMLString(); } catch(anna::RuntimeException &ex) { ex.trace(); }
// Moves the burst delivery position to the message selected by order.
//
// order: the target is looked up under key (order - 1) in a_burstMessages.
// The result text starts with "Position updated for order provided (" when the key exists and
// with "Position not found for order provided (" otherwise, followed by the order value.
389 std::string OriginHost::gotoBurst(int order) throw() {
390 std::string result = "Position not found for order provided (";
391 std::map<int, anna::diameter::comm::Message*>::iterator it = a_burstMessages.find(order - 1);
393 if(it != a_burstMessages.end()) {
394 a_burstDeliveryIt = it;
395 result = "Position updated for order provided (";
398 result += anna::functions::asString(order);
// Adds an "OriginHost" child node to parent describing this object: identity, logging
// configuration and retransmissions as attributes, plus a nested "CommEngine" node that is
// filled in by the comm engine itself.
//
// parent: node under which the "OriginHost" child is created.
// NOTE(review): presumably returns the created "OriginHost" node (`result`) - confirm.
403 anna::xml::Node* OriginHost::asXML(anna::xml::Node* parent) const
405 anna::xml::Node* result = parent->createChild("OriginHost");
407 result->createAttribute("originHost", a_originHost);
408 result->createAttribute("ApplicationId", a_applicationId);
409 result->createAttribute("originRealm", a_commEngine->getOriginRealm());
410 result->createAttribute("LogFile", a_logFile);
// Boolean settings are rendered as "yes" / "no".
411 result->createAttribute("SplitLog", a_splitLog ? "yes" : "no");
412 result->createAttribute("DetailedLog", a_detailedLog ? "yes" : "no");
413 result->createAttribute("DumpLog", a_dumpLog ? "yes" : "no");
414 result->createAttribute("BurstLogFile", a_burstLogFile);
415 result->createAttribute("RequestRetransmissions", a_requestRetransmissions);
417 anna::xml::Node* commEngine = result->createChild("CommEngine");
418 a_commEngine->asXML(commEngine);
// Returns this object's XML description as a string: asXML() is invoked on a temporary
// "root" node and its result is passed to anna::xml::Compiler.
423 std::string OriginHost::asXMLString() const throw() {
424 anna::xml::Node root("root");
425 return anna::xml::Compiler().apply(asXML(&root));