Add support for ingesting CSV files #2

Merged: 21 commits, Oct 31, 2024
Changes from 16 commits
18 changes: 15 additions & 3 deletions pyproject.toml
@@ -26,16 +26,22 @@ classifiers = [
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
"duckdb~=1.0.0",
"duckdb ~= 1.1.0",
"duckdb_engine",
"loguru",
"rich",
"pandas >= 2.2, < 3",
"polars ~= 1.11.0",
"pyarrow",
"pydantic >= 2.7, < 3",
"sqlalchemy",
"pytz",
"rich",
"sqlalchemy == 2.0.35", # 2.0.36 introduced code that duckdb_engine doesn't handle.
"tzdata",
]
[project.optional-dependencies]
dev = [
"mypy",
"pandas-stubs",
"pre-commit",
"pytest",
"pytest-cov",
@@ -47,6 +53,12 @@ Documentation = "https://github.com/NREL/chronify#readme"
Issues = "https://github.com/NREL/chronify/issues"
Source = "https://github.com/NREL/chronify"

[tool.mypy]
files = [
"src",
"tests",
]

[tool.pytest.ini_options]
pythonpath = "src"
minversion = "6.0"
90 changes: 90 additions & 0 deletions scripts/perf_tests.py
@@ -0,0 +1,90 @@
from datetime import datetime, timedelta

import duckdb
import pandas as pd
import polars as pl
from IPython import get_ipython

from sqlalchemy import Double, text
from chronify.models import ColumnDType, CsvTableSchema, TableSchema
from chronify.store import Store
from chronify.time import TimeIntervalType, TimeZone
from chronify.time_configs import DatetimeRange


GENERATOR_TIME_SERIES_FILE = "tests/data/gen.csv"


def read_duckdb(conn, name: str):
return conn.sql(f"SELECT * FROM {name}").df()


def read_pandas(store: Store, name: str):
with store.engine.begin() as conn:
query = f"select * from {name}"
return pd.read_sql(query, conn)


def read_polars(store: Store, name: str):
with store.engine.begin() as conn:
query = f"select * from {name}"
return pl.read_database(query, connection=conn).to_pandas()


def read_sqlalchemy(store: Store, name: str):
with store.engine.begin() as conn:
query = f"select * from {name}"
res = conn.execute(text(query)).fetchall()
return pd.DataFrame.from_records(res, columns=["timestamp", "generator", "value"])


def setup():
time_config = DatetimeRange(
start=datetime(year=2020, month=1, day=1),
resolution=timedelta(hours=1),
length=8784,
interval_type=TimeIntervalType.PERIOD_BEGINNING,
time_columns=["timestamp"],
time_zone=TimeZone.UTC,
)

src_schema = CsvTableSchema(
time_config=time_config,
column_dtypes=[
ColumnDType(name="gen1", dtype=Double),
ColumnDType(name="gen2", dtype=Double),
ColumnDType(name="gen3", dtype=Double),
],
value_columns=["gen1", "gen2", "gen3"],
pivoted_dimension_name="generator",
time_array_id_columns=[],
)
dst_schema = TableSchema(
name="generators",
time_config=time_config,
time_array_id_columns=["generator"],
value_column="value",
)
return src_schema, dst_schema


def run_test(engine_name: str):
store = Store(engine_name=engine_name)
src_schema, dst_schema = setup()
store.ingest_from_csv(GENERATOR_TIME_SERIES_FILE, src_schema, dst_schema)
ipython = get_ipython()
df = read_polars(store, dst_schema.name) # noqa: F841
conn = duckdb.connect(":memory:")
conn.sql("CREATE OR REPLACE TABLE perf_test AS SELECT * from df")
print(f"Run {engine_name} database with read_duckdb.")
ipython.run_line_magic("timeit", "read_duckdb(conn, 'perf_test')")
print(f"Run {engine_name} database with read_pandas.")
ipython.run_line_magic("timeit", "read_pandas(store, dst_schema.name)")
print(f"Run {engine_name} database with read_polars.")
ipython.run_line_magic("timeit", "read_polars(store, dst_schema.name)")
print(f"Run {engine_name} database with read_sqlalchemy.")
ipython.run_line_magic("timeit", "read_sqlalchemy(store, dst_schema.name)")


run_test("duckdb")
run_test("sqlite")
4 changes: 4 additions & 0 deletions src/chronify/__init__.py
@@ -0,0 +1,4 @@
import importlib.metadata as metadata


__version__ = metadata.metadata("chronify")["Version"]
3 changes: 3 additions & 0 deletions src/chronify/common.py
@@ -0,0 +1,3 @@
"""Common definitions for the package"""

VALUE_COLUMN = "value"
39 changes: 39 additions & 0 deletions src/chronify/csv_io.py
@@ -0,0 +1,39 @@
from pathlib import Path

import duckdb
from duckdb import DuckDBPyRelation

from chronify.models import CsvTableSchema, get_duckdb_type_from_sqlalchemy
from chronify.time import get_zone_info
from chronify.time_configs import DatetimeRange


def read_csv(path: Path | str, schema: CsvTableSchema, **kwargs) -> DuckDBPyRelation:
"""Read a CSV file into a DuckDB relation."""
if schema.column_dtypes:
dtypes = {x.name: get_duckdb_type_from_sqlalchemy(x.dtype) for x in schema.column_dtypes}
rel = duckdb.read_csv(str(path), dtype=dtypes, **kwargs)
else:
rel = duckdb.read_csv(str(path), **kwargs)

exprs = []
for column, dtype in zip(rel.columns, rel.types):
if dtype is duckdb.typing.TIMESTAMP:
if isinstance(schema.time_config, DatetimeRange):
if schema.time_config.time_zone is None:
msg = "time_zone cannot be None if the time zone is not part of the timestamp string"
raise ValueError(msg)
zone_info = get_zone_info(schema.time_config.time_zone)
else:
msg = f"need to add support for {type(schema.time_config)}"
raise NotImplementedError(msg)
expr = f"timezone('UTC', timezone({zone_info.key}, {column})) AS {column}"
elif dtype is duckdb.typing.TIMESTAMP_TZ:
msg = "no handling for timestamp with time zone yet"
raise NotImplementedError(msg)
# expr = f"timezone('UTC', {column}) AS {column}"
else:
expr = column
exprs.append(expr)
expr_str = ",".join(exprs)
return duckdb.sql(f"SELECT {expr_str} FROM rel")
Empty file added src/chronify/duckdb/__init__.py
Empty file.
59 changes: 59 additions & 0 deletions src/chronify/duckdb/functions.py
@@ -0,0 +1,59 @@
from collections.abc import Iterable
from datetime import datetime, timedelta
from pathlib import Path

import duckdb
from duckdb import DuckDBPyRelation


def add_datetime_column(
rel: DuckDBPyRelation,
start: datetime,
resolution: timedelta,
length: int,
time_array_id_columns: Iterable[str],
time_column: str,
timestamps: list[datetime],
) -> DuckDBPyRelation:
"""Add a datetime column to the relation."""
# TODO
raise NotImplementedError
Review comment (collaborator):
Seems like we'll need two kinds of handling: one without a tz-offset and one with.

# values = []
# columns = ",".join(rel.columns)
# return duckdb.sql(
# f"""
# SELECT
# AS {time_column}
# ,{columns}
# FROM rel
# """
# )
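
Following the reviewer's note, here is a minimal, hypothetical sketch of the tz-naive half only (the tz-offset case would need separate handling). The helper name and the generate_series approach are assumptions, not code from this PR; it reuses the imports already at the top of this module.

# Hypothetical helper: build a tz-naive timestamp relation that
# add_datetime_column could later join onto `rel`.
def make_timestamp_relation(
    start: datetime, resolution: timedelta, length: int, time_column: str
) -> DuckDBPyRelation:
    end = start + (length - 1) * resolution
    step_seconds = int(resolution.total_seconds())
    query = f"""
        SELECT ts AS {time_column}
        FROM generate_series(
            TIMESTAMP '{start}',
            TIMESTAMP '{end}',
            INTERVAL {step_seconds} SECOND
        ) AS t(ts)
    """
    return duckdb.sql(query)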


def make_write_parquet_query(table_or_view: str, file_path: Path | str) -> str:
"""Make an SQL string that can be used to write a Parquet file from a table or view."""
# TODO: Hive partitioning?
return f"""
COPY
(SELECT * FROM {table_or_view})
TO '{file_path}'
(FORMAT 'parquet');
"""


def unpivot(
rel: DuckDBPyRelation,
pivoted_columns: Iterable[str],
name_column: str,
value_column: str,
) -> DuckDBPyRelation:
pivoted_str = ",".join(pivoted_columns)

query = f"""
SELECT * FROM rel
UNPIVOT INCLUDE NULLS (
{value_column}
FOR {name_column} in ({pivoted_str})
)
"""
return duckdb.sql(query)
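
For reference, a hypothetical example of unpivot, reusing the pivoted generator columns from the perf-test schema above:

import duckdb

from chronify.duckdb.functions import unpivot

# Three pivoted value columns become (generator, value) pairs.
wide = duckdb.sql("SELECT 1.0 AS gen1, 2.0 AS gen2, 3.0 AS gen3")
long_rel = unpivot(
    wide, pivoted_columns=["gen1", "gen2", "gen3"], name_column="generator", value_column="value"
)
# long_rel has columns "generator" and "value", with one row per generator.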
8 changes: 8 additions & 0 deletions src/chronify/exceptions.py
@@ -2,5 +2,13 @@ class ChronifyExceptionBase(Exception):
"""Base class for exceptions in this package"""


class ConflictingInputsError(ChronifyExceptionBase):
"""Raised when user inputs conflict with each other."""


class InvalidTable(ChronifyExceptionBase):
"""Raised when a table does not match its schema."""


class InvalidParameter(ChronifyExceptionBase):
"""Raised when an invalid parameter is passed."""
58 changes: 58 additions & 0 deletions src/chronify/loggers.py
@@ -0,0 +1,58 @@
"""Contains logging functionality."""

import sys
from pathlib import Path
from typing import Iterable, Optional, Union

from loguru import logger


# Logger printing formats
DEFAULT_FORMAT = "<level>{level}</level>: {message}"
DEBUG_FORMAT = (
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
"<level>{level: <7}</level> | "
"<cyan>{name}:{line}</cyan> | "
"{message}"
)


def setup_logging(
filename: Optional[Union[str, Path]] = None,
console_level: str = "INFO",
file_level: str = "DEBUG",
mode: str = "w",
rotation: Optional[str] = "10 MB",
packages: Optional[Iterable] = None,
) -> None:
"""Configures logging to file and console.

Parameters
----------
filename
Log filename, defaults to None for no file logging.
console_level
Console logging level
file_level
File logging level
mode
Mode in which to open the file
rotation
Size at which to rotate the log file. Set to None for no rotation.
packages
Additional packages for which to enable logging
"""
logger.remove()
logger.enable("chronify")
for pkg in packages or []:
logger.enable(pkg)

logger.add(sys.stderr, level=console_level, format=DEFAULT_FORMAT)
if filename:
logger.add(
filename,
level=file_level,
mode=mode,
rotation=rotation,
format=DEBUG_FORMAT,
)
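
A hypothetical usage sketch (file name and levels chosen purely for illustration):

from loguru import logger

from chronify.loggers import setup_logging

# INFO to the console, DEBUG to a rotating log file.
setup_logging(filename="chronify.log", console_level="INFO", file_level="DEBUG")
logger.info("Logging configured")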