Skip to content

Commit

Permalink
Loading ref entries in properties, adding field processing option to data pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
kellrott committed May 15, 2019
1 parent 7981051 commit 4ba4b27
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "examples/bmeg-dictionary"]
path = examples/bmeg-dictionary
url = https://github.com/bmeg/bmeg-dictionary.git
1 change: 1 addition & 0 deletions examples/bmeg-dictionary
Submodule bmeg-dictionary added at 915b3d
58 changes: 53 additions & 5 deletions manager/tabular_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"regexp"
"strconv"

"encoding/json"

"crypto/sha1"
"compress/gzip"

Expand Down Expand Up @@ -88,6 +90,12 @@ type ProjectStep struct {
Mapping map[string]string `json:"mapping"`
}

// FieldProcessStep configures the "fieldProcess" transform step. For each
// record, the list stored under Column is walked and every element that is
// itself a map is sent into the nested Steps pipeline.
type FieldProcessStep struct {
// Column is the record key that Run looks up.
Column string `json:"col"`
// Steps is the nested pipeline that Start launches on inChan.
Steps TransformPipe `json:"steps"`
// inChan feeds nested records to Steps. It is created by Start and closed
// by Close; being unexported, it is never populated from JSON.
inChan chan map[string]interface{}
}

// DebugStep is a transform step with no configuration. Its Run method logs
// each record and returns it unchanged.
type DebugStep struct {}

type TransformStep struct {
Expand All @@ -103,6 +111,7 @@ type TransformStep struct {
Project *ProjectStep `json:"project"`
Map *MapStep `json:"map"`
Reduce *ReduceStep `json:"reduce"`
FieldProcess *FieldProcessStep `json:"fieldProcess"`
}

// TransformPipe is a sequence of TransformStep values. FieldProcessStep
// holds one in its Steps field as the nested pipeline it feeds.
type TransformPipe []TransformStep
Expand Down Expand Up @@ -249,12 +258,18 @@ func (ts ObjectCreateStep) Run(i map[string]interface{}, task *Task) map[string]
}
}
} else {
log.Printf("Error: %s : %s", err, i)
s, _ := json.Marshal(i)
log.Printf("Error: %s on data %s", err, s)
}
return i
}


// Start prepares the filter's nested pipeline. It allocates the step's
// buffered input channel (capacity 100), stores it on the receiver, and hands
// it to fs.Steps.Start together with task and wg.
func (fs *FilterStep) Start(task *Task, wg *sync.WaitGroup) {
	const bufSize = 100
	in := make(chan map[string]interface{}, bufSize)
	fs.inChan = in
	fs.Steps.Start(in, task, wg)
}

func (fs FilterStep) Run(i map[string]interface{}, task *Task) map[string]interface{} {
col, _ := evaluate.ExpressionString(fs.Column, task.Inputs, i)
match, _ := evaluate.ExpressionString(fs.Match, task.Inputs, i)
Expand All @@ -264,6 +279,33 @@ func (fs FilterStep) Run(i map[string]interface{}, task *Task) map[string]interf
return i
}

// Close closes the filter's input channel. Start must have been called
// first, because closing a nil channel panics.
func (fs FilterStep) Close() {
	ch := fs.inChan
	close(ch)
}


// Start prepares the nested pipeline for this step. It allocates the buffered
// input channel (capacity 100), stores it on the receiver, and hands it to
// fs.Steps.Start together with task and wg.
func (fs *FieldProcessStep) Start(task *Task, wg *sync.WaitGroup) {
	const bufSize = 100
	in := make(chan map[string]interface{}, bufSize)
	fs.inChan = in
	fs.Steps.Start(in, task, wg)
}


// Run forwards the nested records found under fs.Column to the step's input
// channel and returns the incoming record i unchanged.
//
// Only a []interface{} value is processed: each element that is a
// map[string]interface{} is sent on fs.inChan, and every other element is
// skipped. If the column is absent or is not a list, nothing is sent. The
// send blocks while the channel buffer is full. task is not used.
func (fs FieldProcessStep) Run(i map[string]interface{}, task *Task) map[string]interface{} {
	value, found := i[fs.Column]
	if !found {
		return i
	}
	items, isList := value.([]interface{})
	if !isList {
		return i
	}
	for _, item := range items {
		record, isMap := item.(map[string]interface{})
		if !isMap {
			continue
		}
		fs.inChan <- record
	}
	return i
}

