diff --git a/ocrd/ocrd/cli/__init__.py b/ocrd/ocrd/cli/__init__.py index d645daddf..cc75f6310 100644 --- a/ocrd/ocrd/cli/__init__.py +++ b/ocrd/ocrd/cli/__init__.py @@ -31,8 +31,7 @@ def get_help(self, ctx): from ocrd.decorators import ocrd_loglevel from .zip import zip_cli from .log import log_cli -from .processing_server import processing_server_cli -from .processing_worker import processing_worker_cli +from .network import network_cli @click.group() @@ -51,5 +50,4 @@ def cli(**kwargs): # pylint: disable=unused-argument cli.add_command(validate_cli) cli.add_command(log_cli) cli.add_command(resmgr_cli) -cli.add_command(processing_server_cli) -cli.add_command(processing_worker_cli) +cli.add_command(network_cli) diff --git a/ocrd/ocrd/cli/network.py b/ocrd/ocrd/cli/network.py new file mode 100644 index 000000000..f20f96585 --- /dev/null +++ b/ocrd/ocrd/cli/network.py @@ -0,0 +1,34 @@ +""" +OCR-D CLI: management of network components + +.. click:: ocrd.cli.network:network_cli + :prog: ocrd network + :nested: full +""" + +import click +import logging +from ocrd_utils import initLogging +from ocrd_network.cli import ( + client_cli, + processing_server_cli, + processing_worker_cli, + processor_server_cli, +) + + +@click.group("network") +def network_cli(): + """ + Managing network components + """ + initLogging() + # TODO: Remove after the logging fix in core + logging.getLogger('paramiko.transport').setLevel(logging.INFO) + logging.getLogger('ocrd_network').setLevel(logging.DEBUG) + + +network_cli.add_command(client_cli) +network_cli.add_command(processing_server_cli) +network_cli.add_command(processing_worker_cli) +network_cli.add_command(processor_server_cli) diff --git a/ocrd/ocrd/cli/processing_server.py b/ocrd/ocrd/cli/processing_server.py deleted file mode 100644 index a65e02a71..000000000 --- a/ocrd/ocrd/cli/processing_server.py +++ /dev/null @@ -1,41 +0,0 @@ -""" -OCR-D CLI: start the processing server - -.. click:: ocrd.cli.processing_server:processing_server_cli - :prog: ocrd processing-server - :nested: full -""" -import click -import logging -from ocrd_utils import initLogging -from ocrd_network import ( - ProcessingServer, - ProcessingServerParamType -) - - -@click.command('processing-server') -@click.argument('path_to_config', required=True, type=click.STRING) -@click.option('-a', '--address', - default="localhost:8080", - help='The URL of the Processing server, format: host:port', - type=ProcessingServerParamType(), - required=True) -def processing_server_cli(path_to_config, address: str): - """ - Start and manage processing workers with the processing server - - PATH_TO_CONFIG is a yaml file to configure the server and the workers. 
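For orientation, a configuration of the kind PATH_TO_CONFIG refers to might look roughly like the sketch below. The keys follow the fields that were read by the (now removed) deployment_config.py and checked by ProcessingServerConfigValidator; all host names, ports, user names, key paths and the processor name are placeholders, and the new runtime_data classes may expect additional keys (e.g. for Processor Servers, which this hunk does not show), so treat this as an illustration rather than a reference schema.

```yaml
process_queue:
  address: rabbitmq-host.example.org   # placeholder host
  port: 5672
  credentials:
    username: admin                    # used as RABBITMQ_DEFAULT_USER/PASS
    password: admin
  ssh:
    username: deploy-user
    path_to_privkey: /home/deploy-user/.ssh/id_rsa
database:
  address: mongodb-host.example.org    # placeholder host
  port: 27018
  ssh:
    username: deploy-user
    path_to_privkey: /home/deploy-user/.ssh/id_rsa
hosts:
  - address: worker-host.example.org   # placeholder host
    username: deploy-user
    path_to_privkey: /home/deploy-user/.ssh/id_rsa
    workers:
      - name: ocrd-dummy               # placeholder processor name
        number_of_instance: 1
        deploy_type: native
```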
See - https://github.com/OCR-D/spec/pull/222/files#diff-a71bf71cbc7d9ce94fded977f7544aba4df9e7bdb8fc0cf1014e14eb67a9b273 - for further information (TODO: update path when spec is available/merged) - - """ - initLogging() - # TODO: Remove before the release - logging.getLogger('paramiko.transport').setLevel(logging.INFO) - logging.getLogger('ocrd.network').setLevel(logging.DEBUG) - - # Note, the address is already validated with the type field - host, port = address.split(':') - processing_server = ProcessingServer(path_to_config, host, port) - processing_server.start() diff --git a/ocrd/ocrd/decorators/__init__.py b/ocrd/ocrd/decorators/__init__.py index 2cffe12fe..98bbb4ba3 100644 --- a/ocrd/ocrd/decorators/__init__.py +++ b/ocrd/ocrd/decorators/__init__.py @@ -1,10 +1,5 @@ -from os.path import isfile from os import environ import sys -from contextlib import redirect_stdout -from io import StringIO - -import click from ocrd_utils import ( is_local_filename, @@ -15,7 +10,7 @@ from ocrd_utils import getLogger, initLogging, parse_json_string_with_comments from ocrd_validators import WorkspaceValidator -from ocrd_network import ProcessingWorker +from ocrd_network import ProcessingWorker, ProcessorServer from ..resolver import Resolver from ..processor.base import run_processor @@ -38,8 +33,12 @@ def ocrd_cli_wrap_processor( overwrite=False, show_resource=None, list_resources=False, + # ocrd_network params start # + agent_type=None, + agent_address=None, queue=None, database=None, + # ocrd_network params end # **kwargs ): if not sys.argv[1:]: @@ -56,96 +55,130 @@ def ocrd_cli_wrap_processor( list_resources=list_resources ) sys.exit() - # If either of these two is provided but not both - if bool(queue) != bool(database): - raise Exception("Options --queue and --database require each other.") - # If both of these are provided - start the processing worker instead of the processor - processorClass - if queue and database: - initLogging() - # TODO: Remove before the release - # We are importing the logging here because it's not the ocrd logging but python one - import logging - logging.getLogger('ocrd.network').setLevel(logging.DEBUG) - # Get the ocrd_tool dictionary - processor = processorClass(workspace=None, dump_json=True) - ocrd_tool = processor.ocrd_tool + initLogging() + + # Used for checking/starting network agents for the WebAPI architecture + # Has no side effects if neither of the 4 ocrd_network parameters are passed + check_and_run_network_agent(processorClass, agent_type, agent_address, database, queue) + + LOG = getLogger('ocrd_cli_wrap_processor') + # LOG.info('kwargs=%s' % kwargs) + # Merge parameter overrides and parameters + if 'parameter_override' in kwargs: + set_json_key_value_overrides(kwargs['parameter'], *kwargs['parameter_override']) + # TODO OCR-D/core#274 + # Assert -I / -O + # if not kwargs['input_file_grp']: + # raise ValueError('-I/--input-file-grp is required') + # if not kwargs['output_file_grp']: + # raise ValueError('-O/--output-file-grp is required') + resolver = Resolver() + working_dir, mets, _ = resolver.resolve_mets_arguments(working_dir, mets, None) + workspace = resolver.workspace_from_url(mets, working_dir) + page_id = kwargs.get('page_id') + # XXX not possible while processors do not adhere to # https://github.com/OCR-D/core/issues/505 + # if overwrite + # if 'output_file_grp' not in kwargs or not kwargs['output_file_grp']: + # raise Exception("--overwrite requires --output-file-grp") + # LOG.info("Removing files because of --overwrite") + # for grp in 
kwargs['output_file_grp'].split(','): + # if page_id: + # for one_page_id in kwargs['page_id'].split(','): + # LOG.debug("Removing files in output file group %s with page ID %s", grp, one_page_id) + # for file in workspace.mets.find_files(pageId=one_page_id, fileGrp=grp): + # workspace.remove_file(file, force=True, keep_file=False, page_recursive=True) + # else: + # LOG.debug("Removing all files in output file group %s ", grp) + # # TODO: can be reduced to `page_same_group=True` as soon as core#505 has landed (in all processors) + # workspace.remove_file_group(grp, recursive=True, force=True, keep_files=False, page_recursive=True, page_same_group=False) + # workspace.save_mets() + # XXX While https://github.com/OCR-D/core/issues/505 is open, set 'overwrite_mode' globally on the workspace + if overwrite: + workspace.overwrite_mode = True + report = WorkspaceValidator.check_file_grp(workspace, kwargs['input_file_grp'], '' if overwrite else kwargs['output_file_grp'], page_id) + if not report.is_valid: + raise Exception("Invalid input/output file grps:\n\t%s" % '\n\t'.join(report.errors)) + # Set up profiling behavior from environment variables/flags + if not profile and 'OCRD_PROFILE' in environ: + if 'CPU' in environ['OCRD_PROFILE']: + profile = True + if not profile_file and 'OCRD_PROFILE_FILE' in environ: + profile_file = environ['OCRD_PROFILE_FILE'] + if profile or profile_file: + import cProfile + import pstats + import io + import atexit + print("Profiling...") + pr = cProfile.Profile() + pr.enable() + def exit(): + pr.disable() + print("Profiling completed") + if profile_file: + with open(profile_file, 'wb') as f: + pr.dump_stats(profile_file) + s = io.StringIO() + pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats() + print(s.getvalue()) + atexit.register(exit) + run_processor(processorClass, mets_url=mets, workspace=workspace, **kwargs) + + +def check_and_run_network_agent(ProcessorClass, agent_type: str, agent_address: str, database: str, queue: str): + if not agent_type and (agent_address or database or queue): + raise ValueError("Options '--database', '--queue', and '--address' are valid only with '--type'") + if not agent_type: + return + + if not database: + raise ValueError("Options '--type' and '--database' are mutually inclusive") + allowed_agent_types = ['server', 'worker'] + if agent_type not in allowed_agent_types: + agents_str = ', '.join(allowed_agent_types) + raise ValueError(f"Wrong type parameter. 
Allowed types: {agents_str}") + if agent_type == 'server': + if not agent_address: + raise ValueError("Options '--type=server' and '--address' are mutually inclusive") + if queue: + raise ValueError("Options '--type=server' and '--queue' are mutually exclusive") + if agent_type == 'worker': + if not queue: + raise ValueError("Options '--type=worker' and '--queue' are mutually inclusive") + if agent_address: + raise ValueError("Options '--type=worker' and '--address' are mutually exclusive") + import logging + logging.getLogger('ocrd.network').setLevel(logging.DEBUG) + + processor = ProcessorClass(workspace=None, dump_json=True) + if agent_type == 'worker': try: + # TODO: Passing processor_name and ocrd_tool is reduntant processing_worker = ProcessingWorker( rabbitmq_addr=queue, mongodb_addr=database, - processor_name=ocrd_tool['executable'], - ocrd_tool=ocrd_tool, - processor_class=processorClass, + processor_name=processor.ocrd_tool['executable'], + ocrd_tool=processor.ocrd_tool, + processor_class=ProcessorClass, ) # The RMQConsumer is initialized and a connection to the RabbitMQ is performed processing_worker.connect_consumer() # Start consuming from the queue with name `processor_name` processing_worker.start_consuming() except Exception as e: - raise Exception("Processing worker has failed with error") from e - else: - initLogging() - LOG = getLogger('ocrd_cli_wrap_processor') - # LOG.info('kwargs=%s' % kwargs) - # Merge parameter overrides and parameters - if 'parameter_override' in kwargs: - set_json_key_value_overrides(kwargs['parameter'], *kwargs['parameter_override']) - # TODO OCR-D/core#274 - # Assert -I / -O - # if not kwargs['input_file_grp']: - # raise ValueError('-I/--input-file-grp is required') - # if not kwargs['output_file_grp']: - # raise ValueError('-O/--output-file-grp is required') - resolver = Resolver() - working_dir, mets, _ = resolver.resolve_mets_arguments(working_dir, mets, None) - workspace = resolver.workspace_from_url(mets, working_dir) - page_id = kwargs.get('page_id') - # XXX not possible while processors do not adhere to # https://github.com/OCR-D/core/issues/505 - # if overwrite - # if 'output_file_grp' not in kwargs or not kwargs['output_file_grp']: - # raise Exception("--overwrite requires --output-file-grp") - # LOG.info("Removing files because of --overwrite") - # for grp in kwargs['output_file_grp'].split(','): - # if page_id: - # for one_page_id in kwargs['page_id'].split(','): - # LOG.debug("Removing files in output file group %s with page ID %s", grp, one_page_id) - # for file in workspace.mets.find_files(pageId=one_page_id, fileGrp=grp): - # workspace.remove_file(file, force=True, keep_file=False, page_recursive=True) - # else: - # LOG.debug("Removing all files in output file group %s ", grp) - # # TODO: can be reduced to `page_same_group=True` as soon as core#505 has landed (in all processors) - # workspace.remove_file_group(grp, recursive=True, force=True, keep_files=False, page_recursive=True, page_same_group=False) - # workspace.save_mets() - # XXX While https://github.com/OCR-D/core/issues/505 is open, set 'overwrite_mode' globally on the workspace - if overwrite: - workspace.overwrite_mode = True - report = WorkspaceValidator.check_file_grp(workspace, kwargs['input_file_grp'], '' if overwrite else kwargs['output_file_grp'], page_id) - if not report.is_valid: - raise Exception("Invalid input/output file grps:\n\t%s" % '\n\t'.join(report.errors)) - # Set up profiling behavior from environment variables/flags - if not profile and 
'OCRD_PROFILE' in environ: - if 'CPU' in environ['OCRD_PROFILE']: - profile = True - if not profile_file and 'OCRD_PROFILE_FILE' in environ: - profile_file = environ['OCRD_PROFILE_FILE'] - if profile or profile_file: - import cProfile - import pstats - import io - import atexit - print("Profiling...") - pr = cProfile.Profile() - pr.enable() - def exit(): - pr.disable() - print("Profiling completed") - if profile_file: - with open(profile_file, 'wb') as f: - pr.dump_stats(profile_file) - s = io.StringIO() - pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats() - print(s.getvalue()) - atexit.register(exit) - run_processor(processorClass, mets_url=mets, workspace=workspace, **kwargs) + sys.exit(f"Processing worker has failed with error: {e}") + if agent_type == 'server': + try: + # TODO: Better validate that inside the ProcessorServer itself + host, port = agent_address.split(':') + processor_server = ProcessorServer( + mongodb_addr=database, + processor_name=processor.ocrd_tool['executable'], + processor_class=ProcessorClass, + ) + processor_server.run_server(host=host, port=int(port)) + except Exception as e: + sys.exit(f"Processor server has failed with error: {e}") + sys.exit(0) diff --git a/ocrd/ocrd/decorators/ocrd_cli_options.py b/ocrd/ocrd/decorators/ocrd_cli_options.py index 2ba4bf8ae..5723471ce 100644 --- a/ocrd/ocrd/decorators/ocrd_cli_options.py +++ b/ocrd/ocrd/decorators/ocrd_cli_options.py @@ -1,7 +1,12 @@ +import click from click import option, Path from .parameter_option import parameter_option, parameter_override_option from .loglevel_option import loglevel_option -from ocrd_network import QueueServerParamType, DatabaseParamType +from ocrd_network import ( + DatabaseParamType, + ServerAddressParamType, + QueueServerParamType +) def ocrd_cli_options(f): @@ -33,6 +38,8 @@ def cli(mets_url): parameter_option, parameter_override_option, loglevel_option, + option('--type', 'agent_type', type=click.Choice(['worker', 'server'])), + option('--address', 'agent_address', type=ServerAddressParamType()), option('--queue', type=QueueServerParamType()), option('--database', type=DatabaseParamType()), option('-C', '--show-resource'), @@ -45,4 +52,3 @@ def cli(mets_url): for param in params: param(f) return f - diff --git a/ocrd/ocrd/lib.bash b/ocrd/ocrd/lib.bash index 11417c570..b033228d7 100644 --- a/ocrd/ocrd/lib.bash +++ b/ocrd/ocrd/lib.bash @@ -3,14 +3,14 @@ exit 1 ## ### `ocrd__raise` -## +## ## Raise an error and exit. ocrd__raise () { echo >&2 "ERROR: $1"; exit 127 } ## ### `ocrd__log` -## +## ## Delegate logging to `ocrd log` ocrd__log () { local log_level="${ocrd__argv[log_level]:-}" @@ -23,7 +23,7 @@ ocrd__log () { ## ### `ocrd__minversion` -## +## ## Ensure minimum version # ht https://stackoverflow.com/posts/4025065 ocrd__minversion () { @@ -53,28 +53,28 @@ ocrd__minversion () { } ## ### `ocrd__dumpjson` -## +## ## Output ocrd-tool.json. -## +## ## Requires `$OCRD_TOOL_JSON` and `$OCRD_TOOL_NAME` to be set: -## +## ## ```sh ## export OCRD_TOOL_JSON=/path/to/ocrd-tool.json ## export OCRD_TOOL_NAME=ocrd-foo-bar ## ``` -## +## ocrd__dumpjson () { ocrd ocrd-tool "$OCRD_TOOL_JSON" tool "$OCRD_TOOL_NAME" dump } -## +## ## Output file resource content. ## ocrd__show_resource () { ocrd ocrd-tool "$OCRD_TOOL_JSON" tool "$OCRD_TOOL_NAME" show-resource "$1" } -## +## ## Output file resources names. 
## ocrd__list_resources () { @@ -82,9 +82,9 @@ ocrd__list_resources () { } ## ### `ocrd__usage` -## +## ## Print usage -## +## ocrd__usage () { ocrd ocrd-tool "$OCRD_TOOL_JSON" tool "$OCRD_TOOL_NAME" help @@ -92,9 +92,9 @@ ocrd__usage () { } ## ### `ocrd__parse_argv` -## +## ## Expects an associative array ("hash"/"dict") `ocrd__argv` to be defined: -## +## ## ```sh ## declare -A ocrd__argv=() ## ``` @@ -145,18 +145,33 @@ ocrd__parse_argv () { -V|--version) ocrd ocrd-tool "$OCRD_TOOL_JSON" version; exit ;; --queue) ocrd__worker_queue="$2" ; shift ;; --database) ocrd__worker_database="$2" ; shift ;; + --type) ocrd__worker_type="$2" ; shift ;; + --address) ocrd__worker_address="$2" ; shift ;; *) ocrd__raise "Unknown option '$1'" ;; esac shift done - if [ -v ocrd__worker_queue -a -v ocrd__worker_database ]; then - ocrd processing-worker $OCRD_TOOL_NAME --queue "${ocrd__worker_queue}" --database "${ocrd__worker_database}" + if [ -v ocrd__worker_queue -o -v ocrd__worker_database -o -v ocrd__worker_type -o -v ocrd__worker_address ]; then + if ! [ -v ocrd__worker_type ] ; then + ocrd__raise "For Processing Worker / Processor Server --type is required" + elif ! [ -v ocrd__worker_database ]; then + ocrd__raise "For the Processing Worker / Processor Server --database is required" + fi + if [ ${ocrd__worker_type} = "worker" ]; then + if ! [ -v ocrd__worker_queue ]; then + ocrd__raise "For the Processing Worker --queue is required" + fi + ocrd network processing-worker $OCRD_TOOL_NAME --queue "${ocrd__worker_queue}" --database "${ocrd__worker_database}" + elif [ ${ocrd__worker_type} = "server" ]; then + if ! [ -v ocrd__worker_address ]; then + ocrd__raise "For the Processor Server --address is required" + fi + ocrd network processor-server $OCRD_TOOL_NAME --database "${ocrd__worker_database}" --address "${ocrd__worker_address}" + else + ocrd__raise "--type must be either 'worker' or 'server' not '${ocrd__worker_type}'" + fi exit - elif [ -v ocrd__worker_queue ]; then - ocrd__raise "Processing Worker also requires a --database argument" - elif [ -v ocrd__worker_database ]; then - ocrd__raise "Processing Worker also requires a --queue argument" fi if [[ ! -e "${ocrd__argv[mets_file]}" ]]; then diff --git a/ocrd/ocrd/processor/helpers.py b/ocrd/ocrd/processor/helpers.py index bc3cce637..aa1af3b7e 100644 --- a/ocrd/ocrd/processor/helpers.py +++ b/ocrd/ocrd/processor/helpers.py @@ -279,6 +279,7 @@ def wrap(s): --database The MongoDB server address in format "mongodb://{host}:{port}" [mongodb://localhost:27018] + --type type of processing: either "worker" or "server" Options for information: -C, --show-resource RESNAME Dump the content of processor resource RESNAME @@ -301,7 +302,7 @@ def wrap(s): # Taken from https://github.com/OCR-D/core/pull/884 @freeze_args -@lru_cache(maxsize=environ.get('OCRD_MAX_PROCESSOR_CACHE', 128)) +@lru_cache(maxsize=environ.get('OCRD_MAX_PROCESSOR_CACHE', 128)) def get_cached_processor(parameter: dict, processor_class): """ Call this function to get back an instance of a processor. diff --git a/ocrd_network/ocrd_network/__init__.py b/ocrd_network/ocrd_network/__init__.py index 6cd95dc3c..aaeeba7fc 100644 --- a/ocrd_network/ocrd_network/__init__.py +++ b/ocrd_network/ocrd_network/__init__.py @@ -22,10 +22,12 @@ # Note: The Mets Server is still not placed on the architecture diagram and probably won't be a part of # the network package. The reason, Mets Server is tightly coupled with the `OcrdWorkspace`. 
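To make the new agent options concrete: an installed processor executable wrapped by ocrd_cli_wrap_processor (or by lib.bash) can now be launched either as a Processing Worker or as a Processor Server. A minimal sketch, assuming a processor named ocrd-dummy and placeholder URLs/ports (the queue and database defaults below mirror the CLI defaults used elsewhere in this patch):

```sh
# Processing Worker: --type worker requires --queue and --database
ocrd-dummy --type worker \
    --queue amqp://admin:admin@localhost:5672/ \
    --database mongodb://localhost:27018

# Processor Server: --type server requires --address and --database
ocrd-dummy --type server \
    --address localhost:8001 \
    --database mongodb://localhost:27018
```

Without any of the --type/--address/--queue/--database options the executable still behaves as a regular local CLI processor, since check_and_run_network_agent returns early in that case.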
+from .client import Client from .processing_server import ProcessingServer from .processing_worker import ProcessingWorker +from .processor_server import ProcessorServer from .param_validators import ( DatabaseParamType, - ProcessingServerParamType, + ServerAddressParamType, QueueServerParamType ) diff --git a/ocrd_network/ocrd_network/cli/__init__.py b/ocrd_network/ocrd_network/cli/__init__.py new file mode 100644 index 000000000..1704b2aaf --- /dev/null +++ b/ocrd_network/ocrd_network/cli/__init__.py @@ -0,0 +1,11 @@ +from .client import client_cli +from .processing_server import processing_server_cli +from .processing_worker import processing_worker_cli +from .processor_server import processor_server_cli + +__all__ = [ + 'client_cli', + 'processing_server_cli', + 'processing_worker_cli', + 'processor_server_cli' +] diff --git a/ocrd_network/ocrd_network/cli/client.py b/ocrd_network/ocrd_network/cli/client.py new file mode 100644 index 000000000..0af070992 --- /dev/null +++ b/ocrd_network/ocrd_network/cli/client.py @@ -0,0 +1,101 @@ +import click +from typing import Optional + +from ocrd.decorators import ( + parameter_option, + parameter_override_option +) +from ocrd_network import Client + + +@click.group('client') +def client_cli(): + """ + A client for interacting with the network modules. + The client CLI mimics the WebAPI endpoints + """ + pass + + +@client_cli.group('discovery') +def discovery_cli(): + """ + The discovery endpoint of the WebAPI + """ + pass + + +@client_cli.group('processing') +def processing_cli(): + """ + The processing endpoint of the WebAPI + """ + pass + + +@processing_cli.command('processor') +@click.argument('processor_name', required=True, type=click.STRING) +@click.option('--address') +@click.option('-m', '--mets', required=True, default="mets.xml") +@click.option('-I', '--input-file-grp', default='OCR-D-INPUT') +@click.option('-O', '--output-file-grp', default='OCR-D-OUTPUT') +@click.option('-g', '--page-id') +@parameter_option +@click.option('--result-queue-name') +@click.option('--callback-url') +@click.option('--agent-type', default='worker') +def send_processing_request( + address: Optional[str], + processor_name: str, + mets: str, + input_file_grp: str, + output_file_grp: Optional[str], + page_id: Optional[str], + parameter: Optional[dict], + result_queue_name: Optional[str], + callback_url: Optional[str], + # TODO: This is temporally available to toggle + # between the ProcessingWorker/ProcessorServer + agent_type: Optional[str] +): + req_params = { + "path_to_mets": mets, + "description": "OCR-D Network client request", + "input_file_grps": input_file_grp.split(','), + "parameters": parameter if parameter else {}, + "agent_type": agent_type, + } + if output_file_grp: + req_params["output_file_grps"] = output_file_grp.split(',') + if page_id: + req_params["page_id"] = page_id + if result_queue_name: + req_params["result_queue_name"] = result_queue_name + if callback_url: + req_params["callback_url"] = callback_url + + client = Client( + server_addr_processing=address + ) + response = client.send_processing_request( + processor_name=processor_name, + req_params=req_params + ) + processing_job_id = response.get('job_id', None) + print(f"Processing job id: {processing_job_id}") + + +@client_cli.group('workflow') +def workflow_cli(): + """ + The workflow endpoint of the WebAPI + """ + pass + + +@client_cli.group('workspace') +def workspace_cli(): + """ + The workspace endpoint of the WebAPI + """ + pass diff --git 
a/ocrd_network/ocrd_network/cli/processing_server.py b/ocrd_network/ocrd_network/cli/processing_server.py new file mode 100644 index 000000000..cf2aacab4 --- /dev/null +++ b/ocrd_network/ocrd_network/cli/processing_server.py @@ -0,0 +1,25 @@ +import click +from .. import ( + ProcessingServer, + ServerAddressParamType +) + + +@click.command('processing-server') +@click.argument('path_to_config', required=True, type=click.STRING) +@click.option('-a', '--address', + default="localhost:8080", + help='The URL of the Processing server, format: host:port', + type=ServerAddressParamType(), + required=True) +def processing_server_cli(path_to_config, address: str): + """ + Start the Processing Server + (proxy between the user and the + Processing Worker(s) / Processor Server(s)) + """ + + # Note, the address is already validated with the type field + host, port = address.split(':') + processing_server = ProcessingServer(path_to_config, host, port) + processing_server.start() diff --git a/ocrd/ocrd/cli/processing_worker.py b/ocrd_network/ocrd_network/cli/processing_worker.py similarity index 74% rename from ocrd/ocrd/cli/processing_worker.py rename to ocrd_network/ocrd_network/cli/processing_worker.py index e9311e061..1cc337738 100644 --- a/ocrd/ocrd/cli/processing_worker.py +++ b/ocrd_network/ocrd_network/cli/processing_worker.py @@ -1,20 +1,10 @@ -""" -OCR-D CLI: start the processing worker - -.. click:: ocrd.cli.processing_worker:processing_worker_cli - :prog: ocrd processing-worker - :nested: full -""" import click -import logging -from ocrd_utils import ( - initLogging, - get_ocrd_tool_json -) -from ocrd_network import ( +from ocrd_utils import get_ocrd_tool_json + +from .. import ( DatabaseParamType, ProcessingWorker, - QueueServerParamType, + QueueServerParamType ) @@ -23,18 +13,18 @@ @click.option('-q', '--queue', default="amqp://admin:admin@localhost:5672/", help='The URL of the Queue Server, format: amqp://username:password@host:port/vhost', - type=QueueServerParamType()) + type=QueueServerParamType(), + required=True) @click.option('-d', '--database', default="mongodb://localhost:27018", help='The URL of the MongoDB, format: mongodb://host:port', - type=DatabaseParamType()) + type=DatabaseParamType(), + required=True) def processing_worker_cli(processor_name: str, queue: str, database: str): """ - Start a processing worker (a specific ocr-d processor) + Start Processing Worker + (a specific ocr-d processor consuming tasks from RabbitMQ queue) """ - initLogging() - # TODO: Remove before the release - logging.getLogger('ocrd.network').setLevel(logging.DEBUG) # Get the ocrd_tool dictionary # ocrd_tool = parse_json_string_with_comments( diff --git a/ocrd_network/ocrd_network/cli/processor_server.py b/ocrd_network/ocrd_network/cli/processor_server.py new file mode 100644 index 000000000..534a9a0fe --- /dev/null +++ b/ocrd_network/ocrd_network/cli/processor_server.py @@ -0,0 +1,35 @@ +import click +from .. 
import ( + DatabaseParamType, + ProcessorServer, + ServerAddressParamType +) + + +@click.command('processor-server') +@click.argument('processor_name', required=True, type=click.STRING) +@click.option('-a', '--address', + help='The URL of the processor server, format: host:port', + type=ServerAddressParamType(), + required=True) +@click.option('-d', '--database', + default="mongodb://localhost:27018", + help='The URL of the MongoDB, format: mongodb://host:port', + type=DatabaseParamType(), + required=True) +def processor_server_cli(processor_name: str, address: str, database: str): + """ + Start Processor Server + (standalone REST API OCR-D processor) + """ + try: + # TODO: Better validate that inside the ProcessorServer itself + host, port = address.split(':') + processor_server = ProcessorServer( + mongodb_addr=database, + processor_name=processor_name, + processor_class=None, # For readability purposes assigned here + ) + processor_server.run_server(host=host, port=int(port)) + except Exception as e: + raise Exception("Processor server has failed with error") from e diff --git a/ocrd_network/ocrd_network/client.py b/ocrd_network/ocrd_network/client.py new file mode 100644 index 000000000..d237908f7 --- /dev/null +++ b/ocrd_network/ocrd_network/client.py @@ -0,0 +1,37 @@ +import json +from os import environ +import requests + + +# TODO: This is just a conceptual implementation and first try to +# trigger further discussions on how this should look like. +class Client: + def __init__( + self, + server_addr_processing: str = environ.get('OCRD_NETWORK_SERVER_ADDR_PROCESSING', ''), + server_addr_workflow: str = environ.get('OCRD_NETWORK_SERVER_ADDR_WORKFLOW', ''), + server_addr_workspace: str = environ.get('OCRD_NETWORK_SERVER_ADDR_WORKSPACE', ''), + ): + self.server_addr_processing = server_addr_processing + self.server_addr_workflow = server_addr_workflow + self.server_addr_workspace = server_addr_workspace + + def send_processing_request(self, processor_name: str, req_params: dict): + verify_server_protocol(self.server_addr_processing) + req_url = f'{self.server_addr_processing}/processor/{processor_name}' + req_headers = {"Content-Type": "application/json; charset=utf-8"} + req_json = json.loads(json.dumps(req_params)) + + print(f'Sending processing request to: {req_url}') + response = requests.post(url=req_url, headers=req_headers, json=req_json) + return response.json() + + +def verify_server_protocol(address: str): + protocol_matched = False + for protocol in ['http://', 'https://']: + if address.startswith(protocol): + protocol_matched = True + break + if not protocol_matched: + raise ValueError(f'Wrong/Missing protocol in the server address: {address}') diff --git a/ocrd_network/ocrd_network/deployer.py b/ocrd_network/ocrd_network/deployer.py index 27ac323fa..491e6a632 100644 --- a/ocrd_network/ocrd_network/deployer.py +++ b/ocrd_network/ocrd_network/deployer.py @@ -6,47 +6,110 @@ Each Processing Host may have several Processing Workers. Each Processing Worker is an instance of an OCR-D processor. 
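Taken together, the new ocrd_network CLI modules are exposed under the `ocrd network` group. A possible session, with placeholder config path, processor name ('ocrd-dummy'), ports and URLs (note that the client address must include the protocol, as enforced by verify_server_protocol above):

```sh
# Start the Processing Server from a YAML configuration
ocrd network processing-server /path/to/ps-config.yml --address localhost:8080

# Start a Processing Worker or a Processor Server for one processor
ocrd network processing-worker ocrd-dummy \
    --queue amqp://admin:admin@localhost:5672/ --database mongodb://localhost:27018
ocrd network processor-server ocrd-dummy \
    --address localhost:8001 --database mongodb://localhost:27018

# Send a processing request via the client
# (posts JSON to {address}/processor/{name}, as implemented in Client.send_processing_request)
ocrd network client processing processor ocrd-dummy \
    --address http://localhost:8080 \
    -m /path/to/mets.xml -I OCR-D-IMG -O OCR-D-BIN --agent-type worker
```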
""" - from __future__ import annotations -from typing import Dict, Union -from paramiko import SSHClient +from typing import Dict, List, Union from re import search as re_search +from os import getpid from time import sleep - from ocrd_utils import getLogger -from .deployment_config import * + from .deployment_utils import ( create_docker_client, - create_ssh_client, - CustomDockerClient, DeployType, - HostData, + wait_for_rabbitmq_availability ) -from .rabbitmq_utils import RMQPublisher +from .runtime_data import ( + DataHost, + DataMongoDB, + DataProcessingWorker, + DataProcessorServer, + DataRabbitMQ +) +from .utils import validate_and_load_config -class Deployer: - """Wraps the deployment functionality of the Processing Server - - Deployer is the one acting. - :py:attr:`config` is for representation of the config file only. - :py:attr:`hosts` is for managing processor information, not for actually processing. - """ - def __init__(self, config: ProcessingServerConfig) -> None: - """ - Args: - config (:py:class:`ProcessingServerConfig`): parsed configuration of the Processing Server - """ +class Deployer: + def __init__(self, config_path: str) -> None: self.log = getLogger(__name__) - self.config = config - self.hosts = HostData.from_config(config.hosts) - self.mongo_pid = None - self.mq_pid = None + config = validate_and_load_config(config_path) + + self.data_mongo: DataMongoDB = DataMongoDB(config['database']) + self.data_queue: DataRabbitMQ = DataRabbitMQ(config['process_queue']) + self.data_hosts: List[DataHost] = [] + for config_host in config['hosts']: + self.data_hosts.append(DataHost(config_host)) + + # TODO: Reconsider this. + def find_matching_processors( + self, + worker_only: bool = False, + server_only: bool = False, + docker_only: bool = False, + native_only: bool = False, + str_names_only: bool = False, + unique_only: bool = False + ) -> Union[List[str], List[object]]: + """Finds and returns a list of matching data objects of type: + `DataProcessingWorker` and `DataProcessorServer`. 
+ + :py:attr:`worker_only` match only processors with worker status + :py:attr:`server_only` match only processors with server status + :py:attr:`docker_only` match only docker processors + :py:attr:`native_only` match only native processors + :py:attr:`str_only` returns the processor_name instead of data object + :py:attr:`unique_only` remove duplicates from the matches + + `worker_only` and `server_only` are mutually exclusive to each other + `docker_only` and `native_only` are mutually exclusive to each other + `unique_only` is allowed only together with `str_names_only` + """ + + if worker_only and server_only: + raise ValueError(f"Only 'worker_only' or 'server_only' is allowed, not both.") + if docker_only and native_only: + raise ValueError(f"Only 'docker_only' or 'native_only' is allowed, not both.") + if not str_names_only and unique_only: + raise ValueError(f"Value 'unique_only' is allowed only together with 'str_names_only'") + + # Find all matching objects of type: + # DataProcessingWorker or DataProcessorServer + matched_objects = [] + for data_host in self.data_hosts: + if not server_only: + for data_worker in data_host.data_workers: + if data_worker.deploy_type == DeployType.NATIVE and docker_only: + continue + if data_worker.deploy_type == DeployType.DOCKER and native_only: + continue + matched_objects.append(data_worker) + if not worker_only: + for data_server in data_host.data_servers: + if data_server.deploy_type == DeployType.NATIVE and docker_only: + continue + if data_server.deploy_type == DeployType.DOCKER and native_only: + continue + matched_objects.append(data_server) + if str_names_only: + # gets only the processor names of the matched objects + name_list = [match.processor_name for match in matched_objects] + if unique_only: + # removes the duplicates, if any + return list(dict.fromkeys(name_list)) + return name_list + return matched_objects + + def resolve_processor_server_url(self, processor_name) -> str: + processor_server_url = '' + for data_host in self.data_hosts: + for data_server in data_host.data_servers: + if data_server.processor_name == processor_name: + processor_server_url = f'http://{data_host.address}:{data_server.port}/' + return processor_server_url def kill_all(self) -> None: - """ kill all started services: workers, database, queue + """ kill all started services: hosts, database, queue The order of killing is important to optimize graceful shutdown in the future. 
If RabbitMQ server is killed before killing Processing Workers, that may have bad outcome and leave @@ -56,85 +119,135 @@ def kill_all(self) -> None: self.kill_mongodb() self.kill_rabbitmq() - def deploy_hosts(self, rabbitmq_url: str, mongodb_url: str) -> None: - for host in self.hosts: - self.log.debug(f'Deploying processing workers on host: {host.config.address}') - - if (any(p.deploy_type == DeployType.native for p in host.config.processors) - and not host.ssh_client): - host.ssh_client = create_ssh_client( - host.config.address, - host.config.username, - host.config.password, - host.config.keypath - ) - if (any(p.deploy_type == DeployType.docker for p in host.config.processors) - and not host.docker_client): - host.docker_client = create_docker_client( - host.config.address, - host.config.username, - host.config.password, - host.config.keypath + def deploy_hosts( + self, + mongodb_url: str, + rabbitmq_url: str + ) -> None: + for host_data in self.data_hosts: + if host_data.needs_ssh: + host_data.create_client(client_type='ssh') + assert host_data.ssh_client + if host_data.needs_docker: + host_data.create_client(client_type='docker') + assert host_data.docker_client + + self.log.debug(f'Deploying processing workers on host: {host_data.address}') + for data_worker in host_data.data_workers: + self._deploy_processing_worker( + mongodb_url, + rabbitmq_url, + host_data, + data_worker ) - for processor in host.config.processors: - self._deploy_processing_worker(processor, host, rabbitmq_url, mongodb_url) - - if host.ssh_client: - host.ssh_client.close() - if host.docker_client: - host.docker_client.close() - - def _deploy_processing_worker(self, processor: WorkerConfig, host: HostData, - rabbitmq_url: str, mongodb_url: str) -> None: - - self.log.debug(f"deploy '{processor.deploy_type}' processor: '{processor}' on '{host.config.address}'") - - for _ in range(processor.count): - if processor.deploy_type == DeployType.native: - assert host.ssh_client # to satisfy mypy - pid = self.start_native_processor( - client=host.ssh_client, - processor_name=processor.name, - queue_url=rabbitmq_url, - database_url=mongodb_url, + self.log.debug(f'Deploying processor servers on host: {host_data.address}') + for data_server in host_data.data_servers: + self._deploy_processor_server( + mongodb_url, + host_data, + data_server ) - host.pids_native.append(pid) - else: - assert processor.deploy_type == DeployType.docker - assert host.docker_client # to satisfy mypy - pid = self.start_docker_processor( - client=host.docker_client, - processor_name=processor.name, - queue_url=rabbitmq_url, - database_url=mongodb_url - ) - host.pids_docker.append(pid) - sleep(0.1) - - def deploy_rabbitmq(self, image: str, detach: bool, remove: bool, - ports_mapping: Union[Dict, None] = None) -> str: - """Start docker-container with rabbitmq - This method deploys the RabbitMQ Server. Handling of creation of queues, submitting messages - to queues, and receiving messages from queues is part of the RabbitMQ Library which is part - of the OCR-D WebAPI implementation. 
- """ + if host_data.ssh_client: + host_data.ssh_client.close() + host_data.ssh_client = None + if host_data.docker_client: + host_data.docker_client.close() + host_data.docker_client = None + + def _deploy_processing_worker( + self, + mongodb_url: str, + rabbitmq_url: str, + host_data: DataHost, + data_worker: DataProcessingWorker + ) -> None: + self.log.debug(f"Deploying processing worker, " + f"environment: '{data_worker.deploy_type}', " + f"name: '{data_worker.processor_name}', " + f"address: '{host_data.address}'") + + if data_worker.deploy_type == DeployType.NATIVE: + assert host_data.ssh_client # to satisfy mypy + pid = self.start_native_processor( + ssh_client=host_data.ssh_client, + processor_name=data_worker.processor_name, + queue_url=rabbitmq_url, + database_url=mongodb_url, + ) + data_worker.pid = pid + elif data_worker.deploy_type == DeployType.DOCKER: + assert host_data.docker_client # to satisfy mypy + pid = self.start_docker_processor( + docker_client=host_data.docker_client, + processor_name=data_worker.processor_name, + _queue_url=rabbitmq_url, + _database_url=mongodb_url + ) + data_worker.pid = pid + sleep(0.2) + + # TODO: Revisit this to remove code duplications of deploy_* methods + def _deploy_processor_server( + self, + mongodb_url: str, + host_data: DataHost, + data_server: DataProcessorServer, + ) -> None: + self.log.debug(f"Deploying processing worker, " + f"environment: '{data_server.deploy_type}', " + f"name: '{data_server.processor_name}', " + f"address: '{data_server.host}:{data_server.port}'") + + if data_server.deploy_type == DeployType.NATIVE: + assert host_data.ssh_client + pid = self.start_native_processor_server( + ssh_client=host_data.ssh_client, + processor_name=data_server.processor_name, + agent_address=f'{data_server.host}:{data_server.port}', + database_url=mongodb_url, + ) + data_server.pid = pid + + if data_server.processor_name in host_data.server_ports: + name = data_server.processor_name + port = data_server.port + if host_data.server_ports[name]: + host_data.server_ports[name] = host_data.server_ports[name].append(port) + else: + host_data.server_ports[name] = [port] + else: + host_data.server_ports[data_server.processor_name] = [data_server.port] + elif data_server.deploy_type == DeployType.DOCKER: + raise Exception("Deploying docker processor server is not supported yet!") + + def deploy_rabbitmq( + self, + image: str, + detach: bool, + remove: bool, + ports_mapping: Union[Dict, None] = None + ) -> str: self.log.debug(f"Trying to deploy '{image}', with modes: " f"detach='{detach}', remove='{remove}'") - if not self.config or not self.config.queue.address: + if not self.data_queue or not self.data_queue.address: raise ValueError('Deploying RabbitMQ has failed - missing configuration.') - client = create_docker_client(self.config.queue.address, self.config.queue.username, - self.config.queue.password, self.config.queue.keypath) + client = create_docker_client( + self.data_queue.address, + self.data_queue.ssh_username, + self.data_queue.ssh_password, + self.data_queue.ssh_keypath + ) if not ports_mapping: # 5672, 5671 - used by AMQP 0-9-1 and AMQP 1.0 clients without and with TLS # 15672, 15671: HTTP API clients, management UI and rabbitmq admin, without and with TLS # 25672: used for internode and CLI tools communication and is allocated from # a dynamic range (limited to a single port by default, computed as AMQP port + 20000) ports_mapping = { - 5672: self.config.queue.port, + 5672: self.data_queue.port, 15672: 15672, 25672: 25672 } @@ 
-145,61 +258,52 @@ def deploy_rabbitmq(self, image: str, detach: bool, remove: bool, ports=ports_mapping, # The default credentials to be used by the processing workers environment=[ - f'RABBITMQ_DEFAULT_USER={self.config.queue.credentials[0]}', - f'RABBITMQ_DEFAULT_PASS={self.config.queue.credentials[1]}' + f'RABBITMQ_DEFAULT_USER={self.data_queue.username}', + f'RABBITMQ_DEFAULT_PASS={self.data_queue.password}' ] ) assert res and res.id, \ - f'Failed to start RabbitMQ docker container on host: {self.config.mongo.address}' - self.mq_pid = res.id + f'Failed to start RabbitMQ docker container on host: {self.data_queue.address}' + self.data_queue.pid = res.id client.close() - # Build the RabbitMQ Server URL to return - rmq_host = self.config.queue.address - # note, integer validation is already performed - rmq_port = int(self.config.queue.port) - # the default virtual host since no field is - # provided in the processing server config.yml + rmq_host = self.data_queue.address + rmq_port = int(self.data_queue.port) rmq_vhost = '/' - self.wait_for_rabbitmq_availability(rmq_host, rmq_port, rmq_vhost, - self.config.queue.credentials[0], - self.config.queue.credentials[1]) - - rabbitmq_hostinfo = f'{rmq_host}:{rmq_port}{rmq_vhost}' - self.log.info(f'The RabbitMQ server was deployed on host: {rabbitmq_hostinfo}') - return rabbitmq_hostinfo - - def wait_for_rabbitmq_availability(self, host: str, port: int, vhost: str, username: str, - password: str) -> None: - max_waiting_steps = 15 - while max_waiting_steps > 0: - try: - dummy_publisher = RMQPublisher(host=host, port=port, vhost=vhost) - dummy_publisher.authenticate_and_connect(username=username, password=password) - except Exception: - max_waiting_steps -= 1 - sleep(2) - else: - # TODO: Disconnect the dummy_publisher here before returning... 
- return - raise RuntimeError('Error waiting for queue startup: timeout exceeded') - - def deploy_mongodb(self, image: str, detach: bool, remove: bool, - ports_mapping: Union[Dict, None] = None) -> str: - """ Start mongodb in docker - """ + wait_for_rabbitmq_availability( + host=rmq_host, + port=rmq_port, + vhost=rmq_vhost, + username=self.data_queue.username, + password=self.data_queue.password + ) + self.log.info(f'The RabbitMQ server was deployed on URL: ' + f'{rmq_host}:{rmq_port}{rmq_vhost}') + return self.data_queue.url + + def deploy_mongodb( + self, + image: str, + detach: bool, + remove: bool, + ports_mapping: Union[Dict, None] = None + ) -> str: self.log.debug(f"Trying to deploy '{image}', with modes: " f"detach='{detach}', remove='{remove}'") - if not self.config or not self.config.mongo.address: + if not self.data_mongo or not self.data_mongo.address: raise ValueError('Deploying MongoDB has failed - missing configuration.') - client = create_docker_client(self.config.mongo.address, self.config.mongo.username, - self.config.mongo.password, self.config.mongo.keypath) + client = create_docker_client( + self.data_mongo.address, + self.data_mongo.ssh_username, + self.data_mongo.ssh_password, + self.data_mongo.ssh_keypath + ) if not ports_mapping: ports_mapping = { - 27017: self.config.mongo.port + 27017: self.data_mongo.port } res = client.containers.run( image=image, @@ -209,67 +313,114 @@ def deploy_mongodb(self, image: str, detach: bool, remove: bool, ) if not res or not res.id: raise RuntimeError('Failed to start MongoDB docker container on host: ' - f'{self.config.mongo.address}') - self.mongo_pid = res.id + f'{self.data_mongo.address}') + self.data_mongo.pid = res.id client.close() - mongodb_hostinfo = f'{self.config.mongo.address}:{self.config.mongo.port}' + mongodb_hostinfo = f'{self.data_mongo.address}:{self.data_mongo.port}' self.log.info(f'The MongoDB was deployed on host: {mongodb_hostinfo}') - return mongodb_hostinfo + return self.data_mongo.url def kill_rabbitmq(self) -> None: - if not self.mq_pid: + if not self.data_queue.pid: self.log.warning('No running RabbitMQ instance found') return - client = create_docker_client(self.config.queue.address, self.config.queue.username, - self.config.queue.password, self.config.queue.keypath) - client.containers.get(self.mq_pid).stop() - self.mq_pid = None + client = create_docker_client( + self.data_queue.address, + self.data_queue.ssh_username, + self.data_queue.ssh_password, + self.data_queue.ssh_keypath + ) + client.containers.get(self.data_queue.pid).stop() + self.data_queue.pid = None client.close() self.log.info('The RabbitMQ is stopped') def kill_mongodb(self) -> None: - if not self.mongo_pid: + if not self.data_mongo.pid: self.log.warning('No running MongoDB instance found') return - client = create_docker_client(self.config.mongo.address, self.config.mongo.username, - self.config.mongo.password, self.config.mongo.keypath) - client.containers.get(self.mongo_pid).stop() - self.mongo_pid = None + client = create_docker_client( + self.data_mongo.address, + self.data_mongo.ssh_username, + self.data_mongo.ssh_password, + self.data_mongo.ssh_keypath + ) + client.containers.get(self.data_mongo.pid).stop() + self.data_mongo.pid = None client.close() self.log.info('The MongoDB is stopped') def kill_hosts(self) -> None: self.log.debug('Starting to kill/stop hosts') # Kill processing hosts - for host in self.hosts: - self.log.debug(f'Killing/Stopping processing workers on host: {host.config.address}') - if host.ssh_client: - 
host.ssh_client = create_ssh_client(host.config.address, host.config.username, - host.config.password, host.config.keypath) - if host.docker_client: - host.docker_client = create_docker_client(host.config.address, host.config.username, - host.config.password, host.config.keypath) - # Kill deployed OCR-D processor instances on this Processing worker host - self.kill_processing_worker(host) - - def kill_processing_worker(self, host: HostData) -> None: - for pid in host.pids_native: - self.log.debug(f"Trying to kill/stop native processor: with PID: '{pid}'") - host.ssh_client.exec_command(f'kill {pid}') - host.pids_native = [] - - for pid in host.pids_docker: - self.log.debug(f"Trying to kill/stop docker container with PID: '{pid}'") - host.docker_client.containers.get(pid).stop() - host.pids_docker = [] - - def start_native_processor(self, client: SSHClient, processor_name: str, queue_url: str, - database_url: str) -> str: + for host_data in self.data_hosts: + if host_data.needs_ssh: + host_data.create_client(client_type='ssh') + assert host_data.ssh_client + if host_data.needs_docker: + host_data.create_client(client_type='docker') + assert host_data.docker_client + + self.log.debug(f'Killing/Stopping processing workers on host: {host_data.address}') + self.kill_processing_workers(host_data) + + self.log.debug(f'Killing/Stopping processor servers on host: {host_data.address}') + self.kill_processor_servers(host_data) + + if host_data.ssh_client: + host_data.ssh_client.close() + host_data.ssh_client = None + if host_data.docker_client: + host_data.docker_client.close() + host_data.docker_client = None + + # TODO: Optimize the code duplication from start_* and kill_* methods + def kill_processing_workers(self, host_data: DataHost) -> None: + amount = len(host_data.data_workers) + if not amount: + self.log.info(f'No active processing workers to be stopped.') + return + self.log.info(f"Trying to stop {amount} processing workers:") + for worker in host_data.data_workers: + if not worker.pid: + continue + if worker.deploy_type == DeployType.NATIVE: + host_data.ssh_client.exec_command(f'kill {worker.pid}') + self.log.info(f"Stopped native worker with pid: '{worker.pid}'") + elif worker.deploy_type == DeployType.DOCKER: + host_data.docker_client.containers.get(worker.pid).stop() + self.log.info(f"Stopped docker worker with container id: '{worker.pid}'") + host_data.data_workers = [] + + def kill_processor_servers(self, host_data: DataHost) -> None: + amount = len(host_data.data_servers) + if not amount: + self.log.info(f'No active processor servers to be stopped.') + return + self.log.info(f"Trying to stop {amount} processing workers:") + for server in host_data.data_servers: + if not server.pid: + continue + if server.deploy_type == DeployType.NATIVE: + host_data.ssh_client.exec_command(f'kill {server.pid}') + self.log.info(f"Stopped native server with pid: '{server.pid}'") + elif server.deploy_type == DeployType.DOCKER: + host_data.docker_client.containers.get(server.pid).stop() + self.log.info(f"Stopped docker server with container id: '{server.pid}'") + host_data.data_servers = [] + + def start_native_processor( + self, + ssh_client, + processor_name: str, + queue_url: str, + database_url: str + ) -> str: """ start a processor natively on a host via ssh Args: - client: paramiko SSHClient to execute commands on a host + ssh_client: paramiko SSHClient to execute commands on a host processor_name: name of processor to run queue_url: url to rabbitmq database_url: url to database @@ -277,28 +428,60 
@@ def start_native_processor(self, client: SSHClient, processor_name: str, queue_u Returns: str: pid of running process """ - self.log.info(f'Starting native processor: {processor_name}') - channel = client.invoke_shell() + self.log.info(f'Starting native processing worker: {processor_name}') + channel = ssh_client.invoke_shell() stdin, stdout = channel.makefile('wb'), channel.makefile('rb') - cmd = f'{processor_name} --database {database_url} --queue {queue_url}' + cmd = f'{processor_name} --type worker --database {database_url} --queue {queue_url}' # the only way (I could find) to make it work to start a process in the background and # return early is this construction. The pid of the last started background process is # printed with `echo $!` but it is printed inbetween other output. Because of that I added # `xyz` before and after the code to easily be able to filter out the pid via regex when # returning from the function - logpath = '/tmp/ocrd-processing-server-startup.log' - stdin.write(f"echo starting processor with '{cmd}' >> '{logpath}'\n") - stdin.write(f'{cmd} >> {logpath} 2>&1 &\n') + log_path = '/tmp/ocrd-processing-server-startup.log' + stdin.write(f"echo starting processing worker with '{cmd}' >> '{log_path}'\n") + stdin.write(f'{cmd} >> {log_path} 2>&1 &\n') stdin.write('echo xyz$!xyz \n exit \n') output = stdout.read().decode('utf-8') stdout.close() stdin.close() return re_search(r'xyz([0-9]+)xyz', output).group(1) # type: ignore - def start_docker_processor(self, client: CustomDockerClient, processor_name: str, - queue_url: str, database_url: str) -> str: + def start_docker_processor( + self, + docker_client, + processor_name: str, + _queue_url: str, + _database_url: str + ) -> str: + # TODO: Raise an exception here as well? + # raise Exception("Deploying docker processing worker is not supported yet!") + self.log.info(f'Starting docker container processor: {processor_name}') # TODO: add real command here to start processing server in docker here - res = client.containers.run('debian', 'sleep 500s', detach=True, remove=True) + res = docker_client.containers.run('debian', 'sleep 500s', detach=True, remove=True) assert res and res.id, f'Running processor: {processor_name} in docker-container failed' return res.id + + # TODO: Just a copy of the above start_native_processor() method. + # Far from being great... But should be good as a starting point + def start_native_processor_server( + self, + ssh_client, + processor_name: str, + agent_address: str, + database_url: str + ) -> str: + self.log.info(f"Starting native processor server: {processor_name} on {agent_address}") + channel = ssh_client.invoke_shell() + stdin, stdout = channel.makefile('wb'), channel.makefile('rb') + cmd = f'{processor_name} --type server --address {agent_address} --database {database_url}' + port = agent_address.split(':')[1] + log_path = f'/tmp/server_{processor_name}_{port}_{getpid()}.log' + # TODO: This entire stdin/stdout thing is broken with servers! 
+ stdin.write(f"echo starting processor server with '{cmd}' >> '{log_path}'\n") + stdin.write(f'{cmd} >> {log_path} 2>&1 &\n') + stdin.write('echo xyz$!xyz \n exit \n') + output = stdout.read().decode('utf-8') + stdout.close() + stdin.close() + return re_search(r'xyz([0-9]+)xyz', output).group(1) # type: ignore diff --git a/ocrd_network/ocrd_network/deployment_config.py b/ocrd_network/ocrd_network/deployment_config.py deleted file mode 100644 index 48b123d1a..000000000 --- a/ocrd_network/ocrd_network/deployment_config.py +++ /dev/null @@ -1,87 +0,0 @@ -from typing import Dict -from yaml import safe_load -from ocrd_validators import ProcessingServerConfigValidator -from .deployment_utils import DeployType - -__all__ = [ - 'ProcessingServerConfig', - 'HostConfig', - 'WorkerConfig', - 'MongoConfig', - 'QueueConfig', -] - - -class ProcessingServerConfig: - def __init__(self, config_path: str) -> None: - # Load and validate the config - with open(config_path) as fin: - config = safe_load(fin) - report = ProcessingServerConfigValidator.validate(config) - if not report.is_valid: - raise Exception(f'Processing-Server configuration file is invalid:\n{report.errors}') - - # Split the configurations - self.mongo = MongoConfig(config['database']) - self.queue = QueueConfig(config['process_queue']) - self.hosts = [] - for host in config['hosts']: - self.hosts.append(HostConfig(host)) - - -class HostConfig: - """Class to wrap information for all processing-worker-hosts. - - Config information and runtime information is stored here. This class - should not do much but hold config information and runtime information. I - hope to make the code better understandable this way. Deployer should still - be the class who does things and this class here should be mostly passive - """ - - def __init__(self, config: dict) -> None: - self.address = config['address'] - self.username = config['username'] - self.password = config.get('password', None) - self.keypath = config.get('path_to_privkey', None) - self.processors = [] - for worker in config['workers']: - deploy_type = DeployType.from_str(worker['deploy_type']) - self.processors.append( - WorkerConfig(worker['name'], worker['number_of_instance'], deploy_type) - ) - - -class WorkerConfig: - """ - Class wrapping information from config file for an OCR-D processor - """ - def __init__(self, name: str, count: int, deploy_type: DeployType) -> None: - self.name = name - self.count = count - self.deploy_type = deploy_type - - -class MongoConfig: - """ Class to hold information for Mongodb-Docker container - """ - - def __init__(self, config: Dict) -> None: - self.address = config['address'] - self.port = int(config['port']) - self.username = config['ssh']['username'] - self.keypath = config['ssh'].get('path_to_privkey', None) - self.password = config['ssh'].get('password', None) - self.credentials = (config['credentials']['username'], config['credentials']['password']) - - -class QueueConfig: - """ Class to hold information for RabbitMQ-Docker container - """ - - def __init__(self, config: Dict) -> None: - self.address = config['address'] - self.port = int(config['port']) - self.username = config['ssh']['username'] - self.keypath = config['ssh'].get('path_to_privkey', None) - self.password = config['ssh'].get('password', None) - self.credentials = (config['credentials']['username'], config['credentials']['password']) diff --git a/ocrd_network/ocrd_network/deployment_utils.py b/ocrd_network/ocrd_network/deployment_utils.py index 6b943127b..9be063cb2 100644 --- 
a/ocrd_network/ocrd_network/deployment_utils.py +++ b/ocrd_network/ocrd_network/deployment_utils.py @@ -1,62 +1,34 @@ from __future__ import annotations from enum import Enum -from typing import Union, List -from distutils.spawn import find_executable as which -import re - from docker import APIClient, DockerClient from docker.transport import SSHHTTPAdapter from paramiko import AutoAddPolicy, SSHClient +from time import sleep -from ocrd_utils import getLogger -from .deployment_config import * +from .rabbitmq_utils import RMQPublisher __all__ = [ 'create_docker_client', 'create_ssh_client', - 'CustomDockerClient', 'DeployType', - 'HostData', - 'is_bashlib_processor' + 'wait_for_rabbitmq_availability' ] -def create_ssh_client(address: str, username: str, password: Union[str, None], - keypath: Union[str, None]) -> SSHClient: +def create_ssh_client(address: str, username: str, password: str = "", keypath: str = "") -> SSHClient: client = SSHClient() client.set_missing_host_key_policy(AutoAddPolicy) try: client.connect(hostname=address, username=username, password=password, key_filename=keypath) - except Exception: - getLogger(__name__).error(f"Error creating SSHClient for host: '{address}'") - raise + except Exception as error: + raise Exception(f"Error creating SSHClient of host '{address}', reason:") from error return client -def create_docker_client(address: str, username: str, password: Union[str, None], - keypath: Union[str, None]) -> CustomDockerClient: +def create_docker_client(address: str, username: str, password: str = "", keypath: str = "") -> CustomDockerClient: return CustomDockerClient(username, address, password=password, keypath=keypath) - -class HostData: - """class to store runtime information for a host - """ - def __init__(self, config: HostConfig) -> None: - self.config = config - self.ssh_client: Union[SSHClient, None] = None - self.docker_client: Union[CustomDockerClient, None] = None - self.pids_native: List[str] = [] - self.pids_docker: List[str] = [] - - @staticmethod - def from_config(config: List[HostConfig]) -> List[HostData]: - res = [] - for host_config in config: - res.append(HostData(host_config)) - return res - - class CustomDockerClient(DockerClient): """Wrapper for docker.DockerClient to use an own SshHttpAdapter. @@ -82,21 +54,20 @@ def __init__(self, user: str, host: str, **kwargs) -> None: # the super-constructor is not called on purpose: it solely instantiates the APIClient. The # missing `version` in that call would raise an error. 
APIClient is provided here as a # replacement for what the super-constructor does - if not user or not host: + if not (user and host): raise ValueError('Missing argument: user and host must both be provided') - if 'password' not in kwargs and 'keypath' not in kwargs: + if ('password' not in kwargs) != ('keypath' not in kwargs): raise ValueError('Missing argument: one of password and keyfile is needed') self.api = APIClient(f'ssh://{host}', use_ssh_client=True, version='1.41') ssh_adapter = self.CustomSshHttpAdapter(f'ssh://{user}@{host}:22', **kwargs) self.api.mount('http+docker://ssh', ssh_adapter) class CustomSshHttpAdapter(SSHHTTPAdapter): - def __init__(self, base_url, password: Union[str, None] = None, - keypath: Union[str, None] = None) -> None: + def __init__(self, base_url, password: str = "", keypath: str = "") -> None: self.password = password self.keypath = keypath - if not self.password and not self.keypath: - raise Exception("either 'password' or 'keypath' must be provided") + if bool(self.password) == bool(self.keypath): + raise Exception("Either 'password' or 'keypath' must be provided") super().__init__(base_url) def _create_paramiko_client(self, base_url: str) -> None: @@ -112,18 +83,29 @@ def _create_paramiko_client(self, base_url: str) -> None: self.ssh_client.set_missing_host_key_policy(AutoAddPolicy) -class DeployType(Enum): - """ Deploy-Type of the processing server. - """ - docker = 1 - native = 2 - - @staticmethod - def from_str(label: str) -> DeployType: - return DeployType[label.lower()] +def wait_for_rabbitmq_availability( + host: str, + port: int, + vhost: str, + username: str, + password: str +) -> None: + max_waiting_steps = 15 + while max_waiting_steps > 0: + try: + dummy_publisher = RMQPublisher(host=host, port=port, vhost=vhost) + dummy_publisher.authenticate_and_connect(username=username, password=password) + except Exception: + max_waiting_steps -= 1 + sleep(2) + else: + # TODO: Disconnect the dummy_publisher here before returning... + return + raise RuntimeError('Error waiting for queue startup: timeout exceeded') - def is_native(self) -> bool: - return self == DeployType.native - def is_docker(self) -> bool: - return self == DeployType.docker +class DeployType(Enum): + """ Deploy-Type of the processing worker/processor server. 
+ """ + DOCKER = 1 + NATIVE = 2 diff --git a/ocrd_network/ocrd_network/models/job.py b/ocrd_network/ocrd_network/models/job.py index 3c0857c37..aa50e6aad 100644 --- a/ocrd_network/ocrd_network/models/job.py +++ b/ocrd_network/ocrd_network/models/job.py @@ -25,11 +25,14 @@ class PYJobInput(BaseModel): parameters: dict = {} # Always set to empty dict when None, otherwise it fails ocr-d-validation result_queue_name: Optional[str] = None callback_url: Optional[str] = None + # Used to toggle between sending requests to 'worker and 'server', + # i.e., Processing Worker and Processor Server, respectively + agent_type: Optional[str] = 'worker' class Config: schema_extra = { 'example': { - 'path': '/path/to/mets.xml', + 'path_to_mets': '/path/to/mets.xml', 'description': 'The description of this execution', 'input_file_grps': ['INPUT_FILE_GROUP'], 'output_file_grps': ['OUTPUT_FILE_GROUP'], diff --git a/ocrd_network/ocrd_network/param_validators.py b/ocrd_network/ocrd_network/param_validators.py index 8e4669451..87cfeee72 100644 --- a/ocrd_network/ocrd_network/param_validators.py +++ b/ocrd_network/ocrd_network/param_validators.py @@ -6,8 +6,8 @@ ) -class ProcessingServerParamType(ParamType): - name = 'Processing server string format' +class ServerAddressParamType(ParamType): + name = 'Server address string format' expected_format = 'host:port' def convert(self, value, param, ctx): diff --git a/ocrd_network/ocrd_network/process_helpers.py b/ocrd_network/ocrd_network/process_helpers.py new file mode 100644 index 000000000..ea8c77699 --- /dev/null +++ b/ocrd_network/ocrd_network/process_helpers.py @@ -0,0 +1,47 @@ +import json +from typing import List + +from ocrd import Resolver +from ocrd.processor.helpers import run_cli, run_processor + + +# A wrapper for run_processor() and run_cli() +def invoke_processor( + processor_class, + executable: str, + abs_path_to_mets: str, + input_file_grps: List[str], + output_file_grps: List[str], + page_id: str, + parameters: dict, +) -> None: + if not (processor_class or executable): + raise ValueError(f'Missing processor class and executable') + input_file_grps_str = ','.join(input_file_grps) + output_file_grps_str = ','.join(output_file_grps) + workspace = Resolver().workspace_from_url(abs_path_to_mets) + if processor_class: + try: + run_processor( + processorClass=processor_class, + workspace=workspace, + input_file_grp=input_file_grps_str, + output_file_grp=output_file_grps_str, + page_id=page_id, + parameter=parameters, + instance_caching=True + ) + except Exception as e: + raise RuntimeError(f"Python executable '{executable}' exited with: {e}") + else: + return_code = run_cli( + executable=executable, + workspace=workspace, + mets_url=abs_path_to_mets, + input_file_grp=input_file_grps_str, + output_file_grp=output_file_grps_str, + page_id=page_id, + parameter=json.dumps(parameters) + ) + if return_code != 0: + raise RuntimeError(f"CLI executable '{executable}' exited with: {return_code}") diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 582c1134c..2a542219f 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -1,4 +1,7 @@ -from typing import Dict +import json +import requests +import httpx +from typing import Dict, List import uvicorn from fastapi import FastAPI, status, Request, HTTPException @@ -7,23 +10,26 @@ from pika.exceptions import ChannelClosedByBroker -from ocrd_utils import getLogger, get_ocrd_tool_json -from 
ocrd_validators import ParameterValidator -from .database import ( - db_get_processing_job, - db_get_workspace, - initiate_database -) +from ocrd_utils import getLogger +from .database import initiate_database from .deployer import Deployer -from .deployment_config import ProcessingServerConfig -from .rabbitmq_utils import RMQPublisher, OcrdProcessingMessage from .models import ( DBProcessorJob, PYJobInput, PYJobOutput, StateEnum ) -from .utils import generate_created_time, generate_id +from .rabbitmq_utils import RMQPublisher, OcrdProcessingMessage +from .server_utils import ( + _get_processor_job, + validate_and_resolve_mets_path, + validate_job_input, +) +from .utils import ( + download_ocrd_all_tool_json, + generate_created_time, + generate_id +) class ProcessingServer(FastAPI): @@ -42,23 +48,27 @@ def __init__(self, config_path: str, host: str, port: int) -> None: title='OCR-D Processing Server', description='OCR-D processing and processors') self.log = getLogger(__name__) + self.log.info(f"Downloading ocrd all tool json") + self.ocrd_all_tool_json = download_ocrd_all_tool_json( + ocrd_all_url="https://ocr-d.de/js/ocrd-all-tool.json" + ) self.hostname = host self.port = port - self.config = ProcessingServerConfig(config_path) - self.deployer = Deployer(self.config) + # The deployer is used for: + # - deploying agents when the Processing Server is started + # - retrieving runtime data of agents + self.deployer = Deployer(config_path) self.mongodb_url = None - self.rmq_host = self.config.queue.address - self.rmq_port = self.config.queue.port + # TODO: Combine these under a single URL, rabbitmq_utils needs an update + self.rmq_host = self.deployer.data_queue.address + self.rmq_port = self.deployer.data_queue.port self.rmq_vhost = '/' - self.rmq_username = self.config.queue.credentials[0] - self.rmq_password = self.config.queue.credentials[1] + self.rmq_username = self.deployer.data_queue.username + self.rmq_password = self.deployer.data_queue.password # Gets assigned when `connect_publisher` is called on the working object self.rmq_publisher = None - # This list holds all processors mentioned in the config file - self._processor_list = None - # Create routes self.router.add_api_route( path='/stop', @@ -82,7 +92,7 @@ def __init__(self, config_path: str, host: str, port: int) -> None: self.router.add_api_route( path='/processor/{processor_name}/{job_id}', - endpoint=self.get_job, + endpoint=self.get_processor_job, methods=['GET'], tags=['processing'], status_code=status.HTTP_200_OK, @@ -121,27 +131,21 @@ def start(self) -> None: """ deploy agents (db, queue, workers) and start the processing server with uvicorn """ try: - rabbitmq_hostinfo = self.deployer.deploy_rabbitmq( - image='rabbitmq:3-management', detach=True, remove=True) - - # Assign the credentials to the rabbitmq url parameter - rabbitmq_url = f'amqp://{self.rmq_username}:{self.rmq_password}@{rabbitmq_hostinfo}' + self.deployer.deploy_rabbitmq(image='rabbitmq:3-management', detach=True, remove=True) + rabbitmq_url = self.deployer.data_queue.url - mongodb_hostinfo = self.deployer.deploy_mongodb( - image='mongo', detach=True, remove=True) - - self.mongodb_url = f'mongodb://{mongodb_hostinfo}' + self.deployer.deploy_mongodb(image='mongo', detach=True, remove=True) + self.mongodb_url = self.deployer.data_mongo.url # The RMQPublisher is initialized and a connection to the RabbitMQ is performed self.connect_publisher() - self.log.debug(f'Creating message queues on RabbitMQ instance url: {rabbitmq_url}') self.create_message_queues() - 
# Deploy processing hosts where processing workers are running on - # Note: A deployed processing worker starts listening to a message queue with id - # processor.name - self.deployer.deploy_hosts(rabbitmq_url, self.mongodb_url) + self.deployer.deploy_hosts( + mongodb_url=self.mongodb_url, + rabbitmq_url=rabbitmq_url + ) except Exception: self.log.error('Error during startup of processing server. ' 'Trying to kill parts of incompletely deployed service') @@ -183,25 +187,30 @@ def connect_publisher(self, enable_acks: bool = True) -> None: self.log.info('Successfully connected RMQPublisher.') def create_message_queues(self) -> None: - """Create the message queues based on the occurrence of `processor.name` in the config file + """ Create the message queues based on the occurrence of + `workers.name` in the config file. """ - for host in self.config.hosts: - for processor in host.processors: - # The existence/validity of the processor.name is not tested. - # Even if an ocr-d processor does not exist, the queue is created - self.log.info(f'Creating a message queue with id: {processor.name}') - self.rmq_publisher.create_queue(queue_name=processor.name) - - @property - def processor_list(self): - if self._processor_list: - return self._processor_list - res = set([]) - for host in self.config.hosts: - for processor in host.processors: - res.add(processor.name) - self._processor_list = list(res) - return self._processor_list + + # TODO: Remove + """ + queue_names = set([]) + for data_host in self.deployer.data_hosts: + for data_worker in data_host.data_workers: + queue_names.add(data_worker.processor_name) + """ + + # The abstract version of the above lines + queue_names = self.deployer.find_matching_processors( + worker_only=True, + str_names_only=True, + unique_only=True + ) + + for queue_name in queue_names: + # The existence/validity of the worker.name is not tested. 
+ # Even if an ocr-d processor does not exist, the queue is created + self.log.info(f'Creating a message queue with id: {queue_name}') + self.rmq_publisher.create_queue(queue_name=queue_name) @staticmethod def create_processing_message(job: DBProcessorJob) -> OcrdProcessingMessage: @@ -220,55 +229,78 @@ def create_processing_message(job: DBProcessorJob) -> OcrdProcessingMessage: ) return processing_message - async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput: - """ Queue a processor job - """ - if not self.rmq_publisher: - raise Exception('RMQPublisher is not connected') + def check_if_queue_exists(self, processor_name): + try: + # Only checks if the process queue exists, if not raises ChannelClosedByBroker + self.rmq_publisher.create_queue(processor_name, passive=True) + except ChannelClosedByBroker as error: + self.log.warning(f"Process queue with id '{processor_name}' not existing: {error}") + # Reconnect publisher - not efficient, but works + # TODO: Revisit when reconnection strategy is implemented + self.connect_publisher(enable_acks=True) + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Process queue with id '{processor_name}' not existing" + ) - if processor_name not in self.processor_list: - try: - # Only checks if the process queue exists, if not raises ChannelClosedByBroker - self.rmq_publisher.create_queue(processor_name, passive=True) - except ChannelClosedByBroker as error: - self.log.warning(f"Process queue with id '{processor_name}' not existing: {error}") - # Reconnect publisher - not efficient, but works - # TODO: Revisit when reconnection strategy is implemented - self.connect_publisher(enable_acks=True) - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f"Process queue with id '{processor_name}' not existing" - ) - - # validate parameters - ocrd_tool = get_ocrd_tool_json(processor_name) - if not ocrd_tool: + def query_ocrd_tool_json_from_server(self, processor_name): + processor_server_url = self.deployer.resolve_processor_server_url(processor_name) + if not processor_server_url: + self.log.exception(f"Processor Server of '{processor_name}' is not available") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Processor Server of '{processor_name}' is not available" + ) + # Request the tool json from the Processor Server + response = requests.get( + processor_server_url, + headers={'Content-Type': 'application/json'} + ) + if not response.status_code == 200: + self.log.exception(f"Failed to retrieve '{processor_name}' from: {processor_server_url}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Processor '{processor_name}' not available. 
Empty or missing ocrd_tool" + detail=f"Failed to retrieve '{processor_name}' from: {processor_server_url}" ) - report = ParameterValidator(ocrd_tool).validate(dict(data.parameters)) - if not report.is_valid: - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=report.errors) + ocrd_tool = response.json() + return ocrd_tool, processor_server_url - if bool(data.path_to_mets) == bool(data.workspace_id): + async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput: + if data.agent_type not in ['worker', 'server']: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Either 'path' or 'workspace_id' must be provided, but not both" + detail=f"Unknown network agent with value: {data.agent_type}" + ) + job_output = None + if data.agent_type == 'worker': + job_output = await self.push_to_processing_queue(processor_name, data) + if data.agent_type == 'server': + job_output = await self.push_to_processor_server(processor_name, data) + if not job_output: + self.log.exception('Failed to create job output') + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail='Failed to create job output' ) - # This check is done to return early in case - # the workspace_id is provided but not existing in the DB - elif data.workspace_id: - try: - await db_get_workspace(data.workspace_id) - except ValueError: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f"Workspace with id '{data.workspace_id}' not existing" - ) + return job_output + + # TODO: Revisit and remove duplications between push_to_* methods + async def push_to_processing_queue(self, processor_name: str, job_input: PYJobInput) -> PYJobOutput: + ocrd_tool = await self.get_processor_info(processor_name) + validate_job_input(self.log, processor_name, ocrd_tool, job_input) + job_input = await validate_and_resolve_mets_path(self.log, job_input, resolve=False) + if not self.rmq_publisher: + raise Exception('RMQPublisher is not connected') + deployed_processors = self.deployer.find_matching_processors( + worker_only=True, + str_names_only=True, + unique_only=True + ) + if processor_name not in deployed_processors: + self.check_if_queue_exists(processor_name) job = DBProcessorJob( - **data.dict(exclude_unset=True, exclude_none=True), + **job_input.dict(exclude_unset=True, exclude_none=True), job_id=generate_id(), processor_name=processor_name, state=StateEnum.queued @@ -276,39 +308,77 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ await job.insert() processing_message = self.create_processing_message(job) encoded_processing_message = OcrdProcessingMessage.encode_yml(processing_message) - try: self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message) except Exception as error: + self.log.exception(f'RMQPublisher has failed: {error}') raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f'RMQPublisher has failed: {error}' ) return job.to_job_output() - async def get_processor_info(self, processor_name) -> Dict: - """ Return a processor's ocrd-tool.json - """ - if processor_name not in self.processor_list: + async def push_to_processor_server(self, processor_name: str, job_input: PYJobInput) -> PYJobOutput: + ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name) + validate_job_input(self.log, processor_name, ocrd_tool, job_input) + job_input = await validate_and_resolve_mets_path(self.log, job_input, 
resolve=False) + try: + json_data = json.dumps(job_input.dict(exclude_unset=True, exclude_none=True)) + except Exception as e: + self.log.exception(f"Failed to json dump the PYJobInput, error: {e}") raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail='Processor not available' + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to json dump the PYJobInput, error: {e}" + ) + + # TODO: The amount of pages should come as a request input + # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161 + # currently, use 200 as a default + amount_of_pages = 200 + request_timeout = 20.0 * amount_of_pages # 20 sec timeout per page + # Post a processing job to the Processor Server asynchronously + timeout = httpx.Timeout(timeout=request_timeout, connect=30.0) + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.post( + processor_server_url, + headers={'Content-Type': 'application/json'}, + json=json.loads(json_data) ) - return get_ocrd_tool_json(processor_name) - async def get_job(self, processor_name: str, job_id: str) -> PYJobOutput: - """ Return processing job-information from the database - """ - try: - job = await db_get_processing_job(job_id) - return job.to_job_output() - except ValueError: + if not response.status_code == 202: + self.log.exception(f"Failed to post '{processor_name}' job to: {processor_server_url}") raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f"Processing job with id '{job_id}' of processor type '{processor_name}' not existing" + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to post '{processor_name}' job to: {processor_server_url}" ) + job_output = response.json() + return job_output + + async def get_processor_job(self, processor_name: str, job_id: str) -> PYJobOutput: + return await _get_processor_job(self.log, processor_name, job_id) - async def list_processors(self) -> str: - """ Return a list of all available processors + async def get_processor_info(self, processor_name) -> Dict: + """ Return a processor's ocrd-tool.json """ - return self.processor_list + ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None) + if not ocrd_tool: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Ocrd tool JSON of '{processor_name}' not available!" + ) + + # TODO: Returns the ocrd tool json even of processors + # that are not deployed. This may or may not be desired. 
+ return ocrd_tool + + async def list_processors(self) -> List[str]: + # There is no caching on the Processing Server side + processor_names_list = self.deployer.find_matching_processors( + docker_only=False, + native_only=False, + worker_only=False, + server_only=False, + str_names_only=True, + unique_only=True + ) + return processor_names_list diff --git a/ocrd_network/ocrd_network/processing_worker.py b/ocrd_network/ocrd_network/processing_worker.py index 8b3681f18..2fd8a1b25 100644 --- a/ocrd_network/ocrd_network/processing_worker.py +++ b/ocrd_network/ocrd_network/processing_worker.py @@ -9,18 +9,14 @@ """ from datetime import datetime -import json import logging -from os import environ, getpid +from os import getpid import requests -from typing import Any, List import pika.spec import pika.adapters.blocking_connection -from ocrd import Resolver from ocrd_utils import getLogger -from ocrd.processor.helpers import run_cli, run_processor from .database import ( sync_initiate_database, @@ -28,6 +24,7 @@ sync_db_update_processing_job, ) from .models import StateEnum +from .process_helpers import invoke_processor from .rabbitmq_utils import ( OcrdProcessingMessage, OcrdResultMessage, @@ -36,21 +33,13 @@ ) from .utils import ( calculate_execution_time, + tf_disable_interactive_logs, verify_database_uri, verify_and_parse_mq_uri ) # TODO: Check this again when the logging is refactored -try: - # This env variable must be set before importing from Keras - environ['TF_CPP_MIN_LOG_LEVEL'] = '3' - from tensorflow.keras.utils import disable_interactive_logging - # Enabled interactive logging throws an exception - # due to a call of sys.stdout.flush() - disable_interactive_logging() -except Exception: - # Nothing should be handled here if TF is not available - pass +tf_disable_interactive_logs() class ProcessingWorker: @@ -192,20 +181,21 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: # may not contain certain keys. Simply passing None in the OcrdProcessingMessage constructor # breaks the message validator schema which expects String, but not None due to the Optional[] wrapper. 
pm_keys = processing_message.__dict__.keys() + job_id = processing_message.job_id + input_file_grps = processing_message.input_file_grps output_file_grps = processing_message.output_file_grps if 'output_file_grps' in pm_keys else None path_to_mets = processing_message.path_to_mets if 'path_to_mets' in pm_keys else None workspace_id = processing_message.workspace_id if 'workspace_id' in pm_keys else None page_id = processing_message.page_id if 'page_id' in pm_keys else None result_queue_name = processing_message.result_queue_name if 'result_queue_name' in pm_keys else None callback_url = processing_message.callback_url if 'callback_url' in pm_keys else None + parameters = processing_message.parameters if processing_message.parameters else {} if not path_to_mets and workspace_id: path_to_mets = sync_db_get_workspace(workspace_id).workspace_mets_path - workspace = Resolver().workspace_from_url(path_to_mets) - - job_id = processing_message.job_id - + execution_failed = False + self.log.debug(f'Invoking processor: {self.processor_name}') start_time = datetime.now() sync_db_update_processing_job( job_id=job_id, @@ -213,35 +203,30 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: path_to_mets=path_to_mets, start_time=start_time ) - if self.processor_class: - self.log.debug(f'Invoking the pythonic processor: {self.processor_name}') - return_status = self.run_processor_from_worker( + try: + invoke_processor( processor_class=self.processor_class, - workspace=workspace, - page_id=page_id, - input_file_grps=processing_message.input_file_grps, - output_file_grps=output_file_grps, - parameter=processing_message.parameters - ) - else: - self.log.debug(f'Invoking the cli: {self.processor_name}') - return_status = self.run_cli_from_worker( executable=self.processor_name, - workspace=workspace, - page_id=page_id, - input_file_grps=processing_message.input_file_grps, + abs_path_to_mets=path_to_mets, + input_file_grps=input_file_grps, output_file_grps=output_file_grps, - parameter=processing_message.parameters + page_id=page_id, + parameters=processing_message.parameters ) + except Exception as error: + self.log.debug(f"processor_name: {self.processor_name}, path_to_mets: {path_to_mets}, " + f"input_grps: {input_file_grps}, output_file_grps: {output_file_grps}, " + f"page_id: {page_id}, parameters: {parameters}") + self.log.exception(error) + execution_failed = True end_time = datetime.now() - # Execution duration in ms - execution_duration = calculate_execution_time(start_time, end_time) - job_state = StateEnum.success if return_status else StateEnum.failed + exec_duration = calculate_execution_time(start_time, end_time) + job_state = StateEnum.success if not execution_failed else StateEnum.failed sync_db_update_processing_job( job_id=job_id, state=job_state, end_time=end_time, - exec_time=f'{execution_duration} ms' + exec_time=f'{exec_duration} ms' ) if result_queue_name or callback_url: @@ -253,11 +238,9 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: workspace_id=workspace_id ) self.log.info(f'Result message: {result_message}') - # If the result_queue field is set, send the result message to a result queue if result_queue_name: self.publish_to_result_queue(result_queue_name, result_message) - # If the callback_url field is set, post the result message to a callback url if callback_url: self.post_to_callback_url(callback_url, result_message) @@ -286,65 +269,3 @@ def post_to_callback_url(self, callback_url: str, result_message: OcrdResultMess } 
response = requests.post(url=callback_url, headers=headers, json=json_data) self.log.info(f'Response from callback_url "{response}"') - - def run_processor_from_worker( - self, - processor_class, - workspace, - page_id: str, - input_file_grps: List[str], - output_file_grps: List[str], - parameter: dict, - ) -> bool: - input_file_grps_str = ','.join(input_file_grps) - output_file_grps_str = ','.join(output_file_grps) - - success = True - try: - run_processor( - processorClass=processor_class, - workspace=workspace, - page_id=page_id, - parameter=parameter, - input_file_grp=input_file_grps_str, - output_file_grp=output_file_grps_str, - instance_caching=True - ) - except Exception as e: - success = False - self.log.exception(e) - - if not success: - self.log.error(f'{processor_class} failed with an exception.') - else: - self.log.debug(f'{processor_class} exited with success.') - return success - - def run_cli_from_worker( - self, - executable: str, - workspace, - page_id: str, - input_file_grps: List[str], - output_file_grps: List[str], - parameter: dict - ) -> bool: - input_file_grps_str = ','.join(input_file_grps) - output_file_grps_str = ','.join(output_file_grps) - - return_code = run_cli( - executable=executable, - workspace=workspace, - page_id=page_id, - input_file_grp=input_file_grps_str, - output_file_grp=output_file_grps_str, - parameter=json.dumps(parameter), - mets_url=workspace.mets_target - ) - - if return_code != 0: - self.log.error(f'{executable} exited with non-zero return value {return_code}.') - return False - else: - self.log.debug(f'{executable} exited with success.') - return True diff --git a/ocrd_network/ocrd_network/processor_server.py b/ocrd_network/ocrd_network/processor_server.py new file mode 100644 index 000000000..785b82c61 --- /dev/null +++ b/ocrd_network/ocrd_network/processor_server.py @@ -0,0 +1,223 @@ +from datetime import datetime +import logging +from os import getpid +from subprocess import run, PIPE +import uvicorn + +from fastapi import FastAPI, HTTPException, status, BackgroundTasks + +from ocrd_utils import ( + get_ocrd_tool_json, + getLogger, + parse_json_string_with_comments, +) +from .database import ( + DBProcessorJob, + db_update_processing_job, + initiate_database +) +from .models import ( + PYJobInput, + PYJobOutput, + PYOcrdTool, + StateEnum +) +from .process_helpers import invoke_processor +from .server_utils import ( + _get_processor_job, + validate_and_resolve_mets_path, + validate_job_input +) +from .utils import ( + calculate_execution_time, + generate_id, + tf_disable_interactive_logs +) + +# TODO: Check this again when the logging is refactored +tf_disable_interactive_logs() + + +class ProcessorServer(FastAPI): + def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class=None): + if not (processor_name or processor_class): + raise ValueError('Either "processor_name" or "processor_class" must be provided') + self.log = getLogger(__name__) + + self.db_url = mongodb_addr + self.processor_name = processor_name + self.processor_class = processor_class + self.ocrd_tool = None + self.version = None + + self.version = self.get_version() + self.ocrd_tool = self.get_ocrd_tool() + + if not self.ocrd_tool: + raise Exception(f"The ocrd_tool is empty or missing") + + if not self.processor_name: + self.processor_name = self.ocrd_tool['executable'] + + tags_metadata = [ + { + 'name': 'Processing', + 'description': 'OCR-D Processor Server' + } + ] + + super().__init__( + title=self.processor_name, + 
description=self.ocrd_tool['description'], + version=self.version, + openapi_tags=tags_metadata, + on_startup=[self.startup] + ) + + # Create routes + self.router.add_api_route( + path='/', + endpoint=self.get_processor_info, + methods=['GET'], + tags=['Processing'], + status_code=status.HTTP_200_OK, + summary='Get information about this processor.', + response_model=PYOcrdTool, + response_model_exclude_unset=True, + response_model_exclude_none=True + ) + + self.router.add_api_route( + path='/', + endpoint=self.create_processor_task, + methods=['POST'], + tags=['Processing'], + status_code=status.HTTP_202_ACCEPTED, + summary='Submit a job to this processor.', + response_model=PYJobOutput, + response_model_exclude_unset=True, + response_model_exclude_none=True + ) + + self.router.add_api_route( + path='/{job_id}', + endpoint=self.get_processor_job, + methods=['GET'], + tags=['Processing'], + status_code=status.HTTP_200_OK, + summary='Get information about a job based on its ID', + response_model=PYJobOutput, + response_model_exclude_unset=True, + response_model_exclude_none=True + ) + + async def startup(self): + await initiate_database(db_url=self.db_url) + DBProcessorJob.Settings.name = self.processor_name + + async def get_processor_info(self): + if not self.ocrd_tool: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f'Empty or missing ocrd_tool' + ) + return self.ocrd_tool + + # Note: The Processing server pushes to a queue, while + # the Processor Server creates (pushes to) a background task + async def create_processor_task(self, job_input: PYJobInput, background_tasks: BackgroundTasks): + validate_job_input(self.log, self.processor_name, self.ocrd_tool, job_input) + job_input = await validate_and_resolve_mets_path(self.log, job_input, resolve=True) + + job_id = generate_id() + job = DBProcessorJob( + **job_input.dict(exclude_unset=True, exclude_none=True), + job_id=job_id, + processor_name=self.processor_name, + state=StateEnum.queued + ) + await job.insert() + await self.run_processor_task(job_id=job_id, job=job) + return job.to_job_output() + + async def run_processor_task(self, job_id: str, job: DBProcessorJob): + execution_failed = False + start_time = datetime.now() + await db_update_processing_job( + job_id=job_id, + state=StateEnum.running, + start_time=start_time + ) + try: + invoke_processor( + processor_class=self.processor_class, + executable=self.processor_name, + abs_path_to_mets=job.path_to_mets, + input_file_grps=job.input_file_grps, + output_file_grps=job.output_file_grps, + page_id=job.page_id, + parameters=job.parameters + ) + except Exception as error: + self.log.debug(f"processor_name: {self.processor_name}, path_to_mets: {job.path_to_mets}, " + f"input_grps: {job.input_file_grps}, output_file_grps: {job.output_file_grps}, " + f"page_id: {job.page_id}, parameters: {job.parameters}") + self.log.exception(error) + execution_failed = True + end_time = datetime.now() + exec_duration = calculate_execution_time(start_time, end_time) + job_state = StateEnum.success if not execution_failed else StateEnum.failed + await db_update_processing_job( + job_id=job_id, + state=job_state, + end_time=end_time, + exec_time=f'{exec_duration} ms' + ) + + def get_ocrd_tool(self): + if self.ocrd_tool: + return self.ocrd_tool + if self.processor_class: + # The way of accessing ocrd tool like in the line below may be problematic + # ocrd_tool = self.processor_class(workspace=None, version=True).ocrd_tool + ocrd_tool = parse_json_string_with_comments( + 
run( + [self.processor_name, '--dump-json'], + stdout=PIPE, + check=True, + universal_newlines=True + ).stdout + ) + else: + ocrd_tool = get_ocrd_tool_json(self.processor_name) + return ocrd_tool + + def get_version(self) -> str: + if self.version: + return self.version + + """ + if self.processor_class: + # The way of accessing the version like in the line below may be problematic + # version_str = self.processor_class(workspace=None, version=True).version + return version_str + """ + version_str = run( + [self.processor_name, '--version'], + stdout=PIPE, + check=True, + universal_newlines=True + ).stdout + return version_str + + def run_server(self, host, port, access_log=False): + # TODO: Provide more flexibility for configuring file logging (i.e. via ENV variables) + file_handler = logging.FileHandler(f'/tmp/server_{self.processor_name}_{port}_{getpid()}.log', mode='a') + logging_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + file_handler.setFormatter(logging.Formatter(logging_format)) + file_handler.setLevel(logging.DEBUG) + self.log.addHandler(file_handler) + uvicorn.run(self, host=host, port=port, access_log=access_log) + + async def get_processor_job(self, processor_name: str, job_id: str) -> PYJobOutput: + return await _get_processor_job(self.log, processor_name, job_id) diff --git a/ocrd_network/ocrd_network/runtime_data.py b/ocrd_network/ocrd_network/runtime_data.py new file mode 100644 index 000000000..8ab9a2896 --- /dev/null +++ b/ocrd_network/ocrd_network/runtime_data.py @@ -0,0 +1,123 @@ +from __future__ import annotations +from typing import Dict, List + +from .deployment_utils import ( + create_docker_client, + create_ssh_client, + DeployType +) + +__all__ = [ + 'DataHost', + 'DataMongoDB', + 'DataProcessingWorker', + 'DataProcessorServer', + 'DataRabbitMQ' +] + + +class DataHost: + def __init__(self, config: Dict) -> None: + self.address = config['address'] + self.username = config['username'] + self.password = config.get('password', None) + self.keypath = config.get('path_to_privkey', None) + + # These flags are used to track whether a connection + # of the specified type will be required + self.needs_ssh: bool = False + self.needs_docker: bool = False + + self.ssh_client = None + self.docker_client = None + + # TODO: Not sure this is DS is ideal, seems off + self.data_workers: List[DataProcessingWorker] = [] + self.data_servers: List[DataProcessorServer] = [] + + for worker in config.get('workers', []): + name = worker['name'] + count = worker['number_of_instance'] + deploy_type = DeployType.DOCKER if worker.get('deploy_type', None) == 'docker' else DeployType.NATIVE + if not self.needs_ssh and deploy_type == DeployType.NATIVE: + self.needs_ssh = True + if not self.needs_docker and deploy_type == DeployType.DOCKER: + self.needs_docker = True + for _ in range(count): + self.data_workers.append(DataProcessingWorker(self.address, deploy_type, name)) + + for server in config.get('servers', []): + name = server['name'] + port = server['port'] + deploy_type = DeployType.DOCKER if server.get('deploy_type', None) == 'docker' else DeployType.NATIVE + if not self.needs_ssh and deploy_type == DeployType.NATIVE: + self.needs_ssh = True + if not self.needs_docker and deploy_type == DeployType.DOCKER: + self.needs_docker = True + self.data_servers.append(DataProcessorServer(self.address, port, deploy_type, name)) + + # Key: processor_name, Value: list of ports + self.server_ports: dict = {} + + def create_client(self, client_type: str): + if client_type not in 
['docker', 'ssh']: + raise ValueError(f'Host client type cannot be of type: {client_type}') + if client_type == 'ssh': + if not self.ssh_client: + self.ssh_client = create_ssh_client( + self.address, self.username, self.password, self.keypath) + return self.ssh_client + if client_type == 'docker': + if not self.docker_client: + self.docker_client = create_docker_client( + self.address, self.username, self.password, self.keypath + ) + return self.docker_client + + +class DataProcessingWorker: + def __init__(self, host: str, deploy_type: DeployType, processor_name: str) -> None: + self.host = host + self.deploy_type = deploy_type + self.processor_name = processor_name + # Assigned when deployed + self.pid = None + + +class DataProcessorServer: + def __init__(self, host: str, port: int, deploy_type: DeployType, processor_name: str) -> None: + self.host = host + self.port = port + self.deploy_type = deploy_type + self.processor_name = processor_name + # Assigned when deployed + self.pid = None + + +class DataMongoDB: + def __init__(self, config: Dict) -> None: + self.address = config['address'] + self.port = int(config['port']) + self.ssh_username = config['ssh']['username'] + self.ssh_keypath = config['ssh'].get('path_to_privkey', None) + self.ssh_password = config['ssh'].get('password', None) + self.username = config['credentials']['username'] + self.password = config['credentials']['password'] + self.url = f'mongodb://{self.address}:{self.port}' + # Assigned when deployed + self.pid = None + + +class DataRabbitMQ: + def __init__(self, config: Dict) -> None: + self.address = config['address'] + self.port = int(config['port']) + self.ssh_username = config['ssh']['username'] + self.ssh_keypath = config['ssh'].get('path_to_privkey', None) + self.ssh_password = config['ssh'].get('password', None) + self.vhost = '/' + self.username = config['credentials']['username'] + self.password = config['credentials']['password'] + self.url = f'amqp://{self.username}:{self.password}@{self.address}:{self.port}{self.vhost}' + # Assigned when deployed + self.pid = None diff --git a/ocrd_network/ocrd_network/server_utils.py b/ocrd_network/ocrd_network/server_utils.py new file mode 100644 index 000000000..b117cb48b --- /dev/null +++ b/ocrd_network/ocrd_network/server_utils.py @@ -0,0 +1,67 @@ +from fastapi import FastAPI, HTTPException, status, BackgroundTasks +from ocrd_validators import ParameterValidator +from .database import ( + db_get_processing_job, + db_get_workspace, +) +from .models import PYJobInput, PYJobOutput + + +async def _get_processor_job(logger, processor_name: str, job_id: str) -> PYJobOutput: + """ Return processing job-information from the database + """ + try: + job = await db_get_processing_job(job_id) + return job.to_job_output() + except ValueError as e: + logger.exception(f"Processing job with id '{job_id}' of processor type " + f"'{processor_name}' not existing, error: {e}") + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Processing job with id '{job_id}' of processor type '{processor_name}' not existing" + ) + + +async def validate_and_resolve_mets_path(logger, job_input: PYJobInput, resolve: bool = False) -> PYJobInput: + # This check is done to return early in case the workspace_id is provided + # but the abs mets path cannot be queried from the DB + if not job_input.path_to_mets and job_input.workspace_id: + try: + db_workspace = await db_get_workspace(job_input.workspace_id) + if resolve: + job_input.path_to_mets = 
db_workspace.workspace_mets_path + except ValueError as e: + logger.exception(f"Workspace with id '{job_input.workspace_id}' not existing: {e}") + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Workspace with id '{job_input.workspace_id}' not existing" + ) + return job_input + + +def validate_job_input(logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None: + if bool(job_input.path_to_mets) == bool(job_input.workspace_id): + logger.exception("Either 'path' or 'workspace_id' must be provided, but not both") + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Either 'path' or 'workspace_id' must be provided, but not both" + ) + if not ocrd_tool: + logger.exception(f"Processor '{processor_name}' not available. Empty or missing ocrd_tool") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Processor '{processor_name}' not available. Empty or missing ocrd_tool" + ) + try: + report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters)) + except Exception as e: + logger.exception(f'Failed to validate processing job against the ocrd_tool: {e}') + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f'Failed to validate processing job against the ocrd_tool' + ) + else: + if not report.is_valid: + logger.exception(f'Failed to validate processing job ' + f'against the ocrd_tool, errors: {report.errors}') + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=report.errors) diff --git a/ocrd_network/ocrd_network/utils.py b/ocrd_network/ocrd_network/utils.py index 759a31597..d41a1b13a 100644 --- a/ocrd_network/ocrd_network/utils.py +++ b/ocrd_network/ocrd_network/utils.py @@ -1,9 +1,15 @@ from datetime import datetime from functools import wraps -from re import match as re_match +from os import environ from pika import URLParameters from pymongo import uri_parser as mongo_uri_parser +from re import match as re_match +import requests +from typing import Dict from uuid import uuid4 +from yaml import safe_load + +from ocrd_validators import ProcessingServerConfigValidator # Based on: https://gist.github.com/phizaz/20c36c6734878c6ec053245a477572ec @@ -27,6 +33,19 @@ def calculate_execution_time(start: datetime, end: datetime) -> int: return int((end - start).total_seconds() * 1000) +def tf_disable_interactive_logs(): + try: + # This env variable must be set before importing from Keras + environ['TF_CPP_MIN_LOG_LEVEL'] = '3' + from tensorflow.keras.utils import disable_interactive_logging + # Enabled interactive logging throws an exception + # due to a call of sys.stdout.flush() + disable_interactive_logging() + except Exception: + # Nothing should be handled here if TF is not available + pass + + def generate_created_time() -> int: return int(datetime.utcnow().timestamp()) @@ -40,6 +59,16 @@ def generate_id() -> str: return str(uuid4()) +def validate_and_load_config(config_path: str) -> Dict: + # Load and validate the config + with open(config_path) as fin: + config = safe_load(fin) + report = ProcessingServerConfigValidator.validate(config) + if not report.is_valid: + raise Exception(f'Processing-Server configuration file is invalid:\n{report.errors}') + return config + + def verify_database_uri(mongodb_address: str) -> str: try: # perform validation check @@ -69,3 +98,13 @@ def verify_and_parse_mq_uri(rabbitmq_address: str): 'vhost': url_params.virtual_host } return parsed_data + + +def 
download_ocrd_all_tool_json(ocrd_all_url: str): + if not ocrd_all_url: + raise ValueError(f'The URL of ocrd all tool json is empty') + headers = {'Accept': 'application/json'} + response = requests.get(ocrd_all_url, headers=headers) + if not response.status_code == 200: + raise ValueError(f"Failed to download ocrd all tool json from: '{ocrd_all_url}'") + return response.json() diff --git a/ocrd_network/requirements.txt b/ocrd_network/requirements.txt index d11fb430f..4e8986c1b 100644 --- a/ocrd_network/requirements.txt +++ b/ocrd_network/requirements.txt @@ -4,3 +4,4 @@ docker paramiko pika>=1.2.0 beanie~=1.7 +httpx>=0.22.0 diff --git a/ocrd_network/setup.py b/ocrd_network/setup.py index f79081fa0..ee0521aaf 100644 --- a/ocrd_network/setup.py +++ b/ocrd_network/setup.py @@ -19,6 +19,7 @@ install_requires=install_requires, packages=[ 'ocrd_network', + 'ocrd_network.cli', 'ocrd_network.models', 'ocrd_network.rabbitmq_utils' ], diff --git a/ocrd_validators/ocrd_validators/processing_server_config.schema.yml b/ocrd_validators/ocrd_validators/processing_server_config.schema.yml index d28b63a3d..4039e4917 100644 --- a/ocrd_validators/ocrd_validators/processing_server_config.schema.yml +++ b/ocrd_validators/ocrd_validators/processing_server_config.schema.yml @@ -57,12 +57,16 @@ properties: required: - address - username - - workers oneOf: - required: - password - required: - path_to_privkey + anyOf: + - required: + - workers + - required: + - servers properties: address: description: The IP address or domain name of the target machine @@ -75,7 +79,7 @@ properties: description: Path to private key file type: string workers: - description: List of workers which will be deployed + description: List of processing workers that will be deployed type: array minItems: 1 items: @@ -97,12 +101,41 @@ properties: minimum: 1 default: 1 deploy_type: - description: Should the processor be deployed natively or with Docker + description: Should the processing worker be deployed natively or with Docker + type: string + enum: + - native + - docker + default: native + servers: + description: List of processor servers that will be deployed + type: array + minItems: 1 + items: + type: object + additionalProperties: false + required: + - name + - port + properties: + name: + description: Name of the processor + type: string + pattern: "^ocrd-.*$" + examples: + - ocrd-cis-ocropy-binarize + - ocrd-olena-binarize + deploy_type: + description: Should the processor server be deployed natively or with Docker type: string enum: - native - docker default: native + port: + description: The port number to be deployed on the host + $ref: "#/$defs/port" + $defs: address: type: string
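
Reviewer note, not part of the patch: the new agent_type field on PYJobInput decides whether the Processing Server routes a job to a Processing Worker (push_to_processing_queue, via RabbitMQ) or to a Processor Server (push_to_processor_server, via HTTP). The sketch below shows what a client request could look like. The POST route /processor/{executable}, the host:port and the file groups are assumptions for illustration only; the diff itself only shows the GET route /processor/{executable}/{job_id}.

    import requests

    job_input = {
        'path_to_mets': '/path/to/mets.xml',
        'input_file_grps': ['OCR-D-IMG'],
        'output_file_grps': ['OCR-D-BIN'],
        'parameters': {},
        # 'worker' -> push_to_processing_queue(), 'server' -> push_to_processor_server()
        'agent_type': 'worker',
    }
    response = requests.post(
        'http://localhost:8080/processor/ocrd-cis-ocropy-binarize',  # assumed route and address
        headers={'Content-Type': 'application/json'},
        json=job_input,
    )
    response.raise_for_status()
    # The returned job output can then be polled via GET /processor/{executable}/{job_id}
    print(response.json())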
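
Usage sketch for the new shared helper (not part of the patch): both ProcessingWorker.process_message() and ProcessorServer.run_processor_task() now delegate to invoke_processor(), which calls run_processor() when a processor class is given and falls back to run_cli() otherwise. The executable name, METS path, file groups and page id below are placeholders; the named processor must be installed for this to actually run.

    from ocrd_network.process_helpers import invoke_processor

    invoke_processor(
        processor_class=None,                    # falsy -> falls back to run_cli()
        executable='ocrd-cis-ocropy-binarize',   # placeholder executable
        abs_path_to_mets='/path/to/mets.xml',
        input_file_grps=['OCR-D-IMG'],
        output_file_grps=['OCR-D-BIN'],
        page_id='PHYS_0001',
        parameters={},
    )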
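
Usage sketch (not part of the patch): instead of querying get_ocrd_tool_json() per request, the Processing Server now downloads the aggregated ocrd-all tool json once at startup and looks processors up by executable name. The same utility can be used standalone; the processor name below is just an example, and the keys inside each entry are assumed to follow the usual ocrd-tool.json layout.

    from ocrd_network.utils import download_ocrd_all_tool_json

    ocrd_all_tool_json = download_ocrd_all_tool_json(
        ocrd_all_url='https://ocr-d.de/js/ocrd-all-tool.json'
    )
    ocrd_tool = ocrd_all_tool_json.get('ocrd-cis-ocropy-binarize', None)
    if not ocrd_tool:
        raise ValueError('ocrd_tool entry not available for this processor')
    print(ocrd_tool.get('executable'), list(ocrd_tool.get('parameters', {})))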