diff --git a/filip/clients/ngsi_v2/cb.py b/filip/clients/ngsi_v2/cb.py index 57b4998b..cfe4c45a 100644 --- a/filip/clients/ngsi_v2/cb.py +++ b/filip/clients/ngsi_v2/cb.py @@ -1898,7 +1898,7 @@ def notify(self, message: Message) -> None: except requests.RequestException as err: msg = ( f"Sending notifcation message failed! \n " - f"{message.model_dump_json(inent=2)}" + f"{message.model_dump_json(indent=2)}" ) self.log_error(err=err, msg=msg) raise diff --git a/filip/models/ngsi_v2/subscriptions.py b/filip/models/ngsi_v2/subscriptions.py index f0be6504..78101eb6 100644 --- a/filip/models/ngsi_v2/subscriptions.py +++ b/filip/models/ngsi_v2/subscriptions.py @@ -11,9 +11,66 @@ Field, \ Json from .base import AttrsFormat, EntityPattern, Http, Status, Expression -from filip.utils.validators import validate_mqtt_url, validate_mqtt_topic +from filip.utils.validators import ( + validate_mqtt_url, + validate_mqtt_topic +) from filip.models.ngsi_v2.context import ContextEntity +from filip.models.ngsi_v2.base import ( + EntityPattern, + Expression, + BaseValueAttribute +) from filip.custom_types import AnyMqttUrl +import warnings + +# The pydantic models still have a .json() function, but this method is deprecated. +warnings.filterwarnings("ignore", category=UserWarning, + message='Field name "json" shadows an attribute in parent "Http"') +warnings.filterwarnings("ignore", category=UserWarning, + message='Field name "json" shadows an attribute in parent "Mqtt"') + + +class NgsiPayloadAttr(BaseValueAttribute): + """ + Model for NGSI V2 type payload in httpCustom/mqttCustom notifications. + The difference between this model and the usual BaseValueAttribute model is that + a metadata field is not allowed. + In the absence of type/value in some attribute field, one should resort to partial + representations ( as specified in the orion api manual), done by the BaseValueAttr. + model. + """ + model_config = ConfigDict(extra="forbid") + + +class NgsiPayload(BaseModel): + """ + Model for NGSI V2 type payload in httpCustom/mqttCustom notifications. + Differences between this model and the usual Context entity models include: + - id and type are not mandatory + - an attribute metadata field is not allowed + """ + model_config = ConfigDict( + extra="allow", validate_default=True + ) + id: Optional[str] = Field( + default=None, + max_length=256, + min_length=1, + frozen=True + ) + type: Optional[Union[str, Enum]] = Field( + default=None, + max_length=256, + min_length=1, + frozen=True, + ) + + @model_validator(mode='after') + def validate_notification_attrs(self): + for v in self.model_dump(exclude={"id", "type"}).values(): + assert isinstance(NgsiPayloadAttr.model_validate(v), NgsiPayloadAttr) + return self class Message(BaseModel): @@ -69,6 +126,35 @@ class HttpCustom(Http): 'default payload (see "Notification Messages" sections) ' 'is used.' ) + json: Optional[Dict[str, Union[str, Json]]] = Field( + default=None, + description='get a json as notification. If omitted, the default' + 'payload (see "Notification Messages" sections) is used.' + ) + ngsi: Optional[NgsiPayload] = Field( + default=None, + description='get an NGSI-v2 normalized entity as notification.If omitted, ' + 'the default payload (see "Notification Messages" sections) is used.' + ) + timeout: Optional[int] = Field( + default=None, + description="Maximum time (in milliseconds) the subscription waits for the " + "response. The maximum value allowed for this parameter is 1800000 " + "(30 minutes). If timeout is defined to 0 or omitted, then the value " + "passed as -httpTimeout CLI parameter is used. See section in the " + "'Command line options' for more details." + ) + + @model_validator(mode='after') + def validate_notification_payloads(self): + fields = [self.payload, self.json, self.ngsi] + filled_fields = [field for field in fields if field is not None] + + if len(filled_fields) > 1: + raise ValueError("Only one of payload, json or ngsi fields accepted at the " + "same time in httpCustom.") + + return self class Mqtt(BaseModel): @@ -82,7 +168,7 @@ class Mqtt(BaseModel): 'only includes host and port)') topic: str = Field( description='to specify the MQTT topic to use', - ) + ) valid_type = field_validator("topic")(validate_mqtt_topic) qos: Optional[int] = Field( default=0, @@ -124,6 +210,22 @@ class MqttCustom(Mqtt): 'default payload (see "Notification Messages" sections) ' 'is used.' ) + json: Optional[Dict[str, Any]] = Field( + default=None, + description='get a json as notification. If omitted, the default' + 'payload (see "Notification Messages" sections) is used.' + ) + ngsi: Optional[NgsiPayload] = Field( + default=None, + description='get an NGSI-v2 normalized entity as notification.If omitted, ' + 'the default payload (see "Notification Messages" sections) is used.' + ) + + @model_validator(mode='after') + def validate_payload_type(self): + assert len([v for k, v in self.model_dump().items() + if ((v is not None) and (k in ['payload', 'ngsi', 'json']))]) <= 1 + return self class Notification(BaseModel): @@ -205,17 +307,17 @@ class Notification(BaseModel): '[A=0, B=null, C=null]. This ' ) - @field_validator('httpCustom') - def validate_http(cls, http_custom, values): - if http_custom is not None: - assert values['http'] is None - return http_custom + @model_validator(mode='after') + def validate_http(self): + if self.httpCustom is not None: + assert self.http is None + return self - @field_validator('exceptAttrs') - def validate_attr(cls, except_attrs, values): - if except_attrs is not None: - assert values['attrs'] is None - return except_attrs + @model_validator(mode='after') + def validate_attr(self): + if self.exceptAttrs is not None: + assert self.attrs is None + return self @model_validator(mode='after') def validate_endpoints(self): @@ -247,7 +349,7 @@ class Response(Notification): 'Last notification timestamp in ISO8601 format.' ) lastFailure: Optional[datetime] = Field( - default = None, + default=None, description='(not editable, only present in GET operations): ' 'Last failure timestamp in ISO8601 format. Not present if ' 'subscription has never had a problem with notifications.' @@ -342,21 +444,21 @@ class Subscription(BaseModel): ) subject: Subject = Field( description="An object that describes the subject of the subscription.", - example={ + examples=[{ 'entities': [{'idPattern': '.*', 'type': 'Room'}], 'condition': { 'attrs': ['temperature'], 'expression': {'q': 'temperature>40'}, }, - }, + }], ) notification: Notification = Field( description="An object that describes the notification to send when " "the subscription is triggered.", - example={ + examples=[{ 'http': {'url': 'http://localhost:1234'}, 'attrs': ['temperature', 'humidity'], - }, + }], ) expires: Optional[datetime] = Field( default=None, @@ -364,11 +466,10 @@ class Subscription(BaseModel): "Permanent subscriptions must omit this field." ) - throttling: Optional[conint(strict=True, ge=0,)] = Field( + throttling: Optional[conint(strict=True, ge=0, )] = Field( default=None, strict=True, description="Minimal period of time in seconds which " "must elapse between two consecutive notifications. " "It is optional." ) - diff --git a/tests/clients/test_ngsi_v2_cb.py b/tests/clients/test_ngsi_v2_cb.py index a4b7fbf9..3cae6b43 100644 --- a/tests/clients/test_ngsi_v2_cb.py +++ b/tests/clients/test_ngsi_v2_cb.py @@ -910,7 +910,91 @@ def test_notification(self): "throttling": 0 }) + mqtt_custom_topic = "notification/custom" + sub_with_mqtt_custom_notification_payload = Subscription.model_validate({ + "description": "Test mqtt custom notification with payload message", + "subject": { + "entities": [ + { + "id": "Test:001", + "type": "Test" + } + ] + }, + "notification": { + "mqttCustom": { + "url": mqtt_url_internal, + "topic": mqtt_custom_topic, + "payload": "The value of the %22temperature%22 attribute %28of the device ${id}, ${type}%29 is" + " ${temperature}. Humidity is ${humidity} and CO2 is ${co2}." + }, + "attrs": ["temperature", "humidity", "co2"], + "onlyChangedAttrs": False + }, + "expires": datetime.now() + timedelta(days=1), + "throttling": 0 + }) + + sub_with_mqtt_custom_notification_json = Subscription.model_validate({ + "description": "Test mqtt custom notification with json message", + "subject": { + "entities": [ + { + "id": "Test:001", + "type": "Test" + } + ] + }, + "notification": { + "mqttCustom": { + "url": mqtt_url_internal, + "topic": mqtt_custom_topic, + "json": { + "t": "${temperature}", + "h": "${humidity}", + "c": "${co2}" + } + }, + "attrs": ["temperature", "humidity", "co2"], + "onlyChangedAttrs": False + }, + "expires": datetime.now() + timedelta(days=1), + "throttling": 0 + }) + + sub_with_mqtt_custom_notification_ngsi = Subscription.model_validate({ + "description": "Test mqtt custom notification with ngsi message", + "subject": { + "entities": [ + { + "id": "Test:001", + "type": "Test" + } + ] + }, + "notification": { + "mqttCustom": { + "url": mqtt_url_internal, + "topic": mqtt_custom_topic, + "ngsi": { + "id": "prefix:${id}", + "type": "newType", + "temperature": { + "value": 123, + "type": "Number" + }, + + } + }, + "attrs": ["temperature", "humidity", "co2"], + "onlyChangedAttrs": False + }, + "expires": datetime.now() + timedelta(days=1), + "throttling": 0 + }) + # MQTT settings + custom_sub_message = None sub_message = None sub_messages = {} @@ -923,6 +1007,7 @@ def on_connect(client, userdata, flags, reasonCode, properties=None): logger.info("Successfully, connected with result code " + str( reasonCode)) client.subscribe(mqtt_topic) + client.subscribe(mqtt_custom_topic) def on_subscribe(client, userdata, mid, granted_qos, properties=None): logger.info("Successfully subscribed to with QoS: %s", granted_qos) @@ -931,8 +1016,12 @@ def on_message(client, userdata, msg): logger.info("Received MQTT message: " + msg.topic + " " + str( msg.payload)) nonlocal sub_message - sub_message = Message.model_validate_json(msg.payload) - sub_messages[sub_message.subscriptionId] = sub_message + nonlocal custom_sub_message + if msg.topic == mqtt_topic: + sub_message = Message.model_validate_json(msg.payload) + sub_messages[sub_message.subscriptionId] = sub_message + elif msg.topic == mqtt_custom_topic: + custom_sub_message = msg.payload def on_disconnect(client, userdata, flags, reasonCode, properties=None): logger.info("MQTT client disconnected with reasonCode " @@ -1027,6 +1116,56 @@ def on_disconnect(client, userdata, flags, reasonCode, properties=None): sub_messages[sub_id_3].data[0].get_attribute( "temperature").value, 10) + # test4 notification with mqtt custom notification (payload) + sub_id_4 = client.post_subscription( + subscription=sub_with_mqtt_custom_notification_payload) + time.sleep(1) + client.update_attribute_value(entity_id=entity.id, + attr_name="temperature", + value=44 + ) + time.sleep(1) + sub_4 = client.get_subscription(sub_id_4) + self.assertEqual(first=custom_sub_message, + second=b'The value of the "temperature" attribute (of the device Test:001, Test) is 44. ' + b'Humidity is 20 and CO2 is 30.') + self.assertEqual(sub_4.notification.timesSent, 1) + client.delete_subscription(sub_id_4) + + # test5 notification with mqtt custom notification (json) + sub_id_5 = client.post_subscription( + subscription=sub_with_mqtt_custom_notification_json) + time.sleep(1) + client.update_attribute_value(entity_id=entity.id, + attr_name="humidity", + value=67 + ) + time.sleep(1) + sub_5 = client.get_subscription(sub_id_5) + self.assertEqual(first=custom_sub_message, + second=b'{"t":44,"h":67,"c":30}') + self.assertEqual(sub_5.notification.timesSent, 1) + client.delete_subscription(sub_id_5) + + # test6 notification with mqtt custom notification (ngsi) + sub_id_6 = client.post_subscription( + subscription=sub_with_mqtt_custom_notification_ngsi) + time.sleep(1) + client.update_attribute_value(entity_id=entity.id, + attr_name="co2", + value=78 + ) + time.sleep(1) + sub_6 = client.get_subscription(sub_id_6) + sub_message = Message.model_validate_json(custom_sub_message) + self.assertEqual(sub_6.notification.timesSent, 1) + self.assertEqual(len(sub_message.data[0].get_attributes()), 3) + self.assertEqual(sub_message.data[0].id, "prefix:Test:001") + self.assertEqual(sub_message.data[0].type, "newType") + self.assertEqual(sub_message.data[0].get_attribute("co2").value, 78) + self.assertEqual(sub_message.data[0].get_attribute("temperature").value, 123) + client.delete_subscription(sub_id_6) + @clean_test(fiware_service=settings.FIWARE_SERVICE, fiware_servicepath=settings.FIWARE_SERVICEPATH, cb_url=settings.CB_URL) diff --git a/tests/models/test_ngsi_v2_subscriptions.py b/tests/models/test_ngsi_v2_subscriptions.py index 6e9d453a..69c58708 100644 --- a/tests/models/test_ngsi_v2_subscriptions.py +++ b/tests/models/test_ngsi_v2_subscriptions.py @@ -12,7 +12,9 @@ Mqtt, \ MqttCustom, \ Notification, \ - Subscription + Subscription, \ + NgsiPayload, \ + NgsiPayloadAttr from filip.models.base import FiwareHeader from filip.utils.cleanup import clear_all, clean_test from tests.config import settings @@ -90,6 +92,10 @@ def test_notification_models(self): with self.assertRaises(ValidationError): Mqtt(url="mqtt://test.de:1883", topic='/,t') + with self.assertRaises(ValidationError): + HttpCustom(url="https://working-url.de:80", json={}, ngsi={}) + with self.assertRaises(ValidationError): + HttpCustom(url="https://working-url.de:80", payload="", json={}) httpCustom = HttpCustom(url=self.http_url) mqtt = Mqtt(url=self.mqtt_url, topic=self.mqtt_topic) @@ -100,10 +106,33 @@ def test_notification_models(self): notification = Notification.model_validate(self.notification) with self.assertRaises(ValidationError): notification.mqtt = httpCustom + notification = Notification.model_validate(self.notification) with self.assertRaises(ValidationError): notification.mqtt = mqtt + notification = Notification.model_validate(self.notification) with self.assertRaises(ValidationError): notification.mqtt = mqttCustom + with self.assertRaises(ValidationError): + HttpCustom(url=self.http_url, json={}, payload="") + with self.assertRaises(ValidationError): + MqttCustom(url=self.mqtt_url, + topic=self.mqtt_topic, ngsi=NgsiPayload(), payload="") + with self.assertRaises(ValidationError): + HttpCustom(url=self.http_url, ngsi=NgsiPayload(), json="") + + #Test validator for ngsi payload type + with self.assertRaises(ValidationError): + attr_dict = { + "metadata": {} + } + NgsiPayloadAttr(**attr_dict) + with self.assertRaises(ValidationError): + attr_dict = { + "id": "entityId", + "type": "entityType", + "k": "v" + } + NgsiPayload(NgsiPayloadAttr(**attr_dict),id="someId",type="someType") # test onlyChangedAttrs-field notification = Notification.model_validate(self.notification) @@ -122,7 +151,8 @@ def test_subscription_models(self) -> None: Returns: None """ - sub = Subscription.model_validate(self.sub_dict) + tmp_dict=self.sub_dict.copy() + sub = Subscription.model_validate(tmp_dict) fiware_header = FiwareHeader(service=settings.FIWARE_SERVICE, service_path=settings.FIWARE_SERVICEPATH) with ContextBrokerClient( @@ -141,6 +171,65 @@ def compare_dicts(dict1: dict, dict2: dict): compare_dicts(sub.model_dump(exclude={'id'}), sub_res.model_dump(exclude={'id'})) + tmp_dict.update({"notification":{ + "httpCustom": { + "url": "http://localhost:1234", + "ngsi":{ + "patchattr":{ + "value":"${temperature/2}", + "type":"Calculated" + } + }, + "method":"POST" + }, + "attrs": [ + "temperature", + "humidity" + ] + }}) + sub = Subscription.model_validate(tmp_dict) + sub_id = client.post_subscription(subscription=sub) + sub_res = client.get_subscription(subscription_id=sub_id) + compare_dicts(sub.model_dump(exclude={'id'}), + sub_res.model_dump(exclude={'id'})) + + tmp_dict.update({"notification":{ + "httpCustom": { + "url": "http://localhost:1234", + "json":{ + "t":"${temperate}", + "h":"${humidity}" + }, + "method":"POST" + }, + "attrs": [ + "temperature", + "humidity" + ] + }}) + sub = Subscription.model_validate(tmp_dict) + sub_id = client.post_subscription(subscription=sub) + sub_res = client.get_subscription(subscription_id=sub_id) + compare_dicts(sub.model_dump(exclude={'id'}), + sub_res.model_dump(exclude={'id'})) + + tmp_dict.update({"notification":{ + "httpCustom": { + "url": "http://localhost:1234", + "payload":"Temperature is ${temperature} and humidity ${humidity}", + "method":"POST" + }, + "attrs": [ + "temperature", + "humidity" + ] + }}) + sub = Subscription.model_validate(tmp_dict) + sub_id = client.post_subscription(subscription=sub) + sub_res = client.get_subscription(subscription_id=sub_id) + compare_dicts(sub.model_dump(exclude={'id'}), + sub_res.model_dump(exclude={'id'})) + # test validation of throttling with self.assertRaises(ValidationError): sub.throttling = -1