def fix(row):
    # Build the two identifier records for a row: the first carries the
    # integer-normalised person_id, the second carries person_source_value
    # (or the literal text "None" when that field is None) with "_None"
    # appended. Both records share the same system URL.
    system = "https://redivis.com/datasets/ye2v-6skh7wdr7/tables"

    # person_id is read before person_source_value, as in the original.
    primary = {"system": system, "value": str(int(row["person_id"]))}

    source_value = row["person_source_value"]
    if source_value is None:
        source_value = "None"

    secondary = {
        "value": str(source_value) + "_" + "None",
        "system": system,
    }

    return [primary, secondary]
def merge(x, y):
    # Reducer step: x's "proteins" becomes x's own PROTEIN followed by every
    # entry already collected in y["proteins"]. x is updated in place and
    # returned; y is left untouched.
    own = [x["PROTEIN"]]
    collected = y["proteins"]
    x["proteins"] = own + collected
    return x
// CodeBlock holds the source code for a transform step. The code is supplied
// either inline as a JSON string or indirectly as a {"$ref": "path"} object
// naming a file that is read on demand.
type CodeBlock struct {
	Code    string // inline source, or the contents most recently read from Ref
	Ref     string // path taken from a "$ref" entry; empty for inline code
	BaseDir string // directory that Ref is joined onto when the file is read
}

// UnmarshalJSON accepts either a JSON string (inline code) or an object with
// a non-empty string "$ref" member (path to a code file). Any other shape is
// rejected. State left over from a previous decode is cleared, so a reused
// CodeBlock never mixes an old reference with new inline code or vice versa.
func (cb *CodeBlock) UnmarshalJSON(data []byte) error {
	var code string
	if err := json.Unmarshal(data, &code); err == nil {
		cb.Code, cb.Ref = code, ""
		return nil
	}
	var ref map[string]any
	if err := json.Unmarshal(data, &ref); err == nil {
		// An empty "$ref" can never name a file, so treat it as malformed
		// instead of silently producing an empty program later.
		if path, ok := ref["$ref"].(string); ok && path != "" {
			cb.Code, cb.Ref = "", path
			return nil
		}
	}
	return fmt.Errorf(`unknown code block type: expected a string or {"$ref": "path"}`)
}

// SetBaseDir sets the directory that Ref is joined onto.
func (cb *CodeBlock) SetBaseDir(path string) {
	cb.BaseDir = path
}

// Load returns the code for this block. Inline code is returned as is. When
// Ref is set, the file at BaseDir/Ref is read, its contents are stored in
// Code and returned; a read failure is reported as a wrapped error and Code
// is left unchanged.
func (cb *CodeBlock) Load() (string, error) {
	if cb.Ref == "" {
		return cb.Code, nil
	}
	path := filepath.Join(cb.BaseDir, cb.Ref)
	data, err := os.ReadFile(path)
	if err != nil {
		return "", fmt.Errorf("reading code ref %q: %w", path, err)
	}
	cb.Code = string(data)
	return cb.Code, nil
}

