Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
gabe committed Apr 11, 2024
1 parent ec7c876 commit 9c64ef9
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 12 deletions.
2 changes: 1 addition & 1 deletion impl/pkg/server/pkarr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestPkarrRouter(t *testing.T) {

t.Run("test get not found", func(t *testing.T) {
w := httptest.NewRecorder()
suffix := "aaa"
suffix := "uqaj3fcr9db6jg6o9pjs53iuftyj45r46aubogfaceqjbo6pp9sy"
req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", testServerURL, suffix), nil)
c := newRequestContextWithParams(w, req, map[string]string{IDParam: suffix})
pkarrRouter.GetRecord(c)
Expand Down
45 changes: 34 additions & 11 deletions impl/pkg/service/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ const recordSizeLimit = 1000

// PkarrService is the Pkarr service responsible for managing the Pkarr DHT and reading/writing records
type PkarrService struct {
cfg *config.Config
db storage.Storage
dht *dht.DHT
cache *bigcache.BigCache
scheduler *dhtint.Scheduler
cfg *config.Config
db storage.Storage
dht *dht.DHT
cache *bigcache.BigCache
badGetCache *bigcache.BigCache
scheduler *dhtint.Scheduler
}

// NewPkarrService returns a new instance of the Pkarr service
Expand All @@ -41,7 +42,7 @@ func NewPkarrService(cfg *config.Config, db storage.Storage, d *dht.DHT) (*Pkarr
return nil, ssiutil.LoggingNewError("config is required")
}

// create and start cache and scheduler
// create and start get cache
cacheTTL := time.Duration(cfg.PkarrConfig.CacheTTLSeconds) * time.Second
cacheConfig := bigcache.DefaultConfig(cacheTTL)
cacheConfig.MaxEntrySize = recordSizeLimit
Expand All @@ -51,13 +52,24 @@ func NewPkarrService(cfg *config.Config, db storage.Storage, d *dht.DHT) (*Pkarr
if err != nil {
return nil, ssiutil.LoggingErrorMsg(err, "failed to instantiate cache")
}

// create a new cache for bad gets to prevent spamming the DHT
cacheConfig.LifeWindow = 120 * time.Second
cacheConfig.CleanWindow = 60 * time.Second
badGetCache, err := bigcache.New(context.Background(), cacheConfig)
if err != nil {
return nil, ssiutil.LoggingErrorMsg(err, "failed to instantiate badGetCache")
}

// start scheduler for republishing
scheduler := dhtint.NewScheduler()
svc := PkarrService{
cfg: cfg,
db: db,
dht: d,
cache: cache,
scheduler: &scheduler,
cfg: cfg,
db: db,
dht: d,
cache: cache,
badGetCache: badGetCache,
scheduler: &scheduler,
}
if err = scheduler.Schedule(cfg.PkarrConfig.RepublishCRON, svc.republish); err != nil {
return nil, ssiutil.LoggingErrorMsg(err, "failed to start republisher")
Expand Down Expand Up @@ -125,6 +137,11 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response
return nil, ssiutil.LoggingCtxErrorMsgf(ctx, err, "failed to decode z-base-32 encoded ID: %s", id)
}

// if the key is in the badGetCache, return an error
if _, err := s.badGetCache.Get(id); err == nil {
return nil, ssiutil.LoggingCtxErrorMsgf(ctx, err, "key [%s] looked up too frequently, please wait a bit before trying again", id)
}

// first do a cache lookup
if got, err := s.cache.Get(id); err == nil {
var resp pkarr.Response
Expand All @@ -149,6 +166,12 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response
record, err := s.db.ReadRecord(ctx, rawID)
if err != nil || record == nil {
logrus.WithContext(ctx).WithError(err).WithField("record", id).Error("failed to resolve pkarr record from storage")

// add the key to the badGetCache to prevent spamming the DHT
if err = s.badGetCache.Set(id, []byte{0}); err != nil {
logrus.WithContext(ctx).WithError(err).WithField("record", id).Error("failed to set key in badGetCache")
}

return nil, err
}

Expand Down
11 changes: 11 additions & 0 deletions impl/pkg/service/pkarr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ func TestPkarrService(t *testing.T) {
assert.Equal(t, putMsg.Seq, got.Seq)
})

t.Run("test get record with invalid ID", func(t *testing.T) {
got, err := svc.GetPkarr(context.Background(), "uqaj3fcr9db6jg6o9pjs53iuftyj45r46aubogfaceqjbo6pp9sy")
assert.NoError(t, err)
assert.Empty(t, got)

// try it again to make sure the cache is working
got, err = svc.GetPkarr(context.Background(), "uqaj3fcr9db6jg6o9pjs53iuftyj45r46aubogfaceqjbo6pp9sy")
assert.ErrorContains(t, err, "looked up too frequently, please wait a bit before trying again")
assert.Empty(t, got)
})

t.Cleanup(func() { svc.Close() })
}

Expand Down

0 comments on commit 9c64ef9

Please sign in to comment.