From 021da8105d1b4faacbe65c57b5af079530a39570 Mon Sep 17 00:00:00 2001 From: Acribbs Date: Tue, 29 Oct 2024 07:43:36 +0000 Subject: [PATCH] included S3 decorators --- .gitignore | 2 + cgatcore/pipeline/__init__.py | 357 +++++++++----------------------- cgatcore/remote/file_handler.py | 109 ++++++++++ 3 files changed, 210 insertions(+), 258 deletions(-) create mode 100644 cgatcore/remote/file_handler.py diff --git a/.gitignore b/.gitignore index c346d24d..125eae5b 100644 --- a/.gitignore +++ b/.gitignore @@ -59,3 +59,5 @@ _test_commandline.yaml # sample workflow means.txt sample* + +.idea diff --git a/cgatcore/pipeline/__init__.py b/cgatcore/pipeline/__init__.py index 7434bdee..e7d07894 100644 --- a/cgatcore/pipeline/__init__.py +++ b/cgatcore/pipeline/__init__.py @@ -1,202 +1,34 @@ -'''pipeline.py - Tools for ruffus pipelines -=========================================== +''' +pipeline.py - Tools for CGAT Ruffus Pipelines +============================================= -The :mod:`pipeline` module contains various utility functions for -interfacing CGAT ruffus pipelines with an HPC cluster, uploading data -to databases, providing parameterization, and more. +This module provides a comprehensive set of tools to facilitate the creation and management +of data processing pipelines using CGAT Ruffus. It includes functionalities for pipeline control, +logging, parameterization, task execution, database uploads, temporary file management, and +integration with AWS S3. -It is a collection of utility functions covering the topics: +**Features:** -* `pipeline control`_ -* `Logging`_ -* `Parameterisation`_ -* `Running tasks`_ -* `database upload`_ -* `Report building`_ +- **Pipeline Control:** Command-line interface for executing, showing, and managing pipeline tasks. +- **Logging:** Configures logging to files and RabbitMQ for real-time monitoring. +- **Parameterization:** Loads and manages configuration parameters from various files. +- **Task Execution:** Manages the execution of tasks, supporting both local and cluster environments. +- **Database Upload:** Utilities for uploading processed data to databases. +- **Temporary File Management:** Functions to handle temporary files and directories. +- **AWS S3 Integration:** Support for processing files stored in AWS S3. -See :doc:`pipelines/pipeline_template` for a pipeline illustrating the -use of this module. See :ref:`pipelineSettingUp` on how to set up a -pipeline. +**Example Usage:** -pipeline control ----------------- +```python +from cgatcore import pipeline as P + +@P.transform("input.txt", suffix(".txt"), ".processed.txt") +def process_data(infile, outfile): + # Processing logic here + pass -:mod:`pipeline` provides a :func:`main` function that provides command -line control to a pipeline. To use it, add:: - - import cgatcore.pipeline as P - # ... - - def main(argv=None): - if argv is None: - argv = sys.argv - P.main(argv) - - - if __name__ == "__main__": - sys.exit(P.main(sys.argv)) - -to your pipeline script. Typing:: - - python my_pipeline.py --help - -will provide the following output: - - Usage: - usage: [OPTIONS] [CMD] [target] - - Execute pipeline mapping. - - Commands can be any of the following - - make - run all tasks required to build *target* - - show - show tasks required to build *target* without executing them - - plot - plot image (using inkscape) of pipeline state for *target* - - debug [args] - debug a method using the supplied arguments. The method - in the pipeline is run without checking any dependencies. 
- - config - write new configuration files pipeline.yml with default values - - dump - write pipeline configuration to stdout - - touch - touch files only, do not run - - regenerate - regenerate the ruffus checkpoint file - - check - check if requirements (external tool dependencies) are satisfied. - - clone - create a clone of a pipeline in in the current - directory. The cloning process aims to use soft linking to files - (not directories) as much as possible. Time stamps are - preserved. Cloning is useful if a pipeline needs to be re-run from - a certain point but the original pipeline should be preserved. - - - - Options: - --version show program's version number and exit - -h, --help show this help message and exit - --pipeline-action=PIPELINE_ACTION - action to take [default=none]. - --pipeline-format=PIPELINE_FORMAT - pipeline format [default=svg]. - -n, --dry-run perform a dry run (do not execute any shell commands) - [default=False]. - -c CONFIG_FILE, --config-file=CONFIG_FILE - benchmark configuration file [default=pipeline.yml]. - -f FORCE_RUN, --force-run=FORCE_RUN - force running the pipeline even if there are up-to- - date tasks. If option is 'all', all tasks will be - rerun. Otherwise, only the tasks given as arguments - will be rerun. [default=False]. - -p MULTIPROCESS, --multiprocess=MULTIPROCESS - number of parallel processes to use on submit host - (different from number of jobs to use for cluster - jobs) [default=none]. - -e, --exceptions echo exceptions immediately as they occur - [default=True]. - -i, --terminate terminate immediately at the first exception - [default=none]. - -d, --debug output debugging information on console, and not the - logfile [default=False]. - -s VARIABLES_TO_SET, --set=VARIABLES_TO_SET - explicitely set paramater values [default=[]]. - --input-glob=INPUT_GLOBS, --input-glob=INPUT_GLOBS - glob expression for input filenames. The exact format - is pipeline specific. If the pipeline expects only a - single input, `--input-glob=*.bam` will be sufficient. - If the pipeline expects multiple types of input, a - qualifier might need to be added, for example - `--input-glob=bam=*.bam` --input-glob=bed=*.bed.gz`. - Giving this option overrides the default of a pipeline - looking for input in the current directory or - specified the config file. [default=[]]. - --checksums=RUFFUS_CHECKSUMS_LEVEL - set the level of ruffus checksums[default=0]. - -t, --is-test this is a test run[default=False]. - --engine=ENGINE engine to use.[default=local]. - --always-mount force mounting of arvados keep [False] - --only-info only update meta information, do not run - [default=False]. - --work-dir=WORK_DIR working directory. Will be created if it does not - exist [default=none]. - --input-validation perform input validation before starting - [default=False]. - - pipeline logging configuration: - --pipeline-logfile=PIPELINE_LOGFILE - primary logging destination.[default=pipeline.log]. - --shell-logfile=SHELL_LOGFILE - filename for shell debugging information. If it is not - an absolute path, the output will be written into the - current working directory. If unset, no logging will - be output. [default=none]. - - Script timing options: - --timeit=TIMEIT_FILE - store timeing information in file [none]. - --timeit-name=TIMEIT_NAME - name in timing file for this class of jobs [all]. - --timeit-header add header for timing information [none]. - - Common options: - --random-seed=RANDOM_SEED - random seed to initialize number generator with - [none]. 
- -v LOGLEVEL, --verbose=LOGLEVEL - loglevel [1]. The higher, the more output. - --log-config-filename=LOG_CONFIG_FILENAME - Configuration file for logger [logging.yml]. - --tracing=TRACING enable function tracing [none]. - -? output short help (command line options only. - - cluster options: - --no-cluster, --local - do no use cluster - run locally [False]. - --cluster-priority=CLUSTER_PRIORITY - set job priority on cluster [none]. - --cluster-queue=CLUSTER_QUEUE - set cluster queue [none]. - --cluster-num-jobs=CLUSTER_NUM_JOBS - number of jobs to submit to the queue execute in - parallel [none]. - --cluster-parallel=CLUSTER_PARALLEL_ENVIRONMENT - name of the parallel environment to use [none]. - --cluster-options=CLUSTER_OPTIONS - additional options for cluster jobs, passed on to - queuing system [none]. - --cluster-queue-manager=CLUSTER_QUEUE_MANAGER - cluster queuing system [sge]. - --cluster-memory-resource=CLUSTER_MEMORY_RESOURCE - resource name to allocate memory with [none]. - --cluster-memory-default=CLUSTER_MEMORY_DEFAULT - default amount of memory to allocate [unlimited]. - - Input/output options: - -I FILE, --stdin=FILE - file to read stdin from [default = stdin]. - -L FILE, --log=FILE - file with logging information [default = stdout]. - -E FILE, --error=FILE - file with error information [default = stderr]. - -S FILE, --stdout=FILE - file where output is to go [default = stdout]. - - -Documentation on using pipelines is at :ref:`getting_started-Examples`. +if __name__ == "__main__": + P.main() Logging ------- @@ -283,7 +115,7 @@ def main(argv=None): several utility functions for conveniently uploading data. The :func:`load` method uploads data in a tab-separated file:: - @transform("*.tsv.gz", suffix(".tsv.gz"), ".load") + @P.transform("*.tsv.gz", suffix(".tsv.gz"), ".load") def loadData(infile, outfile): P.load(infile, outfile) @@ -321,74 +153,83 @@ def loadData(infile, outfile): ''' -import os +# cgatcore/pipeline/__init__.py -# import submodules into namespace +# Import existing pipeline functionality from cgatcore.pipeline.control import * -from cgatcore.pipeline.database import * from cgatcore.pipeline.files import * from cgatcore.pipeline.cluster import * -from cgatcore.pipeline.execution import * -from cgatcore.pipeline.utils import * from cgatcore.pipeline.parameters import * +from cgatcore.pipeline.utils import * +# Import original Ruffus decorators +from ruffus import ( + transform, + merge, + split, + originate, + follows +) + +# Import S3-aware decorators and functions +from cgatcore.remote.file_handler import ( + s3_transform, + s3_merge, + s3_split, + s3_originate, + s3_follows, + S3Mapper, + s3_aware +) + +# Expose the S3Mapper instance if it's needed elsewhere +s3_mapper = S3Mapper() + +# Add S3-related utility functions +def configure_s3(aws_access_key_id=None, aws_secret_access_key=None, region_name=None): + """ + Configure AWS credentials for S3 access. + If credentials are not provided, it will use the default AWS configuration. 
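+    Note: this replaces the boto3 S3 resource held by this module's s3_mapper instance.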
+ """ + import boto3 + session = boto3.Session( + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=region_name + ) + s3_mapper.s3.S3 = session.resource('s3') + +# You can add more S3-related utility functions here as needed + +# Add any other pipeline-related imports or functionality here + +# Include a version number for the pipeline module +__version__ = "0.1.0" # Update this as needed + +# Add a docstring for the module +__doc__ = """ +This module provides pipeline functionality for cgat-core, including support for AWS S3. + +It includes both standard Ruffus decorators and S3-aware decorators. The S3-aware decorators +can be used to seamlessly work with both local and S3-based files in your pipelines. + +Example usage: + +from cgatcore import pipeline as P + +# Using standard Ruffus decorator (works as before) +@P.transform("input.txt", suffix(".txt"), ".processed") +def process_local_file(infile, outfile): + # Your processing logic here + pass + +# Using S3-aware decorator +@P.s3_transform("s3://my-bucket/input.txt", suffix(".txt"), ".processed") +def process_s3_file(infile, outfile): + # Your processing logic here + pass + +# Configure S3 credentials if needed +P.configure_s3(aws_access_key_id="YOUR_KEY", aws_secret_access_key="YOUR_SECRET") +""" -# __all__ = [ -# # backwards incompatibility -# "clone", -# "touch", -# "snip", -# # execution.py -# "run", -# "execute", -# "shellquote", -# "buildStatement", -# "submit", -# "joinStatements", -# "cluster_runnable", -# "run_pickled", -# # database.py -# "tablequote", -# "toTable", -# "build_load_statement", -# "load", -# "concatenateAndLoad", -# "mergeAndLoad", -# "connect", -# "createView", -# "getdatabaseName", -# "importFromIterator", -# # Utils.py -# "add_doc", -# "isTest", -# "getCallerLocals", -# "getCaller", -# # Control.py -# "main", -# "peekParameters", -# # Files.py -# "getTempFile", -# "getTempDir", -# "getTempFilename", -# "checkScripts", -# "checkExecutables", -# # Local.py -# "run_report", -# "publish_report", -# "publish_notebooks", -# "publish_tracks", -# "getProjectDirectories", -# "getpipelineName", -# "getProjectId", -# "getProjectName", -# "isCGAT", -# # Parameters.py -# "getParameters", -# "loadParameters", -# "matchParameter", -# "substituteParameters", -# "asList", -# "checkParameter", -# "isTrue", -# "configToDictionary", -# ] diff --git a/cgatcore/remote/file_handler.py b/cgatcore/remote/file_handler.py new file mode 100644 index 00000000..ed2de922 --- /dev/null +++ b/cgatcore/remote/file_handler.py @@ -0,0 +1,109 @@ +# cgatcore/remote/file_handler.py + +import os +import hashlib +from cgatcore.remote.aws import S3RemoteObject +from ruffus import transform, merge, split, originate, follows + + +class S3Mapper: + def __init__(self): + self.s3_to_local = {} + self.local_to_s3 = {} + self.s3 = S3RemoteObject() + + def get_local_path(self, s3_path): + if s3_path in self.s3_to_local: + return self.s3_to_local[s3_path] + + bucket, key = s3_path[5:].split('/', 1) + local_path = os.path.join('/tmp', hashlib.md5(s3_path.encode()).hexdigest()) + self.s3_to_local[s3_path] = local_path + self.local_to_s3[local_path] = (bucket, key) + return local_path + + def download_if_s3(self, path): + if path.startswith('s3://'): + local_path = self.get_local_path(path) + bucket, key = self.local_to_s3[local_path] + self.s3.download(bucket, key, local_path) + return local_path + return path + + def upload_if_s3(self, path): + if path in self.local_to_s3: + bucket, key = self.local_to_s3[path] 
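+            # push the (possibly modified) local copy back to its original S3 bucket/key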
+ self.s3.upload(bucket, key, path) + + +s3_mapper = S3Mapper() + + +def s3_aware(func): + def wrapper(*args, **kwargs): + # Download S3 files before the task + local_args = [s3_mapper.download_if_s3(arg) if isinstance(arg, str) else arg for arg in args] + + # Run the original function + result = func(*local_args, **kwargs) + + # Upload modified files back to S3 after the task + for arg in local_args: + if isinstance(arg, str): + s3_mapper.upload_if_s3(arg) + + return result + + return wrapper + + +def s3_transform(input_files, filter, output_files, *args, **kwargs): + def decorator(func): + @transform(input_files, filter, output_files, *args, **kwargs) + @s3_aware + def wrapped_func(*args, **kwargs): + return func(*args, **kwargs) + + return wrapped_func + + return decorator + + +def s3_merge(input_files, output_file, *args, **kwargs): + def decorator(func): + @merge(input_files, output_file, *args, **kwargs) + @s3_aware + def wrapped_func(*args, **kwargs): + return func(*args, **kwargs) + + return wrapped_func + + return decorator + + +def s3_split(input_files, output_files, *args, **kwargs): + def decorator(func): + @split(input_files, output_files, *args, **kwargs) + @s3_aware + def wrapped_func(*args, **kwargs): + return func(*args, **kwargs) + + return wrapped_func + + return decorator + + +def s3_originate(output_files, *args, **kwargs): + def decorator(func): + @originate(output_files, *args, **kwargs) + @s3_aware + def wrapped_func(*args, **kwargs): + return func(*args, **kwargs) + + return wrapped_func + + return decorator + + +# The @follows decorator doesn't directly handle files, so we can use it as is +s3_follows = follows \ No newline at end of file
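
As an end-to-end illustration of the decorators introduced above, the sketch below mirrors the usage shown in the module docstring: a single-task pipeline that reads an object from S3 and writes a derived object back. The bucket, file names and task name are hypothetical, and AWS credentials are assumed to be available via the default boto3 credential chain; inside the task only local paths are visible, because `s3_aware` stages `s3://` arguments to `/tmp` before the task runs and uploads the mapped local paths afterwards.

```python
# usage sketch only - bucket/key and task names are hypothetical
from ruffus import suffix, pipeline_run

from cgatcore import pipeline as P


@P.s3_transform("s3://my-bucket/input.txt", suffix(".txt"), ".counts")
def count_lines(infile, outfile):
    # infile/outfile are the local /tmp copies substituted by the s3_aware wrapper
    with open(infile) as inf, open(outfile, "w") as outf:
        outf.write(f"{sum(1 for _ in inf)}\n")


if __name__ == "__main__":
    # run the task directly through ruffus; a full CGAT pipeline script would
    # normally end with P.main(sys.argv) as shown in the module docstring
    pipeline_run([count_lines])
```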
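
The local staging paths used by `S3Mapper` are derived deterministically from the S3 URI (an MD5 digest under `/tmp`), so the same object always maps to the same local file within and across runs. A minimal illustration, with a hypothetical URI:

```python
import hashlib
import os

s3_path = "s3://my-bucket/data/input.txt"  # hypothetical URI
local_path = os.path.join("/tmp", hashlib.md5(s3_path.encode()).hexdigest())
print(local_path)  # same URI -> same /tmp path every time
```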