diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 14543be..d1af961 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -63,7 +63,7 @@ class Connector(object): """ def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True, - mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): + mounted_paths=None, minio_mounted_path=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): self.extractor_name = extractor_name self.extractor_info = extractor_info self.check_message = check_message @@ -73,6 +73,10 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m self.mounted_paths = {} else: self.mounted_paths = mounted_paths + if minio_mounted_path is None: + self.minio_mounted_path = '' + else: + self.minio_mounted_path = minio_mounted_path self.clowder_url = clowder_url self.clowder_email = clowder_email self.extractor_key = extractor_key @@ -268,8 +272,13 @@ def _build_resource(self, body, host, secret_key, clowder_version): "metadata": body['metadata'] } - def _check_for_local_file(self, file_metadata): + def _check_for_local_file(self, file_metadata, file_id=None): """ Try to get pointer to locally accessible copy of file for extractor.""" + # Check if file is present in a minio mount (only valid for Clowder v2) + if self.minio_mounted_path and file_id: + minio_file_path = self.minio_mounted_path + "/" + file_id + if os.path.isfile(minio_file_path): + return minio_file_path # first check if file is accessible locally if 'filepath' in file_metadata: @@ -278,7 +287,6 @@ def _check_for_local_file(self, file_metadata): # first simply check if file is present locally if os.path.isfile(file_path): return file_path - # otherwise check any mounted paths... if len(self.mounted_paths) > 0: for source_path in self.mounted_paths: @@ -317,7 +325,6 @@ def _prepare_dataset(self, host, secret_key, resource): temp_link_dir = tempfile.mkdtemp() tmp_dirs_created.append(temp_link_dir) - # first check if any files in dataset accessible locally ds_file_list = pyclowder.datasets.get_file_list(self, host, secret_key, resource["id"]) for ds_file in ds_file_list: file_path = self._check_for_local_file(ds_file) @@ -333,7 +340,7 @@ def _prepare_dataset(self, host, secret_key, resource): # Also get file metadata in format expected by extrator (file_md_dir, file_md_tmp) = self._download_file_metadata(host, secret_key, ds_file['id'], - ds_file['filepath']) + ds_file['filepath']) located_files.append(file_path) located_files.append(file_md_tmp) tmp_files_created.append(file_md_tmp) @@ -427,7 +434,7 @@ def _process_message(self, body): try: if check_result != pyclowder.utils.CheckMessage.bypass: file_metadata = pyclowder.files.download_info(self, host, secret_key, resource["id"]) - file_path = self._check_for_local_file(file_metadata) + file_path = self._check_for_local_file(file_metadata, resource["id"]) if not file_path: file_path = pyclowder.files.download(self, host, secret_key, resource["id"], resource["intermediate_id"], @@ -628,10 +635,10 @@ class RabbitMQConnector(Connector): # pylint: disable=too-many-arguments def __init__(self, extractor_name, extractor_info, rabbitmq_uri, rabbitmq_key=None, rabbitmq_queue=None, - check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, + check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, minio_mounted_path=None, heartbeat=10, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message, - ssl_verify, mounted_paths, clowder_url, max_retry, extractor_key, clowder_email) + ssl_verify, mounted_paths, minio_mounted_path, clowder_url, max_retry, extractor_key, clowder_email) self.rabbitmq_uri = rabbitmq_uri self.rabbitmq_key = rabbitmq_key if rabbitmq_queue is None: @@ -756,7 +763,7 @@ def on_message(self, channel, method, header, body): job_id = None self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, job_id, self.check_message, - self.process_message, self.ssl_verify, self.mounted_paths, self.clowder_url, + self.process_message, self.ssl_verify, self.mounted_paths, self.minio_mounted_path, self.clowder_url, method, header, body) self.worker.start_thread(json_body) @@ -836,10 +843,10 @@ class RabbitMQHandler(Connector): """ def __init__(self, extractor_name, extractor_info, job_id, check_message=None, process_message=None, ssl_verify=True, - mounted_paths=None, clowder_url=None, method=None, header=None, body=None, max_retry=10): + mounted_paths=None, minio_mounted_path=None, clowder_url=None, method=None, header=None, body=None, max_retry=10): super(RabbitMQHandler, self).__init__(extractor_name, extractor_info, check_message, process_message, - ssl_verify, mounted_paths, clowder_url, max_retry) + ssl_verify, mounted_paths, minio_mounted_path,clowder_url, max_retry) self.method = method self.header = header self.body = body diff --git a/pyclowder/extractors.py b/pyclowder/extractors.py index 60a3a87..ee1b480 100644 --- a/pyclowder/extractors.py +++ b/pyclowder/extractors.py @@ -72,6 +72,7 @@ def __init__(self): clowder_email = os.getenv("CLOWDER_EMAIL", "") logging_config = os.getenv("LOGGING") mounted_paths = os.getenv("MOUNTED_PATHS", "{}") + minio_mounted_path = os.getenv("MINIO_MOUNTED_PATH", "") input_file_path = os.getenv("INPUT_FILE_PATH") output_file_path = os.getenv("OUTPUT_FILE_PATH") connector_default = "RabbitMQ" @@ -105,6 +106,8 @@ def __init__(self): help='rabbitMQ queue name (default=%s)' % rabbitmq_queuename) self.parser.add_argument('--mounts', '-m', dest="mounted_paths", default=mounted_paths, help="dictionary of {'remote path':'local path'} mount mappings") + self.parser.add_argument('--minio-mount', dest="minio_mounted_path", default=minio_mounted_path, + help="path to mount Minio storage") self.parser.add_argument('--input-file-path', '-ifp', dest="input_file_path", default=input_file_path, help="Full path to local input file to be processed (used by Big Data feature)") self.parser.add_argument('--output-file-path', '-ofp', dest="output_file_path", default=output_file_path, @@ -175,6 +178,7 @@ def start(self): rabbitmq_key=rabbitmq_key, rabbitmq_queue=self.args.rabbitmq_queuename, mounted_paths=json.loads(self.args.mounted_paths), + minio_mounted_path=self.args.minio_mounted_path, clowder_url=self.args.clowder_url, max_retry=self.args.max_retry, heartbeat=self.args.heartbeat, @@ -193,6 +197,7 @@ def start(self): process_message=self.process_message, picklefile=self.args.hpc_picklefile, mounted_paths=json.loads(self.args.mounted_paths), + minio_mounted_path=self.args.minio_mounted_path, max_retry=self.args.max_retry) threading.Thread(target=connector.listen, name="HPCConnector").start() diff --git a/pyclowder/files.py b/pyclowder/files.py index cfe3b27..55b1b0b 100644 --- a/pyclowder/files.py +++ b/pyclowder/files.py @@ -51,6 +51,14 @@ def download(connector, host, key, fileid, intermediatefileid=None, ext="", trac tracking -- should the download action be tracked """ client = ClowderClient(host=host, key=key) + # Check if minio mounted path is set + minio_mounted_path = os.getenv("MINIO_MOUNTED_PATH", "") + if minio_mounted_path: + # Check if the file is stored in Minio mount path + minio_file_path = minio_mounted_path + "/" + fileid + if os.path.isfile(minio_file_path): + return minio_file_path + # Else download the file from Clowder inputfilename = files.download(connector, client, fileid, intermediatefileid, ext) return inputfilename