Skip to content

Commit

Permalink
Merge pull request #160 from mimiro-io/fix/empty-depencency-datasets
Browse files Browse the repository at this point in the history
don't panic if dependency dataset in multisource yields zero changes
  • Loading branch information
rompetroll authored Sep 13, 2022
2 parents 8c89dd5 + 897ee76 commit 53c26e0
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 15 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ require (
go.uber.org/dig v1.14.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/net v0.0.0-20220909164309-bea034e7d591 // indirect
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
google.golang.org/protobuf v1.28.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 h1:NWy5+hlRbC7HK+PmcXVUmW1IMyFce7to56IUvhUFm7Y=
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220909164309-bea034e7d591 h1:D0B/7al0LLrVC8aWF4+oxpv/m8bc7ViFfVS8/gXGdqI=
golang.org/x/net v0.0.0-20220909164309-bea034e7d591/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -479,8 +479,8 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
22 changes: 13 additions & 9 deletions internal/jobs/source/multi_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"github.com/mimiro-io/datahub/internal/server"
)

/**
MultiSource only operates on changes, but accepts calls to FullsyncStart to avoid processing dependencies during initial load
/*
*
MultiSource only operates on changes, but accepts calls to FullsyncStart to avoid processing dependencies during initial load
*/
type Join struct {
Dataset string
Expand Down Expand Up @@ -187,16 +189,18 @@ func (multiSource *MultiSource) processDependency(dep Dependency, d *MultiDatase
if err != nil {
return err
}
if len(changes.Entities) > 0 {

timestamp := int64(changes.Entities[0].Recorded)
timestamp := int64(changes.Entities[0].Recorded)

prevRelatedEntities, err := multiSource.Store.GetManyRelatedEntitiesAtTime(
uris, join.Predicate, join.Inverse, nil, timestamp)
if err != nil {
return fmt.Errorf("previous GetManyRelatedEntities failed for Join %+v at timestamp %v, %w", join, timestamp, err)
}
prevRelatedEntities, err := multiSource.Store.GetManyRelatedEntitiesAtTime(
uris, join.Predicate, join.Inverse, nil, timestamp)
if err != nil {
return fmt.Errorf("previous GetManyRelatedEntities failed for Join %+v at timestamp %v, %w", join, timestamp, err)
}

relatedEntities = append(relatedEntities, prevRelatedEntities...)
relatedEntities = append(relatedEntities, prevRelatedEntities...)
}
}

dedupCache := map[uint64]bool{}
Expand Down
42 changes: 42 additions & 0 deletions internal/jobs/source/multi_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,48 @@ func TestMultiSource(t *testing.T) {
//Bob was emitted enchanged. up to transform to do something with bob and dependency that triggered bob's emission
g.Assert(recordedEntities[0].Properties["name"]).Eql("Bob")
})
g.It("should support empty dependency dastasets", func() {
_, _ = createTestDataset("people", []string{"Bob", "Alice"}, nil, dsm, g, store)
_, _ = createTestDataset("employment", nil, nil, dsm, g, store)

testSource := source.MultiSource{DatasetName: "people", Store: store, DatasetManager: dsm}
srcJSON := `{ "Type" : "MultiSource", "Name" : "people", "Dependencies": [ {
"dataset": "employment",
"joins": [ { "dataset": "people", "predicate": "http://people/employment", "inverse": false } ] } ] }`

srcConfig := map[string]interface{}{}
_ = json.Unmarshal([]byte(srcJSON), &srcConfig)
_ = testSource.ParseDependencies(srcConfig["Dependencies"])

//fullsync
var recordedEntities []server.Entity
token := &source.MultiDatasetContinuation{}
var lastToken source.DatasetContinuation
testSource.StartFullSync()
err := testSource.ReadEntities(token, 1000, func(entities []*server.Entity, token source.DatasetContinuation) error {
lastToken = token
for _, e := range entities {
recordedEntities = append(recordedEntities, *e)
}
return nil
})
g.Assert(err).IsNil()
testSource.EndFullSync()

g.Assert(len(recordedEntities)).Eql(2)

// run inc
recordedEntities = []server.Entity{}
err = testSource.ReadEntities(lastToken, 1000, func(entities []*server.Entity, token source.DatasetContinuation) error {
lastToken = token
for _, e := range entities {
recordedEntities = append(recordedEntities, *e)
}
return nil
})
g.Assert(err).IsNil()
g.Assert(len(recordedEntities)).Eql(0)
})

g.It("should emit main enitity if inverse multi hop dependency is removed", func() {
// people <- employment <- salary
Expand Down

0 comments on commit 53c26e0

Please sign in to comment.