Skip to content

Commit

Permalink
Add a minimum multihash digest length check (#708)
Browse files Browse the repository at this point in the history
* Add a minimum multihash digest length check

This is necessary for value stores (e.g. storethehash) that require a minimum key length. Additionally, the minimum multihash digest length can be configured in the indexer config, should requiring a some minimum size be desired.

* Update version to v0.4.21 so that it gets back in sync with the release version
* Increase shutdown time limit
  • Loading branch information
gammazero authored Aug 26, 2022
1 parent aa42d98 commit 5e35beb
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 19 deletions.
36 changes: 26 additions & 10 deletions command/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,18 @@ func daemonCommand(cctx *cli.Context) error {
}

// Create a valuestore of the configured type.
valueStore, err := createValueStore(cctx.Context, cfg.Indexer)
valueStore, minKeyLen, err := createValueStore(cctx.Context, cfg.Indexer)
if err != nil {
return err
}
log.Info("Valuestore initialized")

// If the value store requires a minimum key length, make sure the ingester
// if configured with at least the minimum.
if minKeyLen > cfg.Ingest.MinimumKeyLength {
cfg.Ingest.MinimumKeyLength = minKeyLen
}

// Create result cache
var resultCache cache.Interface
cacheSize := int(cctx.Int64("cachesize"))
Expand Down Expand Up @@ -467,15 +473,17 @@ func fileChanged(filePath string, modTime time.Time) (time.Time, bool, error) {
return modTime, false, nil
}

func createValueStore(ctx context.Context, cfgIndexer config.Indexer) (indexer.Interface, error) {
func createValueStore(ctx context.Context, cfgIndexer config.Indexer) (indexer.Interface, int, error) {
const sthMinKeyLen = 4

dir, err := config.Path("", cfgIndexer.ValueStoreDir)
if err != nil {
return nil, err
return nil, 0, err
}
log.Infow("Valuestore initializing/opening", "type", cfgIndexer.ValueStoreType, "path", dir)

if err = checkWritable(dir); err != nil {
return nil, err
return nil, 0, err
}

var vcodec indexer.ValueCodec
Expand All @@ -485,26 +493,34 @@ func createValueStore(ctx context.Context, cfgIndexer config.Indexer) (indexer.I
case vstoreBinaryCodec:
vcodec = indexer.BinaryValueCodec{}
default:
return nil, fmt.Errorf("unrecognized value store codec: %s", cfgIndexer.ValueStoreCodec)
return nil, 0, fmt.Errorf("unrecognized value store codec: %s", cfgIndexer.ValueStoreCodec)
}

var vs indexer.Interface
var minKeyLen int

switch cfgIndexer.ValueStoreType {
case vstoreStorethehash:
return storethehash.New(
vs, err = storethehash.New(
ctx,
dir,
vcodec,
sth.GCInterval(time.Duration(cfgIndexer.GCInterval)),
sth.GCTimeLimit(time.Duration(cfgIndexer.GCTimeLimit)),
sth.IndexBitSize(cfgIndexer.STHBits),
)
minKeyLen = sthMinKeyLen
case vstorePogreb:
return pogreb.New(dir, vcodec)
vs, err = pogreb.New(dir, vcodec)
case vstoreMemory:
return memory.New(), nil
vs, err = memory.New(), nil
default:
err = fmt.Errorf("unrecognized store type: %s", cfgIndexer.ValueStoreType)
}

return nil, fmt.Errorf("unrecognized store type: %s", cfgIndexer.ValueStoreType)
if err != nil {
return nil, 0, err
}
return vs, minKeyLen, nil
}

func setLoggingConfig(cfgLogging config.Logging) error {
Expand Down
4 changes: 4 additions & 0 deletions config/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type Ingest struct {
// IngestWorkerCount sets how many ingest worker goroutines to spawn. This
// controls how many concurrent ingest from different providers we can handle.
IngestWorkerCount int
// MinimumKeyLengt causes any multihash, that has a digest length less than
// this, to be ignored. If using storethehash, this value is automatically
// set to 4 if it was configured to be anything less.
MinimumKeyLength int
// PubSubTopic sets the topic name to which to subscribe for ingestion
// announcements.
PubSubTopic string
Expand Down
3 changes: 2 additions & 1 deletion deploy/manifests/base/storetheindex-single/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
"CacheSize": 300000,
"ConfigCheckInterval": "30s",
"GCInterval": "30m0s",
"ShutdownTimeout": "15s",
"GCTimeLimit": "5m",
"ShutdownTimeout": "15m",
"ValueStoreDir": "/data/valuestore",
"ValueStoreType": "sth"
},
Expand Down
3 changes: 2 additions & 1 deletion deploy/manifests/base/storetheindex/indexer-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ data:
"CacheSize": 300000,
"ConfigCheckInterval": "30s",
"GCInterval": "30m0s",
"ShutdownTimeout": "15s",
"GCTimeLimit":, "5m",
"ShutdownTimeout": "15m",
"ValueStoreDir": "/data/valuestore",
"ValueStoreType": "sth"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ data:
"CacheSize": 300000,
"ConfigCheckInterval": "30s",
"GCInterval": "30m0s",
"ShutdownTimeout": "15s",
"GCTimeLimit": "5m",
"ShutdownTimeout": "15m",
"ValueStoreDir": "/data/valuestore",
"ValueStoreType": "sth",
"STHBits": 30
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ data:
"CacheSize": 300000,
"ConfigCheckInterval": "30s",
"GCInterval": "30m0s",
"ShutdownTimeout": "15s",
"GCTimeLimit": "5m",
"ShutdownTimeout": "15m",
"ValueStoreDir": "/data/valuestore",
"ValueStoreType": "sth"
},
Expand Down
8 changes: 5 additions & 3 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ type Ingester struct {
entriesSel datamodel.Node
reg *registry.Registry

cfg config.Ingest

// inEvents is used to send a adProcessedEvent to the distributeEvents
// goroutine, when an advertisement in marked complete or err'd.
inEvents chan adProcessedEvent
Expand Down Expand Up @@ -135,6 +133,9 @@ type Ingester struct {

rateLimit rate.Limit
rateMutex sync.Mutex

// Multihash minimum length
minKeyLen int
}

// NewIngester creates a new Ingester that uses a go-legs Subscriber to handle
Expand All @@ -151,7 +152,6 @@ func NewIngester(cfg config.Ingest, h host.Host, idxr indexer.Interface, reg *re
syncTimeout: time.Duration(cfg.SyncTimeout),
entriesSel: Selectors.EntriesWithLimit(recursionLimit(cfg.EntriesDepthLimit)),
reg: reg,
cfg: cfg,
inEvents: make(chan adProcessedEvent, 1),

closePendingSyncs: make(chan struct{}),
Expand All @@ -160,6 +160,8 @@ func NewIngester(cfg config.Ingest, h host.Host, idxr indexer.Interface, reg *re
providerAdChainStaging: make(map[peer.ID]*atomic.Value),
toWorkers: NewPriorityQueue(),
closeWorkers: make(chan struct{}),

minKeyLen: cfg.MinimumKeyLength,
}

var err error
Expand Down
7 changes: 6 additions & 1 deletion internal/ingest/linksystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,19 @@ func (ing *Ingester) indexAdMultihashes(ad schema.Advertisement, mhs []multihash
// Iterate over all entries and ingest (or remove) them.
var count, badMultihashCount int
for _, entry := range mhs {
if _, err = multihash.Decode(entry); err != nil {
decoded, err := multihash.Decode(entry)
if err != nil {
// Only log first error to prevent log flooding.
if badMultihashCount == 0 {
log.Warnw("Ignoring bad multihash", "err", err)
}
badMultihashCount++
continue
}
if len(decoded.Digest) < ing.minKeyLen {
log.Warnw("Multihash digest too short, ignoring", "digestSize", len(decoded.Digest))
continue
}

batch = append(batch, entry)

Expand Down
2 changes: 1 addition & 1 deletion version.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "v0.4.19"
"version": "v0.4.21"
}

0 comments on commit 5e35beb

Please sign in to comment.