Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Miryam-Schwartz committed Nov 17, 2024
1 parent b17c119 commit 0ccdea3
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 93 deletions.
35 changes: 3 additions & 32 deletions plugins/fluentd_telemetry_plugin/src/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
# pylint: disable=no-name-in-module,import-error
from utils.utils import Utils
from utils.args_parser import ArgsParser
from utils.logger import Logger, LOG_LEVELS
from utils.logger import Logger
from utils.singleton import Singleton

#pylint: disable=too-many-instance-attributes
Expand All @@ -54,7 +54,8 @@ def __init__(self, conf_parser):
self.telem_parser = TelemetryParser(self.config_parser, self.streaming_metrics_mgr,
self.last_streamed_data_sample_per_endpoint,
self.attributes_mngr)
self.init_streaming_attributes()
self.attributes_mngr.init_streaming_attributes(self.telem_parser,
self.ufm_telemetry_endpoints, self.config_parser)

@property
def ufm_telemetry_host(self):
Expand Down Expand Up @@ -162,36 +163,6 @@ def fluent_sender(self):
use_c=_use_c)
return self._fluent_sender

def init_streaming_attributes(self): # pylint: disable=too-many-locals
Logger.log_message('Updating The streaming attributes', LOG_LEVELS.DEBUG)
# load the saved attributes
self.attributes_mngr.get_saved_streaming_attributes()
telemetry_endpoints = self.ufm_telemetry_endpoints
processed_endpoints = {}
for endpoint in telemetry_endpoints: # pylint: disable=too-many-nested-blocks
_host = endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_HOST)
_port = endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_PORT)
_url = endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_URL)
_msg_tag = endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME)
# the ID of the endpoint is the full URL without filters like the shading,etc...
endpoint_id = f'{_host}:{_port}:{_url.split("?")[0]}'
is_processed = processed_endpoints.get(endpoint_id)
if not is_processed:
telemetry_data = self.telem_parser.get_metrics(_host, _port, _url, _msg_tag)
if telemetry_data:

# CSV format
rows = telemetry_data.split("\n")
if len(rows):
headers = rows[0].split(",")
for attribute in headers:
self.attributes_mngr.add_streaming_attribute(attribute)

processed_endpoints[endpoint_id] = True
# update the streaming attributes files
self.attributes_mngr.update_saved_streaming_attributes()
Logger.log_message('The streaming attributes were updated successfully')


