Skip to content

Commit

Permalink
Improve packaging for reducer
Browse files Browse the repository at this point in the history
  • Loading branch information
gibber9809 committed Oct 15, 2023
1 parent f404b1e commit 3439f4a
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ def start_reducer(instance_id: str, clp_config: CLPConfig, container_clp_config:
'--config', str(container_clp_config.logs_directory / container_config_filename),
'--host', get_host_ip(),
"--concurrency", str(num_cpus),
"--polling-interval-ms", str(clp_config.reducer.polling_interval),
]

cmd = container_start_cmd + reducer_cmd
Expand Down Expand Up @@ -835,7 +836,7 @@ def main(argv):
if component_name in ['', DB_COMPONENT_NAME, SEARCH_SCHEDULER_COMPONENT_NAME,
COMPRESSION_JOB_HANDLER_COMPONENT_NAME,
WEBUI_QUERY_HANDLER_COMPONENT_NAME, DATABASE_COMPONENTS,
CONTROLLER_COMPONENTS]:
CONTROLLER_COMPONENTS, REDUCER_COMPONENT_NAME]:
validate_and_load_db_credentials_file(clp_config, clp_home, True)
if component_name in [
'',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ def main(argv):
container_config_file_path = logs_dir / f'{container_name}.yml'
if container_config_file_path.exists():
container_config_file_path.unlink()
if '' == component_name or REDUCER_COMPONENT_NAME == component_name:
stop_container(f'clp-{REDUCER_COMPONENT_NAME}-{instance_id}', stale_containers)
if '' == component_name or COMPRESSION_QUEUE_COMPONENT_NAME == component_name:
stop_queue_container(COMPRESSION_QUEUE_COMPONENT_NAME, instance_id, logs_dir, stale_containers)
if '' == component_name or SEARCH_QUEUE_COMPONENT_NAME == component_name:
Expand All @@ -144,8 +146,6 @@ def main(argv):
stop_container(container_name, stale_containers)
if '' == component_name or DB_COMPONENT_NAME == component_name:
stop_container(f'clp-{DB_COMPONENT_NAME}-{instance_id}', stale_containers)
if '' == component_name or REDUCER_COMPONENT_NAME == component_name:
stop_container(f'clp-{REDUCER_COMPONENT_NAME}-{instance_id}', stale_containers)

if '' == component_name:
# NOTE: We can only remove the instance ID file if all containers have been stopped.
Expand Down
9 changes: 9 additions & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def validate_logging_level(cls, field):
class Reducer(BaseModel):
logging_level: str = 'INFO'
base_port: int = 14009
polling_interval: int = 100

@validator('logging_level')
def validate_logging_level(cls, field):
Expand All @@ -209,6 +210,14 @@ def validate_base_port(cls, field):
)
return field

@validator('polling_interval')
def validate_polling_interval(cls, field):
    """
    Validator for the reducer's `polling_interval` setting.

    The value is accepted only when it is strictly positive; the launcher
    forwards it to the reducer as `--polling-interval-ms`.

    :param field: The candidate `polling_interval` value.
    :return: `field`, unchanged, when it is greater than zero.
    :raises ValueError: If `field` is not greater than zero.
    """
    # Happy path first: anything strictly positive is passed through untouched.
    if field > 0:
        return field

    raise ValueError(
        f"{cls.__name__}: polling interval {field} must be greater than zero"
    )

class Queue(BaseModel):
host: str = 'localhost'
port: int = 5672
Expand Down
14 changes: 9 additions & 5 deletions components/core/src/clo/reducer_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,18 @@ struct ServerContext {
mongocxx::collection results_collection;
std::string host;
std::string mongodb_collection;
int polling_interval_ms;

ServerContext(std::string& host, int port, std::string& mongodb_collection)
ServerContext(std::string& host, int port, std::string& mongodb_collection, int polling_interval_ms)
: acceptor(ioctx, tcp::endpoint(tcp::v6(), port)),
p(nullptr),
db(nullptr),
status(ServerStatus::IDLE),
job_id(-1),
port(port),
host(std::move(host)),
mongodb_collection(std::move(mongodb_collection)) {}
mongodb_collection(std::move(mongodb_collection)),
polling_interval_ms(polling_interval_ms) {}
};

