// CodeBlock is a piece of script source attached to a playbook step. The
// source is supplied either inline, as a plain JSON string, or indirectly, as
// an object of the form {"$ref": "<path>"} that names a file holding it.
type CodeBlock struct {
	// Code is the inline source, or the contents most recently read from Ref.
	Code string
	// Ref is the path given by "$ref"; it is empty for inline code.
	Ref string
	// BaseDir is the directory that Ref is joined onto when the file is read.
	BaseDir string
}

// UnmarshalJSON decodes either a JSON string (inline code) or a JSON object
// whose "$ref" member is a non-empty string (external file). Every other
// shape is rejected. The field that is not being set is cleared so that a
// CodeBlock decoded twice never carries a stale Code/Ref combination.
func (cb *CodeBlock) UnmarshalJSON(data []byte) error {
	var code string
	if err := json.Unmarshal(data, &code); err == nil {
		cb.Code, cb.Ref = code, ""
		return nil
	}

	var obj map[string]any
	if err := json.Unmarshal(data, &obj); err != nil {
		return fmt.Errorf("unknown code block type: %w", err)
	}
	// The comma-ok assertion covers both a missing key and a non-string
	// value; an empty path is refused as well because it can never name a
	// source file.
	ref, ok := obj["$ref"].(string)
	if !ok || ref == "" {
		return fmt.Errorf("unknown code block type: \"$ref\" must be a non-empty string")
	}
	cb.Code, cb.Ref = "", ref
	return nil
}

// SetBaseDir sets the directory against which Ref is resolved.
func (cb *CodeBlock) SetBaseDir(path string) {
	cb.BaseDir = path
}

// Load returns the source text of the block.
//
// Inline code is returned unchanged. When Ref is set, the file at
// filepath.Join(BaseDir, Ref) is read on every call, stored in Code and
// returned; a read failure is reported to the caller, wrapped with the
// resolved path. A nil receiver yields an empty string and no error.
func (cb *CodeBlock) Load() (string, error) {
	if cb == nil {
		return "", nil
	}
	if cb.Ref == "" {
		return cb.Code, nil
	}
	path := filepath.Join(cb.BaseDir, cb.Ref)
	data, err := os.ReadFile(path)
	if err != nil {
		return "", fmt.Errorf("reading code block %q: %w", path, err)
	}
	cb.Code = string(data)
	return cb.Code, nil
}

