From ab09ccacb6c33d4d167a191b18416843f0a712ca Mon Sep 17 00:00:00 2001 From: Frrist Date: Tue, 30 Aug 2022 13:28:14 -0700 Subject: [PATCH] Replace ExecutedAndBlockMessages with individual methods (#1040) * refactor: remove block messages from TipSetMessages and refactor processors * chore: TipSetMessageReceipts implemented and wired up * chore: update miner post extractor * polish: TipSetMessageReceipts iterator * refactor: update msapproaval extractor * refactor: update parsed messages task * refactor: memoize TipSetMessageReceipts * refactor: update gas economy * refactor: update gas outs task * refactor: remove ExecutedAndBlockMessages method * address review feedback --- chain/datasource/datasource.go | 152 ++++++------ chain/indexer/integrated/processor/state.go | 11 +- .../processor/state_internal_test.go | 16 +- .../integrated/processor/state_test.go | 16 +- go.mod | 2 +- lens/interface.go | 87 +++++-- lens/lily/impl.go | 207 ++++++++++++++-- lens/lily/struct.go | 8 +- lens/util/repo.go | 220 +----------------- tasks/actorstate/actorstate.go | 2 +- tasks/actorstate/miner/post.go | 29 ++- tasks/api.go | 9 +- tasks/messages/blockmessage/task.go | 9 +- tasks/messages/gaseconomy/task.go | 66 +++--- tasks/messages/gasoutput/task.go | 137 +++++++---- tasks/messages/message/task.go | 100 +++----- tasks/messages/parsedmessage/task.go | 139 ++++++----- tasks/messages/receipt/task.go | 49 ++-- tasks/msapprovals/msapprovals.go | 213 ++++++++++------- tasks/test/api.go | 14 +- testutil/lens.go | 33 ++- 21 files changed, 831 insertions(+), 688 deletions(-) diff --git a/chain/datasource/datasource.go b/chain/datasource/datasource.go index d6df8bc54..cb2a27a46 100644 --- a/chain/datasource/datasource.go +++ b/chain/datasource/datasource.go @@ -12,9 +12,16 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-hamt-ipld/v3" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/vm" + states0 "github.com/filecoin-project/specs-actors/actors/states" + states2 "github.com/filecoin-project/specs-actors/v2/actors/states" + states3 "github.com/filecoin-project/specs-actors/v3/actors/states" + states4 "github.com/filecoin-project/specs-actors/v4/actors/states" + states5 "github.com/filecoin-project/specs-actors/v5/actors/states" lru "github.com/hashicorp/golang-lru" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -22,12 +29,6 @@ import ( "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/singleflight" - states0 "github.com/filecoin-project/specs-actors/actors/states" - states2 "github.com/filecoin-project/specs-actors/v2/actors/states" - states3 "github.com/filecoin-project/specs-actors/v3/actors/states" - states4 "github.com/filecoin-project/specs-actors/v4/actors/states" - states5 "github.com/filecoin-project/specs-actors/v5/actors/states" - "github.com/filecoin-project/lily/chain/actors/adt" "github.com/filecoin-project/lily/chain/actors/adt/diff" "github.com/filecoin-project/lily/chain/actors/builtin/miner" @@ -37,28 +38,28 @@ import ( ) var ( - executedBlkMsgCacheSize int - executedTsCacheSize int - diffPreCommitCacheSize int - diffSectorCacheSize int - - executedBlkMsgCacheSizeEnv = "LILY_EXECUTED_BLK_MSG_CACHE_SIZE" - executedTsCacheSizeEnv = "LILY_EXECUTED_TS_CACHE_SIZE" - diffPreCommitCacheSizeEnv = "LILY_DIFF_PRECOMMIT_CACHE_SIZE" - diffSectorCacheSizeEnv = "LILY_DIFF_SECTORS_CACHE_SIZE" + tipsetMessageReceiptCacheSize int + executedTsCacheSize int + diffPreCommitCacheSize int + diffSectorCacheSize int + + tipsetMessageReceiptSizeEnv = "LILY_TIPSET_MSG_RECEIPT_CACHE_SIZE" + executedTsCacheSizeEnv = "LILY_EXECUTED_TS_CACHE_SIZE" + diffPreCommitCacheSizeEnv = "LILY_DIFF_PRECOMMIT_CACHE_SIZE" + diffSectorCacheSizeEnv = "LILY_DIFF_SECTORS_CACHE_SIZE" ) func init() { - executedBlkMsgCacheSize = 4 + tipsetMessageReceiptCacheSize = 4 executedTsCacheSize = 4 diffPreCommitCacheSize = 500 diffSectorCacheSize = 500 - if s := os.Getenv(executedBlkMsgCacheSizeEnv); s != "" { + if s := os.Getenv(tipsetMessageReceiptSizeEnv); s != "" { v, err := strconv.ParseInt(s, 10, 64) if err == nil { - executedBlkMsgCacheSize = int(v) + tipsetMessageReceiptCacheSize = int(v) } else { - log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, executedBlkMsgCacheSizeEnv, executedBlkMsgCacheSize, err) + log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, tipsetMessageReceiptSizeEnv, tipsetMessageReceiptCacheSize, err) } } if s := os.Getenv(executedTsCacheSizeEnv); s != "" { @@ -92,28 +93,12 @@ var _ tasks.DataSource = (*DataSource)(nil) var log = logging.Logger("lily/datasource") -type DataSource struct { - node lens.API - - executedBlkMsgCache *lru.Cache - executedBlkMsgGroup singleflight.Group - - executedTsCache *lru.Cache - executedTsGroup singleflight.Group - - diffSectorsCache *lru.Cache - diffSectorsGroup singleflight.Group - - diffPreCommitCache *lru.Cache - diffPreCommitGroup singleflight.Group -} - func NewDataSource(node lens.API) (*DataSource, error) { t := &DataSource{ node: node, } var err error - t.executedBlkMsgCache, err = lru.New(executedBlkMsgCacheSize) + t.tsBlkMsgRecCache, err = lru.New(tipsetMessageReceiptCacheSize) if err != nil { return nil, err } @@ -137,6 +122,56 @@ func NewDataSource(node lens.API) (*DataSource, error) { return t, nil } +type DataSource struct { + node lens.API + + executedTsCache *lru.Cache + executedTsGroup singleflight.Group + + tsBlkMsgRecCache *lru.Cache + tsBlkMsgRecGroup singleflight.Group + + diffSectorsCache *lru.Cache + diffSectorsGroup singleflight.Group + + diffPreCommitCache *lru.Cache + diffPreCommitGroup singleflight.Group +} + +func (t *DataSource) ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi.TokenAmount, error) { + return t.node.ComputeBaseFee(ctx, ts) +} + +func (t *DataSource) TipSetBlockMessages(ctx context.Context, ts *types.TipSet) ([]*lens.BlockMessages, error) { + return t.node.MessagesForTipSetBlocks(ctx, ts) +} + +// TipSetMessageReceipts returns the blocks and messages in `pts` and their corresponding receipts from `ts` matching block order in tipset (`pts`). +// TODO replace with lotus chainstore method when https://github.com/filecoin-project/lotus/pull/9186 lands +func (t *DataSource) TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) { + key, err := asKey(ts, pts) + if err != nil { + return nil, err + } + value, found := t.tsBlkMsgRecCache.Get(key) + if found { + return value.([]*lens.BlockMessageReceipts), nil + } + + value, err, _ = t.tsBlkMsgRecGroup.Do(key, func() (interface{}, error) { + data, innerErr := t.node.TipSetMessageReceipts(ctx, ts, pts) + if innerErr == nil { + t.tsBlkMsgRecCache.Add(key, data) + } + return data, innerErr + }) + if err != nil { + return nil, err + } + + return value.([]*lens.BlockMessageReceipts), nil +} + func (t *DataSource) TipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { ctx, span := otel.Tracer("").Start(ctx, "DataSource.TipSet") if span.IsRecording() { @@ -239,45 +274,20 @@ func (t *DataSource) MessageExecutions(ctx context.Context, ts, pts *types.TipSe return value.([]*lens.MessageExecution), nil } -func (t *DataSource) ExecutedAndBlockMessages(ctx context.Context, ts, pts *types.TipSet) (*lens.TipSetMessages, error) { - metrics.RecordInc(ctx, metrics.DataSourceExecutedAndBlockMessagesRead) - ctx, span := otel.Tracer("").Start(ctx, "DataSource.ExecutedAndBlockMessages") - if span.IsRecording() { - span.SetAttributes(attribute.String("tipset", ts.Key().String())) - span.SetAttributes(attribute.String("parent", pts.Key().String())) - } - defer span.End() - - key, err := asKey(ts, pts) - if err != nil { - return nil, err - } - value, found := t.executedBlkMsgCache.Get(key) - if found { - metrics.RecordInc(ctx, metrics.DataSourceExecutedAndBlockMessagesCacheHit) - return value.(*lens.TipSetMessages), nil - } - - value, err, shared := t.executedBlkMsgGroup.Do(key, func() (interface{}, error) { - data, innerErr := t.node.GetExecutedAndBlockMessagesForTipset(ctx, ts, pts) - if innerErr == nil { - t.executedBlkMsgCache.Add(key, data) - } +func (t *DataSource) MinerLoad(store adt.Store, act *types.Actor) (miner.State, error) { + return miner.Load(store, act) +} - return data, innerErr - }) +func (t *DataSource) ShouldBurnFn(ctx context.Context, ts *types.TipSet) (lens.ShouldBurnFn, error) { + return t.node.BurnFundsFn(ctx, ts) +} - if span.IsRecording() { - span.SetAttributes(attribute.Bool("shared", shared)) - } +func ComputeGasOutputs(ctx context.Context, block *types.BlockHeader, message *types.Message, receipt *types.MessageReceipt, shouldBurnFn lens.ShouldBurnFn) (vm.GasOutputs, error) { + burn, err := shouldBurnFn(ctx, message, receipt.ExitCode) if err != nil { - return nil, err + return vm.GasOutputs{}, err } - return value.(*lens.TipSetMessages), nil -} - -func (t *DataSource) MinerLoad(store adt.Store, act *types.Actor) (miner.State, error) { - return miner.Load(store, act) + return vm.ComputeGasOutputs(receipt.GasUsed, message.GasLimit, block.ParentBaseFee, message.GasFeeCap, message.GasPremium, burn), nil } func GetActorStateChanges(ctx context.Context, store adt.Store, current, executed *types.TipSet) (tasks.ActorStateChangeDiff, error) { diff --git a/chain/indexer/integrated/processor/state.go b/chain/indexer/integrated/processor/state.go index 16e84b861..7af462b57 100644 --- a/chain/indexer/integrated/processor/state.go +++ b/chain/indexer/integrated/processor/state.go @@ -541,11 +541,14 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces // Messages // case tasktype.Message: - out.TipsetsProcessors[t] = messagetask.NewTask(api) + out.TipsetProcessors[t] = messagetask.NewTask(api) + case tasktype.BlockMessage: + out.TipsetProcessors[t] = bmtask.NewTask(api) + case tasktype.MessageGasEconomy: + out.TipsetProcessors[t] = gasecontask.NewTask(api) + case tasktype.GasOutputs: out.TipsetsProcessors[t] = gasouttask.NewTask(api) - case tasktype.BlockMessage: - out.TipsetsProcessors[t] = bmtask.NewTask(api) case tasktype.ParsedMessage: out.TipsetsProcessors[t] = parentmessagetask.NewTask(api) case tasktype.Receipt: @@ -554,8 +557,6 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces out.TipsetsProcessors[t] = imtask.NewTask(api) case tasktype.InternalParsedMessage: out.TipsetsProcessors[t] = ipmtask.NewTask(api) - case tasktype.MessageGasEconomy: - out.TipsetsProcessors[t] = gasecontask.NewTask(api) case tasktype.MultisigApproval: out.TipsetsProcessors[t] = msapprovaltask.NewTask(api) case tasktype.VmMessage: diff --git a/chain/indexer/integrated/processor/state_internal_test.go b/chain/indexer/integrated/processor/state_internal_test.go index 5ea49712e..9ece6d09f 100644 --- a/chain/indexer/integrated/processor/state_internal_test.go +++ b/chain/indexer/integrated/processor/state_internal_test.go @@ -16,6 +16,9 @@ import ( "github.com/filecoin-project/lily/chain/actors/builtin/verifreg" "github.com/filecoin-project/lily/chain/indexer/tasktype" "github.com/filecoin-project/lily/tasks/messageexecutions/vm" + "github.com/filecoin-project/lily/tasks/messages/blockmessage" + "github.com/filecoin-project/lily/tasks/messages/gaseconomy" + "github.com/filecoin-project/lily/tasks/messages/message" "github.com/filecoin-project/lily/tasks/actorstate" inittask "github.com/filecoin-project/lily/tasks/actorstate/init_" @@ -34,10 +37,7 @@ import ( "github.com/filecoin-project/lily/tasks/consensus" "github.com/filecoin-project/lily/tasks/messageexecutions/internalmessage" "github.com/filecoin-project/lily/tasks/messageexecutions/internalparsedmessage" - "github.com/filecoin-project/lily/tasks/messages/blockmessage" - "github.com/filecoin-project/lily/tasks/messages/gaseconomy" "github.com/filecoin-project/lily/tasks/messages/gasoutput" - "github.com/filecoin-project/lily/tasks/messages/message" "github.com/filecoin-project/lily/tasks/messages/parsedmessage" "github.com/filecoin-project/lily/tasks/messages/receipt" "github.com/filecoin-project/lily/tasks/msapprovals" @@ -48,26 +48,26 @@ func TestNewProcessor(t *testing.T) { require.NoError(t, err) require.Equal(t, t.Name(), proc.name) require.Len(t, proc.actorProcessors, 21) - require.Len(t, proc.tipsetProcessors, 5) - require.Len(t, proc.tipsetsProcessors, 10) + require.Len(t, proc.tipsetProcessors, 8) + require.Len(t, proc.tipsetsProcessors, 7) require.Len(t, proc.builtinProcessors, 1) - require.Equal(t, message.NewTask(nil), proc.tipsetsProcessors[tasktype.Message]) require.Equal(t, gasoutput.NewTask(nil), proc.tipsetsProcessors[tasktype.GasOutputs]) - require.Equal(t, blockmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.BlockMessage]) require.Equal(t, parsedmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.ParsedMessage]) require.Equal(t, receipt.NewTask(nil), proc.tipsetsProcessors[tasktype.Receipt]) require.Equal(t, internalmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.InternalMessage]) require.Equal(t, internalparsedmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.InternalParsedMessage]) - require.Equal(t, gaseconomy.NewTask(nil), proc.tipsetsProcessors[tasktype.MessageGasEconomy]) require.Equal(t, msapprovals.NewTask(nil), proc.tipsetsProcessors[tasktype.MultisigApproval]) require.Equal(t, vm.NewTask(nil), proc.tipsetsProcessors[tasktype.VmMessage]) + require.Equal(t, message.NewTask(nil), proc.tipsetProcessors[tasktype.Message]) + require.Equal(t, blockmessage.NewTask(nil), proc.tipsetProcessors[tasktype.BlockMessage]) require.Equal(t, headers.NewTask(), proc.tipsetProcessors[tasktype.BlockHeader]) require.Equal(t, parents.NewTask(), proc.tipsetProcessors[tasktype.BlockParent]) require.Equal(t, drand.NewTask(), proc.tipsetProcessors[tasktype.DrandBlockEntrie]) require.Equal(t, chaineconomics.NewTask(nil), proc.tipsetProcessors[tasktype.ChainEconomics]) require.Equal(t, consensus.NewTask(nil), proc.tipsetProcessors[tasktype.ChainConsensus]) + require.Equal(t, gaseconomy.NewTask(nil), proc.tipsetProcessors[tasktype.MessageGasEconomy]) require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.DeadlineInfoExtractor{})), proc.actorProcessors[tasktype.MinerCurrentDeadlineInfo]) require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.FeeDebtExtractor{})), proc.actorProcessors[tasktype.MinerFeeDebt]) diff --git a/chain/indexer/integrated/processor/state_test.go b/chain/indexer/integrated/processor/state_test.go index 38b6970dc..08eeed64f 100644 --- a/chain/indexer/integrated/processor/state_test.go +++ b/chain/indexer/integrated/processor/state_test.go @@ -269,47 +269,47 @@ func TestMakeProcessorsActors(t *testing.T) { func TestMakeProcessorsTipSet(t *testing.T) { tasks := []string{ + tasktype.Message, + tasktype.BlockMessage, tasktype.BlockHeader, tasktype.BlockParent, tasktype.DrandBlockEntrie, tasktype.ChainEconomics, tasktype.ChainConsensus, + tasktype.MessageGasEconomy, } proc, err := processor.MakeProcessors(nil, tasks) require.NoError(t, err) require.Len(t, proc.TipsetProcessors, len(tasks)) + require.Equal(t, message.NewTask(nil), proc.TipsetProcessors[tasktype.Message]) + require.Equal(t, blockmessage.NewTask(nil), proc.TipsetProcessors[tasktype.BlockMessage]) require.Equal(t, headers.NewTask(), proc.TipsetProcessors[tasktype.BlockHeader]) require.Equal(t, parents.NewTask(), proc.TipsetProcessors[tasktype.BlockParent]) require.Equal(t, drand.NewTask(), proc.TipsetProcessors[tasktype.DrandBlockEntrie]) require.Equal(t, chaineconomics.NewTask(nil), proc.TipsetProcessors[tasktype.ChainEconomics]) require.Equal(t, consensus.NewTask(nil), proc.TipsetProcessors[tasktype.ChainConsensus]) + require.Equal(t, gaseconomy.NewTask(nil), proc.TipsetProcessors[tasktype.MessageGasEconomy]) } func TestMakeProcessorsTipSets(t *testing.T) { tasks := []string{ - tasktype.Message, tasktype.GasOutputs, - tasktype.BlockMessage, tasktype.ParsedMessage, tasktype.Receipt, tasktype.InternalMessage, tasktype.InternalParsedMessage, - tasktype.MessageGasEconomy, tasktype.MultisigApproval, } proc, err := processor.MakeProcessors(nil, tasks) require.NoError(t, err) require.Len(t, proc.TipsetsProcessors, len(tasks)) - require.Equal(t, message.NewTask(nil), proc.TipsetsProcessors[tasktype.Message]) require.Equal(t, gasoutput.NewTask(nil), proc.TipsetsProcessors[tasktype.GasOutputs]) - require.Equal(t, blockmessage.NewTask(nil), proc.TipsetsProcessors[tasktype.BlockMessage]) require.Equal(t, parsedmessage.NewTask(nil), proc.TipsetsProcessors[tasktype.ParsedMessage]) require.Equal(t, receipt.NewTask(nil), proc.TipsetsProcessors[tasktype.Receipt]) require.Equal(t, internalmessage.NewTask(nil), proc.TipsetsProcessors[tasktype.InternalMessage]) require.Equal(t, internalparsedmessage.NewTask(nil), proc.TipsetsProcessors[tasktype.InternalParsedMessage]) - require.Equal(t, gaseconomy.NewTask(nil), proc.TipsetsProcessors[tasktype.MessageGasEconomy]) require.Equal(t, msapprovals.NewTask(nil), proc.TipsetsProcessors[tasktype.MultisigApproval]) } @@ -346,7 +346,7 @@ func TestMakeProcessorsAllTasks(t *testing.T) { proc, err := processor.MakeProcessors(nil, append(tasktype.AllTableTasks, processor.BuiltinTaskName)) require.NoError(t, err) require.Len(t, proc.ActorProcessors, 21) - require.Len(t, proc.TipsetProcessors, 5) - require.Len(t, proc.TipsetsProcessors, 10) + require.Len(t, proc.TipsetProcessors, 8) + require.Len(t, proc.TipsetsProcessors, 7) require.Len(t, proc.ReportProcessors, 1) } diff --git a/go.mod b/go.mod index 2f3ab42a9..0bc5c9421 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/filecoin-project/specs-actors/v8 v8.0.1 github.com/hibiken/asynq v0.23.0 github.com/hibiken/asynq/x v0.0.0-20220413130846-5c723f597e01 + github.com/ipfs/go-ipld-format v0.4.0 github.com/jedib0t/go-pretty/v6 v6.2.7 go.opentelemetry.io/otel/trace v1.3.0 go.uber.org/atomic v1.9.0 @@ -196,7 +197,6 @@ require ( github.com/ipfs/go-ipfs-pq v0.0.2 // indirect github.com/ipfs/go-ipfs-routing v0.2.1 // indirect github.com/ipfs/go-ipfs-util v0.0.2 // indirect - github.com/ipfs/go-ipld-format v0.4.0 // indirect github.com/ipfs/go-ipld-legacy v0.1.1 // indirect github.com/ipfs/go-ipns v0.1.2 // indirect github.com/ipfs/go-log v1.0.5 // indirect diff --git a/lens/interface.go b/lens/interface.go index dbc1b270d..f6f267bb9 100644 --- a/lens/interface.go +++ b/lens/interface.go @@ -2,7 +2,9 @@ package lens import ( "context" + "fmt" + "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/go-address" @@ -18,8 +20,8 @@ type API interface { StoreAPI ChainAPI StateAPI + VMAPI - GetExecutedAndBlockMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) (*TipSetMessages, error) GetMessageExecutionsForTipSet(ctx context.Context, ts, pts *types.TipSet) ([]*MessageExecution, error) } type StoreAPI interface { @@ -41,7 +43,11 @@ type ChainAPI interface { ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error) - ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error) + + ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi.TokenAmount, error) + + MessagesForTipSetBlocks(ctx context.Context, ts *types.TipSet) ([]*BlockMessages, error) + TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*BlockMessageReceipts, error) } type StateAPI interface { @@ -59,9 +65,10 @@ type StateAPI interface { StateNetworkName(context.Context) (dtypes.NetworkName, error) } -type TipSetMessages struct { - Executed []*ExecutedMessage - Block []*BlockMessages +type ShouldBurnFn func(ctx context.Context, msg *types.Message, errcode exitcode.ExitCode) (bool, error) + +type VMAPI interface { + BurnFundsFn(ctx context.Context, ts *types.TipSet) (ShouldBurnFn, error) } type MessageExecution struct { @@ -78,21 +85,65 @@ type MessageExecution struct { Implicit bool } -type ExecutedMessage struct { - Cid cid.Cid - Height abi.ChainEpoch - Message *types.Message - Receipt *types.MessageReceipt - BlockHeader *types.BlockHeader - Blocks []cid.Cid // blocks this message appeared in - Index uint64 // Message and receipt sequence in tipset - FromActorCode cid.Cid // code of the actor the message is from - ToActorCode cid.Cid // code of the actor the message is to - GasOutputs vm.GasOutputs -} - type BlockMessages struct { Block *types.BlockHeader // block messages appeared in BlsMessages []*types.Message // BLS messages in block `Block` SecpMessages []*types.SignedMessage // SECP messages in block `Block` } + +// BlockMessageReceipts contains a block its messages and their corresponding receipts. +// The Receipts are one-to-one with Messages index. +type BlockMessageReceipts struct { + Block *types.BlockHeader + // Messages contained in Block. + Messages []types.ChainMsg + // Receipts contained in Block. + Receipts []*types.MessageReceipt + // MessageExectionIndex contains a mapping of Messages to their execution order in the tipset they were included. + MessageExecutionIndex map[types.ChainMsg]int +} + +type MessageReceiptIterator struct { + idx int + msgs []types.ChainMsg + receipts []*types.MessageReceipt + exeIdx map[types.ChainMsg]int +} + +// Iterator returns a MessageReceiptIterator to conveniently iterate messages, their execution index, and their respective receipts. +func (bmr *BlockMessageReceipts) Iterator() (*MessageReceiptIterator, error) { + if len(bmr.Messages) != len(bmr.Receipts) { + return nil, fmt.Errorf("invalid construction, expected equal number receipts (%d) and messages (%d)", len(bmr.Receipts), len(bmr.Messages)) + } + return &MessageReceiptIterator{ + idx: 0, + msgs: bmr.Messages, + receipts: bmr.Receipts, + exeIdx: bmr.MessageExecutionIndex, + }, nil +} + +// HasNext returns `true` while there are messages/receipts to iterate. +func (mri *MessageReceiptIterator) HasNext() bool { + if mri.idx < len(mri.msgs) { + return true + } + return false +} + +// Next returns the next message, its execution index, and receipt in the MessageReceiptIterator. +func (mri *MessageReceiptIterator) Next() (types.ChainMsg, int, *types.MessageReceipt) { + if mri.HasNext() { + msg := mri.msgs[mri.idx] + exeIdx := mri.exeIdx[msg] + rec := mri.receipts[mri.idx] + mri.idx++ + return msg, exeIdx, rec + } + return nil, -1, nil +} + +// Reset resets the MessageReceiptIterator to the first message/receipt. +func (mri *MessageReceiptIterator) Reset() { + mri.idx = 0 +} diff --git a/lens/lily/impl.go b/lens/lily/impl.go index 140e77e6d..7e90d125c 100644 --- a/lens/lily/impl.go +++ b/lens/lily/impl.go @@ -7,16 +7,22 @@ import ( "sync" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/exitcode" + network2 "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/consensus/filcns" "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/node/impl/common" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/net" "github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/go-pg/pg/v10" "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" "go.uber.org/fx" @@ -62,17 +68,9 @@ type LilyNodeAPI struct { actorStoreInit sync.Once } -func (m *LilyNodeAPI) CirculatingSupply(ctx context.Context, key types.TipSetKey) (api.CirculatingSupply, error) { - return m.StateAPI.StateVMCirculatingSupplyInternal(ctx, key) -} - -func (m *LilyNodeAPI) ChainGetTipSetAfterHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) { - // TODO (Frrist): I copied this from lotus, I need it now to handle gap filling edge cases. - ts, err := m.ChainAPI.Chain.GetTipSetFromKey(ctx, key) - if err != nil { - return nil, fmt.Errorf("loading tipset %s: %w", key, err) - } - return m.ChainAPI.Chain.GetTipsetByHeight(ctx, epoch, ts, false) +type vmWrapper struct { + vm vm.Interface + st *state.StateTree } func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorkerConfig) (*schedule.JobSubmitResult, error) { @@ -481,10 +479,6 @@ func (m *LilyNodeAPI) LilyJobList(_ context.Context) ([]schedule.JobListResult, return m.Scheduler.Jobs(), nil } -func (m *LilyNodeAPI) GetExecutedAndBlockMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) (*lens.TipSetMessages, error) { - return util.GetExecutedAndBlockMessagesForTipset(ctx, m.ChainAPI.Chain, m.StateManager, ts, pts) -} - func (m *LilyNodeAPI) GetMessageExecutionsForTipSet(ctx context.Context, next *types.TipSet, current *types.TipSet) ([]*lens.MessageExecution, error) { // this is defined in the lily daemon dep injection constructor, failure here is a developer error. msgMonitor, ok := m.ExecMonitor.(*modules.BufferedExecMonitor) @@ -544,6 +538,189 @@ func (m *LilyNodeAPI) GetMessageExecutionsForTipSet(ctx context.Context, next *t return out, nil } +// ComputeBaseFee calculates the base-fee of the specified tipset. +func (m *LilyNodeAPI) ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi.TokenAmount, error) { + return m.ChainAPI.Chain.ComputeBaseFee(ctx, ts) +} + +// MessagesForTipSetBlocks returns messages stored in the blocks of the specified tipset, messages may be duplicated +// across the returned set of BlockMessages. +func (m *LilyNodeAPI) MessagesForTipSetBlocks(ctx context.Context, ts *types.TipSet) ([]*lens.BlockMessages, error) { + var out []*lens.BlockMessages + for _, blk := range ts.Blocks() { + blkMsgs, err := m.ChainAPI.ChainModuleAPI.ChainGetBlockMessages(ctx, blk.Cid()) + if err != nil { + return nil, err + } + out = append(out, &lens.BlockMessages{ + Block: blk, + BlsMessages: blkMsgs.BlsMessages, + SecpMessages: blkMsgs.SecpkMessages, + }) + } + return out, nil +} + +// TipSetMessageReceipts returns the blocks and messages in `pts` and their corresponding receipts from `ts` matching block order in tipset (`pts`). +func (m *LilyNodeAPI) TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) { + // sanity check args + if ts.Key().IsEmpty() { + return nil, fmt.Errorf("tipset cannot be empty") + } + if pts.Key().IsEmpty() { + return nil, fmt.Errorf("parent tipset cannot be empty") + } + if !types.CidArrsEqual(ts.Parents().Cids(), pts.Cids()) { + return nil, fmt.Errorf("mismatching tipset (%s) and parent tipset (%s)", ts.Key().String(), pts.Key().String()) + } + // returned BlockMessages match block order in tipset + blkMsgs, err := m.ChainAPI.Chain.BlockMsgsForTipset(ctx, pts) + if err != nil { + return nil, err + } + if len(blkMsgs) != len(pts.Blocks()) { + // logic error somewhere + return nil, fmt.Errorf("mismatching number of blocks returned from block messages, got %d wanted %d", len(blkMsgs), len(pts.Blocks())) + } + + // retrieve receipts using a block from the child (ts) tipset + rs, err := adt.AsArray(m.Store(), ts.Blocks()[0].ParentMessageReceipts) + if err != nil { + // if we fail to find the receipts then we need to compute them, which we can safely do since the above `BlockMsgsForTipset` call + // returning successfully indicates we have the message available to compute receipts from. + if ipld.IsNotFound(err) { + log.Debugw("computing tipset to get receipts", "ts", pts.Key().String(), "height", pts.Height()) + if stateRoot, receiptRoot, err := m.StateManager.TipSetState(ctx, pts); err != nil { + log.Errorw("failed to compute tipset state", "tipset", pts.Key().String(), "height", pts.Height()) + return nil, err + } else if !stateRoot.Equals(ts.ParentState()) { // sanity check + return nil, fmt.Errorf("computed stateroot (%s) does not match tipset stateroot (%s)", stateRoot.String(), ts.ParentState().String()) + } else if !receiptRoot.Equals(ts.Blocks()[0].ParentMessageReceipts) { // sanity check + return nil, fmt.Errorf("computed receipts (%s) does not match tipset block parent message receipts (%s)", receiptRoot.String(), ts.Blocks()[0].ParentMessageReceipts.String()) + } + // loading after computing state should succeed as tipset computation produces message receipts + rs, err = adt.AsArray(m.Store(), ts.Blocks()[0].ParentMessageReceipts) + if err != nil { + return nil, fmt.Errorf("load message receipts after tipset execution (something if very wrong contact a developer): %w", err) + } + } else { + return nil, fmt.Errorf("loading message receipts %w", err) + } + } + // so we only load the receipt array once + getReceipt := func(idx int) (*types.MessageReceipt, error) { + var r types.MessageReceipt + if found, err := rs.Get(uint64(idx), &r); err != nil { + return nil, err + } else if !found { + return nil, fmt.Errorf("failed to find receipt %d", idx) + } + return &r, nil + } + + out := make([]*lens.BlockMessageReceipts, len(pts.Blocks())) + executionIndex := 0 + // walk each block in tipset, `pts.Blocks()` has same ordering as `blkMsgs`. + for blkIdx := range pts.Blocks() { + // bls and secp messages for block + msgs := blkMsgs[blkIdx] + // index of messages in `out.Messages` + msgIdx := 0 + // index or receipts in `out.Receipts` + receiptIdx := 0 + out[blkIdx] = &lens.BlockMessageReceipts{ + // block containing messages + Block: pts.Blocks()[blkIdx], + // total messages returned equal to sum of bls and secp messages + Messages: make([]types.ChainMsg, len(msgs.BlsMessages)+len(msgs.SecpkMessages)), + // total receipts returned equal to sum of bls and secp messages + Receipts: make([]*types.MessageReceipt, len(msgs.BlsMessages)+len(msgs.SecpkMessages)), + // index of message indicating execution order. + MessageExecutionIndex: make(map[types.ChainMsg]int), + } + // walk bls messages and extract their receipts + for blsIdx := range msgs.BlsMessages { + receipt, err := getReceipt(executionIndex) + if err != nil { + return nil, err + } + out[blkIdx].Messages[msgIdx] = msgs.BlsMessages[blsIdx] + out[blkIdx].Receipts[receiptIdx] = receipt + out[blkIdx].MessageExecutionIndex[msgs.BlsMessages[blsIdx]] = executionIndex + msgIdx++ + receiptIdx++ + executionIndex++ + } + // walk secp messages and extract their receipts + for secpIdx := range msgs.SecpkMessages { + receipt, err := getReceipt(executionIndex) + if err != nil { + return nil, err + } + out[blkIdx].Messages[msgIdx] = msgs.SecpkMessages[secpIdx] + out[blkIdx].Receipts[receiptIdx] = receipt + out[blkIdx].MessageExecutionIndex[msgs.SecpkMessages[secpIdx]] = executionIndex + msgIdx++ + receiptIdx++ + executionIndex++ + } + } + return out, nil +} + +func (v *vmWrapper) ShouldBurn(ctx context.Context, msg *types.Message, errcode exitcode.ExitCode) (bool, error) { + if lvmi, ok := v.vm.(*vm.LegacyVM); ok { + return lvmi.ShouldBurn(ctx, v.st, msg, errcode) + } + + // Any "don't burn" rules from Network v13 onwards go here, for now we always return true + // source: https://github.com/filecoin-project/lotus/blob/v1.15.1/chain/vm/vm.go#L647 + return true, nil +} + +func (m *LilyNodeAPI) BurnFundsFn(ctx context.Context, ts *types.TipSet) (lens.ShouldBurnFn, error) { + // Create a skeleton vm just for calling ShouldBurn + // NB: VM is only required to process state prior to network v13 + if util.DefaultNetwork.Version(ctx, ts.Height()) <= network2.Version12 { + vmi, err := vm.NewVM(ctx, &vm.VMOpts{ + StateBase: ts.ParentState(), + Epoch: ts.Height(), + Bstore: m.ChainAPI.Chain.StateBlockstore(), + Actors: filcns.NewActorRegistry(), + Syscalls: m.StateManager.Syscalls, + CircSupplyCalc: m.StateManager.GetVMCirculatingSupply, + NetworkVersion: util.DefaultNetwork.Version(ctx, ts.Height()), + BaseFee: ts.Blocks()[0].ParentBaseFee, + }) + if err != nil { + return nil, fmt.Errorf("creating temporary vm: %w", err) + } + parentStateTree, err := state.LoadStateTree(m.ChainAPI.Chain.ActorStore(ctx), ts.ParentState()) + if err != nil { + return nil, err + } + vmw := &vmWrapper{vm: vmi, st: parentStateTree} + return vmw.ShouldBurn, nil + } + // always burn after Network Version 12 + return func(ctx context.Context, msg *types.Message, errcode exitcode.ExitCode) (bool, error) { + return true, nil + }, nil +} + +func (m *LilyNodeAPI) CirculatingSupply(ctx context.Context, key types.TipSetKey) (api.CirculatingSupply, error) { + return m.StateAPI.StateVMCirculatingSupplyInternal(ctx, key) +} + +func (m *LilyNodeAPI) ChainGetTipSetAfterHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) { + // TODO (Frrist): I copied this from lotus, I need it now to handle gap filling edge cases. + ts, err := m.ChainAPI.Chain.GetTipSetFromKey(ctx, key) + if err != nil { + return nil, fmt.Errorf("loading tipset %s: %w", key, err) + } + return m.ChainAPI.Chain.GetTipsetByHeight(ctx, epoch, ts, false) +} + func (m *LilyNodeAPI) Store() adt.Store { m.actorStoreInit.Do(func() { if m.CacheConfig.StatestoreCacheSize > 0 { diff --git a/lens/lily/struct.go b/lens/lily/struct.go index 7ea3bbbdb..8155438a9 100644 --- a/lens/lily/struct.go +++ b/lens/lily/struct.go @@ -12,7 +12,6 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/peer" - "github.com/filecoin-project/lily/lens" "github.com/filecoin-project/lily/schedule" ) @@ -26,8 +25,7 @@ type LilyAPIStruct struct { v0api.CommonStruct Internal struct { - Store func() adt.Store `perm:"read"` - GetExecutedAndBlockMessagesForTipset func(context.Context, *types.TipSet, *types.TipSet) (*lens.TipSetMessages, error) `perm:"read"` + Store func() adt.Store `perm:"read"` LilyIndex func(ctx context.Context, config *LilyIndexConfig) (interface{}, error) `perm:"read"` LilyWatch func(context.Context, *LilyWatchConfig) (*schedule.JobSubmitResult, error) `perm:"read"` @@ -183,10 +181,6 @@ func (s *LilyAPIStruct) Shutdown(ctx context.Context) error { return s.Internal.Shutdown(ctx) } -func (s *LilyAPIStruct) GetExecutedAndBlockMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) (*lens.TipSetMessages, error) { - return s.Internal.GetExecutedAndBlockMessagesForTipset(ctx, ts, pts) -} - func (s *LilyAPIStruct) SyncState(ctx context.Context) (*api.SyncState, error) { return s.Internal.SyncState(ctx) } diff --git a/lens/util/repo.go b/lens/util/repo.go index 83d63de5e..22b9517c8 100644 --- a/lens/util/repo.go +++ b/lens/util/repo.go @@ -10,8 +10,6 @@ import ( "time" "github.com/filecoin-project/go-bitfield" - "github.com/filecoin-project/go-state-types/exitcode" - "github.com/filecoin-project/go-state-types/network" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" cbg "github.com/whyrusleeping/cbor-gen" @@ -22,8 +20,6 @@ import ( builtin "github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/consensus/filcns" "github.com/filecoin-project/lotus/chain/state" - "github.com/filecoin-project/lotus/chain/stmgr" - "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/specs-actors/actors/util/adt" @@ -40,221 +36,6 @@ func init() { var log = logging.Logger("lily/lens") -// GetMessagesForTipset returns a list of messages sent as part of pts (parent) with receipts found in ts (child). -// No attempt at deduplication of messages is made. A list of blocks with their corresponding messages is also returned - it contains all messages -// in the block regardless if they were applied during the state change. -func GetExecutedAndBlockMessagesForTipset(ctx context.Context, cs *store.ChainStore, sm *stmgr.StateManager, current, executed *types.TipSet) (*lens.TipSetMessages, error) { - ctx, span := otel.Tracer("").Start(ctx, "GetExecutedAndBlockMessagesForTipSet") - defer span.End() - if !types.CidArrsEqual(current.Parents().Cids(), executed.Cids()) { - return nil, fmt.Errorf("current tipset (%s) is not on the same chain as executed (%s)", current.Key(), executed.Key()) - } - - getActorCode, err := MakeGetActorCodeFunc(ctx, cs.ActorStore(ctx), current, executed) - if err != nil { - return nil, err - } - - // Build a lookup of which blocks each message appears in - messageBlocks := map[cid.Cid][]cid.Cid{} - for blockIdx, bh := range executed.Blocks() { - blscids, secpkcids, err := cs.ReadMsgMetaCids(ctx, bh.Messages) - if err != nil { - return nil, fmt.Errorf("read messages for block: %w", err) - } - - for _, c := range blscids { - messageBlocks[c] = append(messageBlocks[c], executed.Cids()[blockIdx]) - } - - for _, c := range secpkcids { - messageBlocks[c] = append(messageBlocks[c], executed.Cids()[blockIdx]) - } - } - span.AddEvent("read block message metadata") - - bmsgs, err := cs.BlockMsgsForTipset(ctx, executed) - if err != nil { - return nil, fmt.Errorf("block messages for tipset: %w", err) - } - - span.AddEvent("read block messages for tipset") - - pblocks := executed.Blocks() - if len(bmsgs) != len(pblocks) { - // logic error somewhere - return nil, fmt.Errorf("mismatching number of blocks returned from block messages, got %d wanted %d", len(bmsgs), len(pblocks)) - } - - count := 0 - for _, bm := range bmsgs { - count += len(bm.BlsMessages) + len(bm.SecpkMessages) - } - - // Start building a list of completed message with receipt - emsgs := make([]*lens.ExecutedMessage, 0, count) - - // bmsgs is ordered by block - var index uint64 - for blockIdx, bm := range bmsgs { - for _, blsm := range bm.BlsMessages { - msg := blsm.VMMessage() - // if a message ran out of gas while executing this is expected. - toCode, found := getActorCode(msg.To) - if !found { - log.Warnw("failed to find TO actor", "height", current.Height().String(), "message", msg.Cid().String(), "actor", msg.To.String()) - } - // we must always be able to find the sender, else there is a logic error somewhere. - fromCode, found := getActorCode(msg.From) - if !found { - return nil, fmt.Errorf("failed to find from actor %s height %d message %s", msg.From, current.Height(), msg.Cid()) - } - emsgs = append(emsgs, &lens.ExecutedMessage{ - Cid: blsm.Cid(), - Height: executed.Height(), - Message: msg, - BlockHeader: pblocks[blockIdx], - Blocks: messageBlocks[blsm.Cid()], - Index: index, - FromActorCode: fromCode, - ToActorCode: toCode, - }) - index++ - } - - for _, secm := range bm.SecpkMessages { - msg := secm.VMMessage() - toCode, found := getActorCode(msg.To) - // if a message ran out of gas while executing this is expected. - if !found { - log.Warnw("failed to find TO actor", "height", current.Height().String(), "message", msg.Cid().String(), "actor", msg.To.String()) - } - // we must always be able to find the sender, else there is a logic error somewhere. - fromCode, found := getActorCode(msg.From) - if !found { - return nil, fmt.Errorf("failed to find from actor %s height %d message %s", msg.From, current.Height(), msg.Cid()) - } - emsgs = append(emsgs, &lens.ExecutedMessage{ - Cid: secm.Cid(), - Height: executed.Height(), - Message: secm.VMMessage(), - BlockHeader: pblocks[blockIdx], - Blocks: messageBlocks[secm.Cid()], - Index: index, - FromActorCode: fromCode, - ToActorCode: toCode, - }) - index++ - } - - } - span.AddEvent("built executed message list") - - // Retrieve receipts using a block from the child tipset - rs, err := adt.AsArray(cs.ActorStore(ctx), current.Blocks()[0].ParentMessageReceipts) - if err != nil { - return nil, fmt.Errorf("amt load: %w", err) - } - - if rs.Length() != uint64(len(emsgs)) { - // logic error somewhere - return nil, fmt.Errorf("mismatching number of receipts: got %d wanted %d", rs.Length(), len(emsgs)) - } - - // Create a skeleton vm just for calling ShouldBurn - // NB: VM is only required to process state prior to network v13 - if DefaultNetwork.Version(ctx, executed.Height()) <= network.Version12 { - vmi, err := vm.NewVM(ctx, &vm.VMOpts{ - StateBase: current.ParentState(), - Epoch: current.Height(), - Bstore: cs.StateBlockstore(), - Actors: filcns.NewActorRegistry(), - Syscalls: sm.Syscalls, - CircSupplyCalc: sm.GetVMCirculatingSupply, - NetworkVersion: DefaultNetwork.Version(ctx, current.Height()), - BaseFee: current.Blocks()[0].ParentBaseFee, - }) - if err != nil { - return nil, fmt.Errorf("creating temporary vm: %w", err) - } - parentStateTree, err := state.LoadStateTree(cs.ActorStore(ctx), executed.ParentState()) - if err != nil { - return nil, fmt.Errorf("load state tree: %w", err) - } - span.AddEvent("loaded parent state tree") - - vmw := &vmWrapper{vm: vmi} - // Receipts are in same order as BlockMsgsForTipset - for _, em := range emsgs { - var r types.MessageReceipt - if found, err := rs.Get(em.Index, &r); err != nil { - return nil, err - } else if !found { - return nil, fmt.Errorf("failed to find receipt %d", em.Index) - } - em.Receipt = &r - - burn, err := vmw.ShouldBurn(ctx, parentStateTree, em.Message, em.Receipt.ExitCode) - if err != nil { - return nil, fmt.Errorf("deciding whether should burn failed: %w", err) - } - - em.GasOutputs = vm.ComputeGasOutputs(em.Receipt.GasUsed, em.Message.GasLimit, em.BlockHeader.ParentBaseFee, em.Message.GasFeeCap, em.Message.GasPremium, burn) - span.AddEvent("computed executed message gas usage") - } - } else { - // Receipts are in same order as BlockMsgsForTipset - for _, em := range emsgs { - var r types.MessageReceipt - if found, err := rs.Get(em.Index, &r); err != nil { - return nil, err - } else if !found { - return nil, fmt.Errorf("failed to find receipt %d", em.Index) - } - em.Receipt = &r - // always burn after Network Verions 12 - shouldBurn := true - em.GasOutputs = vm.ComputeGasOutputs(em.Receipt.GasUsed, em.Message.GasLimit, em.BlockHeader.ParentBaseFee, em.Message.GasFeeCap, em.Message.GasPremium, shouldBurn) - } - span.AddEvent("computed executed message gas usage") - - } - - blkMsgs := make([]*lens.BlockMessages, len(current.Blocks())) - for idx, blk := range current.Blocks() { - msgs, smsgs, err := cs.MessagesForBlock(ctx, blk) - if err != nil { - return nil, err - } - blkMsgs[idx] = &lens.BlockMessages{ - Block: blk, - BlsMessages: msgs, - SecpMessages: smsgs, - } - } - - span.AddEvent("read messages for current block") - - return &lens.TipSetMessages{ - Executed: emsgs, - Block: blkMsgs, - }, nil -} - -type vmWrapper struct { - vm vm.Interface -} - -func (v *vmWrapper) ShouldBurn(ctx context.Context, st *state.StateTree, msg *types.Message, errcode exitcode.ExitCode) (bool, error) { - if lvmi, ok := v.vm.(*vm.LegacyVM); ok { - return lvmi.ShouldBurn(ctx, st, msg, errcode) - } - - // Any "don't burn" rules from Network v13 onwards go here, for now we always return true - // source: https://github.com/filecoin-project/lotus/blob/v1.15.1/chain/vm/vm.go#L647 - return true, nil -} - func ParseParams(params []byte, method abi.MethodNum, actCode cid.Cid) (string, string, error) { m, found := ActorRegistry.Methods[actCode][method] if !found { @@ -448,6 +229,7 @@ func MakeGetActorCodeFunc(ctx context.Context, store adt.Store, next, current *t } return func(a address.Address) (cid.Cid, bool) { + // TODO accept a context, don't take the function context. _, innerSpan := otel.Tracer("").Start(ctx, "GetActorCode") defer innerSpan.End() // Shortcut lookup before resolving diff --git a/tasks/actorstate/actorstate.go b/tasks/actorstate/actorstate.go index 8cfcbe578..62419cb76 100644 --- a/tasks/actorstate/actorstate.go +++ b/tasks/actorstate/actorstate.go @@ -60,7 +60,7 @@ type ActorStateAPI interface { // StateMinerSectors(ctx context.Context, addr address.Address, bf *bitfield.BitField, tsk types.TipSetKey) ([]*miner.SectorOnChainInfo, error) Store() adt.Store - ExecutedAndBlockMessages(ctx context.Context, ts, pts *types.TipSet) (*lens.TipSetMessages, error) + TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) DiffSectors(ctx context.Context, addr address.Address, ts, pts *types.TipSet, pre, cur miner.State) (*miner.SectorChanges, error) DiffPreCommits(ctx context.Context, addr address.Address, ts, pts *types.TipSet, pre, cur miner.State) (*miner.PreCommitChanges, error) diff --git a/tasks/actorstate/miner/post.go b/tasks/actorstate/miner/post.go index 711533e25..a891751fd 100644 --- a/tasks/actorstate/miner/post.go +++ b/tasks/actorstate/miner/post.go @@ -8,11 +8,11 @@ import ( "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" minertypes "github.com/filecoin-project/go-state-types/builtin/v8/miner" + "github.com/filecoin-project/lotus/chain/types" "go.opentelemetry.io/otel" "go.uber.org/zap" "github.com/filecoin-project/lily/chain/actors/builtin/miner" - "github.com/filecoin-project/lily/lens" "github.com/filecoin-project/lily/model" minermodel "github.com/filecoin-project/lily/model/actors/miner" "github.com/filecoin-project/lily/tasks/actorstate" @@ -60,13 +60,13 @@ func (PoStExtractor) Extract(ctx context.Context, a actorstate.ActorInfo, node a return pmap, nil } - processPostMsg := func(msg *lens.ExecutedMessage) error { + processPostMsg := func(msg types.ChainMsg, rec *types.MessageReceipt) error { sectors := make([]uint64, 0) - if msg.Receipt == nil || msg.Receipt.ExitCode.IsError() { + if rec == nil || rec.ExitCode.IsError() { return nil } params := minertypes.SubmitWindowedPoStParams{} - if err := params.UnmarshalCBOR(bytes.NewBuffer(msg.Message.Params)); err != nil { + if err := params.UnmarshalCBOR(bytes.NewBuffer(msg.VMMessage().Params)); err != nil { return fmt.Errorf("unmarshal post params: %w", err) } @@ -102,21 +102,28 @@ func (PoStExtractor) Extract(ctx context.Context, a actorstate.ActorInfo, node a Height: int64(ec.PrevTs.Height()), MinerID: addr, SectorID: s, - PostMessageCID: msg.Cid.String(), + PostMessageCID: msg.Cid().String(), }) } return nil } - tsMsgs, err := node.ExecutedAndBlockMessages(ctx, a.Current, a.Executed) + msgRects, err := node.TipSetMessageReceipts(ctx, a.Current, a.Executed) if err != nil { - return nil, fmt.Errorf("getting executed and block messages: %w", err) + return nil, err } - for _, msg := range tsMsgs.Executed { - if msg.Message.To == a.Address && msg.Message.Method == 5 /* miner.SubmitWindowedPoSt */ { - if err := processPostMsg(msg); err != nil { - return nil, fmt.Errorf("process post msg: %w", err) + for _, blkMsgs := range msgRects { + itr, err := blkMsgs.Iterator() + if err != nil { + return nil, err + } + for itr.HasNext() { + msg, _, rec := itr.Next() + if msg.VMMessage().To == a.Address && msg.VMMessage().Method == 5 /* miner.SubmitWindowedPoSt */ { + if err := processPostMsg(msg, rec); err != nil { + return nil, fmt.Errorf("process post msg: %w", err) + } } } } diff --git a/tasks/api.go b/tasks/api.go index e0e5ec8dd..b5201304b 100644 --- a/tasks/api.go +++ b/tasks/api.go @@ -4,6 +4,7 @@ import ( "context" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" @@ -51,11 +52,17 @@ type DataSource interface { MinerPower(ctx context.Context, addr address.Address, ts *types.TipSet) (*api.MinerPower, error) ActorStateChanges(ctx context.Context, ts, pts *types.TipSet) (ActorStateChangeDiff, error) MessageExecutions(ctx context.Context, ts, pts *types.TipSet) ([]*lens.MessageExecution, error) - ExecutedAndBlockMessages(ctx context.Context, ts, pts *types.TipSet) (*lens.TipSetMessages, error) Store() adt.Store + ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi.TokenAmount, error) + TipSetBlockMessages(ctx context.Context, ts *types.TipSet) ([]*lens.BlockMessages, error) + + TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) + DiffSectors(ctx context.Context, addr address.Address, ts, pts *types.TipSet, pre, cur miner.State) (*miner.SectorChanges, error) DiffPreCommits(ctx context.Context, addr address.Address, ts, pts *types.TipSet, pre, cur miner.State) (*miner.PreCommitChanges, error) MinerLoad(store adt.Store, act *types.Actor) (miner.State, error) + + ShouldBurnFn(ctx context.Context, ts *types.TipSet) (lens.ShouldBurnFn, error) } diff --git a/tasks/messages/blockmessage/task.go b/tasks/messages/blockmessage/task.go index c6ff64167..bd414c2cf 100644 --- a/tasks/messages/blockmessage/task.go +++ b/tasks/messages/blockmessage/task.go @@ -25,14 +25,12 @@ func NewTask(node tasks.DataSource) *Task { } } -func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, executed *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { +func (t *Task) ProcessTipSet(ctx context.Context, current *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { ctx, span := otel.Tracer("").Start(ctx, "ProcessTipSets") if span.IsRecording() { span.SetAttributes( attribute.String("current", current.String()), attribute.Int64("current_height", int64(current.Height())), - attribute.String("executed", executed.String()), - attribute.Int64("executed_height", int64(executed.Height())), attribute.String("processor", "block_messages"), ) } @@ -42,12 +40,11 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut StateRoot: current.ParentState().String(), } - tsMsgs, err := t.node.ExecutedAndBlockMessages(ctx, current, executed) + blkMsgs, err := t.node.TipSetBlockMessages(ctx, current) if err != nil { - report.ErrorsDetected = fmt.Errorf("getting executed and block messages: %w", err) + report.ErrorsDetected = fmt.Errorf("getting tipset block messages: %w", err) return nil, report, nil } - blkMsgs := tsMsgs.Block var ( errorsDetected = make([]*messages.MessageError, 0, len(blkMsgs)) diff --git a/tasks/messages/gaseconomy/task.go b/tasks/messages/gaseconomy/task.go index f84bb3e5c..ada46cebe 100644 --- a/tasks/messages/gaseconomy/task.go +++ b/tasks/messages/gaseconomy/task.go @@ -7,7 +7,6 @@ import ( "math/big" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" "go.opentelemetry.io/otel" @@ -17,7 +16,6 @@ import ( messagemodel "github.com/filecoin-project/lily/model/messages" visormodel "github.com/filecoin-project/lily/model/visor" "github.com/filecoin-project/lily/tasks" - "github.com/filecoin-project/lily/tasks/messages" ) type Task struct { @@ -30,14 +28,12 @@ func NewTask(node tasks.DataSource) *Task { } } -func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, executed *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { +func (t *Task) ProcessTipSet(ctx context.Context, current *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { ctx, span := otel.Tracer("").Start(ctx, "ProcessTipSets") if span.IsRecording() { span.SetAttributes( attribute.String("current", current.String()), attribute.Int64("current_height", int64(current.Height())), - attribute.String("executed", executed.String()), - attribute.Int64("executed_height", int64(executed.Height())), attribute.String("processor", "gas_economy"), ) } @@ -48,21 +44,19 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut StateRoot: current.ParentState().String(), } - tsMsgs, err := t.node.ExecutedAndBlockMessages(ctx, current, executed) + msgrec, err := t.node.TipSetBlockMessages(ctx, current) if err != nil { - report.ErrorsDetected = fmt.Errorf("getting executed and block messages: %w", err) + report.ErrorsDetected = fmt.Errorf("getting tipset messages receipts: %w", err) return nil, report, nil } - emsgs := tsMsgs.Executed var ( - exeMsgSeen = make(map[cid.Cid]bool, len(emsgs)) + exeMsgSeen = make(map[cid.Cid]bool) totalGasLimit int64 totalUniqGasLimit int64 - errorsDetected = make([]*messages.MessageError, 0, len(emsgs)) ) - for _, m := range emsgs { + for _, mr := range msgrec { // Stop processing if we have been told to cancel select { case <-ctx.Done(): @@ -70,39 +64,49 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut default: } - // calculate total gas limit of executed messages regardless of duplicates. - for range m.Blocks { - totalGasLimit += m.Message.GasLimit - } + for _, msg := range mr.BlsMessages { + // calculate total gas limit of executed messages regardless of duplicates. + totalGasLimit += msg.GasLimit + if exeMsgSeen[msg.Cid()] { + continue + } + exeMsgSeen[msg.Cid()] = true + // calculate unique gas limit + totalUniqGasLimit += msg.GasLimit - if exeMsgSeen[m.Cid] { - continue } - exeMsgSeen[m.Cid] = true - totalUniqGasLimit += m.Message.GasLimit + for _, msg := range mr.SecpMessages { + // calculate total gas limit of executed messages regardless of duplicates. + totalGasLimit += msg.VMMessage().GasLimit + if exeMsgSeen[msg.Cid()] { + continue + } + exeMsgSeen[msg.Cid()] = true + // calculate unique gas limit + totalUniqGasLimit += msg.VMMessage().GasLimit + } } - newBaseFee := store.ComputeNextBaseFee(executed.Blocks()[0].ParentBaseFee, totalUniqGasLimit, len(executed.Blocks()), executed.Height()) - baseFeeRat := new(big.Rat).SetFrac(newBaseFee.Int, new(big.Int).SetUint64(build.FilecoinPrecision)) + currentBaseFee, err := t.node.ComputeBaseFee(ctx, current) + if err != nil { + return nil, nil, err + } + baseFeeRat := new(big.Rat).SetFrac(currentBaseFee.Int, new(big.Int).SetUint64(build.FilecoinPrecision)) baseFee, _ := baseFeeRat.Float64() - baseFeeChange := new(big.Rat).SetFrac(newBaseFee.Int, executed.Blocks()[0].ParentBaseFee.Int) + baseFeeChange := new(big.Rat).SetFrac(currentBaseFee.Int, current.Blocks()[0].ParentBaseFee.Int) baseFeeChangeF, _ := baseFeeChange.Float64() messageGasEconomyResult := &messagemodel.MessageGasEconomy{ - Height: int64(executed.Height()), - StateRoot: executed.ParentState().String(), + Height: int64(current.Height()), + StateRoot: current.ParentState().String(), GasLimitTotal: totalGasLimit, GasLimitUniqueTotal: totalUniqGasLimit, BaseFee: baseFee, BaseFeeChangeLog: math.Log(baseFeeChangeF) / math.Log(1.125), - GasFillRatio: float64(totalGasLimit) / float64(len(executed.Blocks())*build.BlockGasTarget), - GasCapacityRatio: float64(totalUniqGasLimit) / float64(len(executed.Blocks())*build.BlockGasTarget), - GasWasteRatio: float64(totalGasLimit-totalUniqGasLimit) / float64(len(executed.Blocks())*build.BlockGasTarget), - } - - if len(errorsDetected) != 0 { - report.ErrorsDetected = errorsDetected + GasFillRatio: float64(totalGasLimit) / float64(len(current.Blocks())*build.BlockGasTarget), + GasCapacityRatio: float64(totalUniqGasLimit) / float64(len(current.Blocks())*build.BlockGasTarget), + GasWasteRatio: float64(totalGasLimit-totalUniqGasLimit) / float64(len(current.Blocks())*build.BlockGasTarget), } return messageGasEconomyResult, report, nil diff --git a/tasks/messages/gasoutput/task.go b/tasks/messages/gasoutput/task.go index e4030f97a..2ae3e40f1 100644 --- a/tasks/messages/gasoutput/task.go +++ b/tasks/messages/gasoutput/task.go @@ -4,12 +4,17 @@ import ( "context" "fmt" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "golang.org/x/sync/errgroup" "github.com/filecoin-project/lily/chain/actors/builtin" + "github.com/filecoin-project/lily/chain/datasource" + "github.com/filecoin-project/lily/lens" + "github.com/filecoin-project/lily/lens/util" "github.com/filecoin-project/lily/model" derivedmodel "github.com/filecoin-project/lily/model/derived" visormodel "github.com/filecoin-project/lily/model/visor" @@ -45,20 +50,49 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut StateRoot: current.ParentState().String(), } - tsMsgs, err := t.node.ExecutedAndBlockMessages(ctx, current, executed) - if err != nil { - report.ErrorsDetected = fmt.Errorf("getting executed and block messages: %w", err) + grp, grpCtx := errgroup.WithContext(ctx) + + var getActorCodeFn func(address address.Address) (cid.Cid, bool) + grp.Go(func() error { + var err error + getActorCodeFn, err = util.MakeGetActorCodeFunc(grpCtx, t.node.Store(), current, executed) + if err != nil { + return fmt.Errorf("getting actor code lookup function: %w", err) + } + return nil + }) + + var blkMsgRec []*lens.BlockMessageReceipts + grp.Go(func() error { + var err error + blkMsgRec, err = t.node.TipSetMessageReceipts(grpCtx, current, executed) + if err != nil { + return fmt.Errorf("getting messages and receipts: %w", err) + } + return nil + }) + var burnFn lens.ShouldBurnFn + grp.Go(func() error { + var err error + burnFn, err = t.node.ShouldBurnFn(grpCtx, executed) + if err != nil { + return fmt.Errorf("getting should burn function: %w", err) + } + return nil + }) + + if err := grp.Wait(); err != nil { + report.ErrorsDetected = err return nil, report, nil } - emsgs := tsMsgs.Executed var ( - gasOutputsResults = make(derivedmodel.GasOutputsList, 0, len(emsgs)) - errorsDetected = make([]*messages.MessageError, 0, len(emsgs)) - exeMsgSeen = make(map[cid.Cid]bool, len(emsgs)) + gasOutputsResults = make(derivedmodel.GasOutputsList, 0) + errorsDetected = make([]*messages.MessageError, 0) + exeMsgSeen = make(map[cid.Cid]bool) ) - for _, m := range emsgs { + for _, msgrec := range blkMsgRec { // Stop processing if we have been told to cancel select { case <-ctx.Done(): @@ -66,49 +100,56 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut default: } - if exeMsgSeen[m.Cid] { - continue - } - exeMsgSeen[m.Cid] = true - - var msgSize int - if b, err := m.Message.Serialize(); err == nil { - msgSize = len(b) - } else { - errorsDetected = append(errorsDetected, &messages.MessageError{ - Cid: m.Cid, - Error: fmt.Errorf("failed to serialize message: %w", err).Error(), - }) + itr, err := msgrec.Iterator() + if err != nil { + return nil, nil, err } - actorName := builtin.ActorNameByCode(m.ToActorCode) - gasOutput := &derivedmodel.GasOutputs{ - Height: int64(m.Height), - Cid: m.Cid.String(), - From: m.Message.From.String(), - To: m.Message.To.String(), - Value: m.Message.Value.String(), - GasFeeCap: m.Message.GasFeeCap.String(), - GasPremium: m.Message.GasPremium.String(), - GasLimit: m.Message.GasLimit, - Nonce: m.Message.Nonce, - Method: uint64(m.Message.Method), - StateRoot: m.BlockHeader.ParentStateRoot.String(), - ExitCode: int64(m.Receipt.ExitCode), - GasUsed: m.Receipt.GasUsed, - ParentBaseFee: m.BlockHeader.ParentBaseFee.String(), - SizeBytes: msgSize, - BaseFeeBurn: m.GasOutputs.BaseFeeBurn.String(), - OverEstimationBurn: m.GasOutputs.OverEstimationBurn.String(), - MinerPenalty: m.GasOutputs.MinerPenalty.String(), - MinerTip: m.GasOutputs.MinerTip.String(), - Refund: m.GasOutputs.Refund.String(), - GasRefund: m.GasOutputs.GasRefund, - GasBurned: m.GasOutputs.GasBurned, - ActorName: actorName, - ActorFamily: builtin.ActorFamily(actorName), + blk := msgrec.Block + for itr.HasNext() { + m, _, r := itr.Next() + if exeMsgSeen[m.Cid()] { + continue + } + exeMsgSeen[m.Cid()] = true + + toActorCode, found := getActorCodeFn(m.VMMessage().To) + if !found { + toActorCode = cid.Undef + } + gasOutputs, err := datasource.ComputeGasOutputs(ctx, blk, m.VMMessage(), r, burnFn) + if err != nil { + return nil, nil, err + } + actorName := builtin.ActorNameByCode(toActorCode) + gasOutput := &derivedmodel.GasOutputs{ + Height: int64(blk.Height), + StateRoot: blk.ParentStateRoot.String(), + ParentBaseFee: blk.ParentBaseFee.String(), + Cid: m.Cid().String(), + From: m.VMMessage().From.String(), + To: m.VMMessage().To.String(), + Value: m.VMMessage().Value.String(), + GasFeeCap: m.VMMessage().GasFeeCap.String(), + GasPremium: m.VMMessage().GasPremium.String(), + GasLimit: m.VMMessage().GasLimit, + Nonce: m.VMMessage().Nonce, + Method: uint64(m.VMMessage().Method), + SizeBytes: m.ChainLength(), + ExitCode: int64(r.ExitCode), + GasUsed: r.GasUsed, + BaseFeeBurn: gasOutputs.BaseFeeBurn.String(), + OverEstimationBurn: gasOutputs.OverEstimationBurn.String(), + MinerPenalty: gasOutputs.MinerPenalty.String(), + MinerTip: gasOutputs.MinerTip.String(), + Refund: gasOutputs.Refund.String(), + GasRefund: gasOutputs.GasRefund, + GasBurned: gasOutputs.GasBurned, + ActorName: actorName, + ActorFamily: builtin.ActorFamily(actorName), + } + gasOutputsResults = append(gasOutputsResults, gasOutput) } - gasOutputsResults = append(gasOutputsResults, gasOutput) } if len(errorsDetected) > 0 { diff --git a/tasks/messages/message/task.go b/tasks/messages/message/task.go index 862d74820..f776a50d6 100644 --- a/tasks/messages/message/task.go +++ b/tasks/messages/message/task.go @@ -26,14 +26,12 @@ func NewTask(node tasks.DataSource) *Task { } } -func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, executed *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { +func (t *Task) ProcessTipSet(ctx context.Context, current *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { ctx, span := otel.Tracer("").Start(ctx, "ProcessTipSets") if span.IsRecording() { span.SetAttributes( attribute.String("current", current.String()), attribute.Int64("current_height", int64(current.Height())), - attribute.String("executed", executed.String()), - attribute.Int64("executed_height", int64(executed.Height())), attribute.String("processor", "messages"), ) } @@ -44,93 +42,69 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut StateRoot: current.ParentState().String(), } - tsMsgs, err := t.node.ExecutedAndBlockMessages(ctx, current, executed) + blksMsgs, err := t.node.TipSetBlockMessages(ctx, current) if err != nil { - report.ErrorsDetected = fmt.Errorf("getting executed and block messages: %w", err) + report.ErrorsDetected = fmt.Errorf("getting messages for tipset: %w", err) return nil, report, nil } - blkMsgs := tsMsgs.Block var ( - messageResults = make(messagemodel.Messages, 0, len(blkMsgs)) - errorsDetected = make([]*messages.MessageError, 0, len(blkMsgs)) + messageResults = make(messagemodel.Messages, 0) + errorsDetected = make([]*messages.MessageError, 0) blkMsgSeen = make(map[cid.Cid]bool) ) - // Record which blocks had which messages, regardless of duplicates - for _, bm := range blkMsgs { - // Stop processing if we have been told to cancel + // record all unique messages in current + for _, blkMsgs := range blksMsgs { select { case <-ctx.Done(): return nil, nil, fmt.Errorf("context done: %w", ctx.Err()) default: } - - for _, msg := range bm.SecpMessages { + for _, msg := range blkMsgs.BlsMessages { if blkMsgSeen[msg.Cid()] { continue } blkMsgSeen[msg.Cid()] = true - var msgSize int - if b, err := msg.Message.Serialize(); err == nil { - msgSize = len(b) - } else { - errorsDetected = append(errorsDetected, &messages.MessageError{ - Cid: msg.Cid(), - Error: fmt.Errorf("failed to serialize message: %w", err).Error(), - }) - } - - // record all unique Secp messages - msg := &messagemodel.Message{ - Height: int64(bm.Block.Height), + // record all unique messages + messageResults = append(messageResults, &messagemodel.Message{ + Height: int64(blkMsgs.Block.Height), Cid: msg.Cid().String(), - From: msg.Message.From.String(), - To: msg.Message.To.String(), - Value: msg.Message.Value.String(), - GasFeeCap: msg.Message.GasFeeCap.String(), - GasPremium: msg.Message.GasPremium.String(), - GasLimit: msg.Message.GasLimit, - SizeBytes: msgSize, - Nonce: msg.Message.Nonce, - Method: uint64(msg.Message.Method), - } - messageResults = append(messageResults, msg) - + From: msg.VMMessage().From.String(), + To: msg.VMMessage().To.String(), + Value: msg.VMMessage().Value.String(), + GasFeeCap: msg.VMMessage().GasFeeCap.String(), + GasPremium: msg.VMMessage().GasPremium.String(), + GasLimit: msg.VMMessage().GasLimit, + SizeBytes: msg.ChainLength(), + Nonce: msg.VMMessage().Nonce, + Method: uint64(msg.VMMessage().Method), + }) } - for _, msg := range bm.BlsMessages { + for _, msg := range blkMsgs.SecpMessages { if blkMsgSeen[msg.Cid()] { continue } blkMsgSeen[msg.Cid()] = true - var msgSize int - if b, err := msg.Serialize(); err == nil { - msgSize = len(b) - } else { - errorsDetected = append(errorsDetected, &messages.MessageError{ - Cid: msg.Cid(), - Error: fmt.Errorf("failed to serialize message: %w", err).Error(), - }) - } - - // record all unique bls messages - msg := &messagemodel.Message{ - Height: int64(bm.Block.Height), + // record all unique messages + messageResults = append(messageResults, &messagemodel.Message{ + Height: int64(blkMsgs.Block.Height), Cid: msg.Cid().String(), - From: msg.From.String(), - To: msg.To.String(), - Value: msg.Value.String(), - GasFeeCap: msg.GasFeeCap.String(), - GasPremium: msg.GasPremium.String(), - GasLimit: msg.GasLimit, - SizeBytes: msgSize, - Nonce: msg.Nonce, - Method: uint64(msg.Method), - } - messageResults = append(messageResults, msg) + From: msg.VMMessage().From.String(), + To: msg.VMMessage().To.String(), + Value: msg.VMMessage().Value.String(), + GasFeeCap: msg.VMMessage().GasFeeCap.String(), + GasPremium: msg.VMMessage().GasPremium.String(), + GasLimit: msg.VMMessage().GasLimit, + SizeBytes: msg.ChainLength(), + Nonce: msg.VMMessage().Nonce, + Method: uint64(msg.VMMessage().Method), + }) + } + } if len(errorsDetected) != 0 { diff --git a/tasks/messages/parsedmessage/task.go b/tasks/messages/parsedmessage/task.go index d462305aa..304725e08 100644 --- a/tasks/messages/parsedmessage/task.go +++ b/tasks/messages/parsedmessage/task.go @@ -4,13 +4,16 @@ import ( "context" "fmt" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "golang.org/x/sync/errgroup" + "github.com/filecoin-project/lily/lens" "github.com/filecoin-project/lily/lens/util" "github.com/filecoin-project/lily/model" messagemodel "github.com/filecoin-project/lily/model/messages" @@ -49,22 +52,42 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut StateRoot: current.ParentState().String(), } - tsMsgs, err := t.node.ExecutedAndBlockMessages(ctx, current, executed) - if err != nil { - report.ErrorsDetected = fmt.Errorf("getting executed and block messages: %w", err) + grp, _ := errgroup.WithContext(ctx) + + var getActorCodeFn func(address address.Address) (cid.Cid, bool) + grp.Go(func() error { + var err error + getActorCodeFn, err = util.MakeGetActorCodeFunc(ctx, t.node.Store(), current, executed) + if err != nil { + return fmt.Errorf("getting actor code lookup function: %w", err) + } + return nil + }) + + var blkMsgRec []*lens.BlockMessageReceipts + grp.Go(func() error { + var err error + blkMsgRec, err = t.node.TipSetMessageReceipts(ctx, current, executed) + if err != nil { + return fmt.Errorf("getting messages and receipts: %w", err) + } + return nil + }) + + if err := grp.Wait(); err != nil { + report.ErrorsDetected = err return nil, report, nil } - emsgs := tsMsgs.Executed var ( - parsedMessageResults = make(messagemodel.ParsedMessages, 0, len(emsgs)) - errorsDetected = make([]*messages.MessageError, 0, len(emsgs)) - exeMsgSeen = make(map[cid.Cid]bool, len(emsgs)) + parsedMessageResults = make(messagemodel.ParsedMessages, 0) + errorsDetected = make([]*messages.MessageError, 0) + exeMsgSeen = make(map[cid.Cid]bool) totalGasLimit int64 totalUniqGasLimit int64 ) - for _, m := range emsgs { + for _, msgrec := range blkMsgRec { // Stop processing if we have been told to cancel select { case <-ctx.Done(): @@ -72,57 +95,69 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut default: } - // calculate total gas limit of executed messages regardless of duplicates. - for range m.Blocks { - totalGasLimit += m.Message.GasLimit + itr, err := msgrec.Iterator() + if err != nil { + return nil, nil, err } - if exeMsgSeen[m.Cid] { - continue + // calculate total gas limit of executed messages regardless of duplicates. + for itr.HasNext() { + msg, _, _ := itr.Next() + totalGasLimit += msg.VMMessage().GasLimit } - exeMsgSeen[m.Cid] = true - totalUniqGasLimit += m.Message.GasLimit - - if m.ToActorCode.Defined() { - method, params, err := util.MethodAndParamsForMessage(m.Message, m.ToActorCode) - if err == nil { - pm := &messagemodel.ParsedMessage{ - Height: int64(m.Height), - Cid: m.Cid.String(), - From: m.Message.From.String(), - To: m.Message.To.String(), - Value: m.Message.Value.String(), - Method: method, - Params: params, - } - parsedMessageResults = append(parsedMessageResults, pm) - } else { - if m.Receipt.ExitCode == exitcode.ErrSerialization || - m.Receipt.ExitCode == exitcode.ErrIllegalArgument || - m.Receipt.ExitCode == exitcode.SysErrInvalidMethod || - // UsrErrUnsupportedMethod TODO: https://github.com/filecoin-project/go-state-types/pull/44 - m.Receipt.ExitCode == exitcode.ExitCode(22) { - // ignore the parse error since the params are probably malformed, as reported by the vm - } else { - log.Errorw("parsing message", "error", err, "cid", m.Message.Cid().String(), "receipt", m.Receipt) - errorsDetected = append(errorsDetected, &messages.MessageError{ - Cid: m.Cid, - Error: fmt.Errorf("failed to parse message params: %w", err).Error(), - }) - } + + // reset the iterator to beginning + itr.Reset() + + for itr.HasNext() { + m, _, r := itr.Next() + if exeMsgSeen[m.Cid()] { + continue } - } else { - // No destination actor code. Normally Lotus will create an account actor for unknown addresses but if the - // message fails then Lotus will not allow the actor to be created and we are left with an address of an - // unknown type. - // If the message was executed it means we are out of step with Lotus behaviour somehow. This probably - // indicates that Lily actor type detection is out of date. - if m.Receipt.ExitCode == 0 { - log.Errorw("parsing message", "error", err, "cid", m.Message.Cid().String(), "receipt", m.Receipt) + exeMsgSeen[m.Cid()] = true + totalUniqGasLimit += m.VMMessage().GasLimit + + toActorCode, found := getActorCodeFn(m.VMMessage().To) + if !found && r.ExitCode == 0 { + // No destination actor code. Normally Lotus will create an account actor for unknown addresses but if the + // message fails then Lotus will not allow the actor to be created and we are left with an address of an + // unknown type. + // If the message was executed it means we are out of step with Lotus behaviour somehow. This probably + // indicates that Lily actor type detection is out of date. + log.Errorw("parsing message", "error", err, "cid", m.Cid().String(), "receipt", r) errorsDetected = append(errorsDetected, &messages.MessageError{ - Cid: m.Cid, + Cid: m.Cid(), Error: fmt.Errorf("failed to parse message params: missing to actor code").Error(), }) + } else { + method, params, err := util.MethodAndParamsForMessage(m.VMMessage(), toActorCode) + if err == nil { + pm := &messagemodel.ParsedMessage{ + Height: int64(msgrec.Block.Height), + Cid: m.Cid().String(), + From: m.VMMessage().From.String(), + To: m.VMMessage().To.String(), + Value: m.VMMessage().Value.String(), + Method: method, + Params: params, + } + parsedMessageResults = append(parsedMessageResults, pm) + } else { + if r.ExitCode == exitcode.ErrSerialization || + r.ExitCode == exitcode.ErrIllegalArgument || + r.ExitCode == exitcode.SysErrInvalidMethod || + // UsrErrUnsupportedMethod TODO: https://github.com/filecoin-project/go-state-types/pull/44 + r.ExitCode == exitcode.ExitCode(22) { + // ignore the parse error since the params are probably malformed, as reported by the vm + + } else { + log.Errorw("parsing message", "error", err, "cid", m.Cid().String(), "receipt", r) + errorsDetected = append(errorsDetected, &messages.MessageError{ + Cid: m.Cid(), + Error: fmt.Errorf("failed to parse message params: %w", err).Error(), + }) + } + } } } } diff --git a/tasks/messages/receipt/task.go b/tasks/messages/receipt/task.go index 3990f9ab0..ec86bb92a 100644 --- a/tasks/messages/receipt/task.go +++ b/tasks/messages/receipt/task.go @@ -44,41 +44,50 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut StateRoot: current.ParentState().String(), } - tsMsgs, err := t.node.ExecutedAndBlockMessages(ctx, current, executed) + blkMsgRect, err := t.node.TipSetMessageReceipts(ctx, current, executed) if err != nil { - report.ErrorsDetected = fmt.Errorf("getting executed and block messages: %w", err) + report.ErrorsDetected = fmt.Errorf("getting tipset message receipet: %w", err) return nil, report, nil } - emsgs := tsMsgs.Executed var ( - receiptResults = make(messagemodel.Receipts, 0, len(emsgs)) - errorsDetected = make([]*messages.MessageError, 0, len(emsgs)) - exeMsgSeen = make(map[cid.Cid]bool, len(emsgs)) + receiptResults = make(messagemodel.Receipts, 0, len(blkMsgRect)) + errorsDetected = make([]*messages.MessageError, 0, len(blkMsgRect)) + msgsSeen = make(map[cid.Cid]bool, len(blkMsgRect)) ) - for _, m := range emsgs { - // Stop processing if we have been told to cancel + for _, m := range blkMsgRect { select { case <-ctx.Done(): return nil, nil, fmt.Errorf("context done: %w", ctx.Err()) default: } - if exeMsgSeen[m.Cid] { - continue + itr, err := m.Iterator() + if err != nil { + return nil, nil, err } - exeMsgSeen[m.Cid] = true - - rcpt := &messagemodel.Receipt{ - Height: int64(current.Height()), - Message: m.Cid.String(), - StateRoot: current.ParentState().String(), - Idx: int(m.Index), - ExitCode: int64(m.Receipt.ExitCode), - GasUsed: m.Receipt.GasUsed, + + for itr.HasNext() { + msg, index, rec := itr.Next() + if msgsSeen[msg.Cid()] { + continue + } + msgsSeen[msg.Cid()] = true + + rcpt := &messagemodel.Receipt{ + // use current's height and stateroot since receipts returned from TipSetMessageReceipts come from current + // the messages from `executed` are applied (executed) to produce the stateroot and receipts in `current`. + Height: int64(current.Height()), + StateRoot: current.ParentState().String(), + + Message: msg.Cid().String(), + Idx: index, + ExitCode: int64(rec.ExitCode), + GasUsed: rec.GasUsed, + } + receiptResults = append(receiptResults, rcpt) } - receiptResults = append(receiptResults, rcpt) } diff --git a/tasks/msapprovals/msapprovals.go b/tasks/msapprovals/msapprovals.go index c44487a4c..8e62d3ed1 100644 --- a/tasks/msapprovals/msapprovals.go +++ b/tasks/msapprovals/msapprovals.go @@ -6,12 +6,17 @@ import ( "context" "fmt" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "golang.org/x/sync/errgroup" "github.com/filecoin-project/lily/chain/actors/builtin/multisig" + "github.com/filecoin-project/lily/lens" + "github.com/filecoin-project/lily/lens/util" "github.com/filecoin-project/lily/tasks" "github.com/filecoin-project/lily/model" @@ -52,17 +57,37 @@ func (p *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut StateRoot: current.ParentState().String(), } - tsMsgs, err := p.node.ExecutedAndBlockMessages(ctx, current, executed) - if err != nil { - report.ErrorsDetected = fmt.Errorf("getting executed and block messages: %w", err) + grp, _ := errgroup.WithContext(ctx) + + var getActorCodeFn func(address address.Address) (cid.Cid, bool) + grp.Go(func() error { + var err error + getActorCodeFn, err = util.MakeGetActorCodeFunc(ctx, p.node.Store(), current, executed) + if err != nil { + return fmt.Errorf("getting actor code lookup function: %w", err) + } + return nil + }) + + var blkMsgRec []*lens.BlockMessageReceipts + grp.Go(func() error { + var err error + blkMsgRec, err = p.node.TipSetMessageReceipts(ctx, current, executed) + if err != nil { + return fmt.Errorf("getting messages and receipts: %w", err) + } + return nil + }) + + if err := grp.Wait(); err != nil { + report.ErrorsDetected = err return nil, report, nil } - emsgs := tsMsgs.Executed - errorsDetected := make([]*MultisigError, 0, len(emsgs)) + errorsDetected := make([]*MultisigError, 0) results := make(msapprovals.MultisigApprovalList, 0) // no initial size capacity since approvals are rare - for _, m := range emsgs { + for _, msgrec := range blkMsgRec { // Stop processing if we have been told to cancel select { case <-ctx.Done(): @@ -70,100 +95,112 @@ func (p *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut default: } - // Only interested in messages to multisig actors - if !builtin.IsMultisigActor(m.ToActorCode) { - continue + itr, err := msgrec.Iterator() + if err != nil { + return nil, nil, err } - // Only interested in successful messages - if !m.Receipt.ExitCode.IsSuccess() { - continue - } + for itr.HasNext() { + msg, _, rec := itr.Next() + // Only interested in successful messages + if !rec.ExitCode.IsSuccess() { + continue + } - // Only interested in propose and approve messages - if m.Message.Method != ProposeMethodNum && m.Message.Method != ApproveMethodNum { - continue - } + // Only interested in messages to multisig actors + msgToCode, found := getActorCodeFn(msg.VMMessage().To) + if !found { + return nil, nil, fmt.Errorf("failed to find to actor %s height %d message %s", msg.VMMessage().To, current.Height(), msg.Cid()) + } + if !builtin.IsMultisigActor(msgToCode) { + continue + } - applied, tx, err := p.getTransactionIfApplied(ctx, m.Message, m.Receipt, current) - if err != nil { - errorsDetected = append(errorsDetected, &MultisigError{ - Addr: m.Message.To.String(), - Error: fmt.Errorf("failed to find transaction: %w", err).Error(), - }) - continue - } + // Only interested in propose and approve messages + if msg.VMMessage().Method != ProposeMethodNum && msg.VMMessage().Method != ApproveMethodNum { + continue + } - // Only interested in messages that applied a transaction - if !applied { - continue - } + applied, tx, err := p.getTransactionIfApplied(ctx, msg.VMMessage(), rec, current) + if err != nil { + errorsDetected = append(errorsDetected, &MultisigError{ + Addr: msg.VMMessage().To.String(), + Error: fmt.Errorf("failed to find transaction for message %s: %w", msg.Cid(), err).Error(), + }) + continue + } - appr := msapprovals.MultisigApproval{ - Height: int64(executed.Height()), - StateRoot: executed.ParentState().String(), - MultisigID: m.Message.To.String(), - Message: m.Cid.String(), - Method: uint64(m.Message.Method), - Approver: m.Message.From.String(), - GasUsed: m.Receipt.GasUsed, - TransactionID: tx.id, - To: tx.to, - Value: tx.value, - } + // Only interested in messages that applied a transaction + if !applied { + continue + } - // Get state of actor after the message has been applied - act, err := p.node.Actor(ctx, m.Message.To, current.Key()) - if err != nil { - errorsDetected = append(errorsDetected, &MultisigError{ - Addr: m.Message.To.String(), - Error: fmt.Errorf("failed to load actor: %w", err).Error(), - }) - continue - } + appr := msapprovals.MultisigApproval{ + Height: int64(executed.Height()), + StateRoot: executed.ParentState().String(), + MultisigID: msg.VMMessage().To.String(), + Message: msg.Cid().String(), + Method: uint64(msg.VMMessage().Method), + Approver: msg.VMMessage().From.String(), + GasUsed: rec.GasUsed, + TransactionID: tx.id, + To: tx.to, + Value: tx.value, + } - actorState, err := multisig.Load(p.node.Store(), act) - if err != nil { - errorsDetected = append(errorsDetected, &MultisigError{ - Addr: m.Message.To.String(), - Error: fmt.Errorf("failed to load actor state: %w", err).Error(), - }) - continue - } + // Get state of actor after the message has been applied + act, err := p.node.Actor(ctx, msg.VMMessage().To, current.Key()) + if err != nil { + errorsDetected = append(errorsDetected, &MultisigError{ + Addr: msg.VMMessage().To.String(), + Error: fmt.Errorf("failed to load actor: %w", err).Error(), + }) + continue + } - ib, err := actorState.InitialBalance() - if err != nil { - errorsDetected = append(errorsDetected, &MultisigError{ - Addr: m.Message.To.String(), - Error: fmt.Errorf("failed to read initial balance: %w", err).Error(), - }) - continue - } - appr.InitialBalance = ib.String() + actorState, err := multisig.Load(p.node.Store(), act) + if err != nil { + errorsDetected = append(errorsDetected, &MultisigError{ + Addr: msg.VMMessage().To.String(), + Error: fmt.Errorf("failed to load actor state: %w", err).Error(), + }) + continue + } - threshold, err := actorState.Threshold() - if err != nil { - errorsDetected = append(errorsDetected, &MultisigError{ - Addr: m.Message.To.String(), - Error: fmt.Errorf("failed to read initial balance: %w", err).Error(), - }) - continue - } - appr.Threshold = threshold + ib, err := actorState.InitialBalance() + if err != nil { + errorsDetected = append(errorsDetected, &MultisigError{ + Addr: msg.VMMessage().To.String(), + Error: fmt.Errorf("failed to read initial balance: %w", err).Error(), + }) + continue + } + appr.InitialBalance = ib.String() + + threshold, err := actorState.Threshold() + if err != nil { + errorsDetected = append(errorsDetected, &MultisigError{ + Addr: msg.VMMessage().To.String(), + Error: fmt.Errorf("failed to read initial balance: %w", err).Error(), + }) + continue + } + appr.Threshold = threshold + + signers, err := actorState.Signers() + if err != nil { + errorsDetected = append(errorsDetected, &MultisigError{ + Addr: msg.VMMessage().To.String(), + Error: fmt.Errorf("failed to read signers: %w", err).Error(), + }) + continue + } + for _, addr := range signers { + appr.Signers = append(appr.Signers, addr.String()) + } - signers, err := actorState.Signers() - if err != nil { - errorsDetected = append(errorsDetected, &MultisigError{ - Addr: m.Message.To.String(), - Error: fmt.Errorf("failed to read signers: %w", err).Error(), - }) - continue + results = append(results, &appr) } - for _, addr := range signers { - appr.Signers = append(appr.Signers, addr.String()) - } - - results = append(results, &appr) } if len(errorsDetected) != 0 { diff --git a/tasks/test/api.go b/tasks/test/api.go index adefac769..85b1cc3a1 100644 --- a/tasks/test/api.go +++ b/tasks/test/api.go @@ -17,6 +17,13 @@ type MockActorStateAPI struct { mock.Mock } +func (m *MockActorStateAPI) TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) { + args := m.Called(ctx, ts, pts) + tsmsgs := args.Get(0) + err := args.Error(1) + return tsmsgs.([]*lens.BlockMessageReceipts), err +} + func (m *MockActorStateAPI) MinerLoad(store adt.Store, act *types.Actor) (miner.State, error) { args := m.Called(store, act) state := args.Get(0) @@ -56,13 +63,6 @@ func (m *MockActorStateAPI) Store() adt.Store { return nil } -func (m *MockActorStateAPI) ExecutedAndBlockMessages(ctx context.Context, ts, pts *types.TipSet) (*lens.TipSetMessages, error) { - args := m.Called(ctx, ts, pts) - tsmsgs := args.Get(0) - err := args.Error(1) - return tsmsgs.(*lens.TipSetMessages), err -} - func (m *MockActorStateAPI) DiffPreCommits(ctx context.Context, addr address.Address, ts, pts *types.TipSet, pre, cur miner.State) (*miner.PreCommitChanges, error) { args := m.Called(ctx, ts, pts, pre, cur) tsmsgs := args.Get(0) diff --git a/testutil/lens.go b/testutil/lens.go index 45535d5c5..b17428e94 100644 --- a/testutil/lens.go +++ b/testutil/lens.go @@ -25,6 +25,31 @@ type APIWrapper struct { ctx context.Context } +func (aw *APIWrapper) BurnFundsFn(ctx context.Context, ts *types.TipSet) (lens.ShouldBurnFn, error) { + //TODO implement me + panic("implement me") +} + +func (aw *APIWrapper) GetMessageExecutionsForTipSet(ctx context.Context, ts, pts *types.TipSet) ([]*lens.MessageExecution, error) { + //TODO implement me + panic("implement me") +} + +func (aw *APIWrapper) ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi.TokenAmount, error) { + //TODO implement me + panic("implement me") +} + +func (aw *APIWrapper) MessagesForTipSetBlocks(ctx context.Context, ts *types.TipSet) ([]*lens.BlockMessages, error) { + //TODO implement me + panic("implement me") +} + +func (aw *APIWrapper) TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) { + //TODO implement me + panic("implement me") +} + func (aw *APIWrapper) CirculatingSupply(ctx context.Context, key types.TipSetKey) (api.CirculatingSupply, error) { return aw.StateVMCirculatingSupplyInternal(ctx, key) } @@ -37,14 +62,6 @@ func (aw *APIWrapper) Store() adt.Store { return aw } -func (aw *APIWrapper) GetExecutedAndBlockMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) (*lens.TipSetMessages, error) { - return nil, fmt.Errorf("ExecutedAndBlockMessages is not implemented") -} - -func (aw *APIWrapper) GetMessageExecutionsForTipSet(ctx context.Context, ts, pts *types.TipSet) ([]*lens.MessageExecution, error) { - return nil, fmt.Errorf("MessageExecutions is not implemented") -} - func (aw *APIWrapper) Get(ctx context.Context, c cid.Cid, out interface{}) error { cu, ok := out.(cbg.CBORUnmarshaler) if !ok {