Skip to content

Commit

Permalink
Merge pull request #5468 from spacemeshos/backport-1.3-fix-limiting-a…
Browse files Browse the repository at this point in the history
…tx-deps

[backport] Don't limit get hash requests for fetching ATX deps
  • Loading branch information
poszu authored Jan 18, 2024
2 parents 9f3561a + 06c8efe commit 489db02
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 196 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@

See [RELEASE](./RELEASE.md) for workflow instructions.

## Release v1.3.4

### Highlights

### Features

### Improvements

* [#5467](https://github.com/spacemeshos/go-spacemesh/pull/5467)
Fix a bug that could cause ATX sync to stall when the limit of concurrent requests for dependencies was exhausted.
Fetching the dependencies of an ATX is no longer limited.

## Release v1.3.3

### Improvements
Expand Down
2 changes: 1 addition & 1 deletion activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func (h *Handler) FetchReferences(ctx context.Context, atx *types.ActivationTx)
return nil
}

if err := h.fetcher.GetAtxs(ctx, maps.Keys(atxIDs)); err != nil {
if err := h.fetcher.GetAtxs(ctx, maps.Keys(atxIDs), system.WithoutLimiting()); err != nil {
dbg := fmt.Sprintf("prev %v pos %v commit %v", atx.PrevATXID, atx.PositioningATX, atx.CommitmentATX)
return fmt.Errorf("fetch referenced atxs (%s): %w", dbg, err)
}
Expand Down
24 changes: 12 additions & 12 deletions activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/identities"
"github.com/spacemeshos/go-spacemesh/system"
"github.com/spacemeshos/go-spacemesh/system/mocks"
)

Expand Down Expand Up @@ -1183,8 +1184,8 @@ func TestHandler_HandleGossipAtx(t *testing.T) {
atxHdlr.mclock.EXPECT().CurrentLayer().Return(second.PublishEpoch.FirstLayer())
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), second.GetPoetProofRef())
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, ids []types.ATXID) error {
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, ids []types.ATXID, _ ...system.GetAtxOpt) error {
require.ElementsMatch(t, []types.ATXID{first.ID()}, ids)
data, err := codec.Encode(first)
require.NoError(t, err)
Expand Down Expand Up @@ -1475,7 +1476,7 @@ func TestHandler_FetchReferences(t *testing.T) {

atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), atx.GetPoetProofRef())
atxHdlr.mockFetch.EXPECT().
GetAtxs(gomock.Any(), gomock.InAnyOrder([]types.ATXID{atx.PositioningATX, atx.PrevATXID})).
GetAtxs(gomock.Any(), gomock.InAnyOrder([]types.ATXID{atx.PositioningATX, atx.PrevATXID}), gomock.Any()).
Return(nil)
require.NoError(t, atxHdlr.FetchReferences(context.Background(), atx))
})
Expand All @@ -1490,7 +1491,7 @@ func TestHandler_FetchReferences(t *testing.T) {
atx := newAtx(t, sig, challenge, nipost, 2, coinbase)

atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), atx.GetPoetProofRef())
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), []types.ATXID{atx.PrevATXID}).Return(nil)
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), []types.ATXID{atx.PrevATXID}, gomock.Any()).Return(nil)
require.NoError(t, atxHdlr.FetchReferences(context.Background(), atx))
})

Expand All @@ -1504,7 +1505,7 @@ func TestHandler_FetchReferences(t *testing.T) {
atx := newAtx(t, sig, challenge, nipost, 2, coinbase)

atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), atx.GetPoetProofRef())
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), []types.ATXID{atx.PrevATXID}).Return(nil)
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), []types.ATXID{atx.PrevATXID}, gomock.Any()).Return(nil)
require.NoError(t, atxHdlr.FetchReferences(context.Background(), atx))
})

Expand All @@ -1519,9 +1520,8 @@ func TestHandler_FetchReferences(t *testing.T) {
atx.CommitmentATX = &commitATX

atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), atx.GetPoetProofRef())
atxHdlr.mockFetch.EXPECT().
GetAtxs(gomock.Any(), gomock.InAnyOrder([]types.ATXID{atx.PositioningATX, *atx.CommitmentATX})).
Return(nil)
expectedIds := []types.ATXID{atx.PositioningATX, *atx.CommitmentATX}
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), gomock.InAnyOrder(expectedIds), gomock.Any()).Return(nil)
require.NoError(t, atxHdlr.FetchReferences(context.Background(), atx))
})

Expand All @@ -1536,7 +1536,7 @@ func TestHandler_FetchReferences(t *testing.T) {
atx.CommitmentATX = &goldenATXID

atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), atx.GetPoetProofRef())
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), []types.ATXID{atx.PositioningATX}).Return(nil)
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), []types.ATXID{atx.PositioningATX}, gomock.Any()).Return(nil)
require.NoError(t, atxHdlr.FetchReferences(context.Background(), atx))
})

