From a2724536ea66cab051dd7d14cc613a3f012b1ea5 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 1 Jun 2024 19:57:58 +0300 Subject: [PATCH 1/5] fix: cmd --- pkg/puller/puller.go | 4 ++-- pkg/storer/migration/reserveRepair.go | 12 ++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 9c92989c7f4..a07b52b2edc 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -343,10 +343,10 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint if err != nil { p.metrics.SyncWorkerErrCounter.Inc() if errors.Is(err, p2p.ErrPeerNotFound) { - p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top) + p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top) return } - loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top) + loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top) } if isHistorical { diff --git a/pkg/storer/migration/reserveRepair.go b/pkg/storer/migration/reserveRepair.go index 04fdab6e9b0..3f17dcda57a 100644 --- a/pkg/storer/migration/reserveRepair.go +++ b/pkg/storer/migration/reserveRepair.go @@ -18,6 +18,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "github.com/ethersphere/bee/v2/pkg/swarm" "golang.org/x/sync/errgroup" + "resenje.org/multex" ) // ReserveRepairer is a migration step that removes all BinItem entries and migrates @@ -164,9 +165,15 @@ func ReserveRepairer( var eg errgroup.Group eg.SetLimit(runtime.NumCPU()) + locker := multex.New() + for _, item := range batchRadiusItems { func(item *reserve.BatchRadiusItem) { eg.Go(func() error { + + locker.Lock(item.ID()) + defer locker.Unlock(item.ID()) + return st.Run(context.Background(), func(s transaction.Store) error { chunk, err := s.ChunkStore().Get(context.Background(), item.Address) @@ -236,11 +243,12 @@ func ReserveRepairer( return err } + logger.Info("migrated all chunk entries", "new_size", batchRadiusCnt, "missing_chunks", missingChunks.Load(), "invalid_sharky_chunks", invalidSharkyChunks.Load()) + if batchRadiusCnt != chunkBinCnt { - return errors.New("index counts do not match") + return fmt.Errorf("index counts do not match, %d vs %d", batchRadiusCnt, chunkBinCnt) } - logger.Info("migrated all chunk entries", "new_size", batchRadiusCnt, "missing_chunks", missingChunks.Load(), "invalid_sharky_chunks", invalidSharkyChunks.Load()) return nil } } From 1cea4b678c912e4db2fc591881eecb2cc085cddd Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 3 Jun 2024 01:33:14 +0300 Subject: [PATCH 2/5] fix: asd --- pkg/storer/migration/reserveRepair.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/storer/migration/reserveRepair.go b/pkg/storer/migration/reserveRepair.go index 3f17dcda57a..35d5c73af5e 100644 --- a/pkg/storer/migration/reserveRepair.go +++ b/pkg/storer/migration/reserveRepair.go @@ -18,7 +18,6 @@ import ( "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "github.com/ethersphere/bee/v2/pkg/swarm" "golang.org/x/sync/errgroup" - "resenje.org/multex" ) // ReserveRepairer is a migration step that removes all BinItem entries and migrates @@ -165,15 +164,12 @@ func ReserveRepairer( var eg errgroup.Group eg.SetLimit(runtime.NumCPU()) - locker := multex.New() + logger.Info("parallel workers", "count", runtime.NumCPU()) for _, item := range batchRadiusItems { func(item *reserve.BatchRadiusItem) { eg.Go(func() error { - locker.Lock(item.ID()) - defer locker.Unlock(item.ID()) - return st.Run(context.Background(), func(s transaction.Store) error { chunk, err := s.ChunkStore().Get(context.Background(), item.Address) From 0f7e5d9a299479d3f5b205bfd264d685595ddbf6 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 3 Jun 2024 01:46:14 +0300 Subject: [PATCH 3/5] fix: logs --- pkg/storer/migration/reserveRepair.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/storer/migration/reserveRepair.go b/pkg/storer/migration/reserveRepair.go index 35d5c73af5e..b182a504c73 100644 --- a/pkg/storer/migration/reserveRepair.go +++ b/pkg/storer/migration/reserveRepair.go @@ -162,9 +162,11 @@ func ReserveRepairer( } var eg errgroup.Group - eg.SetLimit(runtime.NumCPU()) - logger.Info("parallel workers", "count", runtime.NumCPU()) + p := runtime.NumCPU() + eg.SetLimit(p) + + logger.Info("parallel workers", "count", p) for _, item := range batchRadiusItems { func(item *reserve.BatchRadiusItem) { From d18035fac0253c98044294a3f245c02fe9a64c32 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 3 Jun 2024 01:55:22 +0300 Subject: [PATCH 4/5] chore: logs --- pkg/storer/migration/reserveRepair.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/storer/migration/reserveRepair.go b/pkg/storer/migration/reserveRepair.go index b182a504c73..b6488a244fc 100644 --- a/pkg/storer/migration/reserveRepair.go +++ b/pkg/storer/migration/reserveRepair.go @@ -56,6 +56,12 @@ func ReserveRepairer( if binIds[item.Bin][item.BinID] > 1 { return false, fmt.Errorf("binID %d in bin %d already used", item.BinID, item.Bin) } + + err := st.IndexStore().Get(&reserve.ChunkBinItem{Bin: item.Bin, BinID: item.BinID}) + if err != nil { + return false, fmt.Errorf("check failed: chunkBinItem, bin %d, binID %d: %w", item.Bin, item.BinID, err) + } + return false, nil }, ) From 3ebf9b34f0ec694531bdd5132849696c426f7492 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 3 Jun 2024 01:59:09 +0300 Subject: [PATCH 5/5] fix: change log type --- pkg/storer/migration/reserveRepair.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storer/migration/reserveRepair.go b/pkg/storer/migration/reserveRepair.go index b6488a244fc..baab404e0ae 100644 --- a/pkg/storer/migration/reserveRepair.go +++ b/pkg/storer/migration/reserveRepair.go @@ -69,7 +69,7 @@ func ReserveRepairer( err := checkBinIDs() if err != nil { - logger.Error(err, "check failed") + logger.Info("pre-repair check failed", "error", err) } // STEP 0