From 0415e41128155da68306a4fa2dba80752cd5f2f7 Mon Sep 17 00:00:00 2001 From: Felix Delattre Date: Tue, 16 Sep 2025 09:48:21 +0200 Subject: [PATCH 1/2] Added K_CE_OVERRIDE to cloudevents plugin. --- eoapi_notifier/outputs/cloudevents.py | 21 ++++++++ helm-chart/eoapi-notifier/values.yaml | 1 + tests/test_cloudevents_output.py | 72 +++++++++++++++++++++++++++ 3 files changed, 94 insertions(+) diff --git a/eoapi_notifier/outputs/cloudevents.py b/eoapi_notifier/outputs/cloudevents.py index 368f282..3f108b5 100644 --- a/eoapi_notifier/outputs/cloudevents.py +++ b/eoapi_notifier/outputs/cloudevents.py @@ -5,6 +5,7 @@ Supports standard CloudEvents environment variables and KNative SinkBinding. """ +import json import os from typing import Any from uuid import uuid4 @@ -29,6 +30,7 @@ class CloudEventsConfig(BasePluginConfig): max_retries: int = 3 retry_backoff: float = 1.0 max_header_length: int = 4096 + overrides: dict[str, str] = {} @field_validator("endpoint") @classmethod @@ -43,6 +45,10 @@ def apply_knative_overrides(self) -> "CloudEventsConfig": if k_sink := os.getenv("K_SINK"): self.endpoint = k_sink + if k_ce_overrides := os.getenv("K_CE_OVERRIDES"): + overrides_data = json.loads(k_ce_overrides) + self.overrides = overrides_data.get("extensions", {}) + return self @classmethod @@ -209,6 +215,17 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent: source = self.config.source event_type_base = self.config.event_type + # Apply KNative CE overrides if present + ce_extensions = {} + if k_ce_overrides := os.getenv("K_CE_OVERRIDES"): + try: + overrides_data = json.loads(k_ce_overrides) + ce_extensions = overrides_data.get("extensions", {}) + except json.JSONDecodeError: + self.logger.warning( + "Invalid K_CE_OVERRIDES JSON, ignoring: %s", k_ce_overrides + ) + # Map operation to event type suffix operation_map = {"INSERT": "created", "UPDATE": "updated", "DELETE": "deleted"} operation = operation_map.get(event.operation.upper(), event.operation.lower()) @@ -233,6 +250,10 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent: if truncated_collection: attributes["collection"] = truncated_collection + # Apply KNative CE extension overrides + for key, value in ce_extensions.items(): + attributes[key] = str(value) + # Event data payload data = { "id": event.id, diff --git a/helm-chart/eoapi-notifier/values.yaml b/helm-chart/eoapi-notifier/values.yaml index 3a135b6..459b0b9 100644 --- a/helm-chart/eoapi-notifier/values.yaml +++ b/helm-chart/eoapi-notifier/values.yaml @@ -82,6 +82,7 @@ secrets: # KNative Support: # The cloudevents plugin supports K_SINK variables for KNative SinkBinding: # - K_SINK: Overrides CLOUDEVENTS_ENDPOINT (automatically set by SinkBinding) +# - K_CE_OVERRIDE: A JSON object that specifies overrides to the outbound event. env: {} # Examples - Standard environment variables: # PGSTAC_HOST: postgresql-service diff --git a/tests/test_cloudevents_output.py b/tests/test_cloudevents_output.py index 64fdfce..d39b049 100644 --- a/tests/test_cloudevents_output.py +++ b/tests/test_cloudevents_output.py @@ -54,6 +54,29 @@ def test_connection_info(self) -> None: config = CloudEventsConfig(endpoint="https://example.com/webhook") assert "POST https://example.com/webhook" in config.get_connection_info() + @patch.dict( + os.environ, + {"K_CE_OVERRIDES": '{"extensions": {"extra": "test", "num": 42}}'}, + ) + def test_k_ce_overrides_valid(self) -> None: + """Test K_CE_OVERRIDES with valid extensions.""" + config = CloudEventsConfig() + assert config.overrides == {"extra": "test", "num": 42} + + @patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'}) + def test_k_ce_overrides_no_extensions(self) -> None: + """Test K_CE_OVERRIDES without extensions field.""" + config = CloudEventsConfig() + assert config.overrides == {} + + @patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"}) + def test_k_ce_overrides_invalid_json(self) -> None: + """Test K_CE_OVERRIDES with invalid JSON.""" + from pydantic import ValidationError + + with pytest.raises(ValidationError): + CloudEventsConfig() + class TestCloudEventsAdapter: """Test CloudEvents output adapter.""" @@ -286,3 +309,52 @@ async def test_health_check(self, adapter: CloudEventsAdapter) -> None: # Running with client adapter._client = MagicMock() assert await adapter.health_check() is True + + @patch.dict( + os.environ, + { + "K_CE_OVERRIDES": ( + '{"extensions": {"extra": "test-value", "priority": "high"}}' + ) + }, + ) + def test_convert_to_cloudevent_with_overrides( + self, adapter: CloudEventsAdapter, sample_event: NotificationEvent + ) -> None: + """Test CloudEvent conversion with K_CE_OVERRIDES.""" + cloud_event = adapter._convert_to_cloudevent(sample_event) + + assert isinstance(cloud_event, CloudEvent) + assert cloud_event["extra"] == "test-value" + assert cloud_event["priority"] == "high" + + @patch.dict(os.environ, {"K_CE_OVERRIDES": '{"extensions": {"number": 123}}'}) + def test_convert_to_cloudevent_with_number_override( + self, adapter: CloudEventsAdapter, sample_event: NotificationEvent + ) -> None: + """Test CloudEvent conversion with number in K_CE_OVERRIDES.""" + cloud_event = adapter._convert_to_cloudevent(sample_event) + + assert cloud_event["number"] == "123" # Should be converted to string + + @patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"}) + def test_convert_to_cloudevent_invalid_overrides( + self, adapter: CloudEventsAdapter, sample_event: NotificationEvent + ) -> None: + """Test CloudEvent conversion with invalid K_CE_OVERRIDES JSON.""" + cloud_event = adapter._convert_to_cloudevent(sample_event) + + # Should work normally without overrides + assert isinstance(cloud_event, CloudEvent) + assert cloud_event["source"] == "/eoapi/stac" + + @patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'}) + def test_convert_to_cloudevent_no_extensions( + self, adapter: CloudEventsAdapter, sample_event: NotificationEvent + ) -> None: + """Test CloudEvent conversion with K_CE_OVERRIDES but no extensions field.""" + cloud_event = adapter._convert_to_cloudevent(sample_event) + + # Should work normally without extensions + assert isinstance(cloud_event, CloudEvent) + assert cloud_event["source"] == "/eoapi/stac" From f5ab59457c8deddb043725be0b562f5855fb48dd Mon Sep 17 00:00:00 2001 From: Felix Delattre Date: Thu, 18 Sep 2025 12:15:44 +0200 Subject: [PATCH 2/2] Don't look up K_CE_OVERRIDES in every call. --- eoapi_notifier/outputs/cloudevents.py | 32 ++++++++++++++++++--------- tests/test_cloudevents_output.py | 16 ++++++++++---- 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/eoapi_notifier/outputs/cloudevents.py b/eoapi_notifier/outputs/cloudevents.py index 3f108b5..c129dc7 100644 --- a/eoapi_notifier/outputs/cloudevents.py +++ b/eoapi_notifier/outputs/cloudevents.py @@ -94,6 +94,26 @@ def __init__(self, config: CloudEventsConfig) -> None: super().__init__(config) self.config: CloudEventsConfig = config self._client: httpx.AsyncClient | None = None + # Parse K_CE_OVERRIDES once during initialization + self._ce_extensions = self._parse_k_ce_overrides() + + def _parse_k_ce_overrides(self) -> dict[str, str]: + """Parse K_CE_OVERRIDES environment variable once during initialization.""" + k_ce_overrides = os.getenv("K_CE_OVERRIDES") + if not k_ce_overrides: + return {} + + try: + overrides_data = json.loads(k_ce_overrides) + extensions = overrides_data.get("extensions", {}) + if isinstance(extensions, dict): + return {str(k): str(v) for k, v in extensions.items()} + return {} + except json.JSONDecodeError: + self.logger.warning( + "Invalid K_CE_OVERRIDES JSON, ignoring: %s", k_ce_overrides + ) + return {} async def start(self) -> None: """Start the HTTP client.""" @@ -215,16 +235,8 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent: source = self.config.source event_type_base = self.config.event_type - # Apply KNative CE overrides if present - ce_extensions = {} - if k_ce_overrides := os.getenv("K_CE_OVERRIDES"): - try: - overrides_data = json.loads(k_ce_overrides) - ce_extensions = overrides_data.get("extensions", {}) - except json.JSONDecodeError: - self.logger.warning( - "Invalid K_CE_OVERRIDES JSON, ignoring: %s", k_ce_overrides - ) + # Use pre-parsed KNative CE overrides + ce_extensions = self._ce_extensions # Map operation to event type suffix operation_map = {"INSERT": "created", "UPDATE": "updated", "DELETE": "deleted"} diff --git a/tests/test_cloudevents_output.py b/tests/test_cloudevents_output.py index d39b049..b7f467d 100644 --- a/tests/test_cloudevents_output.py +++ b/tests/test_cloudevents_output.py @@ -319,9 +319,11 @@ async def test_health_check(self, adapter: CloudEventsAdapter) -> None: }, ) def test_convert_to_cloudevent_with_overrides( - self, adapter: CloudEventsAdapter, sample_event: NotificationEvent + self, config: CloudEventsConfig, sample_event: NotificationEvent ) -> None: """Test CloudEvent conversion with K_CE_OVERRIDES.""" + # Create adapter after environment variable is set + adapter = CloudEventsAdapter(config) cloud_event = adapter._convert_to_cloudevent(sample_event) assert isinstance(cloud_event, CloudEvent) @@ -330,18 +332,22 @@ def test_convert_to_cloudevent_with_overrides( @patch.dict(os.environ, {"K_CE_OVERRIDES": '{"extensions": {"number": 123}}'}) def test_convert_to_cloudevent_with_number_override( - self, adapter: CloudEventsAdapter, sample_event: NotificationEvent + self, config: CloudEventsConfig, sample_event: NotificationEvent ) -> None: """Test CloudEvent conversion with number in K_CE_OVERRIDES.""" + # Create adapter after environment variable is set + adapter = CloudEventsAdapter(config) cloud_event = adapter._convert_to_cloudevent(sample_event) assert cloud_event["number"] == "123" # Should be converted to string @patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"}) def test_convert_to_cloudevent_invalid_overrides( - self, adapter: CloudEventsAdapter, sample_event: NotificationEvent + self, config: CloudEventsConfig, sample_event: NotificationEvent ) -> None: """Test CloudEvent conversion with invalid K_CE_OVERRIDES JSON.""" + # Create adapter after environment variable is set + adapter = CloudEventsAdapter(config) cloud_event = adapter._convert_to_cloudevent(sample_event) # Should work normally without overrides @@ -350,9 +356,11 @@ def test_convert_to_cloudevent_invalid_overrides( @patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'}) def test_convert_to_cloudevent_no_extensions( - self, adapter: CloudEventsAdapter, sample_event: NotificationEvent + self, config: CloudEventsConfig, sample_event: NotificationEvent ) -> None: """Test CloudEvent conversion with K_CE_OVERRIDES but no extensions field.""" + # Create adapter after environment variable is set + adapter = CloudEventsAdapter(config) cloud_event = adapter._convert_to_cloudevent(sample_event) # Should work normally without extensions