-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for ingesting CSV files #2
Changes from 20 commits
f1ad858
9db8060
bbb0597
f5e3f38
4c97868
88622f6
321c5a5
87ac971
3b763a1
8803850
ecbed2e
c350ad6
f7761b1
927af4b
db08097
a4a128e
c2b125c
eb51d55
43d21af
13021e9
8c43c69
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,12 +41,17 @@ jobs: | |
verbose: true | ||
mypy: | ||
runs-on: ubuntu-latest | ||
name: "mypy" | ||
steps: | ||
- uses: davidslusser/[email protected] | ||
with: | ||
src: "src" | ||
options: "--ignore-missing-imports" | ||
- uses: actions/checkout@v4 | ||
- name: Set up Python | ||
uses: actions/setup-python@v5 | ||
with: | ||
python-version: 3.12 | ||
- name: Install dependencies | ||
run: | | ||
python -m pip install --upgrade pip | ||
python -m pip install ".[dev]" | ||
mypy | ||
ruff: | ||
runs-on: ubuntu-latest | ||
name: "ruff" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
from datetime import datetime, timedelta | ||
|
||
import duckdb | ||
import pandas as pd | ||
import polars as pl | ||
from IPython import get_ipython | ||
|
||
from sqlalchemy import Double, text | ||
from chronify.models import ColumnDType, CsvTableSchema, TableSchema | ||
from chronify.store import Store | ||
from chronify.time import TimeIntervalType, TimeZone | ||
from chronify.time_configs import DatetimeRange | ||
|
||
|
||
GENERATOR_TIME_SERIES_FILE = "tests/data/gen.csv" | ||
|
||
|
||
def read_duckdb(conn, name: str):
    """Fetch every row of table *name* through a DuckDB connection as a pandas DataFrame."""
    query = f"SELECT * FROM {name}"
    return conn.sql(query).df()
|
||
|
||
def read_pandas(store: Store, name: str):
    """Load the full contents of table *name* into a pandas DataFrame."""
    statement = f"select * from {name}"
    with store.engine.begin() as connection:
        return pd.read_sql(statement, connection)
|
||
|
||
def read_polars(store: Store, name: str):
    """Load table *name* via polars and convert the result to a pandas DataFrame."""
    statement = f"select * from {name}"
    with store.engine.begin() as connection:
        frame = pl.read_database(statement, connection=connection)
    return frame.to_pandas()
|
||
|
||
def read_sqlalchemy(store: Store, name: str):
    """Read the full contents of table *name* via raw SQLAlchemy.

    Column names are taken from the result cursor instead of being
    hard-coded, so this works for any table rather than only the
    generators table with columns (timestamp, generator, value).
    """
    query = f"select * from {name}"
    with store.engine.begin() as conn:
        result = conn.execute(text(query))
        # result.keys() reflects the actual SELECT column order.
        columns = list(result.keys())
        return pd.DataFrame.from_records(result.fetchall(), columns=columns)
|
||
|
||
def setup():
    """Build the source (CSV) and destination table schemas for the benchmark."""
    time_config = DatetimeRange(
        start=datetime(year=2020, month=1, day=1),
        resolution=timedelta(hours=1),
        length=8784,
        interval_type=TimeIntervalType.PERIOD_BEGINNING,
        time_columns=["timestamp"],
        time_zone=TimeZone.UTC,
    )

    # The CSV is pivoted: one column of values per generator.
    generator_names = ["gen1", "gen2", "gen3"]
    src_schema = CsvTableSchema(
        time_config=time_config,
        column_dtypes=[ColumnDType(name=gen, dtype=Double) for gen in generator_names],
        value_columns=generator_names,
        pivoted_dimension_name="generator",
        time_array_id_columns=[],
    )
    dst_schema = TableSchema(
        name="generators",
        time_config=time_config,
        time_array_id_columns=["generator"],
        value_column="value",
    )
    return src_schema, dst_schema
|
||
|
||
def run_test(engine_name: str):
    """Ingest the CSV through *engine_name* and time each read implementation."""
    store = Store(engine_name=engine_name)
    src_schema, dst_schema = setup()
    store.ingest_from_csv(GENERATOR_TIME_SERIES_FILE, src_schema, dst_schema)
    ipython = get_ipython()
    # df is referenced by name in the DuckDB SQL below (replacement scan).
    df = read_polars(store, dst_schema.name)  # noqa: F841
    conn = duckdb.connect(":memory:")
    conn.sql("CREATE OR REPLACE TABLE perf_test AS SELECT * from df")
    benchmarks = [
        ("read_duckdb", "read_duckdb(conn, 'perf_test')"),
        ("read_pandas", "read_pandas(store, dst_schema.name)"),
        ("read_polars", "read_polars(store, dst_schema.name)"),
        ("read_sqlalchemy", "read_sqlalchemy(store, dst_schema.name)"),
    ]
    for label, statement in benchmarks:
        print(f"Run {engine_name} database with {label}.")
        ipython.run_line_magic("timeit", statement)
|
||
|
||
if __name__ == "__main__":
    # Guard so importing this module does not trigger the benchmarks.
    run_test("duckdb")
    run_test("sqlite")
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
import importlib.metadata as metadata


# Single source of truth for the package version, read from the installed
# distribution metadata. metadata.version() is the direct accessor for the
# "Version" field, avoiding the roundabout metadata(...)["Version"] lookup.
__version__ = metadata.version("chronify")
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from typing import Any | ||
|
||
from pydantic import BaseModel, ConfigDict | ||
|
||
|
||
def make_model_config(**kwargs: Any) -> Any:
    """Return a Pydantic model config with the package-wide defaults.

    Keyword arguments extend the defaults; a keyword that names one of the
    defaults overrides it (previously that raised TypeError for duplicate
    keyword arguments).
    """
    settings: dict[str, Any] = {
        "str_strip_whitespace": True,
        "validate_assignment": True,
        "validate_default": True,
        "extra": "forbid",
        "use_enum_values": False,
        "arbitrary_types_allowed": True,
        "populate_by_name": True,
    }
    settings.update(kwargs)
    return ConfigDict(**settings)  # type: ignore
