Improvements from anna fork
[anna.git] / example / diameter / launcher / deployments / aots / agents / KAFKA / Admin.py
1 #!/usr/bin/env python
2
3 import os, sys, datetime, logging
4
5 from kafka.admin import KafkaAdminClient
6 from argparse import ArgumentParser
7
8 from kafka.errors import (UnknownError, KafkaConnectionError, FailedPayloadsError,
9                           KafkaTimeoutError, KafkaUnavailableError,
10                           LeaderNotAvailableError, UnknownTopicOrPartitionError,
11                           NotLeaderForPartitionError, ReplicaNotAvailableError)
12
13
14 def _exit(message = None, rc=0):
15   if (message): printMsg(message)
16   printMsg("Exiting [rc={}]".format(rc))
17   sys.exit(rc)
18
19
20 def timeFormat():
21   return '%b.%d.%Y-%H.%M.%S.%f'
22
23
24 def printMsg(message):
25   print ("[{}] [{}] {}".format(datetime.datetime.now().strftime(timeFormat()), GNode, message))
26
27
28 class MyKafkaAdmin:
29     kafka_service = os.environ.get('KAFKA_SERVER')
30     kafka_port = os.environ.get('KAFKA_PORT')
31
32     if not kafka_service: kafka_service = "localhost"
33     if not kafka_port: kafka_port = "9092"
34
35     server = kafka_service + ':' + kafka_port
36
37     def __init__(self, topic, rtopic):
38
39       # Topic:
40       printMsg("Working topic: '{}'".format(topic))
41
42       # Action 'remove topic':
43       printMsg("Remove topic from message bus: {}".format(str(rtopic)))
44
45       if (rtopic):
46         admin = KafkaAdminClient(bootstrap_servers=self.server)
47         try:
48           admin.delete_topics([topic])
49         except UnknownTopicOrPartitionError as e:
50           printMsg("UnknownTopicOrPartitionError => missing messages for topic (removal considered successful)")
51
52
53 def parse_arguments():
54
55   parser = ArgumentParser(description='Kafka Manager')
56   parser.add_argument('-n', '--node', help='Node name (spaces are removed)', required=True)
57   parser.add_argument('-t', '--topic', help='Kafka topic', required=True)
58   parser.add_argument('-r', '--remove-topic', help='Remove topic from message bus', required=False, action='store_true')
59   parser.add_argument('-d', '--debug', help='Debug kafka', required=False, action='store_true')
60
61   arguments = parser.parse_args()
62
63   return arguments
64
65
66 #####################
67 #      M A I N      #
68 #####################
69
70 if __name__ == "__main__":
71
72   arguments = parse_arguments()
73
74   GNode = arguments.node.replace(" ", "")
75   topic = arguments.topic
76   rtopic = arguments.remove_topic
77   debug = arguments.debug
78
79   printMsg("Starting")
80
81   if (debug):
82     now = datetime.datetime.now().strftime(timeFormat())
83     logfile='{}-Admin-{}.log'.format(GNode, now)
84     printMsg("Logging kafka in: '{}'".format(logfile))
85     logging.basicConfig(filename=logfile, format='%(asctime)s - %(message)s', datefmt=timeFormat(), level=logging.DEBUG)
86
87   try:
88     admin = MyKafkaAdmin(topic, rtopic)
89     _exit()
90
91   except Exception as e:
92     _exit(e, 1)