// Close closes the step's input channel. Start must have been called first,
// because closing a nil channel panics.
func (fs FieldProcessStep) Close() {
	ch := fs.inChan
	close(ch)
}

func (re RegexReplaceStep) Run(i map[string]interface{}, task *Task) map[string]interface{} {
col, _ := evaluate.ExpressionString(re.Column, task.Inputs, i)
Expand Down Expand Up @@ -311,7 +353,8 @@ func (pr ProjectStep) Run(i map[string]interface{}, task *Task) map[string]inter
}

// Run logs the record i once, encoded as JSON, and returns it unchanged.
//
// If i cannot be JSON-encoded, the record is logged with %v instead and the
// encoding error is included, so a debug step never emits an empty line.
// task is not used.
func (db DebugStep) Run(i map[string]interface{}, task *Task) map[string]interface{} {
	s, err := json.Marshal(i)
	if err != nil {
		// %v, not %s: a map with non-string values would render as %!s(...).
		log.Printf("DebugData: %v (json encoding failed: %s)", i, err)
		return i
	}
	log.Printf("DebugData: %s", s)
	return i
}

Expand All @@ -338,12 +381,17 @@ func (ts TransformStep) Start(in chan map[string]interface{},
out <- ts.EdgeCreate.Run(i, task)
}
} else if ts.Filter != nil {
ts.Filter.inChan = make(chan map[string]interface{}, 100)
ts.Filter.Steps.Start(ts.Filter.inChan, task, wg)
defer close(ts.Filter.inChan)
ts.Filter.Start(task, wg)
for i := range in {
out <- ts.Filter.Run(i, task)
}
ts.Filter.Close()
} else if ts.FieldProcess != nil {
ts.FieldProcess.Start(task, wg)
for i := range in {
out <- ts.FieldProcess.Run(i, task)
}
ts.FieldProcess.Close()
} else if ts.Debug != nil {
for i := range in {
out <- ts.Debug.Run(i, task)
Expand Down
1 change: 1 addition & 0 deletions schema/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (s Schema) Validate(data map[string]interface{}) (map[string]interface{}, e

for _, r := range s.Required {
if _, ok := out[r]; !ok {
log.Printf("Not Found %s in %s ", r, data)
return nil, fmt.Errorf("Required field '%s' in '%s' not found", r, s.Id)
}
}
Expand Down
29 changes: 29 additions & 0 deletions schema/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package schema
import (
"log"
"fmt"
"strings"
"io/ioutil"
"path/filepath"
"encoding/json"
Expand Down Expand Up @@ -119,5 +120,33 @@ func LoadSchema(path string) (Schema, error) {
if err := yaml.Unmarshal(raw, &s); err != nil {
return Schema{}, fmt.Errorf("failed to read data at path %s: \n%v", path, err)
}
if ref, ok := s.Props["$ref"]; ok {
//log.Printf("refpath: %s", ref.Value)
vs := strings.Split(ref.Value, "#")
dir := filepath.Dir(path)
pPath := filepath.Join(dir, vs[0])
//log.Printf("ref file: %s", pPath)

raw, err := ioutil.ReadFile(pPath)
if err != nil {
return Schema{}, fmt.Errorf("failed to read data at path %s: \n%v", pPath, err)
}
pProps := map[string]interface{}{}
if err := yaml.Unmarshal(raw, &pProps); err != nil {
return Schema{}, fmt.Errorf("failed to file reference at path %s: \n%v", pPath, err)
}
fPath := vs[1]
fName := fPath[1:len(fPath)]
if fData, ok := pProps[fName]; ok {
sData, _ := yaml.Marshal(fData)
np := map[string]PropertyElement{}
if err := yaml.Unmarshal(sData, &np); err != nil {
return s, err
}
for k, v := range np {
s.Props[k] = v
}
}
}
return s, nil
}

0 comments on commit 4ba4b27

Please sign in to comment.