Fully using new workspace #67

Merged · 24 commits · Dec 30, 2024
Commits
ed21a60 added (failing) test for test_tpch_data (wangpatrick57, Dec 29, 2024)
91a6db3 wrote and passed test_tpch_data (wangpatrick57, Dec 29, 2024)
0f00113 tpch data -> tables (wangpatrick57, Dec 29, 2024)
74c32e2 now passing test_tpch_workload (wangpatrick57, Dec 29, 2024)
8c2c4ac added and passed test_job_tables (wangpatrick57, Dec 29, 2024)
86f62eb wrote and passed test_job_workload (wangpatrick57, Dec 29, 2024)
883b183 fixed bug in workspace where it would always sleep for 1 second (wangpatrick57, Dec 29, 2024)
664728d can now run integtest_benchmark.py fully (wangpatrick57, Dec 29, 2024)
e4235fe gymlib integtests no longer crash when run together with other integt… (wangpatrick57, Dec 29, 2024)
c430a90 fixed some paths in gymlib integtests (wangpatrick57, Dec 30, 2024)
e0cde22 added and passed test_postgres_build (wangpatrick57, Dec 30, 2024)
9d111d3 added and passed test_postgres_dbdata (wangpatrick57, Dec 30, 2024)
85f40a0 now passing all integtests (wangpatrick57, Dec 30, 2024)
395a6f8 fordpath/fpath/dpath -> path (wangpatrick57, Dec 30, 2024)
1dcf669 dbms integtest now uses intended_dbdata_hardware (wangpatrick57, Dec 30, 2024)
b5fd76e fully removed the old open_and_save, save_file, and link_result (wangpatrick57, Dec 30, 2024)
49364c9 num_times_created_this_run -> _num_times_created_this_run (wangpatrick57, Dec 30, 2024)
2ffa8c7 rm lab.py (wangpatrick57, Dec 30, 2024)
17f89ff deleted all the cur_* functions from workspace fixed all uses of them (wangpatrick57, Dec 30, 2024)
e3ac705 deleted append_group (wangpatrick57, Dec 30, 2024)
f5c89e7 deleted more functions from workspace.py (wangpatrick57, Dec 30, 2024)
e6ed566 refactored everything to use linkname helpers (wangpatrick57, Dec 30, 2024)
388aeec refactored how workspace works in gymlib tests (wangpatrick57, Dec 30, 2024)
e3f7bfe now resetting num times created in integtests (wangpatrick57, Dec 30, 2024)
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
@@ -52,7 +52,7 @@ jobs:
# Integration tests do require external systems to be running (most commonly a database instance).
# Unlike end-to-end tests though, they test a specific module in a detailed manner, much like a unit test does.
env:
# We set `INTENDED_DBDATA_HARDWARE` so that it's seen when `integtest_pg_conn.py` executes `_set_up_gymlib_integtest_workspace.sh`.
# The CI runs on ssd so we have to set this.
INTENDED_DBDATA_HARDWARE: ssd
run: |
. "$HOME/.cargo/env"
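
As an aside, here is a minimal sketch of how test code might branch on the `INTENDED_DBDATA_HARDWARE` variable exported above. The helper name and the `"hdd"` default are assumptions for illustration, not code from this PR:

```python
import os

def get_intended_dbdata_hardware() -> str:
    # Hypothetical helper: the CI job above exports "ssd"; falling back to
    # "hdd" for local runs is an assumption, not the repo's actual behavior.
    hardware = os.environ.get("INTENDED_DBDATA_HARDWARE", "hdd")
    assert hardware in ("hdd", "ssd"), f"unexpected hardware value: {hardware}"
    return hardware
```
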
2 changes: 1 addition & 1 deletion benchmark/cli.py
@@ -8,7 +8,7 @@
@click.group(name="benchmark")
@click.pass_obj
def benchmark_group(dbgym_workspace: DBGymWorkspace) -> None:
dbgym_workspace.append_group("benchmark")
pass


benchmark_group.add_command(tpch_group)
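
Since commit e3ac705 deleted `append_group`, the group callback above becomes a pure pass-through. A small sketch of the remaining click pattern, with a hypothetical `show-workspace` subcommand added for illustration:

```python
import click

@click.group(name="benchmark")
@click.pass_obj
def benchmark_group(dbgym_workspace) -> None:
    # The callback no longer mutates the workspace; it only exists so that
    # subcommands can receive the workspace object via the click context.
    pass

# Hypothetical subcommand showing how the workspace flows through.
@benchmark_group.command(name="show-workspace")
@click.pass_obj
def show_workspace(dbgym_workspace) -> None:
    click.echo(f"workspace: {dbgym_workspace}")

# Usage sketch: click forwards the obj kwarg into the context, e.g.
#   benchmark_group(args=["show-workspace"], obj=my_workspace)
```
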
125 changes: 74 additions & 51 deletions benchmark/job/cli.py
@@ -2,17 +2,17 @@
from typing import Optional

import click
from gymlib.symlinks_paths import (
get_tables_dirname,
get_workload_dirname,
get_workload_suffix,
name_to_linkname,
)

from benchmark.constants import DEFAULT_SCALE_FACTOR
from util.log import DBGYM_LOGGER_NAME
from util.shell import subprocess_run
from util.workspace import (
DBGymWorkspace,
get_default_tables_dname,
get_workload_name,
is_fully_resolved,
link_result,
)
from util.workspace import DBGymWorkspace, fully_resolve_path

JOB_TABLES_URL = "https://event.cwi.nl/da/job/imdb.tgz"
JOB_QUERIES_URL = "https://event.cwi.nl/da/job/job.tgz"
@@ -137,18 +137,22 @@
@click.group(name="job")
@click.pass_obj
def job_group(dbgym_workspace: DBGymWorkspace) -> None:
dbgym_workspace.append_group("job")
pass


@job_group.command(name="data")
@job_group.command(name="tables")
# We expose this option to keep its interface consistent with other workloads, but you should never pass in something other than DEFAULT_SCALE_FACTOR.
@click.argument("scale-factor", type=float)
@click.pass_obj
# The reason generate data is separate from create dbdata is because generate-data is generic
# The reason generate data is separate from create dbdata is because generate data is generic
# to all DBMSs while create dbdata is specific to a single DBMS.
def job_data(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None:
def job_tables(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None:
_job_tables(dbgym_workspace, scale_factor)


def _job_tables(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None:
assert scale_factor == DEFAULT_SCALE_FACTOR
_download_job_data(dbgym_workspace)
_download_job_tables(dbgym_workspace)


@job_group.command(name="workload")
@@ -161,18 +165,24 @@ def job_data(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None:
@click.pass_obj
def job_workload(
dbgym_workspace: DBGymWorkspace, query_subset: str, scale_factor: float
) -> None:
_job_workload(dbgym_workspace, query_subset, scale_factor)


def _job_workload(
dbgym_workspace: DBGymWorkspace, query_subset: str, scale_factor: float
) -> None:
assert scale_factor == DEFAULT_SCALE_FACTOR
_download_job_queries(dbgym_workspace)
_generate_job_workload(dbgym_workspace, query_subset)


def _download_job_data(dbgym_workspace: DBGymWorkspace) -> None:
def _download_job_tables(dbgym_workspace: DBGymWorkspace) -> None:
_download_and_untar_dir(
dbgym_workspace,
JOB_TABLES_URL,
"imdb.tgz",
get_default_tables_dname(DEFAULT_SCALE_FACTOR),
get_tables_dirname("job", DEFAULT_SCALE_FACTOR),
)


@@ -199,51 +209,66 @@ def _download_and_untar_dir(
an "original" directory name. If this is the case, you should set
`untarred_original_dname` to ensure that it gets renamed to `untarred_dname`.
"""
expected_symlink_dpath = (
dbgym_workspace.cur_symlinks_data_path(mkdir=True) / f"{untarred_dname}.link"
expected_symlink_path = (
dbgym_workspace.dbgym_cur_symlinks_path / f"{untarred_dname}.link"
)
if expected_symlink_dpath.exists():
if expected_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping download: {expected_symlink_dpath}"
f"Skipping download: {expected_symlink_path}"
)
return

logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloading: {expected_symlink_dpath}")
real_data_path = dbgym_workspace.cur_task_runs_data_path(mkdir=True)
subprocess_run(f"curl -O {download_url}", cwd=real_data_path)
untarred_data_dpath = dbgym_workspace.cur_task_runs_data_path(untarred_dname)
logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloading: {expected_symlink_path}")
subprocess_run(f"curl -O {download_url}", cwd=dbgym_workspace.dbgym_this_run_path)
untarred_data_path = dbgym_workspace.dbgym_this_run_path / untarred_dname

if untarred_original_dname is not None:
assert not untarred_data_dpath.exists()
subprocess_run(f"tar -zxvf {download_tarred_fname}", cwd=real_data_path)
assert (real_data_path / untarred_original_dname).exists()
assert not untarred_data_path.exists()
subprocess_run(
f"tar -zxvf {download_tarred_fname}",
cwd=dbgym_workspace.dbgym_this_run_path,
)
assert (dbgym_workspace.dbgym_this_run_path / untarred_original_dname).exists()
subprocess_run(
f"mv {untarred_original_dname} {untarred_dname}", cwd=real_data_path
f"mv {untarred_original_dname} {untarred_dname}",
cwd=dbgym_workspace.dbgym_this_run_path,
)
else:
untarred_data_dpath.mkdir(parents=True, exist_ok=False)
subprocess_run(f"tar -zxvf ../{download_tarred_fname}", cwd=untarred_data_dpath)
untarred_data_path.mkdir(parents=True, exist_ok=False)
subprocess_run(f"tar -zxvf ../{download_tarred_fname}", cwd=untarred_data_path)

assert untarred_data_dpath.exists()
subprocess_run(f"rm {download_tarred_fname}", cwd=real_data_path)
symlink_dpath = link_result(dbgym_workspace, untarred_data_dpath)
assert expected_symlink_dpath.samefile(symlink_dpath)
logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloaded: {expected_symlink_dpath}")
assert untarred_data_path.exists()
subprocess_run(
f"rm {download_tarred_fname}", cwd=dbgym_workspace.dbgym_this_run_path
)
symlink_path = dbgym_workspace.link_result(untarred_data_path)
assert expected_symlink_path.samefile(symlink_path)
logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloaded: {expected_symlink_path}")


def _generate_job_workload(
dbgym_workspace: DBGymWorkspace,
query_subset: str,
) -> None:
workload_name = get_workload_name(DEFAULT_SCALE_FACTOR, query_subset)
expected_workload_symlink_dpath = dbgym_workspace.cur_symlinks_data_path(
mkdir=True
) / (workload_name + ".link")
workload_name = get_workload_dirname(
"job",
DEFAULT_SCALE_FACTOR,
get_workload_suffix("job", query_subset=query_subset),
)
expected_workload_symlink_path = dbgym_workspace.dbgym_cur_symlinks_path / (
name_to_linkname(workload_name)
)
if expected_workload_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping generation: {expected_workload_symlink_path}"
)
return

logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generating: {expected_workload_symlink_dpath}"
f"Generating: {expected_workload_symlink_path}"
)
real_dpath = dbgym_workspace.cur_task_runs_data_path(workload_name, mkdir=True)
workload_path = dbgym_workspace.dbgym_this_run_path / workload_name
workload_path.mkdir(parents=False, exist_ok=False)

query_names = None
if query_subset == "all":
@@ -255,19 +280,17 @@ def _generate_job_workload(
else:
assert False

with open(real_dpath / "order.txt", "w") as f:
with open(workload_path / "order.txt", "w") as f:
queries_parent_path = dbgym_workspace.dbgym_cur_symlinks_path / (
name_to_linkname(JOB_QUERIES_DNAME)
)

for qname in query_names:
sql_fpath = (
dbgym_workspace.cur_symlinks_data_path(mkdir=True)
/ (f"{JOB_QUERIES_DNAME}.link")
).resolve() / f"{qname}.sql"
assert is_fully_resolved(
sql_fpath
), "We should only write existent real absolute paths to a file"
f.write(f"Q{qname},{sql_fpath}\n")
sql_path = fully_resolve_path(queries_parent_path / f"{qname}.sql")
f.write(f"Q{qname},{sql_path}\n")

workload_symlink_dpath = link_result(dbgym_workspace, real_dpath)
assert workload_symlink_dpath == expected_workload_symlink_dpath
workload_symlink_path = dbgym_workspace.link_result(workload_path)
assert workload_symlink_path == expected_workload_symlink_path
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generated: {expected_workload_symlink_dpath}"
f"Generated: {expected_workload_symlink_path}"
)
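
The `order.txt` file written above holds one `Q<name>,<absolute sql path>` line per query. A hedged sketch of a consumer that recovers the (name, path) pairs — the function is invented here, and it assumes paths contain no commas, which holds for the fully resolved paths this PR writes:

```python
from pathlib import Path

def read_order_file(order_path: Path) -> list[tuple[str, Path]]:
    # Each line looks like "Q1a,/abs/path/to/1a.sql"; split on the first
    # comma so the query label and the path are recovered intact.
    pairs = []
    for line in order_path.read_text().splitlines():
        qname, sql_path = line.split(",", 1)
        pairs.append((qname, Path(sql_path)))
    return pairs
```
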
49 changes: 20 additions & 29 deletions benchmark/job/load_info.py
@@ -1,16 +1,16 @@
from pathlib import Path
from typing import Optional

from gymlib.symlinks_paths import get_tables_symlink_path

from benchmark.constants import DEFAULT_SCALE_FACTOR
from dbms.load_info_base_class import LoadInfoBaseClass
from util.workspace import DBGymWorkspace, get_default_tables_dname, is_fully_resolved
from util.workspace import DBGymWorkspace, fully_resolve_path

JOB_SCHEMA_FNAME = "job_schema.sql"


class JobLoadInfo(LoadInfoBaseClass):
CODEBASE_PATH_COMPONENTS = ["dbgym", "benchmark", "job"]
CODEBASE_DNAME = "_".join(CODEBASE_PATH_COMPONENTS)
TABLES = [
"aka_name",
"aka_title",
@@ -36,43 +36,34 @@ class JobLoadInfo(LoadInfoBaseClass):
]

def __init__(self, dbgym_workspace: DBGymWorkspace):
# schema and constraints
schema_root_dpath = dbgym_workspace.base_dbgym_repo_dpath
for component in JobLoadInfo.CODEBASE_PATH_COMPONENTS[
1:
]: # [1:] to skip "dbgym"
schema_root_dpath /= component
self._schema_fpath = schema_root_dpath / JOB_SCHEMA_FNAME
# Schema (directly in the codebase).
job_codebase_path = dbgym_workspace.base_dbgym_repo_path / "benchmark" / "job"
self._schema_path = job_codebase_path / JOB_SCHEMA_FNAME
assert (
self._schema_fpath.exists()
), f"self._schema_fpath ({self._schema_fpath}) does not exist"
self._schema_path.exists()
), f"self._schema_path ({self._schema_path}) does not exist"

# Tables
data_root_dpath = (
dbgym_workspace.dbgym_symlinks_path / JobLoadInfo.CODEBASE_DNAME / "data"
)
tables_symlink_dpath = (
data_root_dpath / f"{get_default_tables_dname(DEFAULT_SCALE_FACTOR)}.link"
tables_path = fully_resolve_path(
get_tables_symlink_path(
dbgym_workspace.dbgym_workspace_path, "job", DEFAULT_SCALE_FACTOR
)
)
tables_dpath = tables_symlink_dpath.resolve()
assert is_fully_resolved(
tables_dpath
), f"tables_dpath ({tables_dpath}) should be an existent real absolute path. Make sure you have generated the TPC-H data"
self._tables_and_fpaths = []
self._tables_and_paths = []
for table in JobLoadInfo.TABLES:
table_fpath = tables_dpath / f"{table}.csv"
self._tables_and_fpaths.append((table, table_fpath))
table_path = tables_path / f"{table}.csv"
self._tables_and_paths.append((table, table_path))

def get_schema_fpath(self) -> Path:
return self._schema_fpath
def get_schema_path(self) -> Path:
return self._schema_path

def get_tables_and_fpaths(self) -> list[tuple[str, Path]]:
return self._tables_and_fpaths
def get_tables_and_paths(self) -> list[tuple[str, Path]]:
return self._tables_and_paths

def get_table_file_delimiter(self) -> str:
return ","

def get_constraints_fpath(self) -> Optional[Path]:
def get_constraints_path(self) -> Optional[Path]:
# JOB does not have any constraints. It does have indexes, but we don't want to create
# those indexes so that the tuning agent can start from a clean slate.
return None
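
To illustrate why `get_constraints_path()` may return `None`, here is a sketch (not code from this PR) of how a loader might drive the refactored interface; `run_sql_file` and `copy_from_csv` are stand-in stubs:

```python
from pathlib import Path

def run_sql_file(path: Path) -> None:
    # Stub: the real system would execute this SQL against the DBMS.
    print(f"would execute {path}")

def copy_from_csv(table: str, csv_path: Path, delimiter: str) -> None:
    # Stub: the real system would bulk-load the CSV into the table.
    print(f"would COPY {table} from {csv_path} (delimiter {delimiter!r})")

def load_benchmark(load_info) -> None:
    run_sql_file(load_info.get_schema_path())  # create the tables
    for table, csv_path in load_info.get_tables_and_paths():
        copy_from_csv(table, csv_path, load_info.get_table_file_delimiter())
    constraints_path = load_info.get_constraints_path()
    if constraints_path is not None:  # JobLoadInfo returns None by design
        run_sql_file(constraints_path)
```
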
Empty file added benchmark/tests/__init__.py
1 change: 1 addition & 0 deletions benchmark/tests/benchmark_integtest_dbgym_config.yaml
@@ -0,0 +1 @@
dbgym_workspace_path: ../dbgym_benchmark_integtest_workspace/
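
A small sketch, assuming PyYAML, of how a harness might read this one-key config. The function is hypothetical, though the key name comes from the file above, and the relative path is resolved against the config's location:

```python
from pathlib import Path

import yaml  # PyYAML; an assumption about the repo's dependencies

def read_workspace_path(config_path: Path) -> Path:
    with open(config_path) as f:
        config = yaml.safe_load(f)
    # The path in the file is relative, so resolve it against the config's parent.
    return (config_path.parent / config["dbgym_workspace_path"]).resolve()
```
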