forked from dfci/matchminer-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pymm_run.py
466 lines (337 loc) · 14.4 KB
/
pymm_run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
#!/usr/bin/python
import argparse
import sched
from flask import redirect
import requests
from matchminer import managment
from matchminer import settings, security
from matchminer.custom import blueprint
from matchminer.database import get_db
from matchminer.events import register_hooks
from matchminer.miner import email_content
from matchminer.utilities import *
from matchminer.validation import ConsentValidatorEve
from services.account import account_pipeline
from eve_swagger import swagger, add_documentation
# logging
logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] %(message)s')

# useful variables
cur_dir = os.path.dirname(os.path.realpath(__file__))
static_dir = os.path.join(cur_dir, 'static')
settings_file = os.path.join(cur_dir, 'matchminer', 'settings.py')

# define the application.
# Both variants share every option; token authentication is attached only
# when settings.NO_AUTH is falsy.
_eve_kwargs = dict(
    settings=settings_file,
    static_folder=static_dir,
    static_url_path='',
    validator=ConsentValidatorEve,
)
if settings.NO_AUTH:
    # logging.warn is a deprecated alias of logging.warning.
    logging.warning("NO AUTHENTICATION IS ENABLED")
else:
    _eve_kwargs['auth'] = security.TokenAuth
app = Eve(**_eve_kwargs)
# hot-swappable variables
# Flask uses SECRET_KEY for signing; allow deployments to supply their own via
# the MM_SECRET_KEY environment variable instead of relying on the value
# committed to source. The committed value remains the fallback.
app.config['SECRET_KEY'] = os.environ.get('MM_SECRET_KEY', '2d159b3bd49bc76e93d640f86e46ad29545fc909')
app.config['SAML_PATH'] = os.path.join(cur_dir, 'saml')

# register blueprint to the main Eve application.
app.register_blueprint(blueprint)
# the swagger blueprint is only exposed when not running with PROD settings.
if settings.MM_SETTINGS != 'PROD':
    app.register_blueprint(swagger)
# API documentation metadata served through the eve_swagger blueprint.
app.config['SWAGGER_INFO'] = dict(
    title='Matchminer API',
    version='1.0',
    description="Documentation of Matchminer's API",
    termsOfService=(
        'Permission is hereby granted, free of charge, to any person obtaining '
        'a copy of this software and associated documentation files (the '
        '"Software"), to deal in the Software without restriction, including '
        'without limitation the rights to use, copy, modify, merge, publish, '
        'distribute, sublicense, and/or sell copies of the Software, and to '
        'permit persons to whom the Software is furnished to do so, subject to '
        'the following conditions: The above copyright notice and this '
        'permission notice shall be included in all copies or substantial '
        'portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT '
        'WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO '
        'THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND '
        'NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE '
        'LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF '
        'CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH '
        'THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.'
    ),
    contact=dict(name='James Lindsay'),
    license=dict(
        name='MIT',
        url='https://github.com/pyeve/eve-swagger/blob/master/LICENSE',
    ),
    schemes=['http', 'https'],
)
# Optional extras, not used here: app.config['SWAGGER_HOST'] (when absent,
# flask.request.host is used) and add_documentation(...) to patch the generated
# document at run time.

