diff --git a/src/pybennu/pybennu/executables/groundtruth_monitor.py b/src/pybennu/pybennu/executables/groundtruth_monitor.py index efe4fb1..2e77262 100644 --- a/src/pybennu/pybennu/executables/groundtruth_monitor.py +++ b/src/pybennu/pybennu/executables/groundtruth_monitor.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + """ A monitor that ingests messages published by a provider into elastic This script will run on an ELK box as a daemon. It subscribes to a provider @@ -18,7 +20,6 @@ """ import os -import json import argparse import signal import sys @@ -26,8 +27,7 @@ from datetime import datetime from configparser import ConfigParser, NoOptionError -from elasticsearch import Elasticsearch -from elasticsearch import helpers +from elasticsearch import Elasticsearch, helpers from pybennu.distributed.subscriber import Subscriber from pybennu.providers.utils.daemon import Daemon @@ -71,7 +71,7 @@ def __init__(self, args): E.Endpoint_str_set(self.publish_endpoint, 'udp://239.0.0.1:40000') elastic_ip = '127.0.0.1' self._filter = '/' - + if 'power-groundtruth-monitor' in config.sections(): try: E.Endpoint_str_set(self.publish_endpoint, config.get('power-groundtruth-monitor', @@ -84,7 +84,7 @@ def __init__(self, args): except NoOptionError: print("WARNING: elasticsearch-ip not found in the configuration file." " Using default: {}".format(elastic_ip)) - try: + try: self._filter = config.get('power-groundtruth-monitor', 'filter').strip() except NoOptionError: print("WARNING: filter not found in the configuration file." @@ -102,7 +102,7 @@ def __init__(self, args): except NoOptionError: print("WARNING: elasticsearch-ip not found in the configuration file." " Using default: {}".format(elastic_ip)) - try: + try: self._filter = config.get('groundtruth-monitor', 'filter').strip() except NoOptionError: print("WARNING: filter not found in the configuration file." @@ -201,7 +201,7 @@ def __parse_filter(self): self.fields.append('') self.devices.append('') self.providers.append('') - + def __filter_message(self, message_dict): in_filter = False for i in range(len(self.providers)): @@ -216,7 +216,7 @@ def push_updates_to_elastic(self): messages = list(self.new_data) self.new_data = [] index = 'bennu-' + datetime.now().strftime('%Y.%m.%d') - actions = [{'_index': index, 'type': 'doc', '_source': message} for message in messages] + actions = [{'_index': index, 'type': 'doc', '_source': message} for message in messages] try: res = helpers.bulk(self.__es, actions, request_timeout=30) except: @@ -256,7 +256,7 @@ def main(): args = parser.parse_args() handle = GroundTruthMonitor(args) - handle.handler = handle._subscription_handler + handle.subscription_handler = handle._subscription_handler if args.operation == 'start': if args.daemonize: handle.start() diff --git a/src/pybennu/pybennu/executables/pybennu_probe.py b/src/pybennu/pybennu/executables/pybennu_probe.py index d7eee13..8fd68c9 100644 --- a/src/pybennu/pybennu/executables/pybennu_probe.py +++ b/src/pybennu/pybennu/executables/pybennu_probe.py @@ -57,7 +57,7 @@ def main(): endpoint = E.new_Endpoint() E.Endpoint_str_set(endpoint, args.endpoint) probe = client.Client(endpoint) - probe.handler = handler + probe.reply_handler = handler msg = "" if args.command == 'query':