Skip to content

Commit

Permalink
Merge pull request #326 from mimiro-io/chore/more-testcases-compaction
Browse files Browse the repository at this point in the history
Chore/more testcases compaction
  • Loading branch information
rompetroll authored Sep 18, 2024
2 parents 0dcadd0 + 3bc4ccb commit d231903
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 497 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.21 AS builder_src
FROM golang:1.23 AS builder_src

COPY jemalloc-install.sh .
RUN apt-get update -y
Expand Down Expand Up @@ -48,4 +48,4 @@ EXPOSE 8080 40000

ENV GOMAXPROCS=128

CMD ["./server"]
CMD ["./server"]
11 changes: 5 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/mimiro-io/datahub

go 1.21
go 1.23.1

require (
github.com/DataDog/datadog-go/v5 v5.5.0
Expand All @@ -26,20 +26,21 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require github.com/mimiro-io/entity-graph-data-model v0.7.7
require (
github.com/dgraph-io/ristretto v0.1.1
github.com/mimiro-io/entity-graph-data-model v0.7.7
)

require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dlclark/regexp2 v1.11.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-sourcemap/sourcemap v2.1.4+incompatible // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand Down Expand Up @@ -82,13 +83,11 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/protobuf v1.34.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
438 changes: 2 additions & 436 deletions go.sum

Large diffs are not rendered by default.

27 changes: 15 additions & 12 deletions internal/service/dataset/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/mimiro-io/datahub/internal/server"
"github.com/mimiro-io/datahub/internal/service/store"
"github.com/mimiro-io/datahub/internal/service/types"
"go.uber.org/zap"
"time"
)

type CompactionWorker struct {
Expand All @@ -25,6 +26,7 @@ func NewCompactor(store *server.Store, dsm *server.DsManager, logger *zap.Sugare
logger: logger.Named("compaction-worker"),
}
}

func (c *CompactionWorker) CompactAsync(datasetID string, strategy CompactionStrategy) error {
if !c.running {
c.running = true
Expand Down Expand Up @@ -53,10 +55,10 @@ func (c *CompactionWorker) compact(datasetID string, strategy CompactionStrategy
defer txn.Discard()

// 1. go through these:
//datasetEntitiesLatestVersionKey := make([]byte, 14)
//binary.BigEndian.PutUint16(datasetEntitiesLatestVersionKey, DatasetLatestEntities)
//binary.BigEndian.PutUint32(datasetEntitiesLatestVersionKey[2:], ds.InternalID)
//binary.BigEndian.PutUint64(datasetEntitiesLatestVersionKey[6:], rid)
// datasetEntitiesLatestVersionKey := make([]byte, 14)
// binary.BigEndian.PutUint16(datasetEntitiesLatestVersionKey, DatasetLatestEntities)
// binary.BigEndian.PutUint32(datasetEntitiesLatestVersionKey[2:], ds.InternalID)
// binary.BigEndian.PutUint64(datasetEntitiesLatestVersionKey[6:], rid)
seekLatestChanges := store.SeekLatestChanges(dsId)
opts := badger.DefaultIteratorOptions
opts.Prefix = seekLatestChanges
Expand All @@ -76,23 +78,24 @@ func (c *CompactionWorker) compact(datasetID string, strategy CompactionStrategy
return err
}
cnt++
//keysToBeDeleted = append(keysToBeDeleted, newKeys...)
// keysToBeDeleted = append(keysToBeDeleted, newKeys...)
if time.Since(ts) > 15*time.Second {
ts = time.Now()
c.logger.Infof("compacting dataset %v, processed %v entities, removed keys so far: %v", datasetID, cnt, strategy.stats())
}

}
_, err4 := flushDeletes(c.bs, ops, true, strategy)
if err4 != nil {
return err4
_, err := flushDeletes(c.bs, ops, true, strategy)
if err != nil {
return err
}
c.logger.Infof("compacted dataset %s in %v, removed keys: %v", datasetID, time.Since(startTs), strategy.stats())
return nil
}

func (c *CompactionWorker) forEntity(dsId types.InternalDatasetID, internalEntityID types.InternalID, txn *badger.Txn,
strategy CompactionStrategy, ops *compactionInstruction) error {
strategy CompactionStrategy, ops *compactionInstruction,
) error {
entityLocatorPrefixBuffer := store.SeekEntityChanges(dsId, internalEntityID)
opts1 := badger.DefaultIteratorOptions
opts1.PrefetchValues = false
Expand Down Expand Up @@ -174,14 +177,14 @@ func flushDeletes(bs store.BadgerStore, ops *compactionInstruction, finalFlush b
}
}
}
//fmt.Println("deleted", len(all), "keys")
// fmt.Println("deleted", len(all), "keys")
for i, key := range ops.RewriteKeys {
err2 := txn.Set(key, ops.RewriteValues[i])
if err2 != nil {
return err2
}
}
//fmt.Println("rewritten", len(ops.RewriteKeys), "keys")
// fmt.Println("rewritten", len(ops.RewriteKeys), "keys")
return nil
})
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions internal/service/dataset/compact_stategy_deduplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package dataset
import (
"bytes"
"encoding/binary"
"reflect"

"github.com/dgraph-io/badger/v4"
"github.com/mimiro-io/datahub/internal/server"
"github.com/mimiro-io/datahub/internal/service/entity"
"go.uber.org/zap"
"reflect"
)

