3 import os, sys, datetime, logging, json, time
5 from kafka import KafkaProducer
7 from argparse import ArgumentParser
10 def _exit(message = None, rc=0):
11 if (message): printMsg(message)
12 printMsg("Exiting [rc={}]".format(rc))
17 return '%b.%d.%Y-%H.%M.%S.%f'
20 def printMsg(message):
21 print ("[{}] [{}] {}".format(datetime.datetime.now().strftime(timeFormat()), GNode, message))
24 class MyKafkaProducer:
25 kafka_service = os.environ.get('KAFKA_SERVER')
26 kafka_port = os.environ.get('KAFKA_PORT')
28 if not kafka_service: kafka_service = "localhost"
29 if not kafka_port: kafka_port = "9092"
31 server = kafka_service + ':' + kafka_port
34 self.producer = KafkaProducer(bootstrap_servers=[self.server],
35 value_serializer=lambda x:
36 json.dumps(x).encode('utf-8'))
38 def send_data(self, topic, data):
39 json = self.__read_json_data(data)
40 self.producer.send(topic, value=json)
42 printMsg("Json produced on topic '{}': {}".format(topic, json))
44 def __read_json_data(self,jsonData):
45 with open(jsonData,'r') as schema_file:
46 schema_data = schema_file.read()
47 schema = json.loads(schema_data)
50 def set_producer_topic(self, kafka_topic):
51 self.kafka_topic = kafka_topic
58 producer = MyKafkaProducer()
60 def send_data(self, topic, data):
61 self.producer.send_data(topic, data)
67 def parse_arguments():
69 parser = ArgumentParser(description='Kafka Producer Simulator')
70 parser.add_argument('-n', '--node', help='Node name (spaces are removed)', required=True)
71 parser.add_argument('-t', '--topic', help='Kafka topic', required=True)
72 parser.add_argument('-w', '--delay', help='Delay (in milliseconds) before producing', required=False)
73 parser.add_argument('-f', '--file', help='Json file to produce', required=True)
74 parser.add_argument('-d', '--debug', help='Debug kafka', required=False, action='store_true')
76 arguments = parser.parse_args()
85 if __name__ == "__main__":
87 arguments = parse_arguments()
89 GNode = arguments.node.replace(" ", "")
90 topic = arguments.topic
91 delay = arguments.delay
92 jfile = arguments.file
93 debug = arguments.debug
98 now = datetime.datetime.now().strftime(timeFormat())
99 logfile='{}-Producer-{}.log'.format(GNode, now)
100 printMsg("Logging kafka in: '{}'".format(logfile))
101 logging.basicConfig(filename=logfile, format='%(asctime)s - %(message)s', datefmt=timeFormat(), level=logging.DEBUG)
103 if not os.path.exists(jfile):
104 _exit("Invalid provided file '{}'".format(jfile), 1)
110 _exit("Invalid provided delay '{}'. Must be an integer value !".format(str(delay)))
112 printMsg("Delay before producing: {} milliseconds ...".format(delay))
113 time.sleep(delay / 1000)
114 printMsg("Delay completed !".format(delay))
118 producer = MyProducer()
119 producer.send_data(topic, jfile)
123 except Exception as e: