Skip to content

Commit

Permalink
Merge pull request datalad#370 from candleindark/cte
Browse files Browse the repository at this point in the history
Replace temporary table with CTE in stats calculation
  • Loading branch information
yarikoptic authored Jun 12, 2024
2 parents 3b2445a + 897cb34 commit f4aed5d
Showing 1 changed file with 89 additions and 100 deletions.
189 changes: 89 additions & 100 deletions datalad_registry/blueprints/api/dataset_urls/tools.py
Original file line number Diff line number Diff line change
@@ -1,97 +1,67 @@
from sqlalchemy import (
CTE,
ScalarSelect,
Select,
Subquery,
TableClause,
and_,
column,
func,
not_,
or_,
select,
table,
text,
)

from datalad_registry.models import RepoUrl, db

from .models import (
AnnexDsCollectionStats,
CollectionStats,
DataladDsCollectionStats,
NonAnnexDsCollectionStats,
StatsSummary,
)

from .models import CollectionStats

def cache_result_to_tmp_tb(select_stmt: Select, tb_name: str) -> TableClause:
"""
Execute the given select statement and cache the result to a temporary table
with the given name

:param select_stmt: The given select statement to execute
:param tb_name: The string to use as the name of the temporary table
:return: A object representing the temporary table
Note: The execution of this function requires the Flask app's context
"""
create_tmp_tb_sql = f"""
CREATE TEMPORARY TABLE {tb_name} AS
{select_stmt.compile(bind=db.engine, compile_kwargs={'literal_binds': True})};
"""
db.session.execute(text(create_tmp_tb_sql))

return table(
tb_name,
*(column(name, c.type) for name, c in select_stmt.selected_columns.items()),
)


def _get_annex_ds_collection_stats(q: Subquery) -> AnnexDsCollectionStats:
def _get_annex_ds_collection_stats(q: Subquery) -> ScalarSelect:
"""
Get the stats of a collection of datasets that contains only of annex datasets
:param q: The query that specifies the collection of datasets under consideration
:return: The object representing the stats
:return: The scalar selectable for obtaining the stats
Note: The execution of this function requires the Flask app's context
"""

ds_count, annexed_files_size, annexed_file_count = db.session.execute(
return (
select(
func.count().label("ds_count"),
func.sum(q.c.annexed_files_in_wt_size).label("annexed_files_size"),
func.sum(q.c.annexed_files_in_wt_count).label("annexed_file_count"),
).select_from(q)
).one()

return AnnexDsCollectionStats(
ds_count=ds_count,
annexed_files_size=annexed_files_size,
annexed_file_count=annexed_file_count,
func.jsonb_build_object(
"ds_count",
func.count(),
"annexed_files_size",
func.sum(q.c.annexed_files_in_wt_size),
"annexed_file_count",
func.sum(q.c.annexed_files_in_wt_count),
).label("annex_ds_collection_stats")
)
.select_from(q)
.scalar_subquery()
)


def get_unique_dl_ds_collection_stats(base_q: Subquery) -> AnnexDsCollectionStats:
def get_unique_dl_ds_collection_stats(base_cte: CTE) -> ScalarSelect:
"""
Get the stats of the subset of the collection of datasets that contains only
of Datalad datasets, considering datasets with the same `ds_id` as the same
dataset
:param base_q: The base query that specified the collection of datasets
under consideration
:return: The object representing the stats
:param base_cte: The base CTE that specified the collection of datasets
under consideration
:return: The scalar selectable for obtaining the stats
Note: The execution of this function requires the Flask app's context
"""

