630ab8f0770ad7908d17bc139f47312a6ac172ba
[anna.git] / example / diameter / launcher / RealmNode.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 // Standard
10 #include <string>
11
12 // Project
13 #include <anna/diameter.comm/Message.hpp>
14 #include <anna/core/core.hpp>
15 #include <anna/time/Date.hpp>
16 #include <anna/xml/Compiler.hpp>
17
18 // Process
19 #include <RealmNode.hpp>
20 #include <MyDiameterEngine.hpp>
21
22
23
24 RealmNode::RealmNode(const std::string &originRealm, unsigned int applicationId, anna::diameter::codec::Engine *codecEngine) :
25   a_originRealm(originRealm), a_applicationId(applicationId), a_codecEngine(codecEngine) {
26
27   std::string commEngineName = a_originRealm + "_DiameterCommEngine";
28   a_commEngine = new MyDiameterEngine(commEngineName.c_str());
29   a_commEngine->setAutoBind(false);  // allow to create client-sessions without binding them, in order to set timeouts.
30   a_commEngine->setBaseProtocolCodecEngine(getCodecEngine());
31
32   a_logFile = "";
33   a_burstLogFile = "";
34   a_splitLog = false;
35   a_detailedLog = false;
36   a_dumpLog = false;
37   a_entity = NULL;
38   a_diameterServer = NULL;
39
40   // Comm resources:
41   a_allowedInactivityTime = (anna::Millisecond)90000;
42   a_tcpConnectDelay = (anna::Millisecond)200;
43   a_answersTimeout = (anna::Millisecond)10000;
44   a_ceaTimeout = (anna::Millisecond)10000;
45   a_watchdogPeriod = (anna::Millisecond)30000;
46   a_requestRetransmissions = 0;
47
48   // Burst
49   a_burstCycle = 1;
50   a_burstRepeat = false;
51   a_burstActive = false;
52   a_burstLoadIndx = 0;
53   a_burstDeliveryIt = a_burstMessages.begin();
54   a_otaRequest = 0;
55   a_burstPopCounter = 0;
56 }
57
58
59 void RealmNode::createEntity(const std::string &entityRepresentation, const anna::Millisecond &bindTimeout, const anna::Millisecond &applicationTimeout) throw() {
60
61   anna::socket_v servers = anna::functions::getSocketVectorFromString(entityRepresentation);
62   std::string entityDescription = "Launcher diameter entity for "; entityDescription += a_originRealm;
63   a_entity = (MyDiameterEntity*)(a_commEngine->createEntity(servers, entityDescription));
64   a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::Bind, bindTimeout);
65   a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
66
67   // Codec engine for reacting answers (failed-avp):
68   a_entity->setCodecEngine(getCodecEngine());
69 }
70
71 void RealmNode::startDiameterServer(const std::string &serverRepresentation, int sessions, const anna::Millisecond &inactivityTimeout) throw(anna::RuntimeException) {
72
73   if(sessions <= 0) return;
74
75   std::string address; int port;
76   anna::functions::getAddressAndPortFromSocketLiteral(serverRepresentation, address, port);
77   std::string serverDescription = "Launcher diameter local server for "; serverDescription += a_originRealm;
78   a_diameterServer = (MyLocalServer*)(a_commEngine->createLocalServer(address, port, sessions));
79           // we could set sessions = 0, and after application run(), use setMaxConnections(real sessions)
80           // over the local server in order to start it.
81
82   a_diameterServer->setDescription(serverDescription);
83   a_diameterServer->setAllowedInactivityTime(inactivityTimeout);
84
85   // Codec engine for reacting answers (failed-avp):
86   a_diameterServer->setCodecEngine(getCodecEngine());
87 }
88
89 anna::diameter::comm::Message *RealmNode::createCommMessage() throw(anna::RuntimeException) {
90   anna::diameter::comm::Message *result = a_commMessages.create();
91   result->setRetries(a_requestRetransmissions);
92   if (a_requestRetransmissions > 0) result->setOnExpiry(anna::diameter::comm::Message::OnExpiry::Retransmit);
93   return result;
94 }
95
96
97 void RealmNode::releaseCommMessage(anna::diameter::comm::Message *msg) throw() {
98   a_commMessages.release(msg);
99 }
100
101
102 void RealmNode::writeLogFile(const anna::DataBlock & db, const std::string &logExtension, const std::string &detail) const throw() {
103 //   if (!logEnabled()) return;
104   anna::diameter::codec::Message codecMsg(getCodecEngine());
105   try { codecMsg.decode(db); } catch(anna::RuntimeException &ex) { ex.trace(); }
106   writeLogFile(codecMsg, logExtension, detail);
107
108 }
109
110 // Si ya lo tengo decodificado:
111 void RealmNode::writeLogFile(const anna::diameter::codec::Message & decodedMessage, const std::string &logExtension, const std::string &detail) const throw() {
112 //   if (!logEnabled()) return;
113   // Open target file:
114   std::string targetFile = a_logFile;
115
116   if(a_splitLog) {
117     targetFile += ".";
118     targetFile += logExtension;
119   }
120
121   std::ofstream out(targetFile.c_str(), std::ifstream::out | std::ifstream::app);
122   // Set text to dump:
123   std::string title = "[";
124   title += logExtension;
125   title += "]";
126   // Build complete log:
127   std::string log = "\n";
128   std::string xml = decodedMessage.asXMLString();
129
130
131   if(a_detailedLog) {
132     anna::time::Date now;
133     now.setNow();
134     title += " ";
135     title += now.asString();
136     log += anna::functions::highlight(title, anna::functions::TextHighlightMode::OverAndUnderline);
137     log += xml;
138     log += "\n";
139     log += anna::functions::highlight("Used resource");
140     log += detail;
141     log += "\n";
142   } else {
143     log += title;
144     log += "\n";
145     log += xml;
146     log += "\n";
147   }
148
149   if(a_dumpLog) {
150     std::string name = getMyDiameterEngine()->getRealm();
151     name += ".";
152     name += anna::functions::asString(decodedMessage.getHopByHop());
153     name += ".";
154     name += anna::functions::asString(decodedMessage.getEndToEnd());
155     name += ".";
156     name += anna::functions::asString(decodedMessage.getId().first);
157     name += ".";
158     name += ((decodedMessage.getId().second) ? "request.":"answer.");
159     name += logExtension;
160     name += ".xml";
161     std::ofstream outMsg(name.c_str(), std::ifstream::out | std::ifstream::app);
162     outMsg.write(xml.c_str(), xml.size());
163     outMsg.close();
164   }
165
166   // Write and close
167   out.write(log.c_str(), log.size());
168   out.close();
169 }
170
171 void RealmNode::writeBurstLogFile(const std::string &buffer) throw() {
172   std::ofstream out(a_burstLogFile.c_str(), std::ifstream::out | std::ifstream::app);
173   out.write(buffer.c_str(), buffer.size());
174   out.close();    // close() will be called when the object is destructed (i.e., when it goes out of scope).
175   // you'd call close() only if you indeed for some reason wanted to close the filestream
176   // earlier than it goes out of scope.
177 }
178
179 int RealmNode::clearBurst() throw() {
180   int size = a_burstMessages.size();
181
182   if(size) {
183     std::map<int, anna::diameter::comm::Message*>::const_iterator it;
184     std::map<int, anna::diameter::comm::Message*>::const_iterator it_min(a_burstMessages.begin());
185     std::map<int, anna::diameter::comm::Message*>::const_iterator it_max(a_burstMessages.end());
186
187     for(it = it_min; it != it_max; it++) releaseCommMessage((*it).second);
188
189     a_burstMessages.clear();
190   } else {
191     std::string msg = "Burst list already empty. Nothing done";
192     std::cout << msg << std::endl;
193     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
194   }
195
196   a_burstActive = false;
197   a_burstLoadIndx = 0;
198   a_burstDeliveryIt = a_burstMessages.begin();
199   return size;
200 }
201
202 int RealmNode::loadBurstMessage(const anna::DataBlock & db) throw(anna::RuntimeException) {
203   anna::diameter::comm::Message *msg = createCommMessage();
204   msg->setBody(db);
205   a_burstMessages[a_burstLoadIndx++] = msg;
206   return (a_burstLoadIndx - 1);
207 }
208
209 int RealmNode::stopBurst() throw() {
210   if(!a_burstActive) {
211     std::string msg = "Burst launch is already stopped. Nothing done";
212     std::cout << msg << std::endl;
213     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
214     return -1;
215   }
216
217   a_burstActive = false;
218   // Remaining on cycle:
219   return (a_burstMessages.size() - (*a_burstDeliveryIt).first);
220 }
221
222 int RealmNode::popBurst(int releaseAmount) throw() {
223   if(!a_burstActive) {
224     std::string msg = "Burst launch is stopped. Nothing done";
225     std::cout << msg << std::endl;
226     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
227     return -1;
228   }
229
230   if(releaseAmount < 1) {
231     std::string msg = "No valid release amount is specified. Ignoring burst pop";
232     std::cout << msg << std::endl;
233     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
234     return -2;
235   }
236
237   int currentOTArequests = a_entity->getOTARequests();
238   a_burstPopCounter = (releaseAmount > currentOTArequests) ? currentOTArequests : releaseAmount;
239   return a_burstPopCounter;
240 }
241
242 int RealmNode::pushBurst(int loadAmount) throw() {
243   if(a_burstMessages.size() == 0) {
244     std::string msg = "Burst data not found (empty list). Ignoring burst launch";
245     std::cout << msg << std::endl;
246     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
247     return -1;
248   }
249
250   if(loadAmount < 1) {
251     std::string msg = "No valid load amount is specified. Ignoring burst push";
252     std::cout << msg << std::endl;
253     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
254     return -2;
255   }
256
257   a_burstActive = true;
258   int count;
259
260   for(count = 0; count < loadAmount; count++)
261     if(!sendBurstMessage()) break;
262
263   return count;
264 }
265
266 int RealmNode::sendBurst(int loadAmount) throw() {
267   if(a_burstMessages.size() == 0) {
268     std::string msg = "Burst data not found (empty list). Ignoring burst launch";
269     std::cout << msg << std::endl;
270     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
271     return -1;
272   }
273
274   if(loadAmount < 1) {
275     std::string msg = "No valid load amount is specified. Ignoring burst send";
276     std::cout << msg << std::endl;
277     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
278     return -2;
279   }
280
281   int count;
282
283   for(count = 0; count < loadAmount; count++)
284     if(!sendBurstMessage(true /* anyway */)) break;
285
286   return count;
287 }
288
289 int RealmNode::startBurst(int initialLoad) throw() {
290   if(initialLoad < 1) {
291     std::string msg = "No initial load is specified. Ignoring burst start";
292     std::cout << msg << std::endl;
293     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
294     return -2;
295   }
296
297   a_burstActive = true;
298   a_burstCycle = 1;
299   a_burstDeliveryIt = a_burstMessages.begin();
300   return (pushBurst(initialLoad));
301 }
302
303 bool RealmNode::sendBurstMessage(bool anyway) throw() {
304   if(!anyway && !burstActive()) return false;
305
306   if(a_burstPopCounter > 0) {
307     if(burstLogEnabled()) writeBurstLogFile("x");
308
309     a_burstPopCounter--;
310     return false;
311   }
312
313   if(a_burstDeliveryIt == a_burstMessages.end()) {
314     a_burstDeliveryIt = a_burstMessages.begin();
315
316     if(!anyway) {
317       if(a_burstRepeat) {
318         a_burstCycle++;
319
320         if(burstLogEnabled()) writeBurstLogFile(anna::functions::asString("\nCompleted burst cycle. Starting again (repeat mode) on cycle %d.\n", a_burstCycle));
321       } else {
322         if(burstLogEnabled()) writeBurstLogFile("\nCompleted burst cycle. Burst finished (repeat mode disabled).\n");
323
324         stopBurst();
325         return false;
326       }
327     }
328   }
329
330   anna::diameter::comm::Message *msg = (*a_burstDeliveryIt).second;
331   int order = (*a_burstDeliveryIt).first + 1;
332   a_burstDeliveryIt++;
333   bool dot = true;
334   // sending
335   bool result = a_entity->send(msg, anna::CommandLine::instantiate().exists("balance"));
336
337   if(burstLogEnabled()) {
338     if(a_burstMessages.size() >= 100)
339       dot = (order  % (a_burstMessages.size() / 100));
340
341     if(dot) {
342       writeBurstLogFile(".");
343     } else {
344       writeBurstLogFile(anna::functions::asString(" %d", order));
345       int otaReqs  = a_entity->getOTARequests();
346
347       if(result && (otaReqs != a_otaRequest)) {
348         // false if was a sending after an answer received (no OTA change in this case)
349         // true after push and pop operations
350         a_otaRequest = otaReqs;
351         writeBurstLogFile(anna::functions::asString("[OTA %d]", a_otaRequest));
352       }
353     }
354   }
355
356   // Detailed log:
357   if(logEnabled()) {
358     anna::diameter::comm::Server *usedServer = a_entity->getLastUsedResource();
359     anna::diameter::comm::ClientSession *usedClientSession = usedServer ? usedServer->getLastUsedResource() : NULL;
360     std::string detail = usedClientSession ? usedClientSession->asString() : "<null client session>"; // esto no deberia ocurrir
361     writeLogFile(msg->getBody(), (result ? "sent2e" : "send2eError"), detail); // el del nodo de trabajo
362   }
363
364   return result;
365 }
366
367 std::string RealmNode::lookBurst(int order) const throw() {
368
369   if (order == -1) order = a_burstDeliveryIt->first;
370
371   std::string result = "No message found for order provided (";
372   result += anna::functions::asString(order);
373   result += ")";
374   std::map<int, anna::diameter::comm::Message*>::const_iterator it = a_burstMessages.find(order - 1);
375
376   if(it != a_burstMessages.end()) {
377     // Decode
378     anna::diameter::codec::Message codecMsg(getCodecEngine());
379     try { codecMsg.decode((*it).second->getBody()); } catch(anna::RuntimeException &ex) { ex.trace(); }
380     result = codecMsg.asXMLString();
381   }
382
383   return result;
384 }
385
386 std::string RealmNode::gotoBurst(int order) throw() {
387   std::string result = "Position not found for order provided (";
388   std::map<int, anna::diameter::comm::Message*>::iterator it = a_burstMessages.find(order - 1);
389
390   if(it != a_burstMessages.end()) {
391     a_burstDeliveryIt = it;
392     result = "Position updated for order provided (";
393   }
394
395   result += anna::functions::asString(order);
396   result += ")";
397   return result;
398 }
399
400 anna::xml::Node* RealmNode::asXML(anna::xml::Node* parent) const
401 throw() {
402   anna::xml::Node* result = parent->createChild("RealmNode");
403
404   result->createAttribute("OriginRealm", a_originRealm);
405   result->createAttribute("ApplicationId", a_applicationId);
406   result->createAttribute("LogFile", a_logFile);
407   result->createAttribute("SplitLog", a_splitLog ? "yes" : "no");
408   result->createAttribute("DetailedLog", a_detailedLog ? "yes" : "no");
409   result->createAttribute("DumpLog", a_dumpLog ? "yes" : "no");
410   result->createAttribute("BurstLogFile", a_burstLogFile);
411   result->createAttribute("RequestRetransmissions", a_requestRetransmissions);
412
413   anna::xml::Node* commEngine = result->createChild("CommEngine");
414   a_commEngine->asXML(commEngine);
415
416   return result;
417 }
418
419 std::string RealmNode::asXMLString() const throw() {
420   anna::xml::Node root("root");
421   return anna::xml::Compiler().apply(asXML(&root));
422 }