diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 9c92989c7f4..a07b52b2edc 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -343,10 +343,10 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint if err != nil { p.metrics.SyncWorkerErrCounter.Inc() if errors.Is(err, p2p.ErrPeerNotFound) { - p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top) + p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top) return } - loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top) + loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top) } if isHistorical { diff --git a/pkg/storer/migration/reserveRepair.go b/pkg/storer/migration/reserveRepair.go index 04fdab6e9b0..3f17dcda57a 100644 --- a/pkg/storer/migration/reserveRepair.go +++ b/pkg/storer/migration/reserveRepair.go @@ -18,6 +18,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "github.com/ethersphere/bee/v2/pkg/swarm" "golang.org/x/sync/errgroup" + "resenje.org/multex" ) // ReserveRepairer is a migration step that removes all BinItem entries and migrates @@ -164,9 +165,15 @@ func ReserveRepairer( var eg errgroup.Group eg.SetLimit(runtime.NumCPU()) + locker := multex.New() + for _, item := range batchRadiusItems { func(item *reserve.BatchRadiusItem) { eg.Go(func() error { + + locker.Lock(item.ID()) + defer locker.Unlock(item.ID()) + return st.Run(context.Background(), func(s transaction.Store) error { chunk, err := s.ChunkStore().Get(context.Background(), item.Address) @@ -236,11 +243,12 @@ func ReserveRepairer( return err } + logger.Info("migrated all chunk entries", "new_size", batchRadiusCnt, "missing_chunks", missingChunks.Load(), "invalid_sharky_chunks", invalidSharkyChunks.Load()) + if batchRadiusCnt != chunkBinCnt { - return errors.New("index counts do not match") + return fmt.Errorf("index counts do not match, %d vs %d", batchRadiusCnt, chunkBinCnt) } - logger.Info("migrated all chunk entries", "new_size", batchRadiusCnt, "missing_chunks", missingChunks.Load(), "invalid_sharky_chunks", invalidSharkyChunks.Load()) return nil } }