import datetime
import json
import logging
import os
import re
import signal
import sys
from argparse import ArgumentParser

from kafka import KafkaConsumer
def _exit(message=None, rc=0):
    """Print an optional message and the exit banner, then end the process.

    Args:
        message: Optional text printed (through printMsg) before the banner.
        rc: Return code shown in the banner and handed to ``sys.exit``.

    Raises:
        SystemExit: always, carrying ``rc``.
    """
    if message:
        printMsg(message)
    printMsg("Exiting [rc={}]".format(rc))
    # Actually terminate with the announced code; without this the function
    # only printed the banner and the caller kept running.
    sys.exit(rc)
def timeFormat():
    """Return the strftime pattern shared by console messages and log names.

    Returns:
        str: ``'%b.%d.%Y-%H.%M.%S.%f'`` (month abbreviation, day, year, then
        hour, minute, second and microseconds, dot-separated).
    """
    return '%b.%d.%Y-%H.%M.%S.%f'
def printMsg(message):
    """Print ``message`` prefixed with the current timestamp and node name.

    The timestamp is rendered with the pattern returned by timeFormat(); the
    node name is read from the module-level ``GNode``, which the script
    assigns from the ``--node`` argument before any message is printed.

    Args:
        message: Text (or any formattable object) to print.
    """
    timestamp = datetime.datetime.now().strftime(timeFormat())
    print("[{}] [{}] {}".format(timestamp, GNode, message))
class MyKafkaConsumer:
    """Kafka consumer that can stop once an expected JSON message is read.

    The broker address is taken from the KAFKA_SERVER / KAFKA_PORT environment
    variables, defaulting to ``localhost:9092``. The expected message, if any,
    is loaded from the file named by the module-level ``GJfile``.

    NOTE(review): the conditionals around the expected-json handling were
    restored from the messages the code prints and from how ``self.json`` is
    used in receive_data; confirm against the intended behaviour.
    """

    # Unset or empty environment values fall back to the local defaults.
    kafka_service = os.environ.get('KAFKA_SERVER') or "localhost"
    kafka_port = os.environ.get('KAFKA_PORT') or "9092"

    server = kafka_service + ':' + kafka_port

    def __init__(self, topic, mregexp, aor, showReadWhenMatchingEnabled):
        """Report the configuration, load the expected json and connect.

        Args:
            topic: Kafka topic to subscribe to.
            mregexp: When true, expected-json values are used as regular
                expressions; otherwise an exact comparison is made.
            aor: Value passed to KafkaConsumer as ``auto_offset_reset``.
            showReadWhenMatchingEnabled: When true, every message read is
                printed even if an expected json was provided.
        """
        printMsg("Auto offset reset: '{}'".format(aor))
        printMsg("Working topic: '{}'".format(topic))

        self.showReadWhenMatchingEnabled = showReadWhenMatchingEnabled

        # receive_data tests self.json, so it must exist even without a file.
        self.json = None
        if GJfile:
            with open(GJfile) as json_file:
                self.json = json.load(json_file)
            printMsg("Established break condition for expected json ('{}'): {}".format(GJfile, self.json))
        else:
            printMsg("No expected json ('any' selected)")

        # Interpreted as regexp indicator:
        self.mregexp = mregexp
        if self.mregexp:
            printMsg("Json file will be matched as regular expression")
        else:
            printMsg("Json file will be matched as exact comparison")

        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=[self.server],
            auto_offset_reset=aor,
            enable_auto_commit=True,
            # Message payloads are UTF-8 encoded JSON documents.
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        )

    def match(self, json):
        """Tell whether a received message satisfies the expected json.

        In regexp mode every key of the received message must have a pattern
        in the expected json, and its value must match that pattern from the
        start (``re.match``). Otherwise the message must equal the expected
        json exactly.

        Args:
            json: Deserialized message value. The parameter name shadows the
                ``json`` module inside this method; the module is not needed
                here.

        Returns:
            bool: True when the condition is fulfilled, False otherwise.
        """
        if not self.mregexp:
            # Dict equality is order-independent, like comparing sorted items.
            return self.json == json

        if not isinstance(json, dict):
            return False

        for key, value in json.items():
            pattern = self.json.get(key)
            # A missing pattern or a non-string operand cannot match; report
            # a mismatch instead of raising KeyError/TypeError mid-consume.
            if not isinstance(pattern, str) or not isinstance(value, str):
                return False
            if not re.match(pattern, value):
                return False
        return True

    def receive_data(self):
        """Consume messages until the expected json (if any) is received.

        Without an expected json every message is printed and reading goes on
        until the process is stopped externally (e.g. by the timeout signal).
        With an expected json, non-matching messages are saved next to the
        json file as ``<file>.read.<n>.json`` and the loop ends at the first
        match. The consumer is closed on every exit path.
        """
        printMsg('Reading data from kafka ...')

        # NOTE(review): the listing referenced a global 'GSequence' that is
        # not defined in the visible code; a local counter starting at 1 is
        # used for the saved file names instead - confirm the numbering.
        sequence = 0
        try:
            for message in self.consumer:
                if not self.json or self.showReadWhenMatchingEnabled:
                    printMsg('Data read: {}'.format(message))

                # Break condition (only applies when an expected json exists):
                if not self.json:
                    continue

                if self.match(message.value):
                    printMsg('Message retrieved: condition fulfilled !')
                    break

                sequence += 1
                saved = '{}.read.{}.json'.format(GJfile, sequence)
                with open(saved, 'w') as outfile:
                    json.dump(message.value, outfile)

                printMsg('Message retrieved: condition not fulfilled (saved {}) !'.format(saved))
        finally:
            # Also runs when the timeout handler raises SystemExit.
            self.consumer.close()
