From cfcf95c4f6d94559796a6fd31feed4265296e784 Mon Sep 17 00:00:00 2001 From: Keith Hall Date: Thu, 7 Sep 2023 13:29:32 +0300 Subject: [PATCH 1/6] change dependency from requests to httpx --- python/zeroeventhub/CHANGELOG.md | 7 + python/zeroeventhub/README.md | 74 +++-- python/zeroeventhub/poetry.lock | 208 +++++-------- python/zeroeventhub/pyproject.toml | 22 +- python/zeroeventhub/tests/test_client.py | 280 +++++++++--------- .../tests/test_page_event_receiver.py | 38 +-- python/zeroeventhub/zeroeventhub/__init__.py | 6 +- python/zeroeventhub/zeroeventhub/client.py | 106 +++---- python/zeroeventhub/zeroeventhub/event.py | 13 + .../zeroeventhub/event_receiver.py | 27 +- .../zeroeventhub/page_event_receiver.py | 17 +- 11 files changed, 387 insertions(+), 411 deletions(-) create mode 100644 python/zeroeventhub/CHANGELOG.md create mode 100644 python/zeroeventhub/zeroeventhub/event.py diff --git a/python/zeroeventhub/CHANGELOG.md b/python/zeroeventhub/CHANGELOG.md new file mode 100644 index 0000000..ffa1abf --- /dev/null +++ b/python/zeroeventhub/CHANGELOG.md @@ -0,0 +1,7 @@ +# ZeroEventHub Changelog + +## v2.0 + +Switched from `requests` to `httpx` due to a memory leak in `requests` with long-lived sessions and many requests. +- As a result, we are now using an `AsyncClient` which is the `httpx` equivalent of a `requests.Session`. After observing usage patterns, it was noted that the session was always passed in as a parameter, due to consuming feeds which require authentication, so `AsyncClient` has been made mandatory instead of optional to simplify the code a bit. +- It was also observed that it can be useful to directly receive the parsed response as it is streamed from the server instead of being forced to use an `EventReceiver` protocol. This makes it easier to process received events in batches. 
As the above dependency change is a breaking change anyway, and due to the switch to async calls, it was the perfect time to switch the `Client.fetch_events` method to return an `AsyncGenerator`. A helper method called `receive_events` has been introduced to make it easy to keep the `EventReceiver` behavior from before. diff --git a/python/zeroeventhub/README.md b/python/zeroeventhub/README.md index 553d639..f18c565 100644 --- a/python/zeroeventhub/README.md +++ b/python/zeroeventhub/README.md @@ -12,43 +12,55 @@ database. Example of simple single-partition consumption. *Note about the exampl * Things starting with "their" is supplied by the service you connect to ```python +>>> import zeroeventhub +>>> import httpx +>>> import asyncio +>>> from typing import Sequence +>>> from unittest.mock import MagicMock, Mock, PropertyMock + +>>> my_db = MagicMock() +>>> my_person_event_repository = Mock() +>>> my_person_event_repository.read_cursors_from_db.return_value = None + # Step 1: Setup -their_partition_count = 1 # documented contract with server -zeh_session = requests.Session() # you can setup the authentication on the session -client = zeroeventhub.Client(their_service_url, their_partition_count, zeh_session) +>>> their_partition_count = 1 # documented contract with server +>>> their_service_url = "https://localhost:8192/person/feed/v1" +>>> my_zeh_session = httpx.AsyncClient() # you can setup the authentication on the session +>>> client = zeroeventhub.Client(their_service_url, their_partition_count, my_zeh_session) # Step 2: Load the cursors from last time we ran -cursors = my_get_cursors_from_db() -if not cursors: - # we have never run before, so we can get all events with FIRST_CURSOR - # (if we just want to receive new events from now, we would use LAST_CURSOR) - cursors = [ - zeroeventhub.Cursor(partition_id, zeroeventhub.FIRST_CURSOR) - for partition_id in range(their_partition_count) - ] +>>> cursors = my_person_event_repository.read_cursors_from_db() +>>> 
if not cursors: +... # we have never run before, so we can get all events with FIRST_CURSOR +... # (if we just want to receive new events from now, we would use LAST_CURSOR) +... cursors = [ +... zeroeventhub.Cursor(partition_id, zeroeventhub.FIRST_CURSOR) +... for partition_id in range(their_partition_count) +... ] # Step 3: Enter listening loop... -page_of_events = PageEventReceiver() -while myStillWantToReadEvents: - # Step 4: Use ZeroEventHub client to fetch the next page of events. - client.fetch_events( - cursors, - my_page_size_hint, - page_of_events - ) - - # Step 5: Write the effect of changes to our own database and the updated - # cursor value in the same transaction. - with db.begin_transaction() as tx: - my_write_effect_of_events_to_db(tx, page_of_events.events) - - my_write_cursors_to_db(tx, page_of_events.latest_checkpoints) - - tx.commit() - - cursors = page_of_events.latest_checkpoints +>>> my_still_want_to_read_events = PropertyMock(side_effect=[True, False]) + +>>> async def poll_for_events(cursors: Sequence[zeroeventhub.Cursor]) -> None: +... page_of_events = zeroeventhub.PageEventReceiver() +... while my_still_want_to_read_events(): +... # Step 4: Use ZeroEventHub client to fetch the next page of events. +... await zeroeventhub.receive_events(page_of_events, +... client.fetch_events(cursors), +... ) +... +... # Step 5: Write the effect of changes to our own database and the updated +... # cursor value in the same transaction. +... with my_db.begin_transaction() as tx: +... my_person_event_repository.write_effect_of_events_to_db(tx, page_of_events.events) +... my_person_event_repository.write_cursors_to_db(tx, page_of_events.latest_checkpoints) +... tx.commit() +... +... cursors = page_of_events.latest_checkpoints +... 
page_of_events.clear() + +>>> asyncio.run(poll_for_events(cursors)) - page_of_events.clear() ``` ## Development diff --git a/python/zeroeventhub/poetry.lock b/python/zeroeventhub/poetry.lock index 56eb767..a9b3d02 100644 --- a/python/zeroeventhub/poetry.lock +++ b/python/zeroeventhub/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand. +# This file is automatically @generated by Poetry and should not be changed by hand. [[package]] name = "annotated-types" @@ -247,7 +247,7 @@ pycparser = "*" name = "charset-normalizer" version = "3.2.0" description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." -category = "main" +category = "dev" optional = false python-versions = ">=3.7.0" files = [ @@ -373,64 +373,64 @@ files = [ [[package]] name = "coverage" -version = "7.3.0" +version = "7.3.1" description = "Code coverage measurement for Python" category = "dev" optional = false python-versions = ">=3.8" files = [ - {file = "coverage-7.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:db76a1bcb51f02b2007adacbed4c88b6dee75342c37b05d1822815eed19edee5"}, - {file = "coverage-7.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c02cfa6c36144ab334d556989406837336c1d05215a9bdf44c0bc1d1ac1cb637"}, - {file = "coverage-7.3.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:477c9430ad5d1b80b07f3c12f7120eef40bfbf849e9e7859e53b9c93b922d2af"}, - {file = "coverage-7.3.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ce2ee86ca75f9f96072295c5ebb4ef2a43cecf2870b0ca5e7a1cbdd929cf67e1"}, - {file = "coverage-7.3.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:68d8a0426b49c053013e631c0cdc09b952d857efa8f68121746b339912d27a12"}, - {file = "coverage-7.3.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = 
"sha256:b3eb0c93e2ea6445b2173da48cb548364f8f65bf68f3d090404080d338e3a689"}, - {file = "coverage-7.3.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:90b6e2f0f66750c5a1178ffa9370dec6c508a8ca5265c42fbad3ccac210a7977"}, - {file = "coverage-7.3.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:96d7d761aea65b291a98c84e1250cd57b5b51726821a6f2f8df65db89363be51"}, - {file = "coverage-7.3.0-cp310-cp310-win32.whl", hash = "sha256:63c5b8ecbc3b3d5eb3a9d873dec60afc0cd5ff9d9f1c75981d8c31cfe4df8527"}, - {file = "coverage-7.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:97c44f4ee13bce914272589b6b41165bbb650e48fdb7bd5493a38bde8de730a1"}, - {file = "coverage-7.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:74c160285f2dfe0acf0f72d425f3e970b21b6de04157fc65adc9fd07ee44177f"}, - {file = "coverage-7.3.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:b543302a3707245d454fc49b8ecd2c2d5982b50eb63f3535244fd79a4be0c99d"}, - {file = "coverage-7.3.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ad0f87826c4ebd3ef484502e79b39614e9c03a5d1510cfb623f4a4a051edc6fd"}, - {file = "coverage-7.3.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:13c6cbbd5f31211d8fdb477f0f7b03438591bdd077054076eec362cf2207b4a7"}, - {file = "coverage-7.3.0-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fac440c43e9b479d1241fe9d768645e7ccec3fb65dc3a5f6e90675e75c3f3e3a"}, - {file = "coverage-7.3.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:3c9834d5e3df9d2aba0275c9f67989c590e05732439b3318fa37a725dff51e74"}, - {file = "coverage-7.3.0-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:4c8e31cf29b60859876474034a83f59a14381af50cbe8a9dbaadbf70adc4b214"}, - {file = "coverage-7.3.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:7a9baf8e230f9621f8e1d00c580394a0aa328fdac0df2b3f8384387c44083c0f"}, - {file = 
"coverage-7.3.0-cp311-cp311-win32.whl", hash = "sha256:ccc51713b5581e12f93ccb9c5e39e8b5d4b16776d584c0f5e9e4e63381356482"}, - {file = "coverage-7.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:887665f00ea4e488501ba755a0e3c2cfd6278e846ada3185f42d391ef95e7e70"}, - {file = "coverage-7.3.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:d000a739f9feed900381605a12a61f7aaced6beae832719ae0d15058a1e81c1b"}, - {file = "coverage-7.3.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:59777652e245bb1e300e620ce2bef0d341945842e4eb888c23a7f1d9e143c446"}, - {file = "coverage-7.3.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c9737bc49a9255d78da085fa04f628a310c2332b187cd49b958b0e494c125071"}, - {file = "coverage-7.3.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5247bab12f84a1d608213b96b8af0cbb30d090d705b6663ad794c2f2a5e5b9fe"}, - {file = "coverage-7.3.0-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e2ac9a1de294773b9fa77447ab7e529cf4fe3910f6a0832816e5f3d538cfea9a"}, - {file = "coverage-7.3.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:85b7335c22455ec12444cec0d600533a238d6439d8d709d545158c1208483873"}, - {file = "coverage-7.3.0-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:36ce5d43a072a036f287029a55b5c6a0e9bd73db58961a273b6dc11a2c6eb9c2"}, - {file = "coverage-7.3.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:211a4576e984f96d9fce61766ffaed0115d5dab1419e4f63d6992b480c2bd60b"}, - {file = "coverage-7.3.0-cp312-cp312-win32.whl", hash = "sha256:56afbf41fa4a7b27f6635bc4289050ac3ab7951b8a821bca46f5b024500e6321"}, - {file = "coverage-7.3.0-cp312-cp312-win_amd64.whl", hash = "sha256:7f297e0c1ae55300ff688568b04ff26b01c13dfbf4c9d2b7d0cb688ac60df479"}, - {file = "coverage-7.3.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ac0dec90e7de0087d3d95fa0533e1d2d722dcc008bc7b60e1143402a04c117c1"}, - {file = 
"coverage-7.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:438856d3f8f1e27f8e79b5410ae56650732a0dcfa94e756df88c7e2d24851fcd"}, - {file = "coverage-7.3.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1084393c6bda8875c05e04fce5cfe1301a425f758eb012f010eab586f1f3905e"}, - {file = "coverage-7.3.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:49ab200acf891e3dde19e5aa4b0f35d12d8b4bd805dc0be8792270c71bd56c54"}, - {file = "coverage-7.3.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a67e6bbe756ed458646e1ef2b0778591ed4d1fcd4b146fc3ba2feb1a7afd4254"}, - {file = "coverage-7.3.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:8f39c49faf5344af36042b293ce05c0d9004270d811c7080610b3e713251c9b0"}, - {file = "coverage-7.3.0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:7df91fb24c2edaabec4e0eee512ff3bc6ec20eb8dccac2e77001c1fe516c0c84"}, - {file = "coverage-7.3.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:34f9f0763d5fa3035a315b69b428fe9c34d4fc2f615262d6be3d3bf3882fb985"}, - {file = "coverage-7.3.0-cp38-cp38-win32.whl", hash = "sha256:bac329371d4c0d456e8d5f38a9b0816b446581b5f278474e416ea0c68c47dcd9"}, - {file = "coverage-7.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:b859128a093f135b556b4765658d5d2e758e1fae3e7cc2f8c10f26fe7005e543"}, - {file = "coverage-7.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:fc0ed8d310afe013db1eedd37176d0839dc66c96bcfcce8f6607a73ffea2d6ba"}, - {file = "coverage-7.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:e61260ec93f99f2c2d93d264b564ba912bec502f679793c56f678ba5251f0393"}, - {file = "coverage-7.3.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:97af9554a799bd7c58c0179cc8dbf14aa7ab50e1fd5fa73f90b9b7215874ba28"}, - {file = "coverage-7.3.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = 
"sha256:3558e5b574d62f9c46b76120a5c7c16c4612dc2644c3d48a9f4064a705eaee95"}, - {file = "coverage-7.3.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:37d5576d35fcb765fca05654f66aa71e2808d4237d026e64ac8b397ffa66a56a"}, - {file = "coverage-7.3.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:07ea61bcb179f8f05ffd804d2732b09d23a1238642bf7e51dad62082b5019b34"}, - {file = "coverage-7.3.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:80501d1b2270d7e8daf1b64b895745c3e234289e00d5f0e30923e706f110334e"}, - {file = "coverage-7.3.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:4eddd3153d02204f22aef0825409091a91bf2a20bce06fe0f638f5c19a85de54"}, - {file = "coverage-7.3.0-cp39-cp39-win32.whl", hash = "sha256:2d22172f938455c156e9af2612650f26cceea47dc86ca048fa4e0b2d21646ad3"}, - {file = "coverage-7.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:60f64e2007c9144375dd0f480a54d6070f00bb1a28f65c408370544091c9bc9e"}, - {file = "coverage-7.3.0-pp38.pp39.pp310-none-any.whl", hash = "sha256:5492a6ce3bdb15c6ad66cb68a0244854d9917478877a25671d70378bdc8562d0"}, - {file = "coverage-7.3.0.tar.gz", hash = "sha256:49dbb19cdcafc130f597d9e04a29d0a032ceedf729e41b181f51cd170e6ee865"}, + {file = "coverage-7.3.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:cd0f7429ecfd1ff597389907045ff209c8fdb5b013d38cfa7c60728cb484b6e3"}, + {file = "coverage-7.3.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:966f10df9b2b2115da87f50f6a248e313c72a668248be1b9060ce935c871f276"}, + {file = "coverage-7.3.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0575c37e207bb9b98b6cf72fdaaa18ac909fb3d153083400c2d48e2e6d28bd8e"}, + {file = "coverage-7.3.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:245c5a99254e83875c7fed8b8b2536f040997a9b76ac4c1da5bff398c06e860f"}, + {file = 
"coverage-7.3.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4c96dd7798d83b960afc6c1feb9e5af537fc4908852ef025600374ff1a017392"}, + {file = "coverage-7.3.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:de30c1aa80f30af0f6b2058a91505ea6e36d6535d437520067f525f7df123887"}, + {file = "coverage-7.3.1-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:50dd1e2dd13dbbd856ffef69196781edff26c800a74f070d3b3e3389cab2600d"}, + {file = "coverage-7.3.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:b9c0c19f70d30219113b18fe07e372b244fb2a773d4afde29d5a2f7930765136"}, + {file = "coverage-7.3.1-cp310-cp310-win32.whl", hash = "sha256:770f143980cc16eb601ccfd571846e89a5fe4c03b4193f2e485268f224ab602f"}, + {file = "coverage-7.3.1-cp310-cp310-win_amd64.whl", hash = "sha256:cdd088c00c39a27cfa5329349cc763a48761fdc785879220d54eb785c8a38520"}, + {file = "coverage-7.3.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:74bb470399dc1989b535cb41f5ca7ab2af561e40def22d7e188e0a445e7639e3"}, + {file = "coverage-7.3.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:025ded371f1ca280c035d91b43252adbb04d2aea4c7105252d3cbc227f03b375"}, + {file = "coverage-7.3.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a6191b3a6ad3e09b6cfd75b45c6aeeffe7e3b0ad46b268345d159b8df8d835f9"}, + {file = "coverage-7.3.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7eb0b188f30e41ddd659a529e385470aa6782f3b412f860ce22b2491c89b8593"}, + {file = "coverage-7.3.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:75c8f0df9dfd8ff745bccff75867d63ef336e57cc22b2908ee725cc552689ec8"}, + {file = "coverage-7.3.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:7eb3cd48d54b9bd0e73026dedce44773214064be93611deab0b6a43158c3d5a0"}, + {file = "coverage-7.3.1-cp311-cp311-musllinux_1_1_i686.whl", 
hash = "sha256:ac3c5b7e75acac31e490b7851595212ed951889918d398b7afa12736c85e13ce"}, + {file = "coverage-7.3.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:5b4ee7080878077af0afa7238df1b967f00dc10763f6e1b66f5cced4abebb0a3"}, + {file = "coverage-7.3.1-cp311-cp311-win32.whl", hash = "sha256:229c0dd2ccf956bf5aeede7e3131ca48b65beacde2029f0361b54bf93d36f45a"}, + {file = "coverage-7.3.1-cp311-cp311-win_amd64.whl", hash = "sha256:c6f55d38818ca9596dc9019eae19a47410d5322408140d9a0076001a3dcb938c"}, + {file = "coverage-7.3.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:5289490dd1c3bb86de4730a92261ae66ea8d44b79ed3cc26464f4c2cde581fbc"}, + {file = "coverage-7.3.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:ca833941ec701fda15414be400c3259479bfde7ae6d806b69e63b3dc423b1832"}, + {file = "coverage-7.3.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cd694e19c031733e446c8024dedd12a00cda87e1c10bd7b8539a87963685e969"}, + {file = "coverage-7.3.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:aab8e9464c00da5cb9c536150b7fbcd8850d376d1151741dd0d16dfe1ba4fd26"}, + {file = "coverage-7.3.1-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:87d38444efffd5b056fcc026c1e8d862191881143c3aa80bb11fcf9dca9ae204"}, + {file = "coverage-7.3.1-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:8a07b692129b8a14ad7a37941a3029c291254feb7a4237f245cfae2de78de037"}, + {file = "coverage-7.3.1-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:2829c65c8faaf55b868ed7af3c7477b76b1c6ebeee99a28f59a2cb5907a45760"}, + {file = "coverage-7.3.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:1f111a7d85658ea52ffad7084088277135ec5f368457275fc57f11cebb15607f"}, + {file = "coverage-7.3.1-cp312-cp312-win32.whl", hash = "sha256:c397c70cd20f6df7d2a52283857af622d5f23300c4ca8e5bd8c7a543825baa5a"}, + {file = 
"coverage-7.3.1-cp312-cp312-win_amd64.whl", hash = "sha256:5ae4c6da8b3d123500f9525b50bf0168023313963e0e2e814badf9000dd6ef92"}, + {file = "coverage-7.3.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ca70466ca3a17460e8fc9cea7123c8cbef5ada4be3140a1ef8f7b63f2f37108f"}, + {file = "coverage-7.3.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:f2781fd3cabc28278dc982a352f50c81c09a1a500cc2086dc4249853ea96b981"}, + {file = "coverage-7.3.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6407424621f40205bbe6325686417e5e552f6b2dba3535dd1f90afc88a61d465"}, + {file = "coverage-7.3.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:04312b036580ec505f2b77cbbdfb15137d5efdfade09156961f5277149f5e344"}, + {file = "coverage-7.3.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ac9ad38204887349853d7c313f53a7b1c210ce138c73859e925bc4e5d8fc18e7"}, + {file = "coverage-7.3.1-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:53669b79f3d599da95a0afbef039ac0fadbb236532feb042c534fbb81b1a4e40"}, + {file = "coverage-7.3.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:614f1f98b84eb256e4f35e726bfe5ca82349f8dfa576faabf8a49ca09e630086"}, + {file = "coverage-7.3.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:f1a317fdf5c122ad642db8a97964733ab7c3cf6009e1a8ae8821089993f175ff"}, + {file = "coverage-7.3.1-cp38-cp38-win32.whl", hash = "sha256:defbbb51121189722420a208957e26e49809feafca6afeef325df66c39c4fdb3"}, + {file = "coverage-7.3.1-cp38-cp38-win_amd64.whl", hash = "sha256:f4f456590eefb6e1b3c9ea6328c1e9fa0f1006e7481179d749b3376fc793478e"}, + {file = "coverage-7.3.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:f12d8b11a54f32688b165fd1a788c408f927b0960984b899be7e4c190ae758f1"}, + {file = "coverage-7.3.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f09195dda68d94a53123883de75bb97b0e35f5f6f9f3aa5bf6e496da718f0cb6"}, + {file = 
"coverage-7.3.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c6601a60318f9c3945be6ea0f2a80571f4299b6801716f8a6e4846892737ebe4"}, + {file = "coverage-7.3.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:07d156269718670d00a3b06db2288b48527fc5f36859425ff7cec07c6b367745"}, + {file = "coverage-7.3.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:636a8ac0b044cfeccae76a36f3b18264edcc810a76a49884b96dd744613ec0b7"}, + {file = "coverage-7.3.1-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:5d991e13ad2ed3aced177f524e4d670f304c8233edad3210e02c465351f785a0"}, + {file = "coverage-7.3.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:586649ada7cf139445da386ab6f8ef00e6172f11a939fc3b2b7e7c9082052fa0"}, + {file = "coverage-7.3.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:4aba512a15a3e1e4fdbfed2f5392ec221434a614cc68100ca99dcad7af29f3f8"}, + {file = "coverage-7.3.1-cp39-cp39-win32.whl", hash = "sha256:6bc6f3f4692d806831c136c5acad5ccedd0262aa44c087c46b7101c77e139140"}, + {file = "coverage-7.3.1-cp39-cp39-win_amd64.whl", hash = "sha256:553d7094cb27db58ea91332e8b5681bac107e7242c23f7629ab1316ee73c4981"}, + {file = "coverage-7.3.1-pp38.pp39.pp310-none-any.whl", hash = "sha256:220eb51f5fb38dfdb7e5d54284ca4d0cd70ddac047d750111a68ab1798945194"}, + {file = "coverage-7.3.1.tar.gz", hash = "sha256:6cb7fe1581deb67b782c153136541e20901aa312ceedaf1467dcb35255787952"}, ] [package.extras] @@ -672,7 +672,7 @@ pyflakes = ">=3.1.0,<3.2.0" name = "h11" version = "0.14.0" description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" -category = "dev" +category = "main" optional = false python-versions = ">=3.7" files = [ @@ -706,7 +706,7 @@ lxml = ["lxml"] name = "httpcore" version = "0.17.3" description = "A minimal low-level HTTP client." 
-category = "dev" +category = "main" optional = false python-versions = ">=3.7" files = [ @@ -728,7 +728,7 @@ socks = ["socksio (>=1.0.0,<2.0.0)"] name = "httpx" version = "0.24.1" description = "The next generation HTTP client." -category = "dev" +category = "main" optional = false python-versions = ">=3.7" files = [ @@ -988,17 +988,6 @@ files = [ {file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"}, ] -[[package]] -name = "mock-protocol" -version = "1.0.0" -description = "A unittest mock library than understands annotations" -category = "dev" -optional = false -python-versions = ">=3.6, <4" -files = [ - {file = "mock_protocol-1.0.0.tar.gz", hash = "sha256:2ecf2d7664c3e5de2cc570a108fcbd1b27dad82b40e639f6be7df51bad3e95dc"}, -] - [[package]] name = "more-itertools" version = "10.1.0" @@ -1554,14 +1543,14 @@ tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} [[package]] name = "pytest" -version = "7.4.1" +version = "7.4.2" description = "pytest: simple powerful testing with Python" category = "dev" optional = false python-versions = ">=3.7" files = [ - {file = "pytest-7.4.1-py3-none-any.whl", hash = "sha256:460c9a59b14e27c602eb5ece2e47bec99dc5fc5f6513cf924a7d03a578991b1f"}, - {file = "pytest-7.4.1.tar.gz", hash = "sha256:2f2301e797521b23e4d2585a0a3d7b5e50fdddaaf7e7d6773ea26ddb17c213ab"}, + {file = "pytest-7.4.2-py3-none-any.whl", hash = "sha256:1d881c6124e08ff0a1bb75ba3ec0bfd8b5354a01c194ddd5a0a870a48d99b002"}, + {file = "pytest-7.4.2.tar.gz", hash = "sha256:a766259cfab564a2ad52cb1aae1b881a75c3eb7e34ca3779697c23ed47c47069"}, ] [package.dependencies] @@ -1749,7 +1738,7 @@ rpds-py = ">=0.7.0" name = "requests" version = "2.31.0" description = "Python HTTP for Humans." 
-category = "main" +category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -1783,25 +1772,19 @@ files = [ requests = ">=2.0.1,<3.0.0" [[package]] -name = "responses" -version = "0.22.0" -description = "A utility library for mocking out the `requests` Python library." +name = "respx" +version = "0.20.2" +description = "A utility for mocking out the Python HTTPX and HTTP Core libraries." category = "dev" optional = false python-versions = ">=3.7" files = [ - {file = "responses-0.22.0-py3-none-any.whl", hash = "sha256:dcf294d204d14c436fddcc74caefdbc5764795a40ff4e6a7740ed8ddbf3294be"}, - {file = "responses-0.22.0.tar.gz", hash = "sha256:396acb2a13d25297789a5866b4881cf4e46ffd49cc26c43ab1117f40b973102e"}, + {file = "respx-0.20.2-py2.py3-none-any.whl", hash = "sha256:ab8e1cf6da28a5b2dd883ea617f8130f77f676736e6e9e4a25817ad116a172c9"}, + {file = "respx-0.20.2.tar.gz", hash = "sha256:07cf4108b1c88b82010f67d3c831dae33a375c7b436e54d87737c7f9f99be643"}, ] [package.dependencies] -requests = ">=2.22.0,<3.0" -toml = "*" -types-toml = "*" -urllib3 = ">=1.25.10" - -[package.extras] -tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "types-requests"] +httpx = ">=0.21.0" [[package]] name = "rpds-py" @@ -1981,18 +1964,6 @@ typing-extensions = {version = ">=3.10.0", markers = "python_version < \"3.10\"" [package.extras] full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart", "pyyaml"] -[[package]] -name = "toml" -version = "0.10.2" -description = "Python Library for Tom's Obvious, Minimal Language" -category = "dev" -optional = false -python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" -files = [ - {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, - {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, -] - [[package]] name = "tomli" version = 
"2.0.1" @@ -2029,45 +2000,6 @@ files = [ {file = "trove_classifiers-2023.8.7-py3-none-any.whl", hash = "sha256:a676626a31286130d56de2ea1232484df97c567eb429d56cfcb0637e681ecf09"}, ] -[[package]] -name = "types-requests" -version = "2.31.0.2" -description = "Typing stubs for requests" -category = "dev" -optional = false -python-versions = "*" -files = [ - {file = "types-requests-2.31.0.2.tar.gz", hash = "sha256:6aa3f7faf0ea52d728bb18c0a0d1522d9bfd8c72d26ff6f61bfc3d06a411cf40"}, - {file = "types_requests-2.31.0.2-py3-none-any.whl", hash = "sha256:56d181c85b5925cbc59f4489a57e72a8b2166f18273fd8ba7b6fe0c0b986f12a"}, -] - -[package.dependencies] -types-urllib3 = "*" - -[[package]] -name = "types-toml" -version = "0.10.8.7" -description = "Typing stubs for toml" -category = "dev" -optional = false -python-versions = "*" -files = [ - {file = "types-toml-0.10.8.7.tar.gz", hash = "sha256:58b0781c681e671ff0b5c0319309910689f4ab40e8a2431e205d70c94bb6efb1"}, - {file = "types_toml-0.10.8.7-py3-none-any.whl", hash = "sha256:61951da6ad410794c97bec035d59376ce1cbf4453dc9b6f90477e81e4442d631"}, -] - -[[package]] -name = "types-urllib3" -version = "1.26.25.14" -description = "Typing stubs for urllib3" -category = "dev" -optional = false -python-versions = "*" -files = [ - {file = "types-urllib3-1.26.25.14.tar.gz", hash = "sha256:229b7f577c951b8c1b92c1bc2b2fdb0b49847bd2af6d1cc2a2e3dd340f3bda8f"}, - {file = "types_urllib3-1.26.25.14-py3-none-any.whl", hash = "sha256:9683bbb7fb72e32bfe9d2be6e04875fbe1b3eeec3cbb4ea231435aa7fd6b4f0e"}, -] - [[package]] name = "typing-extensions" version = "4.7.1" @@ -2084,7 +2016,7 @@ files = [ name = "urllib3" version = "1.26.16" description = "HTTP library with thread-safe connection pooling, file post, and more." 
-category = "main" +category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" files = [ @@ -2340,4 +2272,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "eca2472ed7755b3bfa62d1f77a4d1faff40475d60aaef8bb966ec5b7bb0230a7" +content-hash = "2d501b1fce0b41992ece67514c496b87153c21ca0656a6320605accda028a934" diff --git a/python/zeroeventhub/pyproject.toml b/python/zeroeventhub/pyproject.toml index eba23e1..96cf8cc 100644 --- a/python/zeroeventhub/pyproject.toml +++ b/python/zeroeventhub/pyproject.toml @@ -10,8 +10,8 @@ keywords = ["event-streaming"] [tool.poetry.dependencies] python = "^3.9" -requests = "^2" fastapi = "^0.103" +httpx = "^0.24.1" [tool.poetry.group.dev.dependencies] pytest = "^7" @@ -21,14 +21,9 @@ black = "^22" mypy = "1.5.1" pytest-mock = "^3" poetry-types = "^0.3" -responses = "^0.22" coverage = "^7" -mock-protocol = "^1" -httpx = "0.24.1" -pytest-asyncio = "0.21.1" - -[tool.poetry.group.types.dependencies] -types-requests = "^2" +respx = "^0.20" +pytest-asyncio = "^0.21" [build-system] requires = ["poetry-core"] @@ -47,10 +42,6 @@ warn_unreachable = true warn_unused_configs = true explicit_package_bases = true -[[tool.mypy.overrides]] -module = "mock_protocol.*" -ignore_missing_imports = true - [[tool.mypy.overrides]] module = "tests.*" # `disallow_untyped_defs` has to be explicitly disabled because it is enabled by @@ -66,3 +57,10 @@ accept-no-raise-doc = false accept-no-return-doc = true accept-no-yields-doc = true default-docstring-type = "default" + +[tool.pytest.ini_options] +asyncio_mode = "auto" + +[tool.coverage.run] +omit = ["tests/*"] +source = ["zeroeventhub/"] diff --git a/python/zeroeventhub/tests/test_client.py b/python/zeroeventhub/tests/test_client.py index cdf2a3d..c08f897 100644 --- a/python/zeroeventhub/tests/test_client.py +++ b/python/zeroeventhub/tests/test_client.py 
@@ -1,9 +1,10 @@ import pytest -import responses -import requests -import mock_protocol -from unittest import mock +import pytest_asyncio +import httpx +from httpx import AsyncByteStream +from unittest.mock import AsyncMock, MagicMock from json import JSONDecodeError +from typing import Any, AsyncGenerator, AsyncIterator, Iterable from zeroeventhub import ( Client, @@ -13,28 +14,41 @@ FIRST_CURSOR, LAST_CURSOR, ALL_HEADERS, + receive_events, ) +class IteratorStream(AsyncByteStream): + def __init__(self, stream: Iterable[bytes]): + self.stream = stream + + async def __aiter__(self) -> AsyncIterator[bytes]: + for chunk in self.stream: + yield chunk + + @pytest.fixture def mock_event_receiver(): - return mock_protocol.from_protocol(EventReceiver) + receiver_mock = MagicMock(spec=EventReceiver) + receiver_mock.event = AsyncMock() + receiver_mock.checkpoint = AsyncMock() + return receiver_mock -@pytest.fixture -def client(): +@pytest_asyncio.fixture +async def client(): url = "https://example.com/feed/v1" partition_count = 2 - return Client(url, partition_count) + async with httpx.AsyncClient() as httpx_client: + yield Client(url, partition_count, httpx_client) @pytest.mark.parametrize( - "page_size_hint,headers", + ("page_size_hint", "headers"), [(10, ["header1", "header2"]), (0, None), (5, ["header1"]), (0, ["header1"])], ) -@responses.activate -def test_events_fetched_successfully_when_there_are_multiple_lines_in_response( - client, mock_event_receiver, page_size_hint, headers +async def test_events_fetched_successfully_when_there_are_multiple_lines_in_response( + client, mock_event_receiver, page_size_hint, headers, respx_mock ): """ Test that fetch_events does not raise an error when successfully called. 
@@ -43,24 +57,28 @@ def test_events_fetched_successfully_when_there_are_multiple_lines_in_response(

     # arrange
     cursors = [Cursor(1, "cursor1"), Cursor(2, "cursor2")]

-    responses.add(
-        responses.GET,
-        client.url,
-        content_type="application/json",
-        body="""{ "partition": 1, "cursor": "5" }
-        { "partition": 1, "headers": {}, "data": "some data"}\n""",
-        status=200,
+    respx_mock.get(client.url).mock(
+        return_value=httpx.Response(
+            status_code=200,
+            headers={"content-type": "application/x-ndjson"},
+            content=IteratorStream(
+                [
+                    b"""{ "partition": 1, "cursor": "5" }\n""",
+                    b"""{ "partition": 1, "headers": {}, "data": "some data"}\n""",
+                ]
+            ),
+        )
     )

     # act
-    client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers)
+    await receive_events(mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers))

     # assert
     mock_event_receiver.event.assert_called_once_with(1, {}, "some data")
     mock_event_receiver.checkpoint.assert_called_once_with(1, "5")


-def test_raises_apierror_when_fetch_events_with_missing_cursors(client, mock_event_receiver):
+async def test_raises_apierror_when_fetch_events_with_missing_cursors(client):
     """
     Test that fetch_events raises a ValueError when cursors are missing.
""" @@ -72,15 +90,14 @@ def test_raises_apierror_when_fetch_events_with_missing_cursors(client, mock_eve # act & assert with pytest.raises(APIError) as excinfo: - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) + await async_generator_to_list(client.fetch_events(cursors, page_size_hint, headers)) # assert assert "cursors are missing" in str(excinfo.value) assert excinfo.value.status() == 400 -@responses.activate -def test_raises_http_error_when_fetch_events_with_unexpected_response(client, mock_event_receiver): +async def test_raises_http_error_when_fetch_events_with_unexpected_response(client, respx_mock): """ Test that fetch_events raises a HTTPError when the response status code is not 2xx @@ -91,18 +108,71 @@ def test_raises_http_error_when_fetch_events_with_unexpected_response(client, mo page_size_hint = 10 headers = ["header1", "header2"] - responses.add(responses.GET, client.url, status=404) + respx_mock.get(client.url).mock( + return_value=httpx.Response( + status_code=404, + ) + ) # act & assert that a HTTPError is raised - with pytest.raises(requests.HTTPError) as excinfo: - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) + with pytest.raises(httpx.HTTPError) as excinfo: + await async_generator_to_list(client.fetch_events(cursors, page_size_hint, headers)) # assert - assert str(excinfo.value).startswith(f"404 Client Error: Not Found for url: {client.url}?") + assert str(excinfo.value).startswith(f"Client error '404 Not Found' for url '{client.url}?") + + +async def test_raises_error_when_exception_while_parsing_checkpoint(client, respx_mock): + """ + Test that fetch_events raises a ValueError when the checkpoint returned + from the server cannot be parsed. 
+ """ + + # arrange + cursors = [Cursor(1, "cursor1"), Cursor(2, "cursor2")] + page_size_hint = 10 + headers = ["header1", "header2"] + + respx_mock.get(client.url).mock( + return_value=httpx.Response( + status_code=200, + headers={"content_type": "application/x-ndjson"}, + content="""{ "cursor": "0" }""", # NOTE: partition is missing + ) + ) + # act & assert + with pytest.raises(ValueError, match="error while parsing checkpoint"): + await async_generator_to_list(client.fetch_events(cursors, page_size_hint, headers)) + + +async def test_raises_error_when_exception_while_parsing_event(client, respx_mock): + """ + Test that fetch_events raises a ValueError when the event returned + from the server cannot be parsed. + """ + + # arrange + cursors = [Cursor(1, "cursor1"), Cursor(2, "cursor2")] + page_size_hint = 10 + headers = ["header1", "header2"] + + respx_mock.get(client.url).mock( + return_value=httpx.Response( + status_code=200, + headers={"content_type": "application/x-ndjson"}, + content="""{ "data": "" }""", # NOTE: partition is missing + ) + ) -@responses.activate -def test_raises_error_when_exception_while_receiving_checkpoint(client, mock_event_receiver): + # act & assert + with pytest.raises(ValueError, match="error while parsing event"): + await async_generator_to_list(client.fetch_events(cursors, page_size_hint, headers)) + + +async def test_raises_error_when_exception_while_receiving_checkpoint( + client, mock_event_receiver, respx_mock +): """ Test that fetch_events raises a ValueError when the checkpoint method on the event receiver returns an error. 
@@ -113,25 +183,26 @@ def test_raises_error_when_exception_while_receiving_checkpoint(client, mock_eve
     page_size_hint = 10
     headers = ["header1", "header2"]

-    responses.add(
-        responses.GET,
-        client.url,
-        json={
-            "partition": 0,
-            "cursor": "0",
-        },
-        status=200,
+    respx_mock.get(client.url).mock(
+        return_value=httpx.Response(
+            status_code=200,
+            headers={"content-type": "application/x-ndjson"},
+            content="""{ "partition": 0, "cursor": "0" }""",
+        )
     )

     mock_event_receiver.checkpoint.side_effect = Exception("error while receiving checkpoint")

     # act & assert
     with pytest.raises(ValueError, match="error while receiving checkpoint"):
-        client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers)
+        await receive_events(
+            mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers)
+        )


-@responses.activate
-def test_raises_error_when_exception_while_receiving_event(client, mock_event_receiver):
+async def test_raises_error_when_exception_while_receiving_event(
+    client, mock_event_receiver, respx_mock
+):
     """
     Test that fetch_events raises a ValueError when the event method on
     the event receiver returns an error.
@@ -142,25 +213,28 @@ def test_raises_error_when_exception_while_receiving_event(client, mock_event_re
     page_size_hint = 10
     headers = ["header1", "header2"]

-    responses.add(
-        responses.GET,
-        client.url,
-        json={"partition": 0, "headers": {}, "data": "some data"},
-        status=200,
+    respx_mock.get(client.url).mock(
+        return_value=httpx.Response(
+            status_code=200,
+            headers={"content-type": "application/x-ndjson"},
+            content="""{"partition": 0, "headers": {}, "data": "some data"}\n""",
+        )
     )
-
-    mock_event_receiver.event.side_effect = Exception("error while receiving event")
+    mock_event_receiver.event.side_effect = Exception("some error while processing the event")

     # act & assert
     with pytest.raises(ValueError, match="error while receiving event"):
-        client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers)
+        await receive_events(
+            mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers)
+        )

     # assert
     mock_event_receiver.event.assert_called()


-@responses.activate
-def test_fetch_events_succeeds_when_response_is_empty(client, mock_event_receiver):
+async def test_fetch_events_succeeds_when_response_is_empty(
+    client, mock_event_receiver, respx_mock
+):
     """
     Test that fetch_events gracefully handles an empty response.
""" @@ -170,24 +244,25 @@ def test_fetch_events_succeeds_when_response_is_empty(client, mock_event_receive page_size_hint = 10 headers = None - responses.add( - responses.GET, - client.url, - content_type="application/json", - body="", - status=204, + respx_mock.get(client.url).mock( + return_value=httpx.Response( + status_code=204, + headers={"content_type": "application/x-ndjson"}, + content="", + ) ) # act - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) + await receive_events(mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers)) # assert that the event and checkpoint methods were not called mock_event_receiver.event.assert_not_called() mock_event_receiver.checkpoint.assert_not_called() -@responses.activate -def test_raises_error_when_response_contains_invalid_json_line(client, mock_event_receiver): +async def test_raises_error_when_response_contains_invalid_json_line( + client, mock_event_receiver, respx_mock +): """ Test that fetch_events raises a JSONDecodeError when the response contains a non-empty line which is not valid JSON. 
@@ -198,86 +273,23 @@ def test_raises_error_when_response_contains_invalid_json_line(client, mock_even
     page_size_hint = 10
     headers = ["header1", "header2"]

-    responses.add(
-        responses.GET,
-        client.url,
-        body="""{"partition": 1,"cursor": "5"}\ninvalid json""",
-        status=200,
+    respx_mock.get(client.url).mock(
+        return_value=httpx.Response(
+            status_code=200,
+            headers={"content-type": "application/x-ndjson"},
+            content="""{"partition": 1,"cursor": "5"}\ninvalid json""",
+        )
     )

     # act & assert
     with pytest.raises(JSONDecodeError):
-        client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers)
+        await receive_events(
+            mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers)
+        )

     # assert that the checkpoint method was called for the first response line
     mock_event_receiver.checkpoint.assert_called_once_with(1, "5")


-@responses.activate
-def test_owned_session_destroyed_when_client_destroyed(mock_event_receiver, mocker):
-    """
-    Test that the client-owned session is closed when the client is destroyed
-    """
-
-    # arrange
-    url = "https://example.com"
-    partition_count = 2
-    session_mock = mocker.MagicMock(spec_set=requests.Session)
-    with mock.patch("zeroeventhub.client.requests.Session") as create_session_mock:
-        create_session_mock.return_value = session_mock
-        client = Client(url, partition_count)
-        create_session_mock.assert_called_once()
-    assert session_mock == client.session
-
-    cursors = [Cursor(1, "cursor1"), Cursor(2, "cursor2")]
-    page_size_hint = 10
-    headers = ["header1", "header2"]
-
-    responses.add(
-        responses.GET,
-        client.url,
-        body="""{"partition": 0,"cursor": 0}\n""",
-        status=200,
-    )
-
-    # act
-    client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers)
-    del client
-
-    # assert
-    session_mock.send.assert_called_once()
-    session_mock.close.assert_called_once()
-
-
-@responses.activate
-def test_provided_session_not_destroyed_when_client_destroyed(mock_event_receiver, mocker):
-    """
-    Test that the
provided session is not closed when the client is destroyed - """ - - # arrange - url = "https://example.com" - partition_count = 2 - session_mock = mocker.MagicMock(spec_set=requests.Session) - with mock.patch("zeroeventhub.client.requests.Session") as create_session_mock: - client = Client(url, partition_count, session_mock) - create_session_mock.assert_not_called() - - cursors = [Cursor(1, FIRST_CURSOR), Cursor(2, LAST_CURSOR)] - page_size_hint = None - headers = ALL_HEADERS - - responses.add( - responses.GET, - client.url, - body="""{"partition": 1,"cursor": "123"}\n""", - status=200, - ) - - # act - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) - del client - - # assert - session_mock.send.assert_called_once() - session_mock.close.assert_not_called() +async def async_generator_to_list(input: AsyncGenerator[Any, None]) -> list[Any]: + return [item async for item in input] diff --git a/python/zeroeventhub/tests/test_page_event_receiver.py b/python/zeroeventhub/tests/test_page_event_receiver.py index 89f5a7a..41dbfed 100644 --- a/python/zeroeventhub/tests/test_page_event_receiver.py +++ b/python/zeroeventhub/tests/test_page_event_receiver.py @@ -13,24 +13,24 @@ def page_event_receiver(): return PageEventReceiver() -def receive_page_1_events(page_event_receiver: EventReceiver) -> None: - page_event_receiver.event(1, {"header1": "abc"}, "event 1 partition 1") - page_event_receiver.event(1, {}, "event 2 partition 1") - page_event_receiver.checkpoint(1, "0xf01dab1e") - page_event_receiver.event(2, {"header1": "abc"}, "event 1 partition 2") - page_event_receiver.event(1, {"header1": "def"}, "event 3 partition 1") - page_event_receiver.checkpoint(1, "0xFOO") - page_event_receiver.event(2, {"header1": "blah"}, "event 2 partition 2") - page_event_receiver.checkpoint(2, "0xBA5EBA11") - - -def test_page_contains_all_received_events_and_checkpoints(page_event_receiver): +async def receive_page_1_events(page_event_receiver: EventReceiver) -> 
None: + await page_event_receiver.event(1, {"header1": "abc"}, "event 1 partition 1") + await page_event_receiver.event(1, {}, "event 2 partition 1") + await page_event_receiver.checkpoint(1, "0xf01dab1e") + await page_event_receiver.event(2, {"header1": "abc"}, "event 1 partition 2") + await page_event_receiver.event(1, {"header1": "def"}, "event 3 partition 1") + await page_event_receiver.checkpoint(1, "0xFOO") + await page_event_receiver.event(2, {"header1": "blah"}, "event 2 partition 2") + await page_event_receiver.checkpoint(2, "0xBA5EBA11") + + +async def test_page_contains_all_received_events_and_checkpoints(page_event_receiver): """ Test that the page contains all received events and checkpoints in order. """ # act - receive_page_1_events(page_event_receiver) + await receive_page_1_events(page_event_receiver) # assert assert page_event_receiver.events == [ @@ -53,12 +53,12 @@ def test_page_contains_all_received_events_and_checkpoints(page_event_receiver): ] -def test_page_is_empty_after_clearing(page_event_receiver): +async def test_page_is_empty_after_clearing(page_event_receiver): """ Test that the page contains no events or checkpoints after being cleared. """ # arrange - receive_page_1_events(page_event_receiver) + await receive_page_1_events(page_event_receiver) # act page_event_receiver.clear() @@ -69,7 +69,7 @@ def test_page_is_empty_after_clearing(page_event_receiver): assert not page_event_receiver.latest_checkpoints -def test_page_contains_all_received_events_and_checkpoints_when_receiving_after_being_cleared( +async def test_page_contains_all_received_events_and_checkpoints_when_receiving_after_being_cleared( page_event_receiver, ): """ @@ -77,12 +77,12 @@ def test_page_contains_all_received_events_and_checkpoints_when_receiving_after_ from the second page only after the first page was cleared. 
""" # arrange - receive_page_1_events(page_event_receiver) + await receive_page_1_events(page_event_receiver) # act page_event_receiver.clear() - page_event_receiver.event(1, None, "event 4 partition 1") - page_event_receiver.checkpoint(1, "0x5ca1ab1e") + await page_event_receiver.event(1, None, "event 4 partition 1") + await page_event_receiver.checkpoint(1, "0x5ca1ab1e") # assert assert page_event_receiver.events == [ diff --git a/python/zeroeventhub/zeroeventhub/__init__.py b/python/zeroeventhub/zeroeventhub/__init__.py index a3efaba..316536a 100644 --- a/python/zeroeventhub/zeroeventhub/__init__.py +++ b/python/zeroeventhub/zeroeventhub/__init__.py @@ -2,10 +2,11 @@ from .client import Client from .cursor import Cursor, FIRST_CURSOR, LAST_CURSOR -from .event_receiver import EventReceiver +from .event import Event +from .event_receiver import EventReceiver, receive_events from .errors import APIError, ErrCursorsMissing from .constants import ALL_HEADERS -from .page_event_receiver import Event, PageEventReceiver +from .page_event_receiver import PageEventReceiver from .api_handler import ZeroEventHubFastApiHandler from .data_reader import DataReader @@ -23,4 +24,5 @@ "PageEventReceiver", "ZeroEventHubFastApiHandler", "DataReader", + "receive_events", ] diff --git a/python/zeroeventhub/zeroeventhub/client.py b/python/zeroeventhub/zeroeventhub/client.py index 4b6346a..1f839fc 100644 --- a/python/zeroeventhub/zeroeventhub/client.py +++ b/python/zeroeventhub/zeroeventhub/client.py @@ -1,25 +1,25 @@ """Module containing client-side related code for ZeroEventHub""" import json -from typing import Dict, Optional, Any, Sequence, Union -import requests +from typing import Dict, Any, Optional, Sequence, Union +from collections.abc import AsyncGenerator +import httpx from .cursor import Cursor -from .event_receiver import EventReceiver +from .event import Event from .errors import ErrCursorsMissing class Client: """ - Client-side code to query a ZeroEventHub server to fetch 
events and pass them to the supplied - EventReceiver instance. + Client-side code to query a ZeroEventHub server to fetch events. """ def __init__( self, url: str, partition_count: int, - session: Optional[requests.Session] = None, + http_client: httpx.AsyncClient, ) -> None: """ @@ -27,55 +27,45 @@ def __init__( :param url: The base URL for the service. :param partition_count: The number of partitions the ZeroEventHub server has. - :param session: An optional session under which to make the HTTP requests. + :param http_client: A httpx AsyncClient under which to make the HTTP requests. This allows one time setup of authentication etc. on the session, and increases performance if fetching events frequently due to connection pooling. """ self.url = url self.partition_count = partition_count - self._client_owns_session = not bool(session) - self._session = session or requests.Session() - - def __del__(self) -> None: - """ - Close the session when this Client instance is destroyed if we created it - during initialization - """ - if self._client_owns_session: - self._session.close() + self._http_client = http_client @property - def session(self) -> requests.Session: - """Return the session being used by this client.""" - return self._session + def http_client(self) -> httpx.AsyncClient: + """Return the http_client being used by this client.""" + return self._http_client - def fetch_events( + async def fetch_events( self, cursors: Sequence[Cursor], - page_size_hint: Optional[int], - event_receiver: EventReceiver, + page_size_hint: Optional[int] = None, headers: Optional[Sequence[str]] = None, - ) -> None: + ) -> AsyncGenerator[Union[Event, Cursor], None]: """ - Fetch events from the server using the provided context, cursors, page size hint, - event receiver, and headers. + Fetch events from the server using the provided cursors, page size hint and + desired headers. :param cursors: A sequence of cursors to be used in the request. 
:param page_size_hint: An optional hint for the page size of the response. - :param event_receiver: An event receiver to handle the received events. :param headers: An optional sequence containing event headers desired in the response. :raises APIError: if cursors are missing. :raises ValueError: if an exception occurs while the event receiver handles the response. - :raises requests.exceptions.RequestException: if unable to call the endpoint successfully. - :raises requests.HTTPError: if response status code does not indicate success. + :raises httpx.RequestError: if unable to call the endpoint successfully. + :raises httpx.HTTPError: if response status code does not indicate success. :raises json.JSONDecodeError: if a line from the response cannot be decoded into JSON. """ self._validate_inputs(cursors) - req = self._build_request(cursors, page_size_hint, headers) + params = self._build_request_params(cursors, page_size_hint, headers) - with self.session.send(req, stream=True) as res: - self._process_response(res, event_receiver) + async with self._http_client.stream("GET", self.url, params=params) as res: + async for event_or_checkpoint in self._process_response(res): + yield event_or_checkpoint def _validate_inputs(self, cursors: Sequence[Cursor]) -> None: """ @@ -87,12 +77,12 @@ def _validate_inputs(self, cursors: Sequence[Cursor]) -> None: if not cursors: raise ErrCursorsMissing - def _build_request( + def _build_request_params( self, cursors: Sequence[Cursor], page_size_hint: Optional[int], headers: Optional[Sequence[str]], - ) -> requests.PreparedRequest: + ) -> Dict[str, Union[str, int]]: """ Build the http request using the provided inputs. 
@@ -113,50 +103,48 @@ def _build_request( if headers: params["headers"] = ",".join(headers) - request = requests.Request("GET", self.url, params=params) - return self._session.prepare_request(request) + return params - def _process_response(self, res: requests.Response, event_receiver: EventReceiver) -> None: + async def _process_response( + self, res: httpx.Response + ) -> AsyncGenerator[Union[Event, Cursor], None]: """ Process the response from the server. :param res: the server response - :param event_receiver: An event receiver to handle the received events. - :raises requests.HTTPError: if response status code does not indicate success. + :raises httpx.HTTPError: if response status code does not indicate success. :raises json.JSONDecodeError: if a line from the response cannot be decoded into JSON. :raises ValueError: error while EventReceiver handles checkpoint or event. """ res.raise_for_status() - for line in res.iter_lines(): - checkpoint_or_event = json.loads(line) - self._process_checkpoint_or_event(checkpoint_or_event, event_receiver) + async for line in res.aiter_lines(): + yield self._parse_checkpoint_or_event(line) - def _process_checkpoint_or_event( - self, checkpoint_or_event: Dict[str, Any], event_receiver: EventReceiver - ) -> None: + def _parse_checkpoint_or_event(self, raw_line: str) -> Union[Event, Cursor]: """ - Process a line of response from the server. + Parse a line of response from the server. - :param checkpoint_or_event: A dictionary containing a checkpoint or event. - :param event_receiver: An event receiver to handle the received events. - :raises ValueError: if an error occurred in the event receiver while - the event or checkpoint was being processed. + :param raw_line: The raw JSON line from the server + :raises ValueError: if an error occurred parsing the json line into an event or checkpoint. 
""" - if checkpoint_or_event.get("cursor", None) is not None: + checkpoint_or_event: Dict[str, Any] = json.loads(raw_line) + + if (cursor := checkpoint_or_event.get("cursor", None)) is not None: try: - event_receiver.checkpoint( - checkpoint_or_event["partition"], checkpoint_or_event["cursor"] + return Cursor( + partition_id=checkpoint_or_event["partition"], + cursor=cursor, ) except Exception as error: - raise ValueError("error while receiving checkpoint") from error + raise ValueError("error while parsing checkpoint") from error else: try: - event_receiver.event( - checkpoint_or_event["partition"], - checkpoint_or_event.get("headers", None), - checkpoint_or_event["data"], + return Event( + partition_id=checkpoint_or_event["partition"], + headers=checkpoint_or_event.get("headers", None), + data=checkpoint_or_event["data"], ) except Exception as error: - raise ValueError("error while receiving event") from error + raise ValueError("error while parsing event") from error diff --git a/python/zeroeventhub/zeroeventhub/event.py b/python/zeroeventhub/zeroeventhub/event.py new file mode 100644 index 0000000..ced5bbf --- /dev/null +++ b/python/zeroeventhub/zeroeventhub/event.py @@ -0,0 +1,13 @@ +"""Module to define the event dataclass""" + +from dataclasses import dataclass +from typing import Any, Dict, Optional + + +@dataclass +class Event: + """All properties received relating to a certain event.""" + + partition_id: int + headers: Optional[Dict[str, str]] + data: Any diff --git a/python/zeroeventhub/zeroeventhub/event_receiver.py b/python/zeroeventhub/zeroeventhub/event_receiver.py index f7a0dcd..2d646b2 100644 --- a/python/zeroeventhub/zeroeventhub/event_receiver.py +++ b/python/zeroeventhub/zeroeventhub/event_receiver.py @@ -1,6 +1,9 @@ """Module to define the EventReceiver interface""" -from typing import Protocol, Dict, Any, Optional +from typing import Protocol, Dict, Any, Optional, Union +from collections.abc import AsyncGenerator +from .cursor import Cursor 
+from .event import Event class EventReceiver(Protocol): @@ -10,7 +13,7 @@ class EventReceiver(Protocol): Checkpoint in this context is basically a cursor. """ - def event(self, partition_id: int, headers: Optional[Dict[str, str]], data: Any) -> None: + async def event(self, partition_id: int, headers: Optional[Dict[str, str]], data: Any) -> None: """ Event method processes actual events. @@ -19,10 +22,28 @@ def event(self, partition_id: int, headers: Optional[Dict[str, str]], data: Any) :param data: the data """ - def checkpoint(self, partition_id: int, cursor: str) -> None: + async def checkpoint(self, partition_id: int, cursor: str) -> None: """ Checkpoint method processes cursors. :param partition_id: the partition id :param cursor: the cursor """ + + +async def receive_events( + event_receiver: EventReceiver, events: AsyncGenerator[Union[Cursor, Event], None] +) -> None: + """bridge between the output from the Client fetch_events return value + and the EventReceiver interface.""" + async for event in events: + if isinstance(event, Cursor): + try: + await event_receiver.checkpoint(event.partition_id, event.cursor) + except Exception as error: + raise ValueError("error while receiving checkpoint") from error + else: + try: + await event_receiver.event(event.partition_id, event.headers, event.data) + except Exception as error: + raise ValueError("error while receiving event") from error diff --git a/python/zeroeventhub/zeroeventhub/page_event_receiver.py b/python/zeroeventhub/zeroeventhub/page_event_receiver.py index 0019ebe..f1440b7 100644 --- a/python/zeroeventhub/zeroeventhub/page_event_receiver.py +++ b/python/zeroeventhub/zeroeventhub/page_event_receiver.py @@ -1,18 +1,9 @@ """Module to make it easy to receive a page of events""" from typing import Dict, Any, Sequence, Optional, List -from dataclasses import dataclass -from .event_receiver import EventReceiver from .cursor import Cursor - - -@dataclass -class Event: - """All properties received relating to a 
certain event.""" - - partition_id: int - headers: Optional[Dict[str, str]] - data: Any +from .event import Event +from .event_receiver import EventReceiver class PageEventReceiver(EventReceiver): @@ -50,7 +41,7 @@ def latest_checkpoints(self) -> Sequence[Cursor]: for partition_id, cursor in self._latest_checkpoints.items() ] - def event(self, partition_id: int, headers: Optional[Dict[str, str]], data: Any) -> None: + async def event(self, partition_id: int, headers: Optional[Dict[str, str]], data: Any) -> None: """ Add the given event to the list. @@ -60,7 +51,7 @@ def event(self, partition_id: int, headers: Optional[Dict[str, str]], data: Any) """ self._events.append(Event(partition_id, headers, data)) - def checkpoint(self, partition_id: int, cursor: str) -> None: + async def checkpoint(self, partition_id: int, cursor: str) -> None: """ Add the given cursor to the list. From d758b84db5fcb22004dab7b34dc005849dd1d2a4 Mon Sep 17 00:00:00 2001 From: Keith Hall Date: Mon, 4 Sep 2023 12:17:54 +0300 Subject: [PATCH 2/6] address inconsistency between PageEventReceiver and EventReceiver --- python/zeroeventhub/CHANGELOG.md | 3 ++ python/zeroeventhub/tests/test_client.py | 7 +++-- .../tests/test_page_event_receiver.py | 20 ++++++------- .../zeroeventhub/event_receiver.py | 21 ++++++-------- .../zeroeventhub/page_event_receiver.py | 28 ++++++++----------- 5 files changed, 37 insertions(+), 42 deletions(-) diff --git a/python/zeroeventhub/CHANGELOG.md b/python/zeroeventhub/CHANGELOG.md index ffa1abf..90a8dc6 100644 --- a/python/zeroeventhub/CHANGELOG.md +++ b/python/zeroeventhub/CHANGELOG.md @@ -5,3 +5,6 @@ Switched from `requests` to `httpx` due to a memory leak in `requests` with long-lived sessions and many requests. - As a result, we are now using an `AsyncClient` which is the `httpx` equivalent of a `requests.Session`. 
After observing usage patterns, it was noted that the session was always passed in as a parameter, due to consuming feeds which require authentication, so `AsyncClient` has been made mandatory instead of optional to simplify the code a bit.
 - It was also observed that it can be useful to directly receive the parsed response as it is streamed from the server instead of being forced to use an `EventReceiver` protocol. This makes it easier to process received events in batches. As the above dependency change is a breaking change anyway, and due to the switch to async calls, it was the perfect time to switch the `Client.fetch_events` method to return an `AsyncGenerator`. A helper method called `receive_events` has been introduced to make it easy to keep the `EventReceiver` behavior from before.
+
+Also, the inconsistency between `PageEventReceiver` returning `Event` and `Cursor` instances and the `EventReceiver` protocol having separate parameters for each property has been addressed, converging on using the dataclasses. This moves away slightly from the Go library implementation but makes sense for the Python library.
+ diff --git a/python/zeroeventhub/tests/test_client.py b/python/zeroeventhub/tests/test_client.py index c08f897..607bb1b 100644 --- a/python/zeroeventhub/tests/test_client.py +++ b/python/zeroeventhub/tests/test_client.py @@ -9,6 +9,7 @@ from zeroeventhub import ( Client, Cursor, + Event, APIError, EventReceiver, FIRST_CURSOR, @@ -74,8 +75,8 @@ async def test_events_fetched_successfully_when_there_are_multiple_lines_in_resp await receive_events(mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers)) # assert - mock_event_receiver.event.assert_called_once_with(1, {}, "some data") - mock_event_receiver.checkpoint.assert_called_once_with(1, "5") + mock_event_receiver.event.assert_called_once_with(Event(1, {}, "some data")) + mock_event_receiver.checkpoint.assert_called_once_with(Cursor(1, "5")) async def test_raises_apierror_when_fetch_events_with_missing_cursors(client): @@ -288,7 +289,7 @@ async def test_raises_error_when_response_contains_invalid_json_line( ) # assert that the checkpoint method was called for the first response line - mock_event_receiver.checkpoint.assert_called_once_with(1, "5") + mock_event_receiver.checkpoint.assert_called_once_with(Cursor(1, "5")) async def async_generator_to_list(input: AsyncGenerator[Any, None]) -> list[Any]: diff --git a/python/zeroeventhub/tests/test_page_event_receiver.py b/python/zeroeventhub/tests/test_page_event_receiver.py index 41dbfed..868c078 100644 --- a/python/zeroeventhub/tests/test_page_event_receiver.py +++ b/python/zeroeventhub/tests/test_page_event_receiver.py @@ -14,14 +14,14 @@ def page_event_receiver(): async def receive_page_1_events(page_event_receiver: EventReceiver) -> None: - await page_event_receiver.event(1, {"header1": "abc"}, "event 1 partition 1") - await page_event_receiver.event(1, {}, "event 2 partition 1") - await page_event_receiver.checkpoint(1, "0xf01dab1e") - await page_event_receiver.event(2, {"header1": "abc"}, "event 1 partition 2") - await 
page_event_receiver.event(1, {"header1": "def"}, "event 3 partition 1") - await page_event_receiver.checkpoint(1, "0xFOO") - await page_event_receiver.event(2, {"header1": "blah"}, "event 2 partition 2") - await page_event_receiver.checkpoint(2, "0xBA5EBA11") + await page_event_receiver.event(Event(1, {"header1": "abc"}, "event 1 partition 1")) + await page_event_receiver.event(Event(1, {}, "event 2 partition 1")) + await page_event_receiver.checkpoint(Cursor(1, "0xf01dab1e")) + await page_event_receiver.event(Event(2, {"header1": "abc"}, "event 1 partition 2")) + await page_event_receiver.event(Event(1, {"header1": "def"}, "event 3 partition 1")) + await page_event_receiver.checkpoint(Cursor(1, "0xFOO")) + await page_event_receiver.event(Event(2, {"header1": "blah"}, "event 2 partition 2")) + await page_event_receiver.checkpoint(Cursor(2, "0xBA5EBA11")) async def test_page_contains_all_received_events_and_checkpoints(page_event_receiver): @@ -81,8 +81,8 @@ async def test_page_contains_all_received_events_and_checkpoints_when_receiving_ # act page_event_receiver.clear() - await page_event_receiver.event(1, None, "event 4 partition 1") - await page_event_receiver.checkpoint(1, "0x5ca1ab1e") + await page_event_receiver.event(Event(1, None, "event 4 partition 1")) + await page_event_receiver.checkpoint(Cursor(1, "0x5ca1ab1e")) # assert assert page_event_receiver.events == [ diff --git a/python/zeroeventhub/zeroeventhub/event_receiver.py b/python/zeroeventhub/zeroeventhub/event_receiver.py index 2d646b2..7eec855 100644 --- a/python/zeroeventhub/zeroeventhub/event_receiver.py +++ b/python/zeroeventhub/zeroeventhub/event_receiver.py @@ -1,6 +1,6 @@ """Module to define the EventReceiver interface""" -from typing import Protocol, Dict, Any, Optional, Union +from typing import Protocol, Union from collections.abc import AsyncGenerator from .cursor import Cursor from .event import Event @@ -13,21 +13,18 @@ class EventReceiver(Protocol): Checkpoint in this context is 
basically a cursor. """ - async def event(self, partition_id: int, headers: Optional[Dict[str, str]], data: Any) -> None: + async def event(self, event: Event) -> None: """ Event method processes actual events. - :param partition_id: the partition id - :param headers: the headers - :param data: the data + :param event: the details of the event which has been received from the server """ - async def checkpoint(self, partition_id: int, cursor: str) -> None: + async def checkpoint(self, checkpoint: Cursor) -> None: """ Checkpoint method processes cursors. - :param partition_id: the partition id - :param cursor: the cursor + :param checkpoint: the checkpoint which was received from the server """ @@ -36,14 +33,14 @@ async def receive_events( ) -> None: """bridge between the output from the Client fetch_events return value and the EventReceiver interface.""" - async for event in events: - if isinstance(event, Cursor): + async for event_or_checkpoint in events: + if isinstance(event_or_checkpoint, Cursor): try: - await event_receiver.checkpoint(event.partition_id, event.cursor) + await event_receiver.checkpoint(event_or_checkpoint) except Exception as error: raise ValueError("error while receiving checkpoint") from error else: try: - await event_receiver.event(event.partition_id, event.headers, event.data) + await event_receiver.event(event_or_checkpoint) except Exception as error: raise ValueError("error while receiving event") from error diff --git a/python/zeroeventhub/zeroeventhub/page_event_receiver.py b/python/zeroeventhub/zeroeventhub/page_event_receiver.py index f1440b7..374e055 100644 --- a/python/zeroeventhub/zeroeventhub/page_event_receiver.py +++ b/python/zeroeventhub/zeroeventhub/page_event_receiver.py @@ -1,6 +1,6 @@ """Module to make it easy to receive a page of events""" -from typing import Dict, Any, Sequence, Optional, List +from typing import Dict, Sequence, List from .cursor import Cursor from .event import Event from .event_receiver import 
EventReceiver @@ -15,7 +15,7 @@ def __init__(self) -> None: """Initialize the PageEventReceiver with empty state.""" self._events: List[Event] = [] self._checkpoints: List[Cursor] = [] - self._latest_checkpoints: Dict[int, str] = {} + self._latest_checkpoints: Dict[int, Cursor] = {} def clear(self) -> None: """Clear the received events and checkpoints, ready to handle a new page.""" @@ -36,27 +36,21 @@ def checkpoints(self) -> Sequence[Cursor]: @property def latest_checkpoints(self) -> Sequence[Cursor]: """Only return the latest checkpoint for each partition.""" - return [ - Cursor(partition_id, cursor) - for partition_id, cursor in self._latest_checkpoints.items() - ] + return list(self._latest_checkpoints.values()) - async def event(self, partition_id: int, headers: Optional[Dict[str, str]], data: Any) -> None: + async def event(self, event: Event) -> None: """ Add the given event to the list. - :param partition_id: the partition id - :param headers: the headers - :param data: the data + :param event: the event """ - self._events.append(Event(partition_id, headers, data)) + self._events.append(event) - async def checkpoint(self, partition_id: int, cursor: str) -> None: + async def checkpoint(self, checkpoint: Cursor) -> None: """ - Add the given cursor to the list. + Add the given checkpoint to the list. 
- :param partition_id: the partition id - :param cursor: the cursor + :param checkpoint: the cursor to use as a checkpoint to continue processing from later """ - self._checkpoints.append(Cursor(partition_id, cursor)) - self._latest_checkpoints[partition_id] = cursor + self._checkpoints.append(checkpoint) + self._latest_checkpoints[checkpoint.partition_id] = checkpoint From 7e1870040f43cbb8dceeedc17036b2a440505fe4 Mon Sep 17 00:00:00 2001 From: Keith Hall Date: Thu, 7 Sep 2023 08:48:34 +0300 Subject: [PATCH 3/6] don't hide exceptions from the EventReceiver --- python/zeroeventhub/tests/test_client.py | 12 ++++++------ python/zeroeventhub/zeroeventhub/event_receiver.py | 10 ++-------- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/python/zeroeventhub/tests/test_client.py b/python/zeroeventhub/tests/test_client.py index 607bb1b..cb8a202 100644 --- a/python/zeroeventhub/tests/test_client.py +++ b/python/zeroeventhub/tests/test_client.py @@ -171,11 +171,11 @@ async def test_raises_error_when_exception_while_parsing_event(client, respx_moc await async_generator_to_list(client.fetch_events(cursors, page_size_hint, headers)) -async def test_raises_error_when_exception_while_receiving_checkpoint( +async def test_exceptions_bubble_up_when_exception_while_receiving_checkpoint( client, mock_event_receiver, respx_mock ): """ - Test that fetch_events raises a ValueError when the checkpoint method + Test that receive_events doesn't hide the exception when the checkpoint method on the event receiver returns an error. 
""" @@ -195,17 +195,17 @@ async def test_raises_error_when_exception_while_receiving_checkpoint( mock_event_receiver.checkpoint.side_effect = Exception("error while receiving checkpoint") # act & assert - with pytest.raises(ValueError, match="error while receiving checkpoint"): + with pytest.raises(Exception, match="error while receiving checkpoint"): await receive_events( mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers) ) -async def test_raises_error_when_exception_while_receiving_event( +async def test_exceptions_bubble_up_when_exception_while_receiving_event( client, mock_event_receiver, respx_mock ): """ - Test that fetch_events raises a ValueError when the event method + Test that receive_events doesn't hide the exception when the event method on the event receiver returns an error. """ @@ -224,7 +224,7 @@ async def test_raises_error_when_exception_while_receiving_event( mock_event_receiver.event.side_effect = Exception("some error while processing the event") # act & assert - with pytest.raises(ValueError, match="error while receiving event"): + with pytest.raises(Exception, match="some error while processing the event"): await receive_events( mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers) ) diff --git a/python/zeroeventhub/zeroeventhub/event_receiver.py b/python/zeroeventhub/zeroeventhub/event_receiver.py index 7eec855..11daf9d 100644 --- a/python/zeroeventhub/zeroeventhub/event_receiver.py +++ b/python/zeroeventhub/zeroeventhub/event_receiver.py @@ -35,12 +35,6 @@ async def receive_events( and the EventReceiver interface.""" async for event_or_checkpoint in events: if isinstance(event_or_checkpoint, Cursor): - try: - await event_receiver.checkpoint(event_or_checkpoint) - except Exception as error: - raise ValueError("error while receiving checkpoint") from error + await event_receiver.checkpoint(event_or_checkpoint) else: - try: - await event_receiver.event(event_or_checkpoint) - except Exception as 
error: - raise ValueError("error while receiving event") from error + await event_receiver.event(event_or_checkpoint) From 8f44e6fbe2020622cab68473f71ca7d2f37cd11f Mon Sep 17 00:00:00 2001 From: Keith Hall Date: Thu, 7 Sep 2023 10:46:31 +0300 Subject: [PATCH 4/6] add end to end test of client and server --- python/zeroeventhub/tests/test_api_handler.py | 53 +++++-- .../tests/test_client_with_server.py | 136 ++++++++++++++++++ .../zeroeventhub/zeroeventhub/api_handler.py | 2 +- .../zeroeventhub/zeroeventhub/data_reader.py | 2 +- 4 files changed, 183 insertions(+), 10 deletions(-) create mode 100644 python/zeroeventhub/tests/test_client_with_server.py diff --git a/python/zeroeventhub/tests/test_api_handler.py b/python/zeroeventhub/tests/test_api_handler.py index 5dbcbfe..08e1997 100644 --- a/python/zeroeventhub/tests/test_api_handler.py +++ b/python/zeroeventhub/tests/test_api_handler.py @@ -1,6 +1,7 @@ from typing import Any, Dict, List, Optional, Sequence, Generator, AsyncGenerator, Union import json import pytest +import asyncio from unittest import mock from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse @@ -16,13 +17,15 @@ class FakeAsyncDataReader(DataReader): async def get_data( self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int] - ) -> Union[Generator[Dict[str, Any], Any, Any], AsyncGenerator[Dict[str, Any], Any]]: + ) -> AsyncGenerator[Dict[str, Any], None]: header_dict = {} if headers: for header in headers: header_dict[header] = header event_list_p1 = ["e1", "e2", "e3"] event_list_p2 = ["e4", "e5", "e6"] + + await asyncio.sleep(0.1) for cursor in cursors: if cursor.partition_id == 0: for event in event_list_p1: @@ -47,7 +50,7 @@ async def get_data( class FakeDataReader(DataReader): def get_data( self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int] - ) -> Union[Generator[Dict[str, Any], Any, Any], AsyncGenerator[Dict[str, Any], Any]]: + ) -> 
Generator[Dict[str, Any], None, None]: header_dict = {} if headers: for header in headers: @@ -157,10 +160,12 @@ async def test_request_handler_cursor0_skipping(): def test_no_n_param(): - client = TestClient(app) - response = client.get("/feed/v1?cursor0=c0&cursor1=c1&headers=h1,h2,h3") - assert response.status_code == 400 - assert response.json() == {"detail": "Parameter n not found"} + with mock.patch.object(FakeDataReader, "get_data") as mocked_get_data: + client = TestClient(app) + response = client.get("/feed/v1?cursor0=c0&cursor1=c1&headers=h1,h2,h3") + assert response.status_code == 400 + assert response.json() == {"detail": "Parameter n not found"} + mocked_get_data.assert_not_called() def test_invalid_n_param(): @@ -172,9 +177,41 @@ def test_invalid_n_param(): mocked_get_data.assert_not_called() -def test_invalid_cursor_param(): +@pytest.mark.parametrize( + ("url",), + [ + ("/feed/v1?n=1&cursor0=0",), + ("/feed/v1?n=0",), + ], +) +def test_mismatched_n_param(url: str) -> None: + with mock.patch.object(FakeDataReader, "get_data") as mocked_get_data: + client = TestClient(app) + response = client.get(url) + assert response.status_code == 400 + assert response.json() == {"detail": "Partition count doesn't match as expected"} + mocked_get_data.assert_not_called() + + +def test_invalid_pagesizehint_param() -> None: + with mock.patch.object(FakeDataReader, "get_data") as mocked_get_data: + client = TestClient(app) + response = client.get("/feed/v1?n=2&cursor0=c0&pagesizehint=foobar") + assert response.status_code == 400 + assert response.json() == {"detail": "Invalid parameter pagesizehint"} + mocked_get_data.assert_not_called() + + +@pytest.mark.parametrize( + ("url",), + [ + ("/feed/v1?n=2&cursor2=c0&headers=h1,h2,h3",), + ("/feed/v1?n=2",), + ], +) +def test_invalid_cursor_param(url: str) -> None: client = TestClient(app) - response = client.get("/feed/v1?n=2&cursor2=c0&headers=h1,h2,h3") + response = client.get(url) assert response.status_code == 400 assert 
response.json() == {"detail": "Cursor parameter is missing"} diff --git a/python/zeroeventhub/tests/test_client_with_server.py b/python/zeroeventhub/tests/test_client_with_server.py new file mode 100644 index 0000000..b4e1133 --- /dev/null +++ b/python/zeroeventhub/tests/test_client_with_server.py @@ -0,0 +1,136 @@ +import pytest +import pytest_asyncio +import httpx +from httpx import AsyncByteStream +from unittest.mock import AsyncMock, MagicMock +from pytest_mock import MockerFixture +from json import JSONDecodeError +from typing import Any, AsyncGenerator, Dict, Optional, Sequence, Union + +from fastapi import FastAPI, Request +from fastapi.responses import StreamingResponse + + +from zeroeventhub import ( + Client, + Cursor, + Event, + FIRST_CURSOR, + ALL_HEADERS, + DataReader, + ZeroEventHubFastApiHandler, +) + + +app = FastAPI() + + +class FakeDataReader(DataReader): + async def get_data( + self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int] + ) -> AsyncGenerator[Dict[str, Any], Any]: + yield {"this method will be replaced": "by mocks"} + + +@app.get("/person/feed/v1") +async def feed(request: Request) -> StreamingResponse: + dr = FakeDataReader() + api_handler = ZeroEventHubFastApiHandler(data_reader=dr, server_partition_count=1) + return api_handler.handle(request) + + +@pytest_asyncio.fixture +async def client(): + url = "person/feed/v1" + partition_count = 1 + async with httpx.AsyncClient(app=app, base_url="http://example/") as httpx_client: + yield Client(url, partition_count, httpx_client) + + +@pytest.mark.parametrize( + "feed", + ( + [ + Event( + 0, + {}, + { + "id": 1000, + "type": "PersonNameChanged", + "when": "2023-09-26T11:23:30.472906Z", + "person_id": 876, + "new_name": { + "first_name": "Fred", + "surname": "Bloggs", + }, + }, + ), + Event( + 0, + None, + { + "id": 1001, + "type": "PersonNameChanged", + "when": "2023-09-27T07:34:09.261815Z", + "person_id": 932, + "new_name": { + "first_name": "Joe", + 
"surname": "King", + }, + }, + ), + Cursor(0, "1002"), + ], + ), +) +@pytest.mark.parametrize( + ("page_size_hint", "headers"), + [ + (None, None), + (None, ALL_HEADERS), + (1000, ALL_HEADERS), + (1000, None), + (None, ["header1", "header2"]), + ], +) +async def test_client_can_successfully_fetch_events_from_server( + client: Client, + feed: Sequence[Union[Event, Cursor]], + mocker: MockerFixture, + page_size_hint: Optional[int], + headers: Optional[Sequence[str]], +) -> None: + """ + Test that fetch_events retrieves all the events and cursors the server responds with. + """ + + # arrange + cursors = [Cursor(0, FIRST_CURSOR)] + + async def yield_data( + _cursors: Sequence[Cursor], + _headers: Optional[Dict[str, Any]], + _page_size_hint: Optional[int], + ) -> AsyncGenerator[Dict[str, Any], None]: + for item in feed: + if isinstance(item, Cursor): + yield {"partition": item.partition_id, "cursor": item.cursor} + elif isinstance(item, Event): + yield {"partition": item.partition_id, "headers": item.headers, "data": item.data} + + get_data_mock = mocker.patch.object(FakeDataReader, "get_data") + get_data_mock.side_effect = yield_data + + # act + result = [ + item + async for item in client.fetch_events( + cursors, headers=headers, page_size_hint=page_size_hint + ) + ] + + # assert + assert result == feed + get_data_mock.assert_called_once_with( + cursors, list(headers) if headers else headers, page_size_hint + ) diff --git a/python/zeroeventhub/zeroeventhub/api_handler.py b/python/zeroeventhub/zeroeventhub/api_handler.py index 2459eb5..c07d3a0 100644 --- a/python/zeroeventhub/zeroeventhub/api_handler.py +++ b/python/zeroeventhub/zeroeventhub/api_handler.py @@ -1,6 +1,6 @@ """ Api handlers definition""" import json -from typing import Any, Generator, Dict, Union, AsyncGenerator +from typing import Any, AsyncGenerator, Dict, Generator, Union from fastapi import Request, HTTPException, status from fastapi.responses import StreamingResponse from .data_reader import 
DataReader diff --git a/python/zeroeventhub/zeroeventhub/data_reader.py b/python/zeroeventhub/zeroeventhub/data_reader.py index 0628cff..9a95d95 100644 --- a/python/zeroeventhub/zeroeventhub/data_reader.py +++ b/python/zeroeventhub/zeroeventhub/data_reader.py @@ -14,7 +14,7 @@ class DataReader(Protocol): def get_data( self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int] - ) -> Union[Generator[Dict[str, Any], Any, Any], AsyncGenerator[Dict[str, Any], Any]]: + ) -> Union[Generator[Dict[str, Any], None, None], AsyncGenerator[Dict[str, Any], None]]: """ Read a page of events at server side for the given cursors. From bc73458b4064eac7b424c98ed14810b9f5cf6e5a Mon Sep 17 00:00:00 2001 From: Keith Hall Date: Fri, 8 Sep 2023 10:04:21 +0300 Subject: [PATCH 5/6] add documentation for server (ZeroEventHubFastApiHandler) --- python/zeroeventhub/README.md | 47 +++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/python/zeroeventhub/README.md b/python/zeroeventhub/README.md index f18c565..e6f6554 100644 --- a/python/zeroeventhub/README.md +++ b/python/zeroeventhub/README.md @@ -63,6 +63,53 @@ database. Example of simple single-partition consumption. *Note about the exampl ``` +## Server + +This library makes it easy to setup a zeroeventhub feed endpoint with FastAPI. + +```python +>>> from typing import Annotated, Any, AsyncGenerator, Dict, Optional, Sequence +>>> from fastapi import Depends, FastAPI, Request +>>> from fastapi.responses import StreamingResponse +>>> from zeroeventhub import ( +... Cursor, +... DataReader, +... ZeroEventHubFastApiHandler, +... ) +>>> from unittest.mock import Mock + +>>> app = FastAPI() + +>>> PersonEventRepository = Mock + +>>> class PersonDataReader(DataReader): +... def __init__(self, person_event_repository: PersonEventRepository) -> None: +... self._person_event_repository = person_event_repository +... +... def get_data( +... 
self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int] +... ) -> AsyncGenerator[Dict[str, Any], Any]: +... return ( +... self._person_event_repository.get_events_since(cursors[0].cursor) +... .take(page_size) +... .with_headers(headers) +... ) + +>>> def get_person_data_reader() -> PersonDataReader: +... return PersonDataReader(PersonEventRepository()) + +>>> PersonDataReaderDependency = Annotated[ +... PersonDataReader, +... Depends(get_person_data_reader, use_cache=True), +... ] + +>>> @app.get("person/feed/v1") +... async def feed(request: Request, person_data_reader: PersonDataReaderDependency) -> StreamingResponse: +... api_handler = ZeroEventHubFastApiHandler(data_reader=person_data_reader, server_partition_count=1) +... return api_handler.handle(request) + +``` + ## Development To run the test suite, assuming you already have Python 3.10 or later installed and on your `PATH`: From 0217176fb1a9182e58ba02b586dc8f6eee8e2281 Mon Sep 17 00:00:00 2001 From: Keith Hall Date: Thu, 7 Sep 2023 17:43:49 +0300 Subject: [PATCH 6/6] bump version of python package --- python/zeroeventhub/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/zeroeventhub/pyproject.toml b/python/zeroeventhub/pyproject.toml index 96cf8cc..a171c6d 100644 --- a/python/zeroeventhub/pyproject.toml +++ b/python/zeroeventhub/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "zeroeventhub" -version = "0.1.2" +version = "0.2.0" description = "Broker-less event streaming over HTTP" authors = ["Vipps MobilePay"] readme = "README.md"