Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clp-package): Add support for deleting archives that are exclusively within a time range. #594

Merged
merged 20 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class JobType(KebabCaseStrEnum):
FILE_EXTRACTION = auto()
IR_EXTRACTION = auto()
SEARCH = auto()
DEL_ARCHIVE = auto()


class DockerMount:
Expand Down
110 changes: 110 additions & 0 deletions components/clp-package-utils/clp_package_utils/scripts/del_archives.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import argparse
import logging
import pathlib
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
import subprocess
import sys

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
dump_container_config,
generate_container_config,
generate_container_name,
generate_container_start_cmd,
get_clp_home,
JobType,
load_config_file,
validate_and_load_db_credentials_file,
)

# Setup logging
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
# Build the console handler and its formatter first, then attach them to the logger.
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler = logging.StreamHandler()
logging_console_handler.setFormatter(logging_formatter)

# Script-level logger, named after this file's path; its threshold is INFO.
logger = logging.getLogger(__file__)
logger.addHandler(logging_console_handler)
logger.setLevel(logging.INFO)


def main(argv):
    """
    Parses the command line, loads the CLP config, validates the requested time range, and then
    runs the native archive-deletion script inside an execution container.

    :param argv: Full argument vector; `argv[0]` (the program name) is skipped.
    :return: 0 on success, -1 if the config can't be loaded or the time range is invalid.
    :raises subprocess.CalledProcessError: If the containerized command exits with a non-zero
    status.
    """
    clp_home = get_clp_home()
    default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

    args_parser = argparse.ArgumentParser(description="Prune the out-dated archives.")
    args_parser.add_argument(
        "--config",
        "-c",
        default=str(default_config_file_path),
        help="CLP package configuration file.",
    )
    args_parser.add_argument(
        "--begin-ts",
        type=int,
        default=0,
        help="Time range filter lower-bound (inclusive) as milliseconds from the UNIX epoch.",
    )
    args_parser.add_argument(
        "--end-ts",
        type=int,
        required=True,
        help="Time range filter upper-bound (inclusive) as milliseconds from the UNIX epoch.",
    )
    parsed_args = args_parser.parse_args(argv[1:])

    # Validate and load config file
    try:
        config_file_path = pathlib.Path(parsed_args.config)
        clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
        clp_config.validate_logs_dir()

        # Validate and load necessary credentials
        validate_and_load_db_credentials_file(clp_config, clp_home, False)
    except Exception:
        # `Exception` rather than a bare `except` so that Ctrl-C and `sys.exit` still propagate.
        logger.exception("Failed to load config.")
        return -1

    # Validate the input timestamps
    begin_ts = parsed_args.begin_ts
    end_ts = parsed_args.end_ts
    if begin_ts > end_ts:
        logger.error("begin-ts must be less than or equal to end-ts")
        return -1
    if end_ts < 0 or begin_ts < 0:
        # Zero is accepted (it's the default for --begin-ts), so the bound is "non-negative".
        logger.error("begin_ts and end_ts must be non-negative.")
        return -1

    container_name = generate_container_name(JobType.DEL_ARCHIVE)

    container_clp_config, mounts = generate_container_config(clp_config, clp_home)
    generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
        container_clp_config, clp_config, container_name
    )

    # From here on a generated config file exists on the host; make sure it's removed even if
    # building or running the container command fails.
    try:
        necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir]
        container_start_cmd = generate_container_start_cmd(
            container_name, necessary_mounts, clp_config.execution_container
        )

        # fmt: off
        del_archive_cmd = [
            "python3",
            "-m", "clp_package_utils.scripts.native.del_archives",
            "--config", str(generated_config_path_on_container),
            str(begin_ts),
            str(end_ts),
        ]
        # fmt: on

        cmd = container_start_cmd + del_archive_cmd
        subprocess.run(cmd, check=True)
    finally:
        # Remove generated files
        generated_config_path_on_host.unlink()

    return 0


if "__main__" == __name__:
sys.exit(main(sys.argv))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a brand new script, let's use our latest conventions and best practices.

Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import argparse
import logging
import pathlib
import shutil
import sys
from contextlib import closing
from typing import List

from clp_py_utils.sql_adapter import SQL_Adapter

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
get_clp_home,
load_config_file,
)

# Logging setup: one console handler attached to a logger named after this file's path.
logger = logging.getLogger(__file__)
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter(
    "%(asctime)s [%(levelname)s] [%(name)s] %(message)s"
)
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)
# The logger's threshold is INFO.
logger.setLevel(logging.INFO)
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved


