api_datausa_to_bq.py
"""
Example of fetching data from an api and inserting to BigQuery. Before that it creates table in BQ by calling
sub_workflow which is imported from another workflow
BigQuery dataset needs to exists.
This example with input parameters
{
"dataset":"data",
"table":"usa_population"
}
Because of current memory limitations in Cloud Workflows, rows are inserted into BigQuery one by one, not all in single
request, which is not most efficient way...
"""
from workflows import Workflow, Step, Http, Assign, For, Call
from workflows.standard import SysLog
from workflows.connectors.bigquery import BigQueryTableData
from bq_create_table import w as w_create_bq_table
w = Workflow(params='[args]')
w_create_bq_table.name = 'sub_create_bq_table'  # rename the imported workflow; the top-level workflow is named 'main'
w.add_subworkflow(w_create_bq_table)
# set variables
w.add_step(Step('assign_variables', Assign(variables=[
{'bq_dataset': '${args.dataset}'},
{'bq_table': '${args.table}'},
{'gcp_project': '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'}
],
)))
# call the included sub-workflow and pass the dataset and table as arguments
w.add_step(
Step('create_bq_table',
Call(w_create_bq_table.name, args={
'dataset': '${bq_dataset}',
'table': '${bq_table}'
}
)))
# get data from the API with query params and store the response in 'api_res'
w.add_step(Step('get_data', Http('get', 'https://datausa.io/api/data', query={'drilldowns': 'State',
'measures': 'Population'},
result='api_res')))
w.add_step(Step('api_log', SysLog(json='${api_res}')))
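# Assumed response shape: the Data USA response body contains a 'data' list whose elements
# include 'State', 'Year' and 'Population' fields; any other fields are ignored below.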
# body for BigQuery insert
insert_body = {
'rows': [{'json': '${row}'}]
}
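# Note: '${row}' is a Workflows expression resolved at runtime inside the loop below, so each
# iteration sends a tabledata.insertAll request with a single row (see the memory note in the docstring).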
# loop through the data, copy the desired fields into a row variable, insert the row into BigQuery and log the response
w.add_step(Step('extract_data',
For(list_values='${api_res.body.data}', # here is list from API response
loop_variable='item', # variable name in loop step
steps=[ # steps to execute
# prepare row, get desired data
Step('set_row', Assign(variables=[
{'row':
{'State': '${item.State}',
'Year': '${item.Year}',
'Population': '${item.Population}',
}
},
]
)),
# make BigQuery request
Step('insert_bq',
BigQueryTableData.insertAll('${gcp_project}', '${bq_dataset}', '${bq_table}', insert_body,
'bq_insert')),
# log response
Step('log_item', SysLog(json='${bq_insert}', severity='INFO'))
])))
if __name__ == '__main__':
print(w.to_yaml())
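# A rough usage sketch (assumes the gcloud CLI with the Workflows API enabled; the workflow
# name 'api-datausa-to-bq' is only an example, not defined by this script):
#   python api_datausa_to_bq.py > api_datausa_to_bq.yaml
#   gcloud workflows deploy api-datausa-to-bq --source=api_datausa_to_bq.yaml
#   gcloud workflows run api-datausa-to-bq --data='{"dataset": "data", "table": "usa_population"}'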