forked from opentargets/genetics-finemapping
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path5_combine_results_rmdup.py
150 lines (130 loc) · 5.36 KB
/
5_combine_results_rmdup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Jeremy Schwartzentruber
#
# This script was used when we found that some top_loci were duplicated, which was
# due to eQTL catalogue ingestion before duplicate SNPs were removed. In general
# this script should not be needed.
#
'''
# Set SPARK_HOME and PYTHONPATH to use 2.4.0
export PYSPARK_SUBMIT_ARGS="--driver-memory 8g pyspark-shell"
export SPARK_HOME=/Users/em21/software/spark-2.4.0-bin-hadoop2.7
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-2.4.0-src.zip:$PYTHONPATH
'''
import pyspark.sql
from pyspark.sql.types import *
from pyspark.sql.functions import *
import os
from shutil import copyfile
from glob import glob
import yaml
import subprocess as sp
def main():
# Make spark session
# Using `ignoreCorruptFiles` will skip empty files
spark = (
pyspark.sql.SparkSession.builder
.config("spark.sql.files.ignoreCorruptFiles", "true")
.config("spark.master", "local[*]")
.getOrCreate()
)
# sc = spark.sparkContext
print('Spark version: ', spark.version)
# Args
#root = '/home/ubuntu/results/finemapping'
root = '/home/js29/genetics-finemapping'
#in_top_loci_pattern = root + '/output/study_id=*/phenotype_id=*/bio_feature=*/chrom=*/top_loci.json.gz'
#in_credset_pattern = root + '/output/study_id=*/phenotype_id=*/bio_feature=*/chrom=*/credible_set.json.gz'
in_top_loci_pattern = root + '/top_loci.dedup.json.gz'
in_credset_pattern = root + '/credible_set.dedup.json.gz'
out_top_loci = root + '/results/top_loci'
out_credset = root + '/results/credset'
with open('configs/analysis.config.yaml', 'r') as in_h:
config_dict = yaml.load(in_h, Loader=yaml.FullLoader)
# Process top loci
toploci = spark.read.json(in_top_loci_pattern)
# Cols phenotype_id and bio_feature might not be present if no molecular
# QTL data were processed
if not 'phenotype_id' in toploci.columns:
toploci = toploci.withColumn('phenotype_id', lit(None))
if not 'bio_feature' in toploci.columns:
toploci = toploci.withColumn('bio_feature', lit(None))
# Remove any duplicate rows of top_loci
last_n = toploci.count()
toploci = toploci.dropDuplicates(subset=['study_id', 'bio_feature', 'phenotype_id', 'type', 'variant_id'])
num_toploci = toploci.count()
print('{} toploci removed that were duplicates'.format( last_n - num_toploci ))
num_gwas_loci = toploci.filter(col('type') == 'gwas').count()
num_qtl_loci = num_toploci - num_gwas_loci
print(f'{num_toploci} toploci remain. {num_gwas_loci} GWAS and {num_qtl_loci} QTL')
(
toploci
.coalesce(1)
.orderBy('study_id', 'phenotype_id', 'bio_feature',
'chrom', 'pos')
.write.json(out_top_loci,
compression='gzip',
mode='overwrite')
)
# Copy to single file (this seems to only copy the first file in the folder...)
copyfile(
glob(out_top_loci + '/part-*.json.gz')[0],
out_top_loci + '.json.gz'
)
# Process cred set
credset = spark.read.json(in_credset_pattern)
if not 'phenotype_id' in credset.columns:
credset = credset.withColumn('phenotype_id', lit(None))
if not 'bio_feature' in credset.columns:
credset = credset.withColumn('bio_feature', lit(None))
# Remove any duplicate rows
last_n = credset.count()
credset = credset.dropDuplicates(subset=['study_id', 'bio_feature', 'phenotype_id', 'lead_variant_id', 'tag_variant_id'])
num_credset_rows = credset.count()
print('{} credset rows removed that were duplicates'.format( last_n - num_credset_rows ))
print(f'{num_credset_rows} credset rows remain.')
(
credset
.repartitionByRange('lead_chrom', 'lead_pos')
.sortWithinPartitions('lead_chrom', 'lead_pos')
.write.json(out_credset,
compression='gzip',
mode='overwrite')
)
if config_dict['run_finemap']:
# Process finemap output
in_finemap_pattern = root + '/output/study_id=*/phenotype_id=*/bio_feature=*/chrom=*/finemap_snp.tsv.gz'
out_finemap_snp = root + '/results/finemap_snp'
finemap_res = (
spark.read.option("delimiter", "\t")
.option("header", "true")
.csv(in_finemap_pattern)
)
(
finemap_res
.filter(col('prob') > 0.0001)
.repartitionByRange('chromosome', 'position')
.sortWithinPartitions('chromosome', 'position')
.write.csv(out_finemap_snp + "_filtered",
header=True,
compression='gzip',
mode='overwrite')
)
(
finemap_res
.repartitionByRange('chromosome', 'position')
.sortWithinPartitions('chromosome', 'position')
.write.csv(out_finemap_snp,
header=True,
compression='gzip',
mode='overwrite')
)
cmd = 'zcat {0}/part-00000*.csv.gz | head -n 1 > {0}.csv'.format(out_finemap_snp)
cp = sp.run(cmd, shell=True, stderr=sp.STDOUT)
cmd = 'for f in {0}/part-*.csv.gz; do zcat $f | sed "1d" >> {0}.csv; done; gzip -f {0}.csv'.format(out_finemap_snp)
cp = sp.run(cmd, shell=True, stderr=sp.STDOUT)
return 0
if __name__ == '__main__':
main()