Skip to content

Commit

Permalink
security: add validating signatures and sending signatures to webhook… (
Browse files Browse the repository at this point in the history
#26)

* security: add validating signatures and sending signatures to webhook destinations

* fix: reverted config json changes

* fix: tests

* fix: tests

* fix: all tests added signatures

* fix: lint

* fixed:  kafka creds

* fixed: long files

* chore: remove lints
  • Loading branch information
danielsinai authored Apr 17, 2024
1 parent 732e207 commit ac28fa8
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 17 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,5 @@ dmypy.json
.pyre/

.idea

.vscode/
62 changes: 55 additions & 7 deletions app/invokers/webhook_invoker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import logging
import time
from typing import Any, Callable

import pyjq as jq
Expand All @@ -10,7 +12,12 @@
from port_client import report_run_response, report_run_status, run_logger_factory
from pydantic import BaseModel, Field
from requests import Response
from utils import get_invocation_method_object, get_response_body, response_to_dict
from utils import (
get_invocation_method_object,
get_response_body,
response_to_dict,
sign_sha_256,
)

logging.basicConfig(level=settings.LOG_LEVEL)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -137,6 +144,12 @@ def _request(
request_payload.body,
)
run_logger("Sending the request")
request_payload.headers["X-Port-Timestamp"] = str(int(time.time()))
request_payload.headers["X-Port-Signature"] = sign_sha_256(
json.dumps(request_payload.body, separators=(",", ":")),
settings.PORT_CLIENT_SECRET,
request_payload.headers["X-Port-Timestamp"],
)

res = requests.request(
request_payload.method,
Expand Down Expand Up @@ -267,23 +280,58 @@ def _invoke_run(
res.raise_for_status()
run_logger("Port agent finished processing the action run")

def invoke(self, body: dict, invocation_method: dict) -> None:
def validate_incoming_signature(self, msg: dict) -> bool:
if "changelogDestination" in msg:
return True

port_signature = msg.get("headers", {}).get("X-Port-Signature")
port_timestamp = msg.get("headers", {}).get("X-Port-Timestamp")

if not port_signature or not port_timestamp:
logger.warning(
"WebhookInvoker - Could not find the required headers, skipping the"
" event invocation method for the event"
)
return False

# Remove the headers to avoid them being used in the signature verification
del msg["headers"]["X-Port-Signature"]
del msg["headers"]["X-Port-Timestamp"]

expected_sig = sign_sha_256(
json.dumps(msg, separators=(",", ":")),
settings.PORT_CLIENT_SECRET,
port_timestamp,
)
if expected_sig != port_signature:
logger.warning(
"WebhookInvoker - Could not verify signature, skipping the event"
)
return False
return True

def invoke(self, msg: dict, invocation_method: dict) -> None:
logger.info("WebhookInvoker - start - destination: %s", invocation_method)
run_id = body["context"].get("runId")
run_id = msg["context"].get("runId")

if not self.validate_incoming_signature(msg):
return

logger.info("WebhookInvoker - validating signature")

mapping = self._find_mapping(body)
mapping = self._find_mapping(msg)
if mapping is None:
logger.info(
"WebhookInvoker - Could not find suitable mapping for the event"
f" - body: {body} {', run_id: ' + run_id if run_id else ''}",
f" - msg: {msg} {', run_id: ' + run_id if run_id else ''}",
)
return

if run_id:
self._invoke_run(run_id, mapping, body, invocation_method)
self._invoke_run(run_id, mapping, msg, invocation_method)
# Used for changelog destination event trigger
elif invocation_method.get("url"):
request_payload = self._prepare_payload(mapping, body, invocation_method)
request_payload = self._prepare_payload(mapping, msg, invocation_method)
res = self._request(request_payload, lambda _: None)
res.raise_for_status()
else:
Expand Down
11 changes: 11 additions & 0 deletions app/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import base64
import hashlib
import hmac
import logging

from requests import Response
Expand Down Expand Up @@ -32,3 +35,11 @@ def get_response_body(response: Response) -> dict | str | None:
return response.json()
except ValueError:
return response.text


def sign_sha_256(input: str, secret: str, timestamp: str) -> str:
to_sign = f"{timestamp}.{input}"
new_hmac = hmac.new(bytes(secret, "utf-8"), digestmod=hashlib.sha256)
new_hmac.update(bytes(to_sign, "utf-8"))
signed = base64.b64encode(new_hmac.digest()).decode("utf-8")
return f"v1,{signed}"
26 changes: 26 additions & 0 deletions tests/unit/processors/kafka/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from core.config import Mapping
from pydantic import parse_obj_as

from app.utils import sign_sha_256


@pytest.fixture
def mock_requests(monkeypatch: MonkeyPatch, request: Any) -> None:
Expand Down Expand Up @@ -204,6 +206,12 @@ def webhook_run_payload() -> dict:
},
"properties": {},
},
"headers": {
"X-Port-Signature": "v1,uuBMfcio3oscejO5bOtL97K1AmiZjxDvou7sChjMNeE=",
# the real signature of this payload using the secret
# key test and the hardcoded timestamp mock
"X-Port-Timestamp": 1713277889,
},
}


