Skip to content

Commit

Permalink
fix: lotus-miner: remove provecommit1 method (#12251)
Browse files Browse the repository at this point in the history
* remove provecommit1

* add changelog

* update precommit and commit params

* fix lint error

* fix commit params
  • Loading branch information
LexLuthr authored and jennijuju committed Jul 25, 2024
1 parent 1f8517d commit 8aaddfe
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 498 deletions.
338 changes: 20 additions & 318 deletions storage/pipeline/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
"sync"
"time"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
actorstypes "github.com/filecoin-project/go-state-types/actors"
"github.com/filecoin-project/go-state-types/big"
Expand Down Expand Up @@ -215,7 +213,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
return nil, nil
}

var res, resV1 []sealiface.CommitBatchRes
var res []sealiface.CommitBatchRes

ts, err := b.api.ChainHead(b.mctx)
if err != nil {
Expand Down Expand Up @@ -243,67 +241,22 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
}
}

if nv >= MinDDONetworkVersion {
// After nv21, we have a new ProveCommitSectors2 method, which supports
// batching without aggregation, but it doesn't support onboarding
// sectors which were precommitted with DealIDs in the precommit message.
// We prefer it for all other sectors, so first we use the new processBatchV2

var sectors []abi.SectorNumber
for sn := range b.todo {
sectors = append(sectors, sn)
}
res, err = b.processBatchV2(cfg, sectors, nv, !individual)
if err != nil {
err = xerrors.Errorf("processBatchV2: %w", err)
}

// Mark sectors as done
for _, r := range res {
if err != nil {
r.Error = err.Error()
}

for _, sn := range r.Sectors {
for _, ch := range b.waiting[sn] {
ch <- r // buffered
}

delete(b.waiting, sn)
delete(b.todo, sn)
delete(b.cutoffs, sn)
}
}
}

if err != nil {
log.Warnf("CommitBatcher maybeStartBatch processBatch-ddo %v", err)
}
// After nv21, we have a new ProveCommitSectors2 method, which supports
// batching without aggregation, but it doesn't support onboarding
// sectors which were precommitted with DealIDs in the precommit message.
// We prefer it for all other sectors, so first we use the new processBatchV2

if err != nil && len(res) == 0 {
return nil, err
}

if individual {
resV1, err = b.processIndividually(cfg)
} else {
var sectors []abi.SectorNumber
for sn := range b.todo {
sectors = append(sectors, sn)
}
resV1, err = b.processBatchV1(cfg, sectors, nv)
var sectors []abi.SectorNumber
for sn := range b.todo {
sectors = append(sectors, sn)
}

res, err = b.processBatchV2(cfg, sectors, nv, !individual)
if err != nil {
log.Warnf("CommitBatcher maybeStartBatch individual:%v processBatch %v", individual, err)
err = xerrors.Errorf("processBatchV2: %w", err)
}

if err != nil && len(resV1) == 0 {
return nil, err
}

// Mark the rest as processed
for _, r := range resV1 {
// Mark sectors as done
for _, r := range res {
if err != nil {
r.Error = err.Error()
}
Expand All @@ -319,7 +272,13 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
}
}

res = append(res, resV1...)
if err != nil {
log.Warnf("CommitBatcher maybeStartBatch processBatch-ddo %v", err)
}

if err != nil && len(res) == 0 {
return nil, err
}

return res, nil
}
Expand Down Expand Up @@ -357,7 +316,6 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto

for _, sector := range sectors {
if b.todo[sector].DealIDPrecommit {
// can't process sectors precommitted with deal IDs with ProveCommitSectors2
continue
}

Expand Down Expand Up @@ -499,263 +457,7 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto
return []sealiface.CommitBatchRes{res}, nil
}

// processBatchV1 processes a batch of sectors before nv22. It always sends out an aggregate message.
func (b *CommitBatcher) processBatchV1(cfg sealiface.Config, sectors []abi.SectorNumber, nv network.Version) ([]sealiface.CommitBatchRes, error) {
ts, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}

total := len(sectors)

res := sealiface.CommitBatchRes{
FailedSectors: map[abi.SectorNumber]string{},
}

params := miner.ProveCommitAggregateParams{
SectorNumbers: bitfield.New(),
}

proofs := make([][]byte, 0, total)
infos := make([]proof.AggregateSealVerifyInfo, 0, total)
collateral := big.Zero()

for _, sector := range sectors {
res.Sectors = append(res.Sectors, sector)

sc, err := b.getSectorCollateral(sector, ts.Key())
if err != nil {
res.FailedSectors[sector] = err.Error()
continue
}

collateral = big.Add(collateral, sc)

params.SectorNumbers.Set(uint64(sector))
infos = append(infos, b.todo[sector].Info)
}

if len(infos) == 0 {
return nil, nil
}

sort.Slice(infos, func(i, j int) bool {
return infos[i].Number < infos[j].Number
})

for _, info := range infos {
proofs = append(proofs, b.todo[info.Number].Proof)
}

mid, err := address.IDFromAddress(b.maddr)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err)
}

