Skip to content

Commit

Permalink
Merge pull request #302 from bmeg/mongo-remap
Browse files Browse the repository at this point in the history
Mongo remap
  • Loading branch information
kellrott authored Feb 17, 2024
2 parents c3798d0 + f52dce8 commit c60a321
Show file tree
Hide file tree
Showing 35 changed files with 736 additions and 577 deletions.
7 changes: 4 additions & 3 deletions conformance/tests/ot_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def getMinMax(input_data, percent, accuracy=0.15):
if count != len(percents):
errors.append(
"Unexpected number of terms: %d != %d" %
(len(row["buckets"]), len(percents))
(len(res["buckets"]), len(percents))
)

return errors
Expand Down Expand Up @@ -190,11 +190,12 @@ def test_traversal_gid_aggregation(man):
def test_field_aggregation(man):
errors = []

fields = [ "id", 'orbital_period', 'gravity', 'terrain', 'name','climate', 'system', 'diameter', 'rotation_period', 'url', 'population', 'surface_water']
# TODO: find way to get gripper driver to drop id field
fields = [ "id", "_gid", "_label", 'orbital_period', 'gravity', 'terrain', 'name','climate', 'system', 'diameter', 'rotation_period', 'url', 'population', 'surface_water']

G = man.setGraph("swapi")
count = 0
for row in G.query().V().hasLabel("Planet").aggregate(gripql.field("gid-agg", "$._data")):
for row in G.query().V().hasLabel("Planet").aggregate(gripql.field("gid-agg", "$")):
if row["key"] not in fields:
errors.append("unknown field returned: %s" % (row['key']))
if row["value"] != 3:
Expand Down
2 changes: 1 addition & 1 deletion conformance/tests/ot_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def test_fields(man):
}
resp = G.query().V("Character:1").fields(["name"]).execute()
if resp[0] != expected:
errors.append("vertex contains incorrect fields: \nexpected:%s\nresponse:%s" % (expected, resp))
errors.append("""Query 'V("Character:1").fields(["name"])' vertex contains incorrect fields: \nexpected:%s\nresponse:%s""" % (expected, resp))

