"""The main API definition file for endpoints that trigger MapReduces and batch tasks."""
import json
import logging
import traceback
import app_util
import config
from api_util import EXPORTER
from dao.metric_set_dao import AggregateMetricsDao
from dao.metrics_dao import MetricsVersionDao
from flask import Flask, request
from google.appengine.api import app_identity
from offline import biobank_samples_pipeline
from offline.participant_maint import skew_duplicate_last_modified
from offline.base_pipeline import send_failure_alert
from offline.exclude_ghost_participants import mark_ghost_participants
from offline.metrics_export import MetricsExport
from offline.participant_counts_over_time import calculate_participant_metrics
from offline.public_metrics_export import PublicMetricsExport, LIVE_METRIC_SET_ID
from offline.sa_key_remove import delete_service_account_keys
from offline.table_exporter import TableExporter
import offline.sync_consent_files
from sqlalchemy.exc import DBAPIError
from werkzeug.exceptions import BadRequest
PREFIX = '/offline/'


def _alert_on_exceptions(func):
  """Sends e-mail alerts for any failure of the decorated function.

  This handles Biobank DataErrors specially.

  This must be the innermost (bottom) decorator in order to discover the wrapped function's name.
  """
  def alert_on_exceptions_wrapper(*args, **kwargs):
    try:
      return func(*args, **kwargs)
    except biobank_samples_pipeline.DataError as e:
      # This is for CSVs older than 24h; we only want to send alerts in prod, where we expect
      # regular CSV uploads. In other environments, it's OK to just abort the CSV import if
      # there's no new data.
      biobank_recipients = config.getSettingList(config.BIOBANK_STATUS_MAIL_RECIPIENTS, default=[])
      if not e.external or (e.external and biobank_recipients):
        send_failure_alert(
            func.__name__,
            'Data error in Biobank samples pipeline: %s' % e,
            log_exc_info=True,
            extra_recipients=biobank_recipients)
      else:
        # Don't alert for stale CSVs except in prod (where external recipients are configured).
        logging.info('Not alerting on external-only DataError (%s).', e)
      return json.dumps({'data_error': str(e)})
    except:
      send_failure_alert(func.__name__, 'Exception in cron: %s' % traceback.format_exc())
      raise
  return alert_on_exceptions_wrapper
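
# Illustrative ordering only (a sketch drawn from the docstring above, using a hypothetical
# handler name): keeping _alert_on_exceptions closest to the function means func.__name__ in the
# alert text names the cron handler itself rather than another decorator's wrapper.
#
#   @app_util.auth_required_cron
#   @_alert_on_exceptions
#   def some_cron_handler():
#     ...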


@app_util.auth_required_cron
@_alert_on_exceptions
def recalculate_metrics():
  in_progress = MetricsVersionDao().get_version_in_progress()
  if in_progress:
    logging.info("=========== Metrics pipeline already running ============")
    return '{"metrics-pipeline-status": "running"}'
  else:
    bucket_name = app_identity.get_default_gcs_bucket_name()
    logging.info("=========== Starting metrics export ============")
    MetricsExport.start_export_tasks(bucket_name,
                                     int(config.getSetting(config.METRICS_SHARDS, 1)))
    return '{"metrics-pipeline-status": "started"}'


@app_util.auth_required_cron
def recalculate_public_metrics():
  logging.info('generating public metrics')
  aggs = PublicMetricsExport.export(LIVE_METRIC_SET_ID)
  client_aggs = AggregateMetricsDao.to_client_json(aggs)

  # Summing all counts for one metric yields a total qualified participant count.
  participant_count = 0
  if len(client_aggs) > 0:
    participant_count = sum([a['count'] for a in client_aggs[0]['values']])
  logging.info('persisted public metrics: {} aggregations over '
               '{} participants'.format(len(client_aggs), participant_count))

  # Same format returned by the metric sets API.
  return json.dumps({
      'metrics': client_aggs
  })
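
# Illustrative shape of the client_aggs list consumed by recalculate_public_metrics above. Only
# the 'values' and 'count' keys are taken from the code; every other key and all numbers here are
# hypothetical:
#
#   [
#     {'name': 'enrollmentStatus',
#      'values': [{'value': 'MEMBER', 'count': 10},
#                 {'value': 'FULL_PARTICIPANT', 'count': 5}]},
#     ...
#   ]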


@app_util.auth_required_cron
@_alert_on_exceptions
def import_biobank_samples():
  # Note that crons always have a 10 minute deadline instead of the normal 60s; additionally, our
  # offline service uses basic scaling, which has no deadline.
  logging.info('Starting samples import.')
  written, timestamp = biobank_samples_pipeline.upsert_from_latest_csv()
  logging.info('Import complete (%d written), generating report.', written)

  logging.info('Generating reconciliation report.')
  biobank_samples_pipeline.write_reconciliation_report(timestamp)
  logging.info('Generated reconciliation report.')
  return json.dumps({'written': written})


@app_util.auth_required_cron
@_alert_on_exceptions
def biobank_monthly_reconciliation_report():
  # Make sure this cron job is executed after import_biobank_samples.
  timestamp = biobank_samples_pipeline.get_last_biobank_sample_file_info()[2]

  logging.info('Generating monthly reconciliation report.')
  biobank_samples_pipeline.write_reconciliation_report(timestamp, 'monthly')
  logging.info('Generated monthly reconciliation report.')
  return json.dumps({'monthly-reconciliation-report': 'generated'})


@app_util.auth_required(EXPORTER)
def export_tables():
  resource = request.get_data()
  resource_json = json.loads(resource)
  database = resource_json.get('database')
  tables = resource_json.get('tables')
  instance_name = resource_json.get('instance_name')
  if not database:
    raise BadRequest("database is required")
  if not tables or type(tables) is not list:
    raise BadRequest("tables is required")
  directory = resource_json.get('directory')
  if not directory:
    raise BadRequest("directory is required")

  # Ensure this has a boolean value to avoid downstream issues.
  deidentify = resource_json.get('deidentify') is True

  return json.dumps(TableExporter.export_tables(database, tables, directory, deidentify,
                                                instance_name))
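
# Illustrative POST body for the ExportTables endpoint. The field names mirror the parsing above;
# the concrete values are hypothetical placeholders, not real database or table names:
#
#   {
#     "database": "example_db",
#     "tables": ["example_table_a", "example_table_b"],
#     "directory": "example-export-dir",
#     "deidentify": true,
#     "instance_name": "example-instance"
#   }
#
# Only "database", "tables" (a list), and "directory" are required; "deidentify" defaults to
# false and "instance_name" is optional.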


@app_util.auth_required_cron
@_alert_on_exceptions
def skew_duplicates():
  skew_duplicate_last_modified()
  return '{"success": "true"}'


@app_util.auth_required_cron
@_alert_on_exceptions
def delete_old_keys():
  delete_service_account_keys()
  return '{"success": "true"}'


@app_util.auth_required_cron
@_alert_on_exceptions
def participant_counts_over_time():
  calculate_participant_metrics()
  return '{"success": "true"}'


@app_util.auth_required_cron
@_alert_on_exceptions
def exclude_ghosts():
  mark_ghost_participants()
  return '{"success": "true"}'


@app_util.auth_required_cron
@_alert_on_exceptions
def sync_consent_files():
  offline.sync_consent_files.do_sync_consent_files()
  return '{"success": "true"}'


def _build_pipeline_app():
  """Configure and return the app with non-resource pipeline-triggering endpoints."""
  offline_app = Flask(__name__)

  offline_app.add_url_rule(
      PREFIX + 'BiobankSamplesImport',
      endpoint='biobankSamplesImport',
      view_func=import_biobank_samples,
      methods=['GET'])

  offline_app.add_url_rule(
      PREFIX + 'MonthlyReconciliationReport',
      endpoint='monthlyReconciliationReport',
      view_func=biobank_monthly_reconciliation_report,
      methods=['GET'])

  offline_app.add_url_rule(
      PREFIX + 'SkewDuplicates',
      endpoint='skew_duplicates',
      view_func=skew_duplicates,
      methods=['GET'])

  offline_app.add_url_rule(
      PREFIX + 'MetricsRecalculate',
      endpoint='metrics_recalc',
      view_func=recalculate_metrics,
      methods=['GET'])

  offline_app.add_url_rule(
      PREFIX + 'PublicMetricsRecalculate',
      endpoint='public_metrics_recalc',
      view_func=recalculate_public_metrics,
      methods=['GET'])

  offline_app.add_url_rule(
      PREFIX + 'ExportTables',
      endpoint='ExportTables',
      view_func=export_tables,
      methods=['POST'])

  offline_app.add_url_rule(
      PREFIX + 'DeleteOldKeys',
      endpoint='delete_old_keys',
      view_func=delete_old_keys,
      methods=['GET'])

  offline_app.add_url_rule(
      PREFIX + 'ParticipantCountsOverTime',
      endpoint='participant_counts_over_time',
      view_func=participant_counts_over_time,
      methods=['GET'])

  offline_app.add_url_rule(
      PREFIX + 'MarkGhostParticipants',
      endpoint='exclude_ghosts',
      view_func=exclude_ghosts,
      methods=['GET'])

  offline_app.add_url_rule(
      PREFIX + 'SyncConsentFiles',
      endpoint='sync_consent_files',
      view_func=sync_consent_files,
      methods=['GET'])

  offline_app.after_request(app_util.add_headers)
  offline_app.before_request(app_util.request_logging)
  offline_app.register_error_handler(DBAPIError, app_util.handle_database_disconnect)

  return offline_app


app = _build_pipeline_app()
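
# A minimal local-debugging sketch (an assumption, not how the service is deployed; in production
# App Engine serves the `app` WSGI object directly and cron issues the GET requests registered
# above). Note that app_util.auth_required_cron is expected to reject requests lacking App
# Engine's cron marker (typically the X-Appengine-Cron header), so manual requests would normally
# fail authorization. The host/port values are arbitrary.
#
#   if __name__ == '__main__':
#     app.run(host='127.0.0.1', port=8080, debug=True)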