Skip to content

Commit

Permalink
fix(DS-463): fix motherduck connector (#377)
Browse files Browse the repository at this point in the history
* Fix missing output filename suffix in DuckDB base stager

* Fix motherduck uploader

* Bump version and changelog; fix MotherDuck uploader

* DuckDB base stager unit test

* Fix imports
  • Loading branch information
mpolomdeepsense authored Feb 10, 2025
1 parent 667db4b commit 485c4dc
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 6 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
## 0.5.0-dev0
## 0.5.0-dev1

### Fixes

* **Fix Attribute Not Exist bug in GoogleDrive connector**
* **Fix query syntax error in MotherDuck uploader**
* **Fix missing output filename suffix in DuckDB base stager**

### Enhancements

Expand Down
Empty file.
74 changes: 74 additions & 0 deletions test/unit/v2/connectors/motherduck/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from pathlib import Path

import pytest
from pytest_mock import MockerFixture

from unstructured_ingest.v2.interfaces import FileData
from unstructured_ingest.v2.interfaces.file_data import SourceIdentifiers
from unstructured_ingest.v2.interfaces.upload_stager import UploadStagerConfig
from unstructured_ingest.v2.processes.connectors.duckdb.base import BaseDuckDBUploadStager


@pytest.fixture
def mock_instance() -> BaseDuckDBUploadStager:
    """Provide a ``BaseDuckDBUploadStager`` built from a default ``UploadStagerConfig``."""
    default_config = UploadStagerConfig()
    return BaseDuckDBUploadStager(default_config)


@pytest.mark.parametrize(
    ("input_filepath", "output_filename", "expected"),
    [
        # Suffix already matches the input file: name is unchanged.
        ("/path/to/input_file.ndjson", "output_file.ndjson", "output_file.ndjson"),
        # Suffix differs from the input file: the input file's suffix wins.
        ("input_file.txt", "output_file.json", "output_file.txt"),
        # No suffix on the requested name: the input file's suffix is appended.
        ("/path/to/input_file.json", "output_file", "output_file.json"),
    ],
)
def test_run_output_filename_suffix(
    mocker: MockerFixture,
    mock_instance: BaseDuckDBUploadStager,
    input_filepath: str,
    output_filename: str,
    expected: str,
):
    """Check that ``run`` resolves the output filename to ``expected``.

    Data loading, element conforming, output-path resolution and writing are
    all patched out, so only the filename handed to ``get_output_path`` (and
    the path subsequently written and returned) is under test.
    """
    output_dir = Path("/tmp/test/output_dir")
    elements_path = Path(input_filepath)
    staged_path = output_dir / expected

    def passthrough(element_dict, file_data):
        # Stand-in for conform_dict: hand the element back untouched.
        return element_dict

    # Arrange: replace every collaborator that ``run`` is checked against.
    get_data_mock = mocker.patch(
        "unstructured_ingest.v2.processes.connectors.duckdb.base.get_data",
        return_value=[{"key": "value"}, {"key": "value2"}],
    )
    conform_dict_mock = mocker.patch.object(
        BaseDuckDBUploadStager,
        "conform_dict",
        side_effect=passthrough,
    )
    get_output_path_mock = mocker.patch.object(
        BaseDuckDBUploadStager,
        "get_output_path",
        return_value=staged_path,
    )
    write_data_mock = mocker.patch(
        "unstructured_ingest.v2.processes.connectors.duckdb.base.write_data",
        return_value=None,
    )

    source_identifiers = SourceIdentifiers(filename=input_filepath, fullpath=input_filepath)
    file_data = FileData(
        identifier="test",
        connector_type="test",
        source_identifiers=source_identifiers,
    )

    # Act
    result = mock_instance.run(
        elements_filepath=elements_path,
        file_data=file_data,
        output_dir=output_dir,
        output_filename=output_filename,
    )

    # Assert
    get_data_mock.assert_called_once_with(path=Path(input_filepath))
    # One conform_dict call per element returned by the patched get_data.
    assert conform_dict_mock.call_count == 2
    get_output_path_mock.assert_called_once_with(output_filename=expected, output_dir=output_dir)
    write_data_mock.assert_called_once_with(
        path=output_dir / expected,
        data=[{"key": "value"}, {"key": "value2"}],
    )
    assert result.name == expected
2 changes: 1 addition & 1 deletion unstructured_ingest/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.5.0-dev0" # pragma: no cover
__version__ = "0.5.0-dev1" # pragma: no cover
2 changes: 2 additions & 0 deletions unstructured_ingest/v2/processes/connectors/duckdb/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def run(
**kwargs: Any,
) -> Path:
elements_contents = get_data(path=elements_filepath)
output_filename_suffix = Path(elements_filepath).suffix
output_filename = f"{Path(output_filename).stem}{output_filename_suffix}"
output_path = self.get_output_path(output_filename=output_filename, output_dir=output_dir)

output = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def get_client(self) -> Generator["MotherDuckConnection", None, None]:
"custom_user_agent": f"unstructured-io-ingest/{unstructured_io_ingest_version}"
},
) as conn:
conn.sql(f"USE {self.database}")
conn.sql(f'USE "{self.database}"')
yield conn

@contextmanager
Expand Down Expand Up @@ -102,11 +102,12 @@ def precheck(self) -> None:

def upload_dataframe(self, df: pd.DataFrame) -> None:
logger.debug(f"uploading {len(df)} entries to {self.connection_config.database} ")
database = self.connection_config.database
db_schema = self.connection_config.db_schema
table = self.connection_config.table

with self.connection_config.get_client() as conn:
conn.query(
f"INSERT INTO {self.connection_config.db_schema}.{self.connection_config.table} BY NAME SELECT * FROM df" # noqa: E501
)
conn.query(f'INSERT INTO "{database}"."{db_schema}"."{table}" BY NAME SELECT * FROM df')

def run_data(self, data: list[dict], file_data: FileData, **kwargs: Any) -> None:
df = pd.DataFrame(data=data)
Expand Down

0 comments on commit 485c4dc

Please sign in to comment.