Skip to content

Commit

Permalink
Unlock loading ILP files from HTTP resources
Browse files: browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 17, 2024
1 parent 82d9a65 commit b7f8b21
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Changelog
in progress
===========
- Unlock loading ILP files into SQLAlchemy databases
- Unlock loading ILP files from HTTP resources

2024-06-23 v0.4.0
=================
Expand Down
9 changes: 9 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,15 @@ Load data from InfluxDB files into any SQL database supported by SQLAlchemy.
"file://export.lp" \
"crate://crate@localhost:4200/testdrive/demo"
# From remote line protocol file to SQLite.
influxio copy \
"https://github.com/influxdata/influxdb2-sample-data/raw/master/air-sensor-data/air-sensor-data.lp" \
"sqlite:///export.sqlite?table=air-sensor-data"
# From remote line protocol file to CrateDB.
influxio copy \
"https://github.com/influxdata/influxdb2-sample-data/raw/master/air-sensor-data/air-sensor-data.lp" \
"crate://crate@localhost:4200/testdrive/demo"
Export from Cloud to Cloud
Expand Down
10 changes: 7 additions & 3 deletions influxio/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import psycopg2
import sqlalchemy
import sqlalchemy as sa
from fsspec import filesystem
from influxdb_client import InfluxDBClient
from sqlalchemy_utils import create_database
from upath import UPath
from yarl import URL

from influxio.io import dataframe_from_lineprotocol, dataframe_to_lineprotocol, dataframe_to_sql
Expand Down Expand Up @@ -374,10 +376,12 @@ def decode_database_table(url: URL):

def from_lineprotocol(self, source: t.Union[Path, str], precision: str = "ns"):
"""
Load data from file in lineprotocol format (ILP).
Load data from file or resource in lineprotocol format (ILP).
"""
logger.info(f"Loading line protocol file: {source}")
with open(source, "r") as fp:
logger.info(f"Loading line protocol data. source={source}")
p = UPath(source)
fs = filesystem(p.protocol, **p.storage_options) # equivalent to p.fs
with fs.open(p.path) as fp:
df = dataframe_from_lineprotocol(fp)
self.write(df)

Expand Down
6 changes: 4 additions & 2 deletions influxio/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ def copy(source: str, target: str, progress: bool = False) -> t.Union[CommandRes
sink.from_lineprotocol(path)

elif source_url.scheme.startswith("http"):
if isinstance(sink, (FileAdapter, SqlAlchemyAdapter)):
# TODO: Improve dispatching. The `.lp` check below is a substring match on the whole URL string, not a test of the path suffix.
source_url_str = str(source_url)
if isinstance(sink, (FileAdapter, SqlAlchemyAdapter)) and ".lp" not in source_url_str:
source_node = InfluxDbApiAdapter.from_url(source)
sink.write(source_node)
else:
sink.from_lineprotocol(str(source_url))
sink.from_lineprotocol(source_url_str)

else:
raise NotImplementedError(f"Data source not implemented: {source_url}")
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ dependencies = [
"cratedb-toolkit",
"dask[dataframe]>=2020",
'importlib-metadata; python_version <= "3.9"',
"fsspec[s3,http]",
"influx-line==1.0.0",
"influxdb-client[ciso]<2",
"line-protocol-parser<2",
Expand All @@ -97,6 +98,7 @@ dependencies = [
"pueblo>=0.0.7",
"sqlalchemy-cratedb>=0.37,<1",
"SQLAlchemy-Utils<0.42",
"universal-pathlib<0.3",
"yarl<2",
]
[project.optional-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion tests/test_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_import_lineprotocol_url(line_protocol_url_basic, caplog):

# Make sure database is purged.
api = InfluxDbApiAdapter.from_url(target_url)
api.delete_measurement()
api.delete_bucket()

# Transfer data.
influxio.core.copy(source_url, target_url)
Expand Down
27 changes: 26 additions & 1 deletion tests/test_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,32 @@ def test_load_lineprotocol_to_sqlite_file(line_protocol_file_basic, caplog):

# Verify execution.
assert f"Copying from {source_url} to {target_url}" in caplog.messages
assert "Loading line protocol file: tests/testdata/basic.lp" in caplog.messages
assert "Loading line protocol data. source=tests/testdata/basic.lp" in caplog.messages

# Verify number of records in target database.
db = SqlAlchemyAdapter.from_url(target_url)
records = db.read_records()
assert len(records) == 2


def test_load_lineprotocol_to_sqlite_url(line_protocol_url_basic, caplog):
"""
Load line protocol data from a URL into SQLite.
"""

# Define source and target URLs.
source_url = line_protocol_url_basic
target_url = "sqlite:///export.sqlite?table=export"

# Make sure target database is purged.
Path("export.sqlite").unlink(missing_ok=True)

# Transfer data.
influxio.core.copy(source_url, target_url)

# Verify execution.
assert f"Copying from {source_url} to {target_url}" in caplog.messages
assert f"Loading line protocol data. source={line_protocol_url_basic}" in caplog.messages

# Verify number of records in target database.
db = SqlAlchemyAdapter.from_url(target_url)
Expand Down

0 comments on commit b7f8b21

Please sign in to comment.