Skip to content

Commit

Permalink
fix: cmd
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Jun 1, 2024
1 parent 97e7ee6 commit a272453
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
4 changes: 2 additions & 2 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,10 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
if err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
if errors.Is(err, p2p.ErrPeerNotFound) {
p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top)
p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
return
}
loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top)
loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
}

if isHistorical {
Expand Down
12 changes: 10 additions & 2 deletions pkg/storer/migration/reserveRepair.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"
"github.com/ethersphere/bee/v2/pkg/swarm"
"golang.org/x/sync/errgroup"
"resenje.org/multex"
)

// ReserveRepairer is a migration step that removes all BinItem entries and migrates
Expand Down Expand Up @@ -164,9 +165,15 @@ func ReserveRepairer(
var eg errgroup.Group
eg.SetLimit(runtime.NumCPU())

locker := multex.New()

for _, item := range batchRadiusItems {
func(item *reserve.BatchRadiusItem) {
eg.Go(func() error {

locker.Lock(item.ID())
defer locker.Unlock(item.ID())

return st.Run(context.Background(), func(s transaction.Store) error {

chunk, err := s.ChunkStore().Get(context.Background(), item.Address)
Expand Down Expand Up @@ -236,11 +243,12 @@ func ReserveRepairer(
return err
}

logger.Info("migrated all chunk entries", "new_size", batchRadiusCnt, "missing_chunks", missingChunks.Load(), "invalid_sharky_chunks", invalidSharkyChunks.Load())

if batchRadiusCnt != chunkBinCnt {
return errors.New("index counts do not match")
return fmt.Errorf("index counts do not match, %d vs %d", batchRadiusCnt, chunkBinCnt)
}

logger.Info("migrated all chunk entries", "new_size", batchRadiusCnt, "missing_chunks", missingChunks.Load(), "invalid_sharky_chunks", invalidSharkyChunks.Load())
return nil
}
}

0 comments on commit a272453

Please sign in to comment.