-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadc-search.py
294 lines (251 loc) · 11.2 KB
/
adc-search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
import requests
import pandas as pd
import numpy as np
import argparse
import json
import os, ssl
import sys
import time
def getField(dictionary, field_path, verbose):
    """Look up a dot-notation field path (e.g. "subject.subject_id") in a dict.

    Each path component is looked up in the current object. Whenever the value
    found is a list, only its first element is kept (with a warning in verbose
    mode), so the path can descend through array fields.

    Args:
        dictionary: The object to search (here, a repertoire dict).
        field_path: Dot-separated sequence of keys.
        verbose: When True, print a warning each time an array is collapsed
            to its first element.

    Returns:
        A tuple ``(last_key, value)`` for the final path component, or
        ``(None, None)`` if a component is missing, an intermediate value is
        not a dict, or a list along the path is empty.
    """
    current_field = None
    current_object = dictionary
    for field_name in field_path.split("."):
        # Only dicts can be descended into; `in` on a str would be a substring
        # test and the following subscript would raise TypeError.
        if not isinstance(current_object, dict) or field_name not in current_object:
            return None, None
        current_object = current_object[field_name]
        if isinstance(current_object, list):
            # An empty array has no "first object" to use; treat as missing.
            if not current_object:
                return None, None
            if verbose:
                print("IR-INFO: Warning: Processing array field, first object only")
            current_object = current_object[0]
        current_field = field_name
    return current_field, current_object
def performRearrangementQuery(rearrangement_url, repertoires, rearrangement_dict, repertoire_field_df, output_handle, service_delay, verbose):
    """Run the rearrangement query once per repertoire and write one JSON object.

    Writes ``{"repository": <url>, "results": [...]}`` to ``output_handle``.
    For every repertoire, the base rearrangement query is restricted to that
    repertoire's id (via generateRearrangementQuery) and POSTed to
    ``rearrangement_url``. Each successful response gets a "Repertoire" key
    holding the repertoire fields listed in ``repertoire_field_df``, and is
    appended to "results". Failed queries are reported and skipped.

    Args:
        rearrangement_url: Full URL of the rearrangement (or stats) endpoint.
        repertoires: List of repertoire dicts, each expected to carry a
            'repertoire_id' key.
        rearrangement_dict: Base rearrangement query (parsed JSON).
        repertoire_field_df: DataFrame whose first column lists the
            dot-notation repertoire fields to copy into each result.
        output_handle: Writable text stream that receives the JSON output.
        service_delay: Seconds to sleep after each query attempt, to avoid
            overloading the service.
        verbose: When True, print extra diagnostics.
    """
    count = 0
    total = len(repertoires)
    print("IR-INFO: Processing %d repertoires from %s" % (total, rearrangement_url))
    # json.dumps quotes/escapes the URL so the output is always valid JSON.
    print("{\n\"repository\":%s,\n\"results\":[" % (json.dumps(rearrangement_url)), file=output_handle)
    # The field paths are the same for every repertoire; read them once.
    # .iloc is positional; Series[0] on a label index is deprecated in pandas.
    field_paths = repertoire_field_df.iloc[:, 0].tolist()
    t_start = time.perf_counter()
    for repertoire in repertoires:
        if not isinstance(repertoire, dict) or 'repertoire_id' not in repertoire:
            print("IR-ERROR: Repertoire without a repertoire_id, skipping")
            continue
        query_dict = generateRearrangementQuery(repertoire['repertoire_id'], rearrangement_dict)
        query_response = processQuery(rearrangement_url, query_dict, verbose)
        # Pause after every attempt (success or failure) to be gentle on the service.
        time.sleep(service_delay)
        # Report failed queries; a non-object reply cannot carry "Repertoire".
        if not isinstance(query_response, dict):
            print('IR-ERROR: Query %s failed to %s' % (query_dict, rearrangement_url))
            continue
        repertoire_info = dict()
        for field_path in field_paths:
            field, value = getField(repertoire, field_path, verbose)
            if field is not None:
                repertoire_info[field_path] = value
        query_response["Repertoire"] = repertoire_info
        # Write the separator *before* every result but the first, so skipped
        # repertoires can never leave a dangling comma in the JSON array.
        if count > 0:
            print(",", file=output_handle)
        print("%s" % (json.dumps(query_response, indent=4)), file=output_handle)
        count = count + 1
    print("]\n}", file=output_handle, flush=True)
    elapsed = time.perf_counter() - t_start
    # Guard against a zero interval on low-resolution clocks.
    rate = count / elapsed if elapsed > 0 else 0.0
    print("IR-INFO: Performed %d queries in %f s, %f queries/s" % (count, elapsed, rate))
