Commit 75dbd31

Merge pull request #11 from uc-cdis/submit_file
added submit_file script for submitting structured data with error ha…
2 parents 8e89d4a + 7029877 commit 75dbd31

File tree: 2 files changed (+242, −6 lines)

gen3/submission.py

Lines changed: 240 additions & 6 deletions
@@ -1,8 +1,18 @@
 import json
 import requests
+import pandas as pd
+import os


-class Gen3SubmissionQueryError(Exception):
+class Gen3Error(Exception):
+    pass
+
+
+class Gen3SubmissionQueryError(Gen3Error):
+    pass
+
+
+class Gen3UserError(Gen3Error):
     pass


@@ -37,7 +47,7 @@ def __export_file(self, filename, output):
         outfile = open(filename, "w")
         outfile.write(output)
         outfile.close
-        print("\nOutput written to file: "+filename)
+        print("\nOutput written to file: " + filename)

     def query(self, query_txt, variables=None, max_tries=1):
         """Execute a GraphQL query against a data commons.
@@ -93,13 +103,17 @@ def export_record(self, program, project, uuid, fileformat, filename=None):
         >>> Gen3Submission.export_record("DCF", "CCLE", "d70b41b9-6f90-4714-8420-e043ab8b77b9", "json", filename="DCF-CCLE_one_record.json")

         """
-        assert fileformat in ["json","tsv"],"File format must be either 'json' or 'tsv'"
+        assert fileformat in [
+            "json",
+            "tsv",
+        ], "File format must be either 'json' or 'tsv'"
         api_url = "{}/api/v0/submission/{}/{}/export?ids={}&format={}".format(
             self._endpoint, program, project, uuid, fileformat
         )
         output = requests.get(api_url, auth=self._auth_provider).text
         if filename is None:
-            if fileformat == 'json': output = json.loads(output)
+            if fileformat == "json":
+                output = json.loads(output)
             return output
         else:
             self.__export_file(filename, output)
@@ -121,13 +135,17 @@ def export_node(self, program, project, node_type, fileformat, filename=None):
         >>> Gen3Submission.export_node("DCF", "CCLE", "sample", "tsv", filename="DCF-CCLE_sample_node.tsv")

         """
-        assert fileformat in ["json","tsv"],"File format must be either 'json' or 'tsv'"
+        assert fileformat in [
+            "json",
+            "tsv",
+        ], "File format must be either 'json' or 'tsv'"
         api_url = "{}/api/v0/submission/{}/{}/export/?node_label={}&format={}".format(
             self._endpoint, program, project, node_type, fileformat
         )
         output = requests.get(api_url, auth=self._auth_provider).text
         if filename is None:
-            if fileformat == 'json': output = json.loads(output)
+            if fileformat == "json":
+                output = json.loads(output)
             return output
         else:
             self.__export_file(filename, output)
@@ -284,3 +302,219 @@ def get_graphql_schema(self):
         output = requests.get(api_url).text
         data = json.loads(output)
         return data
+
+    def submit_file(self, project_id, filename, chunk_size=30, row_offset=0):
+        """Submit data in a spreadsheet file containing multiple records in rows to a Gen3 Data Commons.
+
+        Args:
+            project_id (str): The project_id to submit to.
+            filename (str): The file containing data to submit. The format can be TSV, CSV or XLSX (first worksheet only for now).
+            chunk_size (integer): The number of rows of data to submit for each request to the API.
+            row_offset (integer): The number of rows of data to skip; '0' starts submission from the first row and submits all data.
+
+        Examples:
+            This submits a spreadsheet file containing multiple records in rows to the CCLE project in the sandbox commons.
+
+            >>> Gen3Submission.submit_file("DCF-CCLE","data_spreadsheet.tsv")
+
+        """
+        # Read the file in as a pandas DataFrame
+        f = os.path.basename(filename)
+        if f.lower().endswith(".csv"):
+            df = pd.read_csv(filename, header=0, sep=",", dtype=str).fillna("")
+        elif f.lower().endswith(".xlsx"):
+            xl = pd.ExcelFile(filename, dtype=str)  # load excel file
+            sheet = xl.sheet_names[0]  # sheetname
+            df = xl.parse(sheet)  # save sheet as dataframe
+            converters = {
+                col: str for col in list(df)
+            }  # make sure int isn't converted to float
+            df = pd.read_excel(filename, converters=converters).fillna("")  # remove nan
+        elif filename.lower().endswith((".tsv", ".txt")):
+            df = pd.read_csv(filename, header=0, sep="\t", dtype=str).fillna("")
+        else:
+            raise Gen3UserError("Please upload a file in CSV, TSV, or XLSX format.")
+        df.rename(columns={c: c.lstrip("*") for c in df.columns}, inplace=True)  # remove any leading asterisks in the DataFrame column names
+
+        # Check uniqueness of submitter_ids:
+        if len(list(df.submitter_id)) != len(list(df.submitter_id.unique())):
+            raise Gen3Error(
+                "Warning: file contains duplicate submitter_ids. \nNote: submitter_ids must be unique within a node!"
+            )
+
+        # Chunk the file
print("\nSubmitting {} with {} records.".format(filename, str(len(df))))
347+
program, project = project_id.split("-", 1)
348+
api_url = "{}/api/v0/submission/{}/{}".format(self._endpoint, program, project)
349+
headers = {"content-type": "text/tab-separated-values"}
350+
351+
start = row_offset
352+
end = row_offset + chunk_size
353+
chunk = df[start:end]
354+
355+
count = 0
356+
357+
results = {
358+
"invalid": {}, # these are invalid records
359+
"other": [], # any unhandled API responses
360+
"details": [], # entire API response details
361+
"succeeded": [], # list of submitter_ids that were successfully updated/created
362+
"responses": [], # list of API response codes
363+
}
364+
365+
# Start the chunking loop:
366+
while (start + len(chunk)) <= len(df):
367+
368+
timeout = False
369+
valid_but_failed = []
370+
invalid = []
371+
count += 1
372+
print(
373+
"Chunk {} (chunk size: {}, submitted: {} of {})".format(
374+
str(count),
375+
str(chunk_size),
376+
str(len(results["succeeded"]) + len(results["invalid"])),
377+
str(len(df)),
378+
)
379+
)
380+
381+
+            try:
+                response = requests.put(
+                    api_url,
+                    auth=self._auth_provider,
+                    data=chunk.to_csv(sep="\t", index=False),
+                    headers=headers,
+                ).text
+            except requests.exceptions.ConnectionError as e:
+                results["details"].append(str(e))
+
+            # Handle the API response
+            if (
+                "Request Timeout" in response
+                or "413 Request Entity Too Large" in response
+                or "Connection aborted." in response
+                or "service failure - try again later" in response
+            ):  # time-out, response is not valid JSON at the moment
+
+                print("\t Reducing Chunk Size: {}".format(response))
+                results["responses"].append("Reducing Chunk Size: {}".format(response))
+                timeout = True
+
+            else:
+                try:
+                    json_res = json.loads(response)
+                except json.JSONDecodeError as e:
+                    print(response)
+                    print(str(e))
+                    raise Gen3Error("Unable to parse API response as JSON!")
+
if "message" in json_res and "code" not in json_res:
412+
print(
413+
"\t No code in the API response for Chunk {}: {}".format(
414+
str(count), res.get("message")
415+
)
416+
)
417+
print("\t {}".format(str(res.get("transactional_errors"))))
418+
results["responses"].append(
419+
"Error Chunk {}: {}".format(str(count), res.get("message"))
420+
)
421+
results["other"].append(res.get("transactional_errors"))
422+
423+
elif "code" not in json_res:
424+
print("\t Unhandled API-response: {}".format(response))
425+
results["responses"].append(
426+
"Unhandled API response: {}".format(response)
427+
)
428+
429+
elif json_res["code"] == 200: # success
430+
431+
entities = json_res.get("entities", [])
432+
print("\t Succeeded: {} entities.".format(str(len(entities))))
433+
results["responses"].append(
434+
"Chunk {} Succeeded: {} entities.".format(
435+
str(count), str(len(entities))
436+
)
437+
)
438+
439+
for entity in entities:
440+
sid = entity["unique_keys"][0]["submitter_id"]
441+
results["succeeded"].append(sid)
442+
443+
+                elif (
+                    json_res["code"] == 400
+                    or json_res["code"] == 403
+                    or json_res["code"] == 404
+                ):  # failure
+
+                    entities = json_res.get("entities", [])
+                    print("\tChunk Failed: {} entities.".format(str(len(entities))))
+                    results["responses"].append(
+                        "Chunk {} Failed: {} entities.".format(
+                            str(count), str(len(entities))
+                        )
+                    )
+
+                    for entity in entities:
+                        sid = entity["unique_keys"][0]["submitter_id"]
+                        if entity["valid"]:  # valid but failed
+                            valid_but_failed.append(sid)
+                        else:  # invalid and failed
+                            message = str(entity["errors"])
+                            results["invalid"][sid] = message
+                            invalid.append(sid)
+                    print(
+                        "\tInvalid records in this chunk: {}".format(str(len(invalid)))
+                    )
+
elif json_res["code"] == 500: # internal server error
470+
471+
print("\t Internal Server Error: {}".format(response))
472+
results["responses"].append(
473+
"Internal Server Error: {}".format(response)
474+
)
475+
476+
+            if (
+                len(valid_but_failed) > 0 and len(invalid) > 0
+            ):  # if valid entities failed bc grouped with invalid, retry submission
+                chunk = chunk.loc[
+                    df["submitter_id"].isin(valid_but_failed)
+                ]  # these are records that weren't successful because they were part of a chunk that failed, but are valid and can be resubmitted without changes
+                print(
+                    "Retrying submission of valid entities from failed chunk: {} valid entities.".format(
+                        str(len(chunk))
+                    )
+                )
+
+            elif (
+                len(valid_but_failed) > 0 and len(invalid) == 0
+            ):  # if all entities are valid but submission still failed, probably due to duplicate submitter_ids. Can remove this section once the API response is fixed: https://ctds-planx.atlassian.net/browse/PXP-3065
+                raise Gen3Error(
+                    "Please check your data for correct file encoding, special characters, or duplicate submitter_ids or ids."
+                )
+
+            elif timeout is False:  # get new chunk if didn't timeout
+                start += chunk_size
+                end = start + chunk_size
+                chunk = df[start:end]
+
+            else:  # if timeout, reduce chunk size and retry smaller chunk
+                if chunk_size >= 2:
+                    chunk_size = int(chunk_size / 2)
+                    end = start + chunk_size
+                    chunk = df[start:end]
+                    print(
+                        "Retrying Chunk with reduced chunk_size: {}".format(
+                            str(chunk_size)
+                        )
+                    )
+                    timeout = False
+                else:
+                    raise Gen3Error(
+                        "Submission is timing out. Please contact the Helpdesk."
+                    )
+
print("Finished data submission.")
517+
print("Successful records: {}".format(str(len(set(results["succeeded"])))))
518+
print("Failed invalid records: {}".format(str(len(results["invalid"]))))
519+
520+
return results
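
For context, a minimal usage sketch of the new method. The endpoint URL, credentials file, and spreadsheet name below are illustrative assumptions, not part of this commit; the sketch relies on the SDK's existing Gen3Auth and Gen3Submission classes.

    from gen3.auth import Gen3Auth
    from gen3.submission import Gen3Submission

    endpoint = "https://nci-crdc-demo.datacommons.io"  # example commons URL
    auth = Gen3Auth(endpoint, refresh_file="credentials.json")  # API key file downloaded from the commons
    sub = Gen3Submission(endpoint, auth)

    # Submit a spreadsheet of records, 30 rows per API request.
    results = sub.submit_file("DCF-CCLE", "data_spreadsheet.tsv", chunk_size=30)
    print("Succeeded: {}".format(len(set(results["succeeded"]))))
    print("Invalid: {}".format(len(results["invalid"])))

The returned dict collects per-chunk outcomes under the keys "succeeded", "invalid", "other", "details", and "responses", so a caller can inspect or retry failed records after the run.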

requirements.txt

Lines changed: 2 additions & 0 deletions
@@ -1,2 +1,4 @@
 requests
 -e git+https://github.com/uc-cdis/[email protected]#egg=indexclient
+
+pandas
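
The only functional change to requirements.txt is the new pandas dependency, which submit_file uses to read CSV, TSV, and XLSX input. Assuming a standard pip workflow for this repo, it can be picked up with, for example:

    pip install -r requirements.txt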
