Skip to content

Commit

Permalink
Merge pull request #65 from bmeg/feature/code-ref
Browse files Browse the repository at this point in the history
[WIP] External code references
  • Loading branch information
kellrott authored Jan 25, 2024
2 parents bb5c66f + 12fe575 commit 27f0379
Show file tree
Hide file tree
Showing 15 changed files with 187 additions and 27 deletions.
2 changes: 2 additions & 0 deletions test/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@
outputs:
- output/sifter.edge.edge.json.gz
- output/sifter.vertex.vertex.json.gz
- playbook: examples/code-ref/Pipeline.yaml
- playbook: examples/code-ref/flatMappipeline.yaml
26 changes: 26 additions & 0 deletions test/examples/code-ref/Pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@

class: Playbook
name: codeTest

inputs:
startData:
embedded:
- {"value": 0, "name": "alice"}
- {"value": 1, "name": "bob"}
- {"value": 2, "name": "charlie"}


pipelines:
codeTest:
- from: startData
- map:
method: update
gpython:
$ref: map.py
- map:
method: update
gpython: |
def update(x):
x["value"] = x["value"] + 1
return x
- debug: {}
2 changes: 2 additions & 0 deletions test/examples/code-ref/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def filter_bob(row):
    """Return True only when the row's 'name' field equals "bob"."""
    name = row['name']
    return name == "bob"
19 changes: 19 additions & 0 deletions test/examples/code-ref/flatMap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
def fix(row):
    """Build the two-entry identifier list for one row.

    The first entry carries ``person_id`` rendered as an integer string.
    The second carries ``person_source_value`` (or the text "None" when that
    field is None), stringified and suffixed with "_None".
    Both entries share the same ``system`` URL.
    """
    system = "https://redivis.com/datasets/ye2v-6skh7wdr7/tables"

    # person_id is read first so a malformed row fails on the same field
    # as it always has.
    primary = {
        "system": system,
        "value": str(int(row["person_id"]))
    }

    source_value = row["person_source_value"]
    if source_value is None:
        source_value = "None"

    secondary = {
        "value": str(source_value) + "_" + "None",
        "system": system
    }

    return [primary, secondary]
47 changes: 47 additions & 0 deletions test/examples/code-ref/flatMappipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@

class: Playbook
name: codeTest

inputs:
startData:
embedded:
- {"COMPLEX_ID":"Complex_76a0f49f-272e-4ec4-bcba-723806b35a31___null__6468_","PROTEIN":"Q9Y296"}
- {"COMPLEX_ID":"Complex_da6f165b-e085-4ec6-ba43-1170756b0a57___null__6967_","PROTEIN":"O08957"}
- {"COMPLEX_ID":"Complex_c688ddcc-a541-4098-ab0d-25b87e5bc5cd___null__1097_","PROTEIN":"Q13347"}

otherData:
embedded:
- {"person_id": 3589912774911670272, "person_source_value": 10009628, "name": "alice"}
- {"person_id": -3210373572193940992, "person_source_value": 10011398, "name": "bob"}
- {"person_id": -775517641933593344, "person_source_value": 10004235, "name": "charlie"}

pipelines:
filterpipeline:
- from: startData
- reduce:
field: COMPLEX_ID
method: merge
init: { "proteins": [] }
gpython:
$ref: reduce.py

- debug: {}

otherpipelines:
- from: otherData
- filter:
# Using the built-in `field`/`match` settings (commented out below) gives the same result as the gpython filter in filter.py
#field: name
#match: bob
method: filter_bob
gpython:
$ref: filter.py
#- debug: {}
- flatMap:
method: fix
gpython:
$ref: flatMap.py

- debug: {}


4 changes: 4 additions & 0 deletions test/examples/code-ref/map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

def update(x):
    """Add one to the row's "value" field in place and return the same row."""
    incremented = x["value"] + 1
    x["value"] = incremented
    return x
3 changes: 3 additions & 0 deletions test/examples/code-ref/reduce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def merge(x, y):
    """Set x["proteins"] to x's own PROTEIN followed by y's proteins; return x.

    A new list is built, so y["proteins"] itself is left unmodified.
    """
    head = [x["PROTEIN"]]
    tail = y["proteins"]
    x["proteins"] = head + tail
    return x
6 changes: 5 additions & 1 deletion test/examples/gdc/gdc-convert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ pipelines:
- from: caseObject
- graphBuild:
schema: "{{config.schema}}"
title: Case
title: Case
EdgeFix:
method: test
gpython:
$ref: test.py
3 changes: 3 additions & 0 deletions test/examples/gdc/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def test(row):
row["TEST"] = "test_string"
return row
45 changes: 45 additions & 0 deletions transform/code_block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package transform

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
)