# register the springify hook
register_hooks(app)
@app.after_request
def after_request(response):
    """Post-process every outgoing response.

    If ``parse_response`` recognises the request URL as a response link and a
    matching document exists in the ``response`` collection, the client is
    redirected to that document's ``return_url``. Otherwise the response gets
    anti-caching headers.

    :param response: the outgoing Flask response object.
    :returns: a redirect response, or ``response`` with extra headers.
    """
    from email.utils import formatdate

    # test whether this request refers to a stored response document.
    is_response, item_id = parse_response(request.url)
    if is_response:
        db = get_db()
        item = db['response'].find_one({"_id": ObjectId(item_id)})
        # if it exists, replace the outgoing response with the redirect.
        if item is not None:
            return make_response(redirect(item['return_url']))

    # Last-Modified must be an HTTP-date such as
    # "Sun, 06 Nov 1994 08:49:37 GMT"; str(datetime.now()) is local time in an
    # ISO-like format, which is not a valid header value.
    response.headers.add('Last-Modified', formatdate(usegmt=True))
    response.headers.add('Expires', '-1')
    # JSON responses do not get the no-cache directives.
    # NOTE(review): the original comment cited IE11 mishandling these headers
    # for fonts, yet only application/json is excluded here -- confirm intent.
    if response.content_type != 'application/json':
        response.headers.add('Cache-Control', 'no-store, no-cache, must-revalidate, post-check=0, pre-check=0, max-age=0')
        response.headers.add('Pragma', 'no-cache')
    return response
@app.errorhandler(401)
def my_own_error_msg(err):
    """Answer 401 errors with a plain "unauthorized access" body and status 497.

    NOTE(review): 497 is a non-standard status. It was presumably chosen so
    browsers do not pop up their own auth dialog -- confirm with the front end.
    """
    body, status = "unauthorized access", 497
    return make_response(body, status)
@app.errorhandler(333)
def redirect_response(err):
    """Handle the custom 333 error by logging and redirecting to ``err``."""
    target = err
    logging.info("redirected to: %s" % target)
    redirect_reply = redirect(target)
    return make_response(redirect_reply)
# Generate swagger.io API documentation automatically
@app.before_first_request
def generate_api_docs():
    """Refresh ``api-swagger-documentation.json`` from the live ``/api-docs`` endpoint.

    This is best effort. On the first request, the server fetches its own
    swagger document and rewrites the JSON file only when the content differs
    from the stored copy. Any failure, including a missing JSON file, is logged
    and swallowed so request handling continues.
    """
    import json

    doc_path = 'api-swagger-documentation.json'
    # query the port run_server() binds to instead of a hard-coded 5000.
    docs_url = 'http://localhost:%s/api-docs' % settings.API_PORT
    try:
        with open(doc_path) as fin:
            previous_api = json.load(fin)
        # a timeout stops a stalled self-request from blocking the first request forever.
        current_api = requests.get(docs_url, timeout=10)
        if current_api.status_code != 200:
            logging.warning('API documentation request failed - /api-docs')
            return
        current_doc = current_api.json()
        if previous_api != current_doc:
            with open(doc_path, 'w') as fout:
                fout.write(json.dumps(current_doc))
            logging.info('API changes detected. Successfully generated new swagger documentation => api-swagger-documentation.json')
        else:
            logging.info('No API changes detected. No documentation generated.')
    except Exception:
        # deliberately best-effort; keep the traceback in the log for diagnosis.
        logging.warning('Exception occurred during API documentation generation. Check that api-swagger-documentation.json exists', exc_info=True)
def run_server(args):
    """Run the Eve/Flask server on all interfaces at ``settings.API_PORT`` (``serve`` sub-command).

    NOTE(review): ``args.debug`` (the ``-d`` flag) is parsed but never used here.
    NOTE(review): NO_AUTH is exported after ``app`` was already built from
    ``settings.NO_AUTH``, so it presumably only affects code that reads the
    environment later -- confirm.
    """
    no_auth_flag = str(args.no_auth)
    os.environ['NO_AUTH'] = no_auth_flag
    app.run(host='0.0.0.0', port=settings.API_PORT, threaded=True)
def bootstrap_insert(args):
    """Force-insert the default data set (``insert`` sub-command).

    ``args`` is accepted for the sub-command interface and is not used.
    """
    return bootstrap(app, forced=True) and None
def _resolve_data_dir(prod_dir, default_dir):
    """Return ``prod_dir`` when it is set (non-empty), otherwise ``default_dir``."""
    return prod_dir if prod_dir else default_dir


def bootstrap_dump(args):
    """Serialize the important collections to disk (``dump`` sub-command).

    The target directory is ``settings.DATA_DIR_PROD``, or ``settings.DATA_DIR``
    when the former is empty or unset.
    """
    data_dir = _resolve_data_dir(settings.DATA_DIR_PROD, settings.DATA_DIR)
    dump_collections(data_dir, settings)


def bootstrap_restore(args):
    """Restore the important collections from disk (``restore`` sub-command).

    The source directory is chosen the same way as in ``bootstrap_dump``. After
    restoring, clinical records with VITAL_STATUS "dead" are rewritten to
    "deceased".
    """
    data_dir = _resolve_data_dir(settings.DATA_DIR_PROD, settings.DATA_DIR)
    restore_collections(data_dir, settings)
    # normalize the vital-status value in the restored clinical collection.
    db = get_db()
    db['clinical'].update_many({"VITAL_STATUS": "dead"}, {"$set": {"VITAL_STATUS": "deceased"}})
def bootstrap_debug(args):
    """Load debug match data into the database (``debug`` sub-command)."""
    bootstrap_matches(app)
    # TODO: simulated structural variants are not loaded by this command.
def periodic_backup(args):
    """Run the backup daemon (``backup_daemon`` sub-command).

    Hourly, daily and weekly tiers go onto one scheduler, each with its own
    delay, directory and maximum count. Each call to ``backup_event`` receives
    ``(scheduler, dir, freq, max, False)``. Presumably ``backup_event``
    re-schedules itself to keep the cycle alive -- confirm. Blocks in
    ``scheduler.run()``.
    """
    scheduler = sched.scheduler(time.time, time.sleep)
    tiers = (
        (settings.BACKUP_HOURLY_FREQ, settings.BACKUP_HOURLY_DIR, settings.BACKUP_HOURLY_MAX),
        (settings.BACKUP_DAILY_FREQ, settings.BACKUP_DAILY_DIR, settings.BACKUP_DAILY_MAX),
        (settings.BACKUP_WEEKLY_FREQ, settings.BACKUP_WEEKLY_DIR, settings.BACKUP_WEEKLY_MAX),
    )
    for delay, target_dir, keep in tiers:
        scheduler.enter(delay, 1, backup_event, (scheduler, target_dir, delay, keep, False))
    # run everything.
    scheduler.run()
def account_service(args):
    """Run the account-sync daemon (``account_daemon`` sub-command).

    Schedules ``account_pipeline`` after ``settings.ACCOUNT_SYNC_FREQ`` seconds.
    It is called with ``(scheduler, freq, -1)``. Blocks in ``scheduler.run()``.
    """
    sync_freq = settings.ACCOUNT_SYNC_FREQ
    scheduler = sched.scheduler(time.time, time.sleep)
    scheduler.enter(sync_freq, 1, account_pipeline, (scheduler, sync_freq, -1))
    scheduler.run()
def manage(args):
    """Dispatch the ``manage`` sub-command to the matching ``managment`` routine.

    :param args: parsed CLI namespace; ``args.mode`` selects the routine.
    :raises SystemExit: when ``args.mode`` is not a supported mode, so a typo
        fails loudly instead of silently doing nothing.
    """
    # each supported mode is also the name of the function in ``managment``.
    valid_modes = ("maintain_matches", "maintain_users", "maintain_filters",
                   "reannotate_trials", "maintain_elastic")
    if args.mode not in valid_modes:
        raise SystemExit("unknown manage mode %r; expected one of: %s"
                         % (args.mode, ", ".join(valid_modes)))
    # resolve lazily so only the selected routine has to exist.
    getattr(managment, args.mode)()
def load_users(args):
    """Insert users read from the CSV file passed with ``-f`` (``load_users`` sub-command)."""
    source_path = args.file_path
    insert_users(source_path, from_file=True)
