3 import os, sys, datetime, logging
5 from kafka.admin import KafkaAdminClient
6 from argparse import ArgumentParser
8 from kafka.errors import (UnknownError, KafkaConnectionError, FailedPayloadsError,
9 KafkaTimeoutError, KafkaUnavailableError,
10 LeaderNotAvailableError, UnknownTopicOrPartitionError,
11 NotLeaderForPartitionError, ReplicaNotAvailableError)
14 def _exit(message = None, rc=0):
15 if (message): printMsg(message)
16 printMsg("Exiting [rc={}]".format(rc))
21 return '%b.%d.%Y-%H.%M.%S.%f'
24 def printMsg(message):
25 print ("[{}] [{}] {}".format(datetime.datetime.now().strftime(timeFormat()), GNode, message))
29 kafka_service = os.environ.get('KAFKA_SERVER')
30 kafka_port = os.environ.get('KAFKA_PORT')
32 if not kafka_service: kafka_service = "localhost"
33 if not kafka_port: kafka_port = "9092"
35 server = kafka_service + ':' + kafka_port
37 def __init__(self, topic, rtopic):
40 printMsg("Working topic: '{}'".format(topic))
42 # Action 'remove topic':
43 printMsg("Remove topic from message bus: {}".format(str(rtopic)))
46 admin = KafkaAdminClient(bootstrap_servers=self.server)
48 admin.delete_topics([topic])
49 except UnknownTopicOrPartitionError as e:
50 printMsg("UnknownTopicOrPartitionError => missing messages for topic (removal considered successful)")
53 def parse_arguments():
55 parser = ArgumentParser(description='Kafka Manager')
56 parser.add_argument('-n', '--node', help='Node name (spaces are removed)', required=True)
57 parser.add_argument('-t', '--topic', help='Kafka topic', required=True)
58 parser.add_argument('-r', '--remove-topic', help='Remove topic from message bus', required=False, action='store_true')
59 parser.add_argument('-d', '--debug', help='Debug kafka', required=False, action='store_true')
61 arguments = parser.parse_args()
70 if __name__ == "__main__":
72 arguments = parse_arguments()
74 GNode = arguments.node.replace(" ", "")
75 topic = arguments.topic
76 rtopic = arguments.remove_topic
77 debug = arguments.debug
82 now = datetime.datetime.now().strftime(timeFormat())
83 logfile='{}-Admin-{}.log'.format(GNode, now)
84 printMsg("Logging kafka in: '{}'".format(logfile))
85 logging.basicConfig(filename=logfile, format='%(asctime)s - %(message)s', datefmt=timeFormat(), level=logging.DEBUG)
88 admin = MyKafkaAdmin(topic, rtopic)
91 except Exception as e: