Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dask interface: Accept and forward the new if-exists query parameter #129

Merged
merged 1 commit into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Changelog

in progress
===========
- Dask interface: Accept and forward the new ``if-exists`` URL query
parameter to Dask's ``to_sql()`` method.

2024-06-13 v0.3.1
=================
Expand Down
23 changes: 23 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,29 @@ keystrokes on subsequent invocations.
influxio copy "${SOURCE}" "${TARGET}"


Parameters
==========

``if-exists``
-------------
When targeting the SQLAlchemy database interface, the target table will be
created automatically if it does not exist. The ``if-exists`` URL query
parameter can be used to configure this behavior. The default value is
``fail``.

* fail: Raise a ValueError.
* replace: Drop the table before inserting new values.
* append: Insert new values to the existing table.

Example usage:

.. code-block:: shell

influxio copy \
"http://example:token@localhost:8086/testdrive/demo" \
"crate://crate@localhost:4200/testdrive?table=demo&if-exists=replace"


*******************
Project information
*******************
Expand Down
2 changes: 1 addition & 1 deletion examples/export_sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def main():
logger.info("Transferring data")
for df in influx.read_df():
logger.info("Loading data frame into RDBMS/SQL database using pandas/Dask")
dataframe_to_sql(df, dburi=DBURI, tablename="demo", progress=True)
dataframe_to_sql(df, dburi=DBURI, tablename="demo", if_exists="replace", progress=True)

# Read back data from target database.
logger.info("Reading back data from the target database")
Expand Down
17 changes: 12 additions & 5 deletions influxio/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@
if isinstance(url, str):
url: URL = URL(url)

self.database, self.table = SqlAlchemyAdapter.decode_database_table(url)
self.database, self.table = self.decode_database_table(url)
self.if_exists = url.query.get("if-exists")

# Special handling for SQLite and CrateDB databases.
self.dburi = str(url.with_query(None))
Expand Down Expand Up @@ -301,9 +302,13 @@
logger.info("Loading dataframes into RDBMS/SQL database using pandas/Dask")
if isinstance(source, InfluxDbApiAdapter):
for df in source.read_df():
dataframe_to_sql(df, dburi=self.dburi, tablename=self.table, progress=self.progress)
dataframe_to_sql(
df, dburi=self.dburi, tablename=self.table, if_exists=self.if_exists, progress=self.progress
)
elif isinstance(source, pd.DataFrame):
dataframe_to_sql(source, dburi=self.dburi, tablename=self.table, progress=self.progress)
dataframe_to_sql(

Check warning on line 309 in influxio/adapter.py

View check run for this annotation

Codecov / codecov/patch

influxio/adapter.py#L309

Added line #L309 was not covered by tests
source, dburi=self.dburi, tablename=self.table, if_exists=self.if_exists, progress=self.progress
)
else:
raise NotImplementedError(f"Failed handling source: {source}")

Expand All @@ -329,13 +334,15 @@
def run_sql(self, sql: str):
    """
    Execute an SQL statement through SQLAlchemy and return its result.

    On PostgreSQL-style drivers, switch the underlying DBAPI connection to
    autocommit isolation first; other drivers (e.g. CrateDB, SQLite) do not
    expose `set_isolation_level`, so the guard keeps them working.
    """
    engine = sa.create_engine(self.dburi)
    with engine.connect() as connection:
        dbapi_connection = connection.connection
        if hasattr(dbapi_connection, "set_isolation_level"):
            dbapi_connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        return connection.execute(sa.text(sql))

def run_sql_raw(self, sql: str):
engine = sa.create_engine(self.dburi)
connection = engine.raw_connection()
connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
if hasattr(connection, "set_isolation_level"):
connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

Check warning on line 345 in influxio/adapter.py

View check run for this annotation

Codecov / codecov/patch

influxio/adapter.py#L344-L345

Added lines #L344 - L345 were not covered by tests
cursor = connection.cursor()
cursor.execute(sql)
result = cursor.fetchall()
Expand Down
10 changes: 9 additions & 1 deletion influxio/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,26 @@ def dataframe_to_sql(
tablename: str,
index=False,
chunksize=None,
if_exists="replace",
if_exists="fail",
npartitions: int = None,
progress: bool = False,
):
"""
Load pandas dataframe into database using Dask.

https://stackoverflow.com/questions/62404502/using-dasks-new-to-sql-for-improved-efficiency-memory-speed-or-alternative-to

if_exists : {'fail', 'replace', 'append'}, default 'fail'
How to behave if the table already exists.

* fail: Raise a ValueError.
* replace: Drop the table before inserting new values.
* append: Insert new values to the existing table.
"""
import dask.dataframe as dd

