diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 3feeed67bf4..862b69daf3e 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -324,10 +324,13 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo ps.metrics.TotalRequests.Inc() var ( - sentErrorsLeft = 1 - preemptiveTicker <-chan time.Time - inflight int - parallelForwards = maxMultiplexForwards + sentErrorsLeft = 1 + preemptiveTicker <-chan time.Time + inflight int + parallelForwards = maxMultiplexForwards + onlyWithinRadiusPeers = false + resultChan = make(chan receiptResult) + retryC = make(chan struct{}, max(1, parallelForwards)) ) if origin { @@ -337,10 +340,6 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo sentErrorsLeft = maxPushErrors } - resultChan := make(chan receiptResult) - - retryC := make(chan struct{}, max(1, parallelForwards)) - retry := func() { select { case retryC <- struct{}{}: @@ -363,7 +362,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo // If no peer can be found from an origin peer, the origin peer may store the chunk. // Non-origin peers store the chunk if the chunk is within depth. // For non-origin peers, if the chunk is not within depth, they may store the chunk if they are the closest peer to the chunk. - peer, err := ps.closestPeer(ch.Address(), origin) + peer, err := ps.closestPeer(ch.Address(), origin, onlyWithinRadiusPeers) if errors.Is(err, topology.ErrNotFound) { if ps.skipList.PruneExpiresAfter(ch.Address(), overDraftRefresh) == 0 { //no overdraft peers, we have depleted ALL peers if inflight == 0 { @@ -399,8 +398,10 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo } // since we can reach into the neighborhood of the chunk - // act as the multiplexer and push the chunk in parallel to multiple peers + // act as the multiplexer and push the chunk in parallel to multiple peers. + // the next peers to try must be part of the neighborhood if swarm.Proximity(peer.Bytes(), ch.Address().Bytes()) >= ps.store.StorageRadius() { + onlyWithinRadiusPeers = true for ; parallelForwards > 0; parallelForwards-- { retry() sentErrorsLeft++ @@ -427,12 +428,17 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo ps.measurePushPeer(result.pushTime, result.err) if result.err == nil { - return result.receipt, nil + // shallow receipt check + if depth := swarm.Proximity(result.receipt.Address, ch.Address().Bytes()); depth >= ps.store.StorageRadius() { + return result.receipt, nil + } else { + ps.logger.Debug("shallow receipt", "chunk_address", ch.Address(), "depth", depth) + } + } else { + ps.metrics.TotalFailedSendAttempts.Inc() + ps.logger.Debug("could not push to peer", "chunk_address", ch.Address(), "peer_address", result.peer, "error", result.err) } - ps.metrics.TotalFailedSendAttempts.Inc() - ps.logger.Debug("could not push to peer", "chunk_address", ch.Address(), "peer_address", result.peer, "error", result.err) - sentErrorsLeft-- retry() @@ -442,14 +448,20 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo return nil, ErrNoPush } -func (ps *PushSync) closestPeer(chunkAddress swarm.Address, origin bool) (swarm.Address, error) { +func (ps *PushSync) closestPeer(chunkAddress swarm.Address, origin bool, withinRadius bool) (peer swarm.Address, err error) { skipList := ps.skipList.ChunkPeers(chunkAddress) includeSelf := ps.fullNode && !origin - peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true, Healthy: true}, skipList...) + defer func() { + if withinRadius && err == nil && swarm.Proximity(chunkAddress.Bytes(), peer.Bytes()) < ps.store.StorageRadius() { + peer, err = swarm.ZeroAddress, topology.ErrNotFound + } + }() + + peer, err = ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true, Healthy: true}, skipList...) if errors.Is(err, topology.ErrNotFound) { - peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true}, skipList...) + peer, err = ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true}, skipList...) if errors.Is(err, topology.ErrNotFound) { return ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{}, skipList...) }