Dynamic lib selection and deployment
[anna.git] / example / diameter / launcher / OriginHost.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 // Standard
10 #include <string>
11
12 // Project
13 #include <anna/diameter.comm/Message.hpp>
14 #include <anna/diameter/stack/Dictionary.hpp>
15 #include <anna/diameter/codec/EngineManager.hpp>
16 #include <anna/core/core.hpp>
17 #include <anna/time/Date.hpp>
18 #include <anna/xml/Compiler.hpp>
19
20 // Process
21 #include <OriginHost.hpp>
22 #include <MyDiameterEngine.hpp>
23
24
25 namespace anna {
26   namespace diameter {
27     namespace stack {
28       class Dictionary;
29     }
30   }
31 }
32
33 OriginHost::OriginHost(const std::string &originHost, unsigned int applicationId, const anna::diameter::stack::Dictionary *baseProtocolDictionary) :
34   a_originHost(originHost), a_applicationId(applicationId) {
35
36   std::string commEngineName = a_originHost + "_DiameterCommEngine";
37   a_commEngine = new MyDiameterEngine(commEngineName.c_str(), baseProtocolDictionary);
38   a_commEngine->setAutoBind(false);  // allow to create client-sessions without binding them, in order to set timeouts.
39   a_codecEngine = anna::diameter::codec::EngineManager::instantiate().getCodecEngine(applicationId); // i know, this is going to exist (getCodecEngine is not null)
40
41   a_logFile = "";
42   a_burstLogFile = "";
43   a_splitLog = false;
44   a_detailedLog = false;
45   a_dumpLog = false;
46   a_entity = NULL;
47   a_diameterServer = NULL;
48
49   // Comm resources:
50   a_allowedInactivityTime = (anna::Millisecond)90000;
51   a_tcpConnectDelay = (anna::Millisecond)200;
52   a_answersTimeout = (anna::Millisecond)10000;
53   a_ceaTimeout = (anna::Millisecond)10000;
54   a_watchdogPeriod = (anna::Millisecond)30000;
55   a_requestRetransmissions = 0;
56
57   // Burst
58   a_burstCycle = 1;
59   a_burstRepeat = false;
60   a_burstActive = false;
61   a_burstLoadIndx = 0;
62   a_burstDeliveryIt = a_burstMessages.begin();
63   a_otaRequest = 0;
64   a_burstPopCounter = 0;
65 }
66
67
68 void OriginHost::createEntity(const std::string &entityRepresentation, const anna::Millisecond &bindTimeout, const anna::Millisecond &applicationTimeout) throw(anna::RuntimeException) {
69
70   anna::socket_v servers = anna::functions::getSocketVectorFromString(entityRepresentation);
71   std::string entityDescription = "Launcher diameter entity for "; entityDescription += a_originHost;
72   a_entity = (MyDiameterEntity*)(a_commEngine->createEntity(servers, entityDescription));
73   a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::Bind, bindTimeout);
74   a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
75 }
76
77 void OriginHost::startDiameterServer(const std::string &serverRepresentation, int sessions, const anna::Millisecond &inactivityTimeout, const anna::Millisecond &applicationTimeout, const std::string &ceaPathfile) throw(anna::RuntimeException) {
78
79   //if(sessions <= 0) return; negative implies no limit for accepted connections
80
81   std::string address; int port;
82   anna::functions::getAddressAndPortFromSocketLiteral(serverRepresentation, address, port);
83   std::string serverDescription = "Launcher diameter local server for "; serverDescription += a_originHost;
84   a_commEngine->setCEA(ceaPathfile);
85   a_diameterServer = (MyLocalServer*)(a_commEngine->createLocalServer(address, port, sessions));
86           // we could set sessions = 0, and after application run(), use setMaxConnections(real sessions)
87           // over the local server in order to start it.
88
89   a_diameterServer->setDescription(serverDescription);
90   a_diameterServer->setAllowedInactivityTime(inactivityTimeout);
91   a_diameterServer->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
92 }
93
94 anna::diameter::comm::Message *OriginHost::createCommMessage() throw(anna::RuntimeException) {
95   anna::diameter::comm::Message *result = a_commMessages.create();
96   result->setRetries(a_requestRetransmissions);
97   if (a_requestRetransmissions > 0) result->setOnExpiry(anna::diameter::comm::Message::OnExpiry::Retransmit);
98   return result;
99 }
100
101
102 void OriginHost::releaseCommMessage(anna::diameter::comm::Message *msg) throw() {
103   a_commMessages.release(msg);
104 }
105
106
107 void OriginHost::writeLogFile(const anna::DataBlock & db, const std::string &logExtension, const std::string &detail) const throw() {
108   anna::diameter::codec::Message codecMsg;
109   try { codecMsg.decode(db); } catch(anna::RuntimeException &ex) { ex.trace(); }
110   writeLogFile(codecMsg, logExtension, detail);
111 }
112
113 // Already decoded:
114 void OriginHost::writeLogFile(const anna::diameter::codec::Message &decodedMessage, const std::string &logExtension, const std::string &detail) const throw() {
115   // Open target file:
116   std::string targetFile = a_logFile;
117
118   if(a_splitLog) {
119     targetFile += ".";
120     targetFile += logExtension;
121   }
122
123   std::ofstream out(targetFile.c_str(), std::ifstream::out | std::ifstream::app);
124   // Set text to dump:
125   std::string title = "[";
126   title += logExtension;
127   title += "]";
128   // Build complete log:
129   std::string log = "\n";
130   std::string xml = decodedMessage.asXMLString();
131
132
133   if(a_detailedLog) {
134     anna::time::Date now;
135     now.setNow();
136     title += " ";
137     title += now.asString();
138     log += anna::functions::highlight(title, anna::functions::TextHighlightMode::OverAndUnderline);
139     log += xml;
140     log += "\n";
141     log += anna::functions::highlight("Used resource");
142     log += detail;
143     log += "\n";
144   } else {
145     log += title;
146     log += "\n";
147     log += xml;
148     log += "\n";
149   }
150
151   if(a_dumpLog) {
152     // <unix ms timestamp>.<originHost>.<hop by hop>.<end to end>.<message code>.<request|answer>.<type of event>.xml
153     std::string name = anna::functions::asString((anna::Millisecond)anna::functions::millisecond());
154     name += ".";
155     name += getMyDiameterEngine()->getOriginHost();
156     name += ".";
157     name += anna::functions::asString(decodedMessage.getHopByHop());
158     name += ".";
159     name += anna::functions::asString(decodedMessage.getEndToEnd());
160     name += ".";
161     name += anna::functions::asString(decodedMessage.getId().first);
162     name += ".";
163     name += ((decodedMessage.getId().second) ? "request.":"answer.");
164     name += logExtension;
165     name += ".xml";
166     std::ofstream outMsg(name.c_str(), std::ifstream::out | std::ifstream::app);
167     outMsg.write(xml.c_str(), xml.size());
168     outMsg.close();
169   }
170
171   // Write and close
172   out.write(log.c_str(), log.size());
173   out.close();
174 }
175
176 void OriginHost::writeBurstLogFile(const std::string &buffer) throw() {
177   std::ofstream out(a_burstLogFile.c_str(), std::ifstream::out | std::ifstream::app);
178   out.write(buffer.c_str(), buffer.size());
179   out.close();    // close() will be called when the object is destructed (i.e., when it goes out of scope).
180   // you'd call close() only if you indeed for some reason wanted to close the filestream
181   // earlier than it goes out of scope.
182 }
183
184 int OriginHost::clearBurst() throw() {
185   int size = a_burstMessages.size();
186
187   if(size) {
188     std::map<int, anna::diameter::comm::Message*>::const_iterator it;
189     std::map<int, anna::diameter::comm::Message*>::const_iterator it_min(a_burstMessages.begin());
190     std::map<int, anna::diameter::comm::Message*>::const_iterator it_max(a_burstMessages.end());
191
192     for(it = it_min; it != it_max; it++) releaseCommMessage((*it).second);
193
194     a_burstMessages.clear();
195   } else {
196     std::string msg = "Burst list already empty. Nothing done";
197     std::cout << msg << std::endl;
198     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
199   }
200
201   a_burstActive = false;
202   a_burstLoadIndx = 0;
203   a_burstDeliveryIt = a_burstMessages.begin();
204   return size;
205 }
206
207 int OriginHost::loadBurstMessage(const anna::DataBlock & db) throw(anna::RuntimeException) {
208   anna::diameter::comm::Message *msg = createCommMessage();
209   msg->setBody(db);
210   a_burstMessages[a_burstLoadIndx++] = msg;
211   return (a_burstLoadIndx - 1);
212 }
213
214 int OriginHost::stopBurst() throw() {
215   if(!a_burstActive) {
216     std::string msg = "Burst launch is already stopped. Nothing done";
217     std::cout << msg << std::endl;
218     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
219     return -1;
220   }
221
222   a_burstActive = false;
223   // Remaining on cycle:
224   return (a_burstMessages.size() - (*a_burstDeliveryIt).first);
225 }
226
227 int OriginHost::popBurst(int releaseAmount) throw() {
228   if(!a_burstActive) {
229     std::string msg = "Burst launch is stopped. Nothing done";
230     std::cout << msg << std::endl;
231     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
232     return -1;
233   }
234
235   if(releaseAmount < 1) {
236     std::string msg = "No valid release amount is specified. Ignoring burst pop";
237     std::cout << msg << std::endl;
238     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
239     return -2;
240   }
241
242   int currentOTArequests = a_entity->getOTARequests();
243   a_burstPopCounter = (releaseAmount > currentOTArequests) ? currentOTArequests : releaseAmount;
244   return a_burstPopCounter;
245 }
246
247 int OriginHost::pushBurst(int loadAmount) throw() {
248   if(a_burstMessages.size() == 0) {
249     std::string msg = "Burst data not found (empty list). Ignoring burst launch";
250     std::cout << msg << std::endl;
251     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
252     return -1;
253   }
254
255   if(loadAmount < 1) {
256     std::string msg = "No valid load amount is specified. Ignoring burst push";
257     std::cout << msg << std::endl;
258     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
259     return -2;
260   }
261
262   a_burstActive = true;
263   int count;
264
265   for(count = 0; count < loadAmount; count++)
266     if(!sendBurstMessage()) break;
267
268   return count;
269 }
270
271 int OriginHost::sendBurst(int loadAmount) throw() {
272   if(a_burstMessages.size() == 0) {
273     std::string msg = "Burst data not found (empty list). Ignoring burst launch";
274     std::cout << msg << std::endl;
275     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
276     return -1;
277   }
278
279   if(loadAmount < 1) {
280     std::string msg = "No valid load amount is specified. Ignoring burst send";
281     std::cout << msg << std::endl;
282     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
283     return -2;
284   }
285
286   int count;
287
288   for(count = 0; count < loadAmount; count++)
289     if(!sendBurstMessage(true /* anyway */)) break;
290
291   return count;
292 }
293
294 int OriginHost::startBurst(int initialLoad) throw() {
295   if(initialLoad < 1) {
296     std::string msg = "No initial load is specified. Ignoring burst start";
297     std::cout << msg << std::endl;
298     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
299     return -2;
300   }
301
302   a_burstActive = true;
303   a_burstCycle = 1;
304   a_burstDeliveryIt = a_burstMessages.begin();
305   return (pushBurst(initialLoad));
306 }
307
308 bool OriginHost::sendBurstMessage(bool anyway) throw() {
309   if(!anyway && !burstActive()) return false;
310
311   if(a_burstPopCounter > 0) {
312     if(burstLogEnabled()) writeBurstLogFile("x");
313
314     a_burstPopCounter--;
315     return false;
316   }
317
318   if(a_burstDeliveryIt == a_burstMessages.end()) {
319     a_burstDeliveryIt = a_burstMessages.begin();
320
321     if(!anyway) {
322       if(a_burstRepeat) {
323         a_burstCycle++;
324
325         if(burstLogEnabled()) writeBurstLogFile(anna::functions::asString("\nCompleted burst cycle. Starting again (repeat mode) on cycle %d.\n", a_burstCycle));
326       } else {
327         if(burstLogEnabled()) writeBurstLogFile("\nCompleted burst cycle. Burst finished (repeat mode disabled).\n");
328
329         stopBurst();
330         return false;
331       }
332     }
333   }
334
335   anna::diameter::comm::Message *msg = (*a_burstDeliveryIt).second;
336   int order = (*a_burstDeliveryIt).first + 1;
337   a_burstDeliveryIt++;
338   bool dot = true;
339   // sending
340   bool result = a_entity->send(msg, anna::CommandLine::instantiate().exists("balance"));
341
342   if(burstLogEnabled()) {
343     if(a_burstMessages.size() >= 100)
344       dot = (order  % (a_burstMessages.size() / 100));
345
346     if(dot) {
347       writeBurstLogFile(".");
348     } else {
349       writeBurstLogFile(anna::functions::asString(" %d", order));
350       int otaReqs  = a_entity->getOTARequests();
351
352       if(result && (otaReqs != a_otaRequest)) {
353         // false if was a sending after an answer received (no OTA change in this case)
354         // true after push and pop operations
355         a_otaRequest = otaReqs;
356         writeBurstLogFile(anna::functions::asString("[OTA %d]", a_otaRequest));
357       }
358     }
359   }
360
361   // Detailed log:
362   if(logEnabled()) {
363     anna::diameter::comm::Server *usedServer = a_entity->getLastUsedResource();
364     anna::diameter::comm::ClientSession *usedClientSession = usedServer ? usedServer->getLastUsedResource() : NULL;
365     std::string detail = usedClientSession ? usedClientSession->asString() : "<null client session>"; // esto no deberia ocurrir
366     writeLogFile(msg->getBody(), (result ? "sent2e" : "send2eError"), detail); // el del nodo de trabajo
367   }
368
369   return result;
370 }
371
372 std::string OriginHost::lookBurst(int order) const throw() {
373
374   if (order == -1) order = a_burstDeliveryIt->first;
375
376   std::string result = "No message found for order provided (";
377   result += anna::functions::asString(order);
378   result += ")";
379   std::map<int, anna::diameter::comm::Message*>::const_iterator it = a_burstMessages.find(order - 1);
380
381   if(it != a_burstMessages.end()) {
382     anna::diameter::codec::Message codecMsg;
383     try { codecMsg.decode((*it).second->getBody()); result = codecMsg.asXMLString(); } catch(anna::RuntimeException &ex) { ex.trace(); }
384   }
385
386   return result;
387 }
388
389 std::string OriginHost::gotoBurst(int order) throw() {
390   std::string result = "Position not found for order provided (";
391   std::map<int, anna::diameter::comm::Message*>::iterator it = a_burstMessages.find(order - 1);
392
393   if(it != a_burstMessages.end()) {
394     a_burstDeliveryIt = it;
395     result = "Position updated for order provided (";
396   }
397
398   result += anna::functions::asString(order);
399   result += ")";
400   return result;
401 }
402
403 anna::xml::Node* OriginHost::asXML(anna::xml::Node* parent) const
404 throw() {
405   anna::xml::Node* result = parent->createChild("OriginHost");
406
407   result->createAttribute("originHost", a_originHost);
408   result->createAttribute("ApplicationId", a_applicationId);
409   result->createAttribute("originRealm", a_commEngine->getOriginRealm());
410   result->createAttribute("LogFile", a_logFile);
411   result->createAttribute("SplitLog", a_splitLog ? "yes" : "no");
412   result->createAttribute("DetailedLog", a_detailedLog ? "yes" : "no");
413   result->createAttribute("DumpLog", a_dumpLog ? "yes" : "no");
414   result->createAttribute("BurstLogFile", a_burstLogFile);
415   result->createAttribute("RequestRetransmissions", a_requestRetransmissions);
416
417   anna::xml::Node* commEngine = result->createChild("CommEngine");
418   a_commEngine->asXML(commEngine);
419
420   return result;
421 }
422
423 std::string OriginHost::asXMLString() const throw() {
424   anna::xml::Node root("root");
425   return anna::xml::Compiler().apply(asXML(&root));
426 }