# Set a few defaults.
if_exists = if_exists or "fail"
chunksize = chunksize or 5_000
npartitions = npartitions or int(os.cpu_count() / 2)

Expand Down
9 changes: 9 additions & 0 deletions tests/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ def test_cratedb_adapter_database_table():
assert adapter.database == "testdrive"
assert adapter.table == "basic"
assert adapter.dburi == "crate://localhost:4200/?schema=testdrive"
assert adapter.if_exists is None


def test_cratedb_adapter_if_exists():
    """Verify the ``if-exists`` URL query parameter is decoded into the adapter."""
    url = "crate://localhost:4200/?database=testdrive&table=basic&if-exists=append"
    adapter = SqlAlchemyAdapter.from_url(url)
    assert (adapter.database, adapter.table) == ("testdrive", "basic")
    assert adapter.dburi == "crate://localhost:4200/?schema=testdrive"
    assert adapter.if_exists == "append"


def test_file_adapter_ilp_file():
Expand Down
80 changes: 77 additions & 3 deletions tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ def influxdb() -> InfluxDbApiAdapter:

@pytest.fixture
def cratedb() -> SqlAlchemyAdapter:
    """Provide a CrateDB adapter with a clean slate for each test."""
    sql_adapter = SqlAlchemyAdapter.from_url(CRATEDB_URL)
    # Ensure a previous test run does not leak a pre-existing table into this one.
    sql_adapter.run_sql("DROP TABLE IF EXISTS basic")
    return sql_adapter


@pytest.fixture
Expand Down Expand Up @@ -63,9 +65,9 @@ def provision_influxdb(influxdb, line_protocol_file_basic):
influxio.core.copy(source_url, target_url)


def test_export_cratedb(caplog, influxdb, provision_influxdb, cratedb):
def test_export_cratedb_default(caplog, influxdb, provision_influxdb, cratedb):
"""
Export data from InfluxDB to CrateDB.
Export data from InfluxDB to CrateDB, happy path.
"""

source_url = INFLUXDB_API_URL
Expand All @@ -84,6 +86,78 @@ def test_export_cratedb(caplog, influxdb, provision_influxdb, cratedb):
assert len(records) == 2


def test_export_cratedb_fail_if_target_exists(caplog, influxdb, provision_influxdb, cratedb):
    """
    Exporting data from InfluxDB to CrateDB should fail if target table exists.
    """
    # Pre-create the target table so the default `if-exists=fail` policy kicks in.
    cratedb.run_sql("CREATE TABLE basic (foo INT)")

    # The transfer is expected to bail out with a ValueError.
    with pytest.raises(ValueError) as excinfo:
        influxio.core.copy(INFLUXDB_API_URL, CRATEDB_URL)
    excinfo.match("Table 'basic' already exists.")


def test_export_cratedb_if_exists_unknown(caplog, influxdb, provision_influxdb, cratedb):
    """
    Exporting data from InfluxDB to CrateDB should fail on an unknown ``if-exists`` value.
    """

    source_url = INFLUXDB_API_URL
    target_url = CRATEDB_URL + "?if-exists=Hotzenplotz"

    # Create a table; the export must still fail, because of the invalid
    # `if-exists` value rather than the table's existence.
    cratedb.run_sql("CREATE TABLE basic (foo INT)")

    # Transfer data.
    with pytest.raises(ValueError) as ex:
        influxio.core.copy(source_url, target_url)
    ex.match("'Hotzenplotz' is not valid for if_exists")


def test_export_cratedb_if_exists_replace(caplog, influxdb, provision_influxdb, cratedb):
    """
    Exporting data from InfluxDB to CrateDB will succeed with ``if-exists=replace``.
    """
    # A pre-existing, incompatible table would normally make the transfer fail.
    cratedb.run_sql("CREATE TABLE basic (foo INT)")

    # With `replace`, the table is dropped and re-created, so the copy succeeds.
    influxio.core.copy(INFLUXDB_API_URL, CRATEDB_URL + "?if-exists=replace")

    # Verify number of records in target database.
    cratedb.refresh_table()
    assert len(cratedb.read_records()) == 2


def test_export_cratedb_if_exists_append(caplog, influxdb, provision_influxdb, cratedb):
    """
    Exporting data from InfluxDB to CrateDB twice will succeed with ``if-exists=append``.
    """
    target_url = CRATEDB_URL + "?if-exists=append"

    # Run the transfer twice; `append` keeps the rows from the first run.
    for _ in range(2):
        influxio.core.copy(INFLUXDB_API_URL, target_url)

    # Each run contributed two records.
    cratedb.refresh_table()
    assert len(cratedb.read_records()) == 4


def test_export_postgresql(caplog, influxdb, provision_influxdb, postgresql):
"""
Export data from InfluxDB to PostgreSQL.
Expand Down