// String returns the source text on a best-effort basis: it behaves like Load
// but, when the referenced file cannot be read, falls back to whatever Code
// already holds (possibly empty) instead of reporting the failure.
//
// Callers that must know whether the source was actually loaded should call
// Load and check its error.
func (cb *CodeBlock) String() string {
	if cb == nil {
		return ""
	}
	code, err := cb.Load()
	if err != nil {
		// Deliberately not fatal: String cannot return an error, and it is
		// also called implicitly by fmt verbs such as %s, which may happen
		// before SetBaseDir has supplied the directory needed to find Ref.
		return cb.Code
	}
	return code
}
test/examples/code-ref/flatMap.py | 19 ++++++++ test/examples/code-ref/flatMap_pipeline.yaml | 47 +++++++++++++++++++ .../code-ref/{code.py => map_code.py} | 0 .../{pipeline.yaml => map_pipeline.yaml} | 2 +- test/examples/code-ref/reduce.py | 3 ++ test/examples/gdc/gdc-convert.yaml | 6 ++- transform/filter.go | 19 ++++---- transform/flat_map.go | 11 +++-- transform/graph_build.go | 9 ++-- transform/reduce.go | 7 +-- 12 files changed, 104 insertions(+), 23 deletions(-) create mode 100644 test/examples/code-ref/filter.py create mode 100644 test/examples/code-ref/flatMap.py create mode 100644 test/examples/code-ref/flatMap_pipeline.yaml rename test/examples/code-ref/{code.py => map_code.py} (100%) rename test/examples/code-ref/{pipeline.yaml => map_pipeline.yaml} (93%) create mode 100644 test/examples/code-ref/reduce.py diff --git a/test/config.yaml b/test/config.yaml index 19024a9..22480fe 100644 --- a/test/config.yaml +++ b/test/config.yaml @@ -43,3 +43,5 @@ outputs: - output/sifter.edge.edge.json.gz - output/sifter.vertex.vertex.json.gz +- playbook: examples/code-ref/flatMap_pipeline.yaml +- playbook: examples/code-ref/map_pipeline.yaml diff --git a/test/examples/code-ref/filter.py b/test/examples/code-ref/filter.py new file mode 100644 index 0000000..7f45d6a --- /dev/null +++ b/test/examples/code-ref/filter.py @@ -0,0 +1,2 @@ +def filter_bob(row): + return row['name'] == "bob" diff --git a/test/examples/code-ref/flatMap.py b/test/examples/code-ref/flatMap.py new file mode 100644 index 0000000..ed431dd --- /dev/null +++ b/test/examples/code-ref/flatMap.py @@ -0,0 +1,19 @@ +def fix(row): + out = { + "identifier":[{ + "system": "https://redivis.com/datasets/ye2v-6skh7wdr7/tables", + "value":str(int(row["person_id"])) + }] + } + + if(row["person_source_value"] is not None): + out["identifier"].append({ + "value": row["person_source_value"], + "system": "https://redivis.com/datasets/ye2v-6skh7wdr7/tables" + }) + else: + out["identifier"].append({"value": "None", 
"system": "https://redivis.com/datasets/ye2v-6skh7wdr7/tables"}) + + out["identifier"][1]["value"] = str(out["identifier"][1]["value"]) + "_" + "None" + + return out["identifier"] \ No newline at end of file diff --git a/test/examples/code-ref/flatMap_pipeline.yaml b/test/examples/code-ref/flatMap_pipeline.yaml new file mode 100644 index 0000000..a59d522 --- /dev/null +++ b/test/examples/code-ref/flatMap_pipeline.yaml @@ -0,0 +1,47 @@ + +class: Playbook +name: codeTest + +inputs: + startData: + embedded: + - {"COMPLEX_ID":"Complex_76a0f49f-272e-4ec4-bcba-723806b35a31___null__6468_","PROTEIN":"Q9Y296"} + - {"COMPLEX_ID":"Complex_da6f165b-e085-4ec6-ba43-1170756b0a57___null__6967_","PROTEIN":"O08957"} + - {"COMPLEX_ID":"Complex_c688ddcc-a541-4098-ab0d-25b87e5bc5cd___null__1097_","PROTEIN":"Q13347"} + + otherData: + embedded: + - {"person_id": 3589912774911670272, "person_source_value": 10009628, "name": "alice"} + - {"person_id": -3210373572193940992, "person_source_value": 10011398, "name": "bob"} + - {"person_id": -775517641933593344, "person_source_value": 10004235, "name": "charlie"} + +pipelines: + filterpipeline: + - from: startData + - reduce: + field: COMPLEX_ID + method: merge + init: { "proteins": [] } + gpython: + $ref: reduce.py + + - debug: {} + + otherpipelines: + - from: otherData + - filter: + # The [field,match] values and the gpython file give the same result + #field: name + #match: bob + method: filter_bob + gpython: + $ref: filter.py + #- debug: {} + - flatMap: + method: fix + gpython: + $ref: flatMap.py + + - debug: {} + + diff --git a/test/examples/code-ref/code.py b/test/examples/code-ref/map_code.py similarity index 100% rename from test/examples/code-ref/code.py rename to test/examples/code-ref/map_code.py diff --git a/test/examples/code-ref/pipeline.yaml b/test/examples/code-ref/map_pipeline.yaml similarity index 93% rename from test/examples/code-ref/pipeline.yaml rename to test/examples/code-ref/map_pipeline.yaml index 084a057..efa3f6d 
100644 --- a/test/examples/code-ref/pipeline.yaml +++ b/test/examples/code-ref/map_pipeline.yaml @@ -16,7 +16,7 @@ pipelines: - map: method: update gpython: - $ref: code.py + $ref: map_code.py - map: method: update gpython: | diff --git a/test/examples/code-ref/reduce.py b/test/examples/code-ref/reduce.py new file mode 100644 index 0000000..54c93db --- /dev/null +++ b/test/examples/code-ref/reduce.py @@ -0,0 +1,3 @@ +def merge(x,y): + x["proteins"] = [x["PROTEIN"]] + y["proteins"] + return x diff --git a/test/examples/gdc/gdc-convert.yaml b/test/examples/gdc/gdc-convert.yaml index b882814..e3358f6 100644 --- a/test/examples/gdc/gdc-convert.yaml +++ b/test/examples/gdc/gdc-convert.yaml @@ -31,4 +31,8 @@ pipelines: - from: caseObject - graphBuild: schema: "{{config.schema}}" - title: Case \ No newline at end of file + title: Case + EdgeFix: + method: test + gpython: + $ref: test.py diff --git a/transform/filter.go b/transform/filter.go index e46b894..b383e20 100644 --- a/transform/filter.go +++ b/transform/filter.go @@ -9,13 +9,13 @@ import ( ) type FilterStep struct { - Field string `json:"field"` - Value string `json:"value"` - Match string `json:"match"` - Check string `json:"check" jsonschema_description:"How to check value, 'exists' or 'hasValue'"` - Method string `json:"method"` - Python string `json:"python"` - GPython string `json:"gpython"` + Field string `json:"field"` + Value string `json:"value"` + Match string `json:"match"` + Check string `json:"check" jsonschema_description:"How to check value, 'exists' or 'hasValue'"` + Method string `json:"method"` + Python string `json:"python"` + GPython *CodeBlock `json:"gpython"` } type filterProcessor struct { @@ -34,10 +34,11 @@ func (fs FilterStep) Init(task task.RuntimeTask) (Processor, error) { log.Printf("Compile Error: %s", err) } return &filterProcessor{fs, c, task}, nil - } else if fs.GPython != "" && fs.Method != "" { + } else if fs.GPython != nil && fs.Method != "" { log.Printf("Starting Filter Map: 
%s", fs.GPython) + fs.GPython.SetBaseDir(task.BaseDir()) e := evaluate.GetEngine("gpython", task.WorkDir()) - c, err := e.Compile(fs.GPython, fs.Method) + c, err := e.Compile(fs.GPython.String(), fs.Method) if err != nil { log.Printf("Compile Error: %s", err) } diff --git a/transform/flat_map.go b/transform/flat_map.go index 0f225f2..9023383 100644 --- a/transform/flat_map.go +++ b/transform/flat_map.go @@ -11,9 +11,9 @@ import ( ) type FlatMapStep struct { - Method string `json:"method" jsonschema_description:"Name of function to call"` - Python string `json:"python" jsonschema_description:"Python code to be run"` - GPython string `json:"gpython" jsonschema_description:"Python code to be run using GPython"` + Method string `json:"method" jsonschema_description:"Name of function to call"` + Python string `json:"python" jsonschema_description:"Python code to be run"` + GPython *CodeBlock `json:"gpython" jsonschema_description:"Python code to be run using GPython"` } type flatMapProcess struct { @@ -30,10 +30,11 @@ func (ms *FlatMapStep) Init(task task.RuntimeTask) (Processor, error) { log.Printf("Compile Error: %s", err) } return &flatMapProcess{ms, c}, nil - } else if ms.GPython != "" { + } else if ms.GPython != nil { log.Printf("Init Map: %s", ms.GPython) + ms.GPython.SetBaseDir(task.BaseDir()) e := evaluate.GetEngine("gpython", task.WorkDir()) - c, err := e.Compile(ms.GPython, ms.Method) + c, err := e.Compile(ms.GPython.String(), ms.Method) if err != nil { log.Printf("Compile Error: %s", err) } diff --git a/transform/graph_build.go b/transform/graph_build.go index 2720aba..7ed31b1 100644 --- a/transform/graph_build.go +++ b/transform/graph_build.go @@ -10,8 +10,8 @@ import ( ) type EdgeFix struct { - Method string `json:"method"` - GPython string `json:"gpython"` + Method string `json:"method"` + GPython *CodeBlock `json:"gpython"` } type GraphBuildStep struct { @@ -49,10 +49,11 @@ func (ts GraphBuildStep) Init(task task.RuntimeTask) (Processor, error) { var 
edgeFix evaluate.Processor if ts.EdgeFix != nil { - if ts.EdgeFix.GPython != "" { + if ts.EdgeFix.GPython != nil { + ts.EdgeFix.GPython.SetBaseDir(task.BaseDir()) log.Printf("Init Map: %s", ts.EdgeFix.GPython) e := evaluate.GetEngine("gpython", task.WorkDir()) - c, err := e.Compile(ts.EdgeFix.GPython, ts.EdgeFix.Method) + c, err := e.Compile(ts.EdgeFix.GPython.String(), ts.EdgeFix.Method) if err != nil { log.Printf("Compile Error: %s", err) } diff --git a/transform/reduce.go b/transform/reduce.go index 2f8a450..8cabcaa 100644 --- a/transform/reduce.go +++ b/transform/reduce.go @@ -12,7 +12,7 @@ type ReduceStep struct { Field string `json:"field"` Method string `json:"method"` Python string `json:"python"` - GPython string `json:"gpython"` + GPython *CodeBlock `json:"gpython"` InitData *map[string]interface{} `json:"init"` } @@ -31,11 +31,12 @@ func (ms *ReduceStep) Init(t task.RuntimeTask) (Processor, error) { log.Printf("Compile Error: %s", err) } return &reduceProcess{ms, c}, nil - } else if ms.GPython != "" { + } else if ms.GPython != nil { + ms.GPython.SetBaseDir(t.BaseDir()) log.Printf("ReduceInit: %s", ms.InitData) log.Printf("Reduce: %s", ms.GPython) e := evaluate.GetEngine("gpython", t.WorkDir()) - c, err := e.Compile(ms.GPython, ms.Method) + c, err := e.Compile(ms.GPython.String(), ms.Method) if err != nil { log.Printf("Compile Error: %s", err) } From 9899d8568a79778f6a8b9aeb5ea776e4d867447d Mon Sep 17 00:00:00 2001 From: matthewpeterkort Date: Fri, 12 Jan 2024 13:29:33 -0800 Subject: [PATCH 3/4] Change naming --- test/config.yaml | 4 ++-- test/examples/code-ref/{map_pipeline.yaml => Pipeline.yaml} | 2 +- .../code-ref/{flatMap_pipeline.yaml => flatMappipeline.yaml} | 0 test/examples/code-ref/{map_code.py => map.py} | 0 4 files changed, 3 insertions(+), 3 deletions(-) rename test/examples/code-ref/{map_pipeline.yaml => Pipeline.yaml} (93%) rename test/examples/code-ref/{flatMap_pipeline.yaml => flatMappipeline.yaml} (100%) rename 
test/examples/code-ref/{map_code.py => map.py} (100%) diff --git a/test/config.yaml b/test/config.yaml index 22480fe..cd94001 100644 --- a/test/config.yaml +++ b/test/config.yaml @@ -43,5 +43,5 @@ outputs: - output/sifter.edge.edge.json.gz - output/sifter.vertex.vertex.json.gz -- playbook: examples/code-ref/flatMap_pipeline.yaml -- playbook: examples/code-ref/map_pipeline.yaml +- playbook: examples/code-ref/Pipeline.yaml +- playbook: examples/code-ref/flatMappipeline.yaml diff --git a/test/examples/code-ref/map_pipeline.yaml b/test/examples/code-ref/Pipeline.yaml similarity index 93% rename from test/examples/code-ref/map_pipeline.yaml rename to test/examples/code-ref/Pipeline.yaml index efa3f6d..ad17ca3 100644 --- a/test/examples/code-ref/map_pipeline.yaml +++ b/test/examples/code-ref/Pipeline.yaml @@ -16,7 +16,7 @@ pipelines: - map: method: update gpython: - $ref: map_code.py + $ref: map.py - map: method: update gpython: | diff --git a/test/examples/code-ref/flatMap_pipeline.yaml b/test/examples/code-ref/flatMappipeline.yaml similarity index 100% rename from test/examples/code-ref/flatMap_pipeline.yaml rename to test/examples/code-ref/flatMappipeline.yaml diff --git a/test/examples/code-ref/map_code.py b/test/examples/code-ref/map.py similarity index 100% rename from test/examples/code-ref/map_code.py rename to test/examples/code-ref/map.py From 12fe5757d4b95ab9ca64c2c4bfaf5431fc6e4901 Mon Sep 17 00:00:00 2001 From: matthewpeterkort Date: Fri, 12 Jan 2024 13:34:05 -0800 Subject: [PATCH 4/4] Adds missing file --- test/examples/gdc/test.py | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 test/examples/gdc/test.py diff --git a/test/examples/gdc/test.py b/test/examples/gdc/test.py new file mode 100644 index 0000000..0d25812 --- /dev/null +++ b/test/examples/gdc/test.py @@ -0,0 +1,3 @@ +def test(row): + row["TEST"] = "test_string" + return row