Skip to content

Commit

Permalink
feat: add trusted issuers for request token verification (#488)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexhook authored Oct 22, 2024
1 parent dc7da94 commit eb5d648
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 10 deletions.
28 changes: 20 additions & 8 deletions pybotx/bot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
)
Expand Down Expand Up @@ -302,12 +303,13 @@ def async_execute_raw_bot_command(
verify_request: bool = True,
request_headers: Optional[Mapping[str, str]] = None,
logging_command: bool = True,
trusted_issuers: Optional[Set[str]] = None,
) -> None:
if logging_command:
log_incoming_request(raw_bot_command, message="Got command: ")

if verify_request:
self._verify_request(request_headers)
self._verify_request(request_headers, trusted_issuers=trusted_issuers)

try:
bot_api_command: BotAPICommand = parse_obj_as(
Expand Down Expand Up @@ -336,6 +338,7 @@ async def sync_execute_raw_smartapp_event(
verify_request: bool = True,
request_headers: Optional[Mapping[str, str]] = None,
logging_command: bool = True,
trusted_issuers: Optional[Set[str]] = None,
) -> BotAPISyncSmartAppEventResponse:
if logging_command:
log_incoming_request(
Expand All @@ -344,7 +347,7 @@ async def sync_execute_raw_smartapp_event(
)

if verify_request:
self._verify_request(request_headers)
self._verify_request(request_headers, trusted_issuers=trusted_issuers)

try:
bot_api_smartapp_event: BotAPISyncSmartAppEvent = parse_obj_as(
Expand Down Expand Up @@ -374,16 +377,15 @@ async def raw_get_status(
query_params: Dict[str, str],
verify_request: bool = True,
request_headers: Optional[Mapping[str, str]] = None,
trusted_issuers: Optional[Set[str]] = None,
) -> Dict[str, Any]:
logger.opt(lazy=True).debug(
"Got status: {status}",
status=lambda: pformat_jsonable_obj(query_params),
)

if verify_request:
if request_headers is None:
raise RequestHeadersNotProvidedError
self._verify_request(request_headers)
self._verify_request(request_headers, trusted_issuers=trusted_issuers)

try:
bot_api_status_recipient = BotAPIStatusRecipient.parse_obj(query_params)
Expand All @@ -406,13 +408,12 @@ async def set_raw_botx_method_result(
raw_botx_method_result: Dict[str, Any],
verify_request: bool = True,
request_headers: Optional[Mapping[str, str]] = None,
trusted_issuers: Optional[Set[str]] = None,
) -> None:
logger.debug("Got callback: {callback}", callback=raw_botx_method_result)

if verify_request:
if request_headers is None:
raise RequestHeadersNotProvidedError
self._verify_request(request_headers)
self._verify_request(request_headers, trusted_issuers=trusted_issuers)

callback: BotXMethodCallback = parse_obj_as(
# Same ignore as in pydantic
Expand Down Expand Up @@ -2068,6 +2069,8 @@ async def collect_metric(
def _verify_request( # noqa: WPS231, WPS238
self,
headers: Optional[Mapping[str, str]],
*,
trusted_issuers: Optional[Set[str]] = None,
) -> None:
if headers is None:
raise RequestHeadersNotProvidedError
Expand Down Expand Up @@ -2108,11 +2111,20 @@ def _verify_request( # noqa: WPS231, WPS238
leeway=1,
options={
"verify_aud": False,
"verify_iss": False,
},
)
except jwt.InvalidTokenError as exc:
raise UnverifiedRequestError(exc.args[0]) from exc

issuer = token_payload.get("iss")
if issuer is None:
raise UnverifiedRequestError('Token is missing the "iss" claim')

if issuer != bot_account.host:
if not trusted_issuers or issuer not in trusted_issuers:
raise UnverifiedRequestError("Invalid issuer")

@staticmethod
def _build_main_collector(
collectors: Sequence[HandlerCollector],
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pybotx"
version = "0.72.0"
version = "0.73.0"
description = "A python library for interacting with eXpress BotX API"
authors = [
"Sidnev Nikolay <[email protected]>",
Expand Down
123 changes: 122 additions & 1 deletion tests/test_verify_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,85 @@ async def test__verify_request__invalid_issuer(
assert "Invalid issuer" in str(exc.value)


async def test__verify_request__trusted_issuers_have_token_issuer(
bot_account: BotAccountWithSecret,
authorization_token_payload: Dict[str, Any],
) -> None:
# - Arrange -
collector = HandlerCollector()
built_bot = Bot(collectors=[collector], bot_accounts=[bot_account])
token_issuer = "another.example.com"
authorization_token_payload["iss"] = token_issuer
token = jwt.encode(
payload=authorization_token_payload,
key=bot_account.secret_key,
)

# - Act -
async with lifespan_wrapper(built_bot) as bot:
bot._verify_request(
{"authorization": f"Bearer {token}"},
trusted_issuers={token_issuer},
)


async def test__verify_request__trusted_issuers_have_not_token_issuer(
bot_account: BotAccountWithSecret,
authorization_token_payload: Dict[str, Any],
) -> None:
# - Arrange -
collector = HandlerCollector()
built_bot = Bot(collectors=[collector], bot_accounts=[bot_account])
authorization_token_payload["iss"] = "another.example.com"
token = jwt.encode(
payload=authorization_token_payload,
key=bot_account.secret_key,
)

# - Act -
async with lifespan_wrapper(built_bot) as bot:
with pytest.raises(UnverifiedRequestError) as exc:
bot._verify_request(
{"authorization": f"Bearer {token}"},
trusted_issuers={"another-another.example.com"},
)

# - Assert -
assert "Invalid issuer" in str(exc.value)


async def test__verify_request__token_issuer_is_missed(
bot_account: BotAccountWithSecret,
authorization_token_payload: Dict[str, Any],
) -> None:
# - Arrange -
collector = HandlerCollector()
built_bot = Bot(collectors=[collector], bot_accounts=[bot_account])
del authorization_token_payload["iss"]
token = jwt.encode(
payload=authorization_token_payload,
key=bot_account.secret_key,
)

# - Act -
async with lifespan_wrapper(built_bot) as bot:
with pytest.raises(UnverifiedRequestError) as exc:
bot._verify_request(
{"authorization": f"Bearer {token}"},
)

# - Assert -
assert 'Token is missing the "iss" claim' in str(exc.value)


@pytest.mark.parametrize(
"target_func_name",
("async_execute_raw_bot_command", "raw_get_status", "set_raw_botx_method_result"),
(
"async_execute_raw_bot_command",
"sync_execute_raw_smartapp_event",
"raw_get_status",
"set_raw_botx_method_result",
),
)
async def test__verify_request__without_headers(
api_incoming_message_factory: Callable[..., Dict[str, Any]],
Expand Down Expand Up @@ -311,6 +387,31 @@ async def test__async_execute_raw_bot_command__verify_request__called(
bot._verify_request.assert_called()


async def test__sync_execute_raw_smartapp_event__verify_request__called(
api_sync_smartapp_event_factory: Callable[..., Dict[str, Any]],
collector_with_sync_smartapp_event_handler: HandlerCollector,
bot_account: BotAccountWithSecret,
) -> None:
# - Arrange -
built_bot = Bot(
collectors=[collector_with_sync_smartapp_event_handler],
bot_accounts=[bot_account],
)
payload = api_sync_smartapp_event_factory(bot_id=bot_account.id)

# - Act -
async with lifespan_wrapper(built_bot) as bot:
bot._verify_request = Mock() # type: ignore
await bot.sync_execute_raw_smartapp_event(
payload,
verify_request=True,
request_headers={},
)

# - Assert -
bot._verify_request.assert_called()


async def test__raw_get_status__verify_request__called(
api_incoming_message_factory: Callable[..., Dict[str, Any]],
bot_account: BotAccountWithSecret,
Expand Down Expand Up @@ -384,6 +485,26 @@ async def test__async_execute_raw_bot_command__verify_request__not_called(
bot.async_execute_bot_command.assert_called()


async def test__sync_execute_raw_smartapp_event__verify_request__not_called(
api_incoming_message_factory: Callable[..., Dict[str, Any]],
bot_account: BotAccountWithSecret,
) -> None:
# - Arrange -
collector = HandlerCollector()
built_bot = Bot(collectors=[collector], bot_accounts=[bot_account])
payload = api_incoming_message_factory()

# - Act -
async with lifespan_wrapper(built_bot) as bot:
bot._verify_request = Mock() # type: ignore
bot.sync_execute_raw_smartapp_event = Mock() # type: ignore
bot.sync_execute_raw_smartapp_event(payload, verify_request=False)

# - Assert -
bot._verify_request.assert_not_called()
bot.sync_execute_raw_smartapp_event.assert_called()


async def test__raw_get_status__verify_request__not_called(
api_incoming_message_factory: Callable[..., Dict[str, Any]],
bot_account: BotAccountWithSecret,
Expand Down

0 comments on commit eb5d648

Please sign in to comment.