def sync_oncologists_email(args):
    """Try to pair each ordering physician with an e-mail from a plain-text reference file.

    A reference line is a candidate when the line two below it contains '@'.
    Physician names are cleaned into tokens: lower-cased, split on spaces,
    with '.', 'md' and ',' removed, and only tokens longer than two characters
    kept. A candidate counts as a hit when at least two tokens occur in it.

    NOTE(review): a hit is never stored or returned, so this command currently
    has no effect beyond reading its inputs. ``sync_oncologists_emailq`` is the
    CSV-based variant that writes updates.
    """
    names = get_physicians_names()
    with open(args.file_path) as ref:
        ref_lines = ref.readlines()

    for raw_name in names:
        cleaned = raw_name.strip().lower()
        words = (w.replace(".", "").replace("md", "").replace(",", "")
                 for w in cleaned.split(" "))
        # drop initials and other very short fragments.
        tokens = [w for w in words if len(w) > 2]

        for idx, candidate in enumerate(ref_lines[:-2]):
            # only look at lines sitting two above an e-mail address.
            if "@" not in ref_lines[idx + 2]:
                continue
            line = candidate.strip().lower()
            email = ref_lines[idx + 2].strip().lower()
            hits = sum(1 for tok in tokens if tok in line)
            if hits >= 2:
                pass  # NOTE(review): the matched ``email`` is discarded here.
def _load_email_lookup(file_path):
    """Read a CSV with ``name`` and ``email`` columns into a ``{name: email}`` dict.

    Rows with a missing e-mail are skipped; a repeated name keeps its last e-mail.
    """
    ref_df = pd.read_csv(file_path, header=0)
    return {name: email
            for name, email in zip(ref_df['name'], ref_df['email'])
            if not pd.isnull(email)}


def sync_oncologists_emailq(args):
    """Propagate oncologist e-mail addresses from a CSV into clinical records and matches.

    1. Every clinical record whose ORD_PHYSICIAN_NAME is in the CSV gets
       ORD_PHYSICIAN_EMAIL set.
    2. Every match of every filter gets EMAIL_ADDRESS, EMAIL_SUBJECT and
       EMAIL_BODY regenerated. EMAIL_ADDRESS is "" when the physician is not
       in the CSV.

    A match with no VARIANTS, or whose clinical record cannot be found, is
    skipped with a warning instead of aborting the whole run.
    """
    email_lu = _load_email_lookup(args.file_path)
    db = get_db()

    # update the clinical records.
    clin_cnt = 0
    for name, email in email_lu.items():
        result = db['clinical'].update_many({"ORD_PHYSICIAN_NAME": name},
                                            {"$set": {"ORD_PHYSICIAN_EMAIL": email}})
        clin_cnt += result.modified_count
    logging.info("updated %d clinical records" % clin_cnt)

    # cache looked-up documents; the same ids may recur across matches.
    clinical_lu = {}
    genomic_lu = {}
    match_cnt = 0
    for filter_doc in db['filter'].find():
        protocol_id = filter_doc.get('protocol_id', '')
        email_subject = "(%s) ONCO PANEL RESULTS" % protocol_id
        for match in db['match'].find({"FILTER_ID": ObjectId(filter_doc['_id'])}):
            clinical_id = match['CLINICAL_ID']
            variants = match.get('VARIANTS') or []
            if not variants:
                logging.warning("match %s has no VARIANTS; skipped" % match['_id'])
                continue
            genomic_id = variants[0]
            if clinical_id not in clinical_lu:
                clinical_lu[clinical_id] = db['clinical'].find_one(clinical_id)
            if genomic_id not in genomic_lu:
                genomic_lu[genomic_id] = db['genomic'].find_one(genomic_id)
            clinical = clinical_lu[clinical_id]
            if clinical is None:
                logging.warning("clinical record %s for match %s not found; skipped"
                                % (clinical_id, match['_id']))
                continue
            # generate body.
            email_body = email_content(protocol_id, genomic_lu[genomic_id], clinical)
            email_address = email_lu.get(clinical.get('ORD_PHYSICIAN_NAME'), "")
            db['match'].update_one({"_id": match["_id"]}, {"$set": {
                "EMAIL_ADDRESS": email_address,
                "EMAIL_SUBJECT": email_subject,
                "EMAIL_BODY": email_body,
            }})
            match_cnt += 1
    logging.info("updated %d matches" % match_cnt)
