Merge pull request #34 from vippsas/python/change_requests_dependency
Python client: Change "requests" dependency to "httpx"
keith-hall-vmp authored Sep 8, 2023
2 parents 4ecf134 + 0217176 commit a8f3297
Showing 15 changed files with 628 additions and 443 deletions.
10 changes: 10 additions & 0 deletions python/zeroeventhub/CHANGELOG.md
@@ -0,0 +1,10 @@
# ZeroEventHub Changelog

## v2.0

Switched from `requests` to `httpx` due to a memory leak in `requests` with long-lived sessions and many requests.
- As a result, the client now uses an `AsyncClient`, which is the `httpx` equivalent of a `requests.Session`. In practice the session was always passed in as a parameter, because the feeds being consumed require authentication, so the `AsyncClient` parameter is now mandatory rather than optional, which simplifies the code a bit.
- It was also observed that it can be useful to directly receive the parsed response as it is streamed from the server instead of being forced to use an `EventReceiver` protocol. This makes it easier to process received events in batches. As the above dependency change is a breaking change anyway, and due to the switch to async calls, it was the perfect time to switch the `Client.fetch_events` method to return an `AsyncGenerator`. A helper method called `receive_events` has been introduced to make it easy to keep the `EventReceiver` behavior from before.
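
As a rough illustration of the batching use case mentioned above, the sketch below groups events from an async generator into fixed-size batches. It does not use the library itself: `Event`, `Cursor`, `fake_feed` and `in_batches` are hypothetical stand-ins, and the assumption that the stream yields a mix of `Event` and `Cursor` items is ours rather than something confirmed here.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator, AsyncIterator, List, Union


@dataclass
class Event:
    """Hypothetical stand-in for an event item (not the library's class)."""
    partition_id: int
    data: Any


@dataclass
class Cursor:
    """Hypothetical stand-in for a checkpoint item (not the library's class)."""
    partition_id: int
    cursor: str


async def fake_feed() -> AsyncGenerator[Union[Event, Cursor], None]:
    """Stand-in for the stream that `Client.fetch_events` is assumed to return."""
    for i in range(5):
        yield Event(partition_id=0, data={"id": i})
    yield Cursor(partition_id=0, cursor="c5")


async def in_batches(
    feed: AsyncIterator[Union[Event, Cursor]], batch_size: int
) -> AsyncGenerator[List[Event], None]:
    """Group events from the feed into lists of at most `batch_size` items."""
    batch: List[Event] = []
    async for item in feed:
        if isinstance(item, Event):
            batch.append(item)
            if len(batch) == batch_size:
                yield batch
                batch = []
        # Checkpoints are ignored here; real code would persist them.
    if batch:
        yield batch  # flush the final, possibly smaller, batch


async def collect_batch_sizes() -> List[int]:
    """Consume the stand-in feed in batches of two and report each batch size."""
    return [len(batch) async for batch in in_batches(fake_feed(), 2)]
```

Running `asyncio.run(collect_batch_sizes())` consumes the stand-in feed two events at a time; real code would also store the checkpoints alongside each batch.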

The inconsistency between `PageEventReceiver` returning `Event` and `Cursor` instances and the `EventReceiver` protocol taking a separate parameter for each property has also been addressed: both now use the dataclasses. This diverges slightly from the Go library implementation but makes sense for the Python library.
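
To make the "converging on the dataclasses" point concrete, here is a minimal sketch of a receiver protocol whose methods take whole dataclass instances instead of one parameter per property. The class names, fields and method signatures are assumptions made for illustration and are not copied from the library.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Protocol


@dataclass
class Event:
    """Hypothetical event dataclass; the field names are assumptions."""
    partition_id: int
    headers: Optional[Dict[str, str]]
    data: Any


@dataclass
class Cursor:
    """Hypothetical checkpoint dataclass; the field names are assumptions."""
    partition_id: int
    cursor: str


class ReceiverSketch(Protocol):
    """Receiver protocol that accepts dataclass instances (illustrative only)."""

    def event(self, event: Event) -> None: ...

    def checkpoint(self, checkpoint: Cursor) -> None: ...


@dataclass
class CollectingReceiver:
    """Satisfies ReceiverSketch structurally by storing everything it is given."""
    events: List[Event] = field(default_factory=list)
    checkpoints: List[Cursor] = field(default_factory=list)

    def event(self, event: Event) -> None:
        self.events.append(event)

    def checkpoint(self, checkpoint: Cursor) -> None:
        self.checkpoints.append(checkpoint)


def deliver(receiver: ReceiverSketch) -> None:
    """Pass one event and one checkpoint to any conforming receiver."""
    receiver.event(Event(partition_id=0, headers=None, data={"id": 1}))
    receiver.checkpoint(Cursor(partition_id=0, cursor="c1"))


receiver = CollectingReceiver()
deliver(receiver)
```

Because the receiver stores the same objects it is handed, producers and page-style consumers can share one representation without unpacking and repacking each property.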

113 changes: 86 additions & 27 deletions python/zeroeventhub/README.md
@@ -12,43 +12,102 @@ database. Example of simple single-partition consumption. *Note about the exampl
* Things starting with "their" are supplied by the service you connect to

```python
>>> import zeroeventhub
>>> import httpx
>>> import asyncio
>>> from typing import Sequence
>>> from unittest.mock import MagicMock, Mock, PropertyMock

>>> my_db = MagicMock()
>>> my_person_event_repository = Mock()
>>> my_person_event_repository.read_cursors_from_db.return_value = None

# Step 1: Setup
- their_partition_count = 1 # documented contract with server
- zeh_session = requests.Session() # you can setup the authentication on the session
- client = zeroeventhub.Client(their_service_url, their_partition_count, zeh_session)
>>> their_partition_count = 1 # documented contract with server
>>> their_service_url = "https://localhost:8192/person/feed/v1"
>>> my_zeh_session = httpx.AsyncClient() # you can set up the authentication on the session
>>> client = zeroeventhub.Client(their_service_url, their_partition_count, my_zeh_session)

# Step 2: Load the cursors from last time we ran
- cursors = my_get_cursors_from_db()
- if not cursors:
-     # we have never run before, so we can get all events with FIRST_CURSOR
-     # (if we just want to receive new events from now, we would use LAST_CURSOR)
-     cursors = [
-         zeroeventhub.Cursor(partition_id, zeroeventhub.FIRST_CURSOR)
-         for partition_id in range(their_partition_count)
-     ]
>>> cursors = my_person_event_repository.read_cursors_from_db()
>>> if not cursors:
... # we have never run before, so we can get all events with FIRST_CURSOR
... # (if we just want to receive new events from now, we would use LAST_CURSOR)
... cursors = [
... zeroeventhub.Cursor(partition_id, zeroeventhub.FIRST_CURSOR)
... for partition_id in range(their_partition_count)
... ]

# Step 3: Enter listening loop...
- page_of_events = PageEventReceiver()
- while myStillWantToReadEvents:
-     # Step 4: Use ZeroEventHub client to fetch the next page of events.
-     client.fetch_events(
-         cursors,
-         my_page_size_hint,
-         page_of_events
-     )
>>> my_still_want_to_read_events = PropertyMock(side_effect=[True, False])

>>> async def poll_for_events(cursors: Sequence[zeroeventhub.Cursor]) -> None:
... page_of_events = zeroeventhub.PageEventReceiver()
... while my_still_want_to_read_events():
... # Step 4: Use ZeroEventHub client to fetch the next page of events.
... await zeroeventhub.receive_events(page_of_events,
... client.fetch_events(cursors),
... )
...
... # Step 5: Write the effect of changes to our own database and the updated
... # cursor value in the same transaction.
... with my_db.begin_transaction() as tx:
... my_person_event_repository.write_effect_of_events_to_db(tx, page_of_events.events)
... my_person_event_repository.write_cursors_to_db(tx, page_of_events.latest_checkpoints)
... tx.commit()
...
... cursors = page_of_events.latest_checkpoints
... page_of_events.clear()

>>> asyncio.run(poll_for_events(cursors))

-     # Step 5: Write the effect of changes to our own database and the updated
-     # cursor value in the same transaction.
-     with db.begin_transaction() as tx:
-         my_write_effect_of_events_to_db(tx, page_of_events.events)
```

-         my_write_cursors_to_db(tx, page_of_events.latest_checkpoints)
## Server

-         tx.commit()
This library makes it easy to set up a zeroeventhub feed endpoint with FastAPI.

-     cursors = page_of_events.latest_checkpoints
```python
>>> from typing import Annotated, Any, AsyncGenerator, Dict, Optional, Sequence
>>> from fastapi import Depends, FastAPI, Request
>>> from fastapi.responses import StreamingResponse
>>> from zeroeventhub import (
... Cursor,
... DataReader,
... ZeroEventHubFastApiHandler,
... )
>>> from unittest.mock import Mock

>>> app = FastAPI()

>>> PersonEventRepository = Mock

>>> class PersonDataReader(DataReader):
... def __init__(self, person_event_repository: PersonEventRepository) -> None:
... self._person_event_repository = person_event_repository
...
... def get_data(
... self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int]
... ) -> AsyncGenerator[Dict[str, Any], Any]:
... return (
... self._person_event_repository.get_events_since(cursors[0].cursor)
... .take(page_size)
... .with_headers(headers)
... )

>>> def get_person_data_reader() -> PersonDataReader:
... return PersonDataReader(PersonEventRepository())

>>> PersonDataReaderDependency = Annotated[
... PersonDataReader,
... Depends(get_person_data_reader, use_cache=True),
... ]

>>> @app.get("/person/feed/v1")
... async def feed(request: Request, person_data_reader: PersonDataReaderDependency) -> StreamingResponse:
... api_handler = ZeroEventHubFastApiHandler(data_reader=person_data_reader, server_partition_count=1)
... return api_handler.handle(request)

-     page_of_events.clear()
```

## Development