grp_by_id_q = (
select(
base_q.c.ds_id,
func.max(base_q.c.annexed_files_in_wt_size).label(
base_cte.c.ds_id,
func.max(base_cte.c.annexed_files_in_wt_size).label(
"max_annexed_files_in_wt_size"
),
)
.group_by(base_q.c.ds_id)
.group_by(base_cte.c.ds_id)
.subquery("grp_by_id_q")
)

Expand Down Expand Up @@ -121,86 +91,96 @@ def get_unique_dl_ds_collection_stats(base_q: Subquery) -> AnnexDsCollectionStat
return _get_annex_ds_collection_stats(grp_by_id_and_a_f_size_q)


def get_dl_ds_collection_stats_with_dups(base_q: Subquery) -> AnnexDsCollectionStats:
def get_dl_ds_collection_stats_with_dups(base_cte: CTE) -> ScalarSelect:
"""
Get the stats of the subset of the collection of datasets that contains only
of Datalad datasets, considering individual repos as a dataset regardless of
the value of `ds_id`.
:param base_q: The base query that specified the collection of datasets
under consideration
:return: The object representing the stats
:param base_cte: The base CTE that specified the collection of datasets
under consideration
:return: The scalar selectable for obtaining the stats
Note: The execution of this function requires the Flask app's context
"""

# Select statement for getting all the Datalad datasets
dl_ds_q = select(base_q).filter(base_q.c.ds_id.is_not(None)).subquery("dl_ds_q")
dl_ds_q = select(base_cte).filter(base_cte.c.ds_id.is_not(None)).subquery("dl_ds_q")

return _get_annex_ds_collection_stats(dl_ds_q)


def get_dl_ds_collection_stats(base_q: Subquery) -> DataladDsCollectionStats:
def get_dl_ds_collection_stats(base_cte: CTE) -> ScalarSelect:
"""
Get the stats of the subset of the collection of datasets that contains only
of Datalad datasets
:param base_q: The base query that specified the collection of datasets
under consideration
:return: The object representing the stats
:param base_cte: The base CTE that specified the collection of datasets
under consideration
:return: The scalar selectable for obtaining the stats
Note: The execution of this function requires the Flask app's context
"""

return DataladDsCollectionStats(
unique_ds_stats=get_unique_dl_ds_collection_stats(base_q),
stats=get_dl_ds_collection_stats_with_dups(base_q),
)
return select(
func.jsonb_build_object(
"unique_ds_stats",
get_unique_dl_ds_collection_stats(base_cte),
"stats",
get_dl_ds_collection_stats_with_dups(base_cte),
).label("datalad_ds_collection_stats")
).scalar_subquery()


def get_pure_annex_ds_collection_stats(base_q: Subquery) -> AnnexDsCollectionStats:
def get_pure_annex_ds_collection_stats(base_cte: CTE) -> ScalarSelect:
"""
Get the stats of the subset of the collection of datasets that contains only
of pure annex datasets, the annex datasets that are not Datalad datasets
:param base_q: The base query that specified the collection of datasets
under consideration
:return: The object representing the stats
:param base_cte: The base CTE that specified the collection of datasets
under consideration
:return: The scalar selectable for obtaining the stats
Note: The execution of this function requires the Flask app's context
"""
# Select statement for getting all the pure annex datasets
pure_annex_ds_q = (
select(base_q)
.filter(and_(base_q.c.branches.has_key("git-annex"), base_q.c.ds_id.is_(None)))
select(base_cte)
.filter(
and_(base_cte.c.branches.has_key("git-annex"), base_cte.c.ds_id.is_(None))
)
.subquery("pure_annex_ds_q")
)

return _get_annex_ds_collection_stats(pure_annex_ds_q)


def get_non_annex_ds_collection_stats(base_q: Subquery) -> NonAnnexDsCollectionStats:
def get_non_annex_ds_collection_stats(base_cte: CTE) -> ScalarSelect:
"""
Get the stats of the subset of the collection of datasets that contains only
of non-annex datasets
:param base_q: The base query that specified the collection of datasets
under consideration
:return: The object representing the stats
:param base_cte: The base CTE that specified the collection of datasets
under consideration
:return: The scalar selectable for obtaining the stats
Note: The execution of this function requires the Flask app's context
"""
# Select statement for getting all the non-annex datasets
non_annex_ds_q = (
select(base_q)
.filter(not_(base_q.c.branches.has_key("git-annex")))
select(base_cte)
.filter(not_(base_cte.c.branches.has_key("git-annex")))
.subquery("non_annex_ds_q")
)

return NonAnnexDsCollectionStats(
ds_count=db.session.execute(
select(func.count().label("ds_count")).select_from(non_annex_ds_q)
).scalar_one()
return (
select(
func.jsonb_build_object("ds_count", func.count()).label(
"non_annex_ds_collection_stats"
)
)
.select_from(non_annex_ds_q)
.scalar_subquery()
)


Expand All @@ -215,24 +195,33 @@ def get_collection_stats(select_stmt: Select) -> CollectionStats:
Note: The execution of this function requires the Flask app's context
"""

# Cache the result of the select statement to a temporary table
tmp_tb = cache_result_to_tmp_tb(select_stmt, "tmp_tb")
base_cte = select_stmt.cte("base_cte")

# base_q = select_stmt.subquery("base_q")
base_q = select(tmp_tb).subquery("base_q")

datalad_ds_stats = get_dl_ds_collection_stats(base_q)
datalad_ds_stats_scalar_subq = get_dl_ds_collection_stats(base_cte)

# Total number of datasets, as individual repos, without any deduplication
ds_count = db.session.execute(
select(func.count().label("ds_count")).select_from(base_q)
).scalar_one()

return CollectionStats(
datalad_ds_stats=datalad_ds_stats,
pure_annex_ds_stats=get_pure_annex_ds_collection_stats(base_q),
non_annex_ds_stats=get_non_annex_ds_collection_stats(base_q),
summary=StatsSummary(
unique_ds_count=datalad_ds_stats.unique_ds_stats.ds_count, ds_count=ds_count
),
ds_count_scalar_subq = select(func.count()).select_from(base_cte).scalar_subquery()

return CollectionStats.parse_obj(
db.session.execute(
select(
func.jsonb_build_object(
"datalad_ds_stats",
datalad_ds_stats_scalar_subq,
"pure_annex_ds_stats",
get_pure_annex_ds_collection_stats(base_cte),
"non_annex_ds_stats",
get_non_annex_ds_collection_stats(base_cte),
"summary",
func.jsonb_build_object(
"unique_ds_count",
func.jsonb_extract_path(
datalad_ds_stats_scalar_subq, "unique_ds_stats", "ds_count"
),
"ds_count",
ds_count_scalar_subq,
),
).label("collection_stats")
)
).scalar_one()
)

0 comments on commit f4aed5d

Please sign in to comment.