Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: pushsync improvements #4709

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,13 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
ps.metrics.TotalRequests.Inc()

var (
sentErrorsLeft = 1
preemptiveTicker <-chan time.Time
inflight int
parallelForwards = maxMultiplexForwards
sentErrorsLeft = 1
preemptiveTicker <-chan time.Time
inflight int
parallelForwards = maxMultiplexForwards
onlyWithinRadiusPeers = false
resultChan = make(chan receiptResult)
retryC = make(chan struct{}, max(1, parallelForwards))
)

if origin {
Expand All @@ -337,10 +340,6 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
sentErrorsLeft = maxPushErrors
}

resultChan := make(chan receiptResult)

retryC := make(chan struct{}, max(1, parallelForwards))

retry := func() {
select {
case retryC <- struct{}{}:
Expand All @@ -363,7 +362,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
// If no peer can be found from an origin peer, the origin peer may store the chunk.
// Non-origin peers store the chunk if the chunk is within depth.
// For non-origin peers, if the chunk is not within depth, they may store the chunk if they are the closest peer to the chunk.
peer, err := ps.closestPeer(ch.Address(), origin)
peer, err := ps.closestPeer(ch.Address(), origin, onlyWithinRadiusPeers)
if errors.Is(err, topology.ErrNotFound) {
if ps.skipList.PruneExpiresAfter(ch.Address(), overDraftRefresh) == 0 { //no overdraft peers, we have depleted ALL peers
if inflight == 0 {
Expand Down Expand Up @@ -399,8 +398,10 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
}

// since we can reach into the neighborhood of the chunk
// act as the multiplexer and push the chunk in parallel to multiple peers
// act as the multiplexer and push the chunk in parallel to multiple peers.
// the next peers to try must be part of the neighborhood
if swarm.Proximity(peer.Bytes(), ch.Address().Bytes()) >= ps.store.StorageRadius() {
onlyWithinRadiusPeers = true
for ; parallelForwards > 0; parallelForwards-- {
retry()
sentErrorsLeft++
Expand All @@ -427,12 +428,17 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
ps.measurePushPeer(result.pushTime, result.err)

if result.err == nil {
return result.receipt, nil
// shallow receipt check
if depth := swarm.Proximity(result.receipt.Address, ch.Address().Bytes()); depth >= ps.store.StorageRadius() {
return result.receipt, nil
} else {
ps.logger.Debug("shallow receipt", "chunk_address", ch.Address(), "depth", depth)
}
} else {
ps.metrics.TotalFailedSendAttempts.Inc()
ps.logger.Debug("could not push to peer", "chunk_address", ch.Address(), "peer_address", result.peer, "error", result.err)
}

ps.metrics.TotalFailedSendAttempts.Inc()
ps.logger.Debug("could not push to peer", "chunk_address", ch.Address(), "peer_address", result.peer, "error", result.err)

sentErrorsLeft--

retry()
Expand All @@ -442,14 +448,20 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
return nil, ErrNoPush
}

func (ps *PushSync) closestPeer(chunkAddress swarm.Address, origin bool) (swarm.Address, error) {
func (ps *PushSync) closestPeer(chunkAddress swarm.Address, origin bool, withinRadius bool) (peer swarm.Address, err error) {

skipList := ps.skipList.ChunkPeers(chunkAddress)
includeSelf := ps.fullNode && !origin

peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true, Healthy: true}, skipList...)
defer func() {
if withinRadius && err == nil && swarm.Proximity(chunkAddress.Bytes(), peer.Bytes()) < ps.store.StorageRadius() {
peer, err = swarm.ZeroAddress, topology.ErrNotFound
}
}()

peer, err = ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true, Healthy: true}, skipList...)
if errors.Is(err, topology.ErrNotFound) {
peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true}, skipList...)
peer, err = ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true}, skipList...)
if errors.Is(err, topology.ErrNotFound) {
return ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{}, skipList...)
}
Expand Down
Loading