Skip to content

Commit

Permalink
Dask interface: Accept and forward the new if-exists query param
Browse files Browse the repository at this point in the history
The new `if-exists` query parameter will be forwarded to Dask's
`to_sql()` method. By default, `influxio` will use `fail`.

When targeting the SQLAlchemy database interface, the target table will
be created automatically, if it does not exist. The `if-exists` URL
query parameter can be used to configure this behavior. The default
value is `fail`.

* fail: Raise a ValueError.
* replace: Drop the table before inserting new values.
* append: Insert new values to the existing table.
  • Loading branch information
amotl committed Jun 23, 2024
1 parent fe3281e commit 5967722
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Changelog

in progress
===========
- Dask interface: Accept and forward the new ``if-exists`` URL query
parameter to Dask's ``to_sql()`` method.

2024-06-13 v0.3.1
=================
Expand Down
23 changes: 23 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,29 @@ keystrokes on subsequent invocations.
influxio copy "${SOURCE}" "${TARGET}"
Parameters
==========

``if-exists``
-------------
When targeting the SQLAlchemy database interface, the target table will be
created automatically, if it does not exist. The ``if-exists`` URL query
parameter can be used to configure this behavior. The default value is
``fail``.

* fail: Raise a ValueError.
* replace: Drop the table before inserting new values.
* append: Insert new values to the existing table.

Example usage:

.. code-block:: shell
influxio copy \
"http://example:token@localhost:8086/testdrive/demo" \
"crate://crate@localhost:4200/testdrive?table=demo&if-exists=replace"
*******************
Project information
*******************
Expand Down
2 changes: 1 addition & 1 deletion examples/export_sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def main():
logger.info("Transferring data")
for df in influx.read_df():
logger.info("Loading data frame into RDBMS/SQL database using pandas/Dask")
dataframe_to_sql(df, dburi=DBURI, tablename="demo", progress=True)
dataframe_to_sql(df, dburi=DBURI, tablename="demo", if_exists="replace", progress=True)

