diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 9c92989c7f4..a07b52b2edc 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -343,10 +343,10 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint if err != nil { p.metrics.SyncWorkerErrCounter.Inc() if errors.Is(err, p2p.ErrPeerNotFound) { - p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top) + p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top) return } - loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top) + loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top) } if isHistorical { diff --git a/pkg/storer/migration/reserveRepair.go b/pkg/storer/migration/reserveRepair.go index 04fdab6e9b0..baab404e0ae 100644 --- a/pkg/storer/migration/reserveRepair.go +++ b/pkg/storer/migration/reserveRepair.go @@ -56,6 +56,12 @@ func ReserveRepairer( if binIds[item.Bin][item.BinID] > 1 { return false, fmt.Errorf("binID %d in bin %d already used", item.BinID, item.Bin) } + + err := st.IndexStore().Get(&reserve.ChunkBinItem{Bin: item.Bin, BinID: item.BinID}) + if err != nil { + return false, fmt.Errorf("check failed: chunkBinItem, bin %d, binID %d: %w", item.Bin, item.BinID, err) + } + return false, nil }, ) @@ -63,7 +69,7 @@ func ReserveRepairer( err := checkBinIDs() if err != nil { - logger.Error(err, "check failed") + logger.Info("pre-repair check failed", "error", err) } // STEP 0 @@ -162,11 +168,16 @@ func ReserveRepairer( } var eg errgroup.Group - eg.SetLimit(runtime.NumCPU()) + + p := runtime.NumCPU() + eg.SetLimit(p) + + logger.Info("parallel workers", "count", p) for _, item := range batchRadiusItems { func(item *reserve.BatchRadiusItem) { eg.Go(func() error { + return st.Run(context.Background(), func(s transaction.Store) error { chunk, err := s.ChunkStore().Get(context.Background(), item.Address) @@ -236,11 +247,12 @@ func ReserveRepairer( return err } + logger.Info("migrated all chunk entries", "new_size", batchRadiusCnt, "missing_chunks", missingChunks.Load(), "invalid_sharky_chunks", invalidSharkyChunks.Load()) + if batchRadiusCnt != chunkBinCnt { - return errors.New("index counts do not match") + return fmt.Errorf("index counts do not match, %d vs %d", batchRadiusCnt, chunkBinCnt) } - logger.Info("migrated all chunk entries", "new_size", batchRadiusCnt, "missing_chunks", missingChunks.Load(), "invalid_sharky_chunks", invalidSharkyChunks.Load()) return nil } }