1 // ANNA - Anna is Not Nothingness Anymore //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
13 #include <anna/diameter.comm/OriginHost.hpp>
15 #include <anna/diameter.comm/Engine.hpp>
16 #include <anna/diameter.comm/Message.hpp>
17 #include <anna/diameter.comm/Entity.hpp>
18 #include <anna/diameter.comm/LocalServer.hpp>
19 #include <anna/diameter/codec/EngineManager.hpp>
20 #include <anna/core/core.hpp>
21 #include <anna/time/Date.hpp>
22 #include <anna/xml/Compiler.hpp>
24 using namespace anna::diameter::comm;
// Constructs an OriginHost bound to a communication engine and a diameter application id.
//
// commEngine:    stored in a_commEngine; later used for the host name, entity and local server creation.
// applicationId: stored in a_applicationId and used to look up the codec engine.
//
// The visible assignments give initial values to the logging, local server, retransmission
// and burst state members.
// NOTE(review): this listing elides some original lines (the embedded numbering skips), so other
// members may be initialised on lines that are not visible here - confirm against the full file.
27 OriginHost::OriginHost(anna::diameter::comm::Engine* commEngine, unsigned int applicationId) :
28 a_commEngine(commEngine), a_applicationId(applicationId) {
// Codec engine registered for this application id in the codec EngineManager.
30 a_codecEngine = anna::diameter::codec::EngineManager::instantiate().getCodecEngine(applicationId); // i know, this is going to exist (getCodecEngine is not null)
35 a_detailedLog = false;
// No local server until createDiameterServer() assigns one.
38 a_diameterServer = NULL;
// Zero retransmissions: createCommMessage() only enables retransmit-on-expiry when this is > 0.
41 a_requestRetransmissions = 0;
45 a_burstRepeat = false;
46 a_burstActive = false;
// Delivery position starts at the beginning of the burst list
// (presumably empty at this point, in which case begin() == end()).
48 a_burstDeliveryIt = a_burstMessages.begin();
50 a_burstPopCounter = 0;
// Returns the origin host name as reported by the associated communication engine
// (a_commEngine->getOriginHostName()).
53 const std::string &OriginHost::getName() const {
54 return a_commEngine->getOriginHostName();
// Creates the diameter entity for this origin host and stores it in a_entity.
//
// entityRepresentation: text converted into a socket vector by
//                       anna::functions::getSocketVectorFromString (exact format defined by that helper).
// bindTimeout:          timeout applied to ClassCode::Bind on the new entity.
// applicationTimeout:   timeout applied to ClassCode::ApplicationMessage on the new entity.
//
// Declared noexcept(false): exceptions from the called helpers may propagate.
57 void OriginHost::createEntity(const std::string &entityRepresentation, const anna::Millisecond &bindTimeout, const anna::Millisecond &applicationTimeout) noexcept(false) {
59 anna::socket_v servers = anna::functions::getSocketVectorFromString(entityRepresentation);
// Description is "Launcher diameter entity for " followed by this host name.
60 std::string entityDescription = "Launcher diameter entity for "; entityDescription += getName();
// NOTE(review): C-style cast of the engine's return value to comm::Entity*; the result is
// dereferenced right below without a null check - confirm createEntity() cannot return null.
61 a_entity = (anna::diameter::comm::Entity*)a_commEngine->createEntity(servers, entityDescription);
62 a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::Bind, bindTimeout);
63 a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
// Creates the diameter local server for this origin host and stores it in a_diameterServer.
//
// serverRepresentation: socket literal split into address and port by
//                       anna::functions::getAddressAndPortFromSocketLiteral.
// sessions:             passed to createLocalServer(); per the note below, a negative value
//                       implies no limit for accepted connections.
// inactivityTimeout:    allowed inactivity time configured on the server.
// applicationTimeout:   timeout applied to ClassCode::ApplicationMessage on the server.
// ceaPathfile:          handed to the comm engine through setCEA() before the server is created.
//
// Declared noexcept(false): exceptions from the called helpers may propagate.
66 void OriginHost::createDiameterServer(const std::string &serverRepresentation, int sessions, const anna::Millisecond &inactivityTimeout, const anna::Millisecond &applicationTimeout, const std::string &ceaPathfile) noexcept(false) {
68 //if(sessions <= 0) return; negative implies no limit for accepted connections
70 std::string address; int port;
71 anna::functions::getAddressAndPortFromSocketLiteral(serverRepresentation, address, port);
// Description is "Launcher diameter local server for " followed by this host name.
72 std::string serverDescription = "Launcher diameter local server for "; serverDescription += getName();
73 a_commEngine->setCEA(ceaPathfile);
// NOTE(review): C-style cast of the engine's return value to comm::LocalServer*; the result is
// dereferenced below without a null check - confirm createLocalServer() cannot return null.
75 a_diameterServer = (anna::diameter::comm::LocalServer*)(a_commEngine->createLocalServer(address, port, sessions));
76 // we could set sessions = 0, and after application run(), use setMaxConnections(real sessions)
77 // over the local server in order to start it.
79 a_diameterServer->setDescription(serverDescription);
80 a_diameterServer->setAllowedInactivityTime(inactivityTimeout);
81 a_diameterServer->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
// Obtains a comm message from a_commMessages and configures its retransmission policy.
// The message gets a_requestRetransmissions retries; retransmit-on-expiry is only enabled
// when that count is greater than zero.
// NOTE(review): the statement returning 'result' is not visible in this listing.
84 anna::diameter::comm::Message *OriginHost::createCommMessage() noexcept(false) {
85 anna::diameter::comm::Message *result = a_commMessages.create();
86 result->setRetries(a_requestRetransmissions);
87 if (a_requestRetransmissions > 0) result->setOnExpiry(anna::diameter::comm::Message::OnExpiry::Retransmit);
// Hands a comm message back to a_commMessages (counterpart of createCommMessage()).
// msg: message to release; forwarded unchanged to a_commMessages.release().
92 void OriginHost::releaseCommMessage(anna::diameter::comm::Message *msg) {
93 a_commMessages.release(msg);
// Logs a raw (encoded) diameter message: decodes it into a codec message and delegates to
// the writeLogFile() overload that takes a decoded message.
//
// db:           encoded message.
// logExtension: event type; used by the overload for the file name and the log title.
// detail:       forwarded unchanged to the overload.
97 void OriginHost::writeLogFile(const anna::DataBlock & db, const std::string &logExtension, const std::string &detail) const {
98 anna::diameter::codec::Message codecMsg;
// A decoding failure is only traced; logging still proceeds with whatever codecMsg holds.
99 try { codecMsg.decode(db); } catch(anna::RuntimeException &ex) { ex.trace(); }
100 writeLogFile(codecMsg, logExtension, detail);
// Logs an already decoded diameter message.
//
// decodedMessage: message whose XML representation is logged.
// logExtension:   event type; appears in the log title and in the generated file names.
// detail:         extra text about the resource used (its use falls on lines not visible here).
//
// NOTE(review): several original lines are elided from this listing (the embedded numbering
// skips), so the conditions guarding the steps below - presumably the split / detailed / dump
// log settings - cannot be read from here. Confirm against the full file.
104 void OriginHost::writeLogFile(const anna::diameter::codec::Message &decodedMessage, const std::string &logExtension, const std::string &detail) const {
// Main log file name starts from the configured a_logFile.
106 std::string targetFile = a_logFile;
110 targetFile += logExtension;
// Opened in append mode. The openmode constants are reached through std::ifstream, but they
// are the std::ios_base ones and therefore equally valid for an ofstream.
113 std::ofstream out(targetFile.c_str(), std::ifstream::out | std::ifstream::app);
115 std::string title = "[";
116 title += logExtension;
118 // Build complete log:
119 std::string log = "\n";
120 std::string xml = decodedMessage.asXMLString();
// Current date, appended to the title.
124 anna::time::Date now;
127 title += now.asString();
128 log += anna::functions::highlight(title, anna::functions::TextHighlightMode::OverAndUnderline);
131 log += anna::functions::highlight("Used resource");
// Name of the separate per-message XML file:
142 // <unix ms timestamp>.<originHost>.<hop by hop>.<end to end>.<message code>.<request|answer>.<type of event>.xml
143 std::string name = anna::functions::asString((anna::Millisecond)anna::functions::millisecond());
145 name += getCommEngine()->getOriginHostName();
147 name += anna::functions::asString(decodedMessage.getHopByHop());
149 name += anna::functions::asString(decodedMessage.getEndToEnd());
// getId() is a pair: .first is appended as the message code, .second selects request/answer.
151 name += anna::functions::asString(decodedMessage.getId().first);
153 name += ((decodedMessage.getId().second) ? "request.":"answer.");
154 name += logExtension;
// The XML alone goes to the per-message file (append mode).
156 std::ofstream outMsg(name.c_str(), std::ifstream::out | std::ifstream::app);
157 outMsg.write(xml.c_str(), xml.size());
// The composed log text goes to the main log file.
162 out.write(log.c_str(), log.size());
// Appends 'buffer' verbatim to the burst log file (a_burstLogFile).
// The file is opened in append mode on every call and closed before returning.
166 void OriginHost::writeBurstLogFile(const std::string &buffer) {
167 std::ofstream out(a_burstLogFile.c_str(), std::ifstream::out | std::ifstream::app);
168 out.write(buffer.c_str(), buffer.size());
169 out.close(); // close() will be called when the object is destructed (i.e., when it goes out of scope).
170 // you'd call close() only if you indeed for some reason wanted to close the filestream
171 // earlier than it goes out of scope.
// Empties the burst message list, releasing every loaded message, and deactivates the burst.
// NOTE(review): the if/else structure around the release loop and the warning, and the
// return statement, are elided from this listing; presumably the warning is emitted only when
// the list was already empty and 'size' is returned - confirm against the full file.
174 int OriginHost::clearBurst() {
// Number of messages loaded before clearing.
175 int size = a_burstMessages.size();
178 std::map<int, anna::diameter::comm::Message*>::const_iterator it;
179 std::map<int, anna::diameter::comm::Message*>::const_iterator it_min(a_burstMessages.begin());
180 std::map<int, anna::diameter::comm::Message*>::const_iterator it_max(a_burstMessages.end());
// Release each stored comm message before dropping the map entries.
182 for(it = it_min; it != it_max; it++) releaseCommMessage((*it).second);
184 a_burstMessages.clear();
186 std::string msg = "Burst list already empty. Nothing done";
187 std::cout << msg << std::endl;
188 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
191 a_burstActive = false;
// Re-seat the delivery iterator, since clear() invalidated the previous one.
193 a_burstDeliveryIt = a_burstMessages.begin();
// Adds a new message to the burst list and returns the index (map key) it was stored under.
//
// db: encoded message for the burst.
// NOTE(review): no visible line uses 'db'; the statement copying it into the new message is
// presumably on an elided line - confirm against the full file.
197 int OriginHost::loadBurstMessage(const anna::DataBlock & db) noexcept(false) {
198 anna::diameter::comm::Message *msg = createCommMessage();
// Stored under the current load index, which is then advanced.
200 a_burstMessages[a_burstLoadIndx++] = msg;
// Index used for the message just stored.
201 return (a_burstLoadIndx - 1);
// Stops the burst launch and returns how many messages remain in the current cycle.
// NOTE(review): the guard around the "already stopped" warning (and whatever it returns) is
// elided from this listing - confirm against the full file.
204 int OriginHost::stopBurst() {
206 std::string msg = "Burst launch is already stopped. Nothing done";
207 std::cout << msg << std::endl;
208 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
212 a_burstActive = false;
213 // Remaining on cycle:
// Remaining = list size minus the key of the delivery position.
// NOTE(review): this dereferences a_burstDeliveryIt; if the iterator can equal
// a_burstMessages.end() here (e.g. right after the last message of a cycle was sent),
// the dereference is undefined behaviour - verify the invariant or add a guard.
214 return (a_burstMessages.size() - (*a_burstDeliveryIt).first);
// Requests that up to 'releaseAmount' burst messages be dropped, storing the count in
// a_burstPopCounter (consumed by sendBurstMessage()).
//
// releaseAmount: requested amount; values below 1 are rejected with a warning.
// Returns the pop counter actually set (visible return). The values returned from the
// warning branches, and the guard around the first warning, are elided from this listing.
217 int OriginHost::popBurst(int releaseAmount) {
219 std::string msg = "Burst launch is stopped. Nothing done";
220 std::cout << msg << std::endl;
221 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
225 if(releaseAmount < 1) {
226 std::string msg = "No valid release amount is specified. Ignoring burst pop";
227 std::cout << msg << std::endl;
228 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
// The pop counter is capped at the entity's current OTA request count (getOTARequests()).
// NOTE(review): a_entity is dereferenced without a visible null check - confirm it is always set here.
232 int currentOTArequests = a_entity->getOTARequests();
233 a_burstPopCounter = (releaseAmount > currentOTArequests) ? currentOTArequests : releaseAmount;
234 return a_burstPopCounter;
// Activates the burst and sends up to 'loadAmount' burst messages.
//
// loadAmount: number of messages to send.
// NOTE(review): the guard for the "No valid load amount" warning, the declaration of 'count'
// and all return statements are elided from this listing - confirm against the full file.
237 int OriginHost::pushBurst(int loadAmount) {
// Nothing to launch when no burst messages are loaded.
238 if(a_burstMessages.size() == 0) {
239 std::string msg = "Burst data not found (empty list). Ignoring burst launch";
240 std::cout << msg << std::endl;
241 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
246 std::string msg = "No valid load amount is specified. Ignoring burst push";
247 std::cout << msg << std::endl;
248 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
252 a_burstActive = true;
// Send one message per iteration; stop at the first one that sendBurstMessage() rejects.
255 for(count = 0; count < loadAmount; count++)
256 if(!sendBurstMessage()) break;
// Sends up to 'loadAmount' burst messages regardless of whether the burst is active:
// sendBurstMessage() is called with anyway = true, and no visible line changes a_burstActive.
//
// loadAmount: number of messages to send.
// NOTE(review): the guard for the "No valid load amount" warning, the declaration of 'count'
// and all return statements are elided from this listing - confirm against the full file.
261 int OriginHost::sendBurst(int loadAmount) {
// Nothing to send when no burst messages are loaded.
262 if(a_burstMessages.size() == 0) {
263 std::string msg = "Burst data not found (empty list). Ignoring burst launch";
264 std::cout << msg << std::endl;
265 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
270 std::string msg = "No valid load amount is specified. Ignoring burst send";
271 std::cout << msg << std::endl;
272 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
// Send one message per iteration; stop at the first failed send.
278 for(count = 0; count < loadAmount; count++)
279 if(!sendBurstMessage(true /* anyway */)) break;
// Starts a burst from the first loaded message with an initial load.
//
// initialLoad: number of messages to push at start; values below 1 are rejected with a
//              warning (the value returned from that branch is elided from this listing).
// Returns the result of pushBurst(initialLoad).
284 int OriginHost::startBurst(int initialLoad) {
285 if(initialLoad < 1) {
286 std::string msg = "No initial load is specified. Ignoring burst start";
287 std::cout << msg << std::endl;
288 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
292 a_burstActive = true;
// Rewind the delivery position to the first message of the list.
294 a_burstDeliveryIt = a_burstMessages.begin();
295 return (pushBurst(initialLoad));
// Sends the burst message at the current delivery position through a_entity and writes
// progress information to the burst log when it is enabled.
//
// anyway: when true the message is sent even if the burst is not active.
//
// NOTE(review): many original lines are elided from this listing (the embedded numbering
// skips): the body of the pop-counter branch, the repeat-mode condition, the advance of the
// delivery iterator, the declaration of 'dot' and the final return. The comments below
// describe only what the visible lines do - confirm the rest against the full file.
298 bool OriginHost::sendBurstMessage(bool anyway) {
// Unless forced, nothing is sent while the burst is inactive.
299 if(!anyway && !burstActive()) return false;
// A pending pop request (see popBurst()) is handled here; an "x" is written to the burst log.
301 if(a_burstPopCounter > 0) {
302 if(burstLogEnabled()) writeBurstLogFile("x");
// End of the list reached: rewind to the first message.
308 if(a_burstDeliveryIt == a_burstMessages.end()) {
309 a_burstDeliveryIt = a_burstMessages.begin();
// Two alternative log entries; presumably selected by a_burstRepeat (the condition is elided).
315 if(burstLogEnabled()) writeBurstLogFile(anna::functions::asString("\nCompleted burst cycle. Starting again (repeat mode) on cycle %d.\n", a_burstCycle));
317 if(burstLogEnabled()) writeBurstLogFile("\nCompleted burst cycle. Burst finished (repeat mode disabled).\n");
// Message to deliver and its order: the map key plus one.
325 anna::diameter::comm::Message *msg = (*a_burstDeliveryIt).second;
326 int order = (*a_burstDeliveryIt).first + 1;
330 bool result = a_entity->send(msg);
332 if(burstLogEnabled()) {
// For lists of 100 or more messages 'dot' is the order modulo one hundredth of the list
// size (the divisor is at least 1 here because size() >= 100).
333 if(a_burstMessages.size() >= 100)
334 dot = (order % (a_burstMessages.size() / 100));
// Either a "." or the order number is logged; the selecting condition is elided.
337 writeBurstLogFile(".");
339 writeBurstLogFile(anna::functions::asString(" %d", order));
340 int otaReqs = a_entity->getOTARequests();
// Log the OTA request count only when the send succeeded and the count changed.
342 if(result && (otaReqs != a_otaRequest)) {
343 // false if was a sending after an answer received (no OTA change in this case)
344 // true after push and pop operations
345 a_otaRequest = otaReqs;
346 writeBurstLogFile(anna::functions::asString("[OTA %d]", a_otaRequest));
// Describe the resource used for the send; this becomes the 'detail' text of the message log.
353 anna::diameter::comm::Server *usedServer = a_entity->getLastUsedResource();
354 anna::diameter::comm::ClientSession *usedClientSession = usedServer ? usedServer->getLastUsedResource() : NULL;
355 std::string detail = usedClientSession ? usedClientSession->asString() : "[null client session]"; // this should not happen
356 writeLogFile(msg->getBody(), (result ? "sent2e" : "send2eError"), detail); // the one for the working node
// Returns the XML representation of the burst message stored for 'order', or a
// "No message found..." text when there is none.
//
// order: position of the message; the lookup key is order - 1. The value -1 is replaced by
//        the key at the current delivery position, so the lookup then targets the entry just
//        before the delivery iterator.
// NOTE(review): with order == -1 a_burstDeliveryIt is dereferenced; if it can equal
// a_burstMessages.end() (e.g. empty list) that is undefined behaviour - verify or guard.
// NOTE(review): the end of the "not found" text and the return statement are elided from
// this listing.
362 std::string OriginHost::lookBurst(int order) const {
364 if (order == -1) order = a_burstDeliveryIt->first;
// Default answer, kept unless the message is found and decoded successfully.
366 std::string result = "No message found for order provided (";
367 result += anna::functions::asString(order);
369 std::map<int, anna::diameter::comm::Message*>::const_iterator it = a_burstMessages.find(order - 1);
371 if(it != a_burstMessages.end()) {
372 anna::diameter::codec::Message codecMsg;
// A decoding failure is only traced; 'result' then keeps the default text.
373 try { codecMsg.decode((*it).second->getBody()); result = codecMsg.asXMLString(); } catch(anna::RuntimeException &ex) { ex.trace(); }
// Moves the burst delivery position to the message stored for 'order' and returns a text
// reporting whether the position was found.
//
// order: position of the message; the lookup key is order - 1.
// NOTE(review): the end of the returned text and the return statement are elided from this
// listing.
379 std::string OriginHost::gotoBurst(int order) {
// Default answer, replaced below when the position exists.
380 std::string result = "Position not found for order provided (";
381 std::map<int, anna::diameter::comm::Message*>::iterator it = a_burstMessages.find(order - 1);
383 if(it != a_burstMessages.end()) {
384 a_burstDeliveryIt = it;
385 result = "Position updated for order provided (";
// Both variants of the text are completed with the requested order.
388 result += anna::functions::asString(order);
// Adds an "OriginHost" child node to 'parent' describing this object: identification,
// logging configuration and retransmission settings as attributes, plus a "CommEngine"
// child filled in by the communication engine itself.
//
// parent: node that receives the new child; it is dereferenced, so it must not be null.
// NOTE(review): the opening brace and the return statement are elided from this listing;
// presumably the created "OriginHost" node is returned - confirm against the full file.
393 anna::xml::Node* OriginHost::asXML(anna::xml::Node* parent) const
395 anna::xml::Node* result = parent->createChild("OriginHost");
397 result->createAttribute("originHost", getName());
398 result->createAttribute("ApplicationId", a_applicationId);
399 result->createAttribute("originRealm", a_commEngine->getOriginRealmName());
400 result->createAttribute("LogFile", a_logFile);
// Boolean settings are rendered as "yes" / "no".
401 result->createAttribute("SplitLog", a_splitLog ? "yes" : "no");
402 result->createAttribute("DetailedLog", a_detailedLog ? "yes" : "no");
403 result->createAttribute("DumpLog", a_dumpLog ? "yes" : "no");
404 result->createAttribute("BurstLogFile", a_burstLogFile);
405 result->createAttribute("RequestRetransmissions", a_requestRetransmissions);
// The comm engine writes its own description under a dedicated child node.
407 anna::xml::Node* commEngine = result->createChild("CommEngine");
408 a_commEngine->asXML(commEngine);
// Returns this object's XML representation as a string: asXML() is applied to a temporary
// "root" node and its result is passed to anna::xml::Compiler::apply().
413 std::string OriginHost::asXMLString() const {
414 anna::xml::Node root("root");
415 return anna::xml::Compiler().apply(asXML(&root));