# Read back data from target database.
logger.info("Reading back data from the target database")
Expand Down
17 changes: 12 additions & 5 deletions influxio/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ def __init__(self, url: t.Union[URL, str], progress: bool = False, debug: bool =
if isinstance(url, str):
url: URL = URL(url)

self.database, self.table = SqlAlchemyAdapter.decode_database_table(url)
self.database, self.table = self.decode_database_table(url)
self.if_exists = url.query.get("if-exists")

# Special handling for SQLite and CrateDB databases.
self.dburi = str(url.with_query(None))
Expand Down Expand Up @@ -301,9 +302,13 @@ def write(self, source: t.Union[pd.DataFrame, InfluxDbApiAdapter]):
logger.info("Loading dataframes into RDBMS/SQL database using pandas/Dask")
if isinstance(source, InfluxDbApiAdapter):
for df in source.read_df():
dataframe_to_sql(df, dburi=self.dburi, tablename=self.table, progress=self.progress)
dataframe_to_sql(
df, dburi=self.dburi, tablename=self.table, if_exists=self.if_exists, progress=self.progress
)
elif isinstance(source, pd.DataFrame):
dataframe_to_sql(source, dburi=self.dburi, tablename=self.table, progress=self.progress)
dataframe_to_sql(

Check warning on line 309 in influxio/adapter.py

View check run for this annotation

Codecov / codecov/patch

influxio/adapter.py#L309

Added line #L309 was not covered by tests
source, dburi=self.dburi, tablename=self.table, if_exists=self.if_exists, progress=self.progress
)
else:
raise NotImplementedError(f"Failed handling source: {source}")

Expand All @@ -329,13 +334,15 @@ def create_database(self):
def run_sql(self, sql: str):
engine = sa.create_engine(self.dburi)
with engine.connect() as connection:
connection.connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
if hasattr(connection.connection, "set_isolation_level"):
connection.connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

Check warning on line 338 in influxio/adapter.py

View check run for this annotation

Codecov / codecov/patch

influxio/adapter.py#L338

Added line #L338 was not covered by tests
return connection.execute(sa.text(sql))

def run_sql_raw(self, sql: str):
engine = sa.create_engine(self.dburi)
connection = engine.raw_connection()
connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
if hasattr(connection, "set_isolation_level"):
connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

Check warning on line 345 in influxio/adapter.py

View check run for this annotation

Codecov / codecov/patch

influxio/adapter.py#L344-L345

Added lines #L344 - L345 were not covered by tests
cursor = connection.cursor()
cursor.execute(sql)
result = cursor.fetchall()
Expand Down
10 changes: 9 additions & 1 deletion influxio/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,26 @@ def dataframe_to_sql(
tablename: str,
index=False,
chunksize=None,
if_exists="replace",
if_exists="fail",
npartitions: int = None,
progress: bool = False,
):
"""
Load pandas dataframe into database using Dask.
https://stackoverflow.com/questions/62404502/using-dasks-new-to-sql-for-improved-efficiency-memory-speed-or-alternative-to
if_exists : {'fail', 'replace', 'append'}, default 'fail'
How to behave if the table already exists.
* fail: Raise a ValueError.
* replace: Drop the table before inserting new values.
* append: Insert new values to the existing table.
"""
import dask.dataframe as dd

# Set a few defaults.
if_exists = if_exists or "fail"
chunksize = chunksize or 5_000
npartitions = npartitions or int(os.cpu_count() / 2)

Expand Down
9 changes: 9 additions & 0 deletions tests/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ def test_cratedb_adapter_database_table():
assert adapter.database == "testdrive"
assert adapter.table == "basic"
assert adapter.dburi == "crate://localhost:4200/?schema=testdrive"
assert adapter.if_exists is None


def test_cratedb_adapter_if_exists():
adapter = SqlAlchemyAdapter.from_url("crate://localhost:4200/?database=testdrive&table=basic&if-exists=append")
assert adapter.database == "testdrive"
assert adapter.table == "basic"
assert adapter.dburi == "crate://localhost:4200/?schema=testdrive"
assert adapter.if_exists == "append"


def test_file_adapter_ilp_file():
Expand Down
80 changes: 77 additions & 3 deletions tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ def influxdb() -> InfluxDbApiAdapter:

@pytest.fixture
def cratedb() -> SqlAlchemyAdapter:
return SqlAlchemyAdapter.from_url(CRATEDB_URL)
adapter = SqlAlchemyAdapter.from_url(CRATEDB_URL)
adapter.run_sql("DROP TABLE IF EXISTS basic")
return adapter


@pytest.fixture
Expand Down Expand Up @@ -63,9 +65,9 @@ def provision_influxdb(influxdb, line_protocol_file_basic):
influxio.core.copy(source_url, target_url)


def test_export_cratedb(caplog, influxdb, provision_influxdb, cratedb):
def test_export_cratedb_default(caplog, influxdb, provision_influxdb, cratedb):
"""
Export data from InfluxDB to CrateDB.
Export data from InfluxDB to CrateDB, happy path.
"""

source_url = INFLUXDB_API_URL
Expand All @@ -84,6 +86,78 @@ def test_export_cratedb(caplog, influxdb, provision_influxdb, cratedb):
assert len(records) == 2


def test_export_cratedb_fail_if_target_exists(caplog, influxdb, provision_influxdb, cratedb):
"""
Exporting data from InfluxDB to CrateDB should fail if target table exists.
"""

source_url = INFLUXDB_API_URL
target_url = CRATEDB_URL

# Create a table that will cause the export process to fail.
cratedb.run_sql("CREATE TABLE basic (foo INT)")

# Transfer data.
with pytest.raises(ValueError) as ex:
influxio.core.copy(source_url, target_url)
ex.match("Table 'basic' already exists.")


def test_export_cratedb_if_exists_unknown(caplog, influxdb, provision_influxdb, cratedb):
"""
Exporting data from InfluxDB to CrateDB should fail if target table exists.
"""

source_url = INFLUXDB_API_URL
target_url = CRATEDB_URL + "?if-exists=Hotzenplotz"

# Create a table that will cause the export process to fail.
cratedb.run_sql("CREATE TABLE basic (foo INT)")

# Transfer data.
with pytest.raises(ValueError) as ex:
influxio.core.copy(source_url, target_url)
ex.match("'Hotzenplotz' is not valid for if_exists")


def test_export_cratedb_if_exists_replace(caplog, influxdb, provision_influxdb, cratedb):
"""
Exporting data from InfluxDB to CrateDB will succeed with ``if-exists=replace``.
"""

source_url = INFLUXDB_API_URL
target_url = CRATEDB_URL + "?if-exists=replace"

# Create a table that would cause the export process to fail.
cratedb.run_sql("CREATE TABLE basic (foo INT)")

# Transfer data.
influxio.core.copy(source_url, target_url)

# Verify number of records in target database.
cratedb.refresh_table()
records = cratedb.read_records()
assert len(records) == 2


def test_export_cratedb_if_exists_append(caplog, influxdb, provision_influxdb, cratedb):
"""
Exporting data from InfluxDB to CrateDB twice will succeed with ``if-exists=append``.
"""

source_url = INFLUXDB_API_URL
target_url = CRATEDB_URL + "?if-exists=append"

# Transfer data.
influxio.core.copy(source_url, target_url)
influxio.core.copy(source_url, target_url)

# Verify number of records in target database.
cratedb.refresh_table()
records = cratedb.read_records()
assert len(records) == 4


def test_export_postgresql(caplog, influxdb, provision_influxdb, postgresql):
"""
Export data from InfluxDB to PostgreSQL.
Expand Down

0 comments on commit 5967722

Please sign in to comment.