From 1fa971b1d81a9f41170e11125b4e42e07d3a609d Mon Sep 17 00:00:00 2001 From: Stefano Simonelli Date: Mon, 7 Oct 2019 09:31:51 +0000 Subject: [PATCH 1/5] FOGL-3168: c++ north plugin renamed / python north plugin removed Signed-off-by: Stefano Simonelli --- .../north/{ocs_V2 => ocs}/CMakeLists.txt | 2 +- C/plugins/north/{ocs_V2 => ocs}/plugin.cpp | 2 +- .../CMakeLists.txt | 2 +- .../{PI_Server_V2 => pi_server}/plugin.cpp | 4 +- CMakeLists.txt | 4 +- python/foglamp/plugins/north/ocs/README.rst | 6 - python/foglamp/plugins/north/ocs/__init__.py | 0 python/foglamp/plugins/north/ocs/ocs.py | 538 ---------- .../plugins/north/pi_server/README.rst | 6 - .../plugins/north/pi_server/__init__.py | 0 .../plugins/north/pi_server/pi_server.py | 976 ------------------ 11 files changed, 7 insertions(+), 1533 deletions(-) rename C/plugins/north/{ocs_V2 => ocs}/CMakeLists.txt (98%) rename C/plugins/north/{ocs_V2 => ocs}/plugin.cpp (99%) rename C/plugins/north/{PI_Server_V2 => pi_server}/CMakeLists.txt (97%) rename C/plugins/north/{PI_Server_V2 => pi_server}/plugin.cpp (99%) delete mode 100644 python/foglamp/plugins/north/ocs/README.rst delete mode 100644 python/foglamp/plugins/north/ocs/__init__.py delete mode 100644 python/foglamp/plugins/north/ocs/ocs.py delete mode 100644 python/foglamp/plugins/north/pi_server/README.rst delete mode 100644 python/foglamp/plugins/north/pi_server/__init__.py delete mode 100644 python/foglamp/plugins/north/pi_server/pi_server.py diff --git a/C/plugins/north/ocs_V2/CMakeLists.txt b/C/plugins/north/ocs/CMakeLists.txt similarity index 98% rename from C/plugins/north/ocs_V2/CMakeLists.txt rename to C/plugins/north/ocs/CMakeLists.txt index 27897bb12b..ccf17b57d3 100644 --- a/C/plugins/north/ocs_V2/CMakeLists.txt +++ b/C/plugins/north/ocs/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 2.6.0) -project(ocs_V2) +project(ocs) set(CMAKE_CXX_FLAGS "-std=c++11 -O3") # Add here all needed FogLAMP libraries as list diff --git a/C/plugins/north/ocs_V2/plugin.cpp b/C/plugins/north/ocs/plugin.cpp similarity index 99% rename from C/plugins/north/ocs_V2/plugin.cpp rename to C/plugins/north/ocs/plugin.cpp index 10d5d86a32..2fc5897a70 100644 --- a/C/plugins/north/ocs_V2/plugin.cpp +++ b/C/plugins/north/ocs/plugin.cpp @@ -25,7 +25,7 @@ using namespace std; using namespace rapidjson; -#define PLUGIN_NAME "ocs_V2" +#define PLUGIN_NAME "ocs" #define TYPE_ID_KEY "type-id" #define TYPE_ID_DEFAULT 1 diff --git a/C/plugins/north/PI_Server_V2/CMakeLists.txt b/C/plugins/north/pi_server/CMakeLists.txt similarity index 97% rename from C/plugins/north/PI_Server_V2/CMakeLists.txt rename to C/plugins/north/pi_server/CMakeLists.txt index 0274a044d3..90921d7d76 100644 --- a/C/plugins/north/PI_Server_V2/CMakeLists.txt +++ b/C/plugins/north/pi_server/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 2.6.0) -project(PI_Server_V2) +project(pi_server) set(CMAKE_CXX_FLAGS "-std=c++11 -O3") # Add here all needed FogLAMP libraries as list diff --git a/C/plugins/north/PI_Server_V2/plugin.cpp b/C/plugins/north/pi_server/plugin.cpp similarity index 99% rename from C/plugins/north/PI_Server_V2/plugin.cpp rename to C/plugins/north/pi_server/plugin.cpp index f8709fefa8..dd91ad0ab3 100644 --- a/C/plugins/north/PI_Server_V2/plugin.cpp +++ b/C/plugins/north/pi_server/plugin.cpp @@ -34,7 +34,7 @@ using namespace std; using namespace rapidjson; using namespace SimpleWeb; -#define PLUGIN_NAME "PI_Server_V2" +#define PLUGIN_NAME "pi_server" #define TYPE_ID_KEY "type-id" #define SENT_TYPES_KEY "sentDataTypes" #define DATA_KEY "dataTypes" @@ -258,7 +258,7 @@ extern "C" { */ static PLUGIN_INFORMATION info = { PLUGIN_NAME, // Name - "1.0.0", // Version + "1.7.0", // Version SP_PERSIST_DATA, // Flags PLUGIN_TYPE_NORTH, // Type "1.0.0", // Interface version diff --git a/CMakeLists.txt b/CMakeLists.txt index a7f423ea64..35d03ff59d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,6 +22,6 @@ add_subdirectory(C/services/filter-plugin-interfaces/python) add_subdirectory(C/services/filter-plugin-interfaces/python/filter_ingest_pymodule) add_subdirectory(C/tasks/north) add_subdirectory(C/plugins/utils) -add_subdirectory(C/plugins/north/PI_Server_V2) -add_subdirectory(C/plugins/north/ocs_V2) +add_subdirectory(C/plugins/north/pi_server) +add_subdirectory(C/plugins/north/ocs) diff --git a/python/foglamp/plugins/north/ocs/README.rst b/python/foglamp/plugins/north/ocs/README.rst deleted file mode 100644 index ff6688b540..0000000000 --- a/python/foglamp/plugins/north/ocs/README.rst +++ /dev/null @@ -1,6 +0,0 @@ -************************ -FogLAMP North OCS Plugin -************************ - -This plugin implements the protocol and format conversion required to talk -to OSIsoft OCS (OSIsoft Cloud Services) using OMF (OSIsoft Message Format). diff --git a/python/foglamp/plugins/north/ocs/__init__.py b/python/foglamp/plugins/north/ocs/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/python/foglamp/plugins/north/ocs/ocs.py b/python/foglamp/plugins/north/ocs/ocs.py deleted file mode 100644 index eb065c7fd4..0000000000 --- a/python/foglamp/plugins/north/ocs/ocs.py +++ /dev/null @@ -1,538 +0,0 @@ -# -*- coding: utf-8 -*- - -# FOGLAMP_BEGIN -# See: http://foglamp.readthedocs.io/ -# FOGLAMP_END - -""" The OCS North is a plugin output formatter for the FogLAMP appliance. - It is loaded by the send process (see The FogLAMP Sending Process) and runs in the context of the send process, - to send the reading data to OSIsoft OCS (OSIsoft Cloud Services) using the OSIsoft OMF format. - PICROMF = PI Connector Relay OMF -""" - -from datetime import datetime -import sys -import copy -import ast -import resource -import datetime -import time -import json -import logging -# noinspection PyPackageRequirements -import foglamp.plugins.north.common.common as plugin_common -import foglamp.plugins.north.common.exceptions as plugin_exceptions -from foglamp.common import logger - -import foglamp.plugins.north.pi_server.pi_server as pi_server - -# Module information -__author__ = "Stefano Simonelli" -__copyright__ = "Copyright (c) 2018 OSIsoft, LLC" -__license__ = "Apache 2.0" -__version__ = "${VERSION}" - - -# LOG configuration -_LOG_LEVEL_DEBUG = 10 -_LOG_LEVEL_INFO = 20 -_LOG_LEVEL_WARNING = 30 - -_LOGGER_LEVEL = _LOG_LEVEL_WARNING -_LOGGER_DESTINATION = logger.SYSLOG -_logger = None - -# Defines what and the level of details for logging -_log_debug_level = 0 -_log_performance = False -_stream_id = None - -_MODULE_NAME = "ocs_north" - -# Messages used for Information, Warning and Error notice -MESSAGES_LIST = { - - # Information messages - "i000000": "information.", - - # Warning / Error messages - "e000000": "general error.", - - "e000001": "the producerToken must be defined, use the FogLAMP API to set a proper value.", - "e000002": "the producerToken cannot be an empty string, use the FogLAMP API to set a proper value.", - - "e000010": "the type-id must be defined, use the FogLAMP API to set a proper value.", - "e000011": "the type-id cannot be an empty string, use the FogLAMP API to set a proper value.", - -} - - -# Configurations retrieved from the Configuration Manager -_config_omf_types = {} -_config = {} - -# Forces the recreation of PIServer objects when the first error occurs -_recreate_omf_objects = True - -# Messages used for Information, Warning and Error notice -_MESSAGES_LIST = { - # Information messages - "i000000": "information.", - # Warning / Error messages - "e000000": "general error.", -} - -# Configuration related to the OMF North -_CONFIG_CATEGORY_DESCRIPTION = 'Configuration of OCS North plugin' - - -# The parameters used for the interaction with OCS are : -# producerToken - It allows to ingest data into OCS using OMF. -# tenant_id / client_id / client_id - They are used for the authentication and interaction with the OCS API, -# they are associated to the specific OCS account. -# namespace - Specifies the OCS namespace where the information are stored, -# it is used for the interaction with the OCS API. -# -_CONFIG_DEFAULT_OMF = { - 'plugin': { - 'description': 'OCS (OSIsoft Cloud Services) North Plugin', - 'type': 'string', - 'default': 'ocs', - 'readonly': 'true' - }, - "URL": { - "description": "The URL of OCS (OSIsoft Cloud Services), TENANT_ID_PLACEHOLDER and NAMESPACE_ID_PLACEHOLDER, if present, will be replaced with the values of tenant_id and namespace parameters ", - "type": "string", - "default": "https://dat-a.osisoft.com/api/tenants/TENANT_ID_PLACEHOLDER/namespaces/NAMESPACE_ID_PLACEHOLDER/omf", - "order": "1", - "displayName": "URL" - }, - "producerToken": { - "description": "The producer token used to authenticate as a valid publisher and " - "required to ingest data into OCS using OMF.", - "type": "string", - "default": "ocs_north_0001", - "order": "2", - "displayName": "Producer Token" - }, - "source": { - "description": "Source of data to be sent on the stream.", - "type": "enumeration", - "default": "readings", - "options": ["readings"], - "order": "3", - "displayName": "Data Source" - }, - "compression": { - "description": "Compress message body", - "type": "boolean", - "default": "false", - "displayName": "Compression" - }, - "StaticData": { - "description": "Static data to include in each sensor reading sent to OMF.", - "type": "JSON", - "default": json.dumps( - { - "Location": "Palo Alto", - "Company": "Dianomic" - } - ), - "order": "4", - "displayName": "Static Data" - }, - "applyFilter": { - "description": "Whether to apply filter before processing the data", - "type": "boolean", - "default": "False", - "order": "5", - "displayName": "Apply Filter" - }, - "filterRule": { - "description": "JQ formatted filter to apply (applicable if applyFilter is True)", - "type": "string", - "default": ".[]", - "order": "6", - "displayName": "Filter Rule" - }, - "OMFRetrySleepTime": { - "description": "Seconds between each retry for the communication with the OMF PI Connector Relay", - "type": "integer", - "default": "1", - "order": "9", - "displayName": "Sleep Time Retry" - }, - "OMFMaxRetry": { - "description": "Max number of retries for the communication with the OMF PI Connector Relay", - "type": "integer", - "default": "5", - "order": "10", - "displayName": "Maximum Retry" - }, - "OMFHttpTimeout": { - "description": "Timeout in seconds for the HTTP operations with the OMF PI Connector Relay", - "type": "integer", - "default": "30", - "order": "13", - "displayName": "HTTP Timeout" - }, - "formatInteger": { - "description": "OMF format property to apply to the type Integer", - "type": "string", - "default": "int64", - "order": "14", - "displayName": "Integer Format" - }, - "formatNumber": { - "description": "OMF format property to apply to the type Number", - "type": "string", - "default": "float64", - "order": "15", - "displayName": "Number Format" - }, - "namespace": { - "description": "Specifies the OCS namespace where the information are stored and " - "it is used for the interaction with the OCS API.", - "type": "string", - "default": "ocs_namespace_0001", - "order": "16", - "displayName": "Namespace" - }, - "tenant_id": { - "description": "Tenant id associated to the specific OCS account.", - "type": "string", - "default": "ocs_tenant_id", - "order": "17", - "displayName": "Tenant ID" - }, - "client_id": { - "description": "Client id associated to the specific OCS account, " - "it is used to authenticate the source for using the OCS API.", - "type": "string", - "default": "ocs_client_id", - "order": "18", - "displayName": "Client ID" - }, - "client_secret": { - "description": "Client secret associated to the specific OCS account, " - "it is used to authenticate the source for using the OCS API.", - "type": "string", - "default": "ocs_client_secret", - "order": "19", - "displayName": "Client Secret" - }, - "notBlockingErrors": { - "description": "These errors are considered not blocking in the communication with the PI Server," - " the sending operation will proceed with the next block of data if one of these is encountered", - "type": "JSON", - "default": json.dumps( - [ - {'id': 400, 'message': 'Invalid value type for the property'}, - {'id': 400, 'message': 'Redefinition of the type with the same ID is not allowed'} - ] - ), - "readonly": "true" - }, -} - -# Configuration related to the OMF Types -_CONFIG_CATEGORY_OMF_TYPES_NAME = 'OCS_TYPES' -_CONFIG_CATEGORY_OMF_TYPES_DESCRIPTION = 'Configuration of OCS types' - -_CONFIG_DEFAULT_OMF_TYPES = pi_server.CONFIG_DEFAULT_OMF_TYPES - -_OMF_TEMPLATE_TYPE = pi_server.OMF_TEMPLATE_TYPE - - -def _performance_log(_function): - """ Logs information for performance measurement """ - - def wrapper(*arg): - """ wrapper """ - - # Avoids any exceptions related to the performance measurement - try: - - start = datetime.datetime.now() - - # Code execution - result = _function(*arg) - - if _log_performance: - - usage = resource.getrusage(resource.RUSAGE_SELF) - memory_process = (usage[2])/1000 - delta = datetime.datetime.now() - start - delta_milliseconds = int(delta.total_seconds() * 1000) - - _logger.info("PERFORMANCE - {0} - milliseconds |{1:>8,}| - memory MB |{2:>8,}|".format( - _function.__name__, - delta_milliseconds, - memory_process)) - - return result - - except Exception as ex: - print("ERROR - {func} - error details |{error}|".format( - func="_performance_log", - error=ex), file=sys.stderr) - raise - - return wrapper - - -def plugin_info(): - """ Returns information about the plugin. - - Args: - Returns: - dict: plugin information - Raises: - """ - - return { - 'name': "OCS North", - 'version': "1.0.0", - 'type': "north", - 'interface': "1.0", - 'config': _CONFIG_DEFAULT_OMF - } - -def _validate_configuration(data): - """ Validates the configuration retrieved from the Configuration Manager - Args: - data: configuration retrieved from the Configuration Manager - Returns: - Raises: - ValueError - """ - - _message = "" - - if 'producerToken' not in data: - _message = MESSAGES_LIST["e000001"] - - else: - if data['producerToken']['value'] == "": - - _message = MESSAGES_LIST["e000002"] - - if _message != "": - _logger.error(_message) - raise ValueError(_message) - - -def _validate_configuration_omf_type(data): - """ Validates the configuration retrieved from the Configuration Manager related to the OMF types - Args: - data: configuration retrieved from the Configuration Manager - Returns: - Raises: - ValueError - """ - - _message = "" - - if 'type-id' not in data: - _message = MESSAGES_LIST["e000010"] - else: - if data['type-id']['value'] == "": - - _message = MESSAGES_LIST["e000011"] - - if _message != "": - _logger.error(_message) - raise ValueError(_message) - -def plugin_init(data): - """ Initializes the OMF plugin for the sending of blocks of readings to the PI Connector. - Args: - Returns: - Raises: - PluginInitializeFailed - """ - - global _config - global _config_omf_types - global _logger - global _recreate_omf_objects - global _log_debug_level, _log_performance, _stream_id - - _log_debug_level = data['debug_level'] - _log_performance = data['log_performance'] - _stream_id = data['stream_id'] - - try: - # note : _module_name is used as __name__ refers to the Sending Process - logger_name = _MODULE_NAME + "_" + str(_stream_id) - - _logger = \ - logger.setup(logger_name, destination=_LOGGER_DESTINATION) if _log_debug_level == 0 else\ - logger.setup( - logger_name, - destination=_LOGGER_DESTINATION, - level=logging.INFO if _log_debug_level == 1 else logging.DEBUG) - - except Exception as ex: - _logger.error("{0} - ERROR - {1}".format( - time.strftime("%Y-%m-%d %H:%M:%S:"), - plugin_common.MESSAGES_LIST["e000012"].format(str(ex)))) - raise ex - _logger.debug("{0} - ".format("plugin_info")) - - _validate_configuration(data) - - # Retrieves the configurations and apply the related conversions - _config['_CONFIG_CATEGORY_NAME'] = data['_CONFIG_CATEGORY_NAME'] - - _config['namespace'] = data['namespace']['value'] - _config['tenant_id'] = data['tenant_id']['value'] - _config['client_id'] = data['client_id']['value'] - _config['client_secret'] = data['client_secret']['value'] - _config['URL'] = data['URL']['value'] - - # Replaces placeholders if the URL doesn't already contain the final address - _config['URL'] = _config['URL'].replace("TENANT_ID_PLACEHOLDER", _config['tenant_id']) - _config['URL'] = _config['URL'].replace("NAMESPACE_ID_PLACEHOLDER", _config['namespace']) - - _config['producerToken'] = data['producerToken']['value'] - _config['OMFMaxRetry'] = int(data['OMFMaxRetry']['value']) - _config['OMFRetrySleepTime'] = int(data['OMFRetrySleepTime']['value']) - _config['OMFHttpTimeout'] = int(data['OMFHttpTimeout']['value']) - _config['StaticData'] = ast.literal_eval(data['StaticData']['value']) - - _config['formatNumber'] = data['formatNumber']['value'] - _config['formatInteger'] = data['formatInteger']['value'] - - _config['notBlockingErrors'] = ast.literal_eval(data['notBlockingErrors']['value']) - - _config['compression'] = data['compression']['value'] - - # TODO: compare instance fetching via inspect vs as param passing - # import inspect - # _config['sending_process_instance'] = inspect.currentframe().f_back.f_locals['self'] - _config['sending_process_instance'] = data['sending_process_instance'] - - # _config_omf_types = json.loads(data['omf_types']['value']) - # noinspection PyProtectedMember - _config_omf_types = _config['sending_process_instance']._fetch_configuration( - cat_name=_CONFIG_CATEGORY_OMF_TYPES_NAME, - cat_desc=_CONFIG_CATEGORY_OMF_TYPES_DESCRIPTION, - cat_config=_CONFIG_DEFAULT_OMF_TYPES, - cat_keep_original=True) - - _validate_configuration_omf_type(_config_omf_types) - - # Converts the value field from str to a dict - for item in _config_omf_types: - - if _config_omf_types[item]['type'] == 'JSON': - - # The conversion from a dict to str changes the case and it should be fixed before the conversion - value = _config_omf_types[item]['value'].replace("true", "True") - new_value = ast.literal_eval(value) - _config_omf_types[item]['value'] = new_value - - _logger.debug("{0} - URL {1}".format("plugin_init", _config['URL'])) - - try: - _recreate_omf_objects = True - - except Exception as ex: - _logger.error(plugin_common.MESSAGES_LIST["e000011"].format(ex)) - raise plugin_exceptions.PluginInitializeFailed(ex) - - return _config - - -# noinspection PyUnusedLocal -async def plugin_send(data, raw_data, stream_id): - """ Translates and sends to the destination system the data provided by the Sending Process - Args: - data: plugin_handle from sending_process - raw_data : Data to send as retrieved from the storage layer - stream_id - Returns: - data_to_send : True, data successfully sent to the destination system - new_position : Last row_id already sent - num_sent : Number of rows sent, used for the update of the statistics - Raises: - """ - - global _recreate_omf_objects - - is_data_sent = False - config_category_name = data['_CONFIG_CATEGORY_NAME'] - type_id = _config_omf_types['type-id']['value'] - - # Sets globals for the OMF module - pi_server._logger = _logger - pi_server._log_debug_level = _log_debug_level - pi_server._log_performance = _log_performance - - ocs_north = OCSNorthPlugin(data['sending_process_instance'], data, _config_omf_types, _logger) - - try: - # Alloc the in memory buffer - buffer_size = len(raw_data) - data_to_send = [None for x in range(buffer_size)] - - is_data_available, new_position, num_sent = ocs_north.transform_in_memory_data(data_to_send, raw_data) - - if is_data_available: - - await ocs_north.create_omf_objects(raw_data, config_category_name, type_id) - - try: - await ocs_north.send_in_memory_data_to_picromf("Data", data_to_send) - - except Exception as ex: - # Forces the recreation of PIServer's objects on the first error occurred - if _recreate_omf_objects: - await ocs_north.deleted_omf_types_already_created(config_category_name, type_id) - _recreate_omf_objects = False - _logger.debug("{0} - Forces objects recreation ".format("plugin_send")) - raise ex - else: - is_data_sent = True - - except Exception as ex: - _logger.exception(plugin_common.MESSAGES_LIST["e000031"].format(ex)) - raise - - return is_data_sent, new_position, num_sent - - -# noinspection PyUnusedLocal -def plugin_shutdown(data): - """ Terminates the plugin - Returns: - Raises: - """ - try: - _logger.debug("{0} - plugin_shutdown".format(_MODULE_NAME)) - - except Exception as ex: - _logger.error(plugin_common.MESSAGES_LIST["e000013"].format(ex)) - raise - - -def plugin_reconfigure(): - """ Reconfigures the plugin, it should be called when the configuration of the plugin is changed during the - operation of the South service. - The new configuration category should be passed. - - Args: - Returns: - Raises: - """ - - pass - - -class OCSNorthPlugin(pi_server.PIServerNorthPlugin): - """ North OCS North Plugin """ - - def __init__(self, sending_process_instance, config, config_omf_types, _logger): - - super().__init__(sending_process_instance, config, config_omf_types, _logger) diff --git a/python/foglamp/plugins/north/pi_server/README.rst b/python/foglamp/plugins/north/pi_server/README.rst deleted file mode 100644 index bdc298c899..0000000000 --- a/python/foglamp/plugins/north/pi_server/README.rst +++ /dev/null @@ -1,6 +0,0 @@ -************************ -FogLAMP North OMF Plugin -************************ - -This plugin implements the protocol and format conversion required to talk -to the OSIsoft PI Connector Relay using the OSIsoft Message Format (OMF). diff --git a/python/foglamp/plugins/north/pi_server/__init__.py b/python/foglamp/plugins/north/pi_server/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/python/foglamp/plugins/north/pi_server/pi_server.py b/python/foglamp/plugins/north/pi_server/pi_server.py deleted file mode 100644 index 8b0cb955d7..0000000000 --- a/python/foglamp/plugins/north/pi_server/pi_server.py +++ /dev/null @@ -1,976 +0,0 @@ -# -*- coding: utf-8 -*- - -# FOGLAMP_BEGIN -# See: http://foglamp.readthedocs.io/ -# FOGLAMP_END - -""" The OMF North is a plugin output formatter for the FogLAMP appliance. -It is loaded by the send process (see The FogLAMP Sending Process) and runs in the context of the send process, -to send the reading data to a PI Server (or Connector) using the OSIsoft OMF format. -PICROMF = PI Connector Relay OMF""" - -import aiohttp -import asyncio -import gzip -import sys -import copy -import ast -import resource -import datetime -import time -import json -import logging -import foglamp.plugins.north.common.common as plugin_common -import foglamp.plugins.north.common.exceptions as plugin_exceptions -from foglamp.common import logger -from foglamp.common.storage_client import payload_builder - -# Module information -__author__ = "Stefano Simonelli" -__copyright__ = "Copyright (c) 2018 OSIsoft, LLC" -__license__ = "Apache 2.0" -__version__ = "${VERSION}" - - -# LOG configuration -_LOG_LEVEL_DEBUG = 10 -_LOG_LEVEL_INFO = 20 -_LOG_LEVEL_WARNING = 30 - -_LOGGER_LEVEL = _LOG_LEVEL_WARNING -_LOGGER_DESTINATION = logger.SYSLOG -_logger = None - -_MODULE_NAME = "pi_server_north" - -# Messages used for Information, Warning and Error notice -MESSAGES_LIST = { - - # Information messages - "i000000": "information.", - - # Warning / Error messages - "e000000": "general error.", - - "e000001": "the producerToken must be defined, use the FogLAMP API to set a proper value.", - "e000002": "the producerToken cannot be an empty string, use the FogLAMP API to set a proper value.", - - "e000010": "the type-id must be defined, use the FogLAMP API to set a proper value.", - "e000011": "the type-id cannot be an empty string, use the FogLAMP API to set a proper value.", - -} - - -# Defines what and the level of details for logging -_log_debug_level = 0 -_log_performance = False -_stream_id = None - -# Configurations retrieved from the Configuration Manager -_config_omf_types = {} -_config = {} - -# Forces the recreation of PIServer objects when the first error occurs -_recreate_omf_objects = True - -# Messages used for Information, Warning and Error notice -_MESSAGES_LIST = { - # Information messages - "i000000": "information.", - # Warning / Error messages - "e000000": "general error.", -} -# Configuration related to the OMF North -_CONFIG_CATEGORY_DESCRIPTION = 'PI Server North Plugin' -_CONFIG_DEFAULT_OMF = { - 'plugin': { - 'description': 'PI Server North Plugin', - 'type': 'string', - 'default': 'pi_server', - 'readonly': 'true' - }, - "URL": { - "description": "URL of PI Connector to send data to", - "type": "string", - "default": "https://pi-server:5460/ingress/messages", - "order": "1", - "displayName": "URL" - }, - "producerToken": { - "description": "Producer token for this FogLAMP stream", - "type": "string", - "default": "pi_server_north_0001", - "order": "2", - "displayName": "Producer Token" - }, - "source": { - "description": "Source of data to be sent on the stream. May be either readings or statistics.", - "type": "enumeration", - "default": "readings", - "options": ["readings", "statistics"], - "order": "3", - "displayName": "Data Source" - }, - "compression": { - "description": "Compress message body", - "type": "boolean", - "default": "true", - "displayName": "Compression" - }, - "StaticData": { - "description": "Static data to include in each sensor reading sent via OMF", - "type": "JSON", - "default": json.dumps( - { - "Location": "Palo Alto", - "Company": "Dianomic" - } - ), - "order": "4", - "displayName": "Static Data" - }, - "applyFilter": { - "description": "Should filter be applied before processing the data?", - "type": "boolean", - "default": "False", - "order": "5", - "displayName": "Apply Filter" - }, - "filterRule": { - "description": "JQ formatted filter to apply (only applicable if applyFilter is True)", - "type": "string", - "default": ".[]", - "order": "6", - "displayName": "Filter Rule" - }, - "OMFRetrySleepTime": { - "description": "Seconds between each retry for communication with the OMF PI Connector Relay. " - "This time is doubled at each attempt.", - "type": "integer", - "default": "1", - "order": "9", - "displayName": "Sleep Time Retry" - }, - "OMFMaxRetry": { - "description": "Max number of retries for communication with the OMF PI Connector Relay", - "type": "integer", - "default": "3", - "order": "10", - "displayName": "Maximum Retry" - }, - "OMFHttpTimeout": { - "description": "Timeout in seconds for HTTP operations with the OMF PI Connector Relay", - "type": "integer", - "default": "10", - "order": "13", - "displayName": "HTTP Timeout" - }, - "formatInteger": { - "description": "OMF format property to apply to the type Integer", - "type": "string", - "default": "int64", - "order": "14", - "displayName": "Integer Format" - }, - "formatNumber": { - "description": "OMF format property to apply to the type Number", - "type": "string", - "default": "float64", - "order": "15", - "displayName": "Number Format" - }, - "notBlockingErrors": { - "description": "These errors are considered not blocking in the communication with the PI Server," - " the sending operation will proceed with the next block of data if one of these is encountered", - "type": "JSON", - "default": json.dumps( - [ - {'id': 400, 'message': 'Invalid value type for the property'}, - {'id': 400, 'message': 'Redefinition of the type with the same ID is not allowed'} - ] - ), - "readonly": "true" - }, - -} - -# Configuration related to the OMF Types -_CONFIG_CATEGORY_OMF_TYPES_NAME = 'OMF_TYPES' -_CONFIG_CATEGORY_OMF_TYPES_DESCRIPTION = 'OMF Types' - -CONFIG_DEFAULT_OMF_TYPES = { - "type-id": { - "description": "Identify sensor and measurement types", - "type": "integer", - "default": "0001" - }, -} - -_OMF_PREFIX_MEASUREMENT = "measurement_" -_OMF_SUFFIX_TYPENAME = "_typename" - -OMF_TEMPLATE_TYPE = { - "typename": [ - { - "id": "static-type-id", - "type": "object", - "classification": "static", - "properties": { - } - }, - { - "id": "dynamic-type-id", - "type": "object", - "classification": "dynamic", - "properties": { - } - } - ] -} -_OMF_TEMPLATE_CONTAINER = [ - { - "id": "xxx", - "typeid": "xxx" - } -] -_OMF_TEMPLATE_STATIC_DATA = [ - { - "typeid": "xxx", - "values": [{ - "Name": "xxx" - }] - } -] -_OMF_TEMPLATE_LINK_DATA = [ - { - "typeid": "__Link", - "values": [{ - "source": { - "typeid": "xxx", - "index": "_ROOT" - }, - "target": { - "typeid": "xxx", - "index": "xxx" - } - }, { - "source": { - "typeid": "xxx", - "index": "xxx" - }, - "target": { - "containerid": "xxx" - } - }] - } -] - - -def _performance_log(_function): - """ Logs information for performance measurement """ - - def wrapper(*arg): - """ wrapper """ - - # Avoids any exceptions related to the performance measurement - try: - - start = datetime.datetime.now() - - # Code execution - result = _function(*arg) - - if _log_performance: - usage = resource.getrusage(resource.RUSAGE_SELF) - memory_process = (usage[2]) / 1000 - delta = datetime.datetime.now() - start - delta_milliseconds = int(delta.total_seconds() * 1000) - - _logger.info("PERFORMANCE - {0} - milliseconds |{1:>8,}| - memory MB |{2:>8,}|".format( - _function.__name__, - delta_milliseconds, - memory_process)) - - return result - - except Exception as ex: - print("ERROR - {func} - error details |{error}|".format( - func="_performance_log", - error=ex), file=sys.stderr) - raise - - return wrapper - - -def plugin_info(): - return { - 'name': "PI Server North", - 'version': "1.0.0", - 'type': "north", - 'interface': "1.0", - 'config': _CONFIG_DEFAULT_OMF - } - - -def _validate_configuration(data): - """ Validates the configuration retrieved from the Configuration Manager - Args: - data: configuration retrieved from the Configuration Manager - Returns: - Raises: - ValueError - """ - - _message = "" - - if 'producerToken' not in data: - _message = MESSAGES_LIST["e000001"] - - else: - if data['producerToken']['value'] == "": - - _message = MESSAGES_LIST["e000002"] - - if _message != "": - _logger.error(_message) - raise ValueError(_message) - - -def _validate_configuration_omf_type(data): - """ Validates the configuration retrieved from the Configuration Manager related to the OMF types - Args: - data: configuration retrieved from the Configuration Manager - Returns: - Raises: - ValueError - """ - - _message = "" - - if 'type-id' not in data: - _message = MESSAGES_LIST["e000010"] - else: - if data['type-id']['value'] == "": - - _message = MESSAGES_LIST["e000011"] - - if _message != "": - _logger.error(_message) - raise ValueError(_message) - - -def plugin_init(data): - """ Initializes the OMF plugin for the sending of blocks of readings to the PI Connector. - Args: - Returns: - Raises: - PluginInitializeFailed - """ - global _config - global _config_omf_types - global _logger - global _recreate_omf_objects - global _log_debug_level, _log_performance, _stream_id - - _log_debug_level = data['debug_level'] - _log_performance = data['log_performance'] - _stream_id = data['stream_id'] - - try: - # note : _module_name is used as __name__ refers to the Sending Proces - logger_name = _MODULE_NAME + "_" + str(_stream_id) - - _logger = \ - logger.setup(logger_name, destination=_LOGGER_DESTINATION) if _log_debug_level == 0 else\ - logger.setup(logger_name, destination=_LOGGER_DESTINATION, level=logging.INFO if _log_debug_level == 1 else logging.DEBUG) - - except Exception as ex: - _logger.error("{0} - ERROR - {1}".format(time.strftime("%Y-%m-%d %H:%M:%S:"), plugin_common.MESSAGES_LIST["e000012"].format(str(ex)))) - raise ex - _logger.debug("{0} - ".format("plugin_info")) - - _validate_configuration(data) - - # Retrieves the configurations and apply the related conversions - _config['_CONFIG_CATEGORY_NAME'] = data['_CONFIG_CATEGORY_NAME'] - _config['URL'] = data['URL']['value'] - _config['producerToken'] = data['producerToken']['value'] - _config['OMFMaxRetry'] = int(data['OMFMaxRetry']['value']) - _config['OMFRetrySleepTime'] = int(data['OMFRetrySleepTime']['value']) - _config['OMFHttpTimeout'] = int(data['OMFHttpTimeout']['value']) - - _config['StaticData'] = ast.literal_eval(data['StaticData']['value']) - _config['notBlockingErrors'] = ast.literal_eval(data['notBlockingErrors']['value']) - - _config['formatNumber'] = data['formatNumber']['value'] - _config['formatInteger'] = data['formatInteger']['value'] - - _config['compression'] = data['compression']['value'] - - # TODO: compare instance fetching via inspect vs as param passing - # import inspect - # _config['sending_process_instance'] = inspect.currentframe().f_back.f_locals['self'] - _config['sending_process_instance'] = data['sending_process_instance'] - - # _config_omf_types = json.loads(data['omf_types']['value']) - _config_omf_types = _config['sending_process_instance']._fetch_configuration(cat_name=_CONFIG_CATEGORY_OMF_TYPES_NAME, - cat_desc=_CONFIG_CATEGORY_OMF_TYPES_DESCRIPTION, - cat_config=CONFIG_DEFAULT_OMF_TYPES, - cat_keep_original=True) - - _validate_configuration_omf_type(_config_omf_types) - - # Converts the value field from str to a dict - for item in _config_omf_types: - if _config_omf_types[item]['type'] == 'JSON': - # The conversion from a dict to str changes the case and it should be fixed before the conversion - value = _config_omf_types[item]['value'].replace("true", "True") - new_value = ast.literal_eval(value) - _config_omf_types[item]['value'] = new_value - - _logger.debug("{0} - URL {1}".format("plugin_init", _config['URL'])) - try: - _recreate_omf_objects = True - except Exception as ex: - _logger.error(plugin_common.MESSAGES_LIST["e000011"].format(ex)) - raise plugin_exceptions.PluginInitializeFailed(ex) - - return _config - - -async def plugin_send(data, raw_data, stream_id): - """ Translates and sends to the destination system the data provided by the Sending Process - Args: - data: plugin_handle from sending_process - raw_data : Data to send as retrieved from the storage layer - stream_id - Returns: - data_to_send : True, data successfully sent to the destination system - new_position : Last row_id already sent - num_sent : Number of rows sent, used for the update of the statistics - Raises: - """ - - global _recreate_omf_objects - - is_data_sent = False - config_category_name = data['_CONFIG_CATEGORY_NAME'] - type_id = _config_omf_types['type-id']['value'] - - omf_north = PIServerNorthPlugin(data['sending_process_instance'], data, _config_omf_types, _logger) - - # Alloc the in memory buffer - buffer_size = len(raw_data) - data_to_send = [None for _ in range(buffer_size)] - - is_data_available, new_position, num_sent = omf_north.transform_in_memory_data(data_to_send, raw_data) - - if is_data_available: - - await omf_north.create_omf_objects(raw_data, config_category_name, type_id) - - try: - await omf_north.send_in_memory_data_to_picromf("Data", data_to_send) - - except Exception as ex: - # Forces the recreation of PIServer's objects on the first error occurred - if _recreate_omf_objects: - await omf_north.deleted_omf_types_already_created(config_category_name, type_id) - _recreate_omf_objects = False - _logger.debug("{0} - Forces objects recreation ".format("plugin_send")) - raise ex - else: - is_data_sent = True - - return is_data_sent, new_position, num_sent - - -def plugin_shutdown(data): - """ Terminates the plugin - Returns: - Raises: - """ - try: - _logger.debug("{0} - plugin_shutdown".format(_MODULE_NAME)) - except Exception as ex: - _logger.error(plugin_common.MESSAGES_LIST["e000013"].format(ex)) - raise - - -def plugin_reconfigure(): - """ plugin_reconfigure """ - - pass - - -class PIServerNorthPlugin(object): - """ North OMF North Plugin """ - - def __init__(self, sending_process_instance, config, config_omf_types, _logger): - - self._sending_process_instance = sending_process_instance - - self._config = config - self._config_omf_types = config_omf_types - self._logger = _logger - - async def deleted_omf_types_already_created(self, config_category_name, type_id): - """ Deletes OMF types/objects tracked as already created, it is used to force the recreation of the types - Args: - config_category_name: used to identify OMF objects already created - type_id: used to identify OMF objects already created - Returns: - Raises: - """ - payload = payload_builder.PayloadBuilder() \ - .WHERE(['configuration_key', '=', config_category_name]) \ - .AND_WHERE(['type_id', '=', type_id]) \ - .payload() - - await self._sending_process_instance._storage_async.delete_from_tbl("omf_created_objects", payload) - - async def _retrieve_omf_types_already_created(self, configuration_key, type_id): - """ Retrieves the list of OMF types already defined/sent to the PICROMF - Args: - configuration_key - part of the key to identify the type - type_id - part of the key to identify the type - Returns: - List of Asset code already defined into the PI Server - Raises: - """ - payload = payload_builder.PayloadBuilder() \ - .WHERE(['configuration_key', '=', configuration_key]) \ - .AND_WHERE(['type_id', '=', type_id]) \ - .payload() - - omf_created_objects = await self._sending_process_instance._storage_async.query_tbl_with_payload('omf_created_objects', payload) - self._logger.debug("{func} - omf_created_objects {item} ".format( - func="_retrieve_omf_types_already_created", - item=omf_created_objects)) - # Extracts only the asset_code column - rows = [] - for row in omf_created_objects['rows']: - rows.append(row['asset_code']) - - return rows - - async def _flag_created_omf_type(self, configuration_key, type_id, asset_code): - """ Stores into the Storage layer the successfully creation of the type into PICROMF. - Args: - configuration_key - part of the key to identify the type - type_id - part of the key to identify the type - asset_code - asset code defined into PICROMF - Returns: - Raises: - """ - payload = payload_builder.PayloadBuilder()\ - .INSERT(configuration_key=configuration_key, - asset_code=asset_code, - type_id=type_id)\ - .payload() - await self._sending_process_instance._storage_async.insert_into_tbl("omf_created_objects", payload) - - def _generate_omf_asset_id(self, asset_code): - """ Generates an asset id usable by AF/PI Server from an asset code stored into the Storage layer - Args: - asset_code : Asset code stored into the Storage layer - Returns: - Asset id usable by AF/PI Server - Raises: - """ - asset_id = asset_code.replace(" ", "") - return asset_id - - def _generate_omf_measurement(self, asset_code): - """ Generates the measurement id associated to an asset code - Args: - asset_code : Asset code retrieved from the Storage layer - Returns: - Measurement id associated to the specific asset code - Raises: - """ - asset_id = asset_code.replace(" ", "") - type_id = self._config_omf_types['type-id']['value'] - return type_id + _OMF_PREFIX_MEASUREMENT + asset_id - - def _generate_omf_typename_automatic(self, asset_code): - """ Generates the typename associated to an asset code for the automated generation of the OMF types - Args: - asset_code : Asset code retrieved from the Storage layer - Returns: - typename associated to the specific asset code - Raises: - """ - asset_id = asset_code.replace(" ", "") - return asset_id + _OMF_SUFFIX_TYPENAME - - async def _create_omf_objects_automatic(self, asset_info): - """ Handles the Automatic OMF Type Mapping - Args: - asset_info : Asset's information as retrieved from the Storage layer, - having also a sample value for the asset - Returns: - response_status_code: http response code related to the PICROMF request - Raises: - """ - typename, omf_type = await self._create_omf_type_automatic(asset_info) - await self._create_omf_object_links(asset_info["asset_code"], typename, omf_type) - - async def _create_omf_type_automatic(self, asset_info): - """ Automatic OMF Type Mapping - Handles the OMF type creation - Args: - asset_info : Asset's information as retrieved from the Storage layer, - having also a sample value for the asset - Returns: - typename : typename associate to the asset - omf_type : describe the OMF type as a python dict - Raises: - """ - type_id = self._config_omf_types["type-id"]["value"] - sensor_id = self._generate_omf_asset_id(asset_info["asset_code"]) - asset_data = asset_info["asset_data"] - typename = self._generate_omf_typename_automatic(sensor_id) - new_tmp_dict = copy.deepcopy(OMF_TEMPLATE_TYPE) - omf_type = {typename: new_tmp_dict["typename"]} - # Handles Static section - # Generates elements evaluating the StaticData retrieved form the Configuration Manager - omf_type[typename][0]["properties"]["Name"] = { - "type": "string", - "isindex": True - } - omf_type[typename][0]["id"] = type_id + "_" + typename + "_sensor" - for item in self._config['StaticData']: - omf_type[typename][0]["properties"][item] = {"type": "string"} - # Handles Dynamic section - omf_type[typename][1]["properties"]["Time"] = { - "type": "string", - "format": "date-time", - "isindex": True - } - omf_type[typename][1]["id"] = type_id + "_" + typename + "_measurement" - - # Applies configured format property for the specific type - for item in asset_data: - item_type = plugin_common.evaluate_type(asset_data[item]) - - self._logger.debug( - "func |{func}| - item_type |{type}| - formatInteger |{int}| - formatNumber |{float}| ".format( - func="_create_omf_type_automatic", - type=item_type, - int=self._config['formatInteger'], - float=self._config['formatNumber'])) - - # Handles OMF format property to force the proper OCS type, especially for handling decimal numbers - if item_type == "integer": - - # Forces the creation of integer as number - omf_type[typename][1]["properties"][item] = {"type": "number", - "format": self._config['formatNumber']} - - # - # omf_type[typename][1]["properties"][item] = {"type": item_type, - # "format": self._config['formatInteger']} - elif item_type == "number": - omf_type[typename][1]["properties"][item] = {"type": item_type, - "format": self._config['formatNumber']} - - elif item_type == "array": - omf_type[typename][1]["properties"][item] = { - "type": item_type, - "items": { - "type": "number", - "format": self._config['formatNumber'] - } - } - else: - omf_type[typename][1]["properties"][item] = {"type": item_type} - - - if _log_debug_level == 3: - self._logger.debug("_create_omf_type_automatic - sensor_id |{0}| - omf_type |{1}| ".format(sensor_id, str(omf_type))) - - await self.send_in_memory_data_to_picromf("Type", omf_type[typename]) - - return typename, omf_type - - async def _create_omf_objects_configuration_based(self, asset_code, asset_code_omf_type): - """ Handles the Configuration Based OMF Type Mapping - Args: - asset_code - asset_code_omf_type : describe the OMF type as a python dict - Returns: - Raises: - """ - typename, omf_type = await self._create_omf_type_configuration_based(asset_code_omf_type) - await self._create_omf_object_links(asset_code, typename, omf_type) - - async def _create_omf_type_configuration_based(self, asset_code_omf_type): - """ Configuration Based OMF Type Mapping - Handles the OMF type creation - Args: - asset_code_omf_type : describe the OMF type as a python dict - Returns: - typename : typename associate to the asset - omf_type : describe the OMF type as a python dict - Raises: - """ - type_id = self._config_omf_types["type-id"]["value"] - typename = asset_code_omf_type["typename"] - new_tmp_dict = copy.deepcopy(OMF_TEMPLATE_TYPE) - omf_type = {typename: new_tmp_dict["typename"]} - # Handles Static section - omf_type[typename][0]["properties"] = asset_code_omf_type["static"] - omf_type[typename][0]["id"] = type_id + "_" + typename + "_sensor" - # Handles Dynamic section - omf_type[typename][1]["properties"] = asset_code_omf_type["dynamic"] - omf_type[typename][1]["id"] = type_id + "_" + typename + "_measurement" - if _log_debug_level == 3: - self._logger.debug("_create_omf_type_configuration_based - omf_type |{0}| ".format(str(omf_type))) - - await self.send_in_memory_data_to_picromf("Type", omf_type[typename]) - - return typename, omf_type - - async def _create_omf_object_links(self, asset_code, typename, omf_type): - """ Handles the creation of the links between the OMF objects : - sensor, its measurement, sensor type and measurement type - Args: - asset_code - typename : name/id of the type - omf_type : describe the OMF type as a python dict - Returns: - Raises: - """ - sensor_id = self._generate_omf_asset_id(asset_code) - measurement_id = self._generate_omf_measurement(sensor_id) - type_sensor_id = omf_type[typename][0]["id"] - type_measurement_id = omf_type[typename][1]["id"] - # Handles containers - containers = copy.deepcopy(_OMF_TEMPLATE_CONTAINER) - containers[0]["id"] = measurement_id - containers[0]["typeid"] = type_measurement_id - # Handles static_data - static_data = copy.deepcopy(_OMF_TEMPLATE_STATIC_DATA) - static_data[0]["typeid"] = type_sensor_id - static_data[0]["values"][0] = copy.deepcopy(self._config['StaticData']) - static_data[0]["values"][0]['Name'] = sensor_id - # Handles link_data - link_data = copy.deepcopy(_OMF_TEMPLATE_LINK_DATA) - link_data[0]["values"][0]['source']['typeid'] = type_sensor_id - link_data[0]["values"][0]['target']['typeid'] = type_sensor_id - link_data[0]["values"][0]['target']['index'] = sensor_id - link_data[0]["values"][1]['source']['typeid'] = type_sensor_id - link_data[0]["values"][1]['source']['index'] = sensor_id - link_data[0]["values"][1]['target']['containerid'] = measurement_id - if _log_debug_level == 3: - self._logger.debug("_create_omf_object_links - asset_code |{0}| - containers |{1}| ".format(asset_code, - str(containers))) - self._logger.debug("_create_omf_object_links - asset_code |{0}| - static_data |{1}| ".format(asset_code, - str(static_data))) - self._logger.debug("_create_omf_object_links - asset_code |{0}| - link_data |{1}| ".format(asset_code, - str(link_data))) - await self.send_in_memory_data_to_picromf("Container", containers) - await self.send_in_memory_data_to_picromf("Data", static_data) - await self.send_in_memory_data_to_picromf("Data", link_data) - - return - - async def create_omf_objects(self, raw_data, config_category_name, type_id): - """ Handles the creation of the OMF types related to the asset codes using one of the 2 possible ways : - Automatic OMF Type Mapping - Configuration Based OMF Type Mapping - Args: - raw_data : data block to manage as retrieved from the Storage layer - config_category_name: used to identify OMF objects already created - type_id: used to identify OMF objects already created - Returns: - Raises: - """ - asset_codes_to_evaluate = plugin_common.identify_unique_asset_codes(raw_data) - asset_codes_already_created = await self._retrieve_omf_types_already_created(config_category_name, type_id) - - for item in asset_codes_to_evaluate: - asset_code = item["asset_code"] - - # Evaluates if it is a new OMF type - if not any(tmp_item == asset_code for tmp_item in asset_codes_already_created): - - asset_code_omf_type = "" - try: - asset_code_omf_type = copy.deepcopy(self._config_omf_types[asset_code]["value"]) - except KeyError: - configuration_based = False - else: - configuration_based = True - - if configuration_based: - self._logger.debug("creates type - configuration based - asset |{0}| ".format(asset_code)) - await self._create_omf_objects_configuration_based(asset_code, asset_code_omf_type) - else: - - # handling - Automatic OMF Type Mapping - self._logger.debug("creates type - automatic handling - asset |{0}| ".format(asset_code)) - await self._create_omf_objects_automatic(item) - - await self._flag_created_omf_type(config_category_name, type_id, asset_code) - else: - self._logger.debug("asset already created - asset |{0}| ".format(asset_code)) - - async def send_in_memory_data_to_picromf(self, message_type, omf_data): - """ Sends data to PICROMF - it retries the operation using a sleep time increased *2 for every retry - it logs a WARNING only at the end of the retry mechanism in case of a communication error - Args: - message_type: possible values {Type, Container, Data} - omf_data: OMF message to send - Returns: - Raises: - Exception: an error occurred during the OMF request - URLFetchError: in case of http response code different from 2xx - """ - sleep_time = self._config['OMFRetrySleepTime'] - _message = "" - _error = False - num_retry = 1 - msg_header = {'producertoken': self._config['producerToken'], - 'messagetype': message_type, - 'action': 'create', - 'messageformat': 'JSON', - 'omfversion': '1.0'} - omf_data_json = json.dumps(omf_data) - - self._logger.debug("OMF message length |{0}| ".format(len(omf_data_json))) - - if _log_debug_level == 3: - self._logger.debug("OMF message : |{0}| |{1}| " .format(message_type, omf_data_json)) - - while num_retry <= self._config['OMFMaxRetry']: - _error = False - try: - use_compression = True if self._config['compression'].upper() == 'TRUE' else False - if use_compression: - msg_body = gzip.compress(bytes(omf_data_json, 'utf-8')) - msg_header.update({'compression': 'gzip'}) - # https://docs.aiohttp.org/en/stable/client_advanced.html#uploading-pre-compressed-data - msg_header.update({'Content-Encoding': 'gzip'}) - else: - msg_body = omf_data_json - - self._logger.info("SEND requested with compression: %s started at: %s", str(use_compression), datetime.datetime.now().isoformat()) - async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session: - async with session.post( - url=self._config['URL'], - headers=msg_header, - data=msg_body, - timeout=self._config['OMFHttpTimeout'] - ) as resp: - - status_code = resp.status - text = await resp.text() - except (TimeoutError, asyncio.TimeoutError) as ex: - _message = plugin_common.MESSAGES_LIST["e000024"].format(self._config['URL'], "connection Timeout") - _error = plugin_exceptions.URLConnectionError(_message) - - except Exception as ex: - details = str(ex) - _message = plugin_common.MESSAGES_LIST["e000024"].format(self._config['URL'], details) - _error = plugin_exceptions.URLConnectionError(_message) - - else: - self._logger.info("PI Server responded with status: %s received at: %s", str(status_code), - datetime.datetime.now().isoformat()) - # Evaluate the HTTP status codes - if not str(status_code).startswith('2'): - if any(_['id'] == status_code and _['message'] in text for _ in self._config['notBlockingErrors']): - - # The error encountered is in the list of not blocking - # the sending operation will proceed with the next block of data - self._logger.warning(plugin_common.MESSAGES_LIST["e000032"].format(status_code, text)) - _error = "" - else: - _tmp_text = "status code " + str(status_code) + " - " + text - _message = plugin_common.MESSAGES_LIST["e000024"].format(self._config['URL'], _tmp_text) - _error = plugin_exceptions.URLConnectionError(_message) - - self._logger.debug("message type |{0}| response: |{1}| |{2}| ".format( - message_type, - status_code, - text)) - - if _error: - await asyncio.sleep(sleep_time) - num_retry += 1 - sleep_time *= 2 - else: - break - - if _error: - raise _error - - @_performance_log - def transform_in_memory_data(self, data_to_send, raw_data): - """ Transforms the in memory data into a new structure that could be converted into JSON for the PICROMF - Args: - data_to_send - Transformed/generated data - raw_data - Input data - Returns: - data_available - True, there are new data - _new_position - It corresponds to the row_id of the last element - _num_sent - Number of elements handled, used to update the statistics - Raises: - """ - - _new_position = 0 - data_available = False - - # statistics - _num_sent = 0 - - idx = 0 - - try: - - for row in raw_data: - - # Identification of the object/sensor - measurement_id = self._generate_omf_measurement(row['asset_code']) - - try: - # The expression **row['reading'] - joins the 2 dictionaries - # - # The code formats the date to the format OMF/the PI Server expects directly - # without using python date library for performance reason and - # because it is expected to receive the date in a precise/fixed format : - # 2018-05-28 16:56:55.000000+00 - data_to_send[idx] = { - "containerid": measurement_id, - "values": [ - { - "Time": row['user_ts'][0:10] + "T" + row['user_ts'][11:23] + "Z", - **row['reading'] - } - ] - } - - if _log_debug_level == 3: - self._logger.debug("stream ID : |{0}| sensor ID : |{1}| row ID : |{2}| " - .format(measurement_id, row['asset_code'], str(row['id']))) - - self._logger.debug("in memory info |{0}| ".format(data_to_send[idx])) - - idx += 1 - - # Used for the statistics update - _num_sent += 1 - - # Latest position reached - _new_position = row['id'] - - data_available = True - - except Exception as e: - self._logger.warning(plugin_common.MESSAGES_LIST["e000023"].format(e)) - - except Exception: - self._logger.error(plugin_common.MESSAGES_LIST["e000021"]) - raise - - return data_available, _new_position, _num_sent \ No newline at end of file From 59ff6ba576e391396df4a11013b31a9c4a259117 Mon Sep 17 00:00:00 2001 From: Stefano Simonelli Date: Mon, 7 Oct 2019 09:39:34 +0000 Subject: [PATCH 2/5] FOGL-3168: python north plugin unit tests removed Signed-off-by: Stefano Simonelli --- .../foglamp/plugins/north/ocs/test_ocs.py | 504 ------ .../plugins/north/pi_server/test_pi_server.py | 1531 ----------------- 2 files changed, 2035 deletions(-) delete mode 100644 tests/unit/python/foglamp/plugins/north/ocs/test_ocs.py delete mode 100644 tests/unit/python/foglamp/plugins/north/pi_server/test_pi_server.py diff --git a/tests/unit/python/foglamp/plugins/north/ocs/test_ocs.py b/tests/unit/python/foglamp/plugins/north/ocs/test_ocs.py deleted file mode 100644 index 781318efbe..0000000000 --- a/tests/unit/python/foglamp/plugins/north/ocs/test_ocs.py +++ /dev/null @@ -1,504 +0,0 @@ -# -*- coding: utf-8 -*- -""" Unit tests for the OCS plugin """ - -# FOGLAMP_BEGIN -# See: http://foglamp.readthedocs.io/ -# FOGLAMP_END - -__author__ = "Stefano Simonelli" -__copyright__ = "Copyright (c) 2018 OSIsoft, LLC" -__license__ = "Apache 2.0" -__version__ = "${VERSION}" - -import pytest -import json -import logging - -from unittest.mock import patch, MagicMock, ANY - -from foglamp.plugins.north.ocs import ocs -from foglamp.tasks.north.sending_process import SendingProcess -import foglamp.tasks.north.sending_process as module_sp -from foglamp.common.storage_client.storage_client import StorageClientAsync - -_STREAM_ID = 1 - -async def mock_async_call(p1=ANY): - """ mocks a generic async function """ - return p1 - - -@pytest.fixture -def fixture_ocs(event_loop): - """" Configures the OMF instance for the tests """ - - _omf = MagicMock() - - ocs._logger = MagicMock(spec=logging) - ocs._config_omf_types = {"type-id": {"value": "0001"}} - - return ocs - - -@pytest.fixture -def fixture_ocs_north(event_loop): - """" Configures the OMF instance for the tests """ - - sending_process_instance = MagicMock() - config = [] - config_omf_types = [] - - _logger = MagicMock(spec=logging) - - ocs_north = ocs.OCSNorthPlugin(sending_process_instance, config, config_omf_types, _logger) - - ocs_north._sending_process_instance._storage_async = MagicMock(spec=StorageClientAsync) - - return ocs_north - - -@pytest.allure.feature("unit") -@pytest.allure.story("plugin", "north", "ocs") -class TestOCS: - """Unit tests related to the public methods of the OCS plugin """ - - def test_plugin_info(self): - - plugin_info = ocs.plugin_info() - - assert plugin_info == { - 'name': "OCS North", - 'version': "1.0.0", - 'type': "north", - 'interface': "1.0", - 'config': ocs._CONFIG_DEFAULT_OMF - } - - def test_plugin_init_good(self): - """Tests plugin_init using a good set of values""" - - ocs._logger = MagicMock() - - # Used to check the conversions - data = { - "stream_id": {"value": 1}, - - "_CONFIG_CATEGORY_NAME": module_sp.SendingProcess._CONFIG_CATEGORY_NAME, - "URL": {"value": "test_URL"}, - "producerToken": {"value": "test_producerToken"}, - "OMFMaxRetry": {"value": "100"}, - "OMFRetrySleepTime": {"value": "100"}, - "OMFHttpTimeout": {"value": "100"}, - "StaticData": { - "value": json.dumps( - { - "Location": "Palo Alto", - "Company": "Dianomic" - } - ) - }, - "destination_type": {"value": "3"}, - 'sending_process_instance': MagicMock(spec=SendingProcess), - "formatNumber": {"value": "float64"}, - "formatInteger": {"value": "int64"}, - "notBlockingErrors": {"value": "{'id': 400, 'message': 'none'}"}, - "compression": {"value": "true"}, - "namespace": {"value": "ocs_namespace_0001"}, - "tenant_id": {"value": "ocs_tenant_id"}, - "client_id": {"value": "ocs_client_id"}, - "client_secret": {"value": "ocs_client_secret"}, - - } - - config_default_omf_types = ocs._CONFIG_DEFAULT_OMF_TYPES - config_default_omf_types["type-id"]["value"] = "0001" - data["debug_level"] = None - data["log_performance"] = None - data["destination_id"] = 1 - data["stream_id"] = 1 - - with patch.object(data['sending_process_instance'], '_fetch_configuration', - return_value=config_default_omf_types): - config = ocs.plugin_init(data) - - assert config['_CONFIG_CATEGORY_NAME'] == module_sp.SendingProcess._CONFIG_CATEGORY_NAME - assert config['URL'] == "test_URL" - assert config['producerToken'] == "test_producerToken" - assert config['OMFMaxRetry'] == 100 - assert config['OMFRetrySleepTime'] == 100 - assert config['OMFHttpTimeout'] == 100 - - # Check conversion from String to Dict - assert isinstance(config['StaticData'], dict) - - @pytest.mark.parametrize("data", [ - - # Bad case 1 - StaticData is a python dict instead of a string containing a dict - { - "stream_id": {"value": 1}, - - "_CONFIG_CATEGORY_NAME": module_sp.SendingProcess._CONFIG_CATEGORY_NAME, - "URL": {"value": "test_URL"}, - "producerToken": {"value": "test_producerToken"}, - "OMFMaxRetry": {"value": "100"}, - "OMFRetrySleepTime": {"value": "100"}, - "OMFHttpTimeout": {"value": "100"}, - "StaticData": { - "value": - { - "Location": "Palo Alto", - "Company": "Dianomic" - } - }, - - 'sending_process_instance': MagicMock(spec=SendingProcess), - - "formatNumber": {"value": "float64"}, - "formatInteger": {"value": "int64"}, - }, - - # Bad case 2 - OMFMaxRetry, bad value expected an int it is a string - { - "stream_id": {"value": 1}, - - "_CONFIG_CATEGORY_NAME": module_sp.SendingProcess._CONFIG_CATEGORY_NAME, - "URL": {"value": "test_URL"}, - "producerToken": {"value": "test_producerToken"}, - "OMFMaxRetry": {"value": "xxx"}, - "OMFRetrySleepTime": {"value": "100"}, - "OMFHttpTimeout": {"value": "100"}, - "StaticData": { - "value": json.dumps( - { - "Location": "Palo Alto", - "Company": "Dianomic" - } - ) - }, - - 'sending_process_instance': MagicMock(spec=SendingProcess), - - "formatNumber": {"value": "float64"}, - "formatInteger": {"value": "int64"}, - }, - - # Bad case 3- formatNumber not defined - { - "stream_id": {"value": 1}, - - "_CONFIG_CATEGORY_NAME": module_sp.SendingProcess._CONFIG_CATEGORY_NAME, - "URL": {"value": "test_URL"}, - "producerToken": {"value": "test_producerToken"}, - "OMFMaxRetry": {"value": "100"}, - "OMFRetrySleepTime": {"value": "100"}, - "OMFHttpTimeout": {"value": "100"}, - "StaticData": { - "value": json.dumps( - { - "Location": "Palo Alto", - "Company": "Dianomic" - } - ) - }, - - 'sending_process_instance': MagicMock(spec=SendingProcess), - - "formatInteger": {"value": "int64"} - }, - - - # Bad case 4 - formatInteger not defined - { - "stream_id": {"value": 1}, - - "_CONFIG_CATEGORY_NAME": module_sp.SendingProcess._CONFIG_CATEGORY_NAME, - "URL": {"value": "test_URL"}, - "producerToken": {"value": "test_producerToken"}, - "OMFMaxRetry": {"value": "100"}, - "OMFRetrySleepTime": {"value": "100"}, - "OMFHttpTimeout": {"value": "100"}, - "StaticData": { - "value": json.dumps( - { - "Location": "Palo Alto", - "Company": "Dianomic" - } - ) - }, - - 'sending_process_instance': MagicMock(spec=SendingProcess), - - "formatNumber": {"value": "float64"} - } - - ]) - def test_plugin_init_bad(self, data): - """Tests plugin_init using an invalid set of values""" - - ocs._logger = MagicMock() - - with pytest.raises(Exception): - ocs.plugin_init(data) - - @pytest.mark.parametrize( - "ret_transform_in_memory_data, " - "p_raw_data, ", - [ - ( - # ret_transform_in_memory_data - # is_data_available - new_position - num_sent - [True, 20, 10], - - # raw_data - [ - { - "id": 10, - "asset_code": "test_asset_code", - "read_key": "ef6e1368-4182-11e8-842f-0ed5f89f718b", - "reading": {"humidity": 100, "temperature": 1001}, - "user_ts": '2018-04-20 09:38:50.163164+00' - } - ] - ) - ] - ) - @pytest.mark.asyncio - async def test_plugin_send_success(self, - ret_transform_in_memory_data, - p_raw_data, - fixture_ocs - ): - - data = MagicMock() - - with patch.object(fixture_ocs.OCSNorthPlugin, - 'transform_in_memory_data', - return_value=ret_transform_in_memory_data) as patched_transform_in_memory_data: - with patch.object(fixture_ocs.OCSNorthPlugin, - 'create_omf_objects', - return_value=mock_async_call()) as patched_create_omf_objects: - with patch.object(fixture_ocs.OCSNorthPlugin, - 'send_in_memory_data_to_picromf', - return_value=mock_async_call()) as patched_send_in_memory_data_to_picromf: - await fixture_ocs.plugin_send(data, p_raw_data, _STREAM_ID) - - assert patched_transform_in_memory_data.called - assert patched_create_omf_objects.called - assert patched_send_in_memory_data_to_picromf.called - - @pytest.mark.parametrize( - "ret_transform_in_memory_data, " - "p_raw_data, ", - [ - ( - # ret_transform_in_memory_data - # is_data_available - new_position - num_sent - [True, 20, 10], - - # raw_data - { - "id": 10, - "asset_code": "test_asset_code", - "read_key": "ef6e1368-4182-11e8-842f-0ed5f89f718b", - "reading": {"humidity": 100, "temperature": 1001}, - "user_ts": '2018-04-20 09:38:50.163164+00' - } - ) - - ] - ) - @pytest.mark.asyncio - async def test_plugin_send_error( - self, - fixture_ocs, - ret_transform_in_memory_data, - p_raw_data - ): - """ Unit test for - plugin_send - error handling case - it tests especially if the ocs objects are created again in case of a communication error - - NOTE : the stderr is redirected to avoid the print of an error message that could be ignored. - """ - - data = MagicMock() - - with patch.object(fixture_ocs.OCSNorthPlugin, - 'transform_in_memory_data', - return_value=ret_transform_in_memory_data - ) as patched_transform_in_memory_data: - - with patch.object(fixture_ocs.OCSNorthPlugin, - 'create_omf_objects', - return_value=mock_async_call() - ) as patched_create_omf_objects: - - with patch.object(fixture_ocs.OCSNorthPlugin, - 'send_in_memory_data_to_picromf', - side_effect=KeyError('mocked object generated an exception') - ) as patched_send_in_memory_data_to_picromf: - - with patch.object(fixture_ocs.OCSNorthPlugin, - 'deleted_omf_types_already_created', - return_value=mock_async_call() - ) as patched_deleted_omf_types_already_created: - - with pytest.raises(Exception): - await fixture_ocs.plugin_send(data, p_raw_data, - _STREAM_ID) - assert patched_transform_in_memory_data.called - assert patched_create_omf_objects.called - assert patched_send_in_memory_data_to_picromf.called - assert patched_deleted_omf_types_already_created.called - - def test_plugin_shutdown(self): - - ocs._logger = MagicMock() - data = [] - ocs.plugin_shutdown([data]) - - def test_plugin_reconfigure(self): - - ocs._logger = MagicMock() - ocs.plugin_reconfigure() - - -class TestOCSNorthPlugin: - """Unit tests related to OCSNorthPlugin, methods used internally to the plugin""" - - @pytest.mark.parametrize( - "p_test_data, " - "p_type_id, " - "p_static_data, " - "expected_typename," - "expected_omf_type", - [ - # Case 1 - pressure / Number - ( - # Origin - Sensor data - {"asset_code": "pressure", "asset_data": {"pressure": 921.6}}, - - # type_id - "0001", - - # Static Data - { - "Location": "Palo Alto", - "Company": "Dianomic" - }, - - # Expected - 'pressure_typename', - { - 'pressure_typename': - [ - { - 'classification': 'static', - 'id': '0001_pressure_typename_sensor', - 'properties': { - 'Company': {'type': 'string'}, - 'Name': {'isindex': True, 'type': 'string'}, - 'Location': {'type': 'string'} - }, - 'type': 'object' - }, - { - 'classification': 'dynamic', - 'id': '0001_pressure_typename_measurement', - 'properties': { - - - 'Time': { - 'isindex': True, - 'format': 'date-time', - 'type': 'string' - }, - 'pressure': { - 'type': 'number', - 'format': 'float64' - } - }, - 'type': 'object' - } - ] - } - ), - # Case 2 - luxometer / Integer - ( - # Origin - Sensor data - {"asset_code": "luxometer", "asset_data": {"lux": 20}}, - - # type_id - "0002", - - # Static Data - { - "Location": "Palo Alto", - "Company": "Dianomic" - }, - - # Expected - 'luxometer_typename', - { - 'luxometer_typename': - [ - { - 'classification': 'static', - 'id': '0002_luxometer_typename_sensor', - 'properties': { - 'Company': {'type': 'string'}, - 'Name': {'isindex': True, 'type': 'string'}, - 'Location': {'type': 'string'} - }, - 'type': 'object' - }, - { - 'classification': 'dynamic', - 'id': '0002_luxometer_typename_measurement', - 'properties': { - 'Time': {'isindex': True, 'format': 'date-time', 'type': 'string'}, - 'lux': { - 'type': 'number', - 'format': 'float64' - } - }, - 'type': 'object' - } - ] - } - - ) - - ] - ) - @pytest.mark.asyncio - async def test_create_omf_type_automatic( - self, - p_test_data, - p_type_id, - p_static_data, - expected_typename, - expected_omf_type, - fixture_ocs_north): - """ Unit test for - _create_omf_type_automatic - successful case - Tests the generation of the OMF messages starting from Asset name and data - using Automatic OMF Type Mapping""" - - fixture_ocs_north._config_omf_types = {"type-id": {"value": p_type_id}} - - fixture_ocs_north._config = {} - fixture_ocs_north._config["StaticData"] = p_static_data - fixture_ocs_north._config["formatNumber"] = "float64" - fixture_ocs_north._config["formatInteger"] = "int64" - - with patch.object(fixture_ocs_north, - 'send_in_memory_data_to_picromf', - return_value=mock_async_call() - ) as patched_send_in_memory_data_to_picromf: - - typename, omf_type = await fixture_ocs_north._create_omf_type_automatic(p_test_data) - - assert typename == expected_typename - assert omf_type == expected_omf_type - - assert patched_send_in_memory_data_to_picromf.called - patched_send_in_memory_data_to_picromf.assert_any_call("Type", expected_omf_type[expected_typename]) diff --git a/tests/unit/python/foglamp/plugins/north/pi_server/test_pi_server.py b/tests/unit/python/foglamp/plugins/north/pi_server/test_pi_server.py deleted file mode 100644 index 413af98324..0000000000 --- a/tests/unit/python/foglamp/plugins/north/pi_server/test_pi_server.py +++ /dev/null @@ -1,1531 +0,0 @@ -# -*- coding: utf-8 -*- -""" Unit tests for the omf plugin """ - -# FOGLAMP_BEGIN -# See: http://foglamp.readthedocs.io/ -# FOGLAMP_END - -__author__ = "Stefano Simonelli" -__copyright__ = "Copyright (c) 2018 OSIsoft, LLC" -__license__ = "Apache 2.0" -__version__ = "${VERSION}" - -import asyncio -import logging -import pytest -import json -import time -import ast - -import aiohttp - -from unittest.mock import patch, MagicMock, ANY - -from foglamp.tasks.north.sending_process import SendingProcess -from foglamp.plugins.north.pi_server import pi_server -import foglamp.tasks.north.sending_process as module_sp - -from foglamp.common.storage_client import payload_builder - -from foglamp.common.storage_client.storage_client import StorageClientAsync - -_STREAM_ID = 1 - - -# noinspection PyProtectedMember -@pytest.fixture -def fixture_omf(event_loop): - """" Configures the OMF instance for the tests """ - - _omf = MagicMock() - - pi_server._logger = MagicMock(spec=logging) - pi_server._config_omf_types = {"type-id": {"value": "0001"}} - - return pi_server - -# noinspection PyProtectedMember -@pytest.fixture -def fixture_omf_north(event_loop): - """" Configures the OMF instance for the tests """ - - sending_process_instance = MagicMock() - config = [] - config_omf_types = [] - - _logger = MagicMock(spec=logging) - - omf_north = pi_server.PIServerNorthPlugin(sending_process_instance, config, config_omf_types, _logger) - - omf_north._sending_process_instance._storage_async = MagicMock(spec=StorageClientAsync) - - return omf_north - - -async def mock_async_call(p1=ANY): - """ mocks a generic async function """ - return p1 - - -class MockAiohttpClientSession(MagicMock): - """" mock the aiohttp.ClientSession context manager """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.code = args[0] - self.text = args[1] - - async def __aenter__(self): - mock_response = MagicMock(spec=aiohttp.ClientResponse) - mock_response.status = self.code - mock_response.text.side_effect = [mock_async_call(self.text)] - - return mock_response - - async def __aexit__(self, *args): - return None - - -class MockAiohttpClientSessionSuccess(MagicMock): - """" mock the aiohttp.ClientSession context manager """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - async def __aenter__(self): - mock_response = MagicMock(spec=aiohttp.ClientResponse) - mock_response.status = 200 - mock_response.text.side_effect = [mock_async_call('SUCCESS')] - - return mock_response - - async def __aexit__(self, *args): - return None - - -class MockAiohttpClientSessionError(MagicMock): - """" mock the aiohttp.ClientSession context manager """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - async def __aenter__(self): - mock_response = MagicMock(spec=aiohttp.ClientResponse) - mock_response.status = 400 - mock_response.text.side_effect = [mock_async_call('ERROR')] - - return mock_response - - async def __aexit__(self, *args): - return None - - -# noinspection PyUnresolvedReferences -@pytest.allure.feature("unit") -@pytest.allure.story("plugin", "north", "pi_server") -class TestPiServer: - """Unit tests related to the public methods of the omf plugin """ - - def test_plugin_info(self): - - assert pi_server.plugin_info() == { - 'name': "PI Server North", - 'version': "1.0.0", - 'type': "north", - 'interface': "1.0", - 'config': pi_server._CONFIG_DEFAULT_OMF - } - - def test_plugin_init_good(self): - """Tests plugin_init using a good set of values""" - - pi_server._logger = MagicMock() - - # Used to check the conversions - data = { - "stream_id": {"value": 1}, - - "_CONFIG_CATEGORY_NAME": module_sp.SendingProcess._CONFIG_CATEGORY_NAME, - "URL": {"value": "test_URL"}, - "producerToken": {"value": "test_producerToken"}, - "OMFMaxRetry": {"value": "100"}, - "OMFRetrySleepTime": {"value": "100"}, - "OMFHttpTimeout": {"value": "100"}, - "StaticData": { - "value": json.dumps( - { - "Location": "Palo Alto", - "Company": "Dianomic" - } - ) - }, - "destination_type": {"value": "1"}, - 'sending_process_instance': MagicMock(spec=SendingProcess), - "formatNumber": {"value": "float64"}, - "formatInteger": {"value": "int64"}, - "notBlockingErrors": {"value": "{'id': 400, 'message': 'none'}"}, - "compression": {"value": "true"} - - } - - config_default_omf_types = pi_server.CONFIG_DEFAULT_OMF_TYPES - config_default_omf_types["type-id"]["value"] = "0001" - data["debug_level"] = None - data["log_performance"] = None - data["destination_id"] = 1 - data["stream_id"] = 1 - - with patch.object(data['sending_process_instance'], '_fetch_configuration', - return_value=config_default_omf_types): - config = pi_server.plugin_init(data) - - assert config['_CONFIG_CATEGORY_NAME'] == module_sp.SendingProcess._CONFIG_CATEGORY_NAME - assert config['URL'] == "test_URL" - assert config['producerToken'] == "test_producerToken" - assert config['OMFMaxRetry'] == 100 - assert config['OMFRetrySleepTime'] == 100 - assert config['OMFHttpTimeout'] == 100 - - # Check conversion from String to Dict - assert isinstance(config['StaticData'], dict) - - @pytest.mark.parametrize("data", [ - - # Bad case 1 - StaticData is a python dict instead of a string containing a dict - { - "stream_id": {"value": 1}, - - "_CONFIG_CATEGORY_NAME": module_sp.SendingProcess._CONFIG_CATEGORY_NAME, - "URL": {"value": "test_URL"}, - "producerToken": {"value": "test_producerToken"}, - "OMFMaxRetry": {"value": "100"}, - "OMFRetrySleepTime": {"value": "100"}, - "OMFHttpTimeout": {"value": "100"}, - "StaticData": { - "value": - { - "Location": "Palo Alto", - "Company": "Dianomic" - } - }, - - 'sending_process_instance': MagicMock() - }, - - # Bad case 2 - OMFMaxRetry, bad value expected an int it is a string - { - "stream_id": {"value": 1}, - - "_CONFIG_CATEGORY_NAME": module_sp.SendingProcess._CONFIG_CATEGORY_NAME, - "URL": {"value": "test_URL"}, - "producerToken": {"value": "test_producerToken"}, - "OMFMaxRetry": {"value": "xxx"}, - "OMFRetrySleepTime": {"value": "100"}, - "OMFHttpTimeout": {"value": "100"}, - "StaticData": { - "value": json.dumps( - { - "Location": "Palo Alto", - "Company": "Dianomic" - } - ) - }, - - 'sending_process_instance': MagicMock() - } - - ]) - def test_plugin_init_bad(self, data): - """Tests plugin_init using an invalid set of values""" - - pi_server._logger = MagicMock() - - with pytest.raises(Exception): - pi_server.plugin_init(data) - - @pytest.mark.parametrize( - "ret_transform_in_memory_data, " - "p_raw_data, ", - [ - ( - # ret_transform_in_memory_data - # is_data_available - new_position - num_sent - [True, 20, 10], - - # raw_data - { - "id": 10, - "asset_code": "test_asset_code", - "read_key": "ef6e1368-4182-11e8-842f-0ed5f89f718b", - "reading": {"humidity": 100, "temperature": 1001}, - "user_ts": '2018-04-20 09:38:50.163164+00' - } - ), - ( - # ret_transform_in_memory_data - # is_data_available - new_position - num_sent - [False, 20, 10], - - # raw_data - { - "id": 10, - "asset_code": "test_asset_code", - "read_key": "ef6e1368-4182-11e8-842f-0ed5f89f718b", - "reading": {"humidity": 100, "temperature": 1001}, - "user_ts": '2018-04-20 09:38:50.163164+00' - } - ), - - ] - ) - @pytest.mark.asyncio - async def test_plugin_send_success( - self, - event_loop, - fixture_omf, - ret_transform_in_memory_data, - p_raw_data - ): - """ Unit test for - plugin_send - successful case """ - - data = MagicMock() - - if ret_transform_in_memory_data[0]: - # data_available - - with patch.object(fixture_omf.PIServerNorthPlugin, - 'transform_in_memory_data', - return_value=ret_transform_in_memory_data): - - with patch.object(fixture_omf.PIServerNorthPlugin, - 'create_omf_objects', - return_value=mock_async_call() - ) as patched_create_omf_objects: - with patch.object(fixture_omf.PIServerNorthPlugin, - 'send_in_memory_data_to_picromf', - return_value=mock_async_call() - ) as patched_send_in_memory_data_to_picromf: - data_sent, new_position, num_sent = await fixture_omf.plugin_send(data, p_raw_data, _STREAM_ID) - - assert patched_create_omf_objects.called - assert patched_send_in_memory_data_to_picromf.called - - assert data_sent - assert new_position == ret_transform_in_memory_data[1] - assert num_sent == ret_transform_in_memory_data[2] - - else: - # no data_available - - with patch.object(fixture_omf.PIServerNorthPlugin, - 'transform_in_memory_data', - return_value=ret_transform_in_memory_data): - - data_sent, new_position, num_sent = await fixture_omf.plugin_send(data, p_raw_data, _STREAM_ID) - - assert not data_sent - - @pytest.mark.parametrize( - "ret_transform_in_memory_data, " - "p_raw_data, ", - [ - ( - # ret_transform_in_memory_data - # is_data_available - new_position - num_sent - [True, 20, 10], - - # raw_data - { - "id": 10, - "asset_code": "test_asset_code", - "read_key": "ef6e1368-4182-11e8-842f-0ed5f89f718b", - "reading": {"humidity": 100, "temperature": 1001}, - "user_ts": '2018-04-20 09:38:50.163164+00' - } - ) - - ] - ) - @pytest.mark.asyncio - async def test_plugin_send_error( - self, - event_loop, - fixture_omf, - ret_transform_in_memory_data, - p_raw_data - ): - """ Unit test for - plugin_send - error handling case - it tests especially if the omf objects are created again in case of a communication error - NOTE : the test will print a message to the stderr containing 'mocked object generated an exception' - the message is not to be intended an as error as it is part of the successful test. - """ - - data = MagicMock() - - with patch.object(fixture_omf.PIServerNorthPlugin, - 'transform_in_memory_data', - return_value=ret_transform_in_memory_data - ) as patched_transform_in_memory_data: - - with patch.object(fixture_omf.PIServerNorthPlugin, - 'create_omf_objects', - return_value=mock_async_call() - ) as patched_create_omf_objects: - - with patch.object(fixture_omf.PIServerNorthPlugin, - 'send_in_memory_data_to_picromf', - side_effect=KeyError('mocked object generated an exception') - ) as patched_send_in_memory_data_to_picromf: - - with patch.object(fixture_omf.PIServerNorthPlugin, - 'deleted_omf_types_already_created', - return_value=mock_async_call() - ) as patched_deleted_omf_types_already_created: - - with pytest.raises(Exception): - data_sent, new_position, num_sent = await fixture_omf.plugin_send(data, p_raw_data, - _STREAM_ID) - - if ret_transform_in_memory_data[0]: - # data_available - - assert patched_transform_in_memory_data.calles - assert patched_create_omf_objects.called - assert patched_send_in_memory_data_to_picromf.called - assert patched_deleted_omf_types_already_created.called - - def test_plugin_shutdown(self): - - pi_server._logger = MagicMock() - data = [] - pi_server.plugin_shutdown([data]) - - def test_plugin_reconfigure(self): - - pi_server._logger = MagicMock() - pi_server.plugin_reconfigure() - - -class TestPIServerNorthPlugin: - """Unit tests related to PIServerNorthPlugin, methods used internally to the plugin""" - - @pytest.mark.parametrize( - "p_configuration_key, " - "p_type_id, " - "p_data_from_storage, " - "expected_data, ", - [ - - # Case 1 - ( - # p_configuration_key - "SEND_PR1", - - # p_type_id - "0001", - - # p_data_from_storage - { - "rows": - [ - - { - "configuration_key": "SEND_PR1", - "type_id": "0001", - "asset_code": "asset_code_1" - }, - { - "configuration_key": "SEND_PR1", - "type_id": "0001", - "asset_code": "asset_code_2" - } - - ] - }, - - # expected_data - [ - "asset_code_1", - "asset_code_2" - ] - ) - ] - ) - @pytest.mark.asyncio - async def test_retrieve_omf_types_already_created( - self, - p_configuration_key, - p_type_id, - p_data_from_storage, - expected_data, - fixture_omf_north - ): - """ Unit test for - _retrieve_omf_types_already_created - successful case """ - - _payload_builder = MagicMock(spec=payload_builder) - - @pytest.mark.asyncio - async def mock_query_tbl_with_payload(): - """ mock _query_tbl_with_payload """ - - return p_data_from_storage - - with patch.object(_payload_builder, 'PayloadBuilder', return_value=True): - with patch.object(fixture_omf_north._sending_process_instance._storage_async, - 'query_tbl_with_payload', - return_value=mock_query_tbl_with_payload()): - - retrieved_rows = await fixture_omf_north._retrieve_omf_types_already_created(p_configuration_key, p_type_id) - - assert retrieved_rows == expected_data - - @pytest.mark.parametrize( - "p_asset_code, " - "expected_asset_code, ", - [ - # p_asset_code # expected_asset_code - ("asset_code_1 ", "asset_code_1"), - (" asset_code_2 ", "asset_code_2"), - ("asset_ code_3", "asset_code_3"), - ] - ) - def test_generate_omf_asset_id( - self, - p_asset_code, - expected_asset_code, - fixture_omf_north - ): - """Tests _generate_omf_asset_id """ - - generated_asset_code = fixture_omf_north._generate_omf_asset_id(p_asset_code) - - assert generated_asset_code == expected_asset_code - - @pytest.mark.parametrize( - "p_type_id, " - "p_asset_code, " - "expected_measurement_id, ", - [ - # p_type_id - p_asset_code - expected_asset_code - ("0001", "asset_code_1 ", "0001measurement_asset_code_1"), - ("0002", " asset_code_2 ", "0002measurement_asset_code_2"), - ("0003", "asset_ code_3", "0003measurement_asset_code_3"), - ] - ) - def test_generate_omf_measurement( - self, - p_type_id, - p_asset_code, - expected_measurement_id, - fixture_omf_north - ): - """Tests _generate_omf_measurement """ - - fixture_omf_north._config_omf_types = {"type-id": {"value": p_type_id}} - - generated_measurement_id = fixture_omf_north._generate_omf_measurement(p_asset_code) - - assert generated_measurement_id == expected_measurement_id - - @pytest.mark.parametrize( - "p_asset_code, " - "expected_typename, ", - [ - # p_asset_code - expected_asset_code - ("asset_code_1 ", "asset_code_1_typename"), - (" asset_code_2 ", "asset_code_2_typename"), - ("asset_ code_3", "asset_code_3_typename"), - ] - ) - def test_generate_omf_typename_automatic( - self, - p_asset_code, - expected_typename - ): - """Tests _generate_omf_typename_automatic """ - - sending_process_instance = MagicMock() - config = [] - config_omf_types = [] - logger = MagicMock() - - omf_north = pi_server.PIServerNorthPlugin(sending_process_instance, config, config_omf_types, logger) - - generated_typename = omf_north._generate_omf_typename_automatic(p_asset_code) - - assert generated_typename == expected_typename - - @pytest.mark.parametrize( - "p_test_data, " - "p_type_id, " - "p_static_data, " - "expected_typename," - "expected_omf_type", - [ - # Case 1 - pressure / Number - ( - # Origin - Sensor data - {"asset_code": "pressure", "asset_data": {"pressure": 921.6}}, - - # type_id - "0001", - - # Static Data - { - "Location": "Palo Alto", - "Company": "Dianomic" - }, - - # Expected - 'pressure_typename', - { - 'pressure_typename': - [ - { - 'classification': 'static', - 'id': '0001_pressure_typename_sensor', - 'properties': { - 'Company': {'type': 'string'}, - 'Name': {'isindex': True, 'type': 'string'}, - 'Location': {'type': 'string'} - }, - 'type': 'object' - }, - { - 'classification': 'dynamic', - 'id': '0001_pressure_typename_measurement', - 'properties': { - 'Time': {'isindex': True, 'format': 'date-time', 'type': 'string'}, - 'pressure': {'type': 'number', 'format': 'float64'} - }, - 'type': 'object' - } - ] - } - ), - # Case 2 - luxometer / Integer - ( - # Origin - Sensor data - {"asset_code": "luxometer", "asset_data": {"lux": 20}}, - - # type_id - "0002", - - # Static Data - { - "Location": "Palo Alto", - "Company": "Dianomic" - }, - - # Expected - 'luxometer_typename', - { - 'luxometer_typename': - [ - { - 'classification': 'static', - 'id': '0002_luxometer_typename_sensor', - 'properties': { - 'Company': {'type': 'string'}, - 'Name': {'isindex': True, 'type': 'string'}, - 'Location': {'type': 'string'} - }, - 'type': 'object' - }, - { - 'classification': 'dynamic', - 'id': '0002_luxometer_typename_measurement', - 'properties': { - 'Time': {'isindex': True, 'format': 'date-time', 'type': 'string'}, - 'lux': {'type': 'number', 'format': 'float64'} - }, - 'type': 'object' - } - ] - } - - ), - - # Case 3 - switch / string - ( - # Origin - Sensor data - {"asset_code": "switch", "asset_data": {"button": "up"}}, - - # type_id - "0002", - - # Static Data - { - "Location": "Palo Alto", - "Company": "Dianomic" - }, - - # Expected - 'switch_typename', - { - 'switch_typename': - [ - { - 'classification': 'static', - 'id': '0002_switch_typename_sensor', - 'properties': { - 'Company': {'type': 'string'}, - 'Name': {'isindex': True, 'type': 'string'}, - 'Location': {'type': 'string'} - }, - 'type': 'object' - }, - { - 'classification': 'dynamic', - 'id': '0002_switch_typename_measurement', - 'properties': { - 'Time': {'isindex': True, 'format': 'date-time', 'type': 'string'}, - 'button': {'type': 'string'} - }, - 'type': 'object' - } - ] - } - - ) - - ] - ) - @pytest.mark.asyncio - async def test_create_omf_type_automatic( - self, - p_test_data, - p_type_id, - p_static_data, - expected_typename, - expected_omf_type, - fixture_omf_north - ): - """ Unit test for - _create_omf_type_automatic - successful case - Tests the generation of the OMF messages starting from Asset name and data - using Automatic OMF Type Mapping - """ - - fixture_omf_north._config_omf_types = {"type-id": {"value": p_type_id}} - - fixture_omf_north._config = {} - fixture_omf_north._config["StaticData"] = p_static_data - fixture_omf_north._config["formatNumber"] = "float64" - fixture_omf_north._config["formatInteger"] = "int64" - - - with patch.object( - fixture_omf_north, - 'send_in_memory_data_to_picromf', - return_value=mock_async_call() - ) as patched_send_in_memory_data_to_picromf: - - typename, omf_type = await fixture_omf_north._create_omf_type_automatic(p_test_data) - - assert typename == expected_typename - assert omf_type == expected_omf_type - - patched_send_in_memory_data_to_picromf.assert_called_with ("Type", expected_omf_type[expected_typename]) - - @pytest.mark.parametrize( - "p_type_id, " - "p_asset_code_omf_type, " - "expected_typename, " - "expected_omf_type, ", - [ - # Case 1 - ( - # p_type_id - "0001", - - # p_asset_code_omf_type - { - "typename": "position", - "static": { - "Name": { - "type": "string", - "isindex": True - }, - "Location": { - "type": "string" - } - }, - "dynamic": { - "Time": { - "type": "string", - "format": "date-time", - "isindex": True - }, - "x": { - "type": "number" - }, - "y": { - "type": "number" - }, - "z": { - "type": "number" - } - } - }, - - # expected_typename - "position", - - # expected_omf_type - { - "position": [ - { - "id": "0001_position_sensor", - "type": "object", - "classification": "static", - "properties": { - "Name": { - "type": "string", - "isindex": True - }, - "Location": { - "type": "string" - } - } - }, - { - "id": "0001_position_measurement", - "type": "object", - "classification": "dynamic", - "properties": { - "Time": { - "type": "string", - "format": "date-time", - "isindex": True - }, - "x": { - "type": "number" - }, - "y": { - "type": "number" - }, - "z": { - "type": "number" - } - - } - } - - ] - } - ) - ] - ) - @pytest.mark.asyncio - async def test_create_omf_type_configuration_based( - self, - p_type_id, - p_asset_code_omf_type, - expected_typename, - expected_omf_type, - fixture_omf_north - ): - """ Unit test for - _create_omf_type_configuration_based - successful case - Tests the generation of the OMF messages using Configuration Based OMF Type Mapping - """ - - fixture_omf_north._config_omf_types = {"type-id": {"value": p_type_id}} - - with patch.object(fixture_omf_north, - 'send_in_memory_data_to_picromf', - return_value=mock_async_call() - ) as patched_send_to_picromf: - generated_typename, \ - generated_omf_type = await fixture_omf_north._create_omf_type_configuration_based(p_asset_code_omf_type) - - assert generated_typename == expected_typename - assert generated_omf_type == expected_omf_type - - patched_send_to_picromf.assert_any_call("Type", expected_omf_type[expected_typename]) - - - @pytest.mark.parametrize( - "p_asset," - "p_type_id, " - "p_static_data, " - "p_typename," - "p_omf_type, " - "expected_container, " - "expected_static_data, " - "expected_link_data ", - [ - # Case 1 - pressure / Number - ( - # p_asset - {"asset_code": "pressure", "asset_data": {"pressure": 921.6}}, - - # type_id - "0001", - - # Static Data - { - "Location": "Palo Alto", - "Company": "Dianomic" - }, - - # p_typename - 'pressure_typename', - - # p_omf_type - { - 'pressure_typename': - [ - { - 'classification': 'static', - 'id': '0001_pressure_typename_sensor', - 'properties': { - 'Company': {'type': 'string'}, - 'Name': {'isindex': True, 'type': 'string'}, - 'Location': {'type': 'string'} - }, - 'type': 'object' - }, - { - 'classification': 'dynamic', - 'id': '0001_pressure_typename_measurement', - 'properties': { - 'Time': {'isindex': True, 'format': 'date-time', 'type': 'string'}, - 'pressure': {'type': 'number'} - }, - 'type': 'object' - } - ] - }, - - # expected_container - [ - { - 'typeid': '0001_pressure_typename_measurement', - 'id': '0001measurement_pressure' - } - ], - - # expected_static_data - [ - { - 'typeid': '0001_pressure_typename_sensor', - 'values': [ - { - 'Company': 'Dianomic', - 'Location': 'Palo Alto', - 'Name': 'pressure' - } - ] - } - ], - - # expected_link_data - [ - { - 'typeid': '__Link', 'values': [ - { - 'source': {'typeid': '0001_pressure_typename_sensor', 'index': '_ROOT'}, - 'target': {'typeid': '0001_pressure_typename_sensor', 'index': 'pressure'} - }, - { - 'source': {'typeid': '0001_pressure_typename_sensor', 'index': 'pressure'}, - 'target': {'containerid': '0001measurement_pressure'} - } - ] - } - ] - ) - - ] - ) - @pytest.mark.asyncio - async def test_create_omf_object_links( - self, - p_asset, - p_type_id, - p_static_data, - p_typename, - p_omf_type, - expected_container, - expected_static_data, - expected_link_data, - fixture_omf_north - ): - - fixture_omf_north._config_omf_types = {"type-id": {"value": p_type_id}} - fixture_omf_north._config = {"StaticData": p_static_data} - - with patch.object(fixture_omf_north, - 'send_in_memory_data_to_picromf', - side_effect=[asyncio.ensure_future(mock_async_call()) for x in range(3)] - ) as patched_send_to_picromf: - - await fixture_omf_north._create_omf_object_links(p_asset["asset_code"], p_typename, p_omf_type) - - assert patched_send_to_picromf.call_count == 3 - - patched_send_to_picromf.assert_any_call("Container", expected_container) - patched_send_to_picromf.assert_any_call("Data", expected_static_data) - patched_send_to_picromf.assert_any_call("Data", expected_link_data) - - @pytest.mark.parametrize( - "p_creation_type, " - "p_data_origin, " - "p_asset_codes_already_created, " - "p_omf_objects_configuration_based ", - [ - # Case 1 - automatic - ( - # p_creation_type - "automatic", - - # Origin - [ - { - "id": 10, - "asset_code": "test_asset_code", - "read_key": "ef6e1368-4182-11e8-842f-0ed5f89f718b", - "reading": {"humidity": 10, "temperature": 20}, - "user_ts": '2018-04-20 09:38:50.163164+00' - } - ], - - # asset_codes_already_created - [ - "test_none" - ], - - # omf_objects_configuration_based - {"none": "none"} - ), - # Case 2 - configuration - ( - # p_creation_type - "configuration", - - # Origin - [ - { - "id": 10, - "asset_code": "test_asset_code", - "read_key": "ef6e1368-4182-11e8-842f-0ed5f89f718b", - "reading": {"humidity": 10, "temperature": 20}, - "user_ts": '2018-04-20 09:38:50.163164+00' - } - ], - - # asset_codes_already_created - [ - "test_none" - ], - - # omf_objects_configuration_based - {"test_asset_code": {"value": "test_asset_code"}} - ) - - ] - ) - @pytest.mark.asyncio - async def test_create_omf_objects( - self, - p_creation_type, - p_data_origin, - p_asset_codes_already_created, - p_omf_objects_configuration_based, - fixture_omf_north - ): - """ Unit test for - create_omf_objects - successful case - Tests the evaluation of the 2 ways of creating OMF objects: automatic or configuration based - """ - - config_category_name = "SEND_PR" - type_id = "0001" - - fixture_omf_north._config_omf_types = {"type-id": {"value": type_id}} - fixture_omf_north._config_omf_types = p_omf_objects_configuration_based - - if p_creation_type == "automatic": - - with patch.object(fixture_omf_north, - '_retrieve_omf_types_already_created', - return_value=mock_async_call(p_asset_codes_already_created)): - - with patch.object( - fixture_omf_north, - '_create_omf_objects_automatic', - return_value=mock_async_call() - ) as patched_create_omf_objects_automatic: - - with patch.object(fixture_omf_north, - '_flag_created_omf_type', - return_value=mock_async_call() - ) as patched_flag_created_omf_type: - - await fixture_omf_north.create_omf_objects(p_data_origin, config_category_name, type_id) - - assert patched_create_omf_objects_automatic.called - assert patched_flag_created_omf_type.called - - elif p_creation_type == "configuration": - - with patch.object(fixture_omf_north, - '_retrieve_omf_types_already_created', - return_value=mock_async_call(p_asset_codes_already_created)): - - with patch.object( - fixture_omf_north, - '_create_omf_objects_configuration_based', - return_value=mock_async_call() - ) as patched_create_omf_objects_configuration_based: - - with patch.object(fixture_omf_north, - '_flag_created_omf_type', - return_value=mock_async_call() - ) as patched_flag_created_omf_type: - await fixture_omf_north.create_omf_objects(p_data_origin, config_category_name, type_id) - - assert patched_create_omf_objects_configuration_based.called - assert patched_flag_created_omf_type.called - else: - raise Exception("ERROR : creation type not defined !") - - @pytest.mark.parametrize( - "p_key, " - "p_value, " - "expected, ", - [ - # Good cases - ('producerToken', "xxx", "good"), - - # Bad cases - ('NO-producerToken', "", "exception"), - ('producerToken', "", "exception") - ] - ) - def test_validate_configuration( - self, - p_key, - p_value, - expected): - """ Tests the validation of the configurations retrieved from the Configuration Manager - handled by _validate_configuration """ - - pi_server._logger = MagicMock() - - data = {p_key: {'value': p_value}} - - if expected == "good": - assert not pi_server._logger.error.called - - elif expected == "exception": - with pytest.raises(ValueError): - pi_server._validate_configuration(data) - - assert pi_server._logger.error.called - - @pytest.mark.parametrize( - "p_key, " - "p_value, " - "expected, ", - [ - # Good cases - ('type-id', "xxx", "good"), - - # Bad cases - ('NO-type-id', "", "exception"), - ('type-id', "", "exception") - ] - ) - def test_validate_configuration_omf_type( - self, - p_key, - p_value, - expected): - """ Tests the validation of the configurations retrieved from the Configuration Manager - related to the OMF types """ - - pi_server._logger = MagicMock() - - data = {p_key: {'value': p_value}} - - if expected == "good": - assert not pi_server._logger.error.called - - elif expected == "exception": - with pytest.raises(ValueError): - pi_server._validate_configuration_omf_type(data) - - assert pi_server._logger.error.called - - @pytest.mark.parametrize( - "p_test_data ", - [ - # Case 1 - pressure / Number - ( - { - 'dummy': 'dummy' - } - ), - - ] - ) - @pytest.mark.asyncio - async def test_send_in_memory_data_to_picromf_success( - self, - p_test_data, - fixture_omf_north): - """ Unit test for - send_in_memory_data_to_picromf - successful case - Tests a successful communication - """ - - fixture_omf_north._config = dict(producerToken="dummy_producerToken") - fixture_omf_north._config["URL"] = "dummy_URL" - fixture_omf_north._config["OMFRetrySleepTime"] = 1 - fixture_omf_north._config["OMFHttpTimeout"] = 1 - fixture_omf_north._config["OMFMaxRetry"] = 1 - fixture_omf_north._config["compression"] = "false" - - - with patch.object(aiohttp.ClientSession, - 'post', - return_value=MockAiohttpClientSessionSuccess() - ) as patched_aiohttp: - - await fixture_omf_north.send_in_memory_data_to_picromf("Type", p_test_data) - - assert patched_aiohttp.called - assert patched_aiohttp.call_count == 1 - - @pytest.mark.parametrize( - "p_is_error, " - "p_code, " - "p_text, " - "p_test_data ", - [ - ( - False, - 400, 'Invalid value type for the property', - {'dummy': 'dummy'} - ), - ( - False, - 400, 'Redefinition of the type with the same ID is not allowed', - {'dummy': 'dummy'} - ), - ( - True, - 400, 'None', - {'dummy': 'dummy'} - ), - ( - True, - 404, 'None', - {'dummy': 'dummy'} - ), - ( - True, - 404, 'Invalid value type for the property', - {'dummy': 'dummy'} - ), - - ] - ) - @pytest.mark.asyncio - async def test_send_in_memory_data_to_picromf_success_not_blocking_error( - self, - p_is_error, - p_code, - p_text, - p_test_data, - fixture_omf_north): - """ Unit test for - send_in_memory_data_to_picromf - test cases of blocking error (exception raised) and not blocking error - """ - - fixture_omf_north._config = dict(producerToken="dummy_producerToken") - fixture_omf_north._config["URL"] = "dummy_URL" - fixture_omf_north._config["OMFRetrySleepTime"] = 1 - fixture_omf_north._config["OMFHttpTimeout"] = 1 - fixture_omf_north._config["OMFMaxRetry"] = 1 - fixture_omf_north._config["compression"] = "false" - fixture_omf_north._config["notBlockingErrors"] = ast.literal_eval(pi_server._CONFIG_DEFAULT_OMF["notBlockingErrors"]["default"]) - - with patch.object(aiohttp.ClientSession, - 'post', - return_value=MockAiohttpClientSession(p_code, p_text) - ) as patched_aiohttp: - if p_is_error: - # blocking error (exception raised) - with pytest.raises(Exception): - await fixture_omf_north.send_in_memory_data_to_picromf("Data", p_test_data) - else: - # not blocking error, operation terminated successfully - await fixture_omf_north.send_in_memory_data_to_picromf("Data", p_test_data) - - assert patched_aiohttp.called - # as OMFMaxRetry is set to 1 - assert patched_aiohttp.call_count == 1 - - @pytest.mark.parametrize( - "p_type, " - "p_test_data ", - [ - # Case 1 - pressure / Number - ( - "Type", - { - 'pressure_typename': - [ - { - 'classification': 'static', - 'id': '0001_pressure_typename_sensor', - 'properties': { - 'Company': {'type': 'string'}, - 'Name': {'isindex': True, 'type': 'string'}, - 'Location': {'type': 'string'} - }, - 'type': 'object' - }, - { - 'classification': 'dynamic', - 'id': '0001_pressure_typename_measurement', - 'properties': { - 'Time': {'isindex': True, 'format': 'date-time', 'type': 'string'}, - 'pressure': {'type': 'number'} - }, - 'type': 'object' - } - ] - } - - ), - - ] - ) - @pytest.mark.asyncio - async def test_send_in_memory_data_to_picromf_data( - self, - p_type, - p_test_data, - fixture_omf_north): - """ Unit test for - send_in_memory_data_to_picromf - successful case - Tests the data sent to the PI Server in relation of an OMF type - """ - - # Values for the test - test_url = "test_URL" - test_producer_token = "test_producerToken" - test_omf_http_timeout = 1 - test_headers = { - 'producertoken': test_producer_token, - 'messagetype': p_type, - 'action': 'create', - 'messageformat': 'JSON', - 'omfversion': '1.0'} - - fixture_omf_north._config = dict(producerToken=test_producer_token) - fixture_omf_north._config["URL"] = test_url - fixture_omf_north._config["OMFRetrySleepTime"] = 1 - fixture_omf_north._config["OMFHttpTimeout"] = test_omf_http_timeout - fixture_omf_north._config["OMFMaxRetry"] = 1 - fixture_omf_north._config["compression"] = "false" - - # To avoid the wait time - with patch.object(time, 'sleep', return_value=True): - - with patch.object(aiohttp.ClientSession, - 'post', - return_value=MockAiohttpClientSessionSuccess() - ) as patched_aiohttp: - - await fixture_omf_north.send_in_memory_data_to_picromf(p_type, p_test_data) - - str_data = json.dumps(p_test_data) - assert patched_aiohttp.call_count == 1 - patched_aiohttp.assert_called_with( - url=test_url, - headers=test_headers, - data=str_data, - timeout=test_omf_http_timeout) - - @pytest.mark.parametrize( - "p_test_data ", - [ - # Case 1 - pressure / Number - ( - { - 'dummy': 'dummy' - } - - ), - - ] - ) - @pytest.mark.asyncio - async def test_send_in_memory_data_to_picromf_error( - self, - p_test_data, - fixture_omf_north): - """ Unit test for - send_in_memory_data_to_picromf - successful case - Tests the behaviour in case of communication error: - exception erased, - message logged - and number of retries - """ - - max_retry = 3 - - fixture_omf_north._config = dict(producerToken="dummy_producerToken") - fixture_omf_north._config["URL"] = "dummy_URL" - fixture_omf_north._config["OMFRetrySleepTime"] = 1 - fixture_omf_north._config["OMFHttpTimeout"] = 1 - fixture_omf_north._config["OMFMaxRetry"] = max_retry - fixture_omf_north._config["compression"] = "false" - fixture_omf_north._config["notBlockingErrors"] = [{'id': 400, 'message': 'none'}] - - # To avoid the wait time - with patch.object(time, 'sleep', return_value=True): - - with patch.object(aiohttp.ClientSession, - 'post', - return_value=MockAiohttpClientSessionError() - ) as patched_aiohttp: - - # Tests the raising of the exception - with pytest.raises(Exception): - await fixture_omf_north.send_in_memory_data_to_picromf("Type", p_test_data) - - assert patched_aiohttp.call_count == max_retry - - @pytest.mark.parametrize( - "p_data_origin, " - "type_id, " - "expected_data_to_send, " - "expected_is_data_available, " - "expected_new_position, " - "expected_num_sent", [ - # Case 1 - ( - # Origin - [ - { - "id": 10, - "asset_code": "test_asset_code", - "read_key": "ef6e1368-4182-11e8-842f-0ed5f89f718b", - "reading": {"humidity": 11, "temperature": 38}, - "user_ts": '2018-04-20 09:38:50.163164+00' - } - ], - "0001", - # Transformed - [ - { - "containerid": "0001measurement_test_asset_code", - "values": [ - { - "Time": "2018-04-20T09:38:50.163Z", - "humidity": 11, - "temperature": 38 - } - ] - } - ], - True, 10, 1 - ), - # Case 2 - ( - # Origin - [ - { - "id": 11, - "asset_code": "test_asset_code", - "read_key": "ef6e1368-4182-11e8-842f-0ed5f89f718b", - "reading": {"tick": "tock"}, - "user_ts": '2018-04-20 09:38:50.163164+00' - } - ], - "0001", - # Transformed - [ - { - "containerid": "0001measurement_test_asset_code", - "values": [ - { - "Time": "2018-04-20T09:38:50.163Z", - "tick": "tock" - } - ] - } - ], - True, 11, 1 - ), - - # Case 3 - 2 rows - ( - # Origin - [ - { - "id": 12, - "asset_code": "test_asset_code", - "read_key": "ef6e1368-4182-11e8-842f-0ed5f89f718b", - "reading": {"pressure": 957.2}, - "user_ts": '2018-04-20 09:38:50.163164+00' - }, - { - "id": 20, - "asset_code": "test_asset_code", - "read_key": "ef6e1368-4182-11e8-842f-0ed5f89f718b", - "reading": {"y": 34, "z": 114, "x": -174}, - "user_ts": '2018-04-20 09:38:50.163164+00' - } - ], - "0001", - # Transformed - [ - { - "containerid": "0001measurement_test_asset_code", - "values": [ - { - "Time": "2018-04-20T09:38:50.163Z", - "pressure": 957.2 - } - ] - }, - { - "containerid": "0001measurement_test_asset_code", - "values": [ - { - "Time": "2018-04-20T09:38:50.163Z", - "y": 34, - "z": 114, - "x": -174, - } - ] - }, - ], - True, 20, 2 - ) - ]) - def test_plugin_transform_in_memory_data(self, - p_data_origin, - type_id, - expected_data_to_send, - expected_is_data_available, - expected_new_position, - expected_num_sent, - fixture_omf_north): - """Tests the plugin in memory transformations """ - - generated_data_to_send = [None for x in range( len(expected_data_to_send) )] - - fixture_omf_north._config_omf_types = {"type-id": {"value": type_id}} - - is_data_available, new_position, num_sent = fixture_omf_north.transform_in_memory_data(generated_data_to_send, - p_data_origin) - - assert generated_data_to_send == expected_data_to_send - - assert is_data_available == expected_is_data_available - assert new_position == expected_new_position - assert num_sent == expected_num_sent - From 71eeec9b57af81c0fc2a4b6ee24d7a2bf148d551 Mon Sep 17 00:00:00 2001 From: Stefano Simonelli Date: Mon, 7 Oct 2019 10:16:20 +0000 Subject: [PATCH 3/5] FOGL-3168: unit tests fix Signed-off-by: Stefano Simonelli --- tests/unit/C/common/test_config_category.cpp | 6 +++--- tests/unit/python/foglamp/common/test_plugin_discovery.py | 8 ++++---- .../python/foglamp/tasks/north/test_sending_process.py | 8 ++------ 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/tests/unit/C/common/test_config_category.cpp b/tests/unit/C/common/test_config_category.cpp index 11f006656e..b1933bcd5e 100644 --- a/tests/unit/C/common/test_config_category.cpp +++ b/tests/unit/C/common/test_config_category.cpp @@ -298,8 +298,8 @@ const char* bigCategory = "\"plugin\": { " \ "\"type\": \"string\", " \ "\"description\": \"PI Server North C Plugin\", " \ - "\"default\": \"PI_Server_V2\", " \ - "\"value\": \"PI_Server_V2\", \"readonly\": \"true\"}, " \ + "\"default\": \"pi_server\", " \ + "\"value\": \"pi_server\", \"readonly\": \"true\"}, " \ "\"source\": { " \ "\"type\": \"enumeration\", " \ "\"options\": [\"readings\", \"statistics\"], " \ @@ -612,6 +612,6 @@ TEST(CategoryTest, categoryValues) ASSERT_EQ(true, complex.isBool("compression")); ASSERT_EQ(true, complex.isEnumeration("source")); ASSERT_EQ(true, complex.isString("plugin")); - ASSERT_EQ(true, complex.getValue("plugin").compare("PI_Server_V2") == 0); + ASSERT_EQ(true, complex.getValue("plugin").compare("pi_server") == 0); ASSERT_EQ(true, complex.getValue("OMFMaxRetry").compare("3") == 0); } diff --git a/tests/unit/python/foglamp/common/test_plugin_discovery.py b/tests/unit/python/foglamp/common/test_plugin_discovery.py index 389fbebd84..80f4fa7e4b 100644 --- a/tests/unit/python/foglamp/common/test_plugin_discovery.py +++ b/tests/unit/python/foglamp/common/test_plugin_discovery.py @@ -436,15 +436,15 @@ def test_bad_fetch_c_south_plugin_installed(self, info, exc_count): @pytest.mark.parametrize("info, exc_count", [ ({}, 0), - ({"interface": "1.0.0", "version": "1.0.0", "type": "north", "name": "PI_Server", "config": "(null)"}, 1), - ({"interface": "1.0.0", "version": "1.0.0", "type": "north", "name": "PI_Server", "config": {}}, 1) + ({"interface": "1.0.0", "version": "1.0.0", "type": "north", "name": "pi_server", "config": "(null)"}, 1), + ({"interface": "1.0.0", "version": "1.0.0", "type": "north", "name": "pi_server", "config": {}}, 1) ]) def test_bad_fetch_c_north_plugin_installed(self, info, exc_count): with patch.object(_logger, "exception") as patch_log_exc: - with patch.object(utils, "find_c_plugin_libs", return_value=[("PI_Server", "binary")]) as patch_plugin_lib: + with patch.object(utils, "find_c_plugin_libs", return_value=[("pi_server", "binary")]) as patch_plugin_lib: with patch.object(utils, "get_plugin_info", return_value=info) as patch_plugin_info: PluginDiscovery.fetch_c_plugins_installed("north", False) - patch_plugin_info.assert_called_once_with('PI_Server', dir='north') + patch_plugin_info.assert_called_once_with('pi_server', dir='north') patch_plugin_lib.assert_called_once_with('north') assert exc_count == patch_log_exc.call_count diff --git a/tests/unit/python/foglamp/tasks/north/test_sending_process.py b/tests/unit/python/foglamp/tasks/north/test_sending_process.py index 65ce3c76c5..d6e4698e2a 100644 --- a/tests/unit/python/foglamp/tasks/north/test_sending_process.py +++ b/tests/unit/python/foglamp/tasks/north/test_sending_process.py @@ -359,9 +359,7 @@ async def test_is_stream_id_valid(self, assert SendingProcess._logger.error.called @pytest.mark.parametrize("plugin_file, plugin_type, plugin_name, expected_result", [ - ("pi_server", "north", "PI Server North", True), - ("pi_server", "north", "Empty North Plugin", False), - ("pi_server", "south", "PI Server North", False) + ("empty", "north", "Empty North Plugin", False) ]) async def test_is_north_valid(self, plugin_file, plugin_type, plugin_name, expected_result, event_loop): """Tests the possible cases of the function is_north_valid """ @@ -2222,9 +2220,7 @@ async def mock_task(): mock_audit_information.assert_called_with(SendingProcess._AUDIT_CODE, {"sentRows": 100}) @pytest.mark.parametrize("plugin_file, plugin_type, plugin_name", [ - ("empty", "north", "Empty North Plugin"), - ("pi_server", "north", "PI Server North"), - ("ocs", "north", "OCS North") + ("empty", "north", "Empty North Plugin") ]) async def test_standard_plugins(self, plugin_file, plugin_type, plugin_name, event_loop): """Tests if the standard plugins are available and loadable and if they have the required methods """ From 17fb52004d38544d3fc2fef2aae47b7423e917ad Mon Sep 17 00:00:00 2001 From: Stefano Simonelli Date: Mon, 7 Oct 2019 12:19:03 +0000 Subject: [PATCH 4/5] FOGL-3168: ocs plugin version Signed-off-by: Stefano Simonelli --- C/plugins/north/ocs/plugin.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/C/plugins/north/ocs/plugin.cpp b/C/plugins/north/ocs/plugin.cpp index 2fc5897a70..2c6941275c 100644 --- a/C/plugins/north/ocs/plugin.cpp +++ b/C/plugins/north/ocs/plugin.cpp @@ -136,7 +136,7 @@ extern "C" { */ static PLUGIN_INFORMATION info = { PLUGIN_NAME, // Name - "1.0.0", // Version + "1.7.0", // Version SP_PERSIST_DATA, // Flags PLUGIN_TYPE_NORTH, // Type "1.0.0", // Interface version From 6b219e9c59111779b6d813860f9f6c04c72fc1de Mon Sep 17 00:00:00 2001 From: Stefano Simonelli Date: Thu, 10 Oct 2019 10:06:34 +0000 Subject: [PATCH 5/5] FOGL-3168: fixed and error on the ocs plugin Signed-off-by: Stefano Simonelli --- C/plugins/north/ocs/plugin.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/C/plugins/north/ocs/plugin.cpp b/C/plugins/north/ocs/plugin.cpp index 2c6941275c..7ebb971cda 100644 --- a/C/plugins/north/ocs/plugin.cpp +++ b/C/plugins/north/ocs/plugin.cpp @@ -275,6 +275,9 @@ void plugin_start(const PLUGIN_HANDLE handle, Logger* logger = Logger::getLogger(); CONNECTOR_INFO* connInfo = (CONNECTOR_INFO *)handle; + if (storedData.compare("{}") == 0) + return; + // Parse JSON plugin_data Document JSONData; JSONData.Parse(storedData.c_str());