Remove storage from dataset query and refactor related codebase (#367)
* first version of from_storage without deprecated listing

* first version of from_storage without deprecated listing

* fixing tests and removing prints, refactoring

* refactoring listing static methods

* fixing non recursive queries

* using ctc in test session

* fixing json

* fixing windows tests

* returning to all tests

* added session on cloud test catalog and refactoring tests

* refactoring and fixing tests

* fixing apply_udf and its tests

* refactoring tests and related codebase

* first version of from_storage without deprecated listing

* first version of from_storage without deprecated listing

* fixing tests and removing prints, refactoring

* refactoring listing static methods

* fixing non recursive queries

* using ctc in test session

* fixing json

* removed not needed catalog storage methods and their related codebase

* fixing windows tests

* returning to all tests

* fixing dataset dependencies

* added session on cloud test catalog and refactoring tests

* added dataset name dependencies test and fixes

* small refactoring

* fixing bug

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fixing tests

* fixing tests

* removed duplicate function

* removed not needed method

* fix conftest

* refactoring tests

* removed catalog index from test

* added print

* fixing tests

* added verbose tests

* fixing test

* fixing test

* removing apply udf

* remove parallel

* removed -vvv from tests

* removing not used functions and related tests

* returned skipped test, returned output assert, fix print in CLI

* returned dir expansion test

* removing not needed method and added one test

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
ilongin and pre-commit-ci[bot] authored Sep 17, 2024
1 parent 2c8cefc commit 0abafcd
Showing 21 changed files with 370 additions and 688 deletions.
50 changes: 13 additions & 37 deletions src/datachain/catalog/catalog.py
@@ -68,8 +68,6 @@
     DataChainDir,
     batched,
     datachain_paths_join,
-    import_object,
-    parse_params_string,
 )

 from .datasource import DataSource
@@ -843,7 +841,7 @@ def enlist_sources_grouped(
         from datachain.query import DatasetQuery

         def _row_to_node(d: dict[str, Any]) -> Node:
-            del d["source"]
+            del d["file__source"]
             return Node.from_dict(d)

         enlisted_sources: list[tuple[bool, bool, Any]] = []
@@ -1148,30 +1146,28 @@ def create_dataset_from_sources(
         if not sources:
             raise ValueError("Sources needs to be non empty list")

-        from datachain.query import DatasetQuery
+        from datachain.lib.dc import DataChain
+        from datachain.query.session import Session
+
+        session = Session.get(catalog=self, client_config=client_config)

-        dataset_queries = []
+        chains = []
         for source in sources:
             if source.startswith(DATASET_PREFIX):
-                dq = DatasetQuery(
-                    name=source[len(DATASET_PREFIX) :],
-                    catalog=self,
-                    client_config=client_config,
+                dc = DataChain.from_dataset(
+                    source[len(DATASET_PREFIX) :], session=session
                 )
             else:
-                dq = DatasetQuery(
-                    path=source,
-                    catalog=self,
-                    client_config=client_config,
-                    recursive=recursive,
+                dc = DataChain.from_storage(
+                    source, session=session, recursive=recursive
                 )

-            dataset_queries.append(dq)
+            chains.append(dc)

         # create union of all dataset queries created from sources
-        dq = reduce(lambda ds1, ds2: ds1.union(ds2), dataset_queries)
+        dc = reduce(lambda dc1, dc2: dc1.union(dc2), chains)
         try:
-            dq.save(name)
+            dc.save(name)
         except Exception as e:  # noqa: BLE001
             try:
                 ds = self.get_dataset(name)
@@ -1731,26 +1727,6 @@ def clone(
             output, sources, client_config=client_config, recursive=recursive
         )

-    def apply_udf(
-        self,
-        udf_location: str,
-        source: str,
-        target_name: str,
-        parallel: Optional[int] = None,
-        params: Optional[str] = None,
-    ):
-        from datachain.query import DatasetQuery
-
-        if source.startswith(DATASET_PREFIX):
-            ds = DatasetQuery(name=source[len(DATASET_PREFIX) :], catalog=self)
-        else:
-            ds = DatasetQuery(path=source, catalog=self)
-        udf = import_object(udf_location)
-        if params:
-            args, kwargs = parse_params_string(params)
-            udf = udf(*args, **kwargs)
-        ds.add_signals(udf, parallel=parallel).save(target_name)
-
     def query(
         self,
         query_script: str,
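For orientation, the refactored create_dataset_from_sources goes through DataChain instead of DatasetQuery. The sketch below restates that pattern outside the Catalog class, using only the calls visible in this diff (Session.get, DataChain.from_dataset, DataChain.from_storage, union, save); union_sources and its arguments are a hypothetical helper, not part of the codebase.

from functools import reduce

from datachain.lib.dc import DataChain
from datachain.query.session import Session

DATASET_PREFIX = "ds://"


def union_sources(catalog, sources, name, recursive=True, client_config=None):
    # Build one chain per source (registered dataset vs. storage URI),
    # union them, and persist the result under a single dataset name --
    # the same flow the refactored create_dataset_from_sources follows.
    session = Session.get(catalog=catalog, client_config=client_config)
    chains = []
    for src in sources:
        if src.startswith(DATASET_PREFIX):
            chains.append(DataChain.from_dataset(src[len(DATASET_PREFIX):], session=session))
        else:
            chains.append(DataChain.from_storage(src, session=session, recursive=recursive))
    return reduce(lambda a, b: a.union(b), chains).save(name)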
25 changes: 0 additions & 25 deletions src/datachain/cli.py
@@ -494,27 +494,6 @@ def get_parser() -> ArgumentParser:  # noqa: PLR0915
         help="Query parameters",
     )

-    apply_udf_parser = subp.add_parser(
-        "apply-udf", parents=[parent_parser], description="Apply UDF"
-    )
-    apply_udf_parser.add_argument("udf", type=str, help="UDF location")
-    apply_udf_parser.add_argument("source", type=str, help="Source storage or dataset")
-    apply_udf_parser.add_argument("target", type=str, help="Target dataset name")
-    apply_udf_parser.add_argument(
-        "--parallel",
-        nargs="?",
-        type=int,
-        const=-1,
-        default=None,
-        metavar="N",
-        help=(
-            "Use multiprocessing to run the UDF with N worker processes. "
-            "N defaults to the CPU count."
-        ),
-    )
-    apply_udf_parser.add_argument(
-        "--udf-params", type=str, default=None, help="UDF class parameters"
-    )
     subp.add_parser(
         "clear-cache", parents=[parent_parser], description="Clear the local file cache"
     )
@@ -1016,10 +995,6 @@ def main(argv: Optional[list[str]] = None) -> int:  # noqa: C901, PLR0912, PLR09
            parallel=args.parallel,
            params=args.param,
        )
-    elif args.command == "apply-udf":
-        catalog.apply_udf(
-            args.udf, args.source, args.target, args.parallel, args.udf_params
-        )
     elif args.command == "clear-cache":
         clear_cache(catalog)
     elif args.command == "gc":
73 changes: 7 additions & 66 deletions src/datachain/data_storage/metastore.py
@@ -297,39 +297,6 @@ def update_dataset_status(
     #
     # Dataset dependencies
     #
-
-    def add_dependency(
-        self,
-        dependency: DatasetDependency,
-        source_dataset_name: str,
-        source_dataset_version: int,
-    ) -> None:
-        """Add dependency to dataset or storage."""
-        if dependency.is_dataset:
-            self.add_dataset_dependency(
-                source_dataset_name,
-                source_dataset_version,
-                dependency.dataset_name,
-                int(dependency.version),
-            )
-        else:
-            self.add_storage_dependency(
-                source_dataset_name,
-                source_dataset_version,
-                StorageURI(dependency.name),
-                dependency.version,
-            )
-
-    @abstractmethod
-    def add_storage_dependency(
-        self,
-        source_dataset_name: str,
-        source_dataset_version: int,
-        storage_uri: StorageURI,
-        storage_timestamp_str: Optional[str] = None,
-    ) -> None:
-        """Adds storage dependency to dataset."""
-
     @abstractmethod
     def add_dataset_dependency(
         self,
@@ -1268,32 +1235,6 @@ def update_dataset_status(
     #
     # Dataset dependencies
     #
-
-    def _insert_dataset_dependency(self, data: dict[str, Any]) -> None:
-        """Method for inserting dependencies."""
-        self.db.execute(self._datasets_dependencies_insert().values(**data))
-
-    def add_storage_dependency(
-        self,
-        source_dataset_name: str,
-        source_dataset_version: int,
-        storage_uri: StorageURI,
-        storage_timestamp_str: Optional[str] = None,
-    ) -> None:
-        source_dataset = self.get_dataset(source_dataset_name)
-        storage = self.get_storage(storage_uri)
-
-        self._insert_dataset_dependency(
-            {
-                "source_dataset_id": source_dataset.id,
-                "source_dataset_version_id": (
-                    source_dataset.get_version(source_dataset_version).id
-                ),
-                "bucket_id": storage.id,
-                "bucket_version": storage_timestamp_str,
-            }
-        )
-
     def add_dataset_dependency(
         self,
         source_dataset_name: str,
@@ -1305,15 +1246,15 @@ def add_dataset_dependency(
         source_dataset = self.get_dataset(source_dataset_name)
         dataset = self.get_dataset(dataset_name)

-        self._insert_dataset_dependency(
-            {
-                "source_dataset_id": source_dataset.id,
-                "source_dataset_version_id": (
+        self.db.execute(
+            self._datasets_dependencies_insert().values(
+                source_dataset_id=source_dataset.id,
+                source_dataset_version_id=(
                     source_dataset.get_version(source_dataset_version).id
                 ),
-                "dataset_id": dataset.id,
-                "dataset_version_id": dataset.get_version(dataset_version).id,
-            }
+                dataset_id=dataset.id,
+                dataset_version_id=dataset.get_version(dataset_version).id,
+            )
         )

     def update_dataset_dependency_source(
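For context on the surviving insert path: add_dataset_dependency now builds the INSERT inline with SQLAlchemy's insert().values(...) keyword form instead of going through the removed _insert_dataset_dependency helper, which unpacked a dict via .values(**data). A self-contained sketch of that pattern; the table definition, engine, and id values below are invented for illustration and only the shape of the .values(...) call matches the diff.

import sqlalchemy as sa

metadata = sa.MetaData()
# Stand-in for the datasets_dependencies table; columns are illustrative only.
deps = sa.Table(
    "datasets_dependencies",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("source_dataset_id", sa.Integer),
    sa.Column("source_dataset_version_id", sa.Integer),
    sa.Column("dataset_id", sa.Integer),
    sa.Column("dataset_version_id", sa.Integer),
)

engine = sa.create_engine("sqlite:///:memory:")
metadata.create_all(engine)

with engine.begin() as conn:
    # Keyword form, as in the new add_dataset_dependency ...
    conn.execute(
        deps.insert().values(
            source_dataset_id=1,
            source_dataset_version_id=10,
            dataset_id=2,
            dataset_version_id=20,
        )
    )
    # ... equivalent to unpacking a dict, as the removed helper did.
    data = {"source_dataset_id": 1, "source_dataset_version_id": 11,
            "dataset_id": 3, "dataset_version_id": 30}
    conn.execute(deps.insert().values(**data))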
7 changes: 5 additions & 2 deletions src/datachain/data_storage/sqlite.py
@@ -651,11 +651,14 @@ def get_dataset_sources(
         self, dataset: DatasetRecord, version: int
     ) -> list[StorageURI]:
         dr = self.dataset_rows(dataset, version)
-        query = dr.select(dr.c.source).distinct()
+        query = dr.select(dr.c.file__source).distinct()
         cur = self.db.cursor()
         cur.row_factory = sqlite3.Row  # type: ignore[assignment]

-        return [StorageURI(row["source"]) for row in self.db.execute(query, cursor=cur)]
+        return [
+            StorageURI(row["file__source"])
+            for row in self.db.execute(query, cursor=cur)
+        ]

     def merge_dataset_rows(
         self,
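The get_dataset_sources change is mostly a column rename (source to file__source), but it relies on sqlite3.Row for name-based access to result rows. A standalone illustration of that stdlib mechanism, using an invented table and values:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rows (file__source TEXT, file__path TEXT)")
conn.execute("INSERT INTO rows VALUES ('s3://bucket', 'a/b.jpg')")
conn.execute("INSERT INTO rows VALUES ('s3://bucket', 'a/c.jpg')")

cur = conn.cursor()
cur.row_factory = sqlite3.Row  # rows become addressable by column name
for row in cur.execute("SELECT DISTINCT file__source FROM rows"):
    print(row["file__source"])  # prints: s3://bucket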
22 changes: 0 additions & 22 deletions src/datachain/data_storage/warehouse.py
@@ -942,28 +942,6 @@ def cleanup_tables(self, names: Iterable[str]) -> None:
                self.db.drop_table(Table(name, self.db.metadata), if_exists=True)
                pbar.update(1)

-    def changed_query(
-        self,
-        source_query: sa.sql.selectable.Select,
-        target_query: sa.sql.selectable.Select,
-    ) -> sa.sql.selectable.Select:
-        sq = source_query.alias("source_query")
-        tq = target_query.alias("target_query")
-
-        source_target_join = sa.join(
-            sq, tq, (sq.c.source == tq.c.source) & (sq.c.path == tq.c.path)
-        )
-
-        return (
-            select(*sq.c)
-            .select_from(source_target_join)
-            .where(
-                (sq.c.last_modified > tq.c.last_modified)
-                & (sq.c.is_latest == true())
-                & (tq.c.is_latest == true())
-            )
-        )
-

 def _random_string(length: int) -> str:
     return "".join(
3 changes: 1 addition & 2 deletions src/datachain/lib/dc.py
@@ -1337,8 +1337,7 @@ def subtract(  # type: ignore[override]
                other.signals_schema.resolve(*right_on).db_signals(),
            )  # type: ignore[arg-type]
        )
-
-        return super()._subtract(other, signals)  # type: ignore[arg-type]
+        return super().subtract(other, signals)  # type: ignore[arg-type]

     @classmethod
     def from_values(
1 change: 1 addition & 0 deletions src/datachain/lib/listing.py
@@ -77,6 +77,7 @@ def parse_listing_uri(uri: str, cache, client_config) -> tuple[str, str, str]:
     """
     Parsing uri and returns listing dataset name, listing uri and listing path
     """
+    client_config = client_config or {}
     client = Client.get_client(uri, cache, **client_config)
     storage_uri, path = Client.parse_url(uri)

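The one-line addition guards against client_config being None before it is unpacked with **client_config. A tiny sketch of the same defaulting pattern; make_client_kwargs and the "anon" key are hypothetical and only illustrate the guard:

def make_client_kwargs(client_config=None):
    # Mirror the `client_config = client_config or {}` guard: treat None as
    # "no overrides" so the ** unpacking below never fails.
    client_config = client_config or {}
    return {"anon": False, **client_config}


print(make_client_kwargs())                # {'anon': False}
print(make_client_kwargs({"anon": True}))  # {'anon': True}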
20 changes: 17 additions & 3 deletions src/datachain/node.py
@@ -114,9 +114,23 @@ def as_uid(self, storage: Optional[StorageURI] = None) -> UniqueId:
         )

     @classmethod
-    def from_dict(cls, d: dict[str, Any]) -> "Self":
-        kw = {f.name: d[f.name] for f in attrs.fields(cls) if f.name in d}
-        return cls(**kw)
+    def from_dict(cls, d: dict[str, Any], file_prefix: str = "file") -> "Self":
+        def _dval(field_name: str):
+            return d.get(f"{file_prefix}__{field_name}")
+
+        return cls(
+            sys__id=d["sys__id"],
+            sys__rand=d["sys__rand"],
+            source=_dval("source"),
+            path=_dval("path"),
+            etag=_dval("etag"),
+            is_latest=_dval("is_latest"),
+            size=_dval("size"),
+            last_modified=_dval("last_modified"),
+            version=_dval("version"),
+            location=_dval("location"),
+            dir_type=DirType.FILE,
+        )

     @classmethod
     def from_dir(cls, path, **kwargs) -> "Node":