def handler_function(signum, frame):
    """Signal handler for the consumer timeout: report it and exit.

    Args:
        signum: Number of the delivered signal (unused).
        frame: Stack frame active when the signal arrived (unused).
    """
    # The --timeout help text promises rc=1 when the timeout fires.
    rc = 1
    _exit('Timeout !', rc)
def parse_arguments(argv=None):
    """Parse the command line of the Kafka consumer simulator.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read them from ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with the attributes ``node``, ``topic``,
        ``auto_offset_reset``, ``file``, ``match_regexp``, ``timeout``,
        ``debug`` and ``show_read_when_matching_enabled``. Options that were
        not given are ``None`` (or ``False`` for the flags).
    """
    parser = ArgumentParser(description='Kafka Consumer Simulator')
    parser.add_argument('-n', '--node', help='Node name (spaces are removed)', required=True)
    parser.add_argument('-t', '--topic', help='Kafka topic', required=True)
    parser.add_argument('-o', '--auto-offset-reset', help='Auto offset reset (earliest, latest, etc.). By default: latest', required=False)
    parser.add_argument('-f', '--file', help='Json file to consume (consumer will break execution with rc=0)', required=False)
    parser.add_argument('-m', '--match-regexp', help='Match file as regexp (exact match by default)', required=False, action='store_true')
    parser.add_argument('-w', '--timeout', help='Timeout (in seconds) for completion. Timeout will break execution with rc=1', required=False)
    parser.add_argument('-d', '--debug', help='Debug kafka', required=False, action='store_true')
    parser.add_argument('-r', '--show-read-when-matching-enabled', help='Show read messages. False by default when json file for matching is provided.', required=False, action='store_true')

    # The caller reads attributes off the result, so it must be returned.
    return parser.parse_args(argv)
#####################
#####################
132 if __name__ == "__main__":
134 arguments = parse_arguments()
136 GNode = arguments.node.replace(" ", "")
137 topic = arguments.topic
138 auto_offset_reset = arguments.auto_offset_reset
139 GJfile = arguments.file
140 mregexp = arguments.match_regexp
141 timeout = arguments.timeout
142 debug = arguments.debug
143 show_read_when_matching_enabled = arguments.show_read_when_matching_enabled
150 now = datetime.datetime.now().strftime(timeFormat())
151 logfile='{}-Consumer-{}.log'.format(GNode, now)
152 printMsg("Logging kafka in: '{}'".format(logfile))
153 logging.basicConfig(filename=logfile, format='%(asctime)s - %(message)s', datefmt=timeFormat(), level=logging.DEBUG)
156 printMsg("Established timeout for consumer: '{} seconds'".format(timeout))
157 # Signal to terminate after timeout:
158 signal.signal(signal.SIGALRM,handler_function)
159 signal.alarm(int(timeout))
163 if (mregexp): printMsg("To match json file as regexp, you must provide the file. See help (-h) !")
164 printMsg("Show messages read (matching is enabled): {}".format(str(show_read_when_matching_enabled)));
166 if not auto_offset_reset: auto_offset_reset='latest'
170 consumer = MyKafkaConsumer(topic, mregexp, auto_offset_reset, show_read_when_matching_enabled)
171 consumer.receive_data()
175 except Exception as e: