Merge pull request #74 from phac-nml/cloud_parser_refactor

Cloud parser refactor

Showing 43 changed files with 903 additions and 833 deletions.
@@ -0,0 +1,183 @@
# Deploying the uploader to the cloud

While there is no end-to-end solution that you can deploy onto the cloud, the iridauploader does allow you to use its modules to simplify your code for cloud deployment.

#### Why can't I just deploy straight to the cloud?

The main difficulty is that each cloud storage solution maintains files differently, and it would not be feasible for us to support every cloud environment available.

## How to deploy to the cloud

The simplest way is to incorporate the `iridauploader` modules from `pip` / `PyPI`:

`pip install iridauploader`

Example of creating a new instance of the API and a MiSeq parser:

```python
import iridauploader.api as api
import iridauploader.parsers as parsers

# the credentials and connection settings below are placeholders to fill in
api_instance = api.ApiCalls(client_id, client_secret, base_url, username, password, max_wait_time)
parser_instance = parsers.Parser.factory("miseq")
```
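As a quick sanity check that the credentials work, a minimal sketch (assuming the API instance exposes a `get_projects()` call; consult the API documentation for the exact methods available in your version):

```python
# hypothetical smoke test: list the projects visible to these credentials
projects = api_instance.get_projects()
print(projects)
```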
## Examples for deployment on Azure Cloud

In these examples we have the following setup:
* We are using an Azure Function App with the Python runtime
* Files are stored in blob storage containers (in our example `myblobcontainer`)
* We use a BlobTrigger to run when a new run is uploaded with the path identifier `myblobcontainer/{name}.csv`

Example `function.json` file:

```json
{
  "scriptFile": "__init__.py",
  "disabled": false,
  "bindings": [
    {
      "name": "myblob",
      "type": "blobTrigger",
      "direction": "in",
      "path": "myblobcontainer/{name}.csv",
      "connection": "AzureWebJobsStorage"
    }
  ]
}
```
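For local testing, the `AzureWebJobsStorage` connection referenced above is typically supplied through a `local.settings.json` file. A minimal sketch with placeholder values:

```json
{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsStorage": "<storage account connection string>",
    "AZURE_STORAGE_CONNECTION_STRING": "<storage account connection string>"
  }
}
```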
For the following examples, we have this simple setup at the top of our `__init__.py` function app file (the local scratch path and file name at the bottom are example values):

```python
import logging
import os
import posixpath
import tempfile

from azure.storage.blob import BlobServiceClient
from azure.storage.blob import BlobClient
from azure.storage.blob import ContainerClient
import azure.functions as func

from iridauploader import parsers


# connect to our blob storage
connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# These strings could be fetched somehow, but this works for an example
container_name = "myblobcontainer"
container_client = blob_service_client.get_container_client(container_name)

# local scratch location where the triggering sample sheet is downloaded to
local_path = tempfile.gettempdir()
local_file_name = "sample_sheet.csv"
```
### MiSeq example

For this example, we will get the entire folder for a MiSeq run as a set of blobs. When parsing directly from other sequencers, please consult the parser documentation for file structure differences.

```python
def main(myblob: func.InputStream):
    logging.info('Python blob trigger function %s', myblob.name)

    # download the sample sheet so it can be parsed
    download_sample_sheet_file_path = os.path.join(local_path, local_file_name)
    with open(download_sample_sheet_file_path, "wb") as download_file:
        download_file.write(myblob.read())
    logging.info("done downloading")

    # get the run directory (the middle portion of the blob path)
    # example: 'myblobcontainer/miseq_run/SampleSheet.csv' -> 'miseq_run'
    run_directory_name = posixpath.split(posixpath.split(myblob.name)[0])[1]

    # we use the miseq parser for this example
    my_parser = parsers.Parser.factory("miseq")
    logging.info("built parser")

    # This example was tested locally on a Windows machine, so replacing \\ with / was needed for compatibility
    relative_data_path = my_parser.get_relative_data_directory().replace("\\", "/")
    full_data_dir = posixpath.join(
        run_directory_name,
        relative_data_path)

    # list the blobs of the run directory
    blob_list = list(container_client.list_blobs(full_data_dir))
    file_list = []
    # The file_blob_tuple_list could be useful when moving to the uploading stage in the case where
    # you do not want to use the iridauploader.api module to upload to IRIDA; otherwise it can be ignored
    file_blob_tuple_list = []
    for file_blob in blob_list:
        file_name = remove_prefix(file_blob.name, full_data_dir)
        file_list.append(file_name)
        file_blob_tuple_list.append({"file_name": file_name, "blob": file_blob})

    # TODO: put a try/except around this with the parser exceptions.
    # We can catch errors within the sample sheet or missing files here
    sequencing_run = my_parser.get_sequencing_run(
        sample_sheet=download_sample_sheet_file_path,
        run_data_directory=full_data_dir,
        run_data_directory_file_list=file_list)
    logging.info("built sequencing run")

    # move to upload / error handling when the parser finds an error in the run


def remove_prefix(text, prefix):
    if text.startswith(prefix):
        return text[len(prefix):]
    raise Exception("file name '{}' did not start with expected prefix '{}'".format(text, prefix))
```
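As a sketch of the TODO above, the parse step could be wrapped with the exception classes from `iridauploader.parsers.exceptions` (`SampleSheetError` and `DirectoryError` appear elsewhere in this changeset; which exceptions `get_sequencing_run` actually raises here is an assumption, so check the parser documentation):

```python
from iridauploader.parsers import exceptions as parser_exceptions

try:
    sequencing_run = my_parser.get_sequencing_run(
        sample_sheet=download_sample_sheet_file_path,
        run_data_directory=full_data_dir,
        run_data_directory_file_list=file_list)
except parser_exceptions.SampleSheetError as error:
    # the sample sheet is missing fields or could not be parsed
    logging.error("Sample sheet error: %s", error)
    return
except parser_exceptions.DirectoryError as error:
    # expected files were missing from the run directory
    logging.error("Run directory error: %s", error)
    return
```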
### Directory example

In this example we will use the basic file layout for a directory upload:

```
.directory_run
├── file_1.fastq.gz
├── file_2.fastq.gz
└── SampleList.csv
```

```python
def main(myblob: func.InputStream):
    logging.info('Python blob trigger function %s', myblob.name)

    # download the sample sheet
    download_sample_sheet_file_path = os.path.join(local_path, local_file_name)
    with open(download_sample_sheet_file_path, "wb") as download_file:
        download_file.write(myblob.read())
    logging.info("done downloading")

    # get the run directory (the middle portion of the blob path)
    # example: 'myblobcontainer/directory_run/SampleList.csv' -> 'directory_run'
    run_directory_name = posixpath.split(posixpath.split(myblob.name)[0])[1]

    # we use the directory parser for this example
    my_parser = parsers.Parser.factory("directory")
    logging.info("built parser")

    # list the blobs of the run directory
    blob_list = list(container_client.list_blobs(run_directory_name))
    file_list = []
    # The file_blob_tuple_list could be useful when moving to the uploading stage in the case where
    # you do not want to use the iridauploader.api module to upload to IRIDA; otherwise it can be ignored
    file_blob_tuple_list = []
    for file_blob in blob_list:
        file_name = remove_prefix(file_blob.name, run_directory_name)
        # trim the leading path separator
        file_name = file_name.replace("/", "")
        file_list.append(file_name)
        file_blob_tuple_list.append({"file_name": file_name, "blob": file_blob})

    # TODO: put a try/except around this with the parser exceptions.
    # We can catch errors within the sample sheet or missing files here
    sequencing_run = my_parser.get_sequencing_run(
        sample_sheet=download_sample_sheet_file_path,
        run_data_directory_file_list=file_list)
    logging.info("built sequencing run")

    # move to upload / error handling when the parser finds an error in the run


def remove_prefix(text, prefix):
    if text.startswith(prefix):
        return text[len(prefix):]
    raise Exception("file name '{}' did not start with expected prefix '{}'".format(text, prefix))
```
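Note that on Python 3.9+ the `remove_prefix` helper duplicated in both examples could be dropped in favour of the built-in `str.removeprefix`:

```python
# Python 3.9+; returns the string unchanged if the prefix is absent
file_name = file_blob.name.removeprefix(run_directory_name)
```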
@@ -1,3 +1,4 @@
from iridauploader.parsers.parsers import Parser
from iridauploader.parsers.parsers import supported_parsers
from iridauploader.parsers import exceptions
from iridauploader.parsers import common
@@ -0,0 +1,110 @@
""" | ||
This file has generic utility methods that can be used by all parsers | ||
These methods can rely on the os module to function, and therefore not to be used with cloud environments. | ||
They should be used as generic utilities for any new parser that is added to the project. | ||
""" | ||
import os | ||
from csv import reader | ||
import logging | ||
|
||
from iridauploader.parsers import exceptions | ||
from iridauploader import model | ||
|
||
|
||
def get_csv_reader(sample_sheet_file): | ||
|
||
""" | ||
tries to create a csv.reader object which will be used to | ||
parse through the lines in SampleSheet.csv | ||
raises an error if: | ||
sample_sheet_file is not an existing file | ||
sample_sheet_file contains null byte(s) | ||
arguments: | ||
data_dir -- the directory that has SampleSheet.csv in it | ||
returns a csv.reader object | ||
""" | ||
|
||
if os.path.isfile(sample_sheet_file): | ||
csv_file = open(sample_sheet_file, "r") | ||
# strip any trailing newline characters from the end of the line | ||
# including Windows newline characters (\r\n) | ||
csv_lines = [x.rstrip('\n') for x in csv_file] | ||
csv_lines = [x.rstrip('\r') for x in csv_lines] | ||
|
||
# open and read file in binary then send it to be parsed by csv's reader | ||
csv_reader = reader(csv_lines) | ||
else: | ||
raise exceptions.SampleSheetError("Sample sheet cannot be parsed as a CSV file because it's not a regular file.", | ||
sample_sheet_file) | ||
|
||
return csv_reader | ||
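
# get_csv_reader usage sketch (hypothetical path):
#     for row in get_csv_reader("/runs/miseq_run/SampleSheet.csv"):
#         print(row)  # each row is a list of column strings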

def find_directory_list(directory):
    """Find and return all directories in the specified directory.
    Arguments:
        directory -- the directory to find directories in
    Returns: a list of full paths to the subdirectories of the given directory
    """
    # Check that we can write to the given directory, raise an error if we cannot.
    if not os.access(directory, os.W_OK):
        raise exceptions.DirectoryError("The directory is not writeable, "
                                        "can not upload samples from this directory {}".format(directory),
                                        directory)

    dir_list = next(os.walk(directory))[1]  # Gets the list of directories in the directory
    full_dir_list = []
    for d in dir_list:
        full_dir_list.append(os.path.join(directory, d))
    return full_dir_list


def build_sequencing_run_from_samples(sample_list, metadata):
    """
    Create a SequencingRun object with full project/sample/sequence_file structure
    :param sample_list: List of Sample objects
    :param metadata: metadata dict to add to the run
    :return: SequencingRun
    """
    logging.debug("Building SequencingRun from parsed data")

    # create a list of projects and add samples to the appropriate project
    project_list = []
    for sample in sample_list:
        project = None
        for p in project_list:
            if sample.get('sample_project') == p.id:
                project = p
        if project is None:
            project = model.Project(id=sample.get('sample_project'))
            project_list.append(project)

        project.add_sample(sample)

    sequence_run = model.SequencingRun(metadata, project_list)
    logging.debug("SequencingRun built")
    return sequence_run


def get_file_list(directory):
    """
    Get the list of file names in the data directory
    :param directory: directory to search for files
    :return: list of file names in data directory
    """
    # verify that the directory exists
    if not os.path.exists(directory):
        raise exceptions.DirectoryError("Could not list files, as directory does not exist.", directory)
    # Create a file list of the directory, only hit the os once
    file_list = next(os.walk(directory))[2]
    return file_list
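
# Usage sketch for the directory helpers (hypothetical paths):
#     for run_dir in find_directory_list("/data/staging"):
#         print(run_dir, get_file_list(run_dir))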