feat(package)!: Add support for writing clp-s single file archives to S3. (#634)

Co-authored-by: kirkrodrigues <[email protected]>
haiqi96 and kirkrodrigues authored Dec 19, 2024
1 parent 880a741 commit 37263eb
Showing 18 changed files with 470 additions and 146 deletions.
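
Among the changes, generic_start_worker() no longer receives a worker_specific_env dictionary; instead, the start script serializes a WorkerConfig (built by the new generate_worker_config() helper) to a YAML file in the logs directory and points each worker at it through a single CLP_CONFIG_PATH environment variable. A minimal sketch of how a worker process might read that file at startup — the worker-side loader is not part of this diff, so the function name and error handling below are assumptions:

import os
from pathlib import Path

import yaml


def load_worker_config() -> dict:
    """Load the WorkerConfig YAML written by generic_start_worker()."""
    # CLP_CONFIG_PATH is set by the start script (see start_clp.py below);
    # this loader is only an illustration, not part of the commit.
    config_path = Path(os.environ["CLP_CONFIG_PATH"])
    with open(config_path) as f:
        return yaml.safe_load(f)

Since the file is produced with WorkerConfig.dump_to_primitive_dict(), the resulting dictionary mirrors the fields populated in generate_worker_config(): package, archive_output, data_directory, stream_output_dir, and stream_collection_name.
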
26 changes: 20 additions & 6 deletions components/clp-package-utils/clp_package_utils/general.py
@@ -20,7 +20,9 @@
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
StorageType,
WEBUI_COMPONENT_NAME,
WorkerConfig,
)
from clp_py_utils.core import (
get_config_value,
@@ -239,17 +241,17 @@ def generate_container_config(
DockerMountType.BIND, clp_config.logs_directory, container_clp_config.logs_directory
)

container_clp_config.archive_output.directory = pathlib.Path("/") / "mnt" / "archive-output"
container_clp_config.archive_output.set_directory(pathlib.Path("/") / "mnt" / "archive-output")
if not is_path_already_mounted(
clp_home,
CONTAINER_CLP_HOME,
clp_config.archive_output.directory,
container_clp_config.archive_output.directory,
clp_config.archive_output.get_directory(),
container_clp_config.archive_output.get_directory(),
):
docker_mounts.archives_output_dir = DockerMount(
DockerMountType.BIND,
clp_config.archive_output.directory,
container_clp_config.archive_output.directory,
clp_config.archive_output.get_directory(),
container_clp_config.archive_output.get_directory(),
)

container_clp_config.stream_output.directory = pathlib.Path("/") / "mnt" / "stream-output"
@@ -268,6 +270,18 @@ def generate_container_config(
return container_clp_config, docker_mounts


def generate_worker_config(clp_config: CLPConfig) -> WorkerConfig:
worker_config = WorkerConfig()
worker_config.package = clp_config.package.copy(deep=True)
worker_config.archive_output = clp_config.archive_output.copy(deep=True)
worker_config.data_directory = clp_config.data_directory

worker_config.stream_output_dir = clp_config.stream_output.directory
worker_config.stream_collection_name = clp_config.results_cache.stream_collection_name

return worker_config


def dump_container_config(
container_clp_config: CLPConfig, clp_config: CLPConfig, container_name: str
) -> Tuple[pathlib.Path, pathlib.Path]:
@@ -482,7 +496,7 @@ def validate_results_cache_config(

def validate_worker_config(clp_config: CLPConfig):
clp_config.validate_input_logs_dir()
clp_config.validate_archive_output_dir()
clp_config.validate_archive_output_config()
clp_config.validate_stream_output_dir()


@@ -5,7 +5,7 @@
import sys
from typing import Optional

from clp_py_utils.clp_config import CLPConfig
from clp_py_utils.clp_config import CLPConfig, StorageType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
@@ -81,6 +81,11 @@ def handle_extract_file_cmd(
if clp_config is None:
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"File extraction is not supported for archive storage type: {storage_type}.")
return -1

container_name = generate_container_name(str(JobType.FILE_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
@@ -156,6 +161,13 @@ def handle_extract_stream_cmd(
if clp_config is None:
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(
f"Stream extraction is not supported for archive storage type: {storage_type}."
)
return -1

container_name = generate_container_name(str(JobType.IR_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
@@ -4,6 +4,8 @@
import sys
from pathlib import Path

from clp_py_utils.clp_config import StorageType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
dump_container_config,
@@ -57,6 +59,11 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"Archive deletion is not supported for storage type: {storage_type}.")
return -1

# Validate the input timestamp
begin_ts = parsed_args.begin_ts
end_ts = parsed_args.end_ts
@@ -167,7 +167,7 @@ def validate_and_load_config_file(
"""
try:
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
clp_config.validate_archive_output_dir()
clp_config.validate_archive_output_config()
clp_config.validate_logs_dir()
return clp_config
except Exception:
@@ -207,7 +207,7 @@ def handle_extract_file_cmd(
list_path = parsed_args.files_from

logs_dir = clp_config.logs_directory
archives_dir = clp_config.archive_output.directory
archives_dir = clp_config.archive_output.get_directory()

# Generate database config file for clp
db_config_file_path = logs_dir / f".decompress-db-config-{uuid.uuid4()}.yml"
@@ -54,7 +54,7 @@ def main(argv):
return -1

database_config = clp_config.database
archives_dir = clp_config.archive_output.directory
archives_dir = clp_config.archive_output.get_directory()
if not archives_dir.exists():
logger.error("`archive_output.directory` doesn't exist.")
return -1
@@ -7,6 +7,7 @@
import uuid

import yaml
from clp_py_utils.clp_config import StorageType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
@@ -74,6 +75,11 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"Search is not supported for archive storage type: {storage_type}.")
return -1

container_name = generate_container_name(str(JobType.SEARCH))

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
45 changes: 23 additions & 22 deletions components/clp-package-utils/clp_package_utils/scripts/start_clp.py
@@ -29,6 +29,7 @@
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
StorageType,
WEBUI_COMPONENT_NAME,
)
from job_orchestration.scheduler.constants import QueueName
@@ -42,6 +43,7 @@
DockerMount,
DockerMountType,
generate_container_config,
generate_worker_config,
get_clp_home,
is_container_exited,
is_container_running,
@@ -626,6 +628,7 @@ def start_compression_worker(
):
celery_method = "job_orchestration.executor.compress"
celery_route = f"{QueueName.COMPRESSION}"
compression_worker_mounts = [mounts.archives_output_dir]
generic_start_worker(
COMPRESSION_WORKER_COMPONENT_NAME,
instance_id,
@@ -637,8 +640,7 @@
clp_config.redis.compression_backend_database,
num_cpus,
mounts,
None,
None,
compression_worker_mounts,
)


@@ -652,11 +654,9 @@ def start_query_worker(
celery_method = "job_orchestration.executor.query"
celery_route = f"{QueueName.QUERY}"

query_worker_mount = [mounts.stream_output_dir]
query_worker_env = {
"CLP_STREAM_OUTPUT_DIR": container_clp_config.stream_output.directory,
"CLP_STREAM_COLLECTION_NAME": clp_config.results_cache.stream_collection_name,
}
query_worker_mounts = [mounts.stream_output_dir]
if clp_config.archive_output.storage.type == StorageType.FS:
query_worker_mounts.append(mounts.archives_output_dir)

generic_start_worker(
QUERY_WORKER_COMPONENT_NAME,
@@ -669,8 +669,7 @@
clp_config.redis.query_backend_database,
num_cpus,
mounts,
query_worker_env,
query_worker_mount,
query_worker_mounts,
)


@@ -685,23 +684,26 @@ def generic_start_worker(
redis_database: int,
num_cpus: int,
mounts: CLPDockerMounts,
worker_specific_env: Dict[str, Any],
worker_specific_mount: List[Optional[DockerMount]],
worker_specific_mount: Optional[List[Optional[DockerMount]]],
):
logger.info(f"Starting {component_name}...")

container_name = f"clp-{component_name}-{instance_id}"
if container_exists(container_name):
return

validate_worker_config(clp_config)
container_config_filename = f"{container_name}.yml"
container_config_file_path = clp_config.logs_directory / container_config_filename
container_worker_config = generate_worker_config(container_clp_config)
with open(container_config_file_path, "w") as f:
yaml.safe_dump(container_worker_config.dump_to_primitive_dict(), f)

logs_dir = clp_config.logs_directory / component_name
logs_dir.mkdir(parents=True, exist_ok=True)
container_logs_dir = container_clp_config.logs_directory / component_name

# Create necessary directories
clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True)
clp_config.archive_output.get_directory().mkdir(parents=True, exist_ok=True)
clp_config.stream_output.directory.mkdir(parents=True, exist_ok=True)

clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
@@ -724,24 +726,17 @@ def generic_start_worker(
f"{container_clp_config.redis.host}:{container_clp_config.redis.port}/{redis_database}"
),
"-e", f"CLP_HOME={CONTAINER_CLP_HOME}",
"-e", f"CLP_DATA_DIR={container_clp_config.data_directory}",
"-e", f"CLP_ARCHIVE_OUTPUT_DIR={container_clp_config.archive_output.directory}",
"-e", f"CLP_CONFIG_PATH={container_clp_config.logs_directory / container_config_filename}",
"-e", f"CLP_LOGS_DIR={container_logs_dir}",
"-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
"-e", f"CLP_STORAGE_ENGINE={clp_config.package.storage_engine}",
"-u", f"{os.getuid()}:{os.getgid()}",
]
if worker_specific_env:
for env_name, env_value in worker_specific_env.items():
container_start_cmd.append("-e")
container_start_cmd.append(f"{env_name}={env_value}")

# fmt: on

necessary_mounts = [
mounts.clp_home,
mounts.data_dir,
mounts.logs_dir,
mounts.archives_output_dir,
mounts.input_logs_dir,
]
if worker_specific_mount:
@@ -1125,6 +1120,12 @@ def main(argv):
QUERY_WORKER_COMPONENT_NAME,
):
validate_and_load_redis_credentials_file(clp_config, clp_home, True)
if target in (
ALL_TARGET_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
):
validate_worker_config(clp_config)

clp_config.validate_data_dir()
clp_config.validate_logs_dir()