Table Loader: Add capability to load InfluxDB Line Protocol (ILP) files #333

Merged: 1 commit, Dec 23, 2024
1 change: 1 addition & 0 deletions CHANGES.md
@@ -7,6 +7,7 @@
 - Improved `ctk shell` to also talk to CrateDB standalone databases
 - Added basic utility command `ctk tail`, for tailing a database
   table, and optionally following the tail
+- Table Loader: Added capability to load InfluxDB Line Protocol (ILP) files

 ## 2024/10/13 v0.0.29
 - MongoDB: Added Zyp transformations to the CDC subsystem,
6 changes: 5 additions & 1 deletion cratedb_toolkit/api/main.py
@@ -129,7 +129,11 @@ def load_table(
                 logger.error("Data loading failed or incomplete")
                 return False

-        elif source_url_obj.scheme.startswith("influxdb"):
+        elif (
+            source_url_obj.scheme.startswith("influxdb")
+            or resource.url.endswith(".lp")
+            or resource.url.endswith(".lp.gz")
+        ):
Comment on lines -132 to +136 by @amotl (Member, Author), Dec 21, 2024:

The feature has been there already, and only needed to be unlocked here by adjusting the dispatching, and adding software tests and documentation.

             from cratedb_toolkit.io.influxdb import influxdb_copy

             http_scheme = "http"
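
The dispatching condition in this hunk can be read in isolation as a small predicate. The following is a minimal sketch, not the toolkit's code: `is_influxdb_source` and `ILP_SUFFIXES` are hypothetical names, and it uses `urllib.parse` rather than the URL object that `load_table` works with.

```python
from urllib.parse import urlparse

# Hypothetical constant: file suffixes treated as InfluxDB line protocol (ILP).
ILP_SUFFIXES = (".lp", ".lp.gz")


def is_influxdb_source(url: str) -> bool:
    """
    Return True when a source URL would be routed to the InfluxDB loader:
    either the scheme starts with "influxdb", or the URL string ends with
    an ILP file suffix.
    """
    scheme = urlparse(url).scheme
    # str.endswith accepts a tuple, which covers both suffixes in one call.
    return scheme.startswith("influxdb") or url.endswith(ILP_SUFFIXES)


if __name__ == "__main__":
    for candidate in (
        "influxdb2://example:token@localhost:8086/testdrive/demo",
        "file://influxdb-export.lp",
        "https://example.org/data.csv",
    ):
        print(candidate, is_influxdb_source(candidate))
```

As in the diff, the suffix test applies to the full URL string, so a URL that carries a query string after `.lp` would not match.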
10 changes: 10 additions & 0 deletions cratedb_toolkit/io/influxdb.py
@@ -2,6 +2,8 @@

 from influxio.core import copy

+from cratedb_toolkit.model import DatabaseAddress
+
 logger = logging.getLogger(__name__)


@@ -12,6 +14,14 @@ def influxdb_copy(source_url, target_url, progress: bool = False):
     export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
     ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
     """
+
+    # Sanity checks.
+    target_address = DatabaseAddress.from_string(target_url)
+    url, table_address = target_address.decode()
+    if table_address.table is None:
+        raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
+
+    # Invoke copy operation.
     logger.info("Running InfluxDB copy")
     copy(source_url, target_url, progress=progress)
     return True
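
The sanity check above rejects target URLs that name a schema but no table. A minimal standalone sketch of the same idea, assuming the `crate://host:port/schema/table` layout used in the docstring: `split_schema_table` and `require_table` are hypothetical helpers built on `urllib.parse`, and they do not reproduce the `DatabaseAddress` API.

```python
from typing import Optional, Tuple
from urllib.parse import urlparse


def split_schema_table(url: str) -> Tuple[Optional[str], Optional[str]]:
    """Split the URL path into (schema, table); a missing part is None."""
    parts = [part for part in urlparse(url).path.split("/") if part]
    schema = parts[0] if len(parts) > 0 else None
    table = parts[1] if len(parts) > 1 else None
    return schema, table


def require_table(url: str) -> Tuple[Optional[str], str]:
    """Raise ValueError when the URL does not name a target table."""
    schema, table = split_schema_table(url)
    if table is None:
        raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
    return schema, table


if __name__ == "__main__":
    print(require_table("crate://crate@localhost:4200/testdrive/demo"))
    try:
        require_table("crate://crate@localhost:4200/testdrive")
    except ValueError as ex:
        print(f"Rejected: {ex}")
```

Failing before the copy starts means the error surfaces without any data being transferred.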
30 changes: 25 additions & 5 deletions doc/io/influxdb/loader.md
@@ -2,10 +2,14 @@
 # InfluxDB Table Loader

 ## About
-Load data from InfluxDB into CrateDB using a one-stop command
-`ctk load table influxdb2://...`, in order to facilitate convenient
+Load data from InfluxDB into CrateDB using one-stop commands
+`ctk load table ...`, in order to facilitate convenient
 data transfers to be used within data pipelines or ad hoc operations.

+## Synopsis
+- Load from InfluxDB server: `ctk load table influxdb2://...`
+- Load from InfluxDB line protocol: `ctk load table file://observations.lp`
+
 ## Details
 The InfluxDB table loader is based on the [influxio] package. Please also check
 its documentation to learn about more of its capabilities, supporting you when
@@ -18,7 +22,13 @@ pip install --upgrade 'cratedb-toolkit[influxdb]'

 ## Usage

-### Workstation
+Prepare subsequent commands by defining the database address of your
+CrateDB database cluster.
+```shell
+export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
+```
+
+### InfluxDB 2 API

 An exemplary walkthrough, copying data from InfluxDB to CrateDB, both services
 expected to be listening on `localhost`.
@@ -37,13 +47,11 @@ influx query "from(bucket:\"${INFLUX_BUCKET_NAME}\") |> range(start:-100y)"

 Transfer data from InfluxDB bucket/measurement into CrateDB schema/table.
 ```shell
-export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
 ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
 ```

 Query data in CrateDB.
 ```shell
-export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
 ctk shell --command "SELECT * FROM testdrive.demo;"
 ctk show table "testdrive.demo"
 ```
@@ -52,6 +60,18 @@ ctk show table "testdrive.demo"
 - More convenient table querying.
 :::

+### InfluxDB Line protocol file (ILP)
+
+Import ILP file from local filesystem.
+```shell
+ctk load table "file://influxdb-export.lp"
+```
+
+Import ILP file from a remote resource.
+```shell
+ctk load table \
+"https://github.com/influxdata/influxdb2-sample-data/raw/master/air-sensor-data/air-sensor-data.lp"
+```

 ### Cloud

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -150,7 +150,7 @@ full = [
 ]
 influxdb = [
   "cratedb-toolkit[io]",
-  "influxio>=0.4,<1",
+  "influxio>=0.5,<1",
 ]
 io = [
   "cr8",
40 changes: 40 additions & 0 deletions tests/io/influxdb/test_cli.py
@@ -15,6 +15,46 @@
 from influxio.adapter import InfluxDbApiAdapter


+def test_line_protocol_load_table_success(caplog, cratedb, needs_sqlalchemy2):
+    """
+    CLI test: Invoke `ctk load table` for InfluxDB line protocol (ILP) file.
+    """
+    ilp_url = "https://github.com/influxdata/influxdb2-sample-data/raw/master/noaa-ndbc-data/latest-observations.lp"
+    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
+
+    # Run transfer command.
+    runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url})
+    result = runner.invoke(
+        cli,
+        args=f"load table {ilp_url}",
+        catch_exceptions=False,
+    )
+    assert result.exit_code == 0
+
+    # Verify data in target database.
+    assert cratedb.database.table_exists("testdrive.demo") is True
+    assert cratedb.database.refresh_table("testdrive.demo") is True
+    assert cratedb.database.count_records("testdrive.demo") >= 500
+
+
+def test_line_protocol_load_table_wrong_cratedb_url_failure(caplog, cratedb, needs_sqlalchemy2):
+    """
+    CLI test: Invoke `ctk load table` for InfluxDB line protocol (ILP) file.
+    """
+    ilp_url = "https://github.com/influxdata/influxdb2-sample-data/raw/master/noaa-ndbc-data/latest-observations.lp"
+    cratedb_url = f"{cratedb.get_connection_url()}/testdrive"
+
+    # Run transfer command.
+    runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url})
+    with pytest.raises(ValueError) as ex:
+        runner.invoke(
+            cli,
+            args=f"load table {ilp_url}",
+            catch_exceptions=False,
+        )
+    assert ex.match("Table name is missing. Please adjust CrateDB database URL.")
+
+
 def test_influxdb2_load_table(caplog, cratedb, influxdb, needs_sqlalchemy2):
     """
     CLI test: Invoke `ctk load table` for InfluxDB.
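
A note on the failure test above: pytest's `ExceptionInfo.match()` applies `re.search` to the exception text, so the message passed to `ex.match(...)` is treated as a regular expression and each `.` matches any character. The sketch below shows the difference using only the standard library; `matches` and `lookalike` are hypothetical names introduced for illustration.

```python
import re

MESSAGE = "Table name is missing. Please adjust CrateDB database URL."


def matches(pattern: str, text: str) -> bool:
    """Mimic the regex search that `ExceptionInfo.match()` performs."""
    return re.search(pattern, text) is not None


# A message that differs only where the pattern has "." characters.
lookalike = "Table name is missing! Please adjust CrateDB database URL?"

if __name__ == "__main__":
    # The unescaped pattern accepts the lookalike text as well.
    print(matches(MESSAGE, lookalike))
    # Escaping the pattern demands the literal message.
    print(matches(re.escape(MESSAGE), lookalike))
```

The assertion in the test is correct as written; wrapping the message in `re.escape()` would only make the comparison stricter.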