Feature: Export from InfluxDB and import into RDBMS
Using SQLAlchemy/pandas/Dask.
amotl committed Nov 12, 2023
1 parent 3c911c0 commit 1042115
Showing 21 changed files with 791 additions and 111 deletions.
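
At its core, the new feature streams query results from InfluxDB as pandas
DataFrames and loads each chunk into the target database through SQLAlchemy.
The essential transfer loop, distilled from the new
``examples/export_sqlalchemy.py`` shown further below:

.. code-block:: python

    # Distilled from examples/export_sqlalchemy.py; `influx` is an
    # InfluxDbAdapter, and `dataframe_to_sql` wraps pandas/Dask `to_sql`.
    for df in influx.read_df():
        dataframe_to_sql(df, dburi="crate://localhost:4200", tablename="demo")
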
33 changes: 25 additions & 8 deletions .github/workflows/tests.yml
@@ -1,10 +1,9 @@
name: Tests

on:
pull_request: ~
push:
branches: [ main ]
pull_request:
branches: [ main ]

# Allow job to be triggered manually.
workflow_dispatch:
@@ -22,10 +21,11 @@ jobs:
strategy:
matrix:
os: ["ubuntu-latest"]
python-version: ["3.10", "3.11"]
influxdb-version: [ "2.6" ]
python-version: ["3.8", "3.11"]
influxdb-version: [ "2.6", "2.7" ]
fail-fast: false

# https://docs.github.com/en/actions/using-containerized-services/about-service-containers
services:

influxdb:
@@ -41,32 +41,49 @@ jobs:
DOCKER_INFLUXDB_INIT_BUCKET: default
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: token

cratedb:
image: crate/crate:nightly
ports:
- 4200:4200
env:
CRATE_HEAP_SIZE: 1g

postgresql:
image: postgres:16
ports:
- 5432:5432
env:
POSTGRES_HOST_AUTH_METHOD: trust

env:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}

name: Python ${{ matrix.python-version }} on OS ${{ matrix.os }}
name: "
Python ${{ matrix.python-version }},
InfluxDB ${{ matrix.influxdb-version }},
OS ${{ matrix.os }}"
steps:

- name: Acquire sources
uses: actions/checkout@v3

# Install InfluxDB CLI tools.
- name: Setup InfluxDB CLI
- name: Set up InfluxDB CLI
uses: influxdata/influxdb-action@v3
with:
influxdb_version: 2.6.1
influxdb_start: false

- name: Setup Python
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
architecture: x64
cache: 'pip'
cache-dependency-path: 'pyproject.toml'

- name: Setup project
- name: Set up project
run: |
# `setuptools 0.64.0` adds support for editable install hooks (PEP 660).
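
The new ``cratedb`` and ``postgresql`` service containers expose ports 4200
and 5432 on the runner. A minimal sketch of how a test could reach both with
SQLAlchemy, assuming the ``crate`` and ``psycopg2`` driver packages are
installed:

.. code-block:: python

    import sqlalchemy as sa

    # PostgreSQL runs with POSTGRES_HOST_AUTH_METHOD=trust, so no password
    # is needed; `crate` is CrateDB's default superuser.
    for dburi in ["crate://crate@localhost:4200", "postgresql://postgres@localhost:5432"]:
        engine = sa.create_engine(dburi)
        with engine.connect() as connection:
            print(connection.execute(sa.text("SELECT 1")).scalar())
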
2 changes: 1 addition & 1 deletion .gitignore
@@ -7,6 +7,6 @@
.DS_Store
__pycache__
*.egg-info
.coverage
.coverage*
coverage.xml
/example_*.py
2 changes: 2 additions & 0 deletions CHANGES.rst
@@ -10,6 +10,8 @@ in progress
- Tests: Add test for ``influxio info``
- Feature: Add reading line protocol format from file
- Feature: Add reading line protocol format from URL
- Feature: Export from InfluxDB and import into RDBMS,
using SQLAlchemy/pandas/Dask


2023-xx-xx 0.1.0
20 changes: 10 additions & 10 deletions README.rst
@@ -37,7 +37,7 @@ Synopsis
# Export from API to database.
influxio copy \
"http://example:token@localhost:8086/testdrive/demo" \
"sqlite://export.sqlite"
"sqlite://export.sqlite?table=demo"
**********
@@ -52,7 +52,7 @@ just use the OCI image on Podman or Docker.
docker run --rm --network=host ghcr.io/daq-tools/influxio \
influxio copy \
"http://example:token@localhost:8086/testdrive/demo" \
"crate://crate@localhost:4200/testdrive"
"crate://crate@localhost:4200/testdrive/demo"
*****
@@ -91,15 +91,15 @@ instances is to use Podman or Docker.
--env=DOCKER_INFLUXDB_INIT_BUCKET=default \
--env=DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=token \
--volume="$PWD/var/lib/influxdb2:/var/lib/influxdb2" \
influxdb:2.6
influxdb:2.7
- https://github.com/docker-library/docs/blob/master/influxdb/README.md

.. code-block:: sh
docker run --rm -it --publish=4200:4200 --publish=5432:5432 \
--volume="$PWD/var/lib/cratedb:/data" \
crate:5.2 -Cdiscovery.type=single-node
crate:5.5 -Cdiscovery.type=single-node
- https://github.com/docker-library/docs/blob/master/crate/README.md

@@ -155,12 +155,12 @@ Export
# From API to database file.
influxio copy \
"http://example:token@localhost:8086/testdrive/demo" \
"sqlite://export.sqlite"
"sqlite://export.sqlite?table=demo"
# From API to database server.
influxio copy \
"http://example:token@localhost:8086/testdrive/demo" \
"crate://crate@localhost:4200/testdrive"
"crate://crate@localhost:4200/testdrive?table=demo"
# From API to line protocol file.
influxio copy \
@@ -175,7 +175,7 @@ Export
# From line protocol file to database.
influxio copy \
"file://export.lp" \
"sqlite://export.sqlite"
"sqlite://export.sqlite?table=export"
OCI
---
@@ -197,11 +197,11 @@ or use the ``--interactive`` option to consume STDIN, like:
.. code-block:: sh
docker run --rm --volume=$(pwd):/data ghcr.io/daq-tools/influxio \
influxio copy "file:///data/export.lp" "sqlite:///data/export.sqlite"
influxio copy "file:///data/export.lp" "sqlite:///data/export.sqlite?table=export"
cat export.lp | \
docker run --rm --interactive --network=host ghcr.io/daq-tools/influxio \
influxio copy "stdin://?format=lp" "crate://crate@localhost:4200/testdrive"
influxio copy "stdin://?format=lp" "crate://crate@localhost:4200/testdrive/export"
In order to always run the latest ``nightly`` development version, and to use a
shortcut for that, this section outlines how to use an alias for ``influxio``,
@@ -213,7 +213,7 @@ keystrokes on subsequent invocations.
docker pull ghcr.io/daq-tools/influxio:nightly
alias influxio="docker run --rm --interactive ghcr.io/daq-tools/influxio:nightly influxio"
SOURCE=https://github.com/daq-tools/influxio/raw/main/tests/testdata/basic.lp
TARGET=crate://crate@localhost:4200/testdrive
TARGET=crate://crate@localhost:4200/testdrive/basic
influxio copy "${SOURCE}" "${TARGET}"
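
Throughout these examples, the target address carries the table name either as
a trailing path segment or as a ``table`` query parameter. A deliberately
simplified sketch of how the query-parameter form could be decomposed, not
influxio's actual resolver:

.. code-block:: python

    from urllib.parse import parse_qs, urlsplit

    def split_target(url: str):
        # Illustrative only: separate the database URI from the `table` parameter.
        parts = urlsplit(url)
        table = parse_qs(parts.query).get("table", [None])[0]
        return parts._replace(query="").geturl(), table

    print(split_target("crate://crate@localhost:4200/testdrive?table=demo"))
    # ('crate://crate@localhost:4200/testdrive', 'demo')
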
33 changes: 24 additions & 9 deletions doc/backlog.rst
@@ -3,20 +3,32 @@ influxio backlog
################


***********
Iteration 1
***********

************
Iteration +1
************
- [x] Add project boilerplate
- [o] Add initial working version
- [o] Refinements
- [x] Make it work
- [x] Export to SQLite, PostgreSQL, and CrateDB
- [x] Fix documentation about crate:// target
- [o] Release 0.1.0


***********
Iteration 2
***********
************
Iteration +2
************
- [o] Tests using ``assert_dataframe_equal``?
- [o] Fix ``crate.client.sqlalchemy.dialect.DateTime`` re. ``TimezoneUnawareException``
- [o] Add Docker Compose file for auxiliary services
- [o] Check if using a CrateDB schema works well
- [o] Refinements
- [o] Verify documentation
- [o] Refactor general purpose code to ``pueblo`` package


************
Iteration +3
************
- [o] Unlock more parameters in InfluxDbAdapter.write_df
- [o] Format: Compressed line protocol
- [o] Format: Annotated CSV
- https://docs.influxdata.com/influxdb/v2.6/reference/syntax/annotated-csv/
@@ -28,7 +40,10 @@ Iteration 2
- [o] cloud-to-cloud copy
- [o] influxio list testdata://
- [o] "SQLAlchemy » Dialects built-in" is broken
- [o] ``DBURI = "crate+psycopg://localhost:4200"``

References
==========
- https://docs.influxdata.com/influxdb/v2.6/migrate-data/
- https://docs.influxdata.com/influxdb/v2.6/reference/cli/influx/write/
- https://docs.influxdata.com/influxdb/v2.6/reference/cli/influx/backup/
57 changes: 57 additions & 0 deletions examples/export_lineprotocol.py
@@ -0,0 +1,57 @@
"""
Miniature data pipeline exporting data from InfluxDB to line protocol format (ILP).
- Load a synthetic pandas DataFrame into InfluxDB.
- Export data to InfluxDB line protocol format (ILP).
- Read back a few samples worth of data from the ILP file.
https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
"""
import gzip
import logging
from pathlib import Path

from influxio.io import dataframe_from_lineprotocol
from influxio.model import InfluxDbAdapter
from influxio.testdata import DataFrameFactory
from influxio.util.common import setup_logging

logger = logging.getLogger(__name__)


LINEPROTOCOL_FILE = Path("./var/export/demo.lp.gz")
DATASET_SIZE = 15_000


def main():
logger.info("Connecting to InfluxDB")
influx = InfluxDbAdapter(
url="http://localhost:8086",
org="example",
token="token", # noqa: S106
bucket="testdrive",
measurement="demo",
)

# Provision data to source database.
logger.info("Provisioning InfluxDB")
dff = DataFrameFactory(rows=DATASET_SIZE)
df = dff.make("dateindex")
influx.write_df(df)

# Export data into file using lineprotocol format.
logger.info("Exporting data to lineprotocol file (ILP)")
LINEPROTOCOL_FILE.parent.mkdir(parents=True, exist_ok=True)
influx.to_lineprotocol(engine_path="./var/lib/influxdb2/engine", output_path=LINEPROTOCOL_FILE)

logger.info("Reading back data from lineprotocol file")
with gzip.open(LINEPROTOCOL_FILE) as buffer:
df = dataframe_from_lineprotocol(buffer)
print(df) # noqa: T201

logger.info("Ready.")


if __name__ == "__main__":
setup_logging()
main()
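
For orientation, InfluxDB line protocol encodes one point per line as
``measurement,tag=value field=value timestamp``. A deliberately simplified
parser sketch, not the ``dataframe_from_lineprotocol`` implementation used
above; it ignores escaping rules and string fields containing spaces:

.. code-block:: python

    import pandas as pd

    def parse_lineprotocol(lines):
        records = []
        for line in lines:
            line = line.decode() if isinstance(line, bytes) else line
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            head, fieldstr, timestamp = line.split(" ")
            measurement, *tagpairs = head.split(",")
            record = {"measurement": measurement}
            record.update(kv.split("=", 1) for kv in tagpairs)
            for kv in fieldstr.split(","):
                key, value = kv.split("=", 1)
                # Integer fields carry an `i` suffix; strings are double-quoted.
                if value.endswith("i"):
                    record[key] = int(value[:-1])
                elif value.startswith('"'):
                    record[key] = value.strip('"')
                else:
                    record[key] = float(value)
            record["time"] = pd.to_datetime(int(timestamp))  # nanosecond epoch
            records.append(record)
        return pd.DataFrame(records)
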
58 changes: 58 additions & 0 deletions examples/export_sqlalchemy.py
@@ -0,0 +1,58 @@
"""
Miniature data pipeline shuffling data from InfluxDB to PostgreSQL/CrateDB.
- Load a synthetic pandas DataFrame into InfluxDB.
- Transfer data from InfluxDB to RDBMS database using SQLAlchemy/pandas/Dask.
- Read back a few samples worth of data from the RDBMS database.
"""
import logging

import sqlalchemy as sa

from influxio.io import dataframe_to_sql
from influxio.model import InfluxDbAdapter
from influxio.testdata import DataFrameFactory
from influxio.util.common import jd, setup_logging

logger = logging.getLogger(__name__)

DBURI = "crate://localhost:4200"
DATASET_SIZE = 15_000


def main():
logger.info("Connecting to InfluxDB")
influx = InfluxDbAdapter(
url="http://localhost:8086",
org="example",
token="token", # noqa: S106
bucket="testdrive",
measurement="demo",
)

# Provision data to source database.
logger.info("Provisioning InfluxDB")
dff = DataFrameFactory(rows=DATASET_SIZE)
df = dff.make("dateindex")
influx.write_df(df)

# Transfer data.
logger.info("Transferring data")
for df in influx.read_df():
logger.info("Loading data frame into RDBMS/SQL database using pandas/Dask")
dataframe_to_sql(df, dburi=DBURI, tablename="demo", progress=True)

# Read back data from target database.
logger.info("Reading back data from the target database")
engine = sa.create_engine(DBURI)
with engine.connect() as connection:
connection.execute(sa.text("REFRESH TABLE demo;"))
result = connection.execute(sa.text("SELECT * FROM demo LIMIT 3;"))
records = [dict(item) for item in result.mappings().fetchall()]
jd(records)
logger.info("Ready.")


if __name__ == "__main__":
setup_logging()
main()
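
The ``dataframe_to_sql`` helper used above is where pandas/Dask meet
SQLAlchemy. A minimal sketch of what such a helper could look like, built on
Dask's ``to_sql``; illustrative, not the actual implementation:

.. code-block:: python

    import dask.dataframe as dd
    import pandas as pd

    def dataframe_to_sql(df: pd.DataFrame, dburi: str, tablename: str, npartitions: int = 4):
        # Sketch only, not influxio's actual implementation.
        # Partition the frame so Dask can write chunks concurrently.
        ddf = dd.from_pandas(df, npartitions=npartitions)
        # Dask's `to_sql` uses SQLAlchemy under the hood; `parallel=True`
        # writes partitions from multiple workers.
        ddf.to_sql(tablename, uri=dburi, if_exists="append", index=False, parallel=True)
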