// CodeBlock holds script source for a transform step. The source is either
// given inline (Code) or named by a file reference (Ref) that is joined onto
// BaseDir and read when the block is rendered with String.
type CodeBlock struct {
	// Code is the script text. String overwrites it with the referenced
	// file's contents when Ref is set and the file can be read.
	Code string
	// Ref is the path taken from a {"$ref": "..."} object; it is empty for
	// inline code.
	Ref string
	// BaseDir is the directory Ref is joined onto; see SetBaseDir.
	BaseDir string
}

// UnmarshalJSON decodes a code block from one of two JSON forms:
//
//   - a string, stored as inline Code;
//   - an object with a string "$ref" member, stored as Ref.
//
// A JSON null leaves the block untouched, following the encoding/json
// convention for Unmarshalers. Decoding one form clears the field that belongs
// to the other, so a reused CodeBlock never pairs a stale Ref with fresh Code
// (or the reverse). BaseDir is never modified. Every other input, including an
// object without "$ref" or a "$ref" that is empty or not a string, yields an
// error whose message starts with "unknown code block type".
func (cb *CodeBlock) UnmarshalJSON(data []byte) error {
	// Decoding into *string separates null (nil pointer, no error) from an
	// actual string value.
	var inline *string
	if err := json.Unmarshal(data, &inline); err == nil {
		if inline != nil {
			cb.Code, cb.Ref = *inline, ""
		}
		return nil
	}

	// RawMessage values keep the "$ref" lookup an exact key match and allow a
	// wrongly typed value to be reported precisely.
	var obj map[string]json.RawMessage
	if err := json.Unmarshal(data, &obj); err != nil {
		return fmt.Errorf("unknown code block type: expected a string or an object with \"$ref\": %w", err)
	}
	raw, ok := obj["$ref"]
	if !ok {
		return fmt.Errorf("unknown code block type: object has no \"$ref\" member")
	}
	var path string
	if err := json.Unmarshal(raw, &path); err != nil {
		return fmt.Errorf("unknown code block type: \"$ref\" must be a string: %w", err)
	}
	if path == "" {
		// An empty reference (or "$ref": null) would silently produce an
		// empty script later on, so reject it at decode time.
		return fmt.Errorf("unknown code block type: \"$ref\" must not be empty")
	}
	cb.Ref, cb.Code = path, ""
	return nil
}

// SetBaseDir sets the directory that Ref is resolved against by String.
func (cb *CodeBlock) SetBaseDir(path string) {
	cb.BaseDir = path
}

// String returns the script source. For inline blocks that is Code as-is. For
// referenced blocks the file at BaseDir/Ref is read on every call and cached
// in Code.
//
// The read is best-effort: when the file cannot be read the error is dropped
// and the current Code is returned unchanged (empty if no earlier read
// succeeded).
// NOTE(review): the signature cannot report the read error, so a missing or
// unreadable file surfaces only as empty source downstream; consider a loader
// that returns the error.
func (cb *CodeBlock) String() string {
	if cb.Ref == "" {
		return cb.Code
	}
	data, err := os.ReadFile(filepath.Join(cb.BaseDir, cb.Ref))
	if err != nil {
		return cb.Code
	}
	cb.Code = string(data)
	return cb.Code
}
19 changes: 10 additions & 9 deletions transform/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
)

type FilterStep struct {
Field string `json:"field"`
Value string `json:"value"`
Match string `json:"match"`
Check string `json:"check" jsonschema_description:"How to check value, 'exists' or 'hasValue'"`
Method string `json:"method"`
Python string `json:"python"`
GPython string `json:"gpython"`
Field string `json:"field"`
Value string `json:"value"`
Match string `json:"match"`
Check string `json:"check" jsonschema_description:"How to check value, 'exists' or 'hasValue'"`
Method string `json:"method"`
Python string `json:"python"`
GPython *CodeBlock `json:"gpython"`
}

type filterProcessor struct {
Expand All @@ -34,10 +34,11 @@ func (fs FilterStep) Init(task task.RuntimeTask) (Processor, error) {
log.Printf("Compile Error: %s", err)
}
return &filterProcessor{fs, c, task}, nil
} else if fs.GPython != "" && fs.Method != "" {
} else if fs.GPython != nil && fs.Method != "" {
log.Printf("Starting Filter Map: %s", fs.GPython)
fs.GPython.SetBaseDir(task.BaseDir())
e := evaluate.GetEngine("gpython", task.WorkDir())
c, err := e.Compile(fs.GPython, fs.Method)
c, err := e.Compile(fs.GPython.String(), fs.Method)
if err != nil {
log.Printf("Compile Error: %s", err)
}
Expand Down
11 changes: 6 additions & 5 deletions transform/flat_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
)

