From 21154232c76b2eb27b16251b7444dca66dfc9e36 Mon Sep 17 00:00:00 2001 From: antoniochavesgarcia Date: Fri, 9 Feb 2024 13:35:09 +0100 Subject: [PATCH] feat! (model training & inference): Added support to Telegraf-formated JSONs via String data type). --- .../tensorflow/README.md | 2 +- .../tensorflow/decoders.py | 15 ++++++++------- model_inference/pytorch/decoders.py | 15 ++++++++------- model_inference/tensorflow/README.md | 2 +- model_inference/tensorflow/decoders.py | 17 ++++++++++------- model_training/tensorflow/README.md | 2 +- model_training/tensorflow/decoders.py | 15 ++++++++------- 7 files changed, 37 insertions(+), 31 deletions(-) diff --git a/federated-module/federated_model_training/tensorflow/README.md b/federated-module/federated_model_training/tensorflow/README.md index e81f6f7..ef59bf9 100644 --- a/federated-module/federated_model_training/tensorflow/README.md +++ b/federated-module/federated_model_training/tensorflow/README.md @@ -7,7 +7,7 @@ A brief introduction of its files: - File `KafkaModelEngine.py` class contains all the necesary functions to read and load Tensorflow models from Apache Kafka in cloud. - File `FederatedKafkaMLAggregationSink.py` class contains all the necesary functions to send the trained model and metrics to Apache Kafka in cloud. - File `federated_mainTraining.py` and `federated_singleClassicTraining.py` contains the functions for each kind of federated training. -- File `decoders.py` decoders (RAW, Avro, JSON) used to decode data streams. +- File `decoders.py` decoders (RAW, Avro, JSON, TELEGRAF_STR_JSON) used to decode data streams. - File `config.py` to configure debug. - File `utils.py` common functions used by other files. diff --git a/federated-module/federated_model_training/tensorflow/decoders.py b/federated-module/federated_model_training/tensorflow/decoders.py index 43d1a1b..03b2777 100644 --- a/federated-module/federated_model_training/tensorflow/decoders.py +++ b/federated-module/federated_model_training/tensorflow/decoders.py @@ -70,12 +70,13 @@ def decode(self, x, y): return (res_x, res_y) class JsonDecoder: - """JSON class decoder implementation - ARGS: - configuration (dic): configuration properties - Attributes: - scheme(str): scheme of the JSON implementation + """JSON class decoder implementation""" - """ def decode(self, x): - return json.loads(x) \ No newline at end of file + return json.loads(x) + +class TelegrafStringJsonDecoder: + """TELEGRAF_STR_JSON class decoder implementation""" + + def decode(self, x): + return json.loads(json.loads(x)["fields"]["value"]) \ No newline at end of file diff --git a/model_inference/pytorch/decoders.py b/model_inference/pytorch/decoders.py index b835a8f..d83a08b 100644 --- a/model_inference/pytorch/decoders.py +++ b/model_inference/pytorch/decoders.py @@ -69,12 +69,13 @@ def decode(self, x, y): return res class JsonDecoder: - """JSON class decoder implementation - ARGS: - configuration (dic): configuration properties - Attributes: - scheme(str): scheme of the JSON implementation + """JSON class decoder implementation""" - """ def decode(self, x): - return json.loads(x) \ No newline at end of file + return json.loads(x) + +class TelegrafStringJsonDecoder: + """TELEGRAF_STR_JSON class decoder implementation""" + + def decode(self, x): + return json.loads(json.loads(x)["fields"]["value"]) \ No newline at end of file diff --git a/model_inference/tensorflow/README.md b/model_inference/tensorflow/README.md index d947f9d..24c318e 100644 --- a/model_inference/tensorflow/README.md +++ b/model_inference/tensorflow/README.md @@ -4,7 +4,7 @@ This module contains the inference task that will be executed when a TensorFlow A brief introduction of its files: - File `inference.py` main file of this module that will be executed when executed the inference Job. -- File `decoders.py` decoders (RAW, Avro) used to decode data streams. +- File `decoders.py` decoders (RAW, Avro, JSON, TELEGRAF_STR_JSON) used to decode data streams. - File `config.py` to configure debug. - File `utils.py` common functions used by other files. diff --git a/model_inference/tensorflow/decoders.py b/model_inference/tensorflow/decoders.py index ce5ab30..84e7fb7 100644 --- a/model_inference/tensorflow/decoders.py +++ b/model_inference/tensorflow/decoders.py @@ -15,6 +15,8 @@ def get_decoder(input_format, configuration): return AvroDecoder(configuration) elif input_format == 'JSON': return JsonDecoder() + elif input_format == 'TELEGRAF_STR_JSON': + return TelegrafStringJsonDecoder() else: raise ValueError(input_format) @@ -57,12 +59,13 @@ def decode(self, msg): return res class JsonDecoder: - """JSON class decoder implementation - ARGS: - configuration (dic): configuration properties - Attributes: - scheme(str): scheme of the JSON implementation + """JSON class decoder implementation""" - """ def decode(self, x): - return json.loads(x) \ No newline at end of file + return json.loads(x) + +class TelegrafStringJsonDecoder: + """TELEGRAF_STR_JSON class decoder implementation""" + + def decode(self, x): + return json.loads(json.loads(x)["fields"]["value"]) \ No newline at end of file diff --git a/model_training/tensorflow/README.md b/model_training/tensorflow/README.md index a8853af..1d0972d 100644 --- a/model_training/tensorflow/README.md +++ b/model_training/tensorflow/README.md @@ -4,7 +4,7 @@ This module contains the training task that will be executed when a TensorFlow t A brief introduction of its files: - File `training.py` main file of this module that will be executed when executed the training Job. -- File `decoders.py` decoders (RAW, Avro) used to decode data streams. +- File `decoders.py` decoders (RAW, Avro, JSON, TELEGRAF_STR_JSON) used to decode data streams. - File `config.py` to configure debug. - File `utils.py` common functions used by other files. diff --git a/model_training/tensorflow/decoders.py b/model_training/tensorflow/decoders.py index 43d1a1b..03b2777 100644 --- a/model_training/tensorflow/decoders.py +++ b/model_training/tensorflow/decoders.py @@ -70,12 +70,13 @@ def decode(self, x, y): return (res_x, res_y) class JsonDecoder: - """JSON class decoder implementation - ARGS: - configuration (dic): configuration properties - Attributes: - scheme(str): scheme of the JSON implementation + """JSON class decoder implementation""" - """ def decode(self, x): - return json.loads(x) \ No newline at end of file + return json.loads(x) + +class TelegrafStringJsonDecoder: + """TELEGRAF_STR_JSON class decoder implementation""" + + def decode(self, x): + return json.loads(json.loads(x)["fields"]["value"]) \ No newline at end of file