1 // ANNA - Anna is Not Nothingness Anymore //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
13 #include <anna/diameter.comm/Message.hpp>
14 #include <anna/core/core.hpp>
15 #include <anna/time/Date.hpp>
16 #include <anna/xml/Compiler.hpp>
19 #include "RealmNode.hpp"
20 #include "MyDiameterEngine.hpp"
// Constructs a realm node for the given origin realm, application id and codec engine.
// Creates the diameter comm engine, named "<originRealm>_DiameterCommEngine", and sets
// the default timers and burst state listed below.
// NOTE(review): this excerpt skips some source lines, so further members may be
// initialized in statements that are not shown here.
24 RealmNode::RealmNode(const std::string &originRealm, unsigned int applicationId, anna::diameter::codec::Engine *codecEngine) :
25 a_originRealm(originRealm), a_applicationId(applicationId), a_codecEngine(codecEngine) {
// The comm engine name is derived from the origin realm.
// NOTE(review): allocated with new; the matching release is not visible in this excerpt.
27 std::string commEngineName = a_originRealm + "_DiameterCommEngine";
28 a_commEngine = new MyDiameterEngine(commEngineName.c_str());
29 a_commEngine->setAutoBind(false); // allow to create client-sessions without binding them, in order to set timeouts.
// The comm engine uses this node's codec engine as its base protocol codec engine.
30 a_commEngine->setBaseProtocolCodecEngine(getCodecEngine());
35 a_detailedLog = false;
// No local server yet; startDiameterServer() assigns it.
38 a_diameterServer = NULL;
// Default timers, stored as anna::Millisecond values.
41 a_allowedInactivityTime = (anna::Millisecond)90000;
42 a_tcpConnectDelay = (anna::Millisecond)200;
43 a_answersTimeout = (anna::Millisecond)10000;
44 a_ceaTimeout = (anna::Millisecond)10000;
45 a_watchdogPeriod = (anna::Millisecond)30000;
// Requests are not retransmitted by default (see createCommMessage()).
46 a_requestRetransmissions = 0;
// Burst state: repeat mode off, burst inactive, delivery position at the
// beginning of a_burstMessages and no pending pop requests.
50 a_burstRepeat = false;
51 a_burstActive = false;
53 a_burstDeliveryIt = a_burstMessages.begin();
55 a_burstPopCounter = 0;
// Creates the diameter entity for this realm and stores it in a_entity.
//   entityRepresentation: textual socket list, parsed by
//                         anna::functions::getSocketVectorFromString().
//   bindTimeout:          timeout set for ClassCode::Bind.
//   applicationTimeout:   timeout set for ClassCode::ApplicationMessage.
59 void RealmNode::createEntity(const std::string &entityRepresentation, const anna::Millisecond &bindTimeout, const anna::Millisecond &applicationTimeout) throw() {
61 anna::socket_v servers = anna::functions::getSocketVectorFromString(entityRepresentation);
// The entity description embeds the origin realm.
62 std::string entityDescription = "Launcher diameter entity for "; entityDescription += a_originRealm;
// The engine's result is cast to MyDiameterEntity*.
// NOTE(review): presumably MyDiameterEngine creates that concrete type - confirm.
63 a_entity = (MyDiameterEntity*)(a_commEngine->createEntity(servers, entityDescription));
64 a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::Bind, bindTimeout);
65 a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
67 // Codec engine for reacting answers (failed-avp):
68 a_entity->setCodecEngine(getCodecEngine());
// Creates the local diameter server for this realm and stores it in a_diameterServer.
//   serverRepresentation: socket literal, split into address and port by
//                         anna::functions::getAddressAndPortFromSocketLiteral().
//   sessions:             value passed to createLocalServer(); nothing is done
//                         when it is zero or negative.
//   inactivityTimeout:    allowed inactivity time configured on the server.
// Declared as possibly throwing anna::RuntimeException.
71 void RealmNode::startDiameterServer(const std::string &serverRepresentation, int sessions, const anna::Millisecond &inactivityTimeout) throw(anna::RuntimeException) {
// Without a positive number of sessions no server is created.
73 if(sessions <= 0) return;
75 std::string address; int port;
76 anna::functions::getAddressAndPortFromSocketLiteral(serverRepresentation, address, port);
// The server description embeds the origin realm.
77 std::string serverDescription = "Launcher diameter local server for "; serverDescription += a_originRealm;
// The engine's result is cast to MyLocalServer*.
// NOTE(review): presumably MyDiameterEngine creates that concrete type - confirm.
78 a_diameterServer = (MyLocalServer*)(a_commEngine->createLocalServer(address, port, sessions));
79 // we could set sessions = 0, and after application run(), use setMaxConnections(real sessions)
80 // over the local server in order to start it.
82 a_diameterServer->setDescription(serverDescription);
83 a_diameterServer->setAllowedInactivityTime(inactivityTimeout);
85 // Codec engine for reacting answers (failed-avp):
86 a_diameterServer->setCodecEngine(getCodecEngine());
// Obtains a comm message from a_commMessages and configures its retry policy
// from a_requestRetransmissions.
// Declared as possibly throwing anna::RuntimeException.
// NOTE(review): the return statement is not visible in this excerpt; presumably
// the configured message is returned - confirm.
89 anna::diameter::comm::Message *RealmNode::createCommMessage() throw(anna::RuntimeException) {
90 anna::diameter::comm::Message *result = a_commMessages.create();
91 result->setRetries(a_requestRetransmissions);
// The expiry action is switched to Retransmit only when retransmissions are configured.
92 if (a_requestRetransmissions > 0) result->setOnExpiry(anna::diameter::comm::Message::OnExpiry::Retransmit);
// Hands a comm message back to a_commMessages, the same container that
// createCommMessage() obtains messages from.
//   msg: message to release.
97 void RealmNode::releaseCommMessage(anna::diameter::comm::Message *msg) throw() {
98 a_commMessages.release(msg);
// Logs a message that is given as a raw data block.
//   db:           encoded message; decoded here with this node's codec engine.
//   logExtension: forwarded to the overload that takes a decoded message.
//   detail:       forwarded to the overload that takes a decoded message.
102 void RealmNode::writeLogFile(const anna::DataBlock & db, const std::string &logExtension, const std::string &detail) const throw() {
103 // if (!logEnabled()) return;
104 anna::diameter::codec::Message codecMsg(getCodecEngine());
// A decoding failure is only traced; the message is still passed on to be
// logged in whatever state the decoder left it.
105 try { codecMsg.decode(db); } catch(anna::RuntimeException &ex) { ex.trace(); }
106 writeLogFile(codecMsg, logExtension, detail);
110 // When the message is already decoded:
// Writes a log entry for a decoded message to the node log file and writes the
// message XML to a per-message file.
//   decodedMessage: message whose XML representation and identifiers are logged.
//   logExtension:   appended to the target file name and used in the entry title.
//   detail:         extra text for the entry.
// NOTE(review): this excerpt skips several source lines (including where detail
// is used), so some of the statements below may be conditional on settings
// that are not visible here.
111 void RealmNode::writeLogFile(const anna::diameter::codec::Message & decodedMessage, const std::string &logExtension, const std::string &detail) const throw() {
112 // if (!logEnabled()) return;
// The target file name starts from a_logFile and gets logExtension appended.
114 std::string targetFile = a_logFile;
118 targetFile += logExtension;
// Opened in append mode, so existing log content is kept.
121 std::ofstream out(targetFile.c_str(), std::ifstream::out | std::ifstream::app);
// The title starts with "[" followed by the log extension.
123 std::string title = "[";
124 title += logExtension;
126 // Build complete log:
127 std::string log = "\n";
128 std::string xml = decodedMessage.asXMLString();
// The current date is added to the title, which is then highlighted.
132 anna::time::Date now;
135 title += now.asString();
136 log += anna::functions::highlight(title, anna::functions::TextHighlightMode::OverAndUnderline);
139 log += anna::functions::highlight("Used resource");
// Per-message file name: built from the engine realm, hop-by-hop, end-to-end,
// the first component of the message id, "request."/"answer." selected by the
// second component, and the log extension.
150 std::string name = getMyDiameterEngine()->getRealm();
152 name += anna::functions::asString(decodedMessage.getHopByHop());
154 name += anna::functions::asString(decodedMessage.getEndToEnd());
156 name += anna::functions::asString(decodedMessage.getId().first);
158 name += ((decodedMessage.getId().second) ? "request.":"answer.");
159 name += logExtension;
// The message XML is appended to the per-message file.
161 std::ofstream outMsg(name.c_str(), std::ifstream::out | std::ifstream::app);
162 outMsg.write(xml.c_str(), xml.size());
// Finally the assembled log text is appended to the target log file.
167 out.write(log.c_str(), log.size());
// Appends the given text to the burst log file named by a_burstLogFile.
//   buffer: text to append; written as-is.
171 void RealmNode::writeBurstLogFile(const std::string &buffer) throw() {
// Append mode keeps the existing content of the burst log.
172 std::ofstream out(a_burstLogFile.c_str(), std::ifstream::out | std::ifstream::app);
173 out.write(buffer.c_str(), buffer.size());
174 out.close(); // close() will be called when the object is destructed (i.e., when it goes out of scope).
175 // you'd call close() only if you indeed for some reason wanted to close the filestream
176 // earlier than it goes out of scope.
// Releases every message loaded for burst and empties the burst list, then
// deactivates the burst and resets the delivery position.
// NOTE(review): the branch structure and the return statement are not visible
// in this excerpt. Presumably the release/clear part runs only when the list
// is not empty, the warning only when it is empty, and the captured size is
// returned - confirm against the full source.
179 int RealmNode::clearBurst() throw() {
// Number of loaded messages before clearing.
180 int size = a_burstMessages.size();
183 std::map<int, anna::diameter::comm::Message*>::const_iterator it;
184 std::map<int, anna::diameter::comm::Message*>::const_iterator it_min(a_burstMessages.begin());
185 std::map<int, anna::diameter::comm::Message*>::const_iterator it_max(a_burstMessages.end());
// Every stored message is released before the map itself is cleared.
187 for(it = it_min; it != it_max; it++) releaseCommMessage((*it).second);
189 a_burstMessages.clear();
// Warning reported both on standard output and through the logger.
191 std::string msg = "Burst list already empty. Nothing done";
192 std::cout << msg << std::endl;
193 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
196 a_burstActive = false;
// The delivery iterator is re-pointed at the beginning of the map.
198 a_burstDeliveryIt = a_burstMessages.begin();
// Adds one message to the burst list.
//   db: data block of the message to load.
// Returns the index under which the new message was stored in a_burstMessages.
// Declared as possibly throwing anna::RuntimeException.
// NOTE(review): the statement that attaches db to the new message is not
// visible in this excerpt.
202 int RealmNode::loadBurstMessage(const anna::DataBlock & db) throw(anna::RuntimeException) {
203 anna::diameter::comm::Message *msg = createCommMessage();
// Stored at the current load index, which is then advanced.
205 a_burstMessages[a_burstLoadIndx++] = msg;
// The index was already incremented, so the one just used is a_burstLoadIndx - 1.
206 return (a_burstLoadIndx - 1);
// Stops the burst launch.
// Returns the number of messages remaining in the current cycle, computed as
// the burst list size minus the key at the current delivery position.
// NOTE(review): the guard around the warning and its return value are not
// visible in this excerpt; presumably the warning is issued only when the
// burst is already inactive - confirm.
209 int RealmNode::stopBurst() throw() {
// Warning reported both on standard output and through the logger.
211 std::string msg = "Burst launch is already stopped. Nothing done";
212 std::cout << msg << std::endl;
213 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
217 a_burstActive = false;
218 // Remaining on cycle:
// NOTE(review): this dereferences a_burstDeliveryIt; confirm it cannot be
// a_burstMessages.end() at this point (e.g. when the list is empty).
219 return (a_burstMessages.size() - (*a_burstDeliveryIt).first);
// Requests that a number of burst messages be skipped ("popped").
//   releaseAmount: requested amount; values below 1 are rejected with a warning.
// Returns the amount stored in a_burstPopCounter, which is releaseAmount
// limited to the entity's current OTA requests count.
// NOTE(review): the guard around the first warning and the return values of
// the warning paths are not visible in this excerpt.
222 int RealmNode::popBurst(int releaseAmount) throw() {
// Warning reported both on standard output and through the logger.
224 std::string msg = "Burst launch is stopped. Nothing done";
225 std::cout << msg << std::endl;
226 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
230 if(releaseAmount < 1) {
231 std::string msg = "No valid release amount is specified. Ignoring burst pop";
232 std::cout << msg << std::endl;
233 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
// The pop counter never exceeds the value reported by getOTARequests().
237 int currentOTArequests = a_entity->getOTARequests();
238 a_burstPopCounter = (releaseAmount > currentOTArequests) ? currentOTArequests : releaseAmount;
239 return a_burstPopCounter;
// Activates the burst and sends up to loadAmount burst messages.
//   loadAmount: maximum number of messages to send in this call.
// NOTE(review): the guard around the second warning, the declaration of count
// and the return statements are not visible in this excerpt; presumably the
// number of messages sent is returned - confirm.
242 int RealmNode::pushBurst(int loadAmount) throw() {
// Nothing can be launched when no burst messages have been loaded.
243 if(a_burstMessages.size() == 0) {
244 std::string msg = "Burst data not found (empty list). Ignoring burst launch";
245 std::cout << msg << std::endl;
246 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
251 std::string msg = "No valid load amount is specified. Ignoring burst push";
252 std::cout << msg << std::endl;
253 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
257 a_burstActive = true;
// Sending stops early at the first sendBurstMessage() call that returns false.
260 for(count = 0; count < loadAmount; count++)
261 if(!sendBurstMessage()) break;
// Sends up to loadAmount burst messages, calling sendBurstMessage() with its
// "anyway" flag set so that the burst-active check there is bypassed.
//   loadAmount: maximum number of messages to send in this call.
// NOTE(review): the guard around the second warning, the declaration of count
// and the return statements are not visible in this excerpt; presumably the
// number of messages sent is returned - confirm.
266 int RealmNode::sendBurst(int loadAmount) throw() {
// Nothing can be sent when no burst messages have been loaded.
267 if(a_burstMessages.size() == 0) {
268 std::string msg = "Burst data not found (empty list). Ignoring burst launch";
269 std::cout << msg << std::endl;
270 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
275 std::string msg = "No valid load amount is specified. Ignoring burst send";
276 std::cout << msg << std::endl;
277 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
// Sending stops early at the first sendBurstMessage() call that returns false.
283 for(count = 0; count < loadAmount; count++)
284 if(!sendBurstMessage(true /* anyway */)) break;
// Starts a burst from the first loaded message.
//   initialLoad: number of messages to push initially; values below 1 are
//                rejected with a warning.
// Returns the result of pushBurst(initialLoad) on the normal path.
// NOTE(review): the return value of the warning path is not visible in this excerpt.
289 int RealmNode::startBurst(int initialLoad) throw() {
290 if(initialLoad < 1) {
291 std::string msg = "No initial load is specified. Ignoring burst start";
292 std::cout << msg << std::endl;
293 LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
297 a_burstActive = true;
// Delivery restarts at the beginning of the burst list.
299 a_burstDeliveryIt = a_burstMessages.begin();
300 return (pushBurst(initialLoad));
// Sends the burst message at the current delivery position through a_entity.
//   anyway: when true, the message is processed even if the burst is not active.
// Returns false immediately when the burst is inactive and anyway is false.
// NOTE(review): this excerpt skips many source lines (branch bodies, iterator
// advance, cycle counting, return statements), so the comments below describe
// only the visible statements.
303 bool RealmNode::sendBurstMessage(bool anyway) throw() {
304 if(!anyway && !burstActive()) return false;
// Pending pop requests: an "x" is written to the burst log when it is enabled.
306 if(a_burstPopCounter > 0) {
307 if(burstLogEnabled()) writeBurstLogFile("x");
// End of the list reached: the delivery position wraps to the beginning and
// the burst log records whether the cycle restarts or the burst finished.
313 if(a_burstDeliveryIt == a_burstMessages.end()) {
314 a_burstDeliveryIt = a_burstMessages.begin();
320 if(burstLogEnabled()) writeBurstLogFile(anna::functions::asString("\nCompleted burst cycle. Starting again (repeat mode) on cycle %d.\n", a_burstCycle));
322 if(burstLogEnabled()) writeBurstLogFile("\nCompleted burst cycle. Burst finished (repeat mode disabled).\n");
// Message to send and its 1-based order (map key plus one).
330 anna::diameter::comm::Message *msg = (*a_burstDeliveryIt).second;
331 int order = (*a_burstDeliveryIt).first + 1;
// The second send() argument is true when the "balance" command line option exists.
335 bool result = a_entity->send(msg, anna::CommandLine::instantiate().exists("balance"));
337 if(burstLogEnabled()) {
// For lists of at least 100 messages, dot is the order modulo one hundredth
// of the list size.
338 if(a_burstMessages.size() >= 100)
339 dot = (order % (a_burstMessages.size() / 100));
342 writeBurstLogFile(".");
344 writeBurstLogFile(anna::functions::asString(" %d", order));
345 int otaReqs = a_entity->getOTARequests();
347 if(result && (otaReqs != a_otaRequest)) {
348 // false if was a sending after an answer received (no OTA change in this case)
349 // true after push and pop operations
350 a_otaRequest = otaReqs;
351 writeBurstLogFile(anna::functions::asString("[OTA %d]", a_otaRequest));
// Detailed log entry: describes the client session last used by the entity's
// last used server, when available.
358 anna::diameter::comm::Server *usedServer = a_entity->getLastUsedResource();
359 anna::diameter::comm::ClientSession *usedClientSession = usedServer ? usedServer->getLastUsedResource() : NULL;
360 std::string detail = usedClientSession ? usedClientSession->asString() : "<null client session>"; // this should not happen
361 writeLogFile(msg->getBody(), (result ? "sent2e" : "send2eError"), detail); // the working node's one
// Shows the burst message stored for a given order.
//   order: 1-based position; the message is looked up under key (order - 1).
// When the message is found, the result is its XML representation; otherwise
// it is a "No message found" text that includes the order.
// NOTE(review): the end of the not-found text and the return statement are not
// visible in this excerpt.
367 std::string RealmNode::lookBurst(int order) const throw() {
368 std::string result = "No message found for order provided (";
369 result += anna::functions::asString(order);
371 std::map<int, anna::diameter::comm::Message*>::const_iterator it = a_burstMessages.find(order - 1);
373 if(it != a_burstMessages.end()) {
375 anna::diameter::codec::Message codecMsg(getCodecEngine()); // XXXXXXXXXXXXXXXX the working node's one
// A decoding failure is only traced; the XML of the message is produced anyway.
376 try { codecMsg.decode((*it).second->getBody()); } catch(anna::RuntimeException &ex) { ex.trace(); }
377 result = codecMsg.asXMLString();
// Moves the burst delivery position to the message stored for a given order.
//   order: 1-based position; the message is looked up under key (order - 1).
// The result text starts with "Position updated" when the order exists and
// with "Position not found" otherwise, followed by the order.
// NOTE(review): the end of the text and the return statement are not visible
// in this excerpt.
383 std::string RealmNode::gotoBurst(int order) throw() {
384 std::string result = "Position not found for order provided (";
385 std::map<int, anna::diameter::comm::Message*>::iterator it = a_burstMessages.find(order - 1);
// The delivery position changes only when the order exists.
387 if(it != a_burstMessages.end()) {
388 a_burstDeliveryIt = it;
389 result = "Position updated for order provided (";
392 result += anna::functions::asString(order);
// Describes this node as a "RealmNode" child of the given XML node.
//   parent: node under which the "RealmNode" element is created.
// The element carries the realm, application id, log settings and request
// retransmissions as attributes, plus a "CommEngine" child filled in by the
// comm engine.
// NOTE(review): the rest of the signature and the return statement are not
// visible in this excerpt; presumably the created node is returned - confirm.
397 anna::xml::Node* RealmNode::asXML(anna::xml::Node* parent) const
399 anna::xml::Node* result = parent->createChild("RealmNode");
401 result->createAttribute("OriginRealm", a_originRealm);
402 result->createAttribute("ApplicationId", a_applicationId);
403 result->createAttribute("LogFile", a_logFile);
// Boolean settings are rendered as "yes"/"no".
404 result->createAttribute("SplitLog", a_splitLog ? "yes" : "no");
405 result->createAttribute("DetailedLog", a_detailedLog ? "yes" : "no");
406 result->createAttribute("DumpLog", a_dumpLog ? "yes" : "no");
407 result->createAttribute("BurstLogFile", a_burstLogFile);
408 result->createAttribute("RequestRetransmissions", a_requestRetransmissions);
// The comm engine writes its own description under a dedicated child node.
410 anna::xml::Node* commEngine = result->createChild("CommEngine");
411 a_commEngine->asXML(commEngine);
// Returns this node's XML description as a string: asXML() is applied to a
// temporary node named "root" and the result is rendered with anna::xml::Compiler.
416 std::string RealmNode::asXMLString() const throw() {
417 anna::xml::Node root("root");
418 return anna::xml::Compiler().apply(asXML(&root));