def processQuery(query_url, query_dict, verbose, timeout=None):
    """POST ``query_dict`` as JSON to ``query_url`` and return the decoded reply.

    Args:
        query_url: Full URL of the service endpoint.
        query_dict: Query to send; serialised as the JSON request body.
        verbose: When True, also print the query on failure.
        timeout: Optional seconds passed to ``requests.post``. ``None`` (the
            default) waits indefinitely, matching the previous behaviour.

    Returns:
        The decoded JSON body, or None when the request could not be sent,
        the body was not valid JSON, or the service answered with a non-2xx
        status. Every failure is reported on stdout with an ``IR-ERROR``
        prefix.
    """
    # Catch network failures (DNS, refused connection, timeout) so that one
    # unreachable repository does not abort the whole run.
    try:
        url_response = requests.post(query_url, json=query_dict, timeout=timeout)
    except requests.exceptions.RequestException as error:
        print("IR-ERROR: Unable to send query to %s: %s" % (query_url, error))
        if verbose:
            print("IR-ERROR: Query = " + str(query_dict))
        return None

    def report_failure(message):
        # Shared diagnostics for a response that came back but is unusable.
        print("IR-ERROR: " + message)
        print("IR-ERROR: Status code = %s" % (url_response.status_code))
        print("IR-ERROR: Reason = %s" % (url_response.reason))
        if verbose:
            print("IR-ERROR: Query = " + str(query_dict))

    # Decode first, so that an error status with a JSON body can be reported.
    # Any decode failure (json's or requests' JSONDecodeError) lands here.
    try:
        json_data = url_response.json()
    except Exception as error:
        report_failure("Unable to process JSON response: " + str(error))
        return None

    # A non-2xx status means the body is an error description, not results.
    if not url_response.ok:
        report_failure("Service returned an error response: " + str(json_data))
        return None

    return json_data
def getHeaderDict():
    """Return HTTP headers declaring a JSON request body and a JSON reply.

    NOTE(review): nothing in this file passes these headers to requests.post;
    requests sets Content-Type itself when ``json=`` is used.
    """
    json_type = 'application/json'
    return {'accept': json_type, 'Content-Type': json_type}
def initHTTP():
    """Turn off HTTPS certificate verification in the ssl module's default context.

    Some default OS installs do not ship a client certificate bundle, so the
    simplest workaround is to ignore HTTPS certificate errors. This is skipped
    when the PYTHONHTTPSVERIFY environment variable is set to a non-empty
    value, or when the ssl module has no ``_create_unverified_context``.

    NOTE(review): this disables certificate checks process-wide for stdlib
    users of the default context (a security trade-off). The ``requests``
    library builds its own SSL context, so this likely has no effect on the
    requests.post calls in this script — confirm.
    """
    # An explicit PYTHONHTTPSVERIFY setting means: leave verification alone.
    if os.environ.get('PYTHONHTTPSVERIFY', ''):
        return
    unverified_factory = getattr(ssl, '_create_unverified_context', None)
    if not unverified_factory:
        return
    ssl._create_default_https_context = unverified_factory
