Skip to content

Commit

Permalink
remove db migration (#192)
Browse files Browse the repository at this point in the history
* migration

* update logging

* consistent logging
  • Loading branch information
decentralgabe authored Apr 20, 2024
1 parent 2750188 commit d703405
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 74 deletions.
24 changes: 12 additions & 12 deletions impl/pkg/service/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *DHTService) PublishDHT(ctx context.Context, id string, record dht.BEP44
if err = s.cache.Set(id, recordBytes); err != nil {
return err
}
logrus.WithContext(ctx).WithField("record", id).Debug("added dht record to cache and db")
logrus.WithContext(ctx).WithField("record_id", id).Debug("added dht record to cache and db")

// return here and put it in the DHT asynchronously
go func() {
Expand All @@ -117,9 +117,9 @@ func (s *DHTService) PublishDHT(ctx context.Context, id string, record dht.BEP44
defer cancel()

if _, err = s.dht.Put(putCtx, record.Put()); err != nil {
logrus.WithContext(ctx).WithError(err).Errorf("error from dht.Put for record: %s", id)
logrus.WithContext(ctx).WithField("record_id", id).WithError(err).Warnf("error from dht.Put for record: %s", id)
} else {
logrus.WithContext(ctx).WithField("record", id).Debug("put record to DHT")
logrus.WithContext(ctx).WithField("record_id", id).Debug("put record to DHT")
}
}()

Expand Down Expand Up @@ -148,7 +148,7 @@ func (s *DHTService) GetDHT(ctx context.Context, id string) (*dht.BEP44Response,
logrus.WithContext(ctx).WithField("record_id", id).Info("resolved record from cache")
return &resp, nil
}
logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get record from cache, falling back to dht")
logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Warn("failed to get record from cache, falling back to dht")
}

// next do a dht lookup with a timeout of 10 seconds
Expand All @@ -158,28 +158,28 @@ func (s *DHTService) GetDHT(ctx context.Context, id string) (*dht.BEP44Response,
got, err := s.dht.GetFull(getCtx, id)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
logrus.WithContext(ctx).WithField("record", id).Warn("dht lookup timed out, attempting to resolve from storage")
logrus.WithContext(ctx).WithField("record_id", id).Warn("dht lookup timed out, attempting to resolve from storage")
} else {
logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get record from dht, attempting to resolve from storage")
logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Warn("failed to get record from dht, attempting to resolve from storage")
}

record, err := s.db.ReadRecord(ctx, id)
if err != nil || record == nil {
logrus.WithContext(ctx).WithError(err).WithField("record", id).Error("failed to resolve record from storage; adding to badGetCache")
logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Error("failed to resolve record from storage; adding to badGetCache")

// add the key to the badGetCache to prevent spamming the DHT
if err = s.badGetCache.Set(id, []byte{0}); err != nil {
logrus.WithContext(ctx).WithError(err).WithField("record", id).Error("failed to set key in badGetCache")
logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Error("failed to set key in badGetCache")
}

return nil, err
}

logrus.WithContext(ctx).WithField("record", id).Info("resolved record from storage")
logrus.WithContext(ctx).WithField("record_id", id).Info("resolved record from storage")
resp := record.Response()
// add the record back to the cache for future lookups
if err = s.addRecordToCache(id, record.Response()); err != nil {
logrus.WithError(err).WithField("record", id).Error("failed to set record in cache")
logrus.WithError(err).WithField("record_id", id).Error("failed to set record in cache")
}

return &resp, err
Expand All @@ -202,9 +202,9 @@ func (s *DHTService) GetDHT(ctx context.Context, id string) (*dht.BEP44Response,

// add the record to cache, do it here to avoid duplicate calculations
if err = s.addRecordToCache(id, resp); err != nil {
logrus.WithContext(ctx).WithField("record", id).WithError(err).Error("failed to set record in cache")
logrus.WithContext(ctx).WithField("record_id", id).WithError(err).Error("failed to set record in cache")
} else {
logrus.WithContext(ctx).WithField("record", id).Info("added record back to cache")
logrus.WithContext(ctx).WithField("record_id", id).Info("added record back to cache")
}

return &resp, nil
Expand Down
67 changes: 5 additions & 62 deletions impl/pkg/storage/db/bolt/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"time"

"github.com/goccy/go-json"
Expand All @@ -19,7 +18,6 @@ import (

const (
dhtNamespace = "dht"
oldDHTNamespace = "pkarr"
failedNamespace = "failed"
)

Expand All @@ -40,52 +38,9 @@ func NewBolt(path string) (*Bolt, error) {
if err != nil {
return nil, err
}

// Perform the migration
go migrate(db)

return &Bolt{db: db}, nil
}

func migrate(db *bolt.DB) {
// Perform the migration within a write transaction
err := db.Update(func(tx *bolt.Tx) error {
// Create the new namespace bucket
newBucket, err := tx.CreateBucketIfNotExists([]byte(dhtNamespace))
if err != nil {
return fmt.Errorf("failed to create new namespace bucket: %v", err)
}

// Get the old namespace bucket
oldBucket := tx.Bucket([]byte(oldDHTNamespace))
if oldBucket == nil {
// If the old namespace bucket doesn't exist, there's nothing to migrate
return nil
}

// Iterate over the key-value pairs in the old namespace bucket
err = oldBucket.ForEach(func(k, v []byte) error {
// Copy each key-value pair to the new namespace bucket
err = newBucket.Put(k, v)
if err != nil {
return fmt.Errorf("failed to copy key-value pair to new namespace: %v", err)
}
return nil
})
if err != nil {
return err
}

return nil
})

if err != nil {
logrus.WithError(err).Error("failed to migrate records")
} else {
logrus.Info("migration completed successfully")
}
}

// WriteRecord writes the given record to the storage
// TODO: don't overwrite existing records, store unique seq numbers
func (b *Bolt) WriteRecord(ctx context.Context, record dht.BEP44Record) error {
Expand All @@ -98,27 +53,15 @@ func (b *Bolt) WriteRecord(ctx context.Context, record dht.BEP44Record) error {
return err
}

// write to both the old and new namespaces for now
errOld := b.write(ctx, oldDHTNamespace, record.ID(), recordBytes)
errNew := b.write(ctx, dhtNamespace, record.ID(), recordBytes)
if errOld == nil && errNew == nil {
return nil
}
if errOld != nil && errNew != nil {
return errors.New(fmt.Sprintf("old: %v, new: %v", errOld, errNew))
}
if errOld != nil {
return errOld
}
return errNew
return b.write(ctx, dhtNamespace, record.ID(), recordBytes)
}

// ReadRecord reads the record with the given id from the storage
func (b *Bolt) ReadRecord(ctx context.Context, id string) (*dht.BEP44Record, error) {
ctx, span := telemetry.GetTracer().Start(ctx, "bolt.ReadRecord")
defer span.End()

recordBytes, err := b.read(ctx, oldDHTNamespace, id)
recordBytes, err := b.read(ctx, dhtNamespace, id)
if err != nil {
return nil, err
}
Expand All @@ -144,7 +87,7 @@ func (b *Bolt) ListRecords(ctx context.Context, nextPageToken []byte, pageSize i
ctx, span := telemetry.GetTracer().Start(ctx, "bolt.ListRecords")
defer span.End()

boltRecords, err := b.readSeveral(ctx, oldDHTNamespace, nextPageToken, pageSize)
boltRecords, err := b.readSeveral(ctx, dhtNamespace, nextPageToken, pageSize)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -268,9 +211,9 @@ func (b *Bolt) RecordCount(ctx context.Context) (int, error) {

var count int
err := b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(oldDHTNamespace))
bucket := tx.Bucket([]byte(dhtNamespace))
if bucket == nil {
logrus.WithContext(ctx).WithField("namespace", oldDHTNamespace).Warn("namespace does not exist")
logrus.WithContext(ctx).WithField("namespace", dhtNamespace).Warn("namespace does not exist")
return nil
}
count = bucket.Stats().KeyN
Expand Down

0 comments on commit d703405

Please sign in to comment.