Expand All @@ -1563,7 +1563,7 @@ func TestHandler_FetchReferences(t *testing.T) {
atx := newAtx(t, sig, challenge, nipost, 2, coinbase)

atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), atx.GetPoetProofRef())
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), []types.ATXID{atx.PrevATXID}).Return(nil)
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), []types.ATXID{atx.PrevATXID}, gomock.Any()).Return(nil)
require.NoError(t, atxHdlr.FetchReferences(context.Background(), atx))
})

Expand All @@ -1590,7 +1590,7 @@ func TestHandler_FetchReferences(t *testing.T) {
atx := newAtx(t, sig, challenge, nipost, 2, coinbase)

atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), atx.GetPoetProofRef())
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), []types.ATXID{atx.PrevATXID}).Return(errors.New("pooh"))
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), []types.ATXID{atx.PrevATXID}, gomock.Any()).Return(errors.New("oh"))
require.Error(t, atxHdlr.FetchReferences(context.Background(), atx))
})
}
Expand Down Expand Up @@ -1685,7 +1685,7 @@ func TestHandler_AtxWeight(t *testing.T) {
require.ElementsMatch(t, []types.Hash32{atx1.ID().Hash32(), proofRef}, got)
})
atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), proofRef)
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), gomock.Any())
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), gomock.Any(), gomock.Any())
atxHdlr.mValidator.EXPECT().
NIPost(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(leaves, nil)
Expand Down
2 changes: 1 addition & 1 deletion checkpoint/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func validateAndPreserveData(
lg,
activation.PoetConfig{},
)
mfetch.EXPECT().GetAtxs(gomock.Any(), gomock.Any()).AnyTimes()
mfetch.EXPECT().GetAtxs(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
for i, vatx := range deps {
encoded, err := codec.Encode(vatx)
require.NoError(tb, err)
Expand Down
15 changes: 13 additions & 2 deletions fetch/mesh_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,28 @@ import (
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/system"
)

var errBadRequest = errors.New("invalid request")

// GetAtxs gets the data for the given ATX IDs and validates them. It returns an error if at least one ATX cannot be fetched.
func (f *Fetch) GetAtxs(ctx context.Context, ids []types.ATXID) error {
func (f *Fetch) GetAtxs(ctx context.Context, ids []types.ATXID, opts ...system.GetAtxOpt) error {
if len(ids) == 0 {
return nil
}
f.logger.WithContext(ctx).With().Info("requesting atxs from peer", log.Int("num_atxs", len(ids)))

options := system.GetAtxOpts{}
for _, opt := range opts {
opt(&options)
}

f.logger.WithContext(ctx).With().
Info("requesting atxs from peer", log.Int("num_atxs", len(ids)), log.Bool("limiting", !options.LimitingOff))
hashes := types.ATXIDsToHashes(ids)
if options.LimitingOff {
return f.getHashes(ctx, hashes, datastore.ATXDB, f.validators.atx.HandleMessage)
}
return f.getHashes(ctx, hashes, datastore.ATXDB, f.validators.atx.HandleMessage, withLimiter(f.getAtxsLimiter))
}

Expand Down
132 changes: 74 additions & 58 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fetch
import (
"context"
"errors"
"fmt"
"os"
"sync"
"testing"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p/server"
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/system"
)

const (
Expand Down Expand Up @@ -808,68 +810,82 @@ func Test_GetAtxsLimiting(t *testing.T) {
getAtxConcurrency = 10
)

srv := server.New(
mesh.Hosts()[1],
hashProtocol,
func(_ context.Context, data []byte) ([]byte, error) {
var requestBatch RequestBatch
require.NoError(t, codec.Decode(data, &requestBatch))
resBatch := ResponseBatch{
ID: requestBatch.ID,
}
require.Len(t, requestBatch.Requests, getAtxConcurrency)
for _, r := range requestBatch.Requests {
resBatch.Responses = append(resBatch.Responses, ResponseMessage{Hash: r.Hash})
}
response, err := codec.Encode(&resBatch)
require.NoError(t, err)
return response, nil
},
)

var (
eg errgroup.Group
ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()
eg.Go(func() error {
return srv.Run(ctx)
})
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})
for _, withLimiting := range []bool{false, true} {
t.Run(fmt.Sprintf("with limiting: %v", withLimiting), func(t *testing.T) {
srv := server.New(
mesh.Hosts()[1],
hashProtocol,
func(_ context.Context, data []byte) ([]byte, error) {
var requestBatch RequestBatch
require.NoError(t, codec.Decode(data, &requestBatch))
resBatch := ResponseBatch{
ID: requestBatch.ID,
}
if withLimiting {
// should do only `cfg.GetAtxsConcurrency` requests at a time even though batch size is 1000
require.Len(t, requestBatch.Requests, getAtxConcurrency)
} else {
require.Len(t, requestBatch.Requests, totalRequests)
}
for _, r := range requestBatch.Requests {
resBatch.Responses = append(resBatch.Responses, ResponseMessage{Hash: r.Hash})
}
response, err := codec.Encode(&resBatch)
require.NoError(t, err)
return response, nil
},
)

cfg := DefaultConfig()
// should do only `cfg.GetAtxsConcurrency` requests at a time even though batch size is 1000
cfg.BatchSize = 1000
cfg.GetAtxsConcurrency = getAtxConcurrency
var (
eg errgroup.Group
ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()
eg.Go(func() error {
return srv.Run(ctx)
})
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})

cfg := DefaultConfig()
// with limiting enabled, only `cfg.GetAtxsConcurrency` requests should be sent at a time even though batch size is 1000
cfg.BatchSize = 1000
cfg.QueueSize = 1000
cfg.GetAtxsConcurrency = getAtxConcurrency

cdb := datastore.NewCachedDB(sql.InMemory(), logtest.New(t))
client := server.New(mesh.Hosts()[0], hashProtocol, nil)
host, err := p2p.Upgrade(mesh.Hosts()[0])
require.NoError(t, err)
f := NewFetch(cdb, host,
WithContext(context.Background()),
withServers(map[string]requester{hashProtocol: client}),
WithConfig(cfg),
)

cdb := datastore.NewCachedDB(sql.InMemory(), logtest.New(t))
client := server.New(mesh.Hosts()[0], hashProtocol, nil)
host, err := p2p.Upgrade(mesh.Hosts()[0])
require.NoError(t, err)
f := NewFetch(cdb, host,
WithContext(context.Background()),
withServers(map[string]requester{hashProtocol: client}),
WithConfig(cfg),
)
atxValidatorMock := mocks.NewMockSyncValidator(gomock.NewController(t))
f.validators = &dataValidators{
atx: atxValidatorMock,
}
require.NoError(t, f.Start())
t.Cleanup(f.Stop)

var atxIds []types.ATXID
for i := 0; i < totalRequests; i++ {
id := types.RandomATXID()
atxIds = append(atxIds, id)
atxValidatorMock.EXPECT().HandleMessage(gomock.Any(), id.Hash32(), mesh.Hosts()[1].ID(), gomock.Any())
}

atxValidatorMock := mocks.NewMockSyncValidator(gomock.NewController(t))
f.validators = &dataValidators{
atx: atxValidatorMock,
}
require.NoError(t, f.Start())
t.Cleanup(f.Stop)

var atxIds []types.ATXID
for i := 0; i < totalRequests; i++ {
id := types.RandomATXID()
atxIds = append(atxIds, id)
atxValidatorMock.EXPECT().HandleMessage(gomock.Any(), id.Hash32(), mesh.Hosts()[1].ID(), gomock.Any())
if withLimiting {
err = f.GetAtxs(context.Background(), atxIds)
} else {
err = f.GetAtxs(context.Background(), atxIds, system.WithoutLimiting())
}
require.NoError(t, err)
})
}

err = f.GetAtxs(context.Background(), atxIds)
require.NoError(t, err)
}

func FuzzCertRequest(f *testing.F) {
Expand Down
8 changes: 2 additions & 6 deletions syncer/atxsync/atxsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,9 @@ import (
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/system"
)

//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./atxsync.go
type atxFetcher interface {
GetAtxs(context.Context, []types.ATXID) error
}

func getMissing(db *sql.Database, set []types.ATXID) ([]types.ATXID, error) {
missing := []types.ATXID{}
for _, atx := range set {
Expand All @@ -40,7 +36,7 @@ func Download(
retryInterval time.Duration,
logger *zap.Logger,
db *sql.Database,
fetcher atxFetcher,
fetcher system.AtxFetcher,
set []types.ATXID,
) error {
total := len(set)
Expand Down
9 changes: 5 additions & 4 deletions syncer/atxsync/atxsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync/mocks"
"github.com/spacemeshos/go-spacemesh/system"
"github.com/spacemeshos/go-spacemesh/system/mocks"
)

func atx(id types.ATXID) *types.VerifiedActivationTx {
Expand Down Expand Up @@ -104,16 +105,16 @@ func TestDownload(t *testing.T) {
logger := logtest.New(t)
db := sql.InMemory()
ctrl := gomock.NewController(t)
fetcher := mocks.NewMockatxFetcher(ctrl)
fetcher := mocks.NewMockAtxFetcher(ctrl)
for _, atx := range tc.existing {
require.NoError(t, atxs.Add(db, atx))
}
for i := range tc.fetched {
req := tc.fetched[i]
fetcher.EXPECT().
GetAtxs(tc.ctx, req.request).
GetAtxs(tc.ctx, req.request, gomock.Any()).
Times(1).
DoAndReturn(func(_ context.Context, _ []types.ATXID) error {
DoAndReturn(func(_ context.Context, _ []types.ATXID, _ ...system.GetAtxOpt) error {
for _, atx := range req.result {
require.NoError(t, atxs.Add(db, atx))
}
Expand Down
Loading

0 comments on commit 489db02

Please sign in to comment.