type FlatMapStep struct {
Method string `json:"method" jsonschema_description:"Name of function to call"`
Python string `json:"python" jsonschema_description:"Python code to be run"`
GPython string `json:"gpython" jsonschema_description:"Python code to be run using GPython"`
Method string `json:"method" jsonschema_description:"Name of function to call"`
Python string `json:"python" jsonschema_description:"Python code to be run"`
GPython *CodeBlock `json:"gpython" jsonschema_description:"Python code to be run using GPython"`
}

type flatMapProcess struct {
Expand All @@ -30,10 +30,11 @@ func (ms *FlatMapStep) Init(task task.RuntimeTask) (Processor, error) {
log.Printf("Compile Error: %s", err)
}
return &flatMapProcess{ms, c}, nil
} else if ms.GPython != "" {
} else if ms.GPython != nil {
log.Printf("Init Map: %s", ms.GPython)
ms.GPython.SetBaseDir(task.BaseDir())
e := evaluate.GetEngine("gpython", task.WorkDir())
c, err := e.Compile(ms.GPython, ms.Method)
c, err := e.Compile(ms.GPython.String(), ms.Method)
if err != nil {
log.Printf("Compile Error: %s", err)
}
Expand Down
9 changes: 5 additions & 4 deletions transform/graph_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
)

type EdgeFix struct {
Method string `json:"method"`
GPython string `json:"gpython"`
Method string `json:"method"`
GPython *CodeBlock `json:"gpython"`
}

type GraphBuildStep struct {
Expand Down Expand Up @@ -49,10 +49,11 @@ func (ts GraphBuildStep) Init(task task.RuntimeTask) (Processor, error) {

var edgeFix evaluate.Processor
if ts.EdgeFix != nil {
if ts.EdgeFix.GPython != "" {
if ts.EdgeFix.GPython != nil {
ts.EdgeFix.GPython.SetBaseDir(task.BaseDir())
log.Printf("Init Map: %s", ts.EdgeFix.GPython)
e := evaluate.GetEngine("gpython", task.WorkDir())
c, err := e.Compile(ts.EdgeFix.GPython, ts.EdgeFix.Method)
c, err := e.Compile(ts.EdgeFix.GPython.String(), ts.EdgeFix.Method)
if err != nil {
log.Printf("Compile Error: %s", err)
}
Expand Down
11 changes: 6 additions & 5 deletions transform/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
)

type MapStep struct {
Method string `json:"method" jsonschema_description:"Name of function to call"`
Python string `json:"python" jsonschema_description:"Python code to be run"`
GPython string `json:"gpython" jsonschema_description:"Python code to be run using GPython"`
Method string `json:"method" jsonschema_description:"Name of function to call"`
Python string `json:"python" jsonschema_description:"Python code to be run"`
GPython *CodeBlock `json:"gpython" jsonschema_description:"Python code to be run using GPython"`
}

type mapProcess struct {
Expand All @@ -30,10 +30,11 @@ func (ms *MapStep) Init(task task.RuntimeTask) (Processor, error) {
log.Printf("Compile Error: %s", err)
}
return &mapProcess{ms, c}, nil
} else if ms.GPython != "" {
} else if ms.GPython != nil {
log.Printf("Init Map: %s", ms.GPython)
ms.GPython.SetBaseDir(task.BaseDir())
e := evaluate.GetEngine("gpython", task.WorkDir())
c, err := e.Compile(ms.GPython, ms.Method)
c, err := e.Compile(ms.GPython.String(), ms.Method)
if err != nil {
log.Printf("Compile Error: %s", err)
}
Expand Down
7 changes: 4 additions & 3 deletions transform/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ReduceStep struct {
Field string `json:"field"`
Method string `json:"method"`
Python string `json:"python"`
GPython string `json:"gpython"`
GPython *CodeBlock `json:"gpython"`
InitData *map[string]interface{} `json:"init"`
}

Expand All @@ -31,11 +31,12 @@ func (ms *ReduceStep) Init(t task.RuntimeTask) (Processor, error) {
log.Printf("Compile Error: %s", err)
}
return &reduceProcess{ms, c}, nil
} else if ms.GPython != "" {
} else if ms.GPython != nil {
ms.GPython.SetBaseDir(t.BaseDir())
log.Printf("ReduceInit: %s", ms.InitData)
log.Printf("Reduce: %s", ms.GPython)
e := evaluate.GetEngine("gpython", t.WorkDir())
c, err := e.Compile(ms.GPython, ms.Method)
c, err := e.Compile(ms.GPython.String(), ms.Method)
if err != nil {
log.Printf("Compile Error: %s", err)
}
Expand Down

0 comments on commit 27f0379

Please sign in to comment.