Expand All @@ -214,6 +222,16 @@ def get_run_message(invocation_method: dict) -> bytes:
webhook_run_payload["payload"]["action"][
"invocationMethod"
] = invocation_method
# When mutating the payload, we need to ensure that the
# headers are also updated
timestamp = webhook_run_payload["headers"]["X-Port-Timestamp"]
webhook_run_payload["headers"] = {}
webhook_run_payload["headers"]["X-Port-Signature"] = sign_sha_256(
json.dumps(webhook_run_payload, separators=(",", ":")),
"test",
str(timestamp),
)
webhook_run_payload["headers"]["X-Port-Timestamp"] = timestamp
return json.dumps(webhook_run_payload).encode()

return get_run_message
Expand Down Expand Up @@ -248,3 +266,11 @@ def mock_control_the_payload_config(monkeypatch: MonkeyPatch) -> list[dict[str,
)

return control_the_payload_config


@pytest.fixture
def mock_timestamp(monkeypatch: MonkeyPatch, request: Any) -> None:
def mock_timestamp() -> int:
return 1713277889

monkeypatch.setattr("time.time", mock_timestamp)
54 changes: 48 additions & 6 deletions tests/unit/processors/kafka/test_kafka_to_webhook_processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import json
import time
from copy import deepcopy
from threading import Timer
from unittest import mock
from unittest.mock import ANY, call
Expand All @@ -9,6 +12,7 @@
from pytest_mock import MockFixture
from streamers.kafka.kafka_streamer import KafkaStreamer

from app.utils import sign_sha_256
from tests.unit.processors.kafka.conftest import Consumer, terminate_consumer


Expand All @@ -21,7 +25,10 @@
],
indirect=True,
)
def test_single_stream_success(mock_requests: None, mock_kafka: dict) -> None:
@pytest.mark.parametrize("mock_timestamp", [{}], indirect=True)
def test_single_stream_success(
mock_requests: None, mock_kafka: dict, mock_timestamp: None
) -> None:
Timer(0.01, terminate_consumer).start()

with mock.patch.object(consumer_logger, "error") as mock_error:
Expand All @@ -40,7 +47,10 @@ def test_single_stream_success(mock_requests: None, mock_kafka: dict) -> None:
],
indirect=True,
)
def test_single_stream_failed(mock_requests: None, mock_kafka: dict) -> None:
@pytest.mark.parametrize("mock_timestamp", [{}], indirect=True)
def test_single_stream_failed(
mock_requests: None, mock_kafka: dict, mock_timestamp: None
) -> None:
Timer(0.01, terminate_consumer).start()

with mock.patch.object(consumer_logger, "error") as mock_error:
Expand Down Expand Up @@ -69,16 +79,24 @@ def test_single_stream_failed(mock_requests: None, mock_kafka: dict) -> None:
],
indirect=True,
)
@pytest.mark.parametrize("mock_timestamp", [{}], indirect=True)
def test_single_stream_success_control_the_payload(
monkeypatch: MonkeyPatch,
mocker: MockFixture,
mock_requests: None,
mock_kafka: dict,
mock_timestamp: None,
mock_control_the_payload_config: list[Mapping],
) -> None:
expected_body = mock_kafka
expected_body = deepcopy(mock_kafka)
expected_headers = {"MY-HEADER": mock_kafka["resourceType"]}
expected_query: dict[str, ANY] = {}
if "changelogDestination" not in mock_kafka:
del expected_body["headers"]["X-Port-Signature"]
del expected_body["headers"]["X-Port-Timestamp"]

expected_headers["X-Port-Timestamp"] = ANY
expected_headers["X-Port-Signature"] = ANY
Timer(0.01, terminate_consumer).start()
request_mock = mocker.patch("requests.request")
request_mock.return_value.headers = {}
Expand Down Expand Up @@ -123,22 +141,30 @@ def test_single_stream_success_control_the_payload(
],
indirect=True,
)
@pytest.mark.parametrize("mock_timestamp", [{}], indirect=True)
def test_invocation_method_synchronized(
monkeypatch: MonkeyPatch,
mocker: MockFixture,
mock_requests: None,
mock_kafka: dict,
mock_timestamp: None,
mock_control_the_payload_config: list[Mapping],
webhook_run_payload: dict,
) -> None:
expected_body = webhook_run_payload
expected_body = deepcopy(webhook_run_payload)
expected_headers = {"MY-HEADER": mock_kafka["resourceType"]}

