Deleting to swap chapter 13 and 14
This commit is contained in:
parent
80c742c923
commit
89b1e299ec
@ -1,25 +0,0 @@
|
||||
from kafka import KafkaConsumer
|
||||
import json, sys
|
||||
host_id = '10.52.90.101:9092'
|
||||
topic_id = 'NSP-EQUIPMENT'
|
||||
consumer = KafkaConsumer(bootstrap_servers=['10.52.90.101:9092'],
|
||||
auto_offset_reset='earliest',
|
||||
consumer_timeout_ms=1000,
|
||||
api_version=(0, 10, 1))
|
||||
consumer.subscribe([topic_id])
|
||||
print(consumer)
|
||||
try:
|
||||
for message in consumer:
|
||||
print(message)
|
||||
if message is None:
|
||||
continue
|
||||
else:
|
||||
msg = json.loads(message.value)
|
||||
print(json.dumps(msg, indent=4, sort_keys=True))
|
||||
|
||||
|
||||
except KeyboardInterrupt:
|
||||
sys.stderr.write('++++++ Aborted by user ++++++++\n')
|
||||
|
||||
finally:
|
||||
consumer.close()
|
||||
@ -1,15 +0,0 @@
|
||||
import requests
|
||||
|
||||
token = 'VEtOLWFkbWluNjg0NGE2YjQtNjIwMy00NTEwLWI2YzItMjc1MGU1MDFkZmNm'
|
||||
|
||||
NSP_KAFKA_API = url = "https://10.52.90.101:8544/nbi-notification/api/v1/notifications/subscriptions"
|
||||
|
||||
def get_subscription():
|
||||
headers = {'Authorization': 'Bearer {}'.format(token) }
|
||||
|
||||
response = requests.request("GET", url, data={}, headers=headers, verify=False)
|
||||
print(response.text)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
get_subscription()
|
||||
@ -1,85 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
import threading, logging, time
|
||||
import multiprocessing
|
||||
import sys
|
||||
from kafka import KafkaConsumer
|
||||
|
||||
"""Collect command-line options in a dictionary"""
|
||||
|
||||
def getopts(argv):
|
||||
opts = {} # Empty dictionary to store key-value pairs.
|
||||
while argv: # While there are arguments left to parse...
|
||||
if argv[0][0] == '-': # Found a "-name value" pair.
|
||||
opts[argv[0]] = argv[1] # Add key and value to the dictionary.
|
||||
argv = argv[1:] # Reduce the argument list by copying it starting from index 1.
|
||||
return opts
|
||||
|
||||
class Consumer(multiprocessing.Process):
|
||||
host_id = ''
|
||||
topic_id = ''
|
||||
def __init__(self, host, topic):
|
||||
self.host_id = host + ''
|
||||
self.topic_id = topic
|
||||
multiprocessing.Process.__init__(self)
|
||||
self.stop_event = multiprocessing.Event()
|
||||
|
||||
def stop(self):
|
||||
self.stop_event.set()
|
||||
|
||||
def run(self):
|
||||
consumer = KafkaConsumer(bootstrap_servers=self.host_id,
|
||||
auto_offset_reset='earliest',
|
||||
consumer_timeout_ms=1000,
|
||||
api_version=(0, 8, 0))
|
||||
consumer.subscribe([self.topic_id])
|
||||
|
||||
while not self.stop_event.is_set():
|
||||
for message in consumer:
|
||||
print(message)
|
||||
print("---------------------------------------")
|
||||
if self.stop_event.is_set():
|
||||
break
|
||||
|
||||
consumer.close()
|
||||
|
||||
|
||||
def main():
|
||||
from sys import argv
|
||||
myargs = getopts(argv)
|
||||
HOST_IP=''
|
||||
TOPIC=''
|
||||
|
||||
if '-host' in myargs: # Example usage.
|
||||
HOST_IP = myargs['-host']
|
||||
else:
|
||||
print ('please use as follows python kafka_test_consumer.py -host HostIP -topic TOPIC')
|
||||
|
||||
if '-topic' in myargs: # Example usage.
|
||||
TOPIC = myargs['-topic']
|
||||
else:
|
||||
print ('please use as follows python kafka_test_consumer.py -host HostIP -topic TOPIC')
|
||||
print(myargs)
|
||||
|
||||
tasks = [
|
||||
# Producer(),
|
||||
Consumer(HOST_IP,TOPIC)
|
||||
]
|
||||
|
||||
for t in tasks:
|
||||
t.start()
|
||||
|
||||
# time.sleep(10)
|
||||
|
||||
# for task in tasks:
|
||||
# task.stop()
|
||||
|
||||
for task in tasks:
|
||||
task.join()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(
|
||||
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
|
||||
level=logging.INFO
|
||||
)
|
||||
main()
|
||||
@ -1,31 +0,0 @@
|
||||
import requests
|
||||
|
||||
token = 'VEtOLWFkbWluMDE5NDdhZWUtMjRjZi00ZjEwLTg5MDItYzBhNTdjOGU2NGE2'
|
||||
|
||||
url = "https://10.52.90.101:8544/nbi-notification/api/v1/notifications/subscriptions"
|
||||
|
||||
def create_subscription(category):
|
||||
headers = {'Authorization': 'Bearer {}'.format(token) }
|
||||
subscription = {
|
||||
"categories": [
|
||||
{
|
||||
"name": "{}".format(category)
|
||||
}
|
||||
]
|
||||
}
|
||||
#print(subscription)
|
||||
response = requests.request("POST", url, json=subscription,
|
||||
headers=headers, verify=False)
|
||||
print(response.text)
|
||||
|
||||
subscriptionId = response.json()["response"]["data"]["subscriptionId"]
|
||||
topicId = response.json()["response"]["data"]["topicId"]
|
||||
timeOfSubscription = response.json()["response"]["data"]["timeOfSubscription"]
|
||||
expiresAt = response.json()["response"]["data"]["expiresAt"]
|
||||
sub_data = [subscriptionId,topicId,timeOfSubscription,expiresAt]
|
||||
print(sub_data)
|
||||
return sub_data
|
||||
|
||||
if __name__ == '__main__':
|
||||
#create_subscription("NSP-EQUIPMENT")
|
||||
create_subscription("NSP-PACKET-ALL")
|
||||
Loading…
x
Reference in New Issue
Block a user