def generateRearrangementQuery(repertoire_id, query_dict):
    """Restrict a rearrangement query to a single repertoire.

    Builds a new query whose "filters" matches ``repertoire_id`` (compared as
    a string). If ``query_dict`` already has "filters", the result ANDs the
    repertoire filter with the original filter. All other top-level keys
    (e.g. facets, from, size) are copied over unchanged. ``query_dict`` itself
    is not modified.

    Args:
        repertoire_id: Repertoire identifier; converted with ``str``.
        query_dict: The base rearrangement query.

    Returns:
        A new query dict with "filters" as its first key.
    """
    id_filter = {"op": "=", "content": {"field": "repertoire_id", "value": str(repertoire_id)}}
    if "filters" in query_dict:
        combined_filter = {"op": "and", "content": [id_filter, query_dict["filters"]]}
    else:
        combined_filter = id_filter
    new_query = {"filters": combined_filter}
    # Carry over every non-filter field in its original order.
    new_query.update((key, val) for key, val in query_dict.items() if key != "filters")
    return new_query
def getRepertoires(repertoire_url, repertoire_dict, output_handle, verbose):
    """Run the repertoire query against one repository and return its repertoires.

    Args:
        repertoire_url: Full URL of the repository's repertoire endpoint.
        repertoire_dict: Repertoire query (parsed JSON) to POST.
        output_handle: Not used by this function; kept for interface
            compatibility with existing callers.
        verbose: When True, print the query and the raw response.

    Returns:
        The list under the response's 'Repertoire' key, or None if the query
        failed, the response is not a JSON object, it lacks an 'Info' or
        'Repertoire' object, or 'Repertoire' is not a list.
    """
    # Ensure our HTTP set up has been done.
    initHTTP()
    if verbose:
        print('IR-INFO: Query: ' + str(repertoire_dict))
    query_json = processQuery(repertoire_url, repertoire_dict, verbose)
    if verbose:
        print('IR-INFO: Query response: ' + str(query_json))
    if query_json is None:
        # Report the query that was sent; the response is None here.
        print('IR-ERROR: Query %s failed to %s' % (repertoire_dict, repertoire_url))
        return None
    # Check for a correct Info object (and that we got a JSON object at all,
    # since `in` on a str would be a substring test).
    if not isinstance(query_json, dict) or "Info" not in query_json:
        print("IR-ERROR: Expected to find an 'Info' object, none found")
        return None
    # Check for a valid Repertoire object
    if "Repertoire" not in query_json:
        print("IR-ERROR: Expected to find an 'Repertoire' object, none found")
        return None
    repertoire_list = query_json['Repertoire']
    # Callers iterate this as a list of repertoire dicts.
    if not isinstance(repertoire_list, list):
        print("IR-ERROR: Expected 'Repertoire' to be an array")
        return None
    return repertoire_list
