Fix symlinks
[anna.git] / source / diameter.comm / OriginHost.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 // Standard
10 #include <string>
11
12 // Project
13 #include <anna/diameter.comm/OriginHost.hpp>
14
15 #include <anna/diameter.comm/Engine.hpp>
16 #include <anna/diameter.comm/Message.hpp>
17 #include <anna/diameter.comm/Entity.hpp>
18 #include <anna/diameter.comm/LocalServer.hpp>
19 #include <anna/diameter/codec/EngineManager.hpp>
20 #include <anna/core/core.hpp>
21 #include <anna/time/Date.hpp>
22 #include <anna/xml/Compiler.hpp>
23
24 using namespace anna::diameter::comm;
25
26
27 OriginHost::OriginHost(anna::diameter::comm::Engine* commEngine, unsigned int applicationId) :
28                        a_commEngine(commEngine), a_applicationId(applicationId) {
29
30   a_codecEngine = anna::diameter::codec::EngineManager::instantiate().getCodecEngine(applicationId); // i know, this is going to exist (getCodecEngine is not null)
31
32   a_logFile = "";
33   a_burstLogFile = "";
34   a_splitLog = false;
35   a_detailedLog = false;
36   a_dumpLog = false;
37   a_entity = NULL;
38   a_diameterServer = NULL;
39
40   // Comm resources:
41   a_requestRetransmissions = 0;
42
43   // Burst
44   a_burstCycle = 1;
45   a_burstRepeat = false;
46   a_burstActive = false;
47   a_burstLoadIndx = 0;
48   a_burstDeliveryIt = a_burstMessages.begin();
49   a_otaRequest = 0;
50   a_burstPopCounter = 0;
51 }
52
53 const std::string &OriginHost::getName() const throw() {
54   return a_commEngine->getOriginHostName();
55 }
56
57 void OriginHost::createEntity(const std::string &entityRepresentation, const anna::Millisecond &bindTimeout, const anna::Millisecond &applicationTimeout) throw(anna::RuntimeException) {
58
59   anna::socket_v servers = anna::functions::getSocketVectorFromString(entityRepresentation);
60   std::string entityDescription = "Launcher diameter entity for "; entityDescription += getName();
61   a_entity = (anna::diameter::comm::Entity*)a_commEngine->createEntity(servers, entityDescription);
62   a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::Bind, bindTimeout);
63   a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
64 }
65
66 void OriginHost::createDiameterServer(const std::string &serverRepresentation, int sessions, const anna::Millisecond &inactivityTimeout, const anna::Millisecond &applicationTimeout, const std::string &ceaPathfile) throw(anna::RuntimeException) {
67
68   //if(sessions <= 0) return; negative implies no limit for accepted connections
69
70   std::string address; int port;
71   anna::functions::getAddressAndPortFromSocketLiteral(serverRepresentation, address, port);
72   std::string serverDescription = "Launcher diameter local server for "; serverDescription += getName();
73   a_commEngine->setCEA(ceaPathfile);
74   a_diameterServer = (anna::diameter::comm::LocalServer*)(a_commEngine->createLocalServer(address, port, sessions));
75           // we could set sessions = 0, and after application run(), use setMaxConnections(real sessions)
76           // over the local server in order to start it.
77
78   a_diameterServer->setDescription(serverDescription);
79   a_diameterServer->setAllowedInactivityTime(inactivityTimeout);
80   a_diameterServer->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
81 }
82
83 anna::diameter::comm::Message *OriginHost::createCommMessage() throw(anna::RuntimeException) {
84   anna::diameter::comm::Message *result = a_commMessages.create();
85   result->setRetries(a_requestRetransmissions);
86   if (a_requestRetransmissions > 0) result->setOnExpiry(anna::diameter::comm::Message::OnExpiry::Retransmit);
87   return result;
88 }
89
90
91 void OriginHost::releaseCommMessage(anna::diameter::comm::Message *msg) throw() {
92   a_commMessages.release(msg);
93 }
94
95
96 void OriginHost::writeLogFile(const anna::DataBlock & db, const std::string &logExtension, const std::string &detail) const throw() {
97   anna::diameter::codec::Message codecMsg;
98   try { codecMsg.decode(db); } catch(anna::RuntimeException &ex) { ex.trace(); }
99   writeLogFile(codecMsg, logExtension, detail);
100 }
101
102 // Already decoded:
103 void OriginHost::writeLogFile(const anna::diameter::codec::Message &decodedMessage, const std::string &logExtension, const std::string &detail) const throw() {
104   // Open target file:
105   std::string targetFile = a_logFile;
106
107   if(a_splitLog) {
108     targetFile += ".";
109     targetFile += logExtension;
110   }
111
112   std::ofstream out(targetFile.c_str(), std::ifstream::out | std::ifstream::app);
113   // Set text to dump:
114   std::string title = "[";
115   title += logExtension;
116   title += "]";
117   // Build complete log:
118   std::string log = "\n";
119   std::string xml = decodedMessage.asXMLString();
120
121
122   if(a_detailedLog) {
123     anna::time::Date now;
124     now.setNow();
125     title += " ";
126     title += now.asString();
127     log += anna::functions::highlight(title, anna::functions::TextHighlightMode::OverAndUnderline);
128     log += xml;
129     log += "\n";
130     log += anna::functions::highlight("Used resource");
131     log += detail;
132     log += "\n";
133   } else {
134     log += title;
135     log += "\n";
136     log += xml;
137     log += "\n";
138   }
139
140   if(a_dumpLog) {
141     // <unix ms timestamp>.<originHost>.<hop by hop>.<end to end>.<message code>.<request|answer>.<type of event>.xml
142     std::string name = anna::functions::asString((anna::Millisecond)anna::functions::millisecond());
143     name += ".";
144     name += getCommEngine()->getOriginHostName();
145     name += ".";
146     name += anna::functions::asString(decodedMessage.getHopByHop());
147     name += ".";
148     name += anna::functions::asString(decodedMessage.getEndToEnd());
149     name += ".";
150     name += anna::functions::asString(decodedMessage.getId().first);
151     name += ".";
152     name += ((decodedMessage.getId().second) ? "request.":"answer.");
153     name += logExtension;
154     name += ".xml";
155     std::ofstream outMsg(name.c_str(), std::ifstream::out | std::ifstream::app);
156     outMsg.write(xml.c_str(), xml.size());
157     outMsg.close();
158   }
159
160   // Write and close
161   out.write(log.c_str(), log.size());
162   out.close();
163 }
164
165 void OriginHost::writeBurstLogFile(const std::string &buffer) throw() {
166   std::ofstream out(a_burstLogFile.c_str(), std::ifstream::out | std::ifstream::app);
167   out.write(buffer.c_str(), buffer.size());
168   out.close();    // close() will be called when the object is destructed (i.e., when it goes out of scope).
169   // you'd call close() only if you indeed for some reason wanted to close the filestream
170   // earlier than it goes out of scope.
171 }
172
173 int OriginHost::clearBurst() throw() {
174   int size = a_burstMessages.size();
175
176   if(size) {
177     std::map<int, anna::diameter::comm::Message*>::const_iterator it;
178     std::map<int, anna::diameter::comm::Message*>::const_iterator it_min(a_burstMessages.begin());
179     std::map<int, anna::diameter::comm::Message*>::const_iterator it_max(a_burstMessages.end());
180
181     for(it = it_min; it != it_max; it++) releaseCommMessage((*it).second);
182
183     a_burstMessages.clear();
184   } else {
185     std::string msg = "Burst list already empty. Nothing done";
186     std::cout << msg << std::endl;
187     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
188   }
189
190   a_burstActive = false;
191   a_burstLoadIndx = 0;
192   a_burstDeliveryIt = a_burstMessages.begin();
193   return size;
194 }
195
196 int OriginHost::loadBurstMessage(const anna::DataBlock & db) throw(anna::RuntimeException) {
197   anna::diameter::comm::Message *msg = createCommMessage();
198   msg->setBody(db);
199   a_burstMessages[a_burstLoadIndx++] = msg;
200   return (a_burstLoadIndx - 1);
201 }
202
203 int OriginHost::stopBurst() throw() {
204   if(!a_burstActive) {
205     std::string msg = "Burst launch is already stopped. Nothing done";
206     std::cout << msg << std::endl;
207     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
208     return -1;
209   }
210
211   a_burstActive = false;
212   // Remaining on cycle:
213   return (a_burstMessages.size() - (*a_burstDeliveryIt).first);
214 }
215
216 int OriginHost::popBurst(int releaseAmount) throw() {
217   if(!a_burstActive) {
218     std::string msg = "Burst launch is stopped. Nothing done";
219     std::cout << msg << std::endl;
220     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
221     return -1;
222   }
223
224   if(releaseAmount < 1) {
225     std::string msg = "No valid release amount is specified. Ignoring burst pop";
226     std::cout << msg << std::endl;
227     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
228     return -2;
229   }
230
231   int currentOTArequests = a_entity->getOTARequests();
232   a_burstPopCounter = (releaseAmount > currentOTArequests) ? currentOTArequests : releaseAmount;
233   return a_burstPopCounter;
234 }
235
236 int OriginHost::pushBurst(int loadAmount) throw() {
237   if(a_burstMessages.size() == 0) {
238     std::string msg = "Burst data not found (empty list). Ignoring burst launch";
239     std::cout << msg << std::endl;
240     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
241     return -1;
242   }
243
244   if(loadAmount < 1) {
245     std::string msg = "No valid load amount is specified. Ignoring burst push";
246     std::cout << msg << std::endl;
247     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
248     return -2;
249   }
250
251   a_burstActive = true;
252   int count;
253
254   for(count = 0; count < loadAmount; count++)
255     if(!sendBurstMessage()) break;
256
257   return count;
258 }
259
260 int OriginHost::sendBurst(int loadAmount) throw() {
261   if(a_burstMessages.size() == 0) {
262     std::string msg = "Burst data not found (empty list). Ignoring burst launch";
263     std::cout << msg << std::endl;
264     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
265     return -1;
266   }
267
268   if(loadAmount < 1) {
269     std::string msg = "No valid load amount is specified. Ignoring burst send";
270     std::cout << msg << std::endl;
271     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
272     return -2;
273   }
274
275   int count;
276
277   for(count = 0; count < loadAmount; count++)
278     if(!sendBurstMessage(true /* anyway */)) break;
279
280   return count;
281 }
282
283 int OriginHost::startBurst(int initialLoad) throw() {
284   if(initialLoad < 1) {
285     std::string msg = "No initial load is specified. Ignoring burst start";
286     std::cout << msg << std::endl;
287     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
288     return -2;
289   }
290
291   a_burstActive = true;
292   a_burstCycle = 1;
293   a_burstDeliveryIt = a_burstMessages.begin();
294   return (pushBurst(initialLoad));
295 }
296
297 bool OriginHost::sendBurstMessage(bool anyway) throw() {
298   if(!anyway && !burstActive()) return false;
299
300   if(a_burstPopCounter > 0) {
301     if(burstLogEnabled()) writeBurstLogFile("x");
302
303     a_burstPopCounter--;
304     return false;
305   }
306
307   if(a_burstDeliveryIt == a_burstMessages.end()) {
308     a_burstDeliveryIt = a_burstMessages.begin();
309
310     if(!anyway) {
311       if(a_burstRepeat) {
312         a_burstCycle++;
313
314         if(burstLogEnabled()) writeBurstLogFile(anna::functions::asString("\nCompleted burst cycle. Starting again (repeat mode) on cycle %d.\n", a_burstCycle));
315       } else {
316         if(burstLogEnabled()) writeBurstLogFile("\nCompleted burst cycle. Burst finished (repeat mode disabled).\n");
317
318         stopBurst();
319         return false;
320       }
321     }
322   }
323
324   anna::diameter::comm::Message *msg = (*a_burstDeliveryIt).second;
325   int order = (*a_burstDeliveryIt).first + 1;
326   a_burstDeliveryIt++;
327   bool dot = true;
328   // sending
329   bool result = a_entity->send(msg);
330
331   if(burstLogEnabled()) {
332     if(a_burstMessages.size() >= 100)
333       dot = (order  % (a_burstMessages.size() / 100));
334
335     if(dot) {
336       writeBurstLogFile(".");
337     } else {
338       writeBurstLogFile(anna::functions::asString(" %d", order));
339       int otaReqs  = a_entity->getOTARequests();
340
341       if(result && (otaReqs != a_otaRequest)) {
342         // false if was a sending after an answer received (no OTA change in this case)
343         // true after push and pop operations
344         a_otaRequest = otaReqs;
345         writeBurstLogFile(anna::functions::asString("[OTA %d]", a_otaRequest));
346       }
347     }
348   }
349
350   // Detailed log:
351   if(logEnabled()) {
352     anna::diameter::comm::Server *usedServer = a_entity->getLastUsedResource();
353     anna::diameter::comm::ClientSession *usedClientSession = usedServer ? usedServer->getLastUsedResource() : NULL;
354     std::string detail = usedClientSession ? usedClientSession->asString() : "<null client session>"; // esto no deberia ocurrir
355     writeLogFile(msg->getBody(), (result ? "sent2e" : "send2eError"), detail); // el del nodo de trabajo
356   }
357
358   return result;
359 }
360
361 std::string OriginHost::lookBurst(int order) const throw() {
362
363   if (order == -1) order = a_burstDeliveryIt->first;
364
365   std::string result = "No message found for order provided (";
366   result += anna::functions::asString(order);
367   result += ")";
368   std::map<int, anna::diameter::comm::Message*>::const_iterator it = a_burstMessages.find(order - 1);
369
370   if(it != a_burstMessages.end()) {
371     anna::diameter::codec::Message codecMsg;
372     try { codecMsg.decode((*it).second->getBody()); result = codecMsg.asXMLString(); } catch(anna::RuntimeException &ex) { ex.trace(); }
373   }
374
375   return result;
376 }
377
378 std::string OriginHost::gotoBurst(int order) throw() {
379   std::string result = "Position not found for order provided (";
380   std::map<int, anna::diameter::comm::Message*>::iterator it = a_burstMessages.find(order - 1);
381
382   if(it != a_burstMessages.end()) {
383     a_burstDeliveryIt = it;
384     result = "Position updated for order provided (";
385   }
386
387   result += anna::functions::asString(order);
388   result += ")";
389   return result;
390 }
391
392 anna::xml::Node* OriginHost::asXML(anna::xml::Node* parent) const
393 throw() {
394   anna::xml::Node* result = parent->createChild("OriginHost");
395
396   result->createAttribute("originHost", getName());
397   result->createAttribute("ApplicationId", a_applicationId);
398   result->createAttribute("originRealm", a_commEngine->getOriginRealmName());
399   result->createAttribute("LogFile", a_logFile);
400   result->createAttribute("SplitLog", a_splitLog ? "yes" : "no");
401   result->createAttribute("DetailedLog", a_detailedLog ? "yes" : "no");
402   result->createAttribute("DumpLog", a_dumpLog ? "yes" : "no");
403   result->createAttribute("BurstLogFile", a_burstLogFile);
404   result->createAttribute("RequestRetransmissions", a_requestRetransmissions);
405
406   anna::xml::Node* commEngine = result->createChild("CommEngine");
407   a_commEngine->asXML(commEngine);
408
409   return result;
410 }
411
412 std::string OriginHost::asXMLString() const throw() {
413   anna::xml::Node root("root");
414   return anna::xml::Compiler().apply(asXML(&root));
415 }