Skip to content

Commit

Permalink
Add reducer server to packaging
Browse files Browse the repository at this point in the history
  • Loading branch information
gibber9809 committed Oct 13, 2023
1 parent 15d0e7f commit f8b697b
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
SEARCH_QUEUE_COMPONENT_NAME,
SEARCH_SCHEDULER_COMPONENT_NAME,
SEARCH_WORKER_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
COMPRESSION_JOB_HANDLER_COMPONENT_NAME,
WEBUI_COMPONENT_NAME,
Expand Down Expand Up @@ -588,6 +589,71 @@ def start_worker(component_name: str, instance_id: str, clp_config: CLPConfig, w

logger.info(f"Started {component_name}.")

def get_host_ip() -> str:
    """
    Determines the IPv4 address this host uses for outbound traffic.

    A UDP socket is "connected" to an arbitrary address so that the OS picks
    a local source address, which is then read back with getsockname().

    :return: The local IPv4 address, or '127.0.0.1' if it can't be determined.
    """
    # The context manager guarantees the socket is closed on every path
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.settimeout(0)
        try:
            # The target doesn't even have to be reachable
            s.connect(('10.254.254.254', 1))
            return s.getsockname()[0]
        except OSError:
            # Socket-level failures (e.g., no usable route) fall back to loopback
            return '127.0.0.1'

def start_reducer(instance_id: str, clp_config: CLPConfig, container_clp_config: CLPConfig,
                  num_cpus: int, mounts: CLPDockerMounts):
    """
    Starts the reducer container for this CLP instance, unless it already exists.

    The container-side config is dumped into the host's logs directory, and the
    container then runs `job_orchestration.reducer.reducer` with that config,
    this host's IP, and `num_cpus` as the concurrency.

    :param instance_id: ID of the CLP instance; used to build the container name.
    :param clp_config: Host-side CLP config.
    :param container_clp_config: CLP config with paths as seen inside the container.
    :param num_cpus: Passed to the reducer launcher as `--concurrency`.
    :param mounts: Docker mounts for the container.
    :raises subprocess.CalledProcessError: If `docker run` exits with a non-zero status.
    """
    component_name = REDUCER_COMPONENT_NAME
    logger.info(f"Starting {component_name}...")

    container_name = f'clp-{component_name}-{instance_id}'
    if container_exists(container_name):
        logger.info(f"{component_name} already running.")
        return

    # Write the container's view of the config where the container can read it
    container_config_filename = f'{container_name}.yml'
    container_config_file_path = clp_config.logs_directory / container_config_filename
    with open(container_config_file_path, 'w') as f:
        yaml.safe_dump(container_clp_config.dump_to_primitive_dict(), f)

    logs_dir = clp_config.logs_directory / component_name
    logs_dir.mkdir(parents=True, exist_ok=True)
    container_logs_dir = container_clp_config.logs_directory / component_name

    clp_site_packages_dir = CONTAINER_CLP_HOME / 'lib' / 'python3' / 'site-packages'
    container_start_cmd = [
        'docker', 'run',
        '-di',
        '--network', 'host',
        '-w', str(CONTAINER_CLP_HOME),
        '--name', container_name,
        '-e', f'PYTHONPATH={clp_site_packages_dir}',
        '-e', f'CLP_LOGS_DIR={container_logs_dir}',
        '-e', f'CLP_LOGGING_LEVEL={clp_config.reducer.logging_level}',
        '-e', f'CLP_HOME={CONTAINER_CLP_HOME}',
        '--mount', str(mounts.clp_home),
    ]
    necessary_mounts = [
        mounts.logs_dir,
    ]
    for mount in necessary_mounts:
        # Falsy mounts are skipped
        if mount:
            container_start_cmd.append('--mount')
            container_start_cmd.append(str(mount))
    container_start_cmd.append(clp_config.execution_container)

    reducer_cmd = [
        'python3', '-u', '-m',
        'job_orchestration.reducer.reducer',
        '--config', str(container_clp_config.logs_directory / container_config_filename),
        # The container runs with `--network host`, so the host's own IP is passed
        '--host', get_host_ip(),
        "--concurrency", str(num_cpus),
    ]

    cmd = container_start_cmd + reducer_cmd
    subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

    logger.info(f"Started {component_name}")

def start_webui(instance_id: str, clp_config: CLPConfig, mounts: CLPDockerMounts):
component_name = WEBUI_COMPONENT_NAME
Expand Down Expand Up @@ -737,6 +803,7 @@ def main(argv):
component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME)
component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
component_args_parser.add_parser(WEBUI_QUERY_HANDLER_COMPONENT_NAME)
component_args_parser.add_parser(REDUCER_COMPONENT_NAME)
# Shortcut for multiple components
component_args_parser.add_parser(DATABASE_COMPONENTS)
component_args_parser.add_parser(CONTROLLER_COMPONENTS)
Expand Down Expand Up @@ -799,6 +866,8 @@ def main(argv):
num_cpus = parsed_args.num_cpus
if SEARCH_WORKER_COMPONENT_NAME == component_name and parsed_args.num_cpus != 0:
num_cpus = parsed_args.num_cpus
if REDUCER_COMPONENT_NAME == component_name and parsed_args.num_cpus != 0:
num_cpus = parsed_args.num_cpus