expected_query: dict[str, ANY] = {}
Timer(0.01, terminate_consumer).start()
request_mock = mocker.patch("requests.request")
request_patch_mock = mocker.patch("requests.patch")
mocker.patch("pathlib.Path.is_file", side_effect=(True,))

del expected_body["headers"]["X-Port-Signature"]
del expected_body["headers"]["X-Port-Timestamp"]

expected_headers["X-Port-Timestamp"] = ANY
expected_headers["X-Port-Signature"] = ANY
with mock.patch.object(consumer_logger, "error") as mock_error:
streamer = KafkaStreamer(Consumer())
streamer.stream()
Expand All @@ -150,6 +176,7 @@ def test_invocation_method_synchronized(
params=expected_query,
timeout=settings.WEBHOOK_INVOKER_TIMEOUT,
)

request_patch_mock.assert_has_calls(
calls=[
call(
Expand Down Expand Up @@ -199,27 +226,42 @@ def test_invocation_method_synchronized(
],
indirect=True,
)
@pytest.mark.parametrize("mock_timestamp", [{}], indirect=True)
def test_invocation_method_method_override(
monkeypatch: MonkeyPatch,
mocker: MockFixture,
mock_requests: None,
mock_kafka: dict,
mock_timestamp: None,
mock_control_the_payload_config: list[Mapping],
) -> None:
expected_body = mock_kafka
expected_headers = {"MY-HEADER": mock_kafka["resourceType"]}
expected_headers = {
"MY-HEADER": mock_kafka["resourceType"],
}

if "changelogDestination" not in mock_kafka:
del expected_body["headers"]["X-Port-Signature"]
del expected_body["headers"]["X-Port-Timestamp"]

expected_headers["X-Port-Timestamp"] = str(time.time())
expected_headers["X-Port-Signature"] = sign_sha_256(
json.dumps(expected_body, separators=(",", ":")), "test", time.time()
)

expected_query: dict[str, ANY] = {}
Timer(0.01, terminate_consumer).start()
request_mock = mocker.patch("requests.request")
mocker.patch("pathlib.Path.is_file", side_effect=(True,))

with mock.patch.object(consumer_logger, "error") as mock_error:
streamer = KafkaStreamer(Consumer())
streamer.stream()
request_mock.assert_called_once_with(
"GET",
ANY,
json=expected_body,
# we are removing the signature headers from the
# body is it shouldn't concern the invoked webhook
headers=expected_headers,
params=expected_query,
timeout=settings.WEBHOOK_INVOKER_TIMEOUT,
Expand Down
26 changes: 26 additions & 0 deletions tests/unit/streamers/kafka/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@
from _pytest.monkeypatch import MonkeyPatch
from confluent_kafka import Consumer as _Consumer

from app.utils import sign_sha_256


@pytest.fixture
def mock_timestamp(monkeypatch: MonkeyPatch, request: Any) -> None:
def mock_timestamp() -> int:
return 1713277889

monkeypatch.setattr("time.time", mock_timestamp)


@pytest.fixture
def mock_requests(monkeypatch: MonkeyPatch, request: Any) -> None:
Expand Down Expand Up @@ -151,11 +161,27 @@ def mock_webhook_run_message() -> Callable[[dict], bytes]:
},
"properties": {},
},
"headers": {
"X-Port-Signature": "v1,uuBMfcio3oscejO5bOtL97K1AmiZjxDvou7sChjMNeE=",
# the real signature of this payload using the secret
# key test and the hardcoded timestamp mock
"X-Port-Timestamp": 1713277889,
},
}

def get_run_message(invocation_method: dict) -> bytes:
if invocation_method is not None:
run_message["payload"]["action"]["invocationMethod"] = invocation_method
# When mutating the payload, we need to ensure that the
# headers are also updated
timestamp = run_message["headers"]["X-Port-Timestamp"]
run_message["headers"] = {}
run_message["headers"]["X-Port-Signature"] = sign_sha_256(
json.dumps(run_message, separators=(",", ":")),
"test",
str(timestamp),
)
run_message["headers"]["X-Port-Timestamp"] = timestamp
return json.dumps(run_message).encode()

return get_run_message
Loading

0 comments on commit ac28fa8

Please sign in to comment.