-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathoutputs_and_yaql.yml
91 lines (81 loc) · 3.38 KB
/
outputs_and_yaql.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# This example workflow demonstrates how to access job outputs and how to use
# YAQL to query the outputs for specific objects.
#
# Workflows in the Civis Platform are written in YAML and a workflow DSL
# (domain specific language) called Mistral.
#
# See this website, https://docs.ansible.com/ansible/latest/reference_appendices/YAMLSyntax.html,
# for an introduction to YAML.
#
# See the Mistral documentation, https://docs.openstack.org/mistral/train/user/wf_lang_v2.html,
# for a description of the Mistral DSL.
#
# See the YAQL docs for more details, https://yaql.readthedocs.io/en/latest/.
version: '2.0' # you always need this key to specify version 2 of the mistral DSL
outputs_and_yaql: # the name of the workflow can be anything, but Platform ignores it
tasks:
make_outputs:
action: civis.scripts.python3
input:
name: make outputs
# The code here writes some text data to some files and then uses
# the API client to access the files.
source: |
import civis
import os
f1 = open('tmp_data1.txt', 'a+')
f1.write("hi1\n")
f1.seek(0)
file_id1 = civis.io.file_to_civis(f1, 'f1')
f1.close()
f1 = open('tmp_data2.txt', 'a+')
f1.write("hi2\n")
f1.seek(0)
file_id2 = civis.io.file_to_civis(f1, 'f2')
f1.close()
client = civis.APIClient()
client.scripts.post_python3_runs_outputs(
os.environ['CIVIS_JOB_ID'], os.environ['CIVIS_RUN_ID'], 'File', file_id1)
client.scripts.post_python3_runs_outputs(
os.environ['CIVIS_JOB_ID'], os.environ['CIVIS_RUN_ID'], 'File', file_id2)
on-success:
- find_outputs
find_outputs:
action: civis.scripts.python3
input:
name: find_outputs
# Now we are using the attached task outputs and YAQL to find a specific
# output.
#
# The expression in `<%...%>` is YAQL with some Mistral extensions.
#
# The syntax `task(make_outputs).result` gets the results of a task.
#
# The Civis mistral client attaches run outputs to the `outputs`
# attribute of the result.
#
# We then use YAQL to query the run outputs. In YAQL, the `$` sign
# refers to the specific "row" in thw query. The rest of the syntax is
# fairly SQL-like, but `.` is used to access attributes of the row.
# Here we access the `name` attribute (i.e., the name of object attached
# as a run output) and find the one with the name 'f1'.
#
# Finally, we get the object id (or file id), via accessing the `.object_id`
# attribute. One gotcha here is that YAQL returns a list of all objects
# matching the query, even if there is only one. We pull the first item
# off the list.
#
# As an aside, you can access the Platform run id and
# job_ids for any Civis action via
#
# <% task(make_outputs).result.run_id %> # the run ID
# <% task(make_outputs).result.job_id %> # the run ID
#
source: |
import io
import civis;
file_ids = <% task(make_outputs).result.outputs.where($.name = 'f1').object_id %>
buff = io.BytesIO()
civis.io.civis_to_file(file_ids[0], buff)
buff.seek(0)
print(buff.read(), flush=True)