def handle_file_deletion(
    clp_home: pathlib.Path,
    config_file_path: pathlib.Path,
    default_config_file_path: pathlib.Path,
    begin_ts: int,
    end_ts: int,
) -> int:
    """
    Deletes all archives with `begin_timestamp` and `end_timestamp` within the specified range from
    the database, and removes any files associated with these archives.

    :param clp_home: CLP home directory, passed through to the config loader.
    :param config_file_path: Path of the CLP config file to load.
    :param default_config_file_path: Path of the default CLP config file, passed through to the
    config loader.
    :param begin_ts: Lower bound (inclusive) compared against each archive's `begin_timestamp`.
    :param end_ts: Upper bound (inclusive) compared against each archive's `end_timestamp`.
    :return: 0 on success (including when no archive matches), -1 otherwise.
    """

    # Validate and load config file
    try:
        clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
        clp_config.validate_logs_dir()
    except Exception:
        # `Exception` rather than a bare `except` so that Ctrl-C and `sys.exit` still propagate.
        logger.exception("Failed to load config.")
        return -1

    database_config = clp_config.database
    archives_dir = clp_config.archive_output.directory
    if not archives_dir.exists():
        # TODO: Figure out a way to log `archives_dir` relative to the user's file system rather
        # than the container's file system. Until then the path is deliberately left out of the
        # message, since a mounted path would not exist on the user's machine.
        logger.error("archive directory doesn't exist. abort deletion")
        return -1

    archive_ids: List[str]
    logger.info("Start deleting archives from database")
    try:
        sql_adapter = SQL_Adapter(database_config)
        clp_db_connection_params = database_config.get_clp_connection_params_and_type(True)
        table_prefix = clp_db_connection_params["table_prefix"]
        with closing(sql_adapter.create_connection(True)) as db_conn, closing(
            db_conn.cursor(dictionary=True)
        ) as db_cursor:
            # Only the table name is interpolated (it comes from the loaded config); the
            # timestamps are bound as parameters, like the archive IDs in the query below.
            db_cursor.execute(
                f"""
                DELETE FROM `{table_prefix}archives`
                WHERE begin_timestamp >= %s AND end_timestamp <= %s
                RETURNING id
                """,
                (begin_ts, end_ts),
            )
            results = db_cursor.fetchall()

            if not results:
                logger.warning("No archive falls into the specified time range, abort deletion")
                return 0

            archive_ids = [result["id"] for result in results]
            # One `%s` placeholder per archive ID.
            id_placeholders = ", ".join(["%s"] * len(archive_ids))
            db_cursor.execute(
                f"""
                DELETE FROM `{table_prefix}files`
                WHERE archive_id in ({id_placeholders})
                """,
                archive_ids,
            )
            # Commit once, after both DELETE statements have run.
            db_conn.commit()
    except Exception:
        logger.exception("Failed to delete archives from database, abort deletion")
        return -1

    logger.info("Finished deleting archives from the database")

    for archive_id in archive_ids:
        archive_path = archives_dir / archive_id
        if not archive_path.is_dir():
            logger.warning(f"Archive {archive_id} does not resolve to a directory, skip deletion")
            continue

        # Log only once we know the directory will actually be removed, so the log never shows
        # "Deleting archive" immediately followed by "skip deletion" for the same archive.
        logger.info(f"Deleting archive {archive_id} from the storage")
        shutil.rmtree(archive_path)

    logger.info("Finished deleting archives")
    return 0


def main(argv):
    """
    Parses the command line and deletes the archives that fall within the given time range.

    :param argv: Full argument vector; `argv[0]` (the program name) is skipped.
    :return: The return code of `handle_file_deletion` (0 on success, -1 otherwise).
    """
    clp_home = get_clp_home()
    default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

    args_parser = argparse.ArgumentParser(
        description="Delete archives that fall into the specified time range."
    )
    # NOTE: `required=True` was dropped because it made the default unreachable; callers that
    # pass `--config` explicitly are unaffected.
    args_parser.add_argument(
        "--config",
        "-c",
        default=str(default_config_file_path),
        help="CLP configuration file.",
    )
    args_parser.add_argument(
        "begin_ts",
        type=int,
        help="Time range filter lower-bound (inclusive) as milliseconds from the UNIX epoch.",
    )
    args_parser.add_argument(
        "end_ts",
        type=int,
        help="Time range filter upper-bound (inclusive) as milliseconds from the UNIX epoch.",
    )
    parsed_args = args_parser.parse_args(argv[1:])

    return handle_file_deletion(
        clp_home,
        pathlib.Path(parsed_args.config),
        default_config_file_path,
        parsed_args.begin_ts,
        parsed_args.end_ts,
    )


if "__main__" == __name__:
sys.exit(main(sys.argv))
9 changes: 9 additions & 0 deletions components/package-template/src/sbin/del-archives.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env bash

# Thin launcher: resolves the package's bundled site-packages directory relative to this
# script and runs the archive-deletion module with it on PYTHONPATH, forwarding all arguments.

# Absolute path of the directory containing this script.
this_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
# The package root is one level above the script's directory.
pkg_root="${this_dir}/.."
site_packages_dir="$(readlink -f "${pkg_root}/lib/python3/site-packages")"

PYTHONPATH="${site_packages_dir}" python3 -m clp_package_utils.scripts.del_archives "$@"
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
Loading