expected = {
u"gid": u"Character:1",
Expand Down
18 changes: 9 additions & 9 deletions conformance/tests/ot_mark.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ def test_mark_select_label_filter(man):
count += 1
if len(row) != 2:
errors.append("Incorrect number of marks returned")
if row["a"]["gid"] != "Film:1":
if row["a"]["_gid"] != "Film:1":
errors.append("Incorrect vertex returned for 'a': %s" % row["a"])
if row["b"]["label"] not in ["Vehicle", "Starship", "Species", "Planet", "Character"]:
if row["b"]["_label"] not in ["Vehicle", "Starship", "Species", "Planet", "Character"]:
errors.append("Incorrect vertex returned for 'b': %s" % row["b"])

if count != 38:
Expand All @@ -36,11 +36,11 @@ def test_mark_select(man):
count += 1
if len(row) != 3:
errors.append("Incorrect number of marks returned")
if row["a"]["gid"] != "Character:1":
if row["a"]["_gid"] != "Character:1":
errors.append("Incorrect vertex returned for 'a': %s" % row["a"])
if row["a"]["data"]["height"] != 172:
if row["a"]["height"] != 172:
errors.append("Missing data for 'a'")
if row["b"]["label"] not in ["Starship", "Planet", "Species", "Film"]:
if row["b"]["_label"] not in ["Starship", "Planet", "Species", "Film"]:
errors.append("Incorrect vertex returned for 'b': %s" % row["b"])

if count != 64:
Expand All @@ -61,13 +61,13 @@ def test_mark_edge_select(man):
count += 1
if len(row) != 3:
errors.append("Incorrect number of marks returned")
if row["a"]["gid"] != "Film:1":
if row["a"]["_gid"] != "Film:1":
errors.append("Incorrect as selection")
if row["b"]["label"] != "planets":
if row["b"]["_label"] != "planets":
errors.append("Incorrect as edge selection: %s" % row["b"])
if "scene_count" not in row["b"]["data"]:
if "scene_count" not in row["b"]:
errors.append("Data not returned")
if row["c"]["label"] != "Planet":
if row["c"]["_label"] != "Planet":
errors.append("Incorrect element returned")

if count != 3:
Expand Down
8 changes: 4 additions & 4 deletions conformance/tests/ot_repeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,20 @@ def test_set(man):
G = man.setGraph("swapi")

q = G.query().V("Character:1").set("count", 0)
q = q.as_("start").render("$start._data")
q = q.as_("start").render("$start")
for row in q:
if row['count'] != 0:
errors.append("Incorrect increment value")

q = G.query().V("Character:1").set("count", 0).as_("start").out().increment("$start.count")
q = q.render("$start._data")
q = q.render("$start")
for row in q:
if row['count'] != 1:
errors.append("Incorrect increment value")

q = G.query().V("Character:1").set("count", 0).as_("start").out().increment("$start.count")
q = q.increment("$start.count").has(gripql.gt("$start.count", 1.0))
q = q.render("$start._data")
q = q.render("$start")
count = 0
for row in q:
count += 1
Expand All @@ -102,7 +102,7 @@ def test_set(man):
errors.append("Incorrect number of rows returned")

q = G.query().V("Character:1").set("count", 0).increment("count",2).as_("start").out().increment("$start.count")
q = q.render("$start._data")
q = q.render("$start")
for row in q:
if row['count'] != 3:
errors.append("Incorrect increment value")
Expand Down
2 changes: 1 addition & 1 deletion conformance/tests/ot_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def test_replace(man):
errors.append("vertex has unexpected data")

if G.getEdge("edge1")["data"] != {"weight": 5}:
errors.append("edge is missing expected data")
errors.append("edge is missing expected data: %s" % (G.getEdge("edge1")))

return errors

Expand Down
8 changes: 4 additions & 4 deletions engine/core/optimize.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package core

import (
"github.com/bmeg/grip/gdbi/tpath"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/travelerpath"
"github.com/bmeg/grip/util/protoutil"
)

Expand Down Expand Up @@ -47,11 +47,11 @@ func IndexStartOptimize(pipe []*gripql.GraphStatement) []*gripql.GraphStatement
return IndexStartOptimize(newPipe)
}
if cond := s.Has.GetCondition(); cond != nil {
path := travelerpath.GetJSONPath(cond.Key)
path := tpath.NormalizePath(cond.Key)
switch path {
case "$.gid":
case "$_current._gid":
hasIDIdx = append(hasIDIdx, i)
case "$.label":
case "$_current._label":
hasLabelIdx = append(hasLabelIdx, i)
default:
// do nothing
Expand Down
29 changes: 17 additions & 12 deletions engine/core/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/bmeg/grip/engine/logic"
"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gdbi/tpath"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/log"
"github.com/bmeg/grip/util/copy"
Expand Down Expand Up @@ -1053,19 +1054,21 @@ func (agg *aggregate) Process(ctx context.Context, man gdbi.Manager, in gdbi.InP
c++
}
}
sort.Float64s(fieldValues)
min := fieldValues[0]
max := fieldValues[len(fieldValues)-1]

for bucket := math.Floor(min/i) * i; bucket <= max; bucket += i {
var count float64
for _, v := range fieldValues {
if v >= bucket && v < (bucket+i) {
count++
if len(fieldValues) > 0 {
sort.Float64s(fieldValues)
min := fieldValues[0]
max := fieldValues[len(fieldValues)-1]

for bucket := math.Floor(min/i) * i; bucket <= max; bucket += i {
var count float64
for _, v := range fieldValues {
if v >= bucket && v < (bucket+i) {
count++
}
}
//sBucket, _ := structpb.NewValue(bucket)
out <- &gdbi.BaseTraveler{Aggregation: &gdbi.Aggregate{Name: a.Name, Key: bucket, Value: float64(count)}}
}
//sBucket, _ := structpb.NewValue(bucket)
out <- &gdbi.BaseTraveler{Aggregation: &gdbi.Aggregate{Name: a.Name, Key: bucket, Value: float64(count)}}
}
return outErr
})
Expand Down Expand Up @@ -1104,7 +1107,9 @@ func (agg *aggregate) Process(ctx context.Context, man gdbi.Manager, in gdbi.InP
val := gdbi.TravelerPathLookup(t, fa.Field)
if m, ok := val.(map[string]interface{}); ok {
for k := range m {
fieldCounts[k]++
if !tpath.IsGraphField(k) {
fieldCounts[k]++
}
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions engine/core/processors_extra.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"math"
"strings"

"github.com/bmeg/grip/gdbi/tpath"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/kvindex"
"github.com/bmeg/grip/travelerpath"
"github.com/influxdata/tdigest"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -61,16 +61,16 @@ func (agg *aggregateDisk) Process(ctx context.Context, man gdbi.Manager, in gdbi
kv := man.GetTempKV()
idx := kvindex.NewIndex(kv)

namespace := travelerpath.GetNamespace(tagg.Field)
field := travelerpath.GetJSONPath(tagg.Field)
namespace := tpath.GetNamespace(tagg.Field)
field := tpath.NormalizePath(tagg.Field)
field = strings.TrimPrefix(field, "$.")
idx.AddField(field)

tid := 0
for batch := range aChans[a.Name] {
err := kv.Update(func(tx kvi.KVTransaction) error {
for _, t := range batch {
doc := gdbi.GetDoc(t, namespace)
doc := gdbi.TravelerGetDoc(t, namespace)
err := idx.AddDocTx(tx, fmt.Sprintf("%d", tid), doc)
tid++
if err != nil {
Expand Down Expand Up @@ -107,16 +107,16 @@ func (agg *aggregateDisk) Process(ctx context.Context, man gdbi.Manager, in gdbi
kv := man.GetTempKV()
idx := kvindex.NewIndex(kv)

namespace := travelerpath.GetNamespace(hagg.Field)
field := travelerpath.GetJSONPath(hagg.Field)
namespace := tpath.GetNamespace(hagg.Field)
field := tpath.NormalizePath(hagg.Field)
field = strings.TrimPrefix(field, "$.")
idx.AddField(field)

tid := 0
for batch := range aChans[a.Name] {
err := kv.Update(func(tx kvi.KVTransaction) error {
for _, t := range batch {
doc := gdbi.GetDoc(t, namespace)
doc := gdbi.TravelerGetDoc(t, namespace)
err := idx.AddDocTx(tx, fmt.Sprintf("%d", tid), doc)
tid++
if err != nil {
Expand Down Expand Up @@ -152,16 +152,16 @@ func (agg *aggregateDisk) Process(ctx context.Context, man gdbi.Manager, in gdbi
kv := man.GetTempKV()
idx := kvindex.NewIndex(kv)

namespace := travelerpath.GetNamespace(pagg.Field)
field := travelerpath.GetJSONPath(pagg.Field)
namespace := tpath.GetNamespace(pagg.Field)
field := tpath.NormalizePath(pagg.Field)
field = strings.TrimPrefix(field, "$.")
idx.AddField(field)

tid := 0
for batch := range aChans[a.Name] {
err := kv.Update(func(tx kvi.KVTransaction) error {
for _, t := range batch {
doc := gdbi.GetDoc(t, namespace)
doc := gdbi.TravelerGetDoc(t, namespace)
err := idx.AddDocTx(tx, fmt.Sprintf("%d", tid), doc)
tid++
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions engine/core/statement_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

"github.com/bmeg/grip/engine/logic"
"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gdbi/tpath"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/travelerpath"
"github.com/bmeg/grip/util/protoutil"
)

Expand Down Expand Up @@ -166,8 +166,8 @@ func (sc *DefaultStmtCompiler) As(stmt *gripql.GraphStatement_As, ps *gdbi.State
if err := gripql.ValidateFieldName(stmt.As); err != nil {
return nil, fmt.Errorf(`"mark" statement invalid; %v`, err)
}
if stmt.As == travelerpath.Current {
return nil, fmt.Errorf(`"mark" statement invalid; uses reserved name %s`, travelerpath.Current)
if stmt.As == tpath.CURRENT {
return nil, fmt.Errorf(`"mark" statement invalid; uses reserved name %s`, tpath.CURRENT)
}
ps.MarkTypes[stmt.As] = ps.LastType
return &Marker{stmt.As}, nil
Expand Down
3 changes: 3 additions & 0 deletions engine/logic/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ import (
func MatchesCondition(trav gdbi.Traveler, cond *gripql.HasCondition) bool {
var val interface{}
var condVal interface{}

val = gdbi.TravelerPathLookup(trav, cond.Key)
condVal = cond.Value.AsInterface()

log.Debugf("match: %s %s %s", condVal, val, cond.Key)

switch cond.Condition {
case gripql.Condition_EQ:
return reflect.DeepEqual(val, condVal)
Expand Down
Loading

0 comments on commit c60a321

Please sign in to comment.