type deduplicationStrategy struct {
Expand Down Expand Up @@ -50,7 +51,8 @@ func (d *deduplicationStrategy) eval(
jsonKey []byte,
isFirstVersion bool,
isLatestVersion bool,
txn *badger.Txn) (*compactionInstruction, error) {
txn *badger.Txn,
) (*compactionInstruction, error) {
// if this is the first version of the entity, we just need to keep it
if isFirstVersion {
d.prevJsonKey = jsonKey
Expand All @@ -76,8 +78,8 @@ func (d *deduplicationStrategy) eval(
}

// 3.delete change log entry (need to iterate over all change versions, match value with json key)
//ts := time.Now()
//del = append(del, d.findChangeLogKeys(jsonKey, txn, false)...)
// ts := time.Now()
// del = append(del, d.findChangeLogKeys(jsonKey, txn, false)...)
d.changeBuffer[[24]byte(jsonKey)] = 1
d.counts["changeLog"]++
//fmt.Printf("findChangeLogKeys took: %v\n", time.Since(ts))
Expand Down Expand Up @@ -113,7 +115,6 @@ func (d *deduplicationStrategy) eval(
if len(refsToDel) == len(refsToDelPrev) {
identical = true
for i, ref := range refsToDel {

if !bytes.Equal(ref, refsToDelPrev[i]) {
identical = false
break
Expand Down
27 changes: 15 additions & 12 deletions internal/service/dataset/compact_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dataset
import (
"encoding/binary"
"fmt"

"github.com/dgraph-io/badger/v4"
"github.com/mimiro-io/datahub/internal/server"
"github.com/mimiro-io/datahub/internal/service/entity"
Expand Down Expand Up @@ -62,26 +63,28 @@ func (i *compactionInstruction) reset() {
i.RewriteValues = make([][]byte, 0)
}

type recordedStrategy struct{}
type maxVersionStrategy struct{}

var (
DeduplicationStrategy = func() CompactionStrategy {
return &deduplicationStrategy{counts: make(map[string]int), changeBuffer: make(map[[24]byte]byte)}
}
//RecordedStrategy = func() CompactionStrategy { return &recordedStrategy{} }
//MaxVersionStrategy = func() CompactionStrategy { return &maxVersionStrategy{} }
type (
recordedStrategy struct{}
maxVersionStrategy struct{}
)

var DeduplicationStrategy = func() CompactionStrategy {
return &deduplicationStrategy{counts: make(map[string]int), changeBuffer: make(map[[24]byte]byte)}
}

// RecordedStrategy = func() CompactionStrategy { return &recordedStrategy{} }
// MaxVersionStrategy = func() CompactionStrategy { return &maxVersionStrategy{} }

func mkLatestKey(jsonKey []byte) []byte {
//2:6, dataset id
//6:14 entity id)
// 2:6, dataset id
// 6:14 entity id)
datasetEntitiesLatestVersionKey := make([]byte, 14)
binary.BigEndian.PutUint16(datasetEntitiesLatestVersionKey, server.DatasetLatestEntities)
copy(datasetEntitiesLatestVersionKey[2:], jsonKey[10:14])
copy(datasetEntitiesLatestVersionKey[6:], jsonKey[2:10])
return datasetEntitiesLatestVersionKey
}

func findRefs(ent *server.Entity, jsonKey []byte, txn *badger.Txn, lookup entity.Lookup) ([][]byte, error) {
refsToDel := make([][]byte, 0)
for k, stringOrArrayValue := range ent.References {
Expand Down Expand Up @@ -121,7 +124,7 @@ func processRefs(
return nil, er
}

//fmt.Println("building refs for entity", ent.InternalID, "pred", k, "related", relatedid, "recorded", ent.Recorded)
// fmt.Println("building refs for entity", ent.InternalID, "pred", k, "related", relatedid, "recorded", ent.Recorded)
// delete outgoing references
// 0:2: outgoing ref index, uint16
// 2:10: this entity id, uint64
Expand Down
Loading

0 comments on commit d231903

Please sign in to comment.