diff --git a/eoapi_notifier/outputs/cloudevents.py b/eoapi_notifier/outputs/cloudevents.py index 368f282..c129dc7 100644 --- a/eoapi_notifier/outputs/cloudevents.py +++ b/eoapi_notifier/outputs/cloudevents.py @@ -5,6 +5,7 @@ Supports standard CloudEvents environment variables and KNative SinkBinding. """ +import json import os from typing import Any from uuid import uuid4 @@ -29,6 +30,7 @@ class CloudEventsConfig(BasePluginConfig): max_retries: int = 3 retry_backoff: float = 1.0 max_header_length: int = 4096 + overrides: dict[str, str] = {} @field_validator("endpoint") @classmethod @@ -43,6 +45,10 @@ def apply_knative_overrides(self) -> "CloudEventsConfig": if k_sink := os.getenv("K_SINK"): self.endpoint = k_sink + if k_ce_overrides := os.getenv("K_CE_OVERRIDES"): + overrides_data = json.loads(k_ce_overrides) + self.overrides = overrides_data.get("extensions", {}) + return self @classmethod @@ -88,6 +94,26 @@ def __init__(self, config: CloudEventsConfig) -> None: super().__init__(config) self.config: CloudEventsConfig = config self._client: httpx.AsyncClient | None = None + # Parse K_CE_OVERRIDES once during initialization + self._ce_extensions = self._parse_k_ce_overrides() + + def _parse_k_ce_overrides(self) -> dict[str, str]: + """Parse K_CE_OVERRIDES environment variable once during initialization.""" + k_ce_overrides = os.getenv("K_CE_OVERRIDES") + if not k_ce_overrides: + return {} + + try: + overrides_data = json.loads(k_ce_overrides) + extensions = overrides_data.get("extensions", {}) + if isinstance(extensions, dict): + return {str(k): str(v) for k, v in extensions.items()} + return {} + except json.JSONDecodeError: + self.logger.warning( + "Invalid K_CE_OVERRIDES JSON, ignoring: %s", k_ce_overrides + ) + return {} async def start(self) -> None: """Start the HTTP client.""" @@ -209,6 +235,9 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent: source = self.config.source event_type_base = self.config.event_type + # Use pre-parsed KNative CE overrides + ce_extensions = self._ce_extensions + # Map operation to event type suffix operation_map = {"INSERT": "created", "UPDATE": "updated", "DELETE": "deleted"} operation = operation_map.get(event.operation.upper(), event.operation.lower()) @@ -233,6 +262,10 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent: if truncated_collection: attributes["collection"] = truncated_collection + # Apply KNative CE extension overrides + for key, value in ce_extensions.items(): + attributes[key] = str(value) + # Event data payload data = { "id": event.id, diff --git a/helm-chart/eoapi-notifier/values.yaml b/helm-chart/eoapi-notifier/values.yaml index 3a135b6..459b0b9 100644 --- a/helm-chart/eoapi-notifier/values.yaml +++ b/helm-chart/eoapi-notifier/values.yaml @@ -82,6 +82,7 @@ secrets: # KNative Support: # The cloudevents plugin supports K_SINK variables for KNative SinkBinding: # - K_SINK: Overrides CLOUDEVENTS_ENDPOINT (automatically set by SinkBinding) +# - K_CE_OVERRIDE: A JSON object that specifies overrides to the outbound event. env: {} # Examples - Standard environment variables: # PGSTAC_HOST: postgresql-service diff --git a/tests/test_cloudevents_output.py b/tests/test_cloudevents_output.py index 64fdfce..b7f467d 100644 --- a/tests/test_cloudevents_output.py +++ b/tests/test_cloudevents_output.py @@ -54,6 +54,29 @@ def test_connection_info(self) -> None: config = CloudEventsConfig(endpoint="https://example.com/webhook") assert "POST https://example.com/webhook" in config.get_connection_info() + @patch.dict( + os.environ, + {"K_CE_OVERRIDES": '{"extensions": {"extra": "test", "num": 42}}'}, + ) + def test_k_ce_overrides_valid(self) -> None: + """Test K_CE_OVERRIDES with valid extensions.""" + config = CloudEventsConfig() + assert config.overrides == {"extra": "test", "num": 42} + + @patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'}) + def test_k_ce_overrides_no_extensions(self) -> None: + """Test K_CE_OVERRIDES without extensions field.""" + config = CloudEventsConfig() + assert config.overrides == {} + + @patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"}) + def test_k_ce_overrides_invalid_json(self) -> None: + """Test K_CE_OVERRIDES with invalid JSON.""" + from pydantic import ValidationError + + with pytest.raises(ValidationError): + CloudEventsConfig() + class TestCloudEventsAdapter: """Test CloudEvents output adapter.""" @@ -286,3 +309,60 @@ async def test_health_check(self, adapter: CloudEventsAdapter) -> None: # Running with client adapter._client = MagicMock() assert await adapter.health_check() is True + + @patch.dict( + os.environ, + { + "K_CE_OVERRIDES": ( + '{"extensions": {"extra": "test-value", "priority": "high"}}' + ) + }, + ) + def test_convert_to_cloudevent_with_overrides( + self, config: CloudEventsConfig, sample_event: NotificationEvent + ) -> None: + """Test CloudEvent conversion with K_CE_OVERRIDES.""" + # Create adapter after environment variable is set + adapter = CloudEventsAdapter(config) + cloud_event = adapter._convert_to_cloudevent(sample_event) + + assert isinstance(cloud_event, CloudEvent) + assert cloud_event["extra"] == "test-value" + assert cloud_event["priority"] == "high" + + @patch.dict(os.environ, {"K_CE_OVERRIDES": '{"extensions": {"number": 123}}'}) + def test_convert_to_cloudevent_with_number_override( + self, config: CloudEventsConfig, sample_event: NotificationEvent + ) -> None: + """Test CloudEvent conversion with number in K_CE_OVERRIDES.""" + # Create adapter after environment variable is set + adapter = CloudEventsAdapter(config) + cloud_event = adapter._convert_to_cloudevent(sample_event) + + assert cloud_event["number"] == "123" # Should be converted to string + + @patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"}) + def test_convert_to_cloudevent_invalid_overrides( + self, config: CloudEventsConfig, sample_event: NotificationEvent + ) -> None: + """Test CloudEvent conversion with invalid K_CE_OVERRIDES JSON.""" + # Create adapter after environment variable is set + adapter = CloudEventsAdapter(config) + cloud_event = adapter._convert_to_cloudevent(sample_event) + + # Should work normally without overrides + assert isinstance(cloud_event, CloudEvent) + assert cloud_event["source"] == "/eoapi/stac" + + @patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'}) + def test_convert_to_cloudevent_no_extensions( + self, config: CloudEventsConfig, sample_event: NotificationEvent + ) -> None: + """Test CloudEvent conversion with K_CE_OVERRIDES but no extensions field.""" + # Create adapter after environment variable is set + adapter = CloudEventsAdapter(config) + cloud_event = adapter._convert_to_cloudevent(sample_event) + + # Should work normally without extensions + assert isinstance(cloud_event, CloudEvent) + assert cloud_event["source"] == "/eoapi/stac"