Skip to content

Commit

Permalink
Update to Atoti Python API 0.9.2 (#289)
Browse files Browse the repository at this point in the history
  • Loading branch information
tibdex authored Dec 6, 2024
1 parent 077e0de commit 098de4d
Show file tree
Hide file tree
Showing 11 changed files with 542 additions and 380 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- uses: astral-sh/setup-uv@v3
with:
enable-cache: true
version: "0.4.10"
version: "0.5.6"
- run: uv python install 3.10
- run: uv sync --locked
- run: uv run ruff format --check
Expand Down
14 changes: 10 additions & 4 deletions app/__main__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import asyncio
from urllib.parse import urlparse

from . import Config, start_app

with start_app(config=Config()) as session:
port = urlparse(session.url) or 80
print(f"Session listening on port {port}") # noqa: T201
session.wait()

async def main() -> None:
async with start_app(config=Config()) as session:
port = urlparse(session.url).port or 80
print(f"Session listening on port {port}") # noqa: T201
await asyncio.to_thread(session.wait)


asyncio.run(main())
18 changes: 9 additions & 9 deletions app/create_and_join_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,22 @@
def create_station_status_table(session: tt.Session, /) -> None:
session.create_table(
Table.STATION_STATUS.value,
keys=[
StationStatusTableColumn.STATION_ID.value,
StationStatusTableColumn.BIKE_TYPE.value,
],
types={
data_types={
StationStatusTableColumn.STATION_ID.value: tt.LONG,
StationStatusTableColumn.BIKE_TYPE.value: tt.STRING,
StationStatusTableColumn.BIKES.value: tt.INT,
},
keys={
StationStatusTableColumn.STATION_ID.value,
StationStatusTableColumn.BIKE_TYPE.value,
},
)


def create_station_details_table(session: tt.Session, /) -> None:
session.create_table(
Table.STATION_DETAILS.value,
keys=[
StationDetailsTableColumn.ID.value,
],
types={
data_types={
StationDetailsTableColumn.ID.value: tt.LONG,
StationDetailsTableColumn.NAME.value: tt.STRING,
StationDetailsTableColumn.DEPARTMENT.value: tt.STRING,
Expand All @@ -35,6 +32,9 @@ def create_station_details_table(session: tt.Session, /) -> None:
StationDetailsTableColumn.CAPACITY.value: tt.INT,
},
default_values={StationDetailsTableColumn.POSTCODE.value: 0},
keys={
StationDetailsTableColumn.ID.value,
},
)


Expand Down
54 changes: 41 additions & 13 deletions app/load_tables.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
from collections.abc import Iterable, Mapping
from pathlib import Path
from typing import Any, cast

import atoti as tt
import httpx
import pandas as pd
from pydantic import HttpUrl

Expand All @@ -11,14 +13,19 @@
from .util import read_json, reverse_geocode


def read_station_details(
async def read_station_details(
*,
http_client: httpx.AsyncClient,
reverse_geocoding_path: HttpUrl | Path,
velib_data_base_path: HttpUrl | Path,
) -> pd.DataFrame:
stations_data: Any = cast(
Any,
read_json(velib_data_base_path, Path("station_information.json")),
await read_json(
velib_data_base_path,
Path("station_information.json"),
http_client=http_client,
),
)["data"]["stations"]
station_information_df = pd.DataFrame(stations_data)[
["station_id", "name", "capacity", "lat", "lon"]
Expand Down Expand Up @@ -58,10 +65,19 @@ def read_station_details(
).drop(columns=coordinates_column_names)


def read_station_status(velib_data_base_path: HttpUrl | Path, /) -> pd.DataFrame:
async def read_station_status(
velib_data_base_path: HttpUrl | Path,
/,
*,
http_client: httpx.AsyncClient,
) -> pd.DataFrame:
stations_data = cast(
Any,
read_json(velib_data_base_path, Path("station_status.json")),
await read_json(
velib_data_base_path,
Path("station_status.json"),
http_client=http_client,
),
)["data"]["stations"]
station_statuses: list[Mapping[str, Any]] = []
for station_status in stations_data:
Expand All @@ -83,15 +99,27 @@ def read_station_status(velib_data_base_path: HttpUrl | Path, /) -> pd.DataFrame
return pd.DataFrame(station_statuses)


def load_tables(session: tt.Session, /, *, config: Config) -> None:
station_details_df = read_station_details(
reverse_geocoding_path=config.reverse_geocoding_path,
velib_data_base_path=config.velib_data_base_path,
)
station_status_df = read_station_status(
config.velib_data_base_path,
async def load_tables(
session: tt.Session,
/,
*,
config: Config,
http_client: httpx.AsyncClient,
) -> None:
station_details_df, station_status_df = await asyncio.gather(
read_station_details(
http_client=http_client,
reverse_geocoding_path=config.reverse_geocoding_path,
velib_data_base_path=config.velib_data_base_path,
),
read_station_status(
config.velib_data_base_path,
http_client=http_client,
),
)

with session.tables.data_transaction():
session.tables[Table.STATION_DETAILS.value].load_pandas(station_details_df)
session.tables[Table.STATION_STATUS.value].load_pandas(station_status_df)
await asyncio.gather(
session.tables[Table.STATION_DETAILS.value].load_async(station_details_df),
session.tables[Table.STATION_STATUS.value].load_async(station_status_df),
)
16 changes: 9 additions & 7 deletions app/start_app.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
from collections.abc import Generator
from contextlib import contextmanager, nullcontext
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager, nullcontext

import atoti as tt
import httpx

from .config import Config
from .load_tables import load_tables
from .start_session import start_session
from .util import run_periodically


@contextmanager
def start_app(*, config: Config) -> Generator[tt.Session, None, None]:
with (
start_session(config=config) as session,
@asynccontextmanager
async def start_app(*, config: Config) -> AsyncGenerator[tt.Session]:
async with (
httpx.AsyncClient() as http_client,
start_session(config=config, http_client=http_client) as session,
run_periodically(
lambda: load_tables(session, config=config),
lambda: load_tables(session, config=config, http_client=http_client),
period=config.data_refresh_period,
)
if config.data_refresh_period
Expand Down
15 changes: 10 additions & 5 deletions app/start_session.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import sys
from collections.abc import Generator
from contextlib import contextmanager
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from pathlib import Path

import atoti as tt
import httpx
from atoti_jdbc import UserContentStorageConfig

from .config import Config
Expand All @@ -29,12 +30,16 @@ def get_session_config(config: Config, /) -> tt.SessionConfig:
)


@contextmanager
def start_session(*, config: Config) -> Generator[tt.Session, None, None]:
@asynccontextmanager
async def start_session(
*,
config: Config,
http_client: httpx.AsyncClient,
) -> AsyncGenerator[tt.Session]:
"""Start the session, declare the data model and load the initial data."""
session_config = get_session_config(config)
with tt.Session.start(session_config) as session:
create_and_join_tables(session)
create_cubes(session)
load_tables(session, config=config)
await load_tables(session, config=config, http_client=http_client)
yield session
12 changes: 9 additions & 3 deletions app/util/read_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@
from pydantic import HttpUrl


def read_json(base_path: HttpUrl | Path, file_path: Path, /) -> object:
async def read_json(
base_path: HttpUrl | Path,
file_path: Path,
/,
*,
http_client: httpx.AsyncClient,
) -> object:
if isinstance(base_path, Path):
return json.loads((base_path / file_path).read_bytes())

url = f"{base_path}/{file_path.as_posix()}"
response = httpx.get(url).raise_for_status()
return response.json()
response = await http_client.get(url)
return response.raise_for_status().json()
36 changes: 21 additions & 15 deletions app/util/run_periodically.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
from collections.abc import Callable, Generator
from contextlib import contextmanager
import asyncio
from collections.abc import AsyncGenerator, Awaitable, Callable
from contextlib import asynccontextmanager
from datetime import timedelta
from threading import Event, Thread


@contextmanager
def run_periodically(
callback: Callable[[], None], /, *, daemon: bool | None = None, period: timedelta
) -> Generator[None, None, None]:
@asynccontextmanager
async def run_periodically(
callback: Callable[[], Awaitable[None]],
/,
*,
period: timedelta,
) -> AsyncGenerator[None]:
period_in_seconds = period.total_seconds()
stopped = Event()
stopped = asyncio.Event()

def loop() -> None:
while not stopped.wait(period_in_seconds):
callback()
async def loop() -> None:
while not stopped.is_set():
await callback()
await asyncio.sleep(period_in_seconds)

Thread(target=loop, daemon=daemon).start()
task = asyncio.create_task(loop())

yield

stopped.set()
try:
yield
finally:
stopped.set()
await task
12 changes: 11 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ module = ["docker", "docker.*"]
ignore_missing_imports = true

[tool.pytest.ini_options]
addopts = "--strict-markers"
asyncio_default_fixture_loop_scope = "session"
asyncio_mode = "auto"
filterwarnings = ["error"]

[tool.ruff.lint]
Expand Down Expand Up @@ -59,4 +62,11 @@ pandas = "pd"
combine-as-imports = true

[tool.uv]
dev-dependencies = ["docker", "mypy", "pandas-stubs", "pytest", "ruff"]
dev-dependencies = [
"docker",
"mypy",
"pandas-stubs",
"pytest",
"pytest-asyncio",
"ruff",
]
6 changes: 3 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections.abc import Generator
from collections.abc import AsyncGenerator
from pathlib import Path

import atoti as tt
Expand Down Expand Up @@ -32,6 +32,6 @@ def config_fixture() -> Config:
# Don't use this fixture in tests mutating the app or its underlying session.
scope="session",
)
def session_fixture(config: Config) -> Generator[tt.Session, None, None]:
with start_app(config=config) as session:
async def session_fixture(config: Config) -> AsyncGenerator[tt.Session]:
async with start_app(config=config) as session:
yield session
Loading

0 comments on commit 098de4d

Please sign in to comment.