|
||
|
||
class ChronifyBaseModel(BaseModel):
    """Base model for all chronify data models."""

    # Shared Pydantic configuration (strict validation, forbid extras, etc.).
    model_config = make_model_config()
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
"""Common definitions for the package""" | ||
|
||
VALUE_COLUMN = "value" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from pathlib import Path | ||
from typing import Any | ||
|
||
import duckdb | ||
from duckdb import DuckDBPyRelation | ||
|
||
from chronify.models import CsvTableSchema, get_duckdb_type_from_sqlalchemy | ||
|
||
|
||
def read_csv(path: Path | str, schema: CsvTableSchema, **kwargs: Any) -> DuckDBPyRelation:
    """Read a CSV file into a DuckDB relation, applying the schema's dtypes when present."""
    csv_file = str(path)
    if schema.column_dtypes:
        # Map each declared SQLAlchemy dtype onto its DuckDB equivalent.
        dtypes = {
            column.name: get_duckdb_type_from_sqlalchemy(column.dtype)
            for column in schema.column_dtypes
        }
        rel = duckdb.read_csv(csv_file, dtype=dtypes, **kwargs)
    else:
        rel = duckdb.read_csv(csv_file, **kwargs)

    # Project the columns explicitly so consumers get a stable column order.
    expr = ",".join(rel.columns)
    return duckdb.sql(f"SELECT {expr} FROM rel")
Comment on lines
+10
to
+19
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we catch any errors or just let duckdb to trow and error for a badly format csv There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. DuckDB should have it covered. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
from collections.abc import Iterable | ||
from datetime import datetime, timedelta | ||
from pathlib import Path | ||
|
||
import duckdb | ||
from duckdb import DuckDBPyRelation | ||
|
||
|
||
def add_datetime_column(
    rel: DuckDBPyRelation,
    start: datetime,
    resolution: timedelta,
    length: int,
    time_array_id_columns: Iterable[str],
    time_column: str,
    timestamps: list[datetime],
) -> DuckDBPyRelation:
    """Add a datetime column to the relation.

    Not implemented yet. NOTE(review): this will likely need two code paths,
    one for timestamps without a tz offset and one for timestamps with one.
    """
    # TODO: generate the timestamp series and join it onto rel as time_column.
    raise NotImplementedError
|
||
|
||
def make_write_parquet_query(table_or_view: str, file_path: Path | str) -> str: | ||
"""Make an SQL string that can be used to write a Parquet file from a table or view.""" | ||
# TODO: Hive partitioning? | ||
return f""" | ||
COPY | ||
(SELECT * FROM {table_or_view}) | ||
TO '{file_path}' | ||
(FORMAT 'parquet'); | ||
""" | ||
|
||
|
||
def unpivot(
    rel: DuckDBPyRelation,
    pivoted_columns: Iterable[str],
    name_column: str,
    value_column: str,
) -> DuckDBPyRelation:
    """Unpivot *pivoted_columns* into long-format (name_column, value_column) rows."""
    column_list = ",".join(pivoted_columns)

    query = f"""
    SELECT * FROM rel
    UNPIVOT INCLUDE NULLS (
    {value_column}
    FOR {name_column} in ({column_list})
    )
    """
    return duckdb.sql(query)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
"""Contains logging functionality.""" | ||
|
||
import sys | ||
from pathlib import Path | ||
from typing import Iterable, Optional, Union | ||
|
||
from loguru import logger | ||
|
||
|
||
# Logger printing formats | ||
DEFAULT_FORMAT = "<level>{level}</level>: {message}" | ||
DEBUG_FORMAT = ( | ||
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> | " | ||
"<level>{level: <7}</level> | " | ||
"<cyan>{name}:{line}</cyan> | " | ||
"{message}" | ||
) | ||
|
||
|
||
def setup_logging(
    filename: str | Path | None = None,
    console_level: str = "INFO",
    file_level: str = "DEBUG",
    mode: str = "w",
    rotation: str | None = "10 MB",
    packages: Iterable[str] | None = None,
) -> None:
    """Configures logging to file and console.

    Uses PEP 604 union syntax (``X | None``) instead of the dated
    ``Optional[Union[...]]`` spellings; the project targets Python 3.12.

    Parameters
    ----------
    filename
        Log filename, defaults to None for no file logging.
    console_level
        Console logging level
    file_level
        File logging level
    mode
        Mode in which to open the file
    rotation
        Size in which to rotate file. Set to None for no rotation.
    packages
        Additional packages to enable logging
    """
    # Start from a clean slate so repeated calls don't stack duplicate sinks.
    logger.remove()
    logger.enable("chronify")
    for pkg in packages or []:
        logger.enable(pkg)

    logger.add(sys.stderr, level=console_level, format=DEFAULT_FORMAT)
    if filename:
        logger.add(
            filename,
            level=file_level,
            mode=mode,
            rotation=rotation,
            format=DEBUG_FORMAT,
        )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pesap My take is to remove it. Running it here doesn't use pyproject.toml. We would have to make other changes. The action for mypy does follow pyproject.toml.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you pass language system it uses the mypy that you have installed from the environment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But that still doesn't respect settings in pyproject.toml. With the changes here, if the developer runs
mypy
in the terminal, the right things will happen. The CI job does that as a check. The downside is that pre-commit doesn't run mypy. We would have to add duplicate settings in the pre-commit config file.