def _stream_data_to_fluentd(self, data_to_stream, fluentd_msg_tag=''):
logging.info('Streaming to Fluentd IP: %s port: %s timeout: %s',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# pylint: disable=no-name-in-module,import-error
import logging

# pylint: disable=no-name-in-module,import-error
from utils.config_parser import ConfigParser

class UFMTelemetryStreamingConfigParser(ConfigParser):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
@author: Miryam Schwartz
@date: Nov 13, 2024
"""
# pylint: disable=no-name-in-module,import-error
import os

# pylint: disable=no-name-in-module,import-error
from utils.utils import Utils
from utils.logger import Logger, LOG_LEVELS

class TelemetryAttributesManager:
""""
Expand All @@ -27,7 +29,7 @@ def __init__(self):
self.streaming_attributes_file = "/config/tfs_streaming_attributes.json" # this path on the docker
self.streaming_attributes = {}

def get_saved_streaming_attributes(self):
def _get_saved_streaming_attributes(self):
attr = {}
if os.path.exists(self.streaming_attributes_file):
attr = Utils.read_json_from_file(self.streaming_attributes_file)
Expand All @@ -37,7 +39,7 @@ def get_saved_streaming_attributes(self):
def update_saved_streaming_attributes(self):
Utils.write_json_to_file(self.streaming_attributes_file, self.streaming_attributes)

def add_streaming_attribute(self, attribute):
def _add_streaming_attribute(self, attribute):
if self.streaming_attributes.get(attribute, None) is None:
# if the attribute is new and wasn't set before --> set default values for the new attribute
self.streaming_attributes[attribute] = {
Expand All @@ -47,3 +49,33 @@ def add_streaming_attribute(self, attribute):

def get_attr_obj(self, key):
return self.streaming_attributes.get(key)


def init_streaming_attributes(self, telemetry_parser, telemetry_endpoints, config_parser): # pylint: disable=too-many-locals
Logger.log_message('Updating The streaming attributes', LOG_LEVELS.DEBUG)
# load the saved attributes
self._get_saved_streaming_attributes()
processed_endpoints = {}
for endpoint in telemetry_endpoints: # pylint: disable=too-many-nested-blocks
_host = endpoint.get(config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_HOST)
_port = endpoint.get(config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_PORT)
_url = endpoint.get(config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_URL)
_msg_tag = endpoint.get(config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME)
# the ID of the endpoint is the full URL without filters like the shading,etc...
endpoint_id = f'{_host}:{_port}:{_url.split("?")[0]}'
is_processed = processed_endpoints.get(endpoint_id)
if not is_processed:
telemetry_data = telemetry_parser.get_metrics(_host, _port, _url, _msg_tag)
if telemetry_data:

# CSV format
rows = telemetry_data.split("\n")
if len(rows):
headers = rows[0].split(",")
for attribute in headers:
self._add_streaming_attribute(attribute)

processed_endpoints[endpoint_id] = True
# update the streaming attributes files
self.update_saved_streaming_attributes()
Logger.log_message('The streaming attributes were updated successfully')
57 changes: 0 additions & 57 deletions plugins/fluentd_telemetry_plugin/src/telemetry_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,62 +18,5 @@ class UFMTelemetryConstants:
"""UFMTelemetryConstants Class"""

PLUGIN_NAME = "UFM_Telemetry_Streaming"

args_list = [
{
"name": '--ufm_telemetry_host',
"help": "Host or IP of UFM Telemetry endpoint"
},{
"name": '--ufm_telemetry_port',
"help": "Port of UFM Telemetry endpoint"
},{
"name": '--ufm_telemetry_url',
"help": "URL of UFM Telemetry endpoint"
},{
"name": '--ufm_telemetry_xdr_mode',
"help": "Telemetry XDR mode flag, "
"i.e., if True, the enabled ports types in `xdr_ports_types` "
"will be collected from the telemetry and streamed to fluentd"
},{
"name": '--ufm_telemetry_xdr_ports_types',
"help": "Telemetry XDR ports types, "
"i.e., List of XDR ports types that should be collected and streamed, "
"separated by `;`. For example legacy;aggregated;plane"
},{
"name": '--streaming_interval',
"help": "Interval for telemetry streaming in seconds"
},{
"name": '--bulk_streaming',
"help": "Bulk streaming flag, i.e. if True all telemetry rows will be streamed in one message; "
"otherwise, each row will be streamed in a separated message"
},{
"name": '--compressed_streaming',
"help": "Compressed streaming flag, i.e. if True the streamed data will be sent gzipped json; "
"otherwise, will be sent plain text as json"
},{
"name": '--c_fluent_streamer',
"help": "C Fluent Streamer flag, i.e. if True the C fluent streamer will be used; "
"otherwise, the native python streamer will be used"
},{
"name": '--enable_streaming',
"help": "If true, the streaming will be started once the required configurations have been set"
},{
"name": '--stream_only_new_samples',
"help": "If True, the data will be streamed only in case new samples were pulled from the telemetry"
},{
"name": '--fluentd_host',
"help": "Host name or IP of fluentd endpoint"
},{
"name": '--fluentd_port',
"help": "Port of fluentd endpoint"
},{
"name": '--fluentd_timeout',
"help": "Fluentd timeout in seconds"
},{
"name": '--fluentd_message_tag_name',
"help": "Tag name of fluentd endpoint message"
}
]

CSV_LINE_SEPARATOR = "\n"
CSV_ROW_ATTRS_SEPARATOR = ","

0 comments on commit 0ccdea3

Please sign in to comment.