Skip to content

Commit

Permalink
store filesystem tables in correct dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Sep 20, 2024
1 parent 1d935d4 commit d6f25ef
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
5 changes: 4 additions & 1 deletion dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ def sql_client(self) -> SqlClientBase[Any]:
from dlt.destinations.impl.filesystem.sql_client import FilesystemSqlClient

if not self._sql_client:
self._sql_client = FilesystemSqlClient(self, protocol=self.config.protocol)
self._sql_client = FilesystemSqlClient(
self, protocol=self.config.protocol, dataset_name=self.dataset_name
)
return self._sql_client

@sql_client.setter
Expand Down Expand Up @@ -705,6 +707,7 @@ def create_table_chain_completed_followup_jobs(
def table_relation(
self, *, table: str, columns: TTableSchemaColumns
) -> Generator[DBApiCursor, Any, Any]:
table = self.sql_client.make_qualified_table_name(table)
with self.sql_client.execute_query(f"SELECT * FROM {table}") as cursor:
cursor.columns = columns
yield cursor
Expand Down
12 changes: 8 additions & 4 deletions dlt/destinations/impl/filesystem/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,25 @@


class FilesystemSqlClient(DuckDbSqlClient):
def __init__(self, fs_client: FSClientBase, protocol: str) -> None:
def __init__(self, fs_client: FSClientBase, protocol: str, dataset_name: str) -> None:
"""For now we do all operations in an in-memory duckdb instance, using a schema named after the dataset"""
"""TODO: is this ok?"""
super().__init__(
dataset_name="memory",
dataset_name=dataset_name,
staging_dataset_name=None,
credentials=None,
capabilities=duckdb_factory()._raw_capabilities(),
)
self.fs_client = fs_client
self._conn = duckdb.connect(":memory:")
self._conn.register_filesystem(self.fs_client.fs_client)
self.existing_views: List[str] = [] # remember which views were already created
self.protocol = protocol
self.is_local_filesystem = protocol == "file"

# set up duckdb instance
self._conn = duckdb.connect(":memory:")
self._conn.sql(f"CREATE SCHEMA {self.dataset_name}")
self._conn.register_filesystem(self.fs_client.fs_client)

@raise_database_error
def populate_duckdb(self, tables: List[str]) -> None:
"""Add the required tables as views to the in-memory duckdb instance"""
Expand All @@ -60,6 +63,7 @@ def populate_duckdb(self, tables: List[str]) -> None:
# create table
protocol = "" if self.is_local_filesystem else f"{self.protocol}://"
files_string = f"'{protocol}{folder}/**/*.{file_type}'"
ptable = self.make_qualified_table_name(ptable)
create_table_sql_base = (
f"CREATE VIEW {ptable} AS SELECT * FROM {read_command}([{files_string}])"
)
Expand Down

0 comments on commit d6f25ef

Please sign in to comment.