// String returns the code for this block, reading the referenced file first
// when Ref is set. It cannot report failures: if the file cannot be read the
// current value of Code is returned. Use Load to observe the error.
func (cb *CodeBlock) String() string {
	// Deliberately best effort: String is also invoked by fmt verbs such as
	// %s, where there is no way to surface an error to the caller.
	_, _ = cb.Load()
	return cb.Code
}
string `json:"field"` + Value string `json:"value"` + Match string `json:"match"` + Check string `json:"check" jsonschema_description:"How to check value, 'exists' or 'hasValue'"` + Method string `json:"method"` + Python string `json:"python"` + GPython *CodeBlock `json:"gpython"` } type filterProcessor struct { @@ -34,10 +34,11 @@ func (fs FilterStep) Init(task task.RuntimeTask) (Processor, error) { log.Printf("Compile Error: %s", err) } return &filterProcessor{fs, c, task}, nil - } else if fs.GPython != "" && fs.Method != "" { + } else if fs.GPython != nil && fs.Method != "" { log.Printf("Starting Filter Map: %s", fs.GPython) + fs.GPython.SetBaseDir(task.BaseDir()) e := evaluate.GetEngine("gpython", task.WorkDir()) - c, err := e.Compile(fs.GPython, fs.Method) + c, err := e.Compile(fs.GPython.String(), fs.Method) if err != nil { log.Printf("Compile Error: %s", err) } diff --git a/transform/flat_map.go b/transform/flat_map.go index 0f225f2..9023383 100644 --- a/transform/flat_map.go +++ b/transform/flat_map.go @@ -11,9 +11,9 @@ import ( ) type FlatMapStep struct { - Method string `json:"method" jsonschema_description:"Name of function to call"` - Python string `json:"python" jsonschema_description:"Python code to be run"` - GPython string `json:"gpython" jsonschema_description:"Python code to be run using GPython"` + Method string `json:"method" jsonschema_description:"Name of function to call"` + Python string `json:"python" jsonschema_description:"Python code to be run"` + GPython *CodeBlock `json:"gpython" jsonschema_description:"Python code to be run using GPython"` } type flatMapProcess struct { @@ -30,10 +30,11 @@ func (ms *FlatMapStep) Init(task task.RuntimeTask) (Processor, error) { log.Printf("Compile Error: %s", err) } return &flatMapProcess{ms, c}, nil - } else if ms.GPython != "" { + } else if ms.GPython != nil { log.Printf("Init Map: %s", ms.GPython) + ms.GPython.SetBaseDir(task.BaseDir()) e := evaluate.GetEngine("gpython", task.WorkDir()) - c, err := 
e.Compile(ms.GPython, ms.Method) + c, err := e.Compile(ms.GPython.String(), ms.Method) if err != nil { log.Printf("Compile Error: %s", err) } diff --git a/transform/graph_build.go b/transform/graph_build.go index 2720aba..7ed31b1 100644 --- a/transform/graph_build.go +++ b/transform/graph_build.go @@ -10,8 +10,8 @@ import ( ) type EdgeFix struct { - Method string `json:"method"` - GPython string `json:"gpython"` + Method string `json:"method"` + GPython *CodeBlock `json:"gpython"` } type GraphBuildStep struct { @@ -49,10 +49,11 @@ func (ts GraphBuildStep) Init(task task.RuntimeTask) (Processor, error) { var edgeFix evaluate.Processor if ts.EdgeFix != nil { - if ts.EdgeFix.GPython != "" { + if ts.EdgeFix.GPython != nil { + ts.EdgeFix.GPython.SetBaseDir(task.BaseDir()) log.Printf("Init Map: %s", ts.EdgeFix.GPython) e := evaluate.GetEngine("gpython", task.WorkDir()) - c, err := e.Compile(ts.EdgeFix.GPython, ts.EdgeFix.Method) + c, err := e.Compile(ts.EdgeFix.GPython.String(), ts.EdgeFix.Method) if err != nil { log.Printf("Compile Error: %s", err) } diff --git a/transform/mapping.go b/transform/mapping.go index a2d3ab6..1496010 100644 --- a/transform/mapping.go +++ b/transform/mapping.go @@ -11,9 +11,9 @@ import ( ) type MapStep struct { - Method string `json:"method" jsonschema_description:"Name of function to call"` - Python string `json:"python" jsonschema_description:"Python code to be run"` - GPython string `json:"gpython" jsonschema_description:"Python code to be run using GPython"` + Method string `json:"method" jsonschema_description:"Name of function to call"` + Python string `json:"python" jsonschema_description:"Python code to be run"` + GPython *CodeBlock `json:"gpython" jsonschema_description:"Python code to be run using GPython"` } type mapProcess struct { @@ -30,10 +30,11 @@ func (ms *MapStep) Init(task task.RuntimeTask) (Processor, error) { log.Printf("Compile Error: %s", err) } return &mapProcess{ms, c}, nil - } else if ms.GPython != "" { + } else if 
ms.GPython != nil { log.Printf("Init Map: %s", ms.GPython) + ms.GPython.SetBaseDir(task.BaseDir()) e := evaluate.GetEngine("gpython", task.WorkDir()) - c, err := e.Compile(ms.GPython, ms.Method) + c, err := e.Compile(ms.GPython.String(), ms.Method) if err != nil { log.Printf("Compile Error: %s", err) } diff --git a/transform/reduce.go b/transform/reduce.go index 2f8a450..8cabcaa 100644 --- a/transform/reduce.go +++ b/transform/reduce.go @@ -12,7 +12,7 @@ type ReduceStep struct { Field string `json:"field"` Method string `json:"method"` Python string `json:"python"` - GPython string `json:"gpython"` + GPython *CodeBlock `json:"gpython"` InitData *map[string]interface{} `json:"init"` } @@ -31,11 +31,12 @@ func (ms *ReduceStep) Init(t task.RuntimeTask) (Processor, error) { log.Printf("Compile Error: %s", err) } return &reduceProcess{ms, c}, nil - } else if ms.GPython != "" { + } else if ms.GPython != nil { + ms.GPython.SetBaseDir(t.BaseDir()) log.Printf("ReduceInit: %s", ms.InitData) log.Printf("Reduce: %s", ms.GPython) e := evaluate.GetEngine("gpython", t.WorkDir()) - c, err := e.Compile(ms.GPython, ms.Method) + c, err := e.Compile(ms.GPython.String(), ms.Method) if err != nil { log.Printf("Compile Error: %s", err) }