Liftover Batch Job #58

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 4 commits into main
40 changes: 27 additions & 13 deletions dataregistry/api/api.py
@@ -412,22 +412,36 @@ async def validate_hermes_csv(request: QCHermesFileRequest, background_tasks: Ba
if validation_errors:
return {"errors": validation_errors}

hg38 = metadata.get('referenceGenome') == 'Hg38'

script_options = {k: v for k, v in request.qc_script_options.dict().items() if v is not None}
s3.upload_metadata(metadata, f"hermes/{dataset}")
file_guid = query.save_file_upload_info(engine, dataset, metadata, s3_path, filename, file_size, user.user_name,
script_options)

# Submit the batch job for further processing
background_tasks.add_task(batch.submit_and_await_job, engine, {
'jobName': 'hermes-qc-job',
'jobQueue': 'hermes-qc-job-queue',
'jobDefinition': 'hermes-qc-job',
'parameters': {
's3-path': f"s3://{s3.BASE_BUCKET}/{s3_path}",
'file-guid': file_guid,
'col-map': json.dumps(metadata["column_map"]),
'script-options': json.dumps(script_options)
}}, query.update_file_upload_qc_log, file_guid, True)
script_options,
HermesFileStatus.SUBMITTED_TO_LIFTOVER if hg38 else HermesFileStatus.SUBMITTED_TO_QC)

if hg38:
background_tasks.add_task(batch.submit_and_await_job, engine, {
'jobName': 'liftover-job',
'jobQueue': 'liftover-job-queue',
'jobDefinition': 'liftover-job',
'parameters': {
's3-path': f"s3://{s3.BASE_BUCKET}/{s3_path}",
'chromosome-col': metadata["column_map"]['chromosome'],
'position-col': metadata["column_map"]['position']
}}, query.update_file_upload_qc_log, file_guid, True)
# can we do something here?
else:
background_tasks.add_task(batch.submit_and_await_job, engine, {
'jobName': 'hermes-qc-job',
'jobQueue': 'hermes-qc-job-queue',
'jobDefinition': 'hermes-qc-job',
'parameters': {
's3-path': f"s3://{s3.BASE_BUCKET}/{s3_path}",
'file-guid': file_guid,
'col-map': json.dumps(metadata["column_map"]),
'script-options': json.dumps(script_options)
}}, query.update_file_upload_qc_log, file_guid, True)

return {"file_size": file_size, "s3_path": s3_path, "file_id": file_guid}

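A note on the "# can we do something here?" question above: one option is to hand both job configs to the run_liftover_then_qc helper added in batch.py, so the QC job is only submitted once liftover has finished and the lifted file is back on S3. A minimal sketch, assuming liftover rewrites the file in place at the same s3_path (which gwas_liftover.py below does) and that the helper takes care of the status updates:

if hg38:
    liftover_config = {
        'jobName': 'liftover-job',
        'jobQueue': 'liftover-job-queue',
        'jobDefinition': 'liftover-job',
        'parameters': {
            's3-path': f"s3://{s3.BASE_BUCKET}/{s3_path}",
            'chromosome-col': metadata["column_map"]['chromosome'],
            'position-col': metadata["column_map"]['position']
        }}
    qc_config = {
        'jobName': 'hermes-qc-job',
        'jobQueue': 'hermes-qc-job-queue',
        'jobDefinition': 'hermes-qc-job',
        'parameters': {
            's3-path': f"s3://{s3.BASE_BUCKET}/{s3_path}",
            'file-guid': file_guid,
            'col-map': json.dumps(metadata["column_map"]),
            'script-options': json.dumps(script_options)
        }}
    # Chain the two jobs in a single background task instead of submitting liftover alone.
    background_tasks.add_task(batch.run_liftover_then_qc, engine, liftover_config, qc_config, file_guid)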
9 changes: 9 additions & 0 deletions dataregistry/api/batch.py
@@ -2,6 +2,7 @@

import boto3

from dataregistry.api import query
from dataregistry.api.model import HermesFileStatus
from dataregistry.api.s3 import S3_REGION

@@ -18,6 +19,9 @@ def submit_aggregator_job(branch, method, extra_args):
job_id = response['jobId']
return job_id

def run_liftover_then_qc(engine, liftover_config, qc_config, identifier):
    # Run the liftover job first, then submit the QC job once liftover has finished.
    submit_and_await_job(engine, liftover_config, query.update_file_upload_qc_log, identifier, is_qc=False)
    submit_and_await_job(engine, qc_config, query.update_file_upload_qc_log, identifier, is_qc=True)

def submit_and_await_job(engine, job_config, db_callback, identifier, is_qc=True):
batch_client = boto3.client('batch', region_name=S3_REGION)
@@ -28,6 +32,7 @@ def submit_and_await_job(engine, job_config, db_callback, identifier, is_qc=True
while True:
response = batch_client.describe_jobs(jobs=[job_id])
job_status = response['jobs'][0]['status']
job_queue = response['jobs'][0]['jobQueue']
if job_status in ['SUCCEEDED', 'FAILED']:
log_stream_name = response['jobs'][0]['container']['logStreamName']
log_group_name = '/aws/batch/job'
@@ -42,6 +47,10 @@ def submit_and_await_job(engine, job_config, db_callback, identifier, is_qc=True
HermesFileStatus.READY_FOR_REVIEW if job_status == 'SUCCEEDED' else
HermesFileStatus.FAILED_QC)
else:
    if job_queue == 'liftover-job-queue':
        db_callback(engine, complete_log, identifier,
                    HermesFileStatus.LIFTOVER_COMPLETE if job_status == 'SUCCEEDED' else
                    HermesFileStatus.LIFTOVER_FAILED)
    else:
        # Jobs from other non-QC queues just record the raw Batch job status.
        db_callback(engine, complete_log, identifier, job_status)
break
time.sleep(60)
3 changes: 3 additions & 0 deletions dataregistry/api/model.py
@@ -130,6 +130,9 @@ class BioIndexCreationStatus(str, Enum):


class HermesFileStatus(str, Enum):
LIFTOVER_COMPLETE = "LIFTOVER COMPLETE"
LIFTOVER_FAILED = "LIFTOVER FAILED"
SUBMITTED_TO_LIFTOVER = "SUBMITTED TO LIFTOVER"
SUBMITTED_TO_QC = "SUBMITTED TO QC"
SUBMISSION_TO_QC_FAILED = "FAILED TO SUBMIT TO QC"
FAILED_QC = "FAILED QC"
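With these statuses, an Hg38 upload is expected to move from SUBMITTED TO LIFTOVER to LIFTOVER COMPLETE (or LIFTOVER FAILED) before the existing QC statuses apply, while non-Hg38 uploads keep the current SUBMITTED TO QC path through FAILED QC or review.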
6 changes: 3 additions & 3 deletions dataregistry/api/query.py
@@ -404,15 +404,15 @@ def retrieve_meta_data_mapping(engine, user: str) -> [dict]:

return {row.dataset: json.loads(row.metadata) for row in results}

def save_file_upload_info(engine, dataset, metadata, s3_path, filename, file_size, uploader, qc_script_options) -> str:
def save_file_upload_info(engine, dataset, metadata, s3_path, filename, file_size, uploader, qc_script_options, status) -> str:
with engine.connect() as conn:
new_guid = str(uuid.uuid4())
conn.execute(text("""INSERT INTO file_uploads(id, dataset, file_name, file_size, uploaded_at, uploaded_by,
metadata, s3_path, qc_script_options, qc_status) VALUES(:id, :dataset, :file_name, :file_size, NOW(), :uploaded_by, :metadata,
:s3_path, :qc_script_options, 'SUBMITTED TO QC')"""), {'id': new_guid.replace('-', ''), 'dataset': dataset,
:s3_path, :qc_script_options, :qc_status)"""), {'id': new_guid.replace('-', ''), 'dataset': dataset,
'file_name': filename,
'file_size': file_size, 'uploaded_by': uploader,
'metadata': json.dumps(metadata), 's3_path': s3_path,
'metadata': json.dumps(metadata), 's3_path': s3_path, 'qc_status': status,
'qc_script_options': json.dumps(qc_script_options)})
conn.commit()
return new_guid
10 changes: 10 additions & 0 deletions liftover_docker/Dockerfile
@@ -0,0 +1,10 @@
FROM python:3.9-slim

# argparse ships with the Python standard library, so only third-party deps are installed
RUN pip install pyliftover pandas boto3

WORKDIR /app

COPY gwas_liftover.py /app/

RUN chmod +x /app/gwas_liftover.py

117 changes: 117 additions & 0 deletions liftover_docker/gwas_liftover.py
@@ -0,0 +1,117 @@
#!/usr/bin/env python3
import argparse
import pandas as pd
import os
import boto3
from urllib.parse import urlparse
from pyliftover import LiftOver
import tempfile
import time

def parse_args():
parser = argparse.ArgumentParser(description='Lift GWAS data from hg38 to hg19')
parser.add_argument('--input', required=True, help='Input S3 path (s3://bucket/path)')
parser.add_argument('--chr-col', default='CHR', help='Chromosome column name')
parser.add_argument('--pos-col', default='POS', help='Position column name')
parser.add_argument('--delimiter', default='\t', help='Delimiter in input file (tab is default)')
return parser.parse_args()

def parse_s3_path(s3_path):
parsed = urlparse(s3_path)
if parsed.scheme != 's3':
raise ValueError(f"Invalid S3 path: {s3_path}. Must start with s3://")
bucket = parsed.netloc
key = parsed.path.lstrip('/')
filename = os.path.basename(key)
return bucket, key, filename

def download_from_s3(s3_path, local_path):
print(f"Downloading {s3_path} to {local_path}")
bucket, key, filename = parse_s3_path(s3_path)
s3_client = boto3.client('s3')
local_path = f"{local_path}/to_convert_{filename}"
s3_client.download_file(bucket, key, local_path)
print("Download complete")
return local_path, filename

def upload_to_s3(local_path, s3_path):
print(f"Uploading {local_path} to {s3_path}")
bucket, key, filename = parse_s3_path(s3_path)
s3_client = boto3.client('s3')
s3_client.upload_file(local_path, bucket, key)
print("Upload complete")

def batch_convert_coordinates(converter, chroms, positions):
new_positions = []
failed_count = 0

for chrom, pos in zip(chroms, positions):
try:
result = converter.convert_coordinate(chrom, pos)
if result and len(result) > 0:
new_positions.append(int(result[0][1]))
else:
new_positions.append(None)
failed_count += 1
except Exception as e:
new_positions.append(None)
failed_count += 1

return new_positions, failed_count

def main():
start_time = time.time()
args = parse_args()

temp_dir = tempfile.mkdtemp()

file_to_convert, original_file = download_from_s3(args.input, temp_dir)

print("Initializing liftover converter...")
converter = LiftOver('hg38', 'hg19')
print("Converter initialized")

total_processed = 0
total_failed = 0
first_chunk = True
chunk_size = 100000
print(f"Processing in chunks of {chunk_size} rows with delimiter: {repr(args.delimiter)}")

for chunk_idx, chunk in enumerate(pd.read_csv(file_to_convert,
sep=args.delimiter,
chunksize=chunk_size,
low_memory=False)):
chunk_start = time.time()
print(f"Processing chunk {chunk_idx+1} with {len(chunk)} rows")

chunk[args.chr_col] = chunk[args.chr_col].astype(str)
chunk[args.pos_col] = chunk[args.pos_col].astype("Int64")
hg38_column = f"{args.pos_col}_hg38"
chunk.loc[:, hg38_column] = chunk[args.pos_col].copy()

chroms = chunk[args.chr_col].tolist()
positions = chunk[hg38_column].tolist()
new_positions, chunk_failed = batch_convert_coordinates(converter, chroms, positions)
# Align on the chunk's index; a bare Series would misalign for every chunk after the first.
chunk.loc[:, args.pos_col] = pd.Series(new_positions, dtype="Int64", index=chunk.index)

total_failed += chunk_failed
total_processed += len(chunk)

if first_chunk:
chunk.to_csv(original_file, sep=args.delimiter, index=False, mode='w')
first_chunk = False
else:
chunk.to_csv(original_file, sep=args.delimiter, index=False, mode='a', header=False)

chunk_time = time.time() - chunk_start
print(f"Chunk {chunk_idx+1}: Processed {len(chunk)} rows in {chunk_time:.2f} seconds. Failed liftovers: {chunk_failed}")

print(f"Liftover complete. Failed to lift {total_failed} out of {total_processed} variants ({(total_failed/total_processed)*100:.2f}%).")

upload_to_s3(original_file, args.input)

total_time = time.time() - start_time
print(f"Process completed successfully in {total_time:.2f} seconds")

if __name__ == "__main__":
main()
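For reference, the conversion above leans on pyliftover's convert_coordinate, which works with chain-file chromosome names (e.g. 'chr1') and 0-based positions, and returns a list of candidate mappings. A small standalone sketch of the call batch_convert_coordinates wraps; whether the incoming CHR/POS columns need a 'chr' prefix or a 0-based/1-based adjustment is worth confirming against real HERMES input files:

from pyliftover import LiftOver

converter = LiftOver('hg38', 'hg19')  # downloads the UCSC hg38->hg19 chain file on first use

# Returns a list of (chromosome, position, strand, score) tuples;
# an empty list (or None for an unknown chromosome) means no mapping.
hits = converter.convert_coordinate('chr1', 1000000)
if hits:
    lifted_chrom, lifted_pos = hits[0][0], int(hits[0][1])
    print(f"lifted to {lifted_chrom}:{lifted_pos}")
else:
    print("no hg19 mapping for this position")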
123 changes: 123 additions & 0 deletions liftover_docker/liftover.yml
@@ -0,0 +1,123 @@
AWSTemplateFormatVersion: 2010-09-09
Description:
Infrastructure for lifting Hg38 to Hg19
Resources:
EcrRepository:
Type: AWS::ECR::Repository
Properties:
RepositoryName: liftover-repo

BatchServiceRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- batch.amazonaws.com
Action:
- 'sts:AssumeRole'
ManagedPolicyArns:
- 'arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole'

LiftoverComputeEnv:
Type: 'AWS::Batch::ComputeEnvironment'
Properties:
Type: MANAGED
ServiceRole: !GetAtt BatchServiceRole.Arn
ComputeResources:
Type: FARGATE
MaxvCpus: 128
Subnets:
- subnet-041ed74e61806c6f0
SecurityGroupIds:
- sg-28485f53

LiftoverJobQueue:
Type: 'AWS::Batch::JobQueue'
Properties:
JobQueueName: 'liftover-job-queue'
ComputeEnvironmentOrder:
- Order: 1
ComputeEnvironment: !Ref LiftoverComputeEnv
Priority: 1
State: ENABLED

LiftoverJobDefinition:
Type: 'AWS::Batch::JobDefinition'
Properties:
Type: 'container'
JobDefinitionName: 'liftover-job'
PlatformCapabilities:
- 'FARGATE'
Timeout:
AttemptDurationSeconds: 7200
ContainerProperties:
Image: !Sub '${AWS::AccountId}.dkr.ecr.us-east-1.amazonaws.com/liftover-repo'
Command:
- 'python3'
- 'gwas_liftover.py'
- '--input'
- 'Ref::s3-path'
- '--chr-col'
- 'Ref::chromosome-col'
- '--pos-col'
- 'Ref::position-col'
JobRoleArn: !GetAtt LiftoverJobRole.Arn
ExecutionRoleArn: !GetAtt LiftoverJobRole.Arn
ResourceRequirements:
- Type: 'VCPU'
Value: '16'
- Type: 'MEMORY'
Value: '122880'
NetworkConfiguration:
AssignPublicIp: 'ENABLED'
FargatePlatformConfiguration:
PlatformVersion: 'LATEST'

LiftoverJobRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: 'Allow'
Principal:
Service:
- 'ecs-tasks.amazonaws.com'
Action:
- 'sts:AssumeRole'
Path: '/'
Policies:
- PolicyName: 'S3Access'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: 'Allow'
Action:
- 's3:GetObject'
- 's3:PutObject'
Resource:
- 'arn:aws:s3:::dig-data-registry'
- 'arn:aws:s3:::dig-data-registry/*'
- 'arn:aws:s3:::dig-data-registry-qa'
- 'arn:aws:s3:::dig-data-registry-qa/*'
- 'arn:aws:s3:::hermes-qc/*'
- 'arn:aws:s3:::hermes-qc'
- PolicyName: 'ECSTaskExecution'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: 'Allow'
Action:
- 'ecr:GetAuthorizationToken'
- 'ecr:BatchCheckLayerAvailability'
- 'ecr:GetDownloadUrlForLayer'
- 'ecr:BatchGetImage'
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: '*'
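Once the stack is created and the image is pushed to liftover-repo, the job definition can be exercised directly for testing. A minimal sketch using boto3 (the S3 path below is a placeholder); the parameters map onto the Ref:: placeholders in the job definition's Command:

import boto3

batch_client = boto3.client('batch', region_name='us-east-1')

response = batch_client.submit_job(
    jobName='liftover-smoke-test',
    jobQueue='liftover-job-queue',
    jobDefinition='liftover-job',
    parameters={
        's3-path': 's3://dig-data-registry-qa/hermes/example-dataset/example.tsv',  # placeholder path
        'chromosome-col': 'CHR',
        'position-col': 'POS',
    },
)
print("submitted job", response['jobId'])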