Improvements from anna fork
[anna.git] / example / diameter / launcher / deployments / aots / agents / KAFKA / Producer.py
1 #!/usr/bin/env python
2
3 import os, sys, datetime, logging, json, time
4
5 from kafka import KafkaProducer
6 from json import loads
7 from argparse import ArgumentParser
8
9
10 def _exit(message = None, rc=0):
11   if (message): printMsg(message)
12   printMsg("Exiting [rc={}]".format(rc))
13   sys.exit(rc)
14
15
16 def timeFormat():
17   return '%b.%d.%Y-%H.%M.%S.%f'
18
19
20 def printMsg(message):
21   print ("[{}] [{}] {}".format(datetime.datetime.now().strftime(timeFormat()), GNode, message))
22
23
24 class MyKafkaProducer:
25     kafka_service = os.environ.get('KAFKA_SERVER')
26     kafka_port = os.environ.get('KAFKA_PORT')
27
28     if not kafka_service: kafka_service = "localhost"
29     if not kafka_port: kafka_port = "9092"
30
31     server = kafka_service + ':' + kafka_port
32
33     def __init__(self):
34         self.producer = KafkaProducer(bootstrap_servers=[self.server],
35                         value_serializer=lambda x:
36                         json.dumps(x).encode('utf-8'))
37
38     def send_data(self, topic, data):
39         json = self.__read_json_data(data)
40         self.producer.send(topic, value=json)
41         self.producer.flush()
42         printMsg("Json produced on topic '{}': {}".format(topic, json))
43
44     def __read_json_data(self,jsonData):
45         with open(jsonData,'r') as schema_file:
46             schema_data = schema_file.read()
47         schema = json.loads(schema_data)
48         return schema
49
50     def set_producer_topic(self, kafka_topic):
51         self.kafka_topic = kafka_topic
52
53     def close(self):
54         self.producer.close()
55
56 class MyProducer:
57     #def __init__(self):
58     producer = MyKafkaProducer()
59
60     def send_data(self, topic, data):
61         self.producer.send_data(topic, data)
62
63     def close(self):
64         self.producer.close()
65
66
67 def parse_arguments():
68
69   parser = ArgumentParser(description='Kafka Producer Simulator')
70   parser.add_argument('-n', '--node', help='Node name (spaces are removed)', required=True)
71   parser.add_argument('-t', '--topic', help='Kafka topic', required=True)
72   parser.add_argument('-w', '--delay', help='Delay (in milliseconds) before producing', required=False)
73   parser.add_argument('-f', '--file', help='Json file to produce', required=True)
74   parser.add_argument('-d', '--debug', help='Debug kafka', required=False, action='store_true')
75
76   arguments = parser.parse_args()
77
78   return arguments
79
80
81 #####################
82 #      M A I N      #
83 #####################
84
85 if __name__ == "__main__":
86
87   arguments = parse_arguments()
88
89   GNode = arguments.node.replace(" ", "")
90   topic = arguments.topic
91   delay = arguments.delay
92   jfile = arguments.file
93   debug = arguments.debug
94
95   printMsg("Starting")
96
97   if (debug):
98     now = datetime.datetime.now().strftime(timeFormat())
99     logfile='{}-Producer-{}.log'.format(GNode, now)
100     printMsg("Logging kafka in: '{}'".format(logfile))
101     logging.basicConfig(filename=logfile, format='%(asctime)s - %(message)s', datefmt=timeFormat(), level=logging.DEBUG)
102
103   if not os.path.exists(jfile):
104     _exit("Invalid provided file '{}'".format(jfile), 1)
105
106   if (delay):
107     try:
108       delay = int(delay)
109     except:
110       _exit("Invalid provided delay '{}'. Must be an integer value !".format(str(delay)))
111
112     printMsg("Delay before producing: {} milliseconds ...".format(delay))
113     time.sleep(delay / 1000)
114     printMsg("Delay completed !".format(delay))
115
116   producer = None
117   try:
118     producer = MyProducer()
119     producer.send_data(topic, jfile)
120     producer.close()
121     _exit()
122
123   except Exception as e:
124     producer.close()
125     _exit(e, 1)