Fix absolute path for build directory
[anna.git] / example / diameter / launcher / deployments / aots / agents / KAFKA / Consumer.py
1 #!/usr/bin/env python
2
3 import os, sys, re, datetime, logging, json, signal
4
5 from kafka import KafkaConsumer
6 from json import loads
7 from argparse import ArgumentParser
8
9
10 def _exit(message = None, rc=0):
11   if (message): printMsg(message)
12   printMsg("Exiting [rc={}]".format(rc))
13   sys.exit(rc)
14
15 def timeFormat():
16   return '%b.%d.%Y-%H.%M.%S.%f'
17
18
19 def printMsg(message):
20   print ("[{}] [{}] {}".format(datetime.datetime.now().strftime(timeFormat()), GNode, message))
21
22
23 class MyKafkaConsumer:
24     kafka_service = os.environ.get('KAFKA_SERVER')
25     kafka_port = os.environ.get('KAFKA_PORT')
26
27     if not kafka_service: kafka_service = "localhost"
28     if not kafka_port: kafka_port = "9092"
29
30     server = kafka_service + ':' + kafka_port
31
32     def __init__(self, topic, mregexp, aor, showReadWhenMatchingEnabled):
33
34         # Auto offset reset:
35         printMsg("Auto offset reset: '{}'".format(aor))
36
37         # Topic:
38         printMsg("Working topic: '{}'".format(topic))
39
40         # Show fulfilled:
41         self.showReadWhenMatchingEnabled = showReadWhenMatchingEnabled
42
43         # Json content:
44         self.json = None
45         if GJfile:
46           with open(GJfile) as json_file:
47             self.json = json.load(json_file)
48           printMsg("Established break condition for expected json ('{}'): {}".format(GJfile, self.json))
49         else:
50           printMsg("No expected json ('any' selected)")
51
52         # Interpreted as regexp indicator:
53         self.mregexp = mregexp
54         if GJfile:
55           if (self.mregexp): printMsg("Json file will be matched as regular expression")
56           else: printMsg("Json file will be matched as exact comparison")
57
58
59         self.consumer = KafkaConsumer(topic,
60                         bootstrap_servers=[self.server],
61                         auto_offset_reset=aor,
62                         enable_auto_commit=True,
63                         value_deserializer=lambda x:
64                         json.loads(x.decode('utf-8')))
65
66     def match(self, json):
67        if self.mregexp:
68          for key, value in  sorted(json.items()):
69            match = re.match(self.json[key], value)
70            if not match: return False
71          return True
72
73        else:
74          return (sorted(self.json.items()) == sorted(json.items()))
75
76     def receive_data(self):
77         printMsg('Reading data from kafka ...')
78
79         for message in self.consumer:
80             # Show read
81             if (self.showReadWhenMatchingEnabled if self.json else True):
82               printMsg('Data read: {}'.format(message))
83
84             # Break condition (in case that a expected json is blocking consumer):
85             if (self.json):
86               #if (sorted(self.json.items()) == sorted(message.value.items())):
87               if (self.match(message.value)):
88                 printMsg('Message retrieved: condition fulfilled !')
89                 return message
90               else:
91                 global GSequence
92                 GSequence += 1
93                 #saved='{}.read.{}.json'.format(os.path.basename(GJfile), GSequence)
94                 saved='{}.read.{}.json'.format(GJfile, GSequence)
95                 with open(saved, 'w') as outfile:
96                   json.dump(message.value, outfile)
97
98                 printMsg('Message retrieved: condition not fulfilled (saved {}) !'.format(saved))
99
100
101     def close (self):
102       self.consumer.close()
103
104
105 def handler_function(signum, frame):
106   rc=0
107   if (GJfile): rc=1
108   _exit('Timeout !', rc)
109
110
111 def parse_arguments():
112
113   parser = ArgumentParser(description='Kafka Consumer Simulator')
114   parser.add_argument('-n', '--node', help='Node name (spaces are removed)', required=True)
115   parser.add_argument('-t', '--topic', help='Kafka topic', required=True)
116   parser.add_argument('-o', '--auto-offset-reset', help='Auto offset reset (earliest, latest, etc.). By default: latest', required=False)
117   parser.add_argument('-f', '--file', help='Json file to consume (consumer will break execution with rc=0)', required=False)
118   parser.add_argument('-m', '--match-regexp', help='Match file as regexp (exact match by default)', required=False, action='store_true')
119   parser.add_argument('-w', '--timeout', help='Timeout (in seconds) for completion. Timeout will break execution with rc=1', required=False)
120   parser.add_argument('-d', '--debug', help='Debug kafka', required=False, action='store_true')
121   parser.add_argument('-r', '--show-read-when-matching-enabled', help='Show read messages. False by default when json file for matching is provided.', required=False, action='store_true')
122
123   arguments = parser.parse_args()
124
125   return arguments
126
127
128 #####################
129 #      M A I N      #
130 #####################
131
132 if __name__ == "__main__":
133
134   arguments = parse_arguments()
135
136   GNode = arguments.node.replace(" ", "")
137   topic = arguments.topic
138   auto_offset_reset = arguments.auto_offset_reset
139   GJfile = arguments.file
140   mregexp = arguments.match_regexp
141   timeout = arguments.timeout
142   debug = arguments.debug
143   show_read_when_matching_enabled = arguments.show_read_when_matching_enabled
144
145   GSequence = 0
146
147   printMsg("Starting")
148
149   if (debug):
150     now = datetime.datetime.now().strftime(timeFormat())
151     logfile='{}-Consumer-{}.log'.format(GNode, now)
152     printMsg("Logging kafka in: '{}'".format(logfile))
153     logging.basicConfig(filename=logfile, format='%(asctime)s - %(message)s', datefmt=timeFormat(), level=logging.DEBUG)
154
155   if (timeout):
156     printMsg("Established timeout for consumer: '{} seconds'".format(timeout))
157     # Signal to terminate after timeout:
158     signal.signal(signal.SIGALRM,handler_function)
159     signal.alarm(int(timeout))
160
161
162   if not GJfile:
163     if (mregexp): printMsg("To match json file as regexp, you must provide the file. See help (-h) !")
164     printMsg("Show messages read (matching is enabled): {}".format(str(show_read_when_matching_enabled)));
165
166   if not auto_offset_reset: auto_offset_reset='latest'
167
168   consumer = None
169   try:
170     consumer = MyKafkaConsumer(topic, mregexp, auto_offset_reset, show_read_when_matching_enabled)
171     consumer.receive_data()
172     consumer.close()
173     _exit()
174
175   except Exception as e:
176     consumer.close()
177     _exit(e, 1)