From 37263eb792b0ccdbd0a154ecaa3ec757c8b71b12 Mon Sep 17 00:00:00 2001 From: haiqi96 <14502009+haiqi96@users.noreply.github.com> Date: Wed, 18 Dec 2024 20:23:36 -0500 Subject: [PATCH] feat(package)!: Add support for writing clp-s single file archives to S3. (#634) Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com> --- .../clp_package_utils/general.py | 26 ++- .../clp_package_utils/scripts/decompress.py | 14 +- .../clp_package_utils/scripts/del_archives.py | 7 + .../scripts/native/decompress.py | 4 +- .../scripts/native/del_archives.py | 2 +- .../clp_package_utils/scripts/search.py | 6 + .../clp_package_utils/scripts/start_clp.py | 45 ++--- .../clp-py-utils/clp_py_utils/clp_config.py | 154 +++++++++++++++-- .../initialize-orchestration-db.py | 2 +- .../clp-py-utils/clp_py_utils/s3_utils.py | 51 ++++++ components/clp-py-utils/pyproject.toml | 2 + .../executor/compress/fs_compression_task.py | 163 ++++++++++++------ .../executor/query/extract_stream_task.py | 47 +++-- .../executor/query/fs_search_task.py | 39 +++-- .../job_orchestration/executor/query/utils.py | 5 +- .../job_orchestration/executor/utils.py | 23 +++ .../compress/compression_scheduler.py | 22 +-- .../package-template/src/etc/clp-config.yml | 4 +- 18 files changed, 470 insertions(+), 146 deletions(-) create mode 100644 components/clp-py-utils/clp_py_utils/s3_utils.py create mode 100644 components/job-orchestration/job_orchestration/executor/utils.py diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 5fae8166f..60f1053f8 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -20,7 +20,9 @@ REDIS_COMPONENT_NAME, REDUCER_COMPONENT_NAME, RESULTS_CACHE_COMPONENT_NAME, + StorageType, WEBUI_COMPONENT_NAME, + WorkerConfig, ) from clp_py_utils.core import ( get_config_value, @@ -239,17 +241,17 @@ def generate_container_config( DockerMountType.BIND, clp_config.logs_directory, container_clp_config.logs_directory ) - container_clp_config.archive_output.directory = pathlib.Path("/") / "mnt" / "archive-output" + container_clp_config.archive_output.set_directory(pathlib.Path("/") / "mnt" / "archive-output") if not is_path_already_mounted( clp_home, CONTAINER_CLP_HOME, - clp_config.archive_output.directory, - container_clp_config.archive_output.directory, + clp_config.archive_output.get_directory(), + container_clp_config.archive_output.get_directory(), ): docker_mounts.archives_output_dir = DockerMount( DockerMountType.BIND, - clp_config.archive_output.directory, - container_clp_config.archive_output.directory, + clp_config.archive_output.get_directory(), + container_clp_config.archive_output.get_directory(), ) container_clp_config.stream_output.directory = pathlib.Path("/") / "mnt" / "stream-output" @@ -268,6 +270,18 @@ def generate_container_config( return container_clp_config, docker_mounts +def generate_worker_config(clp_config: CLPConfig) -> WorkerConfig: + worker_config = WorkerConfig() + worker_config.package = clp_config.package.copy(deep=True) + worker_config.archive_output = clp_config.archive_output.copy(deep=True) + worker_config.data_directory = clp_config.data_directory + + worker_config.stream_output_dir = clp_config.stream_output.directory + worker_config.stream_collection_name = clp_config.results_cache.stream_collection_name + + return worker_config + + def dump_container_config( container_clp_config: CLPConfig, clp_config: 
CLPConfig, container_name: str ) -> Tuple[pathlib.Path, pathlib.Path]: @@ -482,7 +496,7 @@ def validate_results_cache_config( def validate_worker_config(clp_config: CLPConfig): clp_config.validate_input_logs_dir() - clp_config.validate_archive_output_dir() + clp_config.validate_archive_output_config() clp_config.validate_stream_output_dir() diff --git a/components/clp-package-utils/clp_package_utils/scripts/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/decompress.py index 325f2add6..092c339a6 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/decompress.py @@ -5,7 +5,7 @@ import sys from typing import Optional -from clp_py_utils.clp_config import CLPConfig +from clp_py_utils.clp_config import CLPConfig, StorageType from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, @@ -81,6 +81,11 @@ def handle_extract_file_cmd( if clp_config is None: return -1 + storage_type = clp_config.archive_output.storage.type + if StorageType.FS != storage_type: + logger.error(f"File extraction is not supported for archive storage type: {storage_type}.") + return -1 + container_name = generate_container_name(str(JobType.FILE_EXTRACTION)) container_clp_config, mounts = generate_container_config(clp_config, clp_home) generated_config_path_on_container, generated_config_path_on_host = dump_container_config( @@ -156,6 +161,13 @@ def handle_extract_stream_cmd( if clp_config is None: return -1 + storage_type = clp_config.archive_output.storage.type + if StorageType.FS != storage_type: + logger.error( + f"Stream extraction is not supported for archive storage type: {storage_type}." + ) + return -1 + container_name = generate_container_name(str(JobType.IR_EXTRACTION)) container_clp_config, mounts = generate_container_config(clp_config, clp_home) generated_config_path_on_container, generated_config_path_on_host = dump_container_config( diff --git a/components/clp-package-utils/clp_package_utils/scripts/del_archives.py b/components/clp-package-utils/clp_package_utils/scripts/del_archives.py index 54d959771..5b9bc6d97 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/del_archives.py +++ b/components/clp-package-utils/clp_package_utils/scripts/del_archives.py @@ -4,6 +4,8 @@ import sys from pathlib import Path +from clp_py_utils.clp_config import StorageType + from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, dump_container_config, @@ -57,6 +59,11 @@ def main(argv): logger.exception("Failed to load config.") return -1 + storage_type = clp_config.archive_output.storage.type + if StorageType.FS != storage_type: + logger.error(f"Archive deletion is not supported for storage type: {storage_type}.") + return -1 + # Validate the input timestamp begin_ts = parsed_args.begin_ts end_ts = parsed_args.end_ts diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py index d16cdcb6f..7e3c7da6e 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py @@ -167,7 +167,7 @@ def validate_and_load_config_file( """ try: clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) - clp_config.validate_archive_output_dir() + clp_config.validate_archive_output_config() clp_config.validate_logs_dir() return clp_config 
except Exception: @@ -207,7 +207,7 @@ def handle_extract_file_cmd( list_path = parsed_args.files_from logs_dir = clp_config.logs_directory - archives_dir = clp_config.archive_output.directory + archives_dir = clp_config.archive_output.get_directory() # Generate database config file for clp db_config_file_path = logs_dir / f".decompress-db-config-{uuid.uuid4()}.yml" diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py b/components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py index 735bf299d..c489c3806 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py @@ -54,7 +54,7 @@ def main(argv): return -1 database_config = clp_config.database - archives_dir = clp_config.archive_output.directory + archives_dir = clp_config.archive_output.get_directory() if not archives_dir.exists(): logger.error("`archive_output.directory` doesn't exist.") return -1 diff --git a/components/clp-package-utils/clp_package_utils/scripts/search.py b/components/clp-package-utils/clp_package_utils/scripts/search.py index beb7fb0b0..38d528528 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/search.py @@ -7,6 +7,7 @@ import uuid import yaml +from clp_py_utils.clp_config import StorageType from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, @@ -74,6 +75,11 @@ def main(argv): logger.exception("Failed to load config.") return -1 + storage_type = clp_config.archive_output.storage.type + if StorageType.FS != storage_type: + logger.error(f"Search is not supported for archive storage type: {storage_type}.") + return -1 + container_name = generate_container_name(str(JobType.SEARCH)) container_clp_config, mounts = generate_container_config(clp_config, clp_home) diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index 8097929f1..6de3174ff 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -29,6 +29,7 @@ REDIS_COMPONENT_NAME, REDUCER_COMPONENT_NAME, RESULTS_CACHE_COMPONENT_NAME, + StorageType, WEBUI_COMPONENT_NAME, ) from job_orchestration.scheduler.constants import QueueName @@ -42,6 +43,7 @@ DockerMount, DockerMountType, generate_container_config, + generate_worker_config, get_clp_home, is_container_exited, is_container_running, @@ -626,6 +628,7 @@ def start_compression_worker( ): celery_method = "job_orchestration.executor.compress" celery_route = f"{QueueName.COMPRESSION}" + compression_worker_mounts = [mounts.archives_output_dir] generic_start_worker( COMPRESSION_WORKER_COMPONENT_NAME, instance_id, @@ -637,8 +640,7 @@ def start_compression_worker( clp_config.redis.compression_backend_database, num_cpus, mounts, - None, - None, + compression_worker_mounts, ) @@ -652,11 +654,9 @@ def start_query_worker( celery_method = "job_orchestration.executor.query" celery_route = f"{QueueName.QUERY}" - query_worker_mount = [mounts.stream_output_dir] - query_worker_env = { - "CLP_STREAM_OUTPUT_DIR": container_clp_config.stream_output.directory, - "CLP_STREAM_COLLECTION_NAME": clp_config.results_cache.stream_collection_name, - } + query_worker_mounts = [mounts.stream_output_dir] + if clp_config.archive_output.storage.type == StorageType.FS: + 
query_worker_mounts.append(mounts.archives_output_dir) generic_start_worker( QUERY_WORKER_COMPONENT_NAME, @@ -669,8 +669,7 @@ def start_query_worker( clp_config.redis.query_backend_database, num_cpus, mounts, - query_worker_env, - query_worker_mount, + query_worker_mounts, ) @@ -685,8 +684,7 @@ def generic_start_worker( redis_database: int, num_cpus: int, mounts: CLPDockerMounts, - worker_specific_env: Dict[str, Any], - worker_specific_mount: List[Optional[DockerMount]], + worker_specific_mount: Optional[List[Optional[DockerMount]]], ): logger.info(f"Starting {component_name}...") @@ -694,14 +692,18 @@ def generic_start_worker( if container_exists(container_name): return - validate_worker_config(clp_config) + container_config_filename = f"{container_name}.yml" + container_config_file_path = clp_config.logs_directory / container_config_filename + container_worker_config = generate_worker_config(container_clp_config) + with open(container_config_file_path, "w") as f: + yaml.safe_dump(container_worker_config.dump_to_primitive_dict(), f) logs_dir = clp_config.logs_directory / component_name logs_dir.mkdir(parents=True, exist_ok=True) container_logs_dir = container_clp_config.logs_directory / component_name # Create necessary directories - clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True) + clp_config.archive_output.get_directory().mkdir(parents=True, exist_ok=True) clp_config.stream_output.directory.mkdir(parents=True, exist_ok=True) clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages" @@ -724,24 +726,17 @@ def generic_start_worker( f"{container_clp_config.redis.host}:{container_clp_config.redis.port}/{redis_database}" ), "-e", f"CLP_HOME={CONTAINER_CLP_HOME}", - "-e", f"CLP_DATA_DIR={container_clp_config.data_directory}", - "-e", f"CLP_ARCHIVE_OUTPUT_DIR={container_clp_config.archive_output.directory}", + "-e", f"CLP_CONFIG_PATH={container_clp_config.logs_directory / container_config_filename}", "-e", f"CLP_LOGS_DIR={container_logs_dir}", "-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}", - "-e", f"CLP_STORAGE_ENGINE={clp_config.package.storage_engine}", "-u", f"{os.getuid()}:{os.getgid()}", ] - if worker_specific_env: - for env_name, env_value in worker_specific_env.items(): - container_start_cmd.append("-e") - container_start_cmd.append(f"{env_name}={env_value}") - # fmt: on + necessary_mounts = [ mounts.clp_home, mounts.data_dir, mounts.logs_dir, - mounts.archives_output_dir, mounts.input_logs_dir, ] if worker_specific_mount: @@ -1125,6 +1120,12 @@ def main(argv): QUERY_WORKER_COMPONENT_NAME, ): validate_and_load_redis_credentials_file(clp_config, clp_home, True) + if target in ( + ALL_TARGET_NAME, + COMPRESSION_WORKER_COMPONENT_NAME, + QUERY_WORKER_COMPONENT_NAME, + ): + validate_worker_config(clp_config) clp_config.validate_data_dir() clp_config.validate_logs_dir() diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index 79a94505d..f59de7647 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -1,10 +1,10 @@ import pathlib -import typing from enum import auto +from typing import Literal, Optional, Union from dotenv import dotenv_values from pydantic import BaseModel, PrivateAttr, validator -from strenum import KebabCaseStrEnum +from strenum import KebabCaseStrEnum, LowercaseStrEnum from .clp_logging import get_valid_logging_level, is_valid_logging_level from .core import ( @@ -48,6 +48,11 @@ class 
StorageEngine(KebabCaseStrEnum): CLP_S = auto() +class StorageType(LowercaseStrEnum): + FS = auto() + S3 = auto() + + VALID_STORAGE_ENGINES = [storage_engine.value for storage_engine in StorageEngine] @@ -69,12 +74,12 @@ class Database(BaseModel): host: str = "localhost" port: int = 3306 name: str = "clp-db" - ssl_cert: typing.Optional[str] = None + ssl_cert: Optional[str] = None auto_commit: bool = False compress: bool = True - username: typing.Optional[str] = None - password: typing.Optional[str] = None + username: Optional[str] = None + password: Optional[str] = None @validator("type") def validate_database_type(cls, field): @@ -227,7 +232,7 @@ class Redis(BaseModel): query_backend_database: int = 0 compression_backend_database: int = 1 # redis can perform authentication without a username - password: typing.Optional[str] + password: Optional[str] @validator("host") def validate_host(cls, field): @@ -300,12 +305,80 @@ class Queue(BaseModel): host: str = "localhost" port: int = 5672 - username: typing.Optional[str] - password: typing.Optional[str] + username: Optional[str] + password: Optional[str] -class ArchiveOutput(BaseModel): +class S3Config(BaseModel): + region_code: str + bucket: str + key_prefix: str + + access_key_id: Optional[str] = None + secret_access_key: Optional[str] = None + + @validator("region_code") + def validate_region_code(cls, field): + if field == "": + raise ValueError("region_code cannot be empty") + return field + + @validator("bucket") + def validate_bucket(cls, field): + if field == "": + raise ValueError("bucket cannot be empty") + return field + + @validator("key_prefix") + def validate_key_prefix(cls, field): + if field == "": + raise ValueError("key_prefix cannot be empty") + if not field.endswith("/"): + raise ValueError('key_prefix must end with "/"') + return field + + +class FsStorage(BaseModel): + type: Literal[StorageType.FS.value] = StorageType.FS.value directory: pathlib.Path = pathlib.Path("var") / "data" / "archives" + + @validator("directory") + def validate_directory(cls, field): + if "" == field: + raise ValueError("directory cannot be empty") + return field + + def make_config_paths_absolute(self, clp_home: pathlib.Path): + self.directory = make_config_path_absolute(clp_home, self.directory) + + def dump_to_primitive_dict(self): + d = self.dict() + d["directory"] = str(d["directory"]) + return d + + +class S3Storage(BaseModel): + type: Literal[StorageType.S3.value] = StorageType.S3.value + staging_directory: pathlib.Path = pathlib.Path("var") / "data" / "staged_archives" + s3_config: S3Config + + @validator("staging_directory") + def validate_staging_directory(cls, field): + if "" == field: + raise ValueError("staging_directory cannot be empty") + return field + + def make_config_paths_absolute(self, clp_home: pathlib.Path): + self.staging_directory = make_config_path_absolute(clp_home, self.staging_directory) + + def dump_to_primitive_dict(self): + d = self.dict() + d["staging_directory"] = str(d["staging_directory"]) + return d + + +class ArchiveOutput(BaseModel): + storage: Union[FsStorage, S3Storage] = FsStorage() target_archive_size: int = 256 * 1024 * 1024 # 256 MB target_dictionaries_size: int = 32 * 1024 * 1024 # 32 MB target_encoded_file_size: int = 256 * 1024 * 1024 # 256 MB @@ -335,13 +408,30 @@ def validate_target_segment_size(cls, field): raise ValueError("target_segment_size must be greater than 0") return field - def make_config_paths_absolute(self, clp_home: pathlib.Path): - self.directory = 
make_config_path_absolute(clp_home, self.directory) + def set_directory(self, directory: pathlib.Path): + storage_config = self.storage + storage_type = storage_config.type + if StorageType.FS == storage_type: + storage_config.directory = directory + elif StorageType.S3 == storage_type: + storage_config.staging_directory = directory + else: + raise NotImplementedError(f"storage.type {storage_type} is not supported") + + def get_directory(self) -> pathlib.Path: + storage_config = self.storage + storage_type = storage_config.type + if StorageType.FS == storage_config.type: + return storage_config.directory + elif StorageType.S3 == storage_type: + return storage_config.staging_directory + else: + raise NotImplementedError(f"storage.type {storage_type} is not supported") def dump_to_primitive_dict(self): d = self.dict() # Turn directory (pathlib.Path) into a primitive string - d["directory"] = str(d["directory"]) + d["storage"] = self.storage.dump_to_primitive_dict() return d @@ -352,7 +442,7 @@ class StreamOutput(BaseModel): @validator("directory") def validate_directory(cls, field): if "" == field: - raise ValueError("directory can not be empty") + raise ValueError("directory cannot be empty") return field @validator("target_uncompressed_size") @@ -408,7 +498,7 @@ def validate_port(cls, field): class CLPConfig(BaseModel): - execution_container: typing.Optional[str] + execution_container: Optional[str] = None input_logs_directory: pathlib.Path = pathlib.Path("/") @@ -436,7 +526,7 @@ class CLPConfig(BaseModel): def make_config_paths_absolute(self, clp_home: pathlib.Path): self.input_logs_directory = make_config_path_absolute(clp_home, self.input_logs_directory) self.credentials_file_path = make_config_path_absolute(clp_home, self.credentials_file_path) - self.archive_output.make_config_paths_absolute(clp_home) + self.archive_output.storage.make_config_paths_absolute(clp_home) self.stream_output.make_config_paths_absolute(clp_home) self.data_directory = make_config_path_absolute(clp_home, self.data_directory) self.logs_directory = make_config_path_absolute(clp_home, self.logs_directory) @@ -451,11 +541,19 @@ def validate_input_logs_dir(self): if not input_logs_dir.is_dir(): raise ValueError(f"input_logs_directory '{input_logs_dir}' is not a directory.") - def validate_archive_output_dir(self): + def validate_archive_output_config(self): + if ( + StorageType.S3 == self.archive_output.storage.type + and StorageEngine.CLP_S != self.package.storage_engine + ): + raise ValueError( + f"archive_output.storage.type = 's3' is only supported with package.storage_engine" + f" = '{StorageEngine.CLP_S}'" + ) try: - validate_path_could_be_dir(self.archive_output.directory) + validate_path_could_be_dir(self.archive_output.get_directory()) except ValueError as ex: - raise ValueError(f"archive_output.directory is invalid: {ex}") + raise ValueError(f"archive_output.storage's directory is invalid: {ex}") def validate_stream_output_dir(self): try: @@ -537,3 +635,23 @@ def dump_to_primitive_dict(self): d["data_directory"] = str(self.data_directory) d["logs_directory"] = str(self.logs_directory) return d + + +class WorkerConfig(BaseModel): + package: Package = Package() + archive_output: ArchiveOutput = ArchiveOutput() + data_directory: pathlib.Path = CLPConfig().data_directory + + # Only needed by query workers. 
+ stream_output_dir: pathlib.Path = StreamOutput().directory + stream_collection_name: str = ResultsCache().stream_collection_name + + def dump_to_primitive_dict(self): + d = self.dict() + d["archive_output"] = self.archive_output.dump_to_primitive_dict() + + # Turn paths into primitive strings + d["data_directory"] = str(self.data_directory) + d["stream_output_dir"] = str(self.stream_output_dir) + + return d diff --git a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py index 1ed727367..2c8133e8a 100644 --- a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py +++ b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py @@ -52,7 +52,7 @@ def main(argv): CREATE TABLE IF NOT EXISTS `{COMPRESSION_JOBS_TABLE_NAME}` ( `id` INT NOT NULL AUTO_INCREMENT, `status` INT NOT NULL DEFAULT '{CompressionJobStatus.PENDING}', - `status_msg` VARCHAR(255) NOT NULL DEFAULT '', + `status_msg` VARCHAR(512) NOT NULL DEFAULT '', `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), `start_time` DATETIME(3) NULL DEFAULT NULL, `duration` FLOAT NULL DEFAULT NULL, diff --git a/components/clp-py-utils/clp_py_utils/s3_utils.py b/components/clp-py-utils/clp_py_utils/s3_utils.py new file mode 100644 index 000000000..03717a445 --- /dev/null +++ b/components/clp-py-utils/clp_py_utils/s3_utils.py @@ -0,0 +1,51 @@ +from pathlib import Path + +import boto3 +from botocore.config import Config +from botocore.exceptions import ClientError +from result import Err, Ok, Result + +from clp_py_utils.clp_config import S3Config + + +def s3_put( + s3_config: S3Config, src_file: Path, dest_file_name: str, total_max_attempts: int = 3 +) -> Result[bool, str]: + """ + Uploads a local file to an S3 bucket using AWS's PutObject operation. + :param s3_config: S3 configuration specifying the upload destination and credentials. + :param src_file: Local file to upload. + :param dest_file_name: The name for the uploaded file in the S3 bucket. + :param total_max_attempts: Maximum number of retry attempts for the upload. + :return: Result.OK(bool) on success, or Result.Err(str) with the error message otherwise. 
+ """ + if not src_file.exists(): + return Err(f"{src_file} doesn't exist") + if not src_file.is_file(): + return Err(f"{src_file} is not a file") + if src_file.stat().st_size > 5 * 1024 * 1024 * 1024: + return Err(f"{src_file} is larger than the limit (5GiB) for a single PutObject operation.") + + config = Config(retries=dict(total_max_attempts=total_max_attempts, mode="adaptive")) + + my_s3_client = boto3.client( + "s3", + region_name=s3_config.region_code, + aws_access_key_id=s3_config.access_key_id, + aws_secret_access_key=s3_config.secret_access_key, + config=config, + ) + + with open(src_file, "rb") as file_data: + try: + my_s3_client.put_object( + Bucket=s3_config.bucket, Body=file_data, Key=s3_config.key_prefix + dest_file_name + ) + except ClientError as e: + error_code = e.response["Error"]["Code"] + error_message = e.response["Error"]["Message"] + return Err(f"ClientError: {error_code} - {error_message}") + except Exception as e: + return Err(f"An unexpected error occurred: {e}") + + return Ok(True) diff --git a/components/clp-py-utils/pyproject.toml b/components/clp-py-utils/pyproject.toml index 4e827b926..6d68ceebe 100644 --- a/components/clp-py-utils/pyproject.toml +++ b/components/clp-py-utils/pyproject.toml @@ -10,6 +10,7 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.8 || ^3.10" +boto3 = "^1.35.81" # mariadb version must be compatible with libmariadev installed in runtime env. # See https://mariadb.com/docs/server/connect/programming-languages/python/install/#Dependencies mariadb = "~1.0.11" @@ -19,6 +20,7 @@ python-dotenv = "^1.0.1" python-Levenshtein = "~0.22" sqlalchemy = "~2.0" PyYAML = "^6.0.1" +result = "^0.17.0" StrEnum = "^0.4.15" [build-system] diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py index ce88ad185..a5dbc0e35 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py @@ -4,6 +4,7 @@ import pathlib import subprocess from contextlib import closing +from typing import Any, Dict, Optional import yaml from celery.app.task import Task @@ -12,9 +13,14 @@ COMPRESSION_JOBS_TABLE_NAME, COMPRESSION_TASKS_TABLE_NAME, Database, + S3Config, StorageEngine, + StorageType, + WorkerConfig, ) from clp_py_utils.clp_logging import set_logging_level +from clp_py_utils.core import read_yaml_config_file +from clp_py_utils.s3_utils import s3_put from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.compress.celery import app from job_orchestration.scheduler.constants import CompressionTaskStatus @@ -108,6 +114,7 @@ def make_clp_s_command( archive_output_dir: pathlib.Path, clp_config: ClpIoConfig, db_config_file_path: pathlib.Path, + enable_s3_write: bool, ): # fmt: off compression_cmd = [ @@ -120,6 +127,9 @@ def make_clp_s_command( ] # fmt: on + if enable_s3_write: + compression_cmd.append("--single-file-archive") + if clp_config.input.timestamp_key is not None: compression_cmd.append("--timestamp-key") compression_cmd.append(clp_config.input.timestamp_key) @@ -128,10 +138,9 @@ def make_clp_s_command( def run_clp( + worker_config: WorkerConfig, clp_config: ClpIoConfig, clp_home: pathlib.Path, - data_dir: pathlib.Path, - archive_output_dir: pathlib.Path, logs_dir: pathlib.Path, job_id: int, task_id: int, @@ -143,10 +152,9 @@ def run_clp( """ Compresses files from 
an FS into archives on an FS + :param worker_config: WorkerConfig :param clp_config: ClpIoConfig :param clp_home: - :param data_dir: - :param archive_output_dir: :param logs_dir: :param job_id: :param task_id: @@ -156,16 +164,31 @@ def run_clp( :param clp_metadata_db_connection_config :return: tuple -- (whether compression was successful, output messages) """ - clp_storage_engine = str(os.getenv("CLP_STORAGE_ENGINE")) - instance_id_str = f"compression-job-{job_id}-task-{task_id}" + clp_storage_engine = worker_config.package.storage_engine + data_dir = worker_config.data_directory + archive_output_dir = worker_config.archive_output.get_directory() + # Generate database config file for clp db_config_file_path = data_dir / f"{instance_id_str}-db-config.yml" db_config_file = open(db_config_file_path, "w") yaml.safe_dump(clp_metadata_db_connection_config, db_config_file) db_config_file.close() + # Get s3 config + s3_config: S3Config + enable_s3_write = False + storage_type = worker_config.archive_output.storage.type + if StorageType.S3 == storage_type: + if StorageEngine.CLP_S != clp_storage_engine: + error_msg = f"S3 storage is not supported for storage engine: {clp_storage_engine}." + logger.error(error_msg) + return False, {"error_message": error_msg} + + s3_config = worker_config.archive_output.storage.s3_config + enable_s3_write = True + if StorageEngine.CLP == clp_storage_engine: compression_cmd = make_clp_command( clp_home=clp_home, @@ -179,6 +202,7 @@ def run_clp( archive_output_dir=archive_output_dir, clp_config=clp_config, db_config_file_path=db_config_file_path, + enable_s3_write=enable_s3_write, ) else: logger.error(f"Unsupported storage engine {clp_storage_engine}") @@ -212,48 +236,65 @@ def run_clp( # Compute the total amount of data compressed last_archive_stats = None + last_line_decoded = False total_uncompressed_size = 0 total_compressed_size = 0 - while True: + + # Handle job metadata update and s3 write if enabled + s3_error = None + while not last_line_decoded: line = proc.stdout.readline() - if not line: - break - stats = json.loads(line.decode("ascii")) - if last_archive_stats is not None and stats["id"] != last_archive_stats["id"]: - # We've started a new archive so add the previous archive's last - # reported size to the total - total_uncompressed_size += last_archive_stats["uncompressed_size"] - total_compressed_size += last_archive_stats["size"] - with closing(sql_adapter.create_connection(True)) as db_conn, closing( - db_conn.cursor(dictionary=True) - ) as db_cursor: - update_job_metadata_and_tags( - db_cursor, - job_id, - clp_metadata_db_connection_config["table_prefix"], - tag_ids, - last_archive_stats, - ) - db_conn.commit() + stats: Optional[Dict[str, Any]] = None + if "" == line: + # Skip empty lines that could be caused by potential errors in printing archive stats + continue + + if line is not None: + stats = json.loads(line.decode("ascii")) + else: + last_line_decoded = True + + if last_archive_stats is not None and ( + None is stats or stats["id"] != last_archive_stats["id"] + ): + if enable_s3_write: + archive_id = last_archive_stats["id"] + archive_path = archive_output_dir / archive_id + + if s3_error is None: + logger.info(f"Uploading archive {archive_id} to S3...") + result = s3_put(s3_config, archive_path, archive_id) + + if result.is_err(): + logger.error(f"Failed to upload archive {archive_id}: {result.err_value}") + s3_error = result.err_value + # NOTE: It's possible `proc` finishes before we call `terminate` on it, in + # which case the process 
will still return success. + proc.terminate() + else: + logger.info(f"Finished uploading archive {archive_id} to S3.") + + archive_path.unlink() + + if s3_error is None: + # We've started a new archive so add the previous archive's last reported size to + # the total + total_uncompressed_size += last_archive_stats["uncompressed_size"] + total_compressed_size += last_archive_stats["size"] + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + update_job_metadata_and_tags( + db_cursor, + job_id, + clp_metadata_db_connection_config["table_prefix"], + tag_ids, + last_archive_stats, + ) + db_conn.commit() last_archive_stats = stats - if last_archive_stats is not None: - # Add the last archive's last reported size - total_uncompressed_size += last_archive_stats["uncompressed_size"] - total_compressed_size += last_archive_stats["size"] - with closing(sql_adapter.create_connection(True)) as db_conn, closing( - db_conn.cursor(dictionary=True) - ) as db_cursor: - update_job_metadata_and_tags( - db_cursor, - job_id, - clp_metadata_db_connection_config["table_prefix"], - tag_ids, - last_archive_stats, - ) - db_conn.commit() - # Wait for compression to finish return_code = proc.wait() if 0 != return_code: @@ -274,10 +315,16 @@ def run_clp( "total_uncompressed_size": total_uncompressed_size, "total_compressed_size": total_compressed_size, } - if compression_successful: + + if compression_successful and s3_error is None: return CompressionTaskStatus.SUCCEEDED, worker_output else: - worker_output["error_message"] = f"See logs {stderr_log_path}" + error_msgs = [] + if compression_successful is False: + error_msgs.append(f"See logs {stderr_log_path}") + if s3_error is not None: + error_msgs.append(s3_error) + worker_output["error_message"] = "\n".join(error_msgs) return CompressionTaskStatus.FAILED, worker_output @@ -291,15 +338,28 @@ def compress( paths_to_compress_json: str, clp_metadata_db_connection_config, ): - clp_home_str = os.getenv("CLP_HOME") - data_dir_str = os.getenv("CLP_DATA_DIR") - archive_output_dir_str = os.getenv("CLP_ARCHIVE_OUTPUT_DIR") - logs_dir_str = os.getenv("CLP_LOGS_DIR") + clp_home = pathlib.Path(os.getenv("CLP_HOME")) # Set logging level + logs_dir = pathlib.Path(os.getenv("CLP_LOGS_DIR")) clp_logging_level = str(os.getenv("CLP_LOGGING_LEVEL")) set_logging_level(logger, clp_logging_level) + # Load configuration + try: + worker_config = WorkerConfig.parse_obj( + read_yaml_config_file(pathlib.Path(os.getenv("CLP_CONFIG_PATH"))) + ) + except Exception as ex: + error_msg = "Failed to load worker config" + logger.exception(error_msg) + return CompressionTaskResult( + task_id=task_id, + status=CompressionTaskStatus.FAILED, + duration=0, + error_message=error_msg, + ) + clp_io_config = ClpIoConfig.parse_raw(clp_io_config_json) paths_to_compress = PathsToCompress.parse_raw(paths_to_compress_json) @@ -308,11 +368,10 @@ def compress( start_time = datetime.datetime.now() logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION STARTED.") compression_task_status, worker_output = run_clp( + worker_config, clp_io_config, - pathlib.Path(clp_home_str), - pathlib.Path(data_dir_str), - pathlib.Path(archive_output_dir_str), - pathlib.Path(logs_dir_str), + clp_home, + logs_dir, job_id, task_id, tag_ids, diff --git a/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py index 423ebb757..58ae43450 100644 --- 
a/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py +++ b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py @@ -5,14 +5,15 @@ from celery.app.task import Task from celery.utils.log import get_task_logger -from clp_py_utils.clp_config import Database, StorageEngine +from clp_py_utils.clp_config import Database, StorageEngine, StorageType, WorkerConfig from clp_py_utils.clp_logging import set_logging_level from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.query.celery import app from job_orchestration.executor.query.utils import ( - report_command_creation_failure, + report_task_failure, run_query_task, ) +from job_orchestration.executor.utils import load_worker_config from job_orchestration.scheduler.job_config import ExtractIrJobConfig, ExtractJsonJobConfig from job_orchestration.scheduler.scheduler_data import QueryTaskStatus @@ -21,15 +22,17 @@ def make_command( - storage_engine: str, clp_home: Path, - archives_dir: Path, + worker_config: WorkerConfig, archive_id: str, - stream_output_dir: Path, job_config: dict, results_cache_uri: str, - stream_collection_name: str, ) -> Optional[List[str]]: + storage_engine = worker_config.package.storage_engine + archives_dir = worker_config.archive_output.get_directory() + stream_output_dir = worker_config.stream_output_dir + stream_collection_name = worker_config.stream_collection_name + if StorageEngine.CLP == storage_engine: logger.info("Starting IR extraction") extract_ir_config = ExtractIrJobConfig.parse_obj(job_config) @@ -97,28 +100,38 @@ def extract_stream( task_status: QueryTaskStatus sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params)) + # Load configuration + clp_config_path = Path(os.getenv("CLP_CONFIG_PATH")) + worker_config = load_worker_config(clp_config_path, logger) + if worker_config is None: + return report_task_failure( + sql_adapter=sql_adapter, + task_id=task_id, + start_time=start_time, + ) + + if worker_config.archive_output.storage.type == StorageType.S3: + logger.error(f"Stream extraction is not supported for the S3 storage type") + return report_task_failure( + sql_adapter=sql_adapter, + task_id=task_id, + start_time=start_time, + ) + # Make task_command clp_home = Path(os.getenv("CLP_HOME")) - archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR")) - clp_storage_engine = os.getenv("CLP_STORAGE_ENGINE") - stream_output_dir = Path(os.getenv("CLP_STREAM_OUTPUT_DIR")) - stream_collection_name = os.getenv("CLP_STREAM_COLLECTION_NAME") task_command = make_command( - storage_engine=clp_storage_engine, clp_home=clp_home, - archives_dir=archive_directory, + worker_config=worker_config, archive_id=archive_id, - stream_output_dir=stream_output_dir, job_config=job_config, results_cache_uri=results_cache_uri, - stream_collection_name=stream_collection_name, ) if not task_command: - return report_command_creation_failure( + logger.error(f"Error creating {task_name} command") + return report_task_failure( sql_adapter=sql_adapter, - logger=logger, - task_name=task_name, task_id=task_id, start_time=start_time, ) diff --git a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py index 598bfdcfc..7cf7b330f 100644 --- a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py +++ b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py @@ -5,14 +5,15 @@ from 
celery.app.task import Task from celery.utils.log import get_task_logger -from clp_py_utils.clp_config import Database, StorageEngine +from clp_py_utils.clp_config import Database, StorageEngine, StorageType, WorkerConfig from clp_py_utils.clp_logging import set_logging_level from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.query.celery import app from job_orchestration.executor.query.utils import ( - report_command_creation_failure, + report_task_failure, run_query_task, ) +from job_orchestration.executor.utils import load_worker_config from job_orchestration.scheduler.job_config import SearchJobConfig from job_orchestration.scheduler.scheduler_data import QueryTaskStatus @@ -21,14 +22,16 @@ def make_command( - storage_engine: str, clp_home: Path, - archives_dir: Path, + worker_config: WorkerConfig, archive_id: str, search_config: SearchJobConfig, results_cache_uri: str, results_collection: str, ) -> Optional[List[str]]: + storage_engine = worker_config.package.storage_engine + archives_dir = worker_config.archive_output.get_directory() + if StorageEngine.CLP == storage_engine: command = [str(clp_home / "bin" / "clo"), "s", str(archives_dir / archive_id)] if search_config.path_filter is not None: @@ -116,26 +119,40 @@ def search( task_status: QueryTaskStatus sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params)) + # Load configuration + clp_config_path = Path(os.getenv("CLP_CONFIG_PATH")) + worker_config = load_worker_config(clp_config_path, logger) + if worker_config is None: + return report_task_failure( + sql_adapter=sql_adapter, + task_id=task_id, + start_time=start_time, + ) + + if worker_config.archive_output.storage.type == StorageType.S3: + logger.error(f"Search is not supported for the S3 storage type") + return report_task_failure( + sql_adapter=sql_adapter, + task_id=task_id, + start_time=start_time, + ) + # Make task_command clp_home = Path(os.getenv("CLP_HOME")) - archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR")) - clp_storage_engine = os.getenv("CLP_STORAGE_ENGINE") search_config = SearchJobConfig.parse_obj(job_config) task_command = make_command( - storage_engine=clp_storage_engine, clp_home=clp_home, - archives_dir=archive_directory, + worker_config=worker_config, archive_id=archive_id, search_config=search_config, results_cache_uri=results_cache_uri, results_collection=job_id, ) if not task_command: - return report_command_creation_failure( + logger.error(f"Error creating {task_name} command") + return report_task_failure( sql_adapter=sql_adapter, - logger=logger, - task_name=task_name, task_id=task_id, start_time=start_time, ) diff --git a/components/job-orchestration/job_orchestration/executor/query/utils.py b/components/job-orchestration/job_orchestration/executor/query/utils.py index 69d22398e..523abbe00 100644 --- a/components/job-orchestration/job_orchestration/executor/query/utils.py +++ b/components/job-orchestration/job_orchestration/executor/query/utils.py @@ -19,14 +19,11 @@ def get_task_log_file_path(clp_logs_dir: Path, job_id: str, task_id: int) -> Pat return worker_logs_dir / f"{task_id}-clo.log" -def report_command_creation_failure( +def report_task_failure( sql_adapter: SQL_Adapter, - logger: Logger, - task_name: str, task_id: int, start_time: datetime.datetime, ): - logger.error(f"Error creating {task_name} command") task_status = QueryTaskStatus.FAILED update_query_task_metadata( sql_adapter, diff --git a/components/job-orchestration/job_orchestration/executor/utils.py 
b/components/job-orchestration/job_orchestration/executor/utils.py new file mode 100644 index 000000000..47ea702ae --- /dev/null +++ b/components/job-orchestration/job_orchestration/executor/utils.py @@ -0,0 +1,23 @@ +from logging import Logger +from pathlib import Path +from typing import Optional + +from clp_py_utils.clp_config import WorkerConfig +from clp_py_utils.core import read_yaml_config_file + + +def load_worker_config( + config_path: Path, + logger: Logger, +) -> Optional[WorkerConfig]: + """ + Loads a WorkerConfig object from the specified configuration file. + :param config_path: Path to the configuration file. + :param logger: Logger instance for reporting errors if loading fails. + :return: The loaded WorkerConfig object on success, None otherwise. + """ + try: + return WorkerConfig.parse_obj(read_yaml_config_file(config_path)) + except Exception: + logger.exception("Failed to load worker config") + return None diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py index 62b7a27fc..bd793686b 100644 --- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py @@ -53,13 +53,14 @@ def update_compression_task_metadata(db_cursor, task_id, kv): logger.error("Must specify at least one field to update") raise ValueError - field_set_expressions = [f'{k}="{v}"' for k, v in kv.items()] + field_set_expressions = [f"{k} = %s" for k in kv.keys()] query = f""" - UPDATE {COMPRESSION_TASKS_TABLE_NAME} - SET {", ".join(field_set_expressions)} - WHERE id={task_id} + UPDATE {COMPRESSION_TASKS_TABLE_NAME} + SET {", ".join(field_set_expressions)} + WHERE id = %s """ - db_cursor.execute(query) + values = list(kv.values()) + [task_id] + db_cursor.execute(query, values) def update_compression_job_metadata(db_cursor, job_id, kv): @@ -67,13 +68,14 @@ def update_compression_job_metadata(db_cursor, job_id, kv): logger.error("Must specify at least one field to update") raise ValueError - field_set_expressions = [f'{k}="{v}"' for k, v in kv.items()] + field_set_expressions = [f"{k} = %s" for k in kv.keys()] query = f""" - UPDATE {COMPRESSION_JOBS_TABLE_NAME} - SET {", ".join(field_set_expressions)} - WHERE id={job_id} + UPDATE {COMPRESSION_JOBS_TABLE_NAME} + SET {", ".join(field_set_expressions)} + WHERE id = %s """ - db_cursor.execute(query) + values = list(kv.values()) + [job_id] + db_cursor.execute(query, values) def search_and_schedule_new_tasks(db_conn, db_cursor, clp_metadata_db_connection_config): diff --git a/components/package-template/src/etc/clp-config.yml b/components/package-template/src/etc/clp-config.yml index f19b93463..22b03b889 100644 --- a/components/package-template/src/etc/clp-config.yml +++ b/components/package-template/src/etc/clp-config.yml @@ -66,7 +66,9 @@ # ## Where archives should be output to #archive_output: -# directory: "var/data/archives" +# storage: +# type: "fs" +# directory: "var/data/archives" # # # How much data CLP should try to compress into each archive # target_archive_size: 268435456 # 256 MB
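
For reference, a minimal usage sketch of the s3_put() helper and S3Config model added by this patch (components/clp-py-utils/clp_py_utils/s3_utils.py). It assumes the clp-py-utils package from this patch is installed; every concrete value below (region, bucket, key prefix, credentials, archive path) is a hypothetical placeholder, not a value taken from the patch.

# Minimal sketch: upload a staged single-file archive with the new s3_put() helper.
# All concrete values below are hypothetical placeholders.
from pathlib import Path

from clp_py_utils.clp_config import S3Config
from clp_py_utils.s3_utils import s3_put

s3_config = S3Config(
    region_code="us-east-2",
    bucket="example-clp-archives",
    key_prefix="archives/",  # the S3Config validator requires a trailing "/"
    access_key_id="EXAMPLE_ACCESS_KEY_ID",  # optional in S3Config
    secret_access_key="EXAMPLE_SECRET_ACCESS_KEY",  # optional in S3Config
)

archive_path = Path("var/data/staged_archives") / "example-archive-id"
result = s3_put(s3_config, archive_path, archive_path.name)
if result.is_err():
    print(f"Upload failed: {result.err_value}")
else:
    print(f"Uploaded {archive_path.name} to s3://{s3_config.bucket}/{s3_config.key_prefix}")

This mirrors how run_clp() in fs_compression_task.py uploads each completed archive and then unlinks the local staged copy.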
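
Likewise, a sketch of the worker-side configuration loading this patch introduces: workers now read a WorkerConfig that start_clp.py dumps to the path in the CLP_CONFIG_PATH environment variable (via the new load_worker_config() helper), instead of separate CLP_DATA_DIR / CLP_ARCHIVE_OUTPUT_DIR / CLP_STORAGE_ENGINE variables. The fallback path below is a hypothetical placeholder.

# Minimal sketch of loading WorkerConfig inside a worker task.
# CLP_CONFIG_PATH is set by start_clp.py when launching the worker container;
# the fallback value here is a hypothetical placeholder for local testing.
import logging
import os
from pathlib import Path

from job_orchestration.executor.utils import load_worker_config

logger = logging.getLogger(__name__)

config_path = Path(os.getenv("CLP_CONFIG_PATH", "var/log/compression_worker/worker-config.yml"))
worker_config = load_worker_config(config_path, logger)
if worker_config is None:
    raise SystemExit("Failed to load worker config")

# Values previously passed as separate environment variables are now fields on this object:
storage_engine = worker_config.package.storage_engine
archives_dir = worker_config.archive_output.get_directory()
storage_type = worker_config.archive_output.storage.type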