Skip to content

Commit 540e134

Browse files
authored
feat: introduce deal proposal protocol v1.2.1 (#1185)
1 parent 29dc7d3 commit 540e134

File tree

8 files changed

+371
-14
lines changed

8 files changed

+371
-14
lines changed

storagemarket/deal_execution.go

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"os"
8+
"runtime/debug"
89
"time"
910

1011
"github.com/filecoin-project/boost/storagemarket/types"
@@ -80,7 +81,17 @@ func (p *Provider) runDeal(deal *types.ProviderDealState, dh *dealHandler) {
8081
p.saveDealToDB(dh.Publisher, deal)
8182
}
8283

83-
func (p *Provider) execDeal(deal *smtypes.ProviderDealState, dh *dealHandler) *dealMakingError {
84+
func (p *Provider) execDeal(deal *smtypes.ProviderDealState, dh *dealHandler) (dmerr *dealMakingError) {
85+
// Capture any panic as a manually retryable error
86+
defer func() {
87+
if err := recover(); err != nil {
88+
dmerr = &dealMakingError{
89+
error: fmt.Errorf("Caught panic in deal execution: %s\n%s", err, debug.Stack()),
90+
retry: smtypes.DealRetryManual,
91+
}
92+
}
93+
}()
94+
8495
// If the deal has not yet been handed off to the sealer
8596
if deal.Checkpoint < dealcheckpoints.AddedPiece {
8697
transferType := "downloaded file"
@@ -363,19 +374,29 @@ func (p *Provider) transferAndVerify(dh *dealHandler, pub event.Emitter, deal *s
363374
return p.updateCheckpoint(pub, deal, dealcheckpoints.Transferred)
364375
}
365376

377+
const OneGib = 1024 * 1024 * 1024
378+
366379
func (p *Provider) waitForTransferFinish(ctx context.Context, handler transport.Handler, pub event.Emitter, deal *types.ProviderDealState) error {
367380
defer handler.Close()
368381
defer p.transfers.complete(deal.DealUuid)
369382

370-
// log transfer progress to the deal log every 10%
371-
var lastOutputPct int64
383+
// log transfer progress to the deal log every 10% or every GiB
384+
var lastOutput int64
372385
logTransferProgress := func(received int64) {
373-
pct := (100 * received) / int64(deal.Transfer.Size)
374-
outputPct := pct / 10
375-
if outputPct != lastOutputPct {
376-
lastOutputPct = outputPct
377-
p.dealLogger.Infow(deal.DealUuid, "transfer progress", "bytes received", received,
378-
"deal size", deal.Transfer.Size, "percent complete", pct)
386+
if deal.Transfer.Size > 0 {
387+
pct := (100 * received) / int64(deal.Transfer.Size)
388+
outputPct := pct / 10
389+
if outputPct != lastOutput {
390+
lastOutput = outputPct
391+
p.dealLogger.Infow(deal.DealUuid, "transfer progress", "bytes received", received,
392+
"deal size", deal.Transfer.Size, "percent complete", pct)
393+
}
394+
} else {
395+
gib := received / OneGib
396+
if gib != lastOutput {
397+
lastOutput = gib
398+
p.dealLogger.Infow(deal.DealUuid, "transfer progress", "bytes received", received)
399+
}
379400
}
380401
}
381402

storagemarket/lp2pimpl/net.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import (
2828
var log = logging.Logger("boost-net")
2929
var propLog = logging.Logger("boost-prop")
3030

31-
const DealProtocolID = "/fil/storage/mk/1.2.0"
31+
const DealProtocolv120ID = "/fil/storage/mk/1.2.0"
32+
const DealProtocolv121ID = "/fil/storage/mk/1.2.1"
3233
const DealStatusV12ProtocolID = "/fil/storage/status/1.2.0"
3334
const providerReadDeadline = 10 * time.Second
3435
const providerWriteDeadline = 10 * time.Second
@@ -57,7 +58,7 @@ func (c *DealClient) SendDealProposal(ctx context.Context, id peer.ID, params ty
5758
log.Debugw("send deal proposal", "id", params.DealUUID, "provider-peer", id)
5859

5960
// Create a libp2p stream to the provider
60-
s, err := c.retryStream.OpenStream(ctx, id, []protocol.ID{DealProtocolID})
61+
s, err := c.retryStream.OpenStream(ctx, id, []protocol.ID{DealProtocolv121ID, DealProtocolv120ID})
6162
if err != nil {
6263
return nil, err
6364
}
@@ -169,12 +170,26 @@ func NewDealProvider(h host.Host, prov *storagemarket.Provider, fullNodeApi v1ap
169170

170171
func (p *DealProvider) Start(ctx context.Context) {
171172
p.ctx = ctx
172-
p.host.SetStreamHandler(DealProtocolID, p.handleNewDealStream)
173+
174+
// Note that the handling for deal protocol v1.2.0 and v1.2.1 is the same.
175+
// Deal protocol v1.2.1 has a couple of new fields: SkipIPNIAnnounce and
176+
// RemoveUnsealedCopy.
177+
// If a client that supports deal protocol v1.2.0 sends a request to a
178+
// boostd server that supports deal protocol v1.2.1, the DealParams struct
179+
// will be missing these new fields.
180+
// When the DealParams struct is unmarshalled the missing fields will be
181+
// set to false, which maintains the previous behaviour:
182+
// - SkipIPNIAnnounce=false: announce deal to IPNI
183+
// - RemoveUnsealedCopy=false: keep unsealed copy of deal data
184+
p.host.SetStreamHandler(DealProtocolv121ID, p.handleNewDealStream)
185+
p.host.SetStreamHandler(DealProtocolv120ID, p.handleNewDealStream)
186+
173187
p.host.SetStreamHandler(DealStatusV12ProtocolID, p.handleNewDealStatusStream)
174188
}
175189

176190
func (p *DealProvider) Stop() {
177-
p.host.RemoveStreamHandler(DealProtocolID)
191+
p.host.RemoveStreamHandler(DealProtocolv121ID)
192+
p.host.RemoveStreamHandler(DealProtocolv120ID)
178193
p.host.RemoveStreamHandler(DealStatusV12ProtocolID)
179194
}
180195

storagemarket/lp2pimpl/net_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package lp2pimpl
2+
3+
import (
4+
"bytes"
5+
"github.com/filecoin-project/boost/storagemarket/types"
6+
"github.com/filecoin-project/boost/testutil"
7+
"github.com/filecoin-project/go-address"
8+
"github.com/filecoin-project/go-state-types/abi"
9+
"github.com/filecoin-project/go-state-types/builtin/v9/market"
10+
"github.com/filecoin-project/go-state-types/crypto"
11+
"github.com/google/uuid"
12+
"github.com/stretchr/testify/require"
13+
"testing"
14+
)
15+
16+
// TestDealParamsMissingFields verifies that when the client sends a v1.2.0
17+
// DealParams struct, the server unmarshalls it into a v1.2.1 DealParams struct
18+
// with all missing boolean fields set to false.
19+
func TestDealParamsMissingFields(t *testing.T) {
20+
label, err := market.NewLabelFromString("label")
21+
require.NoError(t, err)
22+
dpv120 := types.DealParamsV120{
23+
DealUUID: uuid.New(),
24+
IsOffline: false,
25+
ClientDealProposal: market.ClientDealProposal{
26+
Proposal: market.DealProposal{
27+
PieceCID: testutil.GenerateCid(),
28+
Client: address.TestAddress,
29+
Provider: address.TestAddress2,
30+
Label: label,
31+
StoragePricePerEpoch: abi.NewTokenAmount(1),
32+
ProviderCollateral: abi.NewTokenAmount(2),
33+
ClientCollateral: abi.NewTokenAmount(3),
34+
},
35+
ClientSignature: crypto.Signature{
36+
Type: crypto.SigTypeSecp256k1,
37+
Data: []byte("sig"),
38+
},
39+
},
40+
DealDataRoot: testutil.GenerateCid(),
41+
Transfer: types.Transfer{
42+
Type: "http",
43+
ClientID: "1234",
44+
Params: []byte("params"),
45+
Size: 5678,
46+
},
47+
}
48+
49+
var buff bytes.Buffer
50+
err = dpv120.MarshalCBOR(&buff)
51+
require.NoError(t, err)
52+
53+
var dpv121 types.DealParams
54+
err = dpv121.UnmarshalCBOR(&buff)
55+
require.NoError(t, err)
56+
57+
// Expect all fields present in both v1.2.0 and v1.2.1 to match
58+
require.Equal(t, dpv120.DealUUID, dpv121.DealUUID)
59+
require.Equal(t, dpv120.IsOffline, dpv121.IsOffline)
60+
require.Equal(t, dpv120.ClientDealProposal, dpv121.ClientDealProposal)
61+
require.Equal(t, dpv120.DealDataRoot, dpv121.DealDataRoot)
62+
require.Equal(t, dpv120.Transfer, dpv121.Transfer)
63+
64+
// Expect all boolean fields not present in v1.2.0 to be false
65+
// in v1.2.1
66+
require.False(t, dpv121.SkipIPNIAnnounce)
67+
require.False(t, dpv121.RemoveUnsealedCopy)
68+
}

storagemarket/provider_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,16 @@ func TestOfflineDealRestartAfterManualRecoverableErrors(t *testing.T) {
567567
onResume: func(builder *testDealBuilder) *testDeal {
568568
return builder.withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().build()
569569
},
570+
}, {
571+
name: "deal exec panic",
572+
dbuilder: func(h *ProviderHarness) *testDeal {
573+
// Simulate panic
574+
return h.newDealBuilder(t, 1, withOfflineDeal()).withCommpFailing(errors.New("panic: commp panic")).build()
575+
},
576+
expectedErr: "Caught panic in deal execution: commp panic",
577+
onResume: func(builder *testDealBuilder) *testDeal {
578+
return builder.withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().build()
579+
},
570580
}}
571581

572582
for _, tc := range tcs {

storagemarket/smtestutil/mocks.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"io"
66
"io/ioutil"
7+
"strings"
78
"sync"
89

910
"github.com/filecoin-project/boost/storagemarket/types"
@@ -167,6 +168,9 @@ func (mb *MinerStubBuilder) SetupCommp(blocking bool) *MinerStubBuilder {
167168

168169
func (mb *MinerStubBuilder) SetupCommpFailure(err error) {
169170
mb.stub.MockCommpCalculator.EXPECT().ComputeDataCid(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any()).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
171+
if strings.HasPrefix(err.Error(), "panic: ") {
172+
panic(err.Error()[len("panic: "):])
173+
}
170174
return abi.PieceInfo{}, err
171175
})
172176
}

storagemarket/types/legacy.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package types
2+
3+
import (
4+
"github.com/filecoin-project/go-state-types/builtin/v9/market"
5+
"github.com/google/uuid"
6+
"github.com/ipfs/go-cid"
7+
)
8+
9+
type DealParamsV120 struct {
10+
DealUUID uuid.UUID
11+
IsOffline bool
12+
ClientDealProposal market.ClientDealProposal
13+
DealDataRoot cid.Cid
14+
Transfer Transfer
15+
}

storagemarket/types/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
"github.com/ipfs/go-cid"
2323
)
2424

25-
//go:generate cbor-gen-for --map-encoding StorageAsk DealParams Transfer DealResponse DealStatusRequest DealStatusResponse DealStatus
25+
//go:generate cbor-gen-for --map-encoding StorageAsk DealParamsV120 DealParams Transfer DealResponse DealStatusRequest DealStatusResponse DealStatus
2626
//go:generate go run github.com/golang/mock/mockgen -destination=mock_types/mocks.go -package=mock_types . PieceAdder,CommpCalculator,DealPublisher,ChainDealManager,IndexProvider
2727

2828
// StorageAsk defines the parameters by which a miner will choose to accept or

0 commit comments

Comments
 (0)