Fix local server for multiple applications
[anna.git] / source / diameter.comm / OriginHost.cpp
1 // ANNA - Anna is Not Nothingness Anymore                                                         //
2 //                                                                                                //
3 // (c) Copyright 2005-2015 Eduardo Ramos Testillano & Francisco Ruiz Rayo                         //
4 //                                                                                                //
5 // See project site at http://redmine.teslayout.com/projects/anna-suite                           //
6 // See accompanying file LICENSE or copy at http://www.teslayout.com/projects/public/anna.LICENSE //
7
8
9 // Standard
10 #include <string>
11
12 // Project
13 #include <anna/diameter.comm/OriginHost.hpp>
14
15 #include <anna/diameter.comm/Engine.hpp>
16 #include <anna/diameter.comm/Message.hpp>
17 #include <anna/diameter.comm/Entity.hpp>
18 #include <anna/diameter.comm/LocalServer.hpp>
19 #include <anna/diameter/codec/EngineManager.hpp>
20 #include <anna/core/core.hpp>
21 #include <anna/time/Date.hpp>
22 #include <anna/xml/Compiler.hpp>
23
24 using namespace anna::diameter::comm;
25
26
27 OriginHost::OriginHost(anna::diameter::comm::Engine* commEngine, unsigned int applicationId) :
28                        a_commEngine(commEngine), a_applicationId(applicationId) {
29
30   a_codecEngine = anna::diameter::codec::EngineManager::instantiate().getCodecEngine(applicationId); // i know, this is going to exist (getCodecEngine is not null)
31
32   a_logFile = "";
33   a_burstLogFile = "";
34   a_splitLog = false;
35   a_detailedLog = false;
36   a_dumpLog = false;
37   a_entity = NULL;
38   a_diameterServer = NULL;
39
40   // Comm resources:
41   a_requestRetransmissions = 0;
42
43   // Burst
44   a_burstCycle = 1;
45   a_burstRepeat = false;
46   a_burstActive = false;
47   a_burstLoadIndx = 0;
48   a_burstDeliveryIt = a_burstMessages.begin();
49   a_otaRequest = 0;
50   a_burstPopCounter = 0;
51 }
52
53 const std::string &OriginHost::getName() const {
54   return a_commEngine->getOriginHostName();
55 }
56
57 void OriginHost::createEntity(const std::string &entityRepresentation, const anna::Millisecond &bindTimeout, const anna::Millisecond &applicationTimeout) noexcept(false) {
58
59   anna::socket_v servers = anna::functions::getSocketVectorFromString(entityRepresentation);
60   std::string entityDescription = "Launcher diameter entity for "; entityDescription += getName();
61   a_entity = (anna::diameter::comm::Entity*)a_commEngine->createEntity(servers, entityDescription);
62   a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::Bind, bindTimeout);
63   a_entity->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
64 }
65
66 void OriginHost::createDiameterServer(const std::string &serverRepresentation, int sessions, const anna::Millisecond &inactivityTimeout, const anna::Millisecond &applicationTimeout, const std::string &ceaPathfile) noexcept(false) {
67
68   //if(sessions <= 0) return; negative implies no limit for accepted connections
69
70   std::string address; int port;
71   anna::functions::getAddressAndPortFromSocketLiteral(serverRepresentation, address, port);
72   std::string serverDescription = "Launcher diameter local server for "; serverDescription += getName();
73   a_commEngine->setCEA(ceaPathfile);
74
75   a_diameterServer = (anna::diameter::comm::LocalServer*)(a_commEngine->createLocalServer(address, port, sessions));
76           // we could set sessions = 0, and after application run(), use setMaxConnections(real sessions)
77           // over the local server in order to start it.
78
79   a_diameterServer->setDescription(serverDescription);
80   a_diameterServer->setAllowedInactivityTime(inactivityTimeout);
81   a_diameterServer->setClassCodeTimeout(anna::diameter::comm::ClassCode::ApplicationMessage, applicationTimeout);
82 }
83
84 anna::diameter::comm::Message *OriginHost::createCommMessage() noexcept(false) {
85   anna::diameter::comm::Message *result = a_commMessages.create();
86   result->setRetries(a_requestRetransmissions);
87   if (a_requestRetransmissions > 0) result->setOnExpiry(anna::diameter::comm::Message::OnExpiry::Retransmit);
88   return result;
89 }
90
91
92 void OriginHost::releaseCommMessage(anna::diameter::comm::Message *msg) {
93   a_commMessages.release(msg);
94 }
95
96
97 void OriginHost::writeLogFile(const anna::DataBlock & db, const std::string &logExtension, const std::string &detail) const {
98   anna::diameter::codec::Message codecMsg;
99   try { codecMsg.decode(db); } catch(anna::RuntimeException &ex) { ex.trace(); }
100   writeLogFile(codecMsg, logExtension, detail);
101 }
102
103 // Already decoded:
104 void OriginHost::writeLogFile(const anna::diameter::codec::Message &decodedMessage, const std::string &logExtension, const std::string &detail) const {
105   // Open target file:
106   std::string targetFile = a_logFile;
107
108   if(a_splitLog) {
109     targetFile += ".";
110     targetFile += logExtension;
111   }
112
113   std::ofstream out(targetFile.c_str(), std::ifstream::out | std::ifstream::app);
114   // Set text to dump:
115   std::string title = "[";
116   title += logExtension;
117   title += "]";
118   // Build complete log:
119   std::string log = "\n";
120   std::string xml = decodedMessage.asXMLString();
121
122
123   if(a_detailedLog) {
124     anna::time::Date now;
125     now.setNow();
126     title += " ";
127     title += now.asString();
128     log += anna::functions::highlight(title, anna::functions::TextHighlightMode::OverAndUnderline);
129     log += xml;
130     log += "\n";
131     log += anna::functions::highlight("Used resource");
132     log += detail;
133     log += "\n";
134   } else {
135     log += title;
136     log += "\n";
137     log += xml;
138     log += "\n";
139   }
140
141   if(a_dumpLog) {
142     // <unix ms timestamp>.<originHost>.<hop by hop>.<end to end>.<message code>.<request|answer>.<type of event>.xml
143     std::string name = anna::functions::asString((anna::Millisecond)anna::functions::millisecond());
144     name += ".";
145     name += getCommEngine()->getOriginHostName();
146     name += ".";
147     name += anna::functions::asString(decodedMessage.getHopByHop());
148     name += ".";
149     name += anna::functions::asString(decodedMessage.getEndToEnd());
150     name += ".";
151     name += anna::functions::asString(decodedMessage.getId().first);
152     name += ".";
153     name += ((decodedMessage.getId().second) ? "request.":"answer.");
154     name += logExtension;
155     name += ".xml";
156     std::ofstream outMsg(name.c_str(), std::ifstream::out | std::ifstream::app);
157     outMsg.write(xml.c_str(), xml.size());
158     outMsg.close();
159   }
160
161   // Write and close
162   out.write(log.c_str(), log.size());
163   out.close();
164 }
165
166 void OriginHost::writeBurstLogFile(const std::string &buffer) {
167   std::ofstream out(a_burstLogFile.c_str(), std::ifstream::out | std::ifstream::app);
168   out.write(buffer.c_str(), buffer.size());
169   out.close();    // close() will be called when the object is destructed (i.e., when it goes out of scope).
170   // you'd call close() only if you indeed for some reason wanted to close the filestream
171   // earlier than it goes out of scope.
172 }
173
174 int OriginHost::clearBurst() {
175   int size = a_burstMessages.size();
176
177   if(size) {
178     std::map<int, anna::diameter::comm::Message*>::const_iterator it;
179     std::map<int, anna::diameter::comm::Message*>::const_iterator it_min(a_burstMessages.begin());
180     std::map<int, anna::diameter::comm::Message*>::const_iterator it_max(a_burstMessages.end());
181
182     for(it = it_min; it != it_max; it++) releaseCommMessage((*it).second);
183
184     a_burstMessages.clear();
185   } else {
186     std::string msg = "Burst list already empty. Nothing done";
187     std::cout << msg << std::endl;
188     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
189   }
190
191   a_burstActive = false;
192   a_burstLoadIndx = 0;
193   a_burstDeliveryIt = a_burstMessages.begin();
194   return size;
195 }
196
197 int OriginHost::loadBurstMessage(const anna::DataBlock & db) noexcept(false) {
198   anna::diameter::comm::Message *msg = createCommMessage();
199   msg->setBody(db);
200   a_burstMessages[a_burstLoadIndx++] = msg;
201   return (a_burstLoadIndx - 1);
202 }
203
204 int OriginHost::stopBurst() {
205   if(!a_burstActive) {
206     std::string msg = "Burst launch is already stopped. Nothing done";
207     std::cout << msg << std::endl;
208     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
209     return -1;
210   }
211
212   a_burstActive = false;
213   // Remaining on cycle:
214   return (a_burstMessages.size() - (*a_burstDeliveryIt).first);
215 }
216
217 int OriginHost::popBurst(int releaseAmount) {
218   if(!a_burstActive) {
219     std::string msg = "Burst launch is stopped. Nothing done";
220     std::cout << msg << std::endl;
221     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
222     return -1;
223   }
224
225   if(releaseAmount < 1) {
226     std::string msg = "No valid release amount is specified. Ignoring burst pop";
227     std::cout << msg << std::endl;
228     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
229     return -2;
230   }
231
232   int currentOTArequests = a_entity->getOTARequests();
233   a_burstPopCounter = (releaseAmount > currentOTArequests) ? currentOTArequests : releaseAmount;
234   return a_burstPopCounter;
235 }
236
237 int OriginHost::pushBurst(int loadAmount) {
238   if(a_burstMessages.size() == 0) {
239     std::string msg = "Burst data not found (empty list). Ignoring burst launch";
240     std::cout << msg << std::endl;
241     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
242     return -1;
243   }
244
245   if(loadAmount < 1) {
246     std::string msg = "No valid load amount is specified. Ignoring burst push";
247     std::cout << msg << std::endl;
248     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
249     return -2;
250   }
251
252   a_burstActive = true;
253   int count;
254
255   for(count = 0; count < loadAmount; count++)
256     if(!sendBurstMessage()) break;
257
258   return count;
259 }
260
261 int OriginHost::sendBurst(int loadAmount) {
262   if(a_burstMessages.size() == 0) {
263     std::string msg = "Burst data not found (empty list). Ignoring burst launch";
264     std::cout << msg << std::endl;
265     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
266     return -1;
267   }
268
269   if(loadAmount < 1) {
270     std::string msg = "No valid load amount is specified. Ignoring burst send";
271     std::cout << msg << std::endl;
272     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
273     return -2;
274   }
275
276   int count;
277
278   for(count = 0; count < loadAmount; count++)
279     if(!sendBurstMessage(true /* anyway */)) break;
280
281   return count;
282 }
283
284 int OriginHost::startBurst(int initialLoad) {
285   if(initialLoad < 1) {
286     std::string msg = "No initial load is specified. Ignoring burst start";
287     std::cout << msg << std::endl;
288     LOGWARNING(anna::Logger::warning(msg, ANNA_FILE_LOCATION));
289     return -2;
290   }
291
292   a_burstActive = true;
293   a_burstCycle = 1;
294   a_burstDeliveryIt = a_burstMessages.begin();
295   return (pushBurst(initialLoad));
296 }
297
298 bool OriginHost::sendBurstMessage(bool anyway) {
299   if(!anyway && !burstActive()) return false;
300
301   if(a_burstPopCounter > 0) {
302     if(burstLogEnabled()) writeBurstLogFile("x");
303
304     a_burstPopCounter--;
305     return false;
306   }
307
308   if(a_burstDeliveryIt == a_burstMessages.end()) {
309     a_burstDeliveryIt = a_burstMessages.begin();
310
311     if(!anyway) {
312       if(a_burstRepeat) {
313         a_burstCycle++;
314
315         if(burstLogEnabled()) writeBurstLogFile(anna::functions::asString("\nCompleted burst cycle. Starting again (repeat mode) on cycle %d.\n", a_burstCycle));
316       } else {
317         if(burstLogEnabled()) writeBurstLogFile("\nCompleted burst cycle. Burst finished (repeat mode disabled).\n");
318
319         stopBurst();
320         return false;
321       }
322     }
323   }
324
325   anna::diameter::comm::Message *msg = (*a_burstDeliveryIt).second;
326   int order = (*a_burstDeliveryIt).first + 1;
327   a_burstDeliveryIt++;
328   bool dot = true;
329   // sending
330   bool result = a_entity->send(msg);
331
332   if(burstLogEnabled()) {
333     if(a_burstMessages.size() >= 100)
334       dot = (order  % (a_burstMessages.size() / 100));
335
336     if(dot) {
337       writeBurstLogFile(".");
338     } else {
339       writeBurstLogFile(anna::functions::asString(" %d", order));
340       int otaReqs  = a_entity->getOTARequests();
341
342       if(result && (otaReqs != a_otaRequest)) {
343         // false if was a sending after an answer received (no OTA change in this case)
344         // true after push and pop operations
345         a_otaRequest = otaReqs;
346         writeBurstLogFile(anna::functions::asString("[OTA %d]", a_otaRequest));
347       }
348     }
349   }
350
351   // Detailed log:
352   if(logEnabled()) {
353     anna::diameter::comm::Server *usedServer = a_entity->getLastUsedResource();
354     anna::diameter::comm::ClientSession *usedClientSession = usedServer ? usedServer->getLastUsedResource() : NULL;
355     std::string detail = usedClientSession ? usedClientSession->asString() : "[null client session]"; // esto no deberia ocurrir
356     writeLogFile(msg->getBody(), (result ? "sent2e" : "send2eError"), detail); // el del nodo de trabajo
357   }
358
359   return result;
360 }
361
362 std::string OriginHost::lookBurst(int order) const {
363
364   if (order == -1) order = a_burstDeliveryIt->first;
365
366   std::string result = "No message found for order provided (";
367   result += anna::functions::asString(order);
368   result += ")";
369   std::map<int, anna::diameter::comm::Message*>::const_iterator it = a_burstMessages.find(order - 1);
370
371   if(it != a_burstMessages.end()) {
372     anna::diameter::codec::Message codecMsg;
373     try { codecMsg.decode((*it).second->getBody()); result = codecMsg.asXMLString(); } catch(anna::RuntimeException &ex) { ex.trace(); }
374   }
375
376   return result;
377 }
378
379 std::string OriginHost::gotoBurst(int order) {
380   std::string result = "Position not found for order provided (";
381   std::map<int, anna::diameter::comm::Message*>::iterator it = a_burstMessages.find(order - 1);
382
383   if(it != a_burstMessages.end()) {
384     a_burstDeliveryIt = it;
385     result = "Position updated for order provided (";
386   }
387
388   result += anna::functions::asString(order);
389   result += ")";
390   return result;
391 }
392
393 anna::xml::Node* OriginHost::asXML(anna::xml::Node* parent) const
394 {
395   anna::xml::Node* result = parent->createChild("OriginHost");
396
397   result->createAttribute("originHost", getName());
398   result->createAttribute("ApplicationId", a_applicationId);
399   result->createAttribute("originRealm", a_commEngine->getOriginRealmName());
400   result->createAttribute("LogFile", a_logFile);
401   result->createAttribute("SplitLog", a_splitLog ? "yes" : "no");
402   result->createAttribute("DetailedLog", a_detailedLog ? "yes" : "no");
403   result->createAttribute("DumpLog", a_dumpLog ? "yes" : "no");
404   result->createAttribute("BurstLogFile", a_burstLogFile);
405   result->createAttribute("RequestRetransmissions", a_requestRetransmissions);
406
407   anna::xml::Node* commEngine = result->createChild("CommEngine");
408   a_commEngine->asXML(commEngine);
409
410   return result;
411 }
412
413 std::string OriginHost::asXMLString() const {
414   anna::xml::Node root("root");
415   return anna::xml::Compiler().apply(asXML(&root));
416 }