From 1042115bb27541a2eb3c4573cb963f0f8c422e14 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 12 Nov 2023 01:02:21 +0100 Subject: [PATCH] Feature: Export from InfluxDB and import into RDBMS Using SQLAlchemy/pandas/Dask. --- .github/workflows/tests.yml | 33 +++- .gitignore | 2 +- CHANGES.rst | 2 + README.rst | 20 +-- doc/backlog.rst | 33 ++-- examples/export_lineprotocol.py | 57 +++++++ examples/export_sqlalchemy.py | 58 +++++++ influxio/core.py | 52 ++++--- influxio/io.py | 122 +++++++++++++++ influxio/model.py | 194 ++++++++++++++++++------ influxio/testdata.py | 5 +- influxio/util/common.py | 2 +- influxio/util/compat.py | 13 ++ influxio/util/db.py | 14 ++ influxio/util/report.py | 6 +- pyproject.toml | 22 ++- tests/conftest.py | 2 +- tests/test_adapter.py | 36 +++++ tests/test_examples.py | 97 ++++++++++++ tests/test_export.py | 112 ++++++++++++++ tests/{test_write.py => test_import.py} | 20 +-- 21 files changed, 791 insertions(+), 111 deletions(-) create mode 100644 examples/export_lineprotocol.py create mode 100644 examples/export_sqlalchemy.py create mode 100644 influxio/io.py create mode 100644 influxio/util/compat.py create mode 100644 influxio/util/db.py create mode 100644 tests/test_adapter.py create mode 100644 tests/test_examples.py create mode 100644 tests/test_export.py rename tests/{test_write.py => test_import.py} (83%) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index f138861..cf3c0bc 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -1,10 +1,9 @@ name: Tests on: + pull_request: ~ push: branches: [ main ] - pull_request: - branches: [ main ] # Allow job to be triggered manually. workflow_dispatch: @@ -22,10 +21,11 @@ jobs: strategy: matrix: os: ["ubuntu-latest"] - python-version: ["3.10", "3.11"] - influxdb-version: [ "2.6" ] + python-version: ["3.8", "3.11"] + influxdb-version: [ "2.6", "2.7" ] fail-fast: false + # https://docs.github.com/en/actions/using-containerized-services/about-service-containers services: influxdb: @@ -41,24 +41,41 @@ jobs: DOCKER_INFLUXDB_INIT_BUCKET: default DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: token + cratedb: + image: crate/crate:nightly + ports: + - 4200:4200 + env: + CRATE_HEAP_SIZE: 1g + + postgresql: + image: postgres:16 + ports: + - 5432:5432 + env: + POSTGRES_HOST_AUTH_METHOD: trust + env: OS: ${{ matrix.os }} PYTHON: ${{ matrix.python-version }} - name: Python ${{ matrix.python-version }} on OS ${{ matrix.os }} + name: " + Python ${{ matrix.python-version }}, + InfluxDB ${{ matrix.influxdb-version }}, + OS ${{ matrix.os }}" steps: - name: Acquire sources uses: actions/checkout@v3 # Install InfluxDB CLI tools. - - name: Setup InfluxDB CLI + - name: Set up InfluxDB CLI uses: influxdata/influxdb-action@v3 with: influxdb_version: 2.6.1 influxdb_start: false - - name: Setup Python + - name: Set up Python uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} @@ -66,7 +83,7 @@ jobs: cache: 'pip' cache-dependency-path: 'pyproject.toml' - - name: Setup project + - name: Set up project run: | # `setuptools 0.64.0` adds support for editable install hooks (PEP 660). 
diff --git a/.gitignore b/.gitignore index aa8396f..04fc6b3 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,6 @@ .DS_Store __pycache__ *.egg-info -.coverage +.coverage* coverage.xml /example_*.py diff --git a/CHANGES.rst b/CHANGES.rst index f8a72ed..8923c8e 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -10,6 +10,8 @@ in progress - Tests: Add test for ``influxio info`` - Feature: Add reading line protocol format from file - Feature: Add reading line protocol format from URL +- Feature: Export from InfluxDB and import into RDBMS, + using SQLAlchemy/pandas/Dask 2023-xx-xx 0.1.0 diff --git a/README.rst b/README.rst index a4bef45..d759d46 100644 --- a/README.rst +++ b/README.rst @@ -37,7 +37,7 @@ Synopsis # Export from API to database. influxio copy \ "http://example:token@localhost:8086/testdrive/demo" \ - "sqlite://export.sqlite" + "sqlite://export.sqlite?table=demo" ********** @@ -52,7 +52,7 @@ just use the OCI image on Podman or Docker. docker run --rm --network=host ghcr.io/daq-tools/influxio \ influxio copy \ "http://example:token@localhost:8086/testdrive/demo" \ - "crate://crate@localhost:4200/testdrive" + "crate://crate@localhost:4200/testdrive/demo" ***** @@ -91,7 +91,7 @@ instances is to use Podman or Docker. --env=DOCKER_INFLUXDB_INIT_BUCKET=default \ --env=DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=token \ --volume="$PWD/var/lib/influxdb2:/var/lib/influxdb2" \ - influxdb:2.6 + influxdb:2.7 - https://github.com/docker-library/docs/blob/master/influxdb/README.md @@ -99,7 +99,7 @@ instances is to use Podman or Docker. docker run --rm -it --publish=4200:4200 --publish=5432:5432 \ --volume="$PWD/var/lib/cratedb:/data" \ - crate:5.2 -Cdiscovery.type=single-node + crate:5.5 -Cdiscovery.type=single-node - https://github.com/docker-library/docs/blob/master/crate/README.md @@ -155,12 +155,12 @@ Export # From API to database file. influxio copy \ "http://example:token@localhost:8086/testdrive/demo" \ - "sqlite://export.sqlite" + "sqlite://export.sqlite?table=demo" # From API to database server. influxio copy \ "http://example:token@localhost:8086/testdrive/demo" \ - "crate://crate@localhost:4200/testdrive" + "crate://crate@localhost:4200/testdrive?table=demo" # From API to line protocol file. influxio copy \ @@ -175,7 +175,7 @@ Export # From line protocol file to database. influxio copy \ "file://export.lp" \ - "sqlite://export.sqlite" + "sqlite://export.sqlite?table=export" OCI --- @@ -197,11 +197,11 @@ or use the ``--interactive`` option to consume STDIN, like: .. code-block:: sh docker run --rm --volume=$(pwd):/data ghcr.io/daq-tools/influxio \ - influxio copy "file:///data/export.lp" "sqlite:///data/export.sqlite" + influxio copy "file:///data/export.lp" "sqlite:///data/export.sqlite?table=export" cat export.lp | \ docker run --rm --interactive --network=host ghcr.io/daq-tools/influxio \ - influxio copy "stdin://?format=lp" "crate://crate@localhost:4200/testdrive" + influxio copy "stdin://?format=lp" "crate://crate@localhost:4200/testdrive/export" In order to always run the latest ``nightly`` development version, and to use a shortcut for that, this section outlines how to use an alias for ``influxio``, @@ -213,7 +213,7 @@ keystrokes on subsequent invocations. 
docker pull ghcr.io/daq-tools/influxio:nightly alias influxio="docker run --rm --interactive ghcr.io/daq-tools/influxio:nightly influxio" SOURCE=https://github.com/daq-tools/influxio/raw/main/tests/testdata/basic.lp - TARGET=crate://crate@localhost:4200/testdrive + TARGET=crate://crate@localhost:4200/testdrive/basic influxio copy "${SOURCE}" "${TARGET}" diff --git a/doc/backlog.rst b/doc/backlog.rst index 835740e..75ce76c 100644 --- a/doc/backlog.rst +++ b/doc/backlog.rst @@ -3,20 +3,32 @@ influxio backlog ################ -*********** -Iteration 1 -*********** - +************ +Iteration +1 +************ - [x] Add project boilerplate -- [o] Add initial working version -- [o] Refinements +- [x] Make it work +- [x] Export to SQLite, PostgreSQL, and CrateDB +- [x] Fix documentation about crate:// target - [o] Release 0.1.0 -*********** -Iteration 2 -*********** +************ +Iteration +2 +************ +- [o] Tests using ``assert_dataframe_equal``? +- [o] Fix ``crate.client.sqlalchemy.dialect.DateTime`` re. ``TimezoneUnawareException`` +- [o] Add Docker Compose file for auxiliary services +- [o] Check if using a CrateDB schema works well +- [o] Refinements +- [o] Verify documentation +- [o] Refactor general purpose code to `pueblo` package + +************ +Iteration +3 +************ +- [o] Unlock more parameters in InfluxDbAdapter.write_df - [o] Format: Compressed line protocol - [o] Format: Annotated CSV - https://docs.influxdata.com/influxdb/v2.6/reference/syntax/annotated-csv/ @@ -28,7 +40,10 @@ Iteration 2 - [o] cloud-to-cloud copy - [o] influxio list testdata:// - [o] "SQLAlchemy » Dialects built-in" is broken +- [o] ``DBURI = "crate+psycopg://localhost:4200"`` +References +========== - https://docs.influxdata.com/influxdb/v2.6/migrate-data/ - https://docs.influxdata.com/influxdb/v2.6/reference/cli/influx/write/ - https://docs.influxdata.com/influxdb/v2.6/reference/cli/influx/backup/ diff --git a/examples/export_lineprotocol.py b/examples/export_lineprotocol.py new file mode 100644 index 0000000..c4ca01d --- /dev/null +++ b/examples/export_lineprotocol.py @@ -0,0 +1,57 @@ +""" +Miniature data pipeline shuffling data from InfluxDB to PostgreSQL/CrateDB. + +- Load a synthetic pandas DataFrame into InfluxDB. +- Export data to InfluxDB line protocol format (ILP). +- Read back a few samples worth of data from the ILP file. + +https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/ +""" +import gzip +import logging +from pathlib import Path + +from influxio.io import dataframe_from_lineprotocol +from influxio.model import InfluxDbAdapter +from influxio.testdata import DataFrameFactory +from influxio.util.common import setup_logging + +logger = logging.getLogger(__name__) + + +LINEPROTOCOL_FILE = Path("./var/export/demo.lp.gz") +DATASET_SIZE = 15_000 + + +def main(): + logger.info("Connecting to InfluxDB") + influx = InfluxDbAdapter( + url="http://localhost:8086", + org="example", + token="token", # noqa: S106 + bucket="testdrive", + measurement="demo", + ) + + # Provision data to source database. + logger.info("Provisioning InfluxDB") + dff = DataFrameFactory(rows=DATASET_SIZE) + df = dff.make("dateindex") + influx.write_df(df) + + # Export data into file using lineprotocol format. 
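+    # Note: `InfluxDbAdapter.to_lineprotocol()` shells out to `influxd inspect export-lp`,
+    # so this step needs local filesystem access to the InfluxDB engine path given below;
+    # it cannot export from a purely remote InfluxDB instance.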
+ logger.info("Exporting data to lineprotocol file (ILP)") + LINEPROTOCOL_FILE.parent.mkdir(parents=True, exist_ok=True) + influx.to_lineprotocol(engine_path="./var/lib/influxdb2/engine", output_path=LINEPROTOCOL_FILE) + + logger.info("Reading back data from lineprotocol file") + with gzip.open(LINEPROTOCOL_FILE) as buffer: + df = dataframe_from_lineprotocol(buffer) + print(df) # noqa: T201 + + logger.info("Ready.") + + +if __name__ == "__main__": + setup_logging() + main() diff --git a/examples/export_sqlalchemy.py b/examples/export_sqlalchemy.py new file mode 100644 index 0000000..acfc20d --- /dev/null +++ b/examples/export_sqlalchemy.py @@ -0,0 +1,58 @@ +""" +Miniature data pipeline shuffling data from InfluxDB to PostgreSQL/CrateDB. + +- Load a synthetic pandas DataFrame into InfluxDB. +- Transfer data from InfluxDB to RDBMS database using SQLAlchemy/pandas/Dask. +- Read back a few samples worth of data from the RDBMS database. +""" +import logging + +import sqlalchemy as sa + +from influxio.io import dataframe_to_sql +from influxio.model import InfluxDbAdapter +from influxio.testdata import DataFrameFactory +from influxio.util.common import jd, setup_logging + +logger = logging.getLogger(__name__) + +DBURI = "crate://localhost:4200" +DATASET_SIZE = 15_000 + + +def main(): + logger.info("Connecting to InfluxDB") + influx = InfluxDbAdapter( + url="http://localhost:8086", + org="example", + token="token", # noqa: S106 + bucket="testdrive", + measurement="demo", + ) + + # Provision data to source database. + logger.info("Provisioning InfluxDB") + dff = DataFrameFactory(rows=DATASET_SIZE) + df = dff.make("dateindex") + influx.write_df(df) + + # Transfer data. + logger.info("Transferring data") + for df in influx.read_df(): + logger.info("Loading data frame into RDBMS/SQL database using pandas/Dask") + dataframe_to_sql(df, dburi=DBURI, tablename="demo", progress=True) + + # Read back data from target database. + logger.info("Reading back data from the target database") + engine = sa.create_engine(DBURI) + with engine.connect() as connection: + connection.execute(sa.text("REFRESH TABLE demo;")) + result = connection.execute(sa.text("SELECT * FROM demo LIMIT 3;")) + records = [dict(item) for item in result.mappings().fetchall()] + jd(records) + logger.info("Ready.") + + +if __name__ == "__main__": + setup_logging() + main() diff --git a/influxio/core.py b/influxio/core.py index 4571f97..d317325 100644 --- a/influxio/core.py +++ b/influxio/core.py @@ -3,13 +3,14 @@ from yarl import URL -from influxio.model import InfluxAPI +from influxio.model import InfluxDbAdapter, SqlAlchemyAdapter from influxio.testdata import DataFrameFactory +from influxio.util.db import get_sqlalchemy_dialects logger = logging.getLogger(__name__) -def copy(source: str, target: str): +def copy(source: str, target: str, progress: bool = False): """ Copy/transfer data from/to InfluxDB API / InfluxDB line protocol / RDBMS. @@ -17,7 +18,7 @@ def copy(source: str, target: str): `source` and `target` are resource identifiers in URL format. 
- When the InfluxDB API is addressed, the schema is: + When InfluxDB is addressed, the schema is: http://example:token@localhost:8086/testdrive/demo This means: @@ -25,30 +26,47 @@ def copy(source: str, target: str): - Authentication: token - Bucket: testdrive - Measurement: demo + + When an RDBMS is addressed through SQLAlchemy, the schema is: + http://username:password@localhost:12345/testdrive/demo + + This means: + - Database or schema: testdrive + - Table name: demo """ - source = URL(source) - target = URL(target) + source_url = URL(source) + target_url = URL(target) + + sqlalchemy_dialects = get_sqlalchemy_dialects() logger.info(f"Copying from {source} to {target}") - if target.scheme.startswith("http"): - sink = InfluxAPI.from_url(target) + scheme_primary = target_url.scheme.split("+")[0] + + if target_url.scheme.startswith("http"): + sink = InfluxDbAdapter.from_url(target) + elif scheme_primary in sqlalchemy_dialects: + sink = SqlAlchemyAdapter.from_url(target, progress=True) else: - raise NotImplementedError(f"Data sink not implemented: {target}") + raise NotImplementedError(f"Data sink not implemented: {target_url}") - if source.scheme == "testdata": - dff = DataFrameFactory(**source.query) - df = dff.make(source.host) + if source_url.scheme == "testdata": + dff = DataFrameFactory(**source_url.query) + df = dff.make(source_url.host) sink.write_df(df) - elif source.scheme == "file": - path = Path(source.host).joinpath(Path(source.path).relative_to("/")) + elif source_url.scheme == "file": + path = Path(source_url.host).joinpath(Path(source_url.path).relative_to("/")) # TODO: Determine file type by suffix. # TODO: Make `precision` configurable. - sink.write_lineprotocol(path) + sink.from_lineprotocol(path) - elif source.scheme.startswith("http"): - sink.write_lineprotocol(str(source)) + elif source_url.scheme.startswith("http"): + if isinstance(sink, SqlAlchemyAdapter): + source_node = InfluxDbAdapter.from_url(source) + sink.write(source_node) + else: + sink.from_lineprotocol(str(source_url)) else: - raise NotImplementedError(f"Data source not implemented: {source}") + raise NotImplementedError(f"Data source not implemented: {source_url}") diff --git a/influxio/io.py b/influxio/io.py new file mode 100644 index 0000000..e45b00a --- /dev/null +++ b/influxio/io.py @@ -0,0 +1,122 @@ +import logging +import os +import typing as t +from collections import OrderedDict +from pathlib import Path + +import fsspec +import pandas as pd + +logger = logging.getLogger(__name__) + + +BytesString = t.Union[bytes, str] +BytesStringList = t.List[BytesString] + + +def open(path: t.Union[Path, str]): # noqa: A001 + """ + Access a plethora of resources using `fsspec`. + """ + path = str(path) + kwargs: t.Dict[str, t.Any] = {} + + # TODO: Also support authenticated S3. + if path.startswith("s3"): + kwargs["anon"] = True + + # TODO: Why isn't compression selected transparently? + if path.endswith(".gz"): + kwargs["compression"] = "gzip" + fs = fsspec.open(path, mode="rb", **kwargs).open() + return fs + + +def read_lineprotocol(data: t.IO[t.Any]): + """ + Read stream of InfluxDB line protocol and decode raw data. + + https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/ + """ + from line_protocol_parser import LineFormatError, parse_line + + for line in data.readlines(): + try: + yield parse_line(line) + except LineFormatError as ex: + logger.info(f"WARNING: Line protocol item {line} invalid. 
Reason: {ex}") + + +def records_from_lineprotocol(data: t.IO[t.Any]): + """ + Read stream of InfluxDB line protocol and generate `OrderedDict` records. + """ + for lp in read_lineprotocol(data=data): + record = OrderedDict() + record["time"] = lp["time"] + for tag, value in lp["tags"].items(): + record[tag] = value + for field, value in lp["fields"].items(): + record[field] = value + yield record + + +def dataframe_from_lineprotocol(data: t.IO[t.Any]): + """ + Read stream of InfluxDB line protocol into pandas DataFrame. + """ + records = records_from_lineprotocol(data) + return pd.DataFrame(records) + + +def dataframe_to_sql( + df: pd.DataFrame, + dburi: str, + tablename: str, + index=False, + chunksize=None, + if_exists="replace", + npartitions: int = None, + progress: bool = False, +): + """ + Load pandas dataframe into database using Dask. + + https://stackoverflow.com/questions/62404502/using-dasks-new-to-sql-for-improved-efficiency-memory-speed-or-alternative-to + """ + import dask.dataframe as dd + + # Set a few defaults. + chunksize = chunksize or 5_000 + npartitions = npartitions or int(os.cpu_count() / 2) + + if progress: + from dask.diagnostics import ProgressBar + + pbar = ProgressBar() + pbar.register() + + if dburi.startswith("crate"): + # TODO: Submit patch to upstream `crate-python`. This is another proof that something is wrong. + from cratedb_toolkit.sqlalchemy import patch_inspector + + patch_inspector() + + # Use performance INSERT method. + from crate.client.sqlalchemy.support import insert_bulk + + method = insert_bulk + else: + method = "multi" + + # Load data into database. + ddf = dd.from_pandas(df, npartitions=npartitions) + return ddf.to_sql( + tablename, + uri=dburi, + index=index, + chunksize=chunksize, + if_exists=if_exists, + method=method, + parallel=True, + ) diff --git a/influxio/model.py b/influxio/model.py index c305c8a..fc4d86b 100644 --- a/influxio/model.py +++ b/influxio/model.py @@ -3,38 +3,45 @@ import typing as t from pathlib import Path -import dask.dataframe as dd import influxdb_client.rest import pandas as pd -from dask.diagnostics import ProgressBar +import psycopg2 +import sqlalchemy +import sqlalchemy as sa from influxdb_client import InfluxDBClient +from sqlalchemy_utils import create_database from yarl import URL +from influxio.io import dataframe_to_sql from influxio.util.common import run_command logger = logging.getLogger(__name__) -class InfluxAPI: - def __init__(self, url: str, token: str, org: str, bucket: str, measurement: str): +class InfluxDbAdapter: + def __init__(self, url: str, token: str, org: str, bucket: str, measurement: str, debug: bool = False): self.url = url self.token = token self.org = org self.bucket = bucket self.measurement = measurement - self.client = InfluxDBClient(url=self.url, org=self.org, token=self.token) + self.debug = debug + self.client = InfluxDBClient(url=self.url, org=self.org, token=self.token, debug=self.debug) @classmethod - def from_url(cls, url: t.Union[URL, str]) -> "InfluxAPI": + def from_url(cls, url: t.Union[URL, str], **kwargs) -> "InfluxDbAdapter": if isinstance(url, str): url: URL = URL(url) token = url.password org = url.user bucket, measurement = url.path.strip("/").split("/") bare_url = f"{url.scheme}://{url.host}:{url.port}" - return cls(url=bare_url, token=token, org=org, bucket=bucket, measurement=measurement) + return cls(url=bare_url, token=token, org=org, bucket=bucket, measurement=measurement, **kwargs) - def delete(self): + def delete_measurement(self): + """ + 
https://docs.influxdata.com/influxdb/cloud/write-data/delete-data/ + """ try: return self.client.delete_api().delete( start="1677-09-21T00:12:43.145224194Z", @@ -79,6 +86,25 @@ def ensure_bucket(self): raise logger.info(f"Bucket id is {self.get_bucket_id()}") + def delete_bucket(self, missing_ok: bool = True): + """ + https://docs.influxdata.com/influxdb/v2/admin/buckets/delete-bucket/ + """ + try: + bucket_id = self.get_bucket_id() + except KeyError: + if missing_ok: + return + else: + raise + try: + self.client.buckets_api().delete_bucket(bucket_id) + except influxdb_client.rest.ApiException as ex: + if ex.status == 404 and missing_ok: + pass + else: + raise + def write_df(self, df: pd.DataFrame): """ Use batching API to import data frame into InfluxDB. @@ -99,8 +125,10 @@ def write_df(self, df: pd.DataFrame): # data_frame_tag_columns=['tag'], # noqa: ERA001 ) - def write_lineprotocol(self, source: t.Union[Path, str], precision: str = "ns"): + def from_lineprotocol(self, source: t.Union[Path, str], precision: str = "ns"): """ + Import data from file in lineprotocol format (ILP) by invoking `influx write`. + Precision of the timestamps of the lines (default: ns) [$INFLUX_PRECISION] The default precision for timestamps is in nanoseconds. If the precision of @@ -130,6 +158,7 @@ def write_lineprotocol(self, source: t.Union[Path, str], precision: str = "ns"): source_option = f'--file="{str(source)}"' command = f""" influx write \ + --host="{self.url}" \ --token="{self.token}" \ --org="{self.org}" \ --bucket="{self.bucket}" \ @@ -149,17 +178,26 @@ def get_bucket_id(self): raise KeyError(f"Bucket not found: {self.bucket}") return bucket.id - def to_lineprotocol(self, engine_path: str, output_path: str): + def to_lineprotocol(self, engine_path: str, output_path: t.Union[str, Path]): """ - https://docs.influxdata.com/influxdb/v2.6/migrate-data/migrate-oss/ + Export data into lineprotocol format (ILP) by invoking `influxd inspect export-lp`. + + TODO: Using a hyphen `-` for `--output-path` works well now, so export can also go to stdout. + TODO: By default, it will *append* to the .lp file. + Make it configurable to "replace" data. + TODO: Make it configurable to use compression, or not. + TODO: Propagate `--start` and `--end` parameters. + TODO: Capture stderr messages, and forward user admonition. + »detected deletes in WAL file, some deleted data may be brought back by replaying this export« + -- https://github.com/influxdata/influxdb/issues/24456 - :return: + https://docs.influxdata.com/influxdb/v2.6/migrate-data/migrate-oss/ """ logger.info("Exporting data to InfluxDB line protocol format (ILP)") bucket_id = self.get_bucket_id() command = f""" influxd inspect export-lp \ - --engine-path {engine_path} \ + --engine-path '{engine_path}' \ --bucket-id '{bucket_id}' \ --measurement '{self.measurement}' \ --output-path '{output_path}' \ @@ -167,33 +205,103 @@ def to_lineprotocol(self, engine_path: str, output_path: str): """ run_command(command) - def df_to_sql(self, df: pd.DataFrame): - """ - https://stackoverflow.com/questions/62404502/using-dasks-new-to-sql-for-improved-efficiency-memory-speed-or-alternative-to - """ - logger.info("Importing data frame to CrateDB") - pbar = ProgressBar() - pbar.register() - - # TODO: Make variable configurable. - # Default to nproc / 2? - ddf = dd.from_pandas(df, npartitions=4) - - # TODO: Make configurable. 
- dburi = "crate://localhost:4200" - # dburi = "crate+psycopg://localhost:4200" # noqa: ERA001 - - # NOTE: `chunksize` is important, otherwise CrateDB will croak with - # RuntimeException[java.lang.OutOfMemoryError: Java heap space] - # TODO: Unlock `engine_kwargs={"fast_executemany": True}`. - # Exception: - # TypeError: Invalid argument(s) 'fast_executemany' sent to create_engine(), - # using configuration CrateDialect/QueuePool/Engine. - # Please check that the keyword arguments are appropriate for this combination of components. - # Note that `fast_executemany` is only provided by `pyodbc`. - # TODO: Unlock `method="multi"` - # sqlalchemy.exc.CompileError: The 'crate' dialect with current - # database version settings does not support in-place multirow inserts. - ddf.to_sql("demo", uri=dburi, index=False, if_exists="replace", chunksize=10000, parallel=True) - - # TODO: Read back and `assert_frame_equal()` + +def decode_database_table(url: URL): + """ + Decode database and table names from database URI path and/or query string. + + Variants: + + // + ?database=&table=
+ ?schema=<schema>&table=<table>
+ """ + try: + database, table = url.path.strip("/").split("/") + except ValueError as ex: + if "too many values to unpack" not in str(ex) and "not enough values to unpack" not in str(ex): + raise + database = url.query.get("database") + table = url.query.get("table") + if url.scheme == "crate" and not database: + database = url.query.get("schema") + return database, table + + +class SqlAlchemyAdapter: + """ + Adapter to talk to SQLAlchemy-compatible databases. + """ + + def __init__(self, url: t.Union[URL, str], progress: bool = False, debug: bool = False): + self.progress = progress + + if isinstance(url, str): + url: URL = URL(url) + + self.database, self.table = decode_database_table(url) + + # Special handling for SQLite and CrateDB databases. + self.dburi = str(url.with_query(None)) + if url.scheme == "crate": + url = url.with_path("") + if self.database: + url = url.with_query({"schema": self.database}) + self.dburi = str(url) + elif url.scheme == "sqlite": + self.dburi = self.dburi.replace("sqlite:/", "sqlite:///") + else: + url = url.with_path(self.database) + self.dburi = str(url) + + logger.info(f"SQLAlchemy DB URI: {self.dburi}") + + @classmethod + def from_url(cls, url: t.Union[URL, str], **kwargs) -> "SqlAlchemyAdapter": + return cls(url=url, **kwargs) + + def write(self, source: t.Union[pd.DataFrame, InfluxDbAdapter]): + logger.info("Loading dataframes into RDBMS/SQL database using pandas/Dask") + if isinstance(source, InfluxDbAdapter): + for df in source.read_df(): + dataframe_to_sql(df, dburi=self.dburi, tablename=self.table, progress=self.progress) + elif isinstance(source, pd.DataFrame): + dataframe_to_sql(source, dburi=self.dburi, tablename=self.table, progress=self.progress) + else: + raise NotImplementedError(f"Failed handling source: {source}") + + def refresh_table(self): + engine = sa.create_engine(self.dburi) + with engine.connect() as connection: + return connection.execute(sa.text(f"REFRESH TABLE {self.table};")) + + def read_records(self) -> t.List[t.Dict]: + engine = sa.create_engine(self.dburi) + with engine.connect() as connection: + result = connection.execute(sa.text(f"SELECT * FROM {self.table};")) # noqa: S608 + records = [dict(item) for item in result.mappings().fetchall()] + return records + + def create_database(self): + try: + return create_database(self.dburi) + except sqlalchemy.exc.ProgrammingError as ex: + if "psycopg2.errors.DuplicateDatabase" not in str(ex): + raise + + def run_sql(self, sql: str): + engine = sa.create_engine(self.dburi) + with engine.connect() as connection: + connection.connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + return connection.execute(sa.text(sql)) + + def run_sql_raw(self, sql: str): + engine = sa.create_engine(self.dburi) + connection = engine.raw_connection() + connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + cursor = connection.cursor() + cursor.execute(sql) + result = cursor.fetchall() + cursor.close() + connection.close() + return result diff --git a/influxio/testdata.py b/influxio/testdata.py index 77ca9cf..ece9f53 100644 --- a/influxio/testdata.py +++ b/influxio/testdata.py @@ -49,9 +49,8 @@ def make_dummy() -> pd.DataFrame: def make_mixed() -> pd.DataFrame: return makeMixedDataFrame() - @staticmethod - def make_dateindex(rows: int = default_rows_count) -> pd.DataFrame: - return makeTimeDataFrame(nper=rows, freq="S") + def make_dateindex(self) -> pd.DataFrame: + return makeTimeDataFrame(nper=self.rows, freq="S") @staticmethod def make_wide(rows: 
int = default_rows_count, columns: int = 99) -> pd.DataFrame: diff --git a/influxio/util/common.py b/influxio/util/common.py index f141592..9589246 100644 --- a/influxio/util/common.py +++ b/influxio/util/common.py @@ -36,7 +36,7 @@ def run_command(command: str): """ command = dedent(command).strip() cmd = shlex.split(command) - + logger.info(f"Running command: {command}") try: output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True) # noqa: S603 except subprocess.CalledProcessError as exc: diff --git a/influxio/util/compat.py b/influxio/util/compat.py new file mode 100644 index 0000000..6a8723c --- /dev/null +++ b/influxio/util/compat.py @@ -0,0 +1,13 @@ +import sys +import typing + +if typing.TYPE_CHECKING: + from importlib.metadata import EntryPoints + + +def entry_points(**params) -> "EntryPoints": + if sys.version_info < (3, 10): + from importlib_metadata import entry_points + else: + from importlib.metadata import entry_points + return entry_points(**params) diff --git a/influxio/util/db.py b/influxio/util/db.py new file mode 100644 index 0000000..50eb12a --- /dev/null +++ b/influxio/util/db.py @@ -0,0 +1,14 @@ +def get_sqlalchemy_dialects(): + """ + Return list of available SQLAlchemy dialects. + + TODO: Synchronize with influxio.util.report. + """ + import sqlalchemy.dialects + + from influxio.util.compat import entry_points + + dialects = list(sqlalchemy.dialects.__all__) + eps = entry_points(group="sqlalchemy.dialects") + dialects += [dialect.name for dialect in eps] + return sorted(set(dialects)) diff --git a/influxio/util/report.py b/influxio/util/report.py index 4776a9a..1c4c086 100644 --- a/influxio/util/report.py +++ b/influxio/util/report.py @@ -77,12 +77,12 @@ def platform(): print() # SQLAlchemy - from importlib.metadata import entry_points - import sqlalchemy.dialects + from influxio.util.compat import entry_points + subsection("SQLAlchemy") - print(bullet_item(sqlalchemy.dialects.registry.impls.keys(), label="Dialects built-in")) + print(bullet_item(list(sqlalchemy.dialects.__all__), label="Dialects built-in")) eps = entry_points(group="sqlalchemy.dialects") dialects = [dialect.name for dialect in eps] print(bullet_item(dialects, label="Dialects 3rd-party")) diff --git a/pyproject.toml b/pyproject.toml index 78a8cf3..38574bb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,12 +15,18 @@ description = "Import and export data into/from InfluxDB" readme = "README.rst" keywords = [ "export", + "ilp", "import", + "influx", "influxdb", "line-protocol", "lineprotocol", + "rdbms", + "sql", + "sqlalchemy", "timeseries", "timeseries-data", + "transfer", ] license = { text = "MIT" } authors = [ @@ -46,8 +52,12 @@ classifiers = [ "Operating System :: POSIX :: Linux", "Operating System :: Unix", "Programming Language :: Python", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "Topic :: Communications", "Topic :: Database", "Topic :: Documentation", @@ -69,10 +79,14 @@ dependencies = [ "click<9", "colorama<1", "crate[sqlalchemy]", + "cratedb-toolkit", "dask", - "influxdb-client[ciso]", + 'importlib-metadata; python_version <= "3.9"', + "influxdb-client[ciso]<2", "line-protocol-parser<2", "pandas<2.2", + "psycopg2-binary<3", + "SQLAlchemy-Utils<0.42", "yarl<2", ] [project.optional-dependencies] @@ -86,7 +100,6 @@ develop = [ ] release = 
[ "build<2", - 'minibump<1; python_version >= "3.10"', "twine<5", ] test = [ @@ -134,6 +147,7 @@ testpaths = [ ] xfail_strict = true markers = [ + "examples", "slow", ] @@ -213,10 +227,8 @@ lint = [ ] release = [ - { cmd = "minibump bump --relax minor" }, { cmd = "python -m build" }, { cmd = "twine upload dist/*" }, ] -test = { cmd = "pytest -m 'not roadrunner'" } -test-roadrunner = { cmd = "pytest -m 'roadrunner'" } +test = { cmd = "pytest" } diff --git a/tests/conftest.py b/tests/conftest.py index d673b94..4536caf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -24,7 +24,7 @@ def line_protocol_file_basic(): @pytest.fixture def line_protocol_url_basic(): - return "https://raw.githubusercontent.com/daq-tools/influxio/main/tests/testdata/basic.lp" + return "https://github.com/daq-tools/influxio/raw/main/tests/testdata/basic.lp" @pytest.fixture diff --git a/tests/test_adapter.py b/tests/test_adapter.py new file mode 100644 index 0000000..a38772c --- /dev/null +++ b/tests/test_adapter.py @@ -0,0 +1,36 @@ +from influxio.model import SqlAlchemyAdapter + + +def test_cratedb_adapter_void(): + adapter = SqlAlchemyAdapter.from_url("crate://localhost:4200/") + assert adapter.database is None + assert adapter.table is None + assert adapter.dburi == "crate://localhost:4200" + + +def test_cratedb_adapter_universal_notation(): + adapter = SqlAlchemyAdapter.from_url("crate://localhost:4200/testdrive/basic") + assert adapter.database == "testdrive" + assert adapter.table == "basic" + assert adapter.dburi == "crate://localhost:4200/?schema=testdrive" + + +def test_cratedb_adapter_table(): + adapter = SqlAlchemyAdapter.from_url("crate://localhost:4200/?table=basic") + assert adapter.database is None + assert adapter.table == "basic" + assert adapter.dburi == "crate://localhost:4200" + + +def test_cratedb_adapter_schema_table(): + adapter = SqlAlchemyAdapter.from_url("crate://localhost:4200/?schema=testdrive&table=basic") + assert adapter.database == "testdrive" + assert adapter.table == "basic" + assert adapter.dburi == "crate://localhost:4200/?schema=testdrive" + + +def test_cratedb_adapter_database_table(): + adapter = SqlAlchemyAdapter.from_url("crate://localhost:4200/?database=testdrive&table=basic") + assert adapter.database == "testdrive" + assert adapter.table == "basic" + assert adapter.dburi == "crate://localhost:4200/?schema=testdrive" diff --git a/tests/test_examples.py b/tests/test_examples.py new file mode 100644 index 0000000..9261148 --- /dev/null +++ b/tests/test_examples.py @@ -0,0 +1,97 @@ +import gzip +import os +import subprocess +import sys +import typing as t +from pathlib import Path + +import pytest +import sqlalchemy as sa + +from examples.export_lineprotocol import LINEPROTOCOL_FILE +from examples.export_sqlalchemy import DBURI +from influxio.model import InfluxDbAdapter + + +def get_example_program_path(filename: str): + """ + Compute path to example program. + """ + return Path(__file__).parent.parent.joinpath("examples").joinpath(filename) + + +def run_program(command: t.List[str]): + """ + Run a program, connecting stdout and stderr streams with the current ones. + """ + return subprocess.check_call( + command, # noqa: S603 + stdout=sys.stdout.buffer, + stderr=sys.stderr.buffer, + ) + + +@pytest.fixture(scope="function", autouse=True) +def reset_resources(): + """ + Make sure each test case function uses a fresh environment. + """ + + # Reset InfluxDB database. 
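+    # `delete_bucket()` defaults to `missing_ok=True`, so this reset also succeeds on a
+    # fresh InfluxDB instance where the `testdrive` bucket does not exist yet.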
+ influx = InfluxDbAdapter( + url="http://localhost:8086", + org="example", + token="token", # noqa: S106 + bucket="testdrive", + measurement="demo", + ) + influx.delete_bucket() + + # Reset RDBMS database. + engine = sa.create_engine(DBURI) + with engine.connect() as connection: + connection.execute(sa.text("DROP TABLE IF EXISTS demo;")) + + # Reset export files. + LINEPROTOCOL_FILE.unlink(missing_ok=True) + + +@pytest.mark.examples +def test_example_dataframe(): + """ + Verify the `examples/export_sqlalchemy.py` program. + """ + + # Invoke example program. + example = get_example_program_path("export_sqlalchemy.py") + exitcode = run_program([sys.executable, example]) + assert exitcode == 0 + + # Verify database content. + engine = sa.create_engine(DBURI) + with engine.connect() as connection: + connection.execute(sa.text("REFRESH TABLE demo;")) + result = connection.execute(sa.text("SELECT COUNT(*) FROM demo;")) + assert result.fetchone() == (15_000,) + + +@pytest.mark.examples +def test_example_lineprotocol(): + """ + Verify the `examples/export_lineprotocol.py` program. + """ + + if "CI" in os.environ: + raise pytest.skip("Needs access to InfluxDB storage file location") + + # Invoke example program. + example = get_example_program_path("export_lineprotocol.py") + exitcode = run_program([sys.executable, example]) + assert exitcode == 0 + + # Verify content of lineprotocol file. + # Because the example input data contains four columns, + # there are four times more lines than individual records. + with gzip.open(LINEPROTOCOL_FILE) as buffer: + lines = buffer.readlines() + assert len(lines) == 15_000 * 4 diff --git a/tests/test_export.py b/tests/test_export.py new file mode 100644 index 0000000..6165a21 --- /dev/null +++ b/tests/test_export.py @@ -0,0 +1,112 @@ +import pytest + +import influxio.core +from influxio.model import InfluxDbAdapter, SqlAlchemyAdapter + +CRATEDB_URL = "crate://crate@localhost:4200/testdrive/basic" +INFLUXDB_URL = "http://example:token@localhost:8086/testdrive/basic" +POSTGRESQL_URL = "postgresql+psycopg2://postgres@localhost:5432/testdrive/basic" + + +@pytest.fixture +def influxdb() -> InfluxDbAdapter: + return InfluxDbAdapter.from_url(INFLUXDB_URL) + + +@pytest.fixture +def cratedb() -> SqlAlchemyAdapter: + return SqlAlchemyAdapter.from_url(CRATEDB_URL) + + +@pytest.fixture +def sqlite_url(tmp_path) -> str: + dbpath = tmp_path / "basic.sqlite" + return f"sqlite:///{dbpath}?table=basic" + + +@pytest.fixture +def sqlite(sqlite_url) -> SqlAlchemyAdapter: + return SqlAlchemyAdapter.from_url(sqlite_url) + + +@pytest.fixture +def postgresql() -> SqlAlchemyAdapter: + adapter = SqlAlchemyAdapter.from_url(POSTGRESQL_URL) + adapter.create_database() + return adapter + + +@pytest.fixture +def provision_influxdb(influxdb, line_protocol_file_basic): + """ + Provision seed data to InfluxDB. + """ + source_url = f"file://{line_protocol_file_basic}" + target_url = INFLUXDB_URL + + # Make sure database is purged. + influxdb.delete_measurement() + + # Transfer data. + influxio.core.copy(source_url, target_url) + + +def test_export_cratedb(caplog, influxdb, provision_influxdb, cratedb): + """ + Export data from InfluxDB to CrateDB. + """ + + source_url = INFLUXDB_URL + target_url = CRATEDB_URL + + # Transfer data. + influxio.core.copy(source_url, target_url) + + # Verify execution. 
+ assert f"Copying from {source_url} to {target_url}" in caplog.messages + assert "Loading dataframes into RDBMS/SQL database using pandas/Dask" in caplog.messages + + # Verify number of records in target database. + cratedb.refresh_table() + records = cratedb.read_records() + assert len(records) == 2 + + +def test_export_postgresql(caplog, influxdb, provision_influxdb, postgresql): + """ + Export data from InfluxDB to PostgreSQL. + """ + + source_url = INFLUXDB_URL + target_url = POSTGRESQL_URL + + # Transfer data. + influxio.core.copy(source_url, target_url) + + # Verify execution. + assert f"Copying from {source_url} to {target_url}" in caplog.messages + assert "Loading dataframes into RDBMS/SQL database using pandas/Dask" in caplog.messages + + # Verify number of records in target database. + records = postgresql.read_records() + assert len(records) == 2 + + +def test_export_sqlite(caplog, influxdb, provision_influxdb, sqlite, sqlite_url): + """ + Export data from InfluxDB to SQLite. + """ + + source_url = INFLUXDB_URL + target_url = sqlite_url + + # Transfer data. + influxio.core.copy(source_url, target_url) + + # Verify execution. + assert f"Copying from {source_url} to {target_url}" in caplog.messages + assert "Loading dataframes into RDBMS/SQL database using pandas/Dask" in caplog.messages + + # Verify number of records in target database. + records = sqlite.read_records() + assert len(records) == 2 diff --git a/tests/test_write.py b/tests/test_import.py similarity index 83% rename from tests/test_write.py rename to tests/test_import.py index 520d695..db27e3e 100644 --- a/tests/test_write.py +++ b/tests/test_import.py @@ -3,7 +3,7 @@ import pytest import influxio.core -from influxio.model import InfluxAPI +from influxio.model import InfluxDbAdapter @dataclasses.dataclass @@ -22,7 +22,7 @@ class DatasetItemSpec: ], ids=lambda x: x.name, ) -def test_write_testdata(spec: DatasetItemSpec, caplog): +def test_import_testdata(spec: DatasetItemSpec, caplog): """ CLI test: Import test data to InfluxDB. """ @@ -31,8 +31,8 @@ def test_write_testdata(spec: DatasetItemSpec, caplog): target_url = f"http://example:token@localhost:8086/testdrive/testdata-{dataset}" # Make sure database is purged. - api = InfluxAPI.from_url(target_url) - api.delete() + api = InfluxDbAdapter.from_url(target_url) + api.delete_measurement() # Transfer data. influxio.core.copy(source_url, target_url) @@ -47,7 +47,7 @@ def test_write_testdata(spec: DatasetItemSpec, caplog): assert len(records) == spec.length -def test_write_lineprotocol_file(line_protocol_file_basic, caplog): +def test_import_lineprotocol_file(line_protocol_file_basic, caplog): """ CLI test: Import line protocol data to InfluxDB. """ @@ -55,8 +55,8 @@ def test_write_lineprotocol_file(line_protocol_file_basic, caplog): target_url = "http://example:token@localhost:8086/testdrive/basic" # Make sure database is purged. - api = InfluxAPI.from_url(target_url) - api.delete() + api = InfluxDbAdapter.from_url(target_url) + api.delete_measurement() # Transfer data. influxio.core.copy(source_url, target_url) @@ -70,7 +70,7 @@ def test_write_lineprotocol_file(line_protocol_file_basic, caplog): assert len(records) == 2 -def test_write_lineprotocol_url(line_protocol_url_basic, caplog): +def test_import_lineprotocol_url(line_protocol_url_basic, caplog): """ CLI test: Import line protocol data to InfluxDB. 
""" @@ -78,8 +78,8 @@ def test_write_lineprotocol_url(line_protocol_url_basic, caplog): target_url = "http://example:token@localhost:8086/testdrive/basic" # Make sure database is purged. - api = InfluxAPI.from_url(target_url) - api.delete() + api = InfluxDbAdapter.from_url(target_url) + api.delete_measurement() # Transfer data. influxio.core.copy(source_url, target_url)