From d200c95f6ccb0b9075274c729beecb52129cdb4f Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 9 Sep 2024 16:15:40 +0300 Subject: [PATCH] fix: move pullsync rate limiter and remove reserve epoch in migration (#4804) --- pkg/pullsync/pullsync.go | 20 ++++++++++---------- pkg/storer/migration/step_06.go | 8 ++++++++ pkg/storer/migration/step_06_test.go | 6 ++++++ 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index a30308f794a..dfc24242686 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -50,7 +50,7 @@ const ( DefaultMaxPage uint64 = 250 pageTimeout = time.Second makeOfferTimeout = 15 * time.Minute - handleMaxChunksPerSecond = 100 + handleMaxChunksPerSecond = 250 handleRequestsLimitRate = time.Second / handleMaxChunksPerSecond // handle max 100 chunks per second per peer ) @@ -194,6 +194,15 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea return fmt.Errorf("process want: %w", err) } + // slow down future requests + waitDur, err := s.limiter.Wait(streamCtx, p.Address.ByteString(), max(1, len(chs))) + if err != nil { + return fmt.Errorf("rate limiter: %w", err) + } + if waitDur > 0 { + s.logger.Debug("rate limited peer", "wait_duration", waitDur, "peer_address", p.Address) + } + for _, c := range chs { var stamp []byte if c.Stamp() != nil { @@ -210,15 +219,6 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea s.metrics.Sent.Inc() } - // slow down future requests - waitDur, err := s.limiter.Wait(streamCtx, p.Address.ByteString(), max(1, len(chs))) - if err != nil { - return fmt.Errorf("rate limiter: %w", err) - } - if waitDur > 0 { - s.logger.Debug("rate limited peer", "wait_duration", waitDur, "peer_address", p.Address) - } - return nil } diff --git a/pkg/storer/migration/step_06.go b/pkg/storer/migration/step_06.go index 9cd85273920..05237a3dd40 100644 --- a/pkg/storer/migration/step_06.go +++ b/pkg/storer/migration/step_06.go @@ -54,6 +54,14 @@ func addStampHash(logger log.Logger, st transaction.Storage) (int64, int64, erro return 0, 0, fmt.Errorf("pre-migration check: index counts do not match, %d vs %d. It's recommended that the repair-reserve cmd is run first", preBatchRadiusCnt, preChunkBinCnt) } + // Delete epoch timestamp + err = st.Run(context.Background(), func(s transaction.Store) error { + return s.IndexStore().Delete(&reserve.EpochItem{}) + }) + if err != nil { + return 0, 0, err + } + itemC := make(chan *reserve.BatchRadiusItemV1) errC := make(chan error, 1) diff --git a/pkg/storer/migration/step_06_test.go b/pkg/storer/migration/step_06_test.go index 0c93a7b3807..d6e44d7e872 100644 --- a/pkg/storer/migration/step_06_test.go +++ b/pkg/storer/migration/step_06_test.go @@ -101,6 +101,12 @@ func Test_Step_06(t *testing.T) { err = localmigration.Step_06(store)() require.NoError(t, err) + has, err := store.IndexStore().Has(&reserve.EpochItem{}) + if has { + t.Fatal("epoch item should be deleted") + } + require.NoError(t, err) + checkBatchRadiusItems(t, store.IndexStore(), len(chunks), batchRadiusItems) checkChunkBinItems(t, store.IndexStore(), len(chunks), chunkBinItems) checkStampIndex(t, store.IndexStore(), len(chunks), stampIndexItems)