diff --git a/python/zeroeventhub/CHANGELOG.md b/python/zeroeventhub/CHANGELOG.md new file mode 100644 index 0000000..90a8dc6 --- /dev/null +++ b/python/zeroeventhub/CHANGELOG.md @@ -0,0 +1,10 @@ +# ZeroEventHub Changelog + +## v0.2.0 + +Switched from `requests` to `httpx` due to a memory leak in `requests` with long-lived sessions and many requests. +- As a result, we now use an `AsyncClient`, the `httpx` equivalent of a `requests.Session`. Observed usage showed that the session was always passed in as a parameter, because the feeds being consumed require authentication, so the `AsyncClient` is now mandatory rather than optional, which simplifies the code. +- It is also useful to receive the parsed response directly as it streams from the server, rather than being forced to implement the `EventReceiver` protocol; this makes it easier to process received events in batches. Since the dependency change is already a breaking change, and the calls are now async, this was the right time to change the `Client.fetch_events` method to return an `AsyncGenerator`. A helper function called `receive_events` has been introduced to make it easy to keep the previous `EventReceiver` behavior. + +The inconsistency between `PageEventReceiver`, which returns `Event` and `Cursor` instances, and the `EventReceiver` protocol, which took separate parameters for each property, has also been addressed by converging on the dataclasses. This moves slightly away from the Go library implementation but makes sense for the Python library. + diff --git a/python/zeroeventhub/README.md b/python/zeroeventhub/README.md index 553d639..e6f6554 100644 --- a/python/zeroeventhub/README.md +++ b/python/zeroeventhub/README.md @@ -12,43 +12,102 @@ database. Example of simple single-partition consumption. *Note about the example:* * Things starting with "their" are supplied by the service you connect to ```python +>>> import zeroeventhub +>>> import httpx +>>> import asyncio +>>> from typing import Sequence +>>> from unittest.mock import MagicMock, Mock, PropertyMock + +>>> my_db = MagicMock() +>>> my_person_event_repository = Mock() +>>> my_person_event_repository.read_cursors_from_db.return_value = None + # Step 1: Setup -their_partition_count = 1 # documented contract with server -zeh_session = requests.Session() # you can setup the authentication on the session -client = zeroeventhub.Client(their_service_url, their_partition_count, zeh_session) +>>> their_partition_count = 1 # documented contract with server +>>> their_service_url = "https://localhost:8192/person/feed/v1" +>>> my_zeh_session = httpx.AsyncClient() # you can set up the authentication on the session +>>> client = zeroeventhub.Client(their_service_url, their_partition_count, my_zeh_session) # Step 2: Load the cursors from last time we ran -cursors = my_get_cursors_from_db() -if not cursors: - # we have never run before, so we can get all events with FIRST_CURSOR - # (if we just want to receive new events from now, we would use LAST_CURSOR) - cursors = [ - zeroeventhub.Cursor(partition_id, zeroeventhub.FIRST_CURSOR) - for partition_id in range(their_partition_count) - ] +>>> cursors = my_person_event_repository.read_cursors_from_db() +>>> if not cursors: +... # we have never run before, so we can get all events with FIRST_CURSOR +... # (if we just want to receive new events from now, we would use LAST_CURSOR) +... cursors = [ +... zeroeventhub.Cursor(partition_id, zeroeventhub.FIRST_CURSOR) +...
for partition_id in range(their_partition_count) +... ] # Step 3: Enter listening loop... -page_of_events = PageEventReceiver() -while myStillWantToReadEvents: - # Step 4: Use ZeroEventHub client to fetch the next page of events. - client.fetch_events( - cursors, - my_page_size_hint, - page_of_events - ) +>>> my_still_want_to_read_events = PropertyMock(side_effect=[True, False]) + +>>> async def poll_for_events(cursors: Sequence[zeroeventhub.Cursor]) -> None: +... page_of_events = zeroeventhub.PageEventReceiver() +... while my_still_want_to_read_events(): +... # Step 4: Use ZeroEventHub client to fetch the next page of events. +... await zeroeventhub.receive_events(page_of_events, +... client.fetch_events(cursors), +... ) +... +... # Step 5: Write the effect of changes to our own database and the updated +... # cursor value in the same transaction. +... with my_db.begin_transaction() as tx: +... my_person_event_repository.write_effect_of_events_to_db(tx, page_of_events.events) +... my_person_event_repository.write_cursors_to_db(tx, page_of_events.latest_checkpoints) +... tx.commit() +... +... cursors = page_of_events.latest_checkpoints +... page_of_events.clear() + +>>> asyncio.run(poll_for_events(cursors)) - # Step 5: Write the effect of changes to our own database and the updated - # cursor value in the same transaction. - with db.begin_transaction() as tx: - my_write_effect_of_events_to_db(tx, page_of_events.events) +``` - my_write_cursors_to_db(tx, page_of_events.latest_checkpoints) +## Server - tx.commit() +This library makes it easy to set up a ZeroEventHub feed endpoint with FastAPI. - cursors = page_of_events.latest_checkpoints +```python +>>> from typing import Annotated, Any, AsyncGenerator, Dict, Optional, Sequence +>>> from fastapi import Depends, FastAPI, Request +>>> from fastapi.responses import StreamingResponse +>>> from zeroeventhub import ( +... Cursor, +... DataReader, +... ZeroEventHubFastApiHandler, +... ) +>>> from unittest.mock import Mock + +>>> app = FastAPI() + +>>> PersonEventRepository = Mock + +>>> class PersonDataReader(DataReader): +... def __init__(self, person_event_repository: PersonEventRepository) -> None: +... self._person_event_repository = person_event_repository +... +... def get_data( +... self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int] +... ) -> AsyncGenerator[Dict[str, Any], Any]: +... return ( +... self._person_event_repository.get_events_since(cursors[0].cursor) +... .take(page_size) +... .with_headers(headers) +... ) + +>>> def get_person_data_reader() -> PersonDataReader: +... return PersonDataReader(PersonEventRepository()) + +>>> PersonDataReaderDependency = Annotated[ +... PersonDataReader, +... Depends(get_person_data_reader, use_cache=True), +... ] + +>>> @app.get("/person/feed/v1") +... async def feed(request: Request, person_data_reader: PersonDataReaderDependency) -> StreamingResponse: +... api_handler = ZeroEventHubFastApiHandler(data_reader=person_data_reader, server_partition_count=1) +... return api_handler.handle(request) - page_of_events.clear() ``` ## Development diff --git a/python/zeroeventhub/poetry.lock b/python/zeroeventhub/poetry.lock index 56eb767..a9b3d02 100644 --- a/python/zeroeventhub/poetry.lock +++ b/python/zeroeventhub/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand. +# This file is automatically @generated by Poetry and should not be changed by hand.
[[package]] name = "annotated-types" @@ -247,7 +247,7 @@ pycparser = "*" name = "charset-normalizer" version = "3.2.0" description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." -category = "main" +category = "dev" optional = false python-versions = ">=3.7.0" files = [ @@ -373,64 +373,64 @@ files = [ [[package]] name = "coverage" -version = "7.3.0" +version = "7.3.1" description = "Code coverage measurement for Python" category = "dev" optional = false python-versions = ">=3.8" files = [ - {file = "coverage-7.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:db76a1bcb51f02b2007adacbed4c88b6dee75342c37b05d1822815eed19edee5"}, - {file = "coverage-7.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c02cfa6c36144ab334d556989406837336c1d05215a9bdf44c0bc1d1ac1cb637"}, - {file = "coverage-7.3.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:477c9430ad5d1b80b07f3c12f7120eef40bfbf849e9e7859e53b9c93b922d2af"}, - {file = "coverage-7.3.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ce2ee86ca75f9f96072295c5ebb4ef2a43cecf2870b0ca5e7a1cbdd929cf67e1"}, - {file = "coverage-7.3.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:68d8a0426b49c053013e631c0cdc09b952d857efa8f68121746b339912d27a12"}, - {file = "coverage-7.3.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:b3eb0c93e2ea6445b2173da48cb548364f8f65bf68f3d090404080d338e3a689"}, - {file = "coverage-7.3.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:90b6e2f0f66750c5a1178ffa9370dec6c508a8ca5265c42fbad3ccac210a7977"}, - {file = "coverage-7.3.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:96d7d761aea65b291a98c84e1250cd57b5b51726821a6f2f8df65db89363be51"}, - {file = "coverage-7.3.0-cp310-cp310-win32.whl", hash = "sha256:63c5b8ecbc3b3d5eb3a9d873dec60afc0cd5ff9d9f1c75981d8c31cfe4df8527"}, - {file = "coverage-7.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:97c44f4ee13bce914272589b6b41165bbb650e48fdb7bd5493a38bde8de730a1"}, - {file = "coverage-7.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:74c160285f2dfe0acf0f72d425f3e970b21b6de04157fc65adc9fd07ee44177f"}, - {file = "coverage-7.3.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:b543302a3707245d454fc49b8ecd2c2d5982b50eb63f3535244fd79a4be0c99d"}, - {file = "coverage-7.3.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ad0f87826c4ebd3ef484502e79b39614e9c03a5d1510cfb623f4a4a051edc6fd"}, - {file = "coverage-7.3.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:13c6cbbd5f31211d8fdb477f0f7b03438591bdd077054076eec362cf2207b4a7"}, - {file = "coverage-7.3.0-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fac440c43e9b479d1241fe9d768645e7ccec3fb65dc3a5f6e90675e75c3f3e3a"}, - {file = "coverage-7.3.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:3c9834d5e3df9d2aba0275c9f67989c590e05732439b3318fa37a725dff51e74"}, - {file = "coverage-7.3.0-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:4c8e31cf29b60859876474034a83f59a14381af50cbe8a9dbaadbf70adc4b214"}, - {file = "coverage-7.3.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:7a9baf8e230f9621f8e1d00c580394a0aa328fdac0df2b3f8384387c44083c0f"}, - {file = "coverage-7.3.0-cp311-cp311-win32.whl", hash = 
"sha256:ccc51713b5581e12f93ccb9c5e39e8b5d4b16776d584c0f5e9e4e63381356482"}, - {file = "coverage-7.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:887665f00ea4e488501ba755a0e3c2cfd6278e846ada3185f42d391ef95e7e70"}, - {file = "coverage-7.3.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:d000a739f9feed900381605a12a61f7aaced6beae832719ae0d15058a1e81c1b"}, - {file = "coverage-7.3.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:59777652e245bb1e300e620ce2bef0d341945842e4eb888c23a7f1d9e143c446"}, - {file = "coverage-7.3.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c9737bc49a9255d78da085fa04f628a310c2332b187cd49b958b0e494c125071"}, - {file = "coverage-7.3.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5247bab12f84a1d608213b96b8af0cbb30d090d705b6663ad794c2f2a5e5b9fe"}, - {file = "coverage-7.3.0-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e2ac9a1de294773b9fa77447ab7e529cf4fe3910f6a0832816e5f3d538cfea9a"}, - {file = "coverage-7.3.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:85b7335c22455ec12444cec0d600533a238d6439d8d709d545158c1208483873"}, - {file = "coverage-7.3.0-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:36ce5d43a072a036f287029a55b5c6a0e9bd73db58961a273b6dc11a2c6eb9c2"}, - {file = "coverage-7.3.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:211a4576e984f96d9fce61766ffaed0115d5dab1419e4f63d6992b480c2bd60b"}, - {file = "coverage-7.3.0-cp312-cp312-win32.whl", hash = "sha256:56afbf41fa4a7b27f6635bc4289050ac3ab7951b8a821bca46f5b024500e6321"}, - {file = "coverage-7.3.0-cp312-cp312-win_amd64.whl", hash = "sha256:7f297e0c1ae55300ff688568b04ff26b01c13dfbf4c9d2b7d0cb688ac60df479"}, - {file = "coverage-7.3.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ac0dec90e7de0087d3d95fa0533e1d2d722dcc008bc7b60e1143402a04c117c1"}, - {file = "coverage-7.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:438856d3f8f1e27f8e79b5410ae56650732a0dcfa94e756df88c7e2d24851fcd"}, - {file = "coverage-7.3.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1084393c6bda8875c05e04fce5cfe1301a425f758eb012f010eab586f1f3905e"}, - {file = "coverage-7.3.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:49ab200acf891e3dde19e5aa4b0f35d12d8b4bd805dc0be8792270c71bd56c54"}, - {file = "coverage-7.3.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a67e6bbe756ed458646e1ef2b0778591ed4d1fcd4b146fc3ba2feb1a7afd4254"}, - {file = "coverage-7.3.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:8f39c49faf5344af36042b293ce05c0d9004270d811c7080610b3e713251c9b0"}, - {file = "coverage-7.3.0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:7df91fb24c2edaabec4e0eee512ff3bc6ec20eb8dccac2e77001c1fe516c0c84"}, - {file = "coverage-7.3.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:34f9f0763d5fa3035a315b69b428fe9c34d4fc2f615262d6be3d3bf3882fb985"}, - {file = "coverage-7.3.0-cp38-cp38-win32.whl", hash = "sha256:bac329371d4c0d456e8d5f38a9b0816b446581b5f278474e416ea0c68c47dcd9"}, - {file = "coverage-7.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:b859128a093f135b556b4765658d5d2e758e1fae3e7cc2f8c10f26fe7005e543"}, - {file = "coverage-7.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:fc0ed8d310afe013db1eedd37176d0839dc66c96bcfcce8f6607a73ffea2d6ba"}, - {file = 
"coverage-7.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:e61260ec93f99f2c2d93d264b564ba912bec502f679793c56f678ba5251f0393"}, - {file = "coverage-7.3.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:97af9554a799bd7c58c0179cc8dbf14aa7ab50e1fd5fa73f90b9b7215874ba28"}, - {file = "coverage-7.3.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3558e5b574d62f9c46b76120a5c7c16c4612dc2644c3d48a9f4064a705eaee95"}, - {file = "coverage-7.3.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:37d5576d35fcb765fca05654f66aa71e2808d4237d026e64ac8b397ffa66a56a"}, - {file = "coverage-7.3.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:07ea61bcb179f8f05ffd804d2732b09d23a1238642bf7e51dad62082b5019b34"}, - {file = "coverage-7.3.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:80501d1b2270d7e8daf1b64b895745c3e234289e00d5f0e30923e706f110334e"}, - {file = "coverage-7.3.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:4eddd3153d02204f22aef0825409091a91bf2a20bce06fe0f638f5c19a85de54"}, - {file = "coverage-7.3.0-cp39-cp39-win32.whl", hash = "sha256:2d22172f938455c156e9af2612650f26cceea47dc86ca048fa4e0b2d21646ad3"}, - {file = "coverage-7.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:60f64e2007c9144375dd0f480a54d6070f00bb1a28f65c408370544091c9bc9e"}, - {file = "coverage-7.3.0-pp38.pp39.pp310-none-any.whl", hash = "sha256:5492a6ce3bdb15c6ad66cb68a0244854d9917478877a25671d70378bdc8562d0"}, - {file = "coverage-7.3.0.tar.gz", hash = "sha256:49dbb19cdcafc130f597d9e04a29d0a032ceedf729e41b181f51cd170e6ee865"}, + {file = "coverage-7.3.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:cd0f7429ecfd1ff597389907045ff209c8fdb5b013d38cfa7c60728cb484b6e3"}, + {file = "coverage-7.3.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:966f10df9b2b2115da87f50f6a248e313c72a668248be1b9060ce935c871f276"}, + {file = "coverage-7.3.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0575c37e207bb9b98b6cf72fdaaa18ac909fb3d153083400c2d48e2e6d28bd8e"}, + {file = "coverage-7.3.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:245c5a99254e83875c7fed8b8b2536f040997a9b76ac4c1da5bff398c06e860f"}, + {file = "coverage-7.3.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4c96dd7798d83b960afc6c1feb9e5af537fc4908852ef025600374ff1a017392"}, + {file = "coverage-7.3.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:de30c1aa80f30af0f6b2058a91505ea6e36d6535d437520067f525f7df123887"}, + {file = "coverage-7.3.1-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:50dd1e2dd13dbbd856ffef69196781edff26c800a74f070d3b3e3389cab2600d"}, + {file = "coverage-7.3.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:b9c0c19f70d30219113b18fe07e372b244fb2a773d4afde29d5a2f7930765136"}, + {file = "coverage-7.3.1-cp310-cp310-win32.whl", hash = "sha256:770f143980cc16eb601ccfd571846e89a5fe4c03b4193f2e485268f224ab602f"}, + {file = "coverage-7.3.1-cp310-cp310-win_amd64.whl", hash = "sha256:cdd088c00c39a27cfa5329349cc763a48761fdc785879220d54eb785c8a38520"}, + {file = "coverage-7.3.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:74bb470399dc1989b535cb41f5ca7ab2af561e40def22d7e188e0a445e7639e3"}, + {file = "coverage-7.3.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:025ded371f1ca280c035d91b43252adbb04d2aea4c7105252d3cbc227f03b375"}, + {file 
= "coverage-7.3.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a6191b3a6ad3e09b6cfd75b45c6aeeffe7e3b0ad46b268345d159b8df8d835f9"}, + {file = "coverage-7.3.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7eb0b188f30e41ddd659a529e385470aa6782f3b412f860ce22b2491c89b8593"}, + {file = "coverage-7.3.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:75c8f0df9dfd8ff745bccff75867d63ef336e57cc22b2908ee725cc552689ec8"}, + {file = "coverage-7.3.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:7eb3cd48d54b9bd0e73026dedce44773214064be93611deab0b6a43158c3d5a0"}, + {file = "coverage-7.3.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:ac3c5b7e75acac31e490b7851595212ed951889918d398b7afa12736c85e13ce"}, + {file = "coverage-7.3.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:5b4ee7080878077af0afa7238df1b967f00dc10763f6e1b66f5cced4abebb0a3"}, + {file = "coverage-7.3.1-cp311-cp311-win32.whl", hash = "sha256:229c0dd2ccf956bf5aeede7e3131ca48b65beacde2029f0361b54bf93d36f45a"}, + {file = "coverage-7.3.1-cp311-cp311-win_amd64.whl", hash = "sha256:c6f55d38818ca9596dc9019eae19a47410d5322408140d9a0076001a3dcb938c"}, + {file = "coverage-7.3.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:5289490dd1c3bb86de4730a92261ae66ea8d44b79ed3cc26464f4c2cde581fbc"}, + {file = "coverage-7.3.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:ca833941ec701fda15414be400c3259479bfde7ae6d806b69e63b3dc423b1832"}, + {file = "coverage-7.3.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cd694e19c031733e446c8024dedd12a00cda87e1c10bd7b8539a87963685e969"}, + {file = "coverage-7.3.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:aab8e9464c00da5cb9c536150b7fbcd8850d376d1151741dd0d16dfe1ba4fd26"}, + {file = "coverage-7.3.1-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:87d38444efffd5b056fcc026c1e8d862191881143c3aa80bb11fcf9dca9ae204"}, + {file = "coverage-7.3.1-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:8a07b692129b8a14ad7a37941a3029c291254feb7a4237f245cfae2de78de037"}, + {file = "coverage-7.3.1-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:2829c65c8faaf55b868ed7af3c7477b76b1c6ebeee99a28f59a2cb5907a45760"}, + {file = "coverage-7.3.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:1f111a7d85658ea52ffad7084088277135ec5f368457275fc57f11cebb15607f"}, + {file = "coverage-7.3.1-cp312-cp312-win32.whl", hash = "sha256:c397c70cd20f6df7d2a52283857af622d5f23300c4ca8e5bd8c7a543825baa5a"}, + {file = "coverage-7.3.1-cp312-cp312-win_amd64.whl", hash = "sha256:5ae4c6da8b3d123500f9525b50bf0168023313963e0e2e814badf9000dd6ef92"}, + {file = "coverage-7.3.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ca70466ca3a17460e8fc9cea7123c8cbef5ada4be3140a1ef8f7b63f2f37108f"}, + {file = "coverage-7.3.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:f2781fd3cabc28278dc982a352f50c81c09a1a500cc2086dc4249853ea96b981"}, + {file = "coverage-7.3.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6407424621f40205bbe6325686417e5e552f6b2dba3535dd1f90afc88a61d465"}, + {file = "coverage-7.3.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:04312b036580ec505f2b77cbbdfb15137d5efdfade09156961f5277149f5e344"}, + {file = 
"coverage-7.3.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ac9ad38204887349853d7c313f53a7b1c210ce138c73859e925bc4e5d8fc18e7"}, + {file = "coverage-7.3.1-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:53669b79f3d599da95a0afbef039ac0fadbb236532feb042c534fbb81b1a4e40"}, + {file = "coverage-7.3.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:614f1f98b84eb256e4f35e726bfe5ca82349f8dfa576faabf8a49ca09e630086"}, + {file = "coverage-7.3.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:f1a317fdf5c122ad642db8a97964733ab7c3cf6009e1a8ae8821089993f175ff"}, + {file = "coverage-7.3.1-cp38-cp38-win32.whl", hash = "sha256:defbbb51121189722420a208957e26e49809feafca6afeef325df66c39c4fdb3"}, + {file = "coverage-7.3.1-cp38-cp38-win_amd64.whl", hash = "sha256:f4f456590eefb6e1b3c9ea6328c1e9fa0f1006e7481179d749b3376fc793478e"}, + {file = "coverage-7.3.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:f12d8b11a54f32688b165fd1a788c408f927b0960984b899be7e4c190ae758f1"}, + {file = "coverage-7.3.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f09195dda68d94a53123883de75bb97b0e35f5f6f9f3aa5bf6e496da718f0cb6"}, + {file = "coverage-7.3.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c6601a60318f9c3945be6ea0f2a80571f4299b6801716f8a6e4846892737ebe4"}, + {file = "coverage-7.3.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:07d156269718670d00a3b06db2288b48527fc5f36859425ff7cec07c6b367745"}, + {file = "coverage-7.3.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:636a8ac0b044cfeccae76a36f3b18264edcc810a76a49884b96dd744613ec0b7"}, + {file = "coverage-7.3.1-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:5d991e13ad2ed3aced177f524e4d670f304c8233edad3210e02c465351f785a0"}, + {file = "coverage-7.3.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:586649ada7cf139445da386ab6f8ef00e6172f11a939fc3b2b7e7c9082052fa0"}, + {file = "coverage-7.3.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:4aba512a15a3e1e4fdbfed2f5392ec221434a614cc68100ca99dcad7af29f3f8"}, + {file = "coverage-7.3.1-cp39-cp39-win32.whl", hash = "sha256:6bc6f3f4692d806831c136c5acad5ccedd0262aa44c087c46b7101c77e139140"}, + {file = "coverage-7.3.1-cp39-cp39-win_amd64.whl", hash = "sha256:553d7094cb27db58ea91332e8b5681bac107e7242c23f7629ab1316ee73c4981"}, + {file = "coverage-7.3.1-pp38.pp39.pp310-none-any.whl", hash = "sha256:220eb51f5fb38dfdb7e5d54284ca4d0cd70ddac047d750111a68ab1798945194"}, + {file = "coverage-7.3.1.tar.gz", hash = "sha256:6cb7fe1581deb67b782c153136541e20901aa312ceedaf1467dcb35255787952"}, ] [package.extras] @@ -672,7 +672,7 @@ pyflakes = ">=3.1.0,<3.2.0" name = "h11" version = "0.14.0" description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" -category = "dev" +category = "main" optional = false python-versions = ">=3.7" files = [ @@ -706,7 +706,7 @@ lxml = ["lxml"] name = "httpcore" version = "0.17.3" description = "A minimal low-level HTTP client." -category = "dev" +category = "main" optional = false python-versions = ">=3.7" files = [ @@ -728,7 +728,7 @@ socks = ["socksio (>=1.0.0,<2.0.0)"] name = "httpx" version = "0.24.1" description = "The next generation HTTP client." 
-category = "dev" +category = "main" optional = false python-versions = ">=3.7" files = [ @@ -988,17 +988,6 @@ files = [ {file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"}, ] -[[package]] -name = "mock-protocol" -version = "1.0.0" -description = "A unittest mock library than understands annotations" -category = "dev" -optional = false -python-versions = ">=3.6, <4" -files = [ - {file = "mock_protocol-1.0.0.tar.gz", hash = "sha256:2ecf2d7664c3e5de2cc570a108fcbd1b27dad82b40e639f6be7df51bad3e95dc"}, -] - [[package]] name = "more-itertools" version = "10.1.0" @@ -1554,14 +1543,14 @@ tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} [[package]] name = "pytest" -version = "7.4.1" +version = "7.4.2" description = "pytest: simple powerful testing with Python" category = "dev" optional = false python-versions = ">=3.7" files = [ - {file = "pytest-7.4.1-py3-none-any.whl", hash = "sha256:460c9a59b14e27c602eb5ece2e47bec99dc5fc5f6513cf924a7d03a578991b1f"}, - {file = "pytest-7.4.1.tar.gz", hash = "sha256:2f2301e797521b23e4d2585a0a3d7b5e50fdddaaf7e7d6773ea26ddb17c213ab"}, + {file = "pytest-7.4.2-py3-none-any.whl", hash = "sha256:1d881c6124e08ff0a1bb75ba3ec0bfd8b5354a01c194ddd5a0a870a48d99b002"}, + {file = "pytest-7.4.2.tar.gz", hash = "sha256:a766259cfab564a2ad52cb1aae1b881a75c3eb7e34ca3779697c23ed47c47069"}, ] [package.dependencies] @@ -1749,7 +1738,7 @@ rpds-py = ">=0.7.0" name = "requests" version = "2.31.0" description = "Python HTTP for Humans." -category = "main" +category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -1783,25 +1772,19 @@ files = [ requests = ">=2.0.1,<3.0.0" [[package]] -name = "responses" -version = "0.22.0" -description = "A utility library for mocking out the `requests` Python library." +name = "respx" +version = "0.20.2" +description = "A utility for mocking out the Python HTTPX and HTTP Core libraries." 
category = "dev" optional = false python-versions = ">=3.7" files = [ - {file = "responses-0.22.0-py3-none-any.whl", hash = "sha256:dcf294d204d14c436fddcc74caefdbc5764795a40ff4e6a7740ed8ddbf3294be"}, - {file = "responses-0.22.0.tar.gz", hash = "sha256:396acb2a13d25297789a5866b4881cf4e46ffd49cc26c43ab1117f40b973102e"}, + {file = "respx-0.20.2-py2.py3-none-any.whl", hash = "sha256:ab8e1cf6da28a5b2dd883ea617f8130f77f676736e6e9e4a25817ad116a172c9"}, + {file = "respx-0.20.2.tar.gz", hash = "sha256:07cf4108b1c88b82010f67d3c831dae33a375c7b436e54d87737c7f9f99be643"}, ] [package.dependencies] -requests = ">=2.22.0,<3.0" -toml = "*" -types-toml = "*" -urllib3 = ">=1.25.10" - -[package.extras] -tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "types-requests"] +httpx = ">=0.21.0" [[package]] name = "rpds-py" @@ -1981,18 +1964,6 @@ typing-extensions = {version = ">=3.10.0", markers = "python_version < \"3.10\"" [package.extras] full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart", "pyyaml"] -[[package]] -name = "toml" -version = "0.10.2" -description = "Python Library for Tom's Obvious, Minimal Language" -category = "dev" -optional = false -python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" -files = [ - {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, - {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, -] - [[package]] name = "tomli" version = "2.0.1" @@ -2029,45 +2000,6 @@ files = [ {file = "trove_classifiers-2023.8.7-py3-none-any.whl", hash = "sha256:a676626a31286130d56de2ea1232484df97c567eb429d56cfcb0637e681ecf09"}, ] -[[package]] -name = "types-requests" -version = "2.31.0.2" -description = "Typing stubs for requests" -category = "dev" -optional = false -python-versions = "*" -files = [ - {file = "types-requests-2.31.0.2.tar.gz", hash = "sha256:6aa3f7faf0ea52d728bb18c0a0d1522d9bfd8c72d26ff6f61bfc3d06a411cf40"}, - {file = "types_requests-2.31.0.2-py3-none-any.whl", hash = "sha256:56d181c85b5925cbc59f4489a57e72a8b2166f18273fd8ba7b6fe0c0b986f12a"}, -] - -[package.dependencies] -types-urllib3 = "*" - -[[package]] -name = "types-toml" -version = "0.10.8.7" -description = "Typing stubs for toml" -category = "dev" -optional = false -python-versions = "*" -files = [ - {file = "types-toml-0.10.8.7.tar.gz", hash = "sha256:58b0781c681e671ff0b5c0319309910689f4ab40e8a2431e205d70c94bb6efb1"}, - {file = "types_toml-0.10.8.7-py3-none-any.whl", hash = "sha256:61951da6ad410794c97bec035d59376ce1cbf4453dc9b6f90477e81e4442d631"}, -] - -[[package]] -name = "types-urllib3" -version = "1.26.25.14" -description = "Typing stubs for urllib3" -category = "dev" -optional = false -python-versions = "*" -files = [ - {file = "types-urllib3-1.26.25.14.tar.gz", hash = "sha256:229b7f577c951b8c1b92c1bc2b2fdb0b49847bd2af6d1cc2a2e3dd340f3bda8f"}, - {file = "types_urllib3-1.26.25.14-py3-none-any.whl", hash = "sha256:9683bbb7fb72e32bfe9d2be6e04875fbe1b3eeec3cbb4ea231435aa7fd6b4f0e"}, -] - [[package]] name = "typing-extensions" version = "4.7.1" @@ -2084,7 +2016,7 @@ files = [ name = "urllib3" version = "1.26.16" description = "HTTP library with thread-safe connection pooling, file post, and more." 
-category = "main" +category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" files = [ @@ -2340,4 +2272,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "eca2472ed7755b3bfa62d1f77a4d1faff40475d60aaef8bb966ec5b7bb0230a7" +content-hash = "2d501b1fce0b41992ece67514c496b87153c21ca0656a6320605accda028a934" diff --git a/python/zeroeventhub/pyproject.toml b/python/zeroeventhub/pyproject.toml index eba23e1..a171c6d 100644 --- a/python/zeroeventhub/pyproject.toml +++ b/python/zeroeventhub/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "zeroeventhub" -version = "0.1.2" +version = "0.2.0" description = "Broker-less event streaming over HTTP" authors = ["Vipps MobilePay"] readme = "README.md" @@ -10,8 +10,8 @@ keywords = ["event-streaming"] [tool.poetry.dependencies] python = "^3.9" -requests = "^2" fastapi = "^0.103" +httpx = "^0.24.1" [tool.poetry.group.dev.dependencies] pytest = "^7" @@ -21,14 +21,9 @@ black = "^22" mypy = "1.5.1" pytest-mock = "^3" poetry-types = "^0.3" -responses = "^0.22" coverage = "^7" -mock-protocol = "^1" -httpx = "0.24.1" -pytest-asyncio = "0.21.1" - -[tool.poetry.group.types.dependencies] -types-requests = "^2" +respx = "^0.20" +pytest-asyncio = "^0.21" [build-system] requires = ["poetry-core"] @@ -47,10 +42,6 @@ warn_unreachable = true warn_unused_configs = true explicit_package_bases = true -[[tool.mypy.overrides]] -module = "mock_protocol.*" -ignore_missing_imports = true - [[tool.mypy.overrides]] module = "tests.*" # `disallow_untyped_defs` has to be explicitly disabled because it is enabled by @@ -66,3 +57,10 @@ accept-no-raise-doc = false accept-no-return-doc = true accept-no-yields-doc = true default-docstring-type = "default" + +[tool.pytest.ini_options] +asyncio_mode = "auto" + +[tool.coverage.run] +omit = ["tests/*"] +source = ["zeroeventhub/"] diff --git a/python/zeroeventhub/tests/test_api_handler.py b/python/zeroeventhub/tests/test_api_handler.py index 5dbcbfe..08e1997 100644 --- a/python/zeroeventhub/tests/test_api_handler.py +++ b/python/zeroeventhub/tests/test_api_handler.py @@ -1,6 +1,7 @@ from typing import Any, Dict, List, Optional, Sequence, Generator, AsyncGenerator, Union import json import pytest +import asyncio from unittest import mock from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse @@ -16,13 +17,15 @@ class FakeAsyncDataReader(DataReader): async def get_data( self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int] - ) -> Union[Generator[Dict[str, Any], Any, Any], AsyncGenerator[Dict[str, Any], Any]]: + ) -> AsyncGenerator[Dict[str, Any], None]: header_dict = {} if headers: for header in headers: header_dict[header] = header event_list_p1 = ["e1", "e2", "e3"] event_list_p2 = ["e4", "e5", "e6"] + + await asyncio.sleep(0.1) for cursor in cursors: if cursor.partition_id == 0: for event in event_list_p1: @@ -47,7 +50,7 @@ async def get_data( class FakeDataReader(DataReader): def get_data( self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int] - ) -> Union[Generator[Dict[str, Any], Any, Any], AsyncGenerator[Dict[str, Any], Any]]: + ) -> Generator[Dict[str, Any], None, None]: header_dict = {} if headers: for header in headers: @@ -157,10 +160,12 @@ async def test_request_handler_cursor0_skipping(): def test_no_n_param(): - client = TestClient(app) - response = 
client.get("/feed/v1?cursor0=c0&cursor1=c1&headers=h1,h2,h3") - assert response.status_code == 400 - assert response.json() == {"detail": "Parameter n not found"} + with mock.patch.object(FakeDataReader, "get_data") as mocked_get_data: + client = TestClient(app) + response = client.get("/feed/v1?cursor0=c0&cursor1=c1&headers=h1,h2,h3") + assert response.status_code == 400 + assert response.json() == {"detail": "Parameter n not found"} + mocked_get_data.assert_not_called() def test_invalid_n_param(): @@ -172,9 +177,41 @@ def test_invalid_n_param(): mocked_get_data.assert_not_called() -def test_invalid_cursor_param(): +@pytest.mark.parametrize( + ("url",), + [ + ("/feed/v1?n=1&cursor0=0",), + ("/feed/v1?n=0",), + ], +) +def test_mismatched_n_param(url: str) -> None: + with mock.patch.object(FakeDataReader, "get_data") as mocked_get_data: + client = TestClient(app) + response = client.get(url) + assert response.status_code == 400 + assert response.json() == {"detail": "Partition count doesn't match as expected"} + mocked_get_data.assert_not_called() + + +def test_invalid_pagesizehint_param() -> None: + with mock.patch.object(FakeDataReader, "get_data") as mocked_get_data: + client = TestClient(app) + response = client.get("/feed/v1?n=2&cursor0=c0&pagesizehint=foobar") + assert response.status_code == 400 + assert response.json() == {"detail": "Invalid parameter pagesizehint"} + mocked_get_data.assert_not_called() + + +@pytest.mark.parametrize( + ("url",), + [ + ("/feed/v1?n=2&cursor2=c0&headers=h1,h2,h3",), + ("/feed/v1?n=2",), + ], +) +def test_invalid_cursor_param(url: str) -> None: client = TestClient(app) - response = client.get("/feed/v1?n=2&cursor2=c0&headers=h1,h2,h3") + response = client.get(url) assert response.status_code == 400 assert response.json() == {"detail": "Cursor parameter is missing"} diff --git a/python/zeroeventhub/tests/test_client.py b/python/zeroeventhub/tests/test_client.py index cdf2a3d..cb8a202 100644 --- a/python/zeroeventhub/tests/test_client.py +++ b/python/zeroeventhub/tests/test_client.py @@ -1,40 +1,55 @@ import pytest -import responses -import requests -import mock_protocol -from unittest import mock +import pytest_asyncio +import httpx +from httpx import AsyncByteStream +from unittest.mock import AsyncMock, MagicMock from json import JSONDecodeError +from typing import Any, AsyncGenerator, AsyncIterator, Iterable from zeroeventhub import ( Client, Cursor, + Event, APIError, EventReceiver, FIRST_CURSOR, LAST_CURSOR, ALL_HEADERS, + receive_events, ) +class IteratorStream(AsyncByteStream): + def __init__(self, stream: Iterable[bytes]): + self.stream = stream + + async def __aiter__(self) -> AsyncIterator[bytes]: + for chunk in self.stream: + yield chunk + + @pytest.fixture def mock_event_receiver(): - return mock_protocol.from_protocol(EventReceiver) + receiver_mock = MagicMock(spec=EventReceiver) + receiver_mock.event = AsyncMock() + receiver_mock.checkpoint = AsyncMock() + return receiver_mock -@pytest.fixture -def client(): +@pytest_asyncio.fixture +async def client(): url = "https://example.com/feed/v1" partition_count = 2 - return Client(url, partition_count) + async with httpx.AsyncClient() as httpx_client: + yield Client(url, partition_count, httpx_client) @pytest.mark.parametrize( - "page_size_hint,headers", + ("page_size_hint", "headers"), [(10, ["header1", "header2"]), (0, None), (5, ["header1"]), (0, ["header1"])], ) -@responses.activate -def test_events_fetched_successfully_when_there_are_multiple_lines_in_response( - client, mock_event_receiver, 
page_size_hint, headers +async def test_events_fetched_successfully_when_there_are_multiple_lines_in_response( client, mock_event_receiver, page_size_hint, headers, respx_mock ): """ Test that fetch_events does not raise an error when successfully called. @@ -43,24 +58,28 @@ # arrange cursors = [Cursor(1, "cursor1"), Cursor(2, "cursor2")] - responses.add( - responses.GET, - client.url, - content_type="application/json", - body="""{ "partition": 1, "cursor": "5" } - { "partition": 1, "headers": {}, "data": "some data"}\n""", - status=200, + respx_mock.get(client.url).mock( + return_value=httpx.Response( + status_code=200, + headers={"content_type": "application/x-ndjson"}, + content=IteratorStream( + [ + b"""{ "partition": 1, "cursor": "5" }\n""", + b"""{ "partition": 1, "headers": {}, "data": "some data"}\n""", + ] + ), + ) ) # act - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) + await receive_events(mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers)) # assert - mock_event_receiver.event.assert_called_once_with(1, {}, "some data") - mock_event_receiver.checkpoint.assert_called_once_with(1, "5") + mock_event_receiver.event.assert_called_once_with(Event(1, {}, "some data")) + mock_event_receiver.checkpoint.assert_called_once_with(Cursor(1, "5")) -def test_raises_apierror_when_fetch_events_with_missing_cursors(client, mock_event_receiver): +async def test_raises_apierror_when_fetch_events_with_missing_cursors(client): """ Test that fetch_events raises an APIError when cursors are missing. """ @@ -72,15 +91,14 @@ # act & assert with pytest.raises(APIError) as excinfo: - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) + await async_generator_to_list(client.fetch_events(cursors, page_size_hint, headers)) # assert assert "cursors are missing" in str(excinfo.value) assert excinfo.value.status() == 400 -@responses.activate -def test_raises_http_error_when_fetch_events_with_unexpected_response(client, mock_event_receiver): +async def test_raises_http_error_when_fetch_events_with_unexpected_response(client, respx_mock): """ Test that fetch_events raises an HTTPError when the response status code is not 2xx @@ -91,21 +109,24 @@ page_size_hint = 10 headers = ["header1", "header2"] - responses.add(responses.GET, client.url, status=404) + respx_mock.get(client.url).mock( + return_value=httpx.Response( + status_code=404, + ) + ) # act & assert that an HTTPError is raised - with pytest.raises(requests.HTTPError) as excinfo: - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) + with pytest.raises(httpx.HTTPError) as excinfo: + await async_generator_to_list(client.fetch_events(cursors, page_size_hint, headers)) # assert - assert str(excinfo.value).startswith(f"404 Client Error: Not Found for url: {client.url}?") + assert str(excinfo.value).startswith(f"Client error '404 Not Found' for url '{client.url}?") -@responses.activate -def test_raises_error_when_exception_while_receiving_checkpoint(client, mock_event_receiver): +async def test_raises_error_when_exception_while_parsing_checkpoint(client, respx_mock): """ - Test that fetch_events raises a ValueError when the checkpoint method - on the event receiver returns an error.
+ Test that fetch_events raises a ValueError when the checkpoint returned + from the server cannot be parsed. """ # arrange @@ -113,28 +134,23 @@ def test_raises_error_when_exception_while_receiving_checkpoint(client, mock_eve page_size_hint = 10 headers = ["header1", "header2"] - responses.add( - responses.GET, - client.url, - json={ - "partition": 0, - "cursor": "0", - }, - status=200, + respx_mock.get(client.url).mock( + return_value=httpx.Response( + status_code=200, + headers={"content_type": "application/x-ndjson"}, + content="""{ "cursor": "0" }""", # NOTE: partition is missing + ) ) - mock_event_receiver.checkpoint.side_effect = Exception("error while receiving checkpoint") - # act & assert - with pytest.raises(ValueError, match="error while receiving checkpoint"): - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) + with pytest.raises(ValueError, match="error while parsing checkpoint"): + await async_generator_to_list(client.fetch_events(cursors, page_size_hint, headers)) -@responses.activate -def test_raises_error_when_exception_while_receiving_event(client, mock_event_receiver): +async def test_raises_error_when_exception_while_parsing_event(client, respx_mock): """ - Test that fetch_events raises a ValueError when the event method - on the event receiver returns an error. + Test that fetch_events raises a ValueError when the event returned + from the server cannot be parsed. """ # arrange @@ -142,142 +158,139 @@ def test_raises_error_when_exception_while_receiving_event(client, mock_event_re page_size_hint = 10 headers = ["header1", "header2"] - responses.add( - responses.GET, - client.url, - json={"partition": 0, "headers": {}, "data": "some data"}, - status=200, + respx_mock.get(client.url).mock( + return_value=httpx.Response( + status_code=200, + headers={"content_type": "application/x-ndjson"}, + content="""{ "data": "" }""", # NOTE: partition is missing + ) ) - mock_event_receiver.event.side_effect = Exception("error while receiving event") - # act & assert - with pytest.raises(ValueError, match="error while receiving event"): - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) + with pytest.raises(ValueError, match="error while parsing event"): + await async_generator_to_list(client.fetch_events(cursors, page_size_hint, headers)) - # assert - mock_event_receiver.event.assert_called() - -@responses.activate -def test_fetch_events_succeeds_when_response_is_empty(client, mock_event_receiver): +async def test_exceptions_bubble_up_when_exception_while_receiving_checkpoint( + client, mock_event_receiver, respx_mock +): """ - Test that fetch_events gracefully handles an empty response. + Test that receive_events doesn't hide the exception when the checkpoint method + on the event receiver returns an error. 
""" # arrange cursors = [Cursor(1, "cursor1"), Cursor(2, "cursor2")] page_size_hint = 10 - headers = None + headers = ["header1", "header2"] - responses.add( - responses.GET, - client.url, - content_type="application/json", - body="", - status=204, + respx_mock.get(client.url).mock( + return_value=httpx.Response( + status_code=200, + headers={"content_type": "application/x-ndjson"}, + content="""{ "partition": 0, "cursor": "0" }""", + ) ) - # act - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) + mock_event_receiver.checkpoint.side_effect = Exception("error while receiving checkpoint") - # assert that the event and checkpoint methods were not called - mock_event_receiver.event.assert_not_called() - mock_event_receiver.checkpoint.assert_not_called() + # act & assert + with pytest.raises(Exception, match="error while receiving checkpoint"): + await receive_events( + mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers) + ) -@responses.activate -def test_raises_error_when_response_contains_invalid_json_line(client, mock_event_receiver): +async def test_exceptions_bubble_up_when_exception_while_receiving_event( + client, mock_event_receiver, respx_mock +): """ - Test that fetch_events raises a JSONDecodeError when the response contains a non-empty - line which is not valid JSON. + Test that receive_events doesn't hide the exception when the event method + on the event receiver returns an error. """ # arrange - cursors = [Cursor(0, "cursor1"), Cursor(1, "cursor2")] + cursors = [Cursor(1, "cursor1"), Cursor(2, "cursor2")] page_size_hint = 10 headers = ["header1", "header2"] - responses.add( - responses.GET, - client.url, - body="""{"partition": 1,"cursor": "5"}\ninvalid json""", - status=200, + respx_mock.get(client.url).mock( + return_value=httpx.Response( + status_code=200, + headers={"content_type": "application/x-ndjson"}, + content="""{"partition": 0, "headers": {}, "data": "some data"}\n""", + ) ) + mock_event_receiver.event.side_effect = Exception("some error while processing the event") # act & assert - with pytest.raises(JSONDecodeError): - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) + with pytest.raises(Exception, match="some error while processing the event"): + await receive_events( + mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers) + ) - # assert that the checkpoint method was called for the first response line - mock_event_receiver.checkpoint.assert_called_once_with(1, "5") + # assert + mock_event_receiver.event.assert_called() -@responses.activate -def test_owned_session_destroyed_when_client_destroyed(mock_event_receiver, mocker): +async def test_fetch_events_succeeds_when_response_is_empty( + client, mock_event_receiver, respx_mock +): """ - Test that the client-owned session is closed when the client is destroyed + Test that fetch_events gracefully handles an empty response. 
""" # arrange - url = "https://example.com" - partition_count = 2 - session_mock = mocker.MagicMock(spec_set=requests.Session) - with mock.patch("zeroeventhub.client.requests.Session") as create_session_mock: - create_session_mock.return_value = session_mock - client = Client(url, partition_count) - create_session_mock.assert_called_once() - assert session_mock == client.session - cursors = [Cursor(1, "cursor1"), Cursor(2, "cursor2")] page_size_hint = 10 - headers = ["header1", "header2"] + headers = None - responses.add( - responses.GET, - client.url, - body="""{"partition": 0,"cursor": 0}\n""", - status=200, + respx_mock.get(client.url).mock( + return_value=httpx.Response( + status_code=204, + headers={"content_type": "application/x-ndjson"}, + content="", + ) ) # act - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) - del client + await receive_events(mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers)) - # assert - session_mock.send.assert_called_once() - session_mock.close.assert_called_once() + # assert that the event and checkpoint methods were not called + mock_event_receiver.event.assert_not_called() + mock_event_receiver.checkpoint.assert_not_called() -@responses.activate -def test_provided_session_not_destroyed_when_client_destroyed(mock_event_receiver, mocker): +async def test_raises_error_when_response_contains_invalid_json_line( + client, mock_event_receiver, respx_mock +): """ - Test that the provided session is not closed when the client is destroyed + Test that fetch_events raises a JSONDecodeError when the response contains a non-empty + line which is not valid JSON. """ # arrange - url = "https://example.com" - partition_count = 2 - session_mock = mocker.MagicMock(spec_set=requests.Session) - with mock.patch("zeroeventhub.client.requests.Session") as create_session_mock: - client = Client(url, partition_count, session_mock) - create_session_mock.assert_not_called() - - cursors = [Cursor(1, FIRST_CURSOR), Cursor(2, LAST_CURSOR)] - page_size_hint = None - headers = ALL_HEADERS - - responses.add( - responses.GET, - client.url, - body="""{"partition": 1,"cursor": "123"}\n""", - status=200, + cursors = [Cursor(0, "cursor1"), Cursor(1, "cursor2")] + page_size_hint = 10 + headers = ["header1", "header2"] + + respx_mock.get(client.url).mock( + return_value=httpx.Response( + status_code=200, + headers={"content_type": "application/x-ndjson"}, + content="""{"partition": 1,"cursor": "5"}\ninvalid json""", + ) ) - # act - client.fetch_events(cursors, page_size_hint, mock_event_receiver, headers) - del client + # act & assert + with pytest.raises(JSONDecodeError): + await receive_events( + mock_event_receiver, client.fetch_events(cursors, page_size_hint, headers) + ) - # assert - session_mock.send.assert_called_once() - session_mock.close.assert_not_called() + # assert that the checkpoint method was called for the first response line + mock_event_receiver.checkpoint.assert_called_once_with(Cursor(1, "5")) + + +async def async_generator_to_list(input: AsyncGenerator[Any, None]) -> list[Any]: + return [item async for item in input] diff --git a/python/zeroeventhub/tests/test_client_with_server.py b/python/zeroeventhub/tests/test_client_with_server.py new file mode 100644 index 0000000..b4e1133 --- /dev/null +++ b/python/zeroeventhub/tests/test_client_with_server.py @@ -0,0 +1,136 @@ +import pytest +import pytest_asyncio +import httpx +from httpx import AsyncByteStream +from unittest.mock import AsyncMock, MagicMock +from pytest_mock 
import MockerFixture +from json import JSONDecodeError +from typing import Any, AsyncGenerator, Dict, Optional, Sequence, Union + +from fastapi import FastAPI, Request +from fastapi.responses import StreamingResponse + + +from zeroeventhub import ( + Client, + Cursor, + Event, + FIRST_CURSOR, + ALL_HEADERS, + DataReader, + ZeroEventHubFastApiHandler, +) + + +app = FastAPI() + + +class FakeDataReader(DataReader): + async def get_data( + self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int] + ) -> AsyncGenerator[Dict[str, Any], Any]: + yield {"this method will be replaced": "by mocks"} + + +@app.get("/person/feed/v1") +async def feed(request: Request) -> StreamingResponse: + dr = FakeDataReader() + api_handler = ZeroEventHubFastApiHandler(data_reader=dr, server_partition_count=1) + return api_handler.handle(request) + + +@pytest_asyncio.fixture +async def client(): + url = "person/feed/v1" + partition_count = 1 + async with httpx.AsyncClient(app=app, base_url="http://example/") as httpx_client: + yield Client(url, partition_count, httpx_client) + + +@pytest.mark.parametrize( + "feed", + ( + [ + Event( + 0, + {}, + { + "id": 1000, + "type": "PersonNameChanged", + "when": "2023-09-26T11:23:30.472906Z", + "person_id": 876, + "new_name": { + "first_name": "Fred", + "surname": "Bloggs", + }, + }, + ), + Event( + 0, + None, + { + "id": 1001, + "type": "PersonNameChanged", + "when": "2023-09-27T07:34:09.261815Z", + "person_id": 932, + "new_name": { + "first_name": "Joe", + "surname": "King", + }, + }, + ), + Cursor(0, "1002"), + ], + ), +) +@pytest.mark.parametrize( + ("page_size_hint", "headers"), + [ + (None, None), + (None, ALL_HEADERS), + (1000, ALL_HEADERS), + (1000, None), + (None, ["header1", "header2"]), + ], +) +async def test_client_can_successfully_fetch_events_from_server( + client: Client, + feed: Sequence[Union[Event, Cursor]], + mocker: MockerFixture, + page_size_hint: Optional[int], + headers: Optional[Sequence[str]], +) -> None: + """ + Test that fetch_events retrieves all the events and cursors the server responds with. 
+ """ + + # arrange + cursors = [Cursor(0, FIRST_CURSOR)] + + async def yield_data( + _cursors: Sequence[Cursor], + _headers: Optional[Dict[str, Any]], + _page_size_hint: Optional[int], + ) -> AsyncGenerator[Dict[str, Any], None]: + for item in feed: + if isinstance(item, Cursor): + yield {"partition": item.partition_id, "cursor": item.cursor} + elif isinstance(item, Event): + yield {"partition": item.partition_id, "headers": item.headers, "data": item.data} + + get_data_mock = mocker.patch.object(FakeDataReader, "get_data") + get_data_mock.side_effect = yield_data + + # act + result = [ + item + async for item in client.fetch_events( + cursors, headers=headers, page_size_hint=page_size_hint + ) + ] + + # assert + assert result == feed + get_data_mock.assert_called_once_with( + cursors, list(headers) if headers else headers, page_size_hint + ) diff --git a/python/zeroeventhub/tests/test_page_event_receiver.py b/python/zeroeventhub/tests/test_page_event_receiver.py index 89f5a7a..868c078 100644 --- a/python/zeroeventhub/tests/test_page_event_receiver.py +++ b/python/zeroeventhub/tests/test_page_event_receiver.py @@ -13,24 +13,24 @@ def page_event_receiver(): return PageEventReceiver() -def receive_page_1_events(page_event_receiver: EventReceiver) -> None: - page_event_receiver.event(1, {"header1": "abc"}, "event 1 partition 1") - page_event_receiver.event(1, {}, "event 2 partition 1") - page_event_receiver.checkpoint(1, "0xf01dab1e") - page_event_receiver.event(2, {"header1": "abc"}, "event 1 partition 2") - page_event_receiver.event(1, {"header1": "def"}, "event 3 partition 1") - page_event_receiver.checkpoint(1, "0xFOO") - page_event_receiver.event(2, {"header1": "blah"}, "event 2 partition 2") - page_event_receiver.checkpoint(2, "0xBA5EBA11") - - -def test_page_contains_all_received_events_and_checkpoints(page_event_receiver): +async def receive_page_1_events(page_event_receiver: EventReceiver) -> None: + await page_event_receiver.event(Event(1, {"header1": "abc"}, "event 1 partition 1")) + await page_event_receiver.event(Event(1, {}, "event 2 partition 1")) + await page_event_receiver.checkpoint(Cursor(1, "0xf01dab1e")) + await page_event_receiver.event(Event(2, {"header1": "abc"}, "event 1 partition 2")) + await page_event_receiver.event(Event(1, {"header1": "def"}, "event 3 partition 1")) + await page_event_receiver.checkpoint(Cursor(1, "0xFOO")) + await page_event_receiver.event(Event(2, {"header1": "blah"}, "event 2 partition 2")) + await page_event_receiver.checkpoint(Cursor(2, "0xBA5EBA11")) + + +async def test_page_contains_all_received_events_and_checkpoints(page_event_receiver): """ Test that the page contains all received events and checkpoints in order. """ # act - receive_page_1_events(page_event_receiver) + await receive_page_1_events(page_event_receiver) # assert assert page_event_receiver.events == [ @@ -53,12 +53,12 @@ def test_page_contains_all_received_events_and_checkpoints(page_event_receiver): ] -def test_page_is_empty_after_clearing(page_event_receiver): +async def test_page_is_empty_after_clearing(page_event_receiver): """ Test that the page contains no events or checkpoints after being cleared. 
""" # arrange - receive_page_1_events(page_event_receiver) + await receive_page_1_events(page_event_receiver) # act page_event_receiver.clear() @@ -69,7 +69,7 @@ def test_page_is_empty_after_clearing(page_event_receiver): assert not page_event_receiver.latest_checkpoints -def test_page_contains_all_received_events_and_checkpoints_when_receiving_after_being_cleared( +async def test_page_contains_all_received_events_and_checkpoints_when_receiving_after_being_cleared( page_event_receiver, ): """ @@ -77,12 +77,12 @@ def test_page_contains_all_received_events_and_checkpoints_when_receiving_after_ from the second page only after the first page was cleared. """ # arrange - receive_page_1_events(page_event_receiver) + await receive_page_1_events(page_event_receiver) # act page_event_receiver.clear() - page_event_receiver.event(1, None, "event 4 partition 1") - page_event_receiver.checkpoint(1, "0x5ca1ab1e") + await page_event_receiver.event(Event(1, None, "event 4 partition 1")) + await page_event_receiver.checkpoint(Cursor(1, "0x5ca1ab1e")) # assert assert page_event_receiver.events == [ diff --git a/python/zeroeventhub/zeroeventhub/__init__.py b/python/zeroeventhub/zeroeventhub/__init__.py index a3efaba..316536a 100644 --- a/python/zeroeventhub/zeroeventhub/__init__.py +++ b/python/zeroeventhub/zeroeventhub/__init__.py @@ -2,10 +2,11 @@ from .client import Client from .cursor import Cursor, FIRST_CURSOR, LAST_CURSOR -from .event_receiver import EventReceiver +from .event import Event +from .event_receiver import EventReceiver, receive_events from .errors import APIError, ErrCursorsMissing from .constants import ALL_HEADERS -from .page_event_receiver import Event, PageEventReceiver +from .page_event_receiver import PageEventReceiver from .api_handler import ZeroEventHubFastApiHandler from .data_reader import DataReader @@ -23,4 +24,5 @@ "PageEventReceiver", "ZeroEventHubFastApiHandler", "DataReader", + "receive_events", ] diff --git a/python/zeroeventhub/zeroeventhub/api_handler.py b/python/zeroeventhub/zeroeventhub/api_handler.py index 2459eb5..c07d3a0 100644 --- a/python/zeroeventhub/zeroeventhub/api_handler.py +++ b/python/zeroeventhub/zeroeventhub/api_handler.py @@ -1,6 +1,6 @@ """ Api handlers definition""" import json -from typing import Any, Generator, Dict, Union, AsyncGenerator +from typing import Any, AsyncGenerator, Dict, Generator, Union from fastapi import Request, HTTPException, status from fastapi.responses import StreamingResponse from .data_reader import DataReader diff --git a/python/zeroeventhub/zeroeventhub/client.py b/python/zeroeventhub/zeroeventhub/client.py index 4b6346a..1f839fc 100644 --- a/python/zeroeventhub/zeroeventhub/client.py +++ b/python/zeroeventhub/zeroeventhub/client.py @@ -1,25 +1,25 @@ """Module containing client-side related code for ZeroEventHub""" import json -from typing import Dict, Optional, Any, Sequence, Union -import requests +from typing import Dict, Any, Optional, Sequence, Union +from collections.abc import AsyncGenerator +import httpx from .cursor import Cursor -from .event_receiver import EventReceiver +from .event import Event from .errors import ErrCursorsMissing class Client: """ - Client-side code to query a ZeroEventHub server to fetch events and pass them to the supplied - EventReceiver instance. + Client-side code to query a ZeroEventHub server to fetch events. 
""" def __init__( self, url: str, partition_count: int, - session: Optional[requests.Session] = None, + http_client: httpx.AsyncClient, ) -> None: """ @@ -27,55 +27,45 @@ def __init__( :param url: The base URL for the service. :param partition_count: The number of partitions the ZeroEventHub server has. - :param session: An optional session under which to make the HTTP requests. + :param http_client: A httpx AsyncClient under which to make the HTTP requests. This allows one time setup of authentication etc. on the session, and increases performance if fetching events frequently due to connection pooling. """ self.url = url self.partition_count = partition_count - self._client_owns_session = not bool(session) - self._session = session or requests.Session() - - def __del__(self) -> None: - """ - Close the session when this Client instance is destroyed if we created it - during initialization - """ - if self._client_owns_session: - self._session.close() + self._http_client = http_client @property - def session(self) -> requests.Session: - """Return the session being used by this client.""" - return self._session + def http_client(self) -> httpx.AsyncClient: + """Return the http_client being used by this client.""" + return self._http_client - def fetch_events( + async def fetch_events( self, cursors: Sequence[Cursor], - page_size_hint: Optional[int], - event_receiver: EventReceiver, + page_size_hint: Optional[int] = None, headers: Optional[Sequence[str]] = None, - ) -> None: + ) -> AsyncGenerator[Union[Event, Cursor], None]: """ - Fetch events from the server using the provided context, cursors, page size hint, - event receiver, and headers. + Fetch events from the server using the provided cursors, page size hint and + desired headers. :param cursors: A sequence of cursors to be used in the request. :param page_size_hint: An optional hint for the page size of the response. - :param event_receiver: An event receiver to handle the received events. :param headers: An optional sequence containing event headers desired in the response. :raises APIError: if cursors are missing. :raises ValueError: if an exception occurs while the event receiver handles the response. - :raises requests.exceptions.RequestException: if unable to call the endpoint successfully. - :raises requests.HTTPError: if response status code does not indicate success. + :raises httpx.RequestError: if unable to call the endpoint successfully. + :raises httpx.HTTPError: if response status code does not indicate success. :raises json.JSONDecodeError: if a line from the response cannot be decoded into JSON. """ self._validate_inputs(cursors) - req = self._build_request(cursors, page_size_hint, headers) + params = self._build_request_params(cursors, page_size_hint, headers) - with self.session.send(req, stream=True) as res: - self._process_response(res, event_receiver) + async with self._http_client.stream("GET", self.url, params=params) as res: + async for event_or_checkpoint in self._process_response(res): + yield event_or_checkpoint def _validate_inputs(self, cursors: Sequence[Cursor]) -> None: """ @@ -87,12 +77,12 @@ def _validate_inputs(self, cursors: Sequence[Cursor]) -> None: if not cursors: raise ErrCursorsMissing - def _build_request( + def _build_request_params( self, cursors: Sequence[Cursor], page_size_hint: Optional[int], headers: Optional[Sequence[str]], - ) -> requests.PreparedRequest: + ) -> Dict[str, Union[str, int]]: """ Build the http request using the provided inputs. 
@@ -113,50 +103,48 @@ def _build_request(
         if headers:
             params["headers"] = ",".join(headers)
 
-        request = requests.Request("GET", self.url, params=params)
-        return self._session.prepare_request(request)
+        return params
 
-    def _process_response(self, res: requests.Response, event_receiver: EventReceiver) -> None:
+    async def _process_response(
+        self, res: httpx.Response
+    ) -> AsyncGenerator[Union[Event, Cursor], None]:
         """
         Process the response from the server.
 
         :param res: the server response
-        :param event_receiver: An event receiver to handle the received events.
-        :raises requests.HTTPError: if response status code does not indicate success.
+        :raises httpx.HTTPStatusError: if the response status code does not indicate success.
         :raises json.JSONDecodeError: if a line from the response cannot be decoded into JSON.
-        :raises ValueError: error while EventReceiver handles checkpoint or event.
+        :raises ValueError: if a line cannot be parsed into an event or checkpoint.
         """
         res.raise_for_status()
 
-        for line in res.iter_lines():
-            checkpoint_or_event = json.loads(line)
-            self._process_checkpoint_or_event(checkpoint_or_event, event_receiver)
+        async for line in res.aiter_lines():
+            yield self._parse_checkpoint_or_event(line)
 
-    def _process_checkpoint_or_event(
-        self, checkpoint_or_event: Dict[str, Any], event_receiver: EventReceiver
-    ) -> None:
+    def _parse_checkpoint_or_event(self, raw_line: str) -> Union[Event, Cursor]:
         """
-        Process a line of response from the server.
+        Parse a line of response from the server.
 
-        :param checkpoint_or_event: A dictionary containing a checkpoint or event.
-        :param event_receiver: An event receiver to handle the received events.
-        :raises ValueError: if an error occurred in the event receiver while
-            the event or checkpoint was being processed.
+        :param raw_line: The raw JSON line from the server.
+        :raises ValueError: if an error occurred parsing the JSON line into an event or checkpoint.
         """
-        if checkpoint_or_event.get("cursor", None) is not None:
+        checkpoint_or_event: Dict[str, Any] = json.loads(raw_line)
+
+        if (cursor := checkpoint_or_event.get("cursor", None)) is not None:
             try:
-                event_receiver.checkpoint(
-                    checkpoint_or_event["partition"], checkpoint_or_event["cursor"]
+                return Cursor(
+                    partition_id=checkpoint_or_event["partition"],
+                    cursor=cursor,
                 )
             except Exception as error:
-                raise ValueError("error while receiving checkpoint") from error
+                raise ValueError("error while parsing checkpoint") from error
         else:
             try:
-                event_receiver.event(
-                    checkpoint_or_event["partition"],
-                    checkpoint_or_event.get("headers", None),
-                    checkpoint_or_event["data"],
+                return Event(
+                    partition_id=checkpoint_or_event["partition"],
+                    headers=checkpoint_or_event.get("headers", None),
+                    data=checkpoint_or_event["data"],
                 )
             except Exception as error:
-                raise ValueError("error while receiving event") from error
+                raise ValueError("error while parsing event") from error
diff --git a/python/zeroeventhub/zeroeventhub/data_reader.py b/python/zeroeventhub/zeroeventhub/data_reader.py
index 0628cff..9a95d95 100644
--- a/python/zeroeventhub/zeroeventhub/data_reader.py
+++ b/python/zeroeventhub/zeroeventhub/data_reader.py
@@ -14,7 +14,7 @@ class DataReader(Protocol):
 
     def get_data(
         self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int]
-    ) -> Union[Generator[Dict[str, Any], Any, Any], AsyncGenerator[Dict[str, Any], Any]]:
+    ) -> Union[Generator[Dict[str, Any], None, None], AsyncGenerator[Dict[str, Any], None]]:
         """
         Read a page of events at server side for the given cursors.
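With this change, `Client.fetch_events` is an async generator yielding `Event` and `Cursor` instances as they stream in, so a consumer can batch work between checkpoints without using an `EventReceiver` at all. A minimal sketch of that usage follows; the feed URL, partition count, page size, and the `persist_batch` helper are illustrative assumptions, not part of the library:

```python
import asyncio
from typing import List

import httpx

import zeroeventhub
from zeroeventhub import Cursor, Event, FIRST_CURSOR


def persist_batch(events: List[Event], checkpoint: Cursor) -> None:
    """Hypothetical handler: store the events and the new cursor in one transaction."""
    print(f"persisting {len(events)} events up to cursor {checkpoint.cursor}")


async def consume_feed() -> None:
    async with httpx.AsyncClient() as http_client:
        # Assumed endpoint and partition count, per the server's documented contract.
        client = zeroeventhub.Client("https://localhost:8192/person/feed/v1", 1, http_client)
        cursors = [Cursor(0, FIRST_CURSOR)]

        batch: List[Event] = []
        async for item in client.fetch_events(cursors, page_size_hint=100):
            if isinstance(item, Cursor):
                # A checkpoint closes a batch: everything received since the last
                # checkpoint can be persisted atomically with the new cursor value.
                persist_batch(batch, item)
                batch.clear()
            else:
                batch.append(item)


asyncio.run(consume_feed())
```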
diff --git a/python/zeroeventhub/zeroeventhub/event.py b/python/zeroeventhub/zeroeventhub/event.py
new file mode 100644
index 0000000..ced5bbf
--- /dev/null
+++ b/python/zeroeventhub/zeroeventhub/event.py
@@ -0,0 +1,13 @@
+"""Module to define the event dataclass"""
+
+from dataclasses import dataclass
+from typing import Any, Dict, Optional
+
+
+@dataclass
+class Event:
+    """All properties received relating to a certain event."""
+
+    partition_id: int
+    headers: Optional[Dict[str, str]]
+    data: Any
diff --git a/python/zeroeventhub/zeroeventhub/event_receiver.py b/python/zeroeventhub/zeroeventhub/event_receiver.py
index f7a0dcd..11daf9d 100644
--- a/python/zeroeventhub/zeroeventhub/event_receiver.py
+++ b/python/zeroeventhub/zeroeventhub/event_receiver.py
@@ -1,6 +1,9 @@
 """Module to define the EventReceiver interface"""
 
-from typing import Protocol, Dict, Any, Optional
+from typing import Protocol, Union
+from collections.abc import AsyncGenerator
+from .cursor import Cursor
+from .event import Event
 
 
 class EventReceiver(Protocol):
@@ -10,19 +13,28 @@ class EventReceiver(Protocol):
     Checkpoint in this context is basically a cursor.
     """
 
-    def event(self, partition_id: int, headers: Optional[Dict[str, str]], data: Any) -> None:
+    async def event(self, event: Event) -> None:
         """
         Event method processes actual events.
 
-        :param partition_id: the partition id
-        :param headers: the headers
-        :param data: the data
+        :param event: the details of the event which has been received from the server
         """
 
-    def checkpoint(self, partition_id: int, cursor: str) -> None:
+    async def checkpoint(self, checkpoint: Cursor) -> None:
         """
         Checkpoint method processes cursors.
 
-        :param partition_id: the partition id
-        :param cursor: the cursor
+        :param checkpoint: the checkpoint which was received from the server
         """
+
+
+async def receive_events(
+    event_receiver: EventReceiver, events: AsyncGenerator[Union[Cursor, Event], None]
+) -> None:
+    """Bridge between the AsyncGenerator returned by Client.fetch_events
+    and the EventReceiver interface."""
+    async for event_or_checkpoint in events:
+        if isinstance(event_or_checkpoint, Cursor):
+            await event_receiver.checkpoint(event_or_checkpoint)
+        else:
+            await event_receiver.event(event_or_checkpoint)
diff --git a/python/zeroeventhub/zeroeventhub/page_event_receiver.py b/python/zeroeventhub/zeroeventhub/page_event_receiver.py
index 0019ebe..374e055 100644
--- a/python/zeroeventhub/zeroeventhub/page_event_receiver.py
+++ b/python/zeroeventhub/zeroeventhub/page_event_receiver.py
@@ -1,18 +1,9 @@
 """Module to make it easy to receive a page of events"""
 
-from typing import Dict, Any, Sequence, Optional, List
-from dataclasses import dataclass
-from .event_receiver import EventReceiver
+from typing import Dict, Sequence, List
 from .cursor import Cursor
-
-
-@dataclass
-class Event:
-    """All properties received relating to a certain event."""
-
-    partition_id: int
-    headers: Optional[Dict[str, str]]
-    data: Any
+from .event import Event
+from .event_receiver import EventReceiver
 
 
 class PageEventReceiver(EventReceiver):
@@ -24,7 +15,7 @@ def __init__(self) -> None:
         """Initialize the PageEventReceiver with empty state."""
         self._events: List[Event] = []
         self._checkpoints: List[Cursor] = []
-        self._latest_checkpoints: Dict[int, str] = {}
+        self._latest_checkpoints: Dict[int, Cursor] = {}
 
     def clear(self) -> None:
         """Clear the received events and checkpoints, ready to handle a new page."""
@@ -45,27 +36,21 @@ def checkpoints(self) -> Sequence[Cursor]:
 
     @property
     def latest_checkpoints(self) -> Sequence[Cursor]:
         """Only return the latest checkpoint for each partition."""
-        return [
-            Cursor(partition_id, cursor)
-            for partition_id, cursor in self._latest_checkpoints.items()
-        ]
+        return list(self._latest_checkpoints.values())
 
-    def event(self, partition_id: int, headers: Optional[Dict[str, str]], data: Any) -> None:
+    async def event(self, event: Event) -> None:
         """
         Add the given event to the list.
 
-        :param partition_id: the partition id
-        :param headers: the headers
-        :param data: the data
+        :param event: the event
         """
-        self._events.append(Event(partition_id, headers, data))
+        self._events.append(event)
 
-    def checkpoint(self, partition_id: int, cursor: str) -> None:
+    async def checkpoint(self, checkpoint: Cursor) -> None:
         """
-        Add the given cursor to the list.
+        Add the given checkpoint to the list.
 
-        :param partition_id: the partition id
-        :param cursor: the cursor
+        :param checkpoint: the cursor to use as a checkpoint to continue processing from later
         """
-        self._checkpoints.append(Cursor(partition_id, cursor))
-        self._latest_checkpoints[partition_id] = cursor
+        self._checkpoints.append(checkpoint)
+        self._latest_checkpoints[checkpoint.partition_id] = checkpoint
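Because the `EventReceiver` protocol methods are now coroutines taking the `Event` and `Cursor` dataclasses, a custom receiver is just a small class, and `receive_events` bridges it to `Client.fetch_events`. A minimal sketch under the same assumed endpoint as above; the receiver class itself is illustrative and not part of the library:

```python
import asyncio
from typing import Dict

import httpx

from zeroeventhub import Client, Cursor, Event, FIRST_CURSOR, receive_events


class PrintingEventReceiver:
    """Illustrative EventReceiver: prints events and tracks the latest cursor per partition."""

    def __init__(self) -> None:
        self.latest_checkpoints: Dict[int, Cursor] = {}

    async def event(self, event: Event) -> None:
        # Process a single event as it streams in.
        print(f"partition {event.partition_id}: {event.data}")

    async def checkpoint(self, checkpoint: Cursor) -> None:
        # Remember the most recent cursor per partition for persistence later.
        self.latest_checkpoints[checkpoint.partition_id] = checkpoint


async def main() -> None:
    async with httpx.AsyncClient() as http_client:
        # Assumed endpoint and partition count, per the server's documented contract.
        client = Client("https://localhost:8192/person/feed/v1", 1, http_client)
        receiver = PrintingEventReceiver()
        await receive_events(receiver, client.fetch_events([Cursor(0, FIRST_CURSOR)]))


asyncio.run(main())
```

The protocol is structural, so `PrintingEventReceiver` needs no explicit base class; any object with matching async `event` and `checkpoint` methods satisfies `EventReceiver`.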