container_clp_config, mounts = generate_container_config(clp_config, clp_home)

Expand Down Expand Up @@ -836,6 +905,8 @@ def main(argv):
start_search_scheduler(instance_id, clp_config, container_clp_config, mounts)
if component_name in ['', SEARCH_WORKER_COMPONENT_NAME]:
start_search_worker(instance_id, clp_config, container_clp_config, num_cpus, mounts)
if component_name in ['', REDUCER_COMPONENT_NAME]:
start_reducer(instance_id, clp_config, container_clp_config, num_cpus, mounts)
if component_name in ['', COMPRESSION_WORKER_COMPONENT_NAME]:
start_compression_worker(instance_id, clp_config, container_clp_config, num_cpus, mounts)
if component_name in ['', WEBUI_QUERY_HANDLER_COMPONENT_NAME]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
SEARCH_QUEUE_COMPONENT_NAME,
SEARCH_SCHEDULER_COMPONENT_NAME,
SEARCH_WORKER_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
COMPRESSION_JOB_HANDLER_COMPONENT_NAME,
WEBUI_COMPONENT_NAME,
Expand Down Expand Up @@ -76,6 +77,7 @@ def main(argv):
component_args_parser.add_parser(SEARCH_QUEUE_COMPONENT_NAME)
component_args_parser.add_parser(SEARCH_SCHEDULER_COMPONENT_NAME)
component_args_parser.add_parser(SEARCH_WORKER_COMPONENT_NAME)
component_args_parser.add_parser(REDUCER_COMPONENT_NAME)
component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME)
component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
component_args_parser.add_parser(WEBUI_QUERY_HANDLER_COMPONENT_NAME)
Expand Down Expand Up @@ -142,6 +144,8 @@ def main(argv):
stop_container(container_name, stale_containers)
if '' == component_name or DB_COMPONENT_NAME == component_name:
stop_container(f'clp-{DB_COMPONENT_NAME}-{instance_id}', stale_containers)
if '' == component_name or REDUCER_COMPONENT_NAME == component_name:
stop_container(f'clp-{REDUCER_COMPONENT_NAME}-{instance_id}', stale_containers)

if '' == component_name:
# NOTE: We can only remove the instance ID file if all containers have been stopped.
Expand Down
18 changes: 18 additions & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
SEARCH_QUEUE_COMPONENT_NAME = 'search_queue'
SEARCH_SCHEDULER_COMPONENT_NAME = 'search_scheduler'
SEARCH_WORKER_COMPONENT_NAME = 'search_worker'
REDUCER_COMPONENT_NAME = "reducer"
COMPRESSION_WORKER_COMPONENT_NAME = 'compression_worker'
WEBUI_COMPONENT_NAME = 'webui'
WEBUI_QUERY_HANDLER_COMPONENT_NAME = 'webui_query_handler'
Expand Down Expand Up @@ -191,6 +192,22 @@ def validate_logging_level(cls, field):
validate_logging_level_static(cls, field)
return field