arp, err := b.aggregateProofType(nv)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate proof type: %w", err)
}

params.AggregateProof, err = b.prover.AggregateSealProofs(proof.AggregateSealVerifyProofAndInfos{
Miner: abi.ActorID(mid),
SealProof: b.todo[infos[0].Number].Spt,
AggregateProof: arp,
Infos: infos,
}, proofs)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err)
}

enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err)
}

mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
}

maxFee := b.feeCfg.MaxCommitBatchGasFee.FeeForSectors(len(infos))

aggFeeRaw, err := policy.AggregateProveCommitNetworkFee(nv, len(infos), ts.MinTicketBlock().ParentBaseFee)
if err != nil {
res.Error = err.Error()
log.Errorf("getting aggregate commit network fee: %s", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate commit network fee: %s", err)
}

aggFee := big.Div(big.Mul(aggFeeRaw, aggFeeNum), aggFeeDen)

needFunds := big.Add(collateral, aggFee)
needFunds, err = collateralSendAmount(b.mctx, b.api, b.maddr, cfg, needFunds)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, err
}

goodFunds := big.Add(maxFee, needFunds)

from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, needFunds)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
}

_, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())

if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) {
log.Errorf("simulating CommitBatch %s", err)
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch %w", err)
}

// If we're out of gas, split the batch in half and evaluate again
if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) {
log.Warnf("CommitAggregate message ran out of gas, splitting batch in half and trying again (sectors: %d)", len(sectors))
mid := len(sectors) / 2
ret0, _ := b.processBatchV1(cfg, sectors[:mid], nv)
ret1, _ := b.processBatchV1(cfg, sectors[mid:], nv)

return append(ret0, ret1...), nil
}

mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
}

res.Msg = &mcid

log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))

return []sealiface.CommitBatchRes{res}, nil
}

func (b *CommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) {

mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
}

avail := types.TotalFilecoinInt

if cfg.CollateralFromMinerBalance && !cfg.DisableCollateralFallback {
avail, err = b.api.StateMinerAvailableBalance(b.mctx, b.maddr, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting available miner balance: %w", err)
}

avail = big.Sub(avail, cfg.AvailableBalanceBuffer)
if avail.LessThan(big.Zero()) {
avail = big.Zero()
}
}

ts, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}

var res []sealiface.CommitBatchRes

sectorsProcessed := 0

for sn, info := range b.todo {
r := sealiface.CommitBatchRes{
Sectors: []abi.SectorNumber{sn},
FailedSectors: map[abi.SectorNumber]string{},
}

if cfg.MaxSectorProveCommitsSubmittedPerEpoch > 0 &&
uint64(sectorsProcessed) >= cfg.MaxSectorProveCommitsSubmittedPerEpoch {

tmp := ts
for tmp.Height() <= ts.Height() {
tmp, err = b.api.ChainHead(b.mctx)
if err != nil {
log.Errorf("getting chain head: %+v", err)
return nil, err
}
time.Sleep(3 * time.Second)
}

sectorsProcessed = 0
ts = tmp
}

mcid, err := b.processSingle(cfg, mi, &avail, sn, info, ts.Key())
if err != nil {
log.Errorf("process single error: %+v", err) // todo: return to user
r.FailedSectors[sn] = err.Error()
} else {
r.Msg = &mcid
}

res = append(res, r)

sectorsProcessed++
}

return res, nil
}

func (b *CommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo, avail *abi.TokenAmount, sn abi.SectorNumber, info AggregateInput, tsk types.TipSetKey) (cid.Cid, error) {
return b.processSingleV1(cfg, mi, avail, sn, info, tsk)
}

func (b *CommitBatcher) processSingleV1(cfg sealiface.Config, mi api.MinerInfo, avail *abi.TokenAmount, sn abi.SectorNumber, info AggregateInput, tsk types.TipSetKey) (cid.Cid, error) {
enc := new(bytes.Buffer)
params := &miner.ProveCommitSectorParams{
SectorNumber: sn,
Proof: info.Proof,
}

if err := params.MarshalCBOR(enc); err != nil {
return cid.Undef, xerrors.Errorf("marshaling commit params: %w", err)
}

collateral, err := b.getSectorCollateral(sn, tsk)
if err != nil {
return cid.Undef, err
}

if cfg.CollateralFromMinerBalance {
c := big.Sub(collateral, *avail)
*avail = big.Sub(*avail, collateral)
collateral = c

if collateral.LessThan(big.Zero()) {
collateral = big.Zero()
}
if (*avail).LessThan(big.Zero()) {
*avail = big.Zero()
}
}

goodFunds := big.Add(collateral, big.Int(b.feeCfg.MaxCommitGasFee))

from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, collateral)
if err != nil {
return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err)
}

mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.Int(b.feeCfg.MaxCommitGasFee), enc.Bytes())
if err != nil {
return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err)
}

return mcid, nil
}

// register commit, wait for batch message, return message CID
// AddCommit registers commit, wait for batch message, return message CID
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) {
sn := s.SectorNumber

Expand Down
Loading

0 comments on commit 8aaddfe

Please sign in to comment.