def remove_deceased(args):
    """Delete every match belonging to a clinical record with VITAL_STATUS "deceased".

    Logs how many deceased patients were found and how many matches were removed.
    """
    db = get_db()
    # only the _id is needed, so project away the rest of each clinical document.
    dead_ids = [doc['_id'] for doc in db['clinical'].find({"VITAL_STATUS": "deceased"}, {"_id": 1})]
    logging.info("found %d deceased patients" % len(dead_ids))
    # remove matches for each of these patients.
    match_cnt = 0
    for clinical_id in dead_ids:
        match_cnt += db['match'].delete_many({"CLINICAL_ID": clinical_id}).deleted_count
    logging.info("removed %d matches" % match_cnt)
# main
if __name__ == '__main__':
    # top-level parser; every sub-command stores its handler in ``func``.
    main_p = argparse.ArgumentParser()
    subp = main_p.add_subparsers(help='sub-command help')
    # bootstrap ACC
    subp_p = subp.add_parser('insert', help='adds genomics/clinical to database')
    subp_p.set_defaults(func=bootstrap_insert)
    # dump the data
    subp_p = subp.add_parser('dump', help='serializes data to disk')
    subp_p.set_defaults(func=bootstrap_dump)
    # restore the db.
    subp_p = subp.add_parser('restore', help='restores data from disk')
    subp_p.set_defaults(func=bootstrap_restore)
    # load debug data.
    subp_p = subp.add_parser('debug', help='loads debug data into database')
    subp_p.set_defaults(func=bootstrap_debug)
    # load users from file.
    subp_p = subp.add_parser('load_users', help='loads users from csv file')
    subp_p.add_argument("-f", dest='file_path', type=str, required=True)
    subp_p.set_defaults(func=load_users)
    # backup daemon
    subp_p = subp.add_parser('backup_daemon', help='simple backup daemon')
    subp_p.set_defaults(func=periodic_backup)
    # account daemon
    subp_p = subp.add_parser('account_daemon', help='account sync service')
    subp_p.set_defaults(func=account_service)
    # sync oncologist email (hack)
    subp_p = subp.add_parser('sync_email', help='synchronize oncologist email')
    subp_p.add_argument("-f", dest='file_path', type=str, required=True)
    subp_p.set_defaults(func=sync_oncologists_email)
    # remove matches to deceased patients (hack)
    subp_p = subp.add_parser('remove_deceased', help='deletes matches to deceased patients')
    subp_p.set_defaults(func=remove_deceased)
    # management scripts.
    subp_p = subp.add_parser('manage', help='management functions')
    subp_p.add_argument("-m", dest='mode', type=str, required=True,
                        choices=["maintain_matches", "maintain_users", "maintain_filters",
                                 "reannotate_trials", "maintain_elastic"])
    subp_p.set_defaults(func=manage)
    # run the webserver.
    subp_p = subp.add_parser('serve', help='runs webserver')
    # NOTE(review): -d is parsed but run_server does not currently use it.
    subp_p.add_argument("-d", dest='debug', action='store_const', const=True, default=False)
    subp_p.add_argument("--no-auth", dest='no_auth', action='store_const', const=True, default=False)
    subp_p.set_defaults(func=run_server)
    # parse args.
    args = main_p.parse_args()
    # Python 3 sub-commands are optional; without one, args.func would raise AttributeError.
    if getattr(args, 'func', None) is None:
        main_p.error('a sub-command is required')
    args.func(args)