def getArguments(argv=None):
    """Parse the command line.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) parses
            ``sys.argv[1:]``, as before.

    Returns:
        The ``argparse.Namespace`` of parsed options.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Run a repertoire query against each repository listed in a "
                    "file, then run a rearrangement query for every repertoire "
                    "found, writing all results as JSON."
    )
    # A file that contains a list of URLs to send queries to
    parser.add_argument("repository_url_file")
    # A file that contains the repertoire query to send to each repository.
    parser.add_argument("repertoire_search_json")
    # A file that contains the rearrangement query to send to each repository.
    parser.add_argument("rearrangement_search_json")
    # Repertoire entry point, appended to each repository URL - should be the
    # normal AIRR repertoire endpoint.
    parser.add_argument(
        "--repertoire_api",
        dest="repertoire_api",
        default="repertoire",
        help="The repertoire API entry point, appended to each repository URL. Defaults to 'repertoire'.")
    # Rearrangement entry point, appended to each repository URL - should be the
    # normal AIRR rearrangement endpoint or the iR Plus Stats endpoint.
    parser.add_argument(
        "--rearrangement_api",
        dest="rearrangement_api",
        default="rearrangement",
        help="The rearrangement API entry point, appended to each repository URL. Defaults to 'rearrangement'."
    )
    # Field file
    parser.add_argument(
        "--field_file",
        dest="field_file",
        default=None,
        help="File that contains a list of AIRR fields in dot notation (subject.subject_id). These fields are output in every repertoire query output in a 'Repertoire' object. If no file is provided then an empty repertoire object is created."
    )
    # Output file
    parser.add_argument(
        "--output_file",
        dest="output_file",
        default=None,
        help="The output file to use. If none supplied, uses stdout."
    )
    # Time delay between rearrangement queries, so we do not inundate the
    # service. The service may reject queries if we go too fast.
    parser.add_argument(
        "--service_delay",
        dest="service_delay",
        default=0.2,
        type=float,
        help="The service delay to use between rearrangement queries. If we go too fast the service may reject queries with an error. Default = 0.2"
    )
    # NOTE(review): options.stats is not read anywhere in this script; the
    # endpoint is actually chosen with --rearrangement_api.
    parser.add_argument(
        "-s",
        "--stats",
        action="store_true",
        help="Use the Stats API rather than the Rearrangement API.")
    # Verbosity flag
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Run the program in verbose mode.")
    return parser.parse_args(argv)
def _loadQueryFile(file_name, label, verbose):
    """Load a JSON query file; print an IR-ERROR and exit(1) if it cannot be read."""
    try:
        # utf-8-sig matches the other input files and tolerates a leading BOM.
        with open(file_name, encoding="utf-8-sig") as query_file:
            query = json.load(query_file)
    except (OSError, ValueError) as err:
        print("IR-ERROR: Unable to load %s query file %s - %s" % (label, file_name, err))
        sys.exit(1)
    if verbose:
        print("IR-INFO: %s query = %s" % (label, str(query)))
    return query


if __name__ == "__main__":
    # Get the command line arguments.
    options = getArguments()

    # Read the repertoire field file; its first column lists the fields.
    if options.field_file is None:
        repertoire_field_df = pd.DataFrame(['repertoire_id'], columns=['Fields'])
    else:
        try:
            repertoire_field_df = pd.read_csv(options.field_file, sep='\t',
                                              engine='python', encoding='utf-8-sig')
        except Exception as err:
            print("IR-ERROR: Unable to open file %s - %s" % (options.field_file, err))
            sys.exit(1)

    # Read in the repository file; it must have a 'URL' column.
    try:
        repository_df = pd.read_csv(options.repository_url_file, sep='\t',
                                    engine='python', encoding='utf-8-sig')
    except Exception as err:
        print("IR-ERROR: Unable to open file %s - %s" % (options.repository_url_file, err))
        sys.exit(1)
    if 'URL' not in repository_df.columns:
        print("IR-ERROR: Repository file %s has no 'URL' column" % (options.repository_url_file))
        sys.exit(1)

    # Load the repertoire and rearrangement JSON queries.
    repertoire_dict = _loadQueryFile(options.repertoire_search_json, "Repertoire", options.verbose)
    rearrangement_dict = _loadQueryFile(options.rearrangement_search_json, "Rearrangement", options.verbose)

    # Open the output last, so invalid inputs do not truncate an existing file.
    if options.output_file is None:
        output_handle = sys.stdout
    else:
        try:
            output_handle = open(options.output_file, "w")
        except Exception as err:
            print("IR-ERROR: Unable to open output file %s - %s" % (options.output_file, err))
            sys.exit(1)

    try:
        print("[", file=output_handle)
        repo_count = 0
        for index, row in repository_df.iterrows():
            repository_url = str(row['URL']).strip()
            if options.verbose:
                print("IR-INFO: Row %d: %s" % (index, repository_url + options.repertoire_api))
            repertoires = getRepertoires(repository_url + options.repertoire_api,
                                         repertoire_dict, output_handle, options.verbose)
            if repertoires is None:
                continue
            # Separator before every repository object but the first, so a
            # failed repository never leaves a dangling comma in the array.
            if repo_count > 0:
                print(",", file=output_handle)
            performRearrangementQuery(repository_url + options.rearrangement_api,
                                      repertoires, rearrangement_dict, repertoire_field_df,
                                      output_handle, options.service_delay, options.verbose)
            repo_count = repo_count + 1
        print("]", file=output_handle)
    finally:
        # Close (and flush) a real output file; never close stdout.
        if output_handle is not sys.stdout:
            output_handle.close()
    sys.exit(0)