diff --git a/plugins/fluentd_telemetry_plugin/conf/fluentd_telemetry_plugin.cfg b/plugins/fluentd_telemetry_plugin/conf/fluentd_telemetry_plugin.cfg index 75601d021..adf206684 100644 --- a/plugins/fluentd_telemetry_plugin/conf/fluentd_telemetry_plugin.cfg +++ b/plugins/fluentd_telemetry_plugin/conf/fluentd_telemetry_plugin.cfg @@ -15,6 +15,7 @@ bulk_streaming = True compressed_streaming = True stream_only_new_samples = True enabled = False +include_meta_data = True [logs-config] logs_file_name = /log/tfs.log diff --git a/plugins/fluentd_telemetry_plugin/src/schemas/set_conf.schema.json b/plugins/fluentd_telemetry_plugin/src/schemas/set_conf.schema.json index 92ab10d18..c4f9ebb5d 100644 --- a/plugins/fluentd_telemetry_plugin/src/schemas/set_conf.schema.json +++ b/plugins/fluentd_telemetry_plugin/src/schemas/set_conf.schema.json @@ -100,6 +100,9 @@ }, "enabled": { "type": "boolean" + }, + "include_meta_data": { + "type": "boolean" } } }, diff --git a/plugins/fluentd_telemetry_plugin/src/streamer.py b/plugins/fluentd_telemetry_plugin/src/streamer.py index e18500f1c..70ee4d2de 100644 --- a/plugins/fluentd_telemetry_plugin/src/streamer.py +++ b/plugins/fluentd_telemetry_plugin/src/streamer.py @@ -63,6 +63,10 @@ class UFMTelemetryConstants: },{ "name": '--enable_streaming', "help": "If true, the streaming will be started once the required configurations have been set" + },{ + "name": '--include_meta_data', + "help": "If true, the streaming message will include some meta data fields, " + "like the streaming timestamp and message type" },{ "name": '--stream_only_new_samples', "help": "If True, the data will be streamed only in case new samples were pulled from the telemetry" @@ -105,6 +109,7 @@ class UFMTelemetryStreamingConfigParser(ConfigParser): STREAMING_SECTION_BULK_STREAMING = "bulk_streaming" STREAMING_SECTION_STREAM_ONLY_NEW_SAMPLES = "stream_only_new_samples" STREAMING_SECTION_ENABLED = "enabled" + STREAMING_SECTION_INCLUDE_META_DATA = "include_meta_data" META_FIELDS_SECTION = "meta-fields" @@ -159,6 +164,12 @@ def get_enable_streaming_flag(self): self.STREAMING_SECTION_ENABLED, False) + def get_include_meta_data_flag(self): + return self.safe_get_bool(self.args.include_meta_data, + self.STREAMING_SECTION, + self.STREAMING_SECTION_INCLUDE_META_DATA, + True) + def get_fluentd_host(self): return self.get_config_value(self.args.fluentd_host, self.FLUENTD_ENDPOINT_SECTION, @@ -430,7 +441,7 @@ def _stream_data_to_fluentd(self, data_to_stream, fluentd_msg_tag=''): "timestamp": datetime.datetime.fromtimestamp(int(time.time())).strftime('%Y-%m-%d %H:%M:%S'), "type": "full", "values": data_to_stream - } + } if self.config_parser.get_include_meta_data_flag() else data_to_stream if self.compressed_streaming_flag: compressed = gzip.compress(json.dumps(fluentd_message).encode('utf-8'))