less transformations in pivoted train data
robinklaassen committed Nov 1, 2024
1 parent 363555e commit 91e7a43
Showing 2 changed files with 23 additions and 17 deletions.
aid/provide/influx_trains.py (15 changes: 9 additions & 6 deletions)
@@ -18,9 +18,9 @@
 class InfluxTrainProvider(BaseProvider):
     # TODO better name, changing from postgres to influx kind of on the fly now
 
-    def get_trains(self, start: datetime, end: datetime) -> list[TrainRecord]:
+    def get_trains(self, start: datetime, end: datetime) -> pd.DataFrame:
         """
-        Get train data from InfluxDB and return a list of pydantic records.
+        Get train data from InfluxDB as a data frame.
 
         Learnings:
         - truncateTimeColumn sounds nice but is ridiculously slow, insert rounded or use pandas
@@ -61,8 +61,13 @@ def get_trains(self, start: datetime, end: datetime) -> list[TrainRecord]:
         time_geo_done = perf_counter()
         logger.debug("Time geo conversion done", duration=time_geo_done - time_rounding_done)
 
-        # print(gdf.to_dict("records"))
-        # TODO return the (geo)dataframe, to prevent needless conversion for pivoted_trains
+        return gdf
+
+    def get_trains_as_records(self, start: datetime, end: datetime) -> list[TrainRecord]:
+        """
+        Get train data from InfluxDB as a list of pydantic records.
+        """
+        gdf = self.get_trains(start, end)
 
         output = [
             TrainRecord(
@@ -77,8 +82,6 @@ def get_trains(self, start: datetime, end: datetime) -> list[TrainRecord]:
             )
             for r in gdf.to_dict("records") # TODO this could be quicker
         ]
-        time_format_done = perf_counter()
-        logger.debug("Format to list of records done", duration=time_format_done - time_geo_done)
 
        return output

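For orientation, here is a minimal usage sketch of the provider after this commit: get_trains now hands back the (geo)dataframe directly, while the new get_trains_as_records wraps it for callers that still need pydantic TrainRecord objects. The module path and the DEFAULT_TIMEZONE import are taken from the diffs on this page; everything else is an assumption, not code from the repository.

    # Usage sketch only; names and paths are assumed from the diff above.
    from datetime import datetime, timedelta

    from aid.constants import DEFAULT_TIMEZONE  # import seen in routers/trains.py below
    from aid.provide.influx_trains import InfluxTrainProvider  # module path from the file header above

    provider = InfluxTrainProvider()
    end = datetime.now(DEFAULT_TIMEZONE)
    start = end - timedelta(seconds=10)

    gdf = provider.get_trains(start, end)                   # new: (geo)dataframe, no per-row conversion
    records = provider.get_trains_as_records(start, end)    # previous behaviour: list of TrainRecord
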
aid/provide/routers/trains.py (25 changes: 14 additions & 11 deletions)
@@ -4,9 +4,8 @@
 from typing import TypeAlias
 
 import pandas as pd
-from fastapi import APIRouter, Security, status
+from fastapi import APIRouter, Security
 from pydantic import BaseModel
-from starlette.responses import Response
 
 from aid.constants import DEFAULT_TIMEZONE
 from aid.logger import logger
@@ -33,7 +32,7 @@ class TrainLocation(BaseModel):
 def _records(start: datetime | None = None, end: datetime | None = None) -> list[TrainRecord]:
     end = end or datetime.now(DEFAULT_TIMEZONE)
     start = start or end - timedelta(seconds=10)
-    return InfluxTrainProvider().get_trains(start, end)
+    return InfluxTrainProvider().get_trains_as_records(start, end)
 
 
 @router.get("/records")
@@ -77,12 +76,16 @@ def get_pivoted_data(start: datetime | None = None, end: datetime | None = None)
     Get train locations pivoted for use in TouchDesigner.
     Start and end parameters work the same as in `/records`.
     """
-    records = _records(start, end)
-    if not records:
-        print("No records")
-        return Response(status_code=status.HTTP_204_NO_CONTENT) # type: ignore
-
-    df = pd.DataFrame.from_records([rec.model_dump() for rec in records])
+    end = end or datetime.now(DEFAULT_TIMEZONE)
+    start = start or end - timedelta(seconds=10)
+    df = InfluxTrainProvider().get_trains(start, end)
+    df = df.rename(
+        columns={
+            "_time": "timestamp",
+            "train_id": "id",
+            "train_type": "type",
+        }
+    )
 
     # Scale x and y to a [-1,1] square area
     # RDS range is 0 < x 280 and 300 < y < 625 (km)
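The scaling code itself is collapsed below this hunk. Purely as an illustration of what the comment describes (not the committed implementation), a min-max mapping of each axis onto [-1, 1] using the stated RDS bounds could look like the sketch below; the column names x and y are an assumption.

    # Illustration only; not the code in this commit. Column names "x" and "y" assumed.
    X_MIN, X_MAX = 0.0, 280.0    # RDS x range in km, per the comment above
    Y_MIN, Y_MAX = 300.0, 625.0  # RDS y range in km, per the comment above

    df["x"] = 2 * (df["x"] - X_MIN) / (X_MAX - X_MIN) - 1  # maps [0, 280] onto [-1, 1]
    df["y"] = 2 * (df["y"] - Y_MIN) / (Y_MAX - Y_MIN) - 1  # maps [300, 625] onto [-1, 1]
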
@@ -117,7 +120,7 @@ def get_train_types(start: datetime | None = None, end: datetime | None = None)
 if __name__ == "__main__":
     time_start = perf_counter()
     start = datetime.now(DEFAULT_TIMEZONE) - timedelta(hours=3)
-    # data = get_pivoted_data(start=start)
-    data = get_train_types(start=start)
+    data = get_pivoted_data(start=start)
+    # data = get_train_types(start=start)
     print(data)
     logger.info("Local run done", duration=perf_counter() - time_start)
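The __main__ block above exercises get_pivoted_data directly; as a client-side sketch, the same data can also be fetched over HTTP like the /records endpoint, with optional start and end query parameters (per the handler, end defaults to now and start to ten seconds earlier). The base URL, route prefix, response shape, and whether credentials are needed (the router imports Security) are assumptions, since none of that is visible in this diff.

    # Client sketch; URL, prefix, auth and response shape are assumed, not shown in this diff.
    import requests

    resp = requests.get(
        "http://localhost:8000/trains/pivoted",         # prefix and path assumed
        params={"start": "2024-11-01T12:00:00+01:00"},  # end is optional; the server fills in "now"
    )
    resp.raise_for_status()
    print(resp.json())  # assuming a JSON body; the exact shape is not visible here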
