From b7f8b213b9b352f3cff0539415993ad5ea7e3de9 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 17 Sep 2024 14:12:42 +0200 Subject: [PATCH] Unlock loading ILP files from HTTP resources --- CHANGES.rst | 1 + README.rst | 9 +++++++++ influxio/adapter.py | 10 +++++++--- influxio/core.py | 6 ++++-- pyproject.toml | 2 ++ tests/test_import.py | 2 +- tests/test_load.py | 27 ++++++++++++++++++++++++++- 7 files changed, 50 insertions(+), 7 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index c697521..c55f880 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -6,6 +6,7 @@ Changelog in progress =========== - Unlock loading ILP files into SQLAlchemy databases +- Unlock loading ILP files from HTTP resources 2024-06-23 v0.4.0 ================= diff --git a/README.rst b/README.rst index fcf191c..fdd1352 100644 --- a/README.rst +++ b/README.rst @@ -227,6 +227,15 @@ Load data from InfluxDB files into any SQL database supported by SQLAlchemy. "file://export.lp" \ "crate://crate@localhost:4200/testdrive/demo" + # From remote line protocol file to SQLite. + influxio copy \ + "https://github.com/influxdata/influxdb2-sample-data/raw/master/air-sensor-data/air-sensor-data.lp" \ + "sqlite:///export.sqlite?table=air-sensor-data" + + # From remote line protocol file to CrateDB. + influxio copy \ + "https://github.com/influxdata/influxdb2-sample-data/raw/master/air-sensor-data/air-sensor-data.lp" \ + "crate://crate@localhost:4200/testdrive/demo" Export from Cloud to Cloud diff --git a/influxio/adapter.py b/influxio/adapter.py index ffc4314..75ff977 100644 --- a/influxio/adapter.py +++ b/influxio/adapter.py @@ -10,8 +10,10 @@ import psycopg2 import sqlalchemy import sqlalchemy as sa +from fsspec import filesystem from influxdb_client import InfluxDBClient from sqlalchemy_utils import create_database +from upath import UPath from yarl import URL from influxio.io import dataframe_from_lineprotocol, dataframe_to_lineprotocol, dataframe_to_sql @@ -374,10 +376,12 @@ def decode_database_table(url: URL): def from_lineprotocol(self, source: t.Union[Path, str], precision: str = "ns"): """ - Load data from file in lineprotocol format (ILP). + Load data from file or resource in lineprotocol format (ILP). """ - logger.info(f"Loading line protocol file: {source}") - with open(source, "r") as fp: + logger.info(f"Loading line protocol data. source={source}") + p = UPath(source) + fs = filesystem(p.protocol, **p.storage_options) # equivalent to p.fs + with fs.open(p.path) as fp: df = dataframe_from_lineprotocol(fp) self.write(df) diff --git a/influxio/core.py b/influxio/core.py index ec5421a..435d873 100644 --- a/influxio/core.py +++ b/influxio/core.py @@ -85,11 +85,13 @@ def copy(source: str, target: str, progress: bool = False) -> t.Union[CommandRes sink.from_lineprotocol(path) elif source_url.scheme.startswith("http"): - if isinstance(sink, (FileAdapter, SqlAlchemyAdapter)): + # TODO: Improve dispatching. + source_url_str = str(source_url) + if isinstance(sink, (FileAdapter, SqlAlchemyAdapter)) and ".lp" not in source_url_str: source_node = InfluxDbApiAdapter.from_url(source) sink.write(source_node) else: - sink.from_lineprotocol(str(source_url)) + sink.from_lineprotocol(source_url_str) else: raise NotImplementedError(f"Data source not implemented: {source_url}") diff --git a/pyproject.toml b/pyproject.toml index 8369fa0..c436066 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -89,6 +89,7 @@ dependencies = [ "cratedb-toolkit", "dask[dataframe]>=2020", 'importlib-metadata; python_version <= "3.9"', + "fsspec[s3,http]", "influx-line==1.0.0", "influxdb-client[ciso]<2", "line-protocol-parser<2", @@ -97,6 +98,7 @@ dependencies = [ "pueblo>=0.0.7", "sqlalchemy-cratedb>=0.37,<1", "SQLAlchemy-Utils<0.42", + "universal-pathlib<0.3", "yarl<2", ] [project.optional-dependencies] diff --git a/tests/test_import.py b/tests/test_import.py index 150c0c6..880c68d 100644 --- a/tests/test_import.py +++ b/tests/test_import.py @@ -79,7 +79,7 @@ def test_import_lineprotocol_url(line_protocol_url_basic, caplog): # Make sure database is purged. api = InfluxDbApiAdapter.from_url(target_url) - api.delete_measurement() + api.delete_bucket() # Transfer data. influxio.core.copy(source_url, target_url) diff --git a/tests/test_load.py b/tests/test_load.py index 8456ad2..087e1c9 100644 --- a/tests/test_load.py +++ b/tests/test_load.py @@ -21,7 +21,32 @@ def test_load_lineprotocol_to_sqlite_file(line_protocol_file_basic, caplog): # Verify execution. assert f"Copying from {source_url} to {target_url}" in caplog.messages - assert "Loading line protocol file: tests/testdata/basic.lp" in caplog.messages + assert "Loading line protocol data. source=tests/testdata/basic.lp" in caplog.messages + + # Verify number of records in target database. + db = SqlAlchemyAdapter.from_url(target_url) + records = db.read_records() + assert len(records) == 2 + + +def test_load_lineprotocol_to_sqlite_url(line_protocol_url_basic, caplog): + """ + Load line protocol URL into SQLite. + """ + + # Define source and target URLs. + source_url = line_protocol_url_basic + target_url = "sqlite:///export.sqlite?table=export" + + # Make sure target database is purged. + Path("export.sqlite").unlink(missing_ok=True) + + # Transfer data. + influxio.core.copy(source_url, target_url) + + # Verify execution. + assert f"Copying from {source_url} to {target_url}" in caplog.messages + assert f"Loading line protocol data. source={line_protocol_url_basic}" in caplog.messages # Verify number of records in target database. db = SqlAlchemyAdapter.from_url(target_url)