class Reducer(BaseModel):
    # Logging level for the reducer; checked by validate_logging_level_static
    logging_level: str = 'INFO'
    # First port in the range used by the reducer servers
    base_port: int = 14009

    @validator('logging_level')
    def validate_logging_level(cls, field):
        validate_logging_level_static(cls, field)
        return field

    @validator('base_port')
    def validate_base_port(cls, field):
        # Port numbers are 16-bit, and 0 isn't a usable listening port
        if not 0 < field <= 65535:
            raise ValueError(
                f"{cls.__name__}: base port {field} is not a valid value"
            )
        return field

class Queue(BaseModel):
host: str = 'localhost'
Expand Down Expand Up @@ -278,6 +295,7 @@ class CLPConfig(BaseModel):
compression_worker: CompressionWorker = CompressionWorker()
search_scheduler: SearchScheduler = SearchScheduler()
search_worker: SearchWorker = SearchWorker()
reducer: Reducer = Reducer()
webui: WebUi = WebUi()
webui_query_handler: WebUiQueryHandler = WebUiQueryHandler()
credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH
Expand Down
12 changes: 5 additions & 7 deletions components/core/src/clo/reducer_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ std::ostream& operator<<(std::ostream& os, ServerStatus const& status) {
struct ServerContext {
boost::asio::io_context ioctx;
boost::asio::ip::tcp::acceptor acceptor;
std::string ip;
Pipeline* p;
MYSQL* db;
ServerStatus status;
Expand Down Expand Up @@ -335,7 +334,7 @@ ServerStatus assign_new_job(ServerContext* ctx) {
for (auto job = jobs.begin(); job != jobs.end(); ++job) {
// set reducer_ready where pending_reducer and id=job_id
std::stringstream ss;
ss << "UPDATE distributed_search_jobs SET status=9, reducer_port=" << 14'009
ss << "UPDATE distributed_search_jobs SET status=9, reducer_port=" << ctx->port
<< ", reducer_host=\"" << ctx->host << "\" WHERE status=8 and id=" << job->first;
std::string update = ss.str();
if (0 != mysql_real_query(ctx->db, update.c_str(), update.length())) {
Expand Down Expand Up @@ -524,12 +523,10 @@ int main(int argc, char const* argv[]) {
// Polling interval
boost::asio::steady_timer poll_timer(ctx.ioctx, boost::asio::chrono::milliseconds(500));

std::cout << "Starting on host " << ctx.host << " port " << port << " listening successfully " << ctx.acceptor.is_open() << std::endl;

// Job acquisition loop
while (true) {
boost::asio::ip::address addr;
ctx.acceptor.local_endpoint().address(addr);
ctx.ip = addr.to_string();
std::cout << ctx.ip << std::endl;
// Queue up polling and tcp accepting
poll_timer.async_wait(
boost::bind(poll_db, boost::asio::placeholders::error, &ctx, &poll_timer)
Expand All @@ -554,8 +551,9 @@ int main(int argc, char const* argv[]) {
);
}

if (result_documents.size() > 0)
if (result_documents.size() > 0) {
ctx.results_collection.insert_many(result_documents);
}

if (set_job_done(&ctx) == -1) {
return -1;
Expand Down
109 changes: 109 additions & 0 deletions components/job-orchestration/job_orchestration/reducer/reducer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python3

import argparse
import logging
import os
import sys
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional

from clp_py_utils.clp_config import SEARCH_JOBS_TABLE_NAME

from clp_py_utils.clp_config import CLPConfig, Database, ResultsCache
from clp_py_utils.clp_logging import get_logging_level
from clp_py_utils.core import read_yaml_config_file

# Logging configuration for the reducer launcher
# The formatter is module-level since main() reuses it for its file handler
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")

# Console output
logging_console_handler = logging.StreamHandler()
logging_console_handler.setFormatter(logging_formatter)

# Module logger; starts at INFO
logger = logging.getLogger("reducer")
# Don't hand records to ancestor loggers, which prevents double logging
logger.propagate = False
logger.setLevel(logging.INFO)
logger.addHandler(logging_console_handler)

def main(argv: List[str]) -> int:
    """
    Launches `--concurrency` reducer_server processes and waits for them to exit.

    Reducer `i` is started on port `base_port + i` (from the CLP config) and its
    stdout/stderr are appended to `reducer-<i>.log` in the directory named by the
    CLP_LOGS_DIR environment variable. CLP_LOGGING_LEVEL and CLP_HOME are also
    read from the environment.

    :param argv: Full argument vector, including the program name.
    :return: -1 if the CLP config can't be loaded; 0 once all reducers have exited.
    """
    # fmt: off
    args_parser = argparse.ArgumentParser(description="Spin up reducers.")
    args_parser.add_argument('--config', '-c', required=True, help='CLP configuration file.')
    args_parser.add_argument('--host', required=True, help='Host ip this container is running on')
    args_parser.add_argument('--concurrency', required=True, type=int,
                             help='Number of reducer servers to run')
    # fmt: on

    parsed_args = args_parser.parse_args(argv[1:])

    # Setup logging to file
    logs_dir = Path(os.getenv("CLP_LOGS_DIR"))
    logging_file_handler = logging.FileHandler(
        filename=logs_dir / "reducer.log", encoding="utf-8"
    )
    logging_file_handler.setFormatter(logging_formatter)
    logger.addHandler(logging_file_handler)

    try:
        # Update logging level based on config
        logging_level_str = os.getenv("CLP_LOGGING_LEVEL")
        logging_level = get_logging_level(logging_level_str)
        logger.setLevel(logging_level)
        logger.info(f"Start logging level = {logging.getLevelName(logging_level)}")

        # Load configuration
        config_path = Path(parsed_args.config)
        try:
            clp_config = CLPConfig.parse_obj(read_yaml_config_file(config_path))
        except Exception as ex:
            # One handler for both validation and file/YAML errors: they're
            # reported identically, and no validation-specific exception type is
            # imported in this module.
            logger.error(ex)
            return -1

        clp_home = Path(os.getenv("CLP_HOME"))
        # NOTE(review): the DB password is passed as a command-line argument,
        # which may expose it in process listings - confirm this is acceptable.
        reducer_cmd = [
            str(clp_home / "bin" / "reducer_server"),
            "--db-host", clp_config.database.host,
            "--db-port", str(clp_config.database.port),
            "--db-user", str(clp_config.database.username),
            "--db-pass", str(clp_config.database.password),
            "--database", clp_config.database.name,
            "--mongodb-database", clp_config.results_cache.db_name,
            "--mongodb-uri", f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/",
            "--mongodb-collection", clp_config.results_cache.results_collection_name,
            "--host", parsed_args.host,
            "--port",
        ]

        reducers = []
        # Always run at least one reducer
        concurrency = max(parsed_args.concurrency, 1)
        for i in range(concurrency):
            # The port value completes the trailing "--port" in reducer_cmd
            reducer_instance_cmd = reducer_cmd + [str(clp_config.reducer.base_port + i)]

            log_file_path = logs_dir / ("reducer-" + str(i) + ".log")
            # The child gets its own copy of the descriptor, so the parent's
            # handle can be closed as soon as Popen returns.
            with open(log_file_path, 'a') as reducer_log_file:
                reducers.append(
                    subprocess.Popen(
                        reducer_instance_cmd,
                        close_fds=True,
                        stdout=reducer_log_file,
                        stderr=reducer_log_file,
                    )
                )

        logger.info("reducers started.")
        logger.info(f"Host={parsed_args.host} Base port={clp_config.reducer.base_port} Concurrency={concurrency}")
        for r in reducers:
            r.wait()

        logger.error("all reducers terminated")
        return 0
    finally:
        # Detach and close the file handler on every exit path
        logger.removeHandler(logging_file_handler)
        logging_file_handler.close()


# Script entry point: run main() and use its return value as the exit status
if __name__ == "__main__":
    exit_code = main(sys.argv)
    sys.exit(exit_code)

0 comments on commit f8b697b

Please sign in to comment.