struct RecordReceiverContext {
Expand Down Expand Up @@ -498,7 +500,7 @@ void poll_db(
}

if (ctx->status == ServerStatus::IDLE || ctx->status == ServerStatus::RUNNING) {
poll_timer->expires_at(poll_timer->expiry() + boost::asio::chrono::milliseconds(500));
poll_timer->expires_at(poll_timer->expiry() + boost::asio::chrono::milliseconds(ctx->polling_interval_ms));
poll_timer->async_wait(
boost::bind(poll_db, boost::asio::placeholders::error, ctx, poll_timer)
);
Expand Down Expand Up @@ -548,6 +550,7 @@ int main(int argc, char const* argv[]) {
std::string mongodb_database = "clp-search";
std::string mongodb_uri = "mongodb://localhost:27017/";
std::string mongodb_collection = "results";
int polling_interval_ms = 100;

po::options_description arguments("Arguments");
arguments.add_options()
Expand All @@ -561,6 +564,7 @@ int main(int argc, char const* argv[]) {
("mongodb-database", po::value<std::string>(&mongodb_database)->default_value("clp-search"))
("mongodb-uri", po::value<std::string>(&mongodb_uri)->default_value("mongodb://localhost:27017/"))
("mongodb-collection", po::value<std::string>(&mongodb_collection)->default_value("results"))
("polling-interval-ms", po::value<int>(&polling_interval_ms)->default_value(100))
;

po::variables_map opts;
Expand All @@ -573,7 +577,7 @@ int main(int argc, char const* argv[]) {
return -1;
}

ServerContext ctx(host, port, mongodb_collection);
ServerContext ctx(host, port, mongodb_collection, polling_interval_ms);

if (connect_to_db(&ctx, db_host, db_port, db_user, db_pass, database) < 0) {
std::cout << "Failed to connect to the database... exiting" << db_user << " " << db_pass
Expand All @@ -592,7 +596,7 @@ int main(int argc, char const* argv[]) {
return -1;
}
// Polling interval
boost::asio::steady_timer poll_timer(ctx.ioctx, boost::asio::chrono::milliseconds(500));
boost::asio::steady_timer poll_timer(ctx.ioctx, boost::asio::chrono::milliseconds(ctx.polling_interval_ms));

std::cout << "Starting on host " << ctx.host << " port " << port << " listening successfully " << ctx.acceptor.is_open() << std::endl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def main(argv: List[str]) -> int:
args_parser.add_argument('--config', '-c', required=True, help='CLP configuration file.')
args_parser.add_argument('--host', required=True, help='Host ip this container is running on')
args_parser.add_argument('--concurrency', required=True, help='Number of reducer servers to run')
args_parser.add_argument('--polling-interval-ms', required=True, help='Database polling interval in ms')

parsed_args = args_parser.parse_args(argv[1:])

Expand Down Expand Up @@ -71,6 +72,7 @@ def main(argv: List[str]) -> int:
"--mongodb-database", clp_config.results_cache.db_name,
"--mongodb-uri", f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/",
"--mongodb-collection", clp_config.results_cache.results_collection_name,
"--polling-interval-ms", str(parsed_args.polling_interval_ms),
"--host", parsed_args.host,
"--port",
]
Expand All @@ -93,7 +95,7 @@ def main(argv: List[str]) -> int:
)

logger.info("reducers started.")
logger.info(f"Host={parsed_args.host} Base port={clp_config.reducer.base_port} Concurrency={concurrency}")
logger.info(f"Host={parsed_args.host} Base port={clp_config.reducer.base_port} Concurrency={concurrency} Polling Interval={parsed_args.polling_interval_ms}")
for r in reducers:
r.wait()

Expand Down
5 changes: 5 additions & 0 deletions components/package-template/src/etc/clp-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
#search_worker:
# logging_level: "INFO"
#
#reducer:
# logging_level: "INFO"
# polling_interval: 100 # milliseconds
# base_port: 14009
#
## Where archives should be output to
#archive_output:
# type: "fs"
Expand Down

0 comments on commit 3439f4a

Please sign in to comment.