feat(clp-package): Add support for deleting archives that are exclusively within a time range. #594

Merged · 20 commits · Nov 27, 2024
Changes from 18 commits
12 changes: 12 additions & 0 deletions components/clp-package-utils/clp_package_utils/__init__.py
@@ -0,0 +1,12 @@
import logging

# Set up console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter(
"%(asctime)s.%(msecs)03d %(levelname)s [%(module)s] %(message)s", datefmt="%Y-%m-%dT%H:%M:%S"
)
logging_console_handler.setFormatter(logging_formatter)
# Set up root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(logging_console_handler)
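
For context, a minimal sketch (not part of this diff) of what a refactored script's logging setup can reduce to once the package __init__.py above configures the root logger; records emitted by the module-level logger propagate up to the root logger's console handler and formatter:

import logging

# No per-module handler or formatter needed; the root logger configured in
# clp_package_utils/__init__.py handles console output at INFO level.
logger = logging.getLogger(__file__)


def log_example() -> None:
    # Hypothetical helper, only for illustration.
    logger.info("Hello from a refactored script.")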
2 changes: 1 addition & 1 deletion components/clp-package-utils/clp_package_utils/general.py
@@ -106,7 +106,7 @@ def get_clp_home():
return clp_home.resolve()


def generate_container_name(job_type: JobType) -> str:
def generate_container_name(job_type: str) -> str:
"""
:param job_type:
:return: A unique container name for the given job type.
@@ -18,15 +18,8 @@
validate_and_load_db_credentials_file,
)

# Setup logging
# Create logger
Member (review comment):

Suggested change: delete the "# Create logger" comment.

Don't think this comment is necessary. Can delete it in all the refactored files.

logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def main(argv):
@@ -66,7 +59,7 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

container_name = generate_container_name(JobType.COMPRESSION)
container_name = generate_container_name(str(JobType.COMPRESSION))

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
@@ -24,15 +24,8 @@
validate_path_could_be_dir,
)

# Setup logging
# Create logger
logger = logging.getLogger("clp")
logger.setLevel(logging.DEBUG)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)
logger = logging.getLogger(__file__)


def validate_and_load_config(
@@ -88,7 +81,7 @@ def handle_extract_file_cmd(
if clp_config is None:
return -1

container_name = generate_container_name(JobType.FILE_EXTRACTION)
container_name = generate_container_name(str(JobType.FILE_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
container_clp_config, clp_config, container_name
@@ -163,7 +156,7 @@ def handle_extract_ir_cmd(
if clp_config is None:
return -1

container_name = generate_container_name(JobType.IR_EXTRACTION)
container_name = generate_container_name(str(JobType.IR_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
container_clp_config, clp_config, container_name
105 changes: 105 additions & 0 deletions components/clp-package-utils/clp_package_utils/scripts/del_archives.py
@@ -0,0 +1,105 @@
import argparse
import logging
import subprocess
import sys
from pathlib import Path

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
dump_container_config,
generate_container_config,
generate_container_name,
generate_container_start_cmd,
get_clp_home,
JobType,
load_config_file,
validate_and_load_db_credentials_file,
)

# Create logger
logger = logging.getLogger(__file__)


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

args_parser = argparse.ArgumentParser(
description="Deletes archives that fall within the specified time range."
)
args_parser.add_argument(
"--config",
"-c",
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument(
"--begin-ts",
type=int,
default=0,
help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--end-ts",
type=int,
required=True,
help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.",
)
parsed_args = args_parser.parse_args(argv[1:])

# Validate and load config file
try:
config_file_path = Path(parsed_args.config)
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
clp_config.validate_logs_dir()

# Validate and load necessary credentials
validate_and_load_db_credentials_file(clp_config, clp_home, False)
except:
logger.exception("Failed to load config.")
return -1

# Validate the input timestamp
begin_ts = parsed_args.begin_ts
end_ts = parsed_args.end_ts
if begin_ts > end_ts:
logger.error("begin-ts must be <= end-ts")
return -1
if end_ts < 0 or begin_ts < 0:
logger.error("begin_ts and end_ts must be non-negative.")
return -1

container_name = generate_container_name("del-archives")

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
container_clp_config, clp_config, container_name
)

necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir]
container_start_cmd = generate_container_start_cmd(
container_name, necessary_mounts, clp_config.execution_container
)

# fmt: off
del_archive_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.del_archives",
"--config", str(generated_config_path_on_container),
str(begin_ts),
str(end_ts)

]
# fmt: on

cmd = container_start_cmd + del_archive_cmd
subprocess.run(cmd, check=True)

# Remove generated files
generated_config_path_on_host.unlink()

return 0


if "__main__" == __name__:
sys.exit(main(sys.argv))
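
A usage sketch, not part of the PR: the timestamp is an arbitrary example, and the call assumes the CLP package environment (including the execution container) is available, since the wrapper launches the native script inside a container. --begin-ts defaults to 0, so only --end-ts is strictly required:

import sys

from clp_package_utils.scripts import del_archives

# Delete every archive whose [begin_timestamp, end_timestamp] lies entirely
# within [0, 1700000000000] milliseconds from the UNIX epoch.
sys.exit(del_archives.main(["del_archives", "--end-ts", "1700000000000"]))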
@@ -23,15 +23,8 @@
load_config_file,
)

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def print_compression_job_status(job_row, current_time):
@@ -27,15 +27,8 @@
wait_for_query_job,
)

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def get_orig_file_id(db_config: Database, path: str) -> Optional[str]:
Member (review comment on the new script):

Since this is a brand new script, let's use our latest conventions and best practices.
@@ -0,0 +1,140 @@
import argparse
import logging
import shutil
import sys
from contextlib import closing
from pathlib import Path
from typing import List

from clp_py_utils.clp_config import Database
from clp_py_utils.sql_adapter import SQL_Adapter

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
get_clp_home,
load_config_file,
)

# Create logger
logger = logging.getLogger(__file__)


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

args_parser = argparse.ArgumentParser(
description="Deletes archives that fall within the specified time range."
)
args_parser.add_argument(
"--config",
"-c",
required=True,
default=str(default_config_file_path),
help="CLP configuration file.",
)
args_parser.add_argument(
"begin_ts",
type=int,
help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"end_ts",
type=int,
help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.",
)
parsed_args = args_parser.parse_args(argv[1:])

# Validate and load config file
config_file_path = Path(parsed_args.config)
try:
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
clp_config.validate_logs_dir()
except:
logger.exception("Failed to load config.")
return -1

database_config = clp_config.database
archives_dir = clp_config.archive_output.directory
if not archives_dir.exists():
logger.error("`archive_output.directory` doesn't exist.")
return -1

return _delete_archives(
archives_dir,
database_config,
parsed_args.begin_ts,
parsed_args.end_ts,
)


def _delete_archives(
archives_dir: Path,
database_config: Database,
begin_ts: int,
end_ts: int,
) -> int:
"""
Deletes all archives where `begin_ts <= archive.begin_timestamp` and
`archive.end_timestamp <= end_ts` from both the metadata database and disk.
:param archives_dir:
:param database_config:
:param begin_ts:
:param end_ts:
:return: 0 on success, -1 otherwise.
"""

archive_ids: List[str]
logger.info("Starting to delete archives from the database.")
try:
sql_adapter = SQL_Adapter(database_config)
clp_db_connection_params = database_config.get_clp_connection_params_and_type(True)
table_prefix = clp_db_connection_params["table_prefix"]
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
db_cursor.execute(
f"""
DELETE FROM `{table_prefix}archives`
WHERE begin_timestamp >= %s AND end_timestamp <= %s
RETURNING id
""",
(begin_ts, end_ts),
)
results = db_cursor.fetchall()

if 0 == len(results):
logger.info("No archives (exclusively) within the specified time range.")
return 0

archive_ids = [result["id"] for result in results]
db_cursor.execute(
f"""
DELETE FROM `{table_prefix}files`
WHERE archive_id in ({', '.join(['%s'] * len(archive_ids))})
""",
archive_ids,
)
db_conn.commit()
except Exception:
logger.exception("Failed to delete archives from the database. Aborting deletion.")
return -1

logger.info(f"Finished deleting archives from the database.")

for archive_id in archive_ids:
archive_path = archives_dir / archive_id
if not archive_path.is_dir():
logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.")
continue

logger.info(f"Deleting archive {archive_id} from disk.")
shutil.rmtree(archive_path)

logger.info(f"Finished deleting archives from disk.")

return 0


if "__main__" == __name__:
sys.exit(main(sys.argv))
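
To make the "exclusively within" semantics concrete, here is a small self-contained sketch (toy data and a hypothetical helper, not part of the PR) of the selection predicate that the DELETE statement above applies: an archive is removed only if begin_ts <= archive.begin_timestamp and archive.end_timestamp <= end_ts.

from typing import List, NamedTuple


class Archive(NamedTuple):
    archive_id: str
    begin_timestamp: int  # milliseconds from the UNIX epoch
    end_timestamp: int  # milliseconds from the UNIX epoch


def select_archives_to_delete(
    archives: List[Archive], begin_ts: int, end_ts: int
) -> List[str]:
    # An archive qualifies only when its whole time range is contained in
    # [begin_ts, end_ts]; archives that merely overlap the range are kept.
    return [
        a.archive_id
        for a in archives
        if begin_ts <= a.begin_timestamp and a.end_timestamp <= end_ts
    ]


archives = [
    Archive("a", 0, 50),  # entirely inside [0, 100] -> deleted
    Archive("b", 60, 120),  # ends after end_ts -> kept
    Archive("c", 90, 100),  # entirely inside [0, 100] -> deleted
]
assert ["a", "c"] == select_archives_to_delete(archives, begin_ts=0, end_ts=100)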
@@ -26,15 +26,8 @@
wait_for_query_job,
)

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def create_and_monitor_job_in_db(