Commit

✨ add code to download job results

sickler-alex committed Apr 1, 2024
1 parent d39399b commit 95fd403
Showing 1 changed file with 74 additions and 67 deletions.

141 changes: 74 additions & 67 deletions d3b_dff_cli/modules/dewrangle/helper_functions.py
@@ -4,6 +4,8 @@
import sys
import traceback
import configparser
import requests
import pandas as pd
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
from datetime import datetime
@@ -65,6 +67,20 @@ def create_gql_client(endpoint=None, api_key=None):
return client


def create_rest_creds(endpoint=None, api_key=None):
"""Create Rest connection"""

# default endpoint
if endpoint is None:
endpoint = "https://dewrangle.com/api/rest/jobs/"

if api_key:
req_header = {"X-Api-Key": api_key}
else:
req_header = {"X-Api-Key": get_api_credential()}
return endpoint, req_header


def get_study_credentials(client, study_id):
"""Get credential ids from a study."""

@@ -485,55 +501,6 @@ def get_billing_groups(client, org_id):
return billing_groups


def get_volume_jobs(client, vid):
"""Query volume for a list of jobs"""
jobs = {}

query = gql(
"""
query Volume_Job_Query($id: ID!) {
volume: node(id: $id) {
id
... on Volume {
jobs {
edges {
node {
id
operation
completedAt
createdAt
}
}
}
}
}
}
"""
)

params = {"id": vid}

# run query
result = client.execute(query, variable_values=params)

# format result
for vol in result:
for job in result[vol]["jobs"]:
for node in result[vol]["jobs"][job]:
id = node["node"]["id"]
# convert createdAt from string to datetime object
created = datetime.strptime(
node["node"]["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ"
)
op = node["node"]["operation"]
comp = datetime.strptime(
node["node"]["completedAt"], "%Y-%m-%dT%H:%M:%S.%fZ"
)
jobs[id] = {"operation": op, "createdAt": created, "completedAt": comp}

return jobs


def get_most_recent_job(client, vid, job_type):
"""Query volume and get most recent job"""
jid = None
@@ -566,9 +533,6 @@ def get_most_recent_job(client, vid, job_type):
def get_job_info(jobid, client=None):
"""Query job info with job id"""

if client is None:
client = create_gql_client()

query = gql(
"""
query Job_Query($id: ID!) {
@@ -585,11 +549,6 @@ def get_job_info(jobid, client=None):
id
}
}
billingGroup {
name
}
cost {
cents
}
parentJob {
id
@@ -603,11 +562,6 @@
id
}
}
billingGroup {
name
}
cost {
cents
}
}
children {
@@ -622,11 +576,6 @@
id
}
}
billingGroup {
name
}
cost {
cents
}
}
}
@@ -641,3 +590,61 @@
result = client.execute(query, variable_values=params)

return result


def request_to_df(url, **kwargs):
    """Call the REST API and return the CSV response as a pandas dataframe."""
    my_data = []
    with requests.get(url, **kwargs) as response:
        # check if the request was successful
        if response.status_code == 200:
            # split each CSV line on commas (assumes no quoted fields containing commas)
            for line in response.iter_lines():
                my_data.append(line.decode().split(","))
        else:
            print(f"Failed to fetch the CSV. Status code: {response.status_code}")

    # guard against a failed or empty response before popping the header row
    if not my_data:
        return pd.DataFrame()

    my_cols = my_data.pop(0)
    df = pd.DataFrame(my_data, columns=my_cols)
    return df


def download_job_result(jobid, client=None, api_key=None):
"""Check if a job is complete, download results if it is.
If the job is a list and hash job, only download the hash result."""

endpoint, req_header = create_rest_creds(api_key=api_key)

job_status = None

job_result = None

job_info = get_job_info(jobid, client)

    # check whether the job has finished
    if job_info["job"]["completedAt"] not in ("", None):
        job_status = "Complete"
    else:
        job_status = "Incomplete"

if job_status == "Complete":
job_type = job_info["job"]["operation"]
        # we can only download results for hash or list jobs, so check that the job is one of those
if job_type in ["VOLUME_LIST", "VOLUME_HASH", "VOLUME_LIST_AND_HASH"]:
            # if the job is a parent job, find the hash job to get its result
if (
job_type == "VOLUME_LIST_AND_HASH"
and len(job_info["job"]["children"]) != 0
):
for child_job in job_info["job"]["children"]:
if child_job["operation"] == "VOLUME_HASH":
jobid = child_job["id"]
url = endpoint + jobid + "/result"
job_result = request_to_df(url, headers=req_header, stream=True)
else:
print("Job type {} does not have results to download".format(job_type))

return job_status, job_result
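

For reference, below is a minimal usage sketch (not part of the commit) showing how the new download helpers might be chained together. The API key and job ID values are placeholders, the CSV output path is arbitrary, and it assumes the job was started earlier and may already be complete.

# hypothetical usage of the new download helpers; api_key and job_id are placeholders
from d3b_dff_cli.modules.dewrangle import helper_functions as hf

api_key = "my-api-key"  # placeholder; create_rest_creds falls back to get_api_credential() if no key is given
job_id = "job_abc123"   # placeholder Dewrangle job ID

client = hf.create_gql_client(api_key=api_key)
status, result_df = hf.download_job_result(job_id, client=client, api_key=api_key)

if status == "Complete" and result_df is not None:
    # write the hash/list result to disk
    result_df.to_csv("job_result.csv", index=False)
else:
    print(f"Job {job_id} is not ready or has no downloadable result (status: {status}).")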
