Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pivot Operation #312

Merged
merged 8 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conformance/graphs/fhir.edges
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"from":"patient_a", "to":"observation_a1", "label":"patient_observation"}
{"from":"patient_a", "to":"observation_a2", "label":"patient_observation"}
{"from":"patient_a", "to":"observation_a3", "label":"patient_observation"}
{"from":"patient_b", "to":"observation_b1", "label":"patient_observation"}
{"from":"patient_b", "to":"observation_b2", "label":"patient_observation"}
{"from":"patient_b", "to":"observation_b3", "label":"patient_observation"}
8 changes: 8 additions & 0 deletions conformance/graphs/fhir.vertices
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"gid":"patient_a", "label":"Patient", "data":{"name":"Alice"}}
{"gid":"patient_b", "label":"Patient", "data":{"name":"Bob"}}
{"gid":"observation_a1", "label":"Observation", "data":{"key":"age", "value":36}}
{"gid":"observation_a2", "label":"Observation", "data":{"key":"sex", "value":"Female"}}
{"gid":"observation_a3", "label":"Observation", "data":{"key":"blood_pressure", "value":"111/78"}}
{"gid":"observation_b1", "label":"Observation", "data":{"key":"age", "value":42}}
{"gid":"observation_b2", "label":"Observation", "data":{"key":"sex", "value":"Male"}}
{"gid":"observation_b3", "label":"Observation", "data":{"key":"blood_pressure", "value":"120/80"}}
2 changes: 1 addition & 1 deletion conformance/tests/ot_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def test_traversal_gid_aggregation(man):
return errors

if planet_agg_map[row["key"]] != row["value"]:
errors.append("Incorrect bucket count returned: %s" % res)
errors.append("Incorrect bucket count returned: %s" % row)

if count != 2:
errors.append(
Expand Down
14 changes: 14 additions & 0 deletions conformance/tests/ot_pivot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from __future__ import absolute_import

import gripql


def test_pivot(man):
errors = []
G = man.setGraph("fhir")

for row in G.query().V().hasLabel("Patient").as_("a").out("patient_observation").pivot("$a._gid", "$.key", "$.value" ):
print(row)

return errors

69 changes: 69 additions & 0 deletions engine/core/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math"
"reflect"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gdbi/tpath"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/log"
"github.com/bmeg/grip/util/copy"
"github.com/influxdata/tdigest"
Expand Down Expand Up @@ -433,6 +435,73 @@ func (r *Render) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe,

////////////////////////////////////////////////////////////////////////////////

// Render takes current state and renders into requested structure
type Pivot struct {
Stmt *gripql.PivotStep
}

// Process runs the pivot processor
func (r *Pivot) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe, out gdbi.OutPipe) context.Context {
go func() {
defer close(out)
kv := man.GetTempKV()
kv.BulkWrite(func(bl kvi.KVBulkWrite) error {
for t := range in {
if t.IsSignal() {
out <- t
continue
}
//fmt.Printf("Checking %#v\n", t.GetCurrent())
id := gdbi.TravelerPathLookup(t, r.Stmt.Id)
if idStr, ok := id.(string); ok {
field := gdbi.TravelerPathLookup(t, r.Stmt.Field)
if fieldStr, ok := field.(string); ok {
value := gdbi.TravelerPathLookup(t, r.Stmt.Value)
if v, err := json.Marshal(value); err == nil {
key := bytes.Join([][]byte{[]byte(idStr), []byte(fieldStr)}, []byte{0})
bl.Set(key, v)
}
}
}
}
return nil
})
kv.View(func(it kvi.KVIterator) error {
it.Seek([]byte{0})
lastKey := ""
curDict := map[string]any{}
for it.Seek([]byte{0}); it.Valid(); it.Next() {
tmp := bytes.Split(it.Key(), []byte{0})
curKey := string(tmp[0])
curField := string(tmp[1])
if lastKey == "" {
lastKey = curKey
}
var curData any
value, _ := it.Value()
json.Unmarshal(value, &curData)
if lastKey != curKey {
curDict["_id"] = curKey
out <- &gdbi.BaseTraveler{Render: curDict}
curDict = map[string]any{}
curDict[curField] = curData
lastKey = curKey
} else {
curDict[curField] = curData
}
}
if lastKey != "" {
curDict["_id"] = lastKey
out <- &gdbi.BaseTraveler{Render: curDict}
}
return nil
})
}()
return ctx
}

////////////////////////////////////////////////////////////////////////////////

// Path tells system to return path data
type Path struct {
Template interface{} //this isn't really used yet.
Expand Down
4 changes: 4 additions & 0 deletions engine/core/statement_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (sc *DefaultStmtCompiler) Render(stmt *gripql.GraphStatement_Render, ps *gd
return &Render{stmt.Render.AsInterface()}, nil
}

func (sc *DefaultStmtCompiler) Pivot(stmt *gripql.GraphStatement_Pivot, ps *gdbi.State) (gdbi.Processor, error) {
return &Pivot{stmt.Pivot}, nil
}

func (sc *DefaultStmtCompiler) Path(stmt *gripql.GraphStatement_Path, ps *gdbi.State) (gdbi.Processor, error) {
return &Path{stmt.Path.AsSlice()}, nil
}
Expand Down
2 changes: 2 additions & 0 deletions gdbi/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type StatementCompiler interface {
Select(gs *gripql.GraphStatement_Select, ps *State) (Processor, error)

Render(gs *gripql.GraphStatement_Render, ps *State) (Processor, error)
Pivot(gs *gripql.GraphStatement_Pivot, ps *State) (Processor, error)

Path(gs *gripql.GraphStatement_Path, ps *State) (Processor, error)
Unwind(gs *gripql.GraphStatement_Unwind, ps *State) (Processor, error)
Fields(gs *gripql.GraphStatement_Fields, ps *State) (Processor, error)
Expand Down
8 changes: 8 additions & 0 deletions gdbi/statement_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ func StatementProcessor(
ps.LastType = RenderData
return out, err

case *gripql.GraphStatement_Pivot:
if ps.LastType != VertexData && ps.LastType != EdgeData {
return nil, fmt.Errorf(`"pivot" statement is only valid for edge or vertex types not: %s`, ps.LastType.String())
}
out, err := sc.Pivot(stmt, ps)
ps.LastType = RenderData
return out, err

case *gripql.GraphStatement_Path:
if ps.LastType != VertexData && ps.LastType != EdgeData {
return nil, fmt.Errorf(`"path" statement is only valid for edge or vertex types not: %s`, ps.LastType.String())
Expand Down
Loading
Loading