diff --git a/cmd/collection/main.go b/cmd/collection/main.go index 1a241ba703b..fad3548d071 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -480,6 +480,7 @@ func main() { node.EngineRegistry, node.State, node.Metrics.Engine, + node.Metrics.Mempool, colMetrics, node.Me, node.Storage.Collections, diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 996fb82b29d..a750cd7842b 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -38,7 +38,7 @@ type Engine struct { transactions storage.Transactions notifier engine.Notifier - inbound *fifoqueue.FifoQueue + queue *fifoqueue.FifoQueue component.Component cm *component.ComponentManager @@ -48,11 +48,26 @@ type Engine struct { var _ network.Engine = (*Engine)(nil) var _ component.Component = (*Engine)(nil) -func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) { - // TODO length observer metrics - inbound, err := fifoqueue.NewFifoQueue(1000) +// New creates a new pusher engine. +func New( + log zerolog.Logger, + net network.EngineRegistry, + state protocol.State, + engMetrics module.EngineMetrics, + mempoolMetrics module.MempoolMetrics, + colMetrics module.CollectionMetrics, + me module.Local, + collections storage.Collections, + transactions storage.Transactions, +) (*Engine, error) { + queue, err := fifoqueue.NewFifoQueue( + 1000, + fifoqueue.WithLengthObserver(func(len int) { + mempoolMetrics.MempoolEntries(metrics.ResourceSubmitCollectionGuaranteesQueue, uint(len)) + }), + ) if err != nil { - return nil, fmt.Errorf("could not create inbound fifoqueue: %w", err) + return nil, fmt.Errorf("could not create fifoqueue: %w", err) } notifier := engine.NewNotifier() @@ -67,7 +82,7 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e transactions: transactions, notifier: notifier, - inbound: inbound, + queue: queue, } conduit, err := net.Register(channels.PushGuarantees, e) @@ -107,7 +122,7 @@ func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready co // Only returns when the queue is empty (or the engine is terminated). func (e *Engine) processOutboundMessages(ctx context.Context) error { for { - nextMessage, ok := e.inbound.Pop() + nextMessage, ok := e.queue.Pop() if !ok { return nil } @@ -167,9 +182,9 @@ func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, mes // SubmitCollectionGuarantee adds a collection guarantee to the engine's queue // to later be published to consensus nodes. func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) { - ok := e.inbound.Push(msg) + ok := e.queue.Push(msg) if !ok { - e.log.Err(fmt.Errorf("failed to store collection guarantee in queue")) + engine.LogError(e.log, fmt.Errorf("failed to store collection guarantee in queue")) return } e.notifier.Notify() diff --git a/engine/collection/pusher/engine_test.go b/engine/collection/pusher/engine_test.go index 9f59f79f666..29c4d41a1b5 100644 --- a/engine/collection/pusher/engine_test.go +++ b/engine/collection/pusher/engine_test.go @@ -72,6 +72,7 @@ func (suite *Suite) SetupTest() { suite.state, metrics, metrics, + metrics, suite.me, suite.collections, suite.transactions, diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index a602e628565..ecab659bbfe 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -313,7 +313,7 @@ func CollectionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ro retrieve) require.NoError(t, err) - pusherEngine, err := pusher.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Me, collections, transactions) + pusherEngine, err := pusher.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Metrics, node.Me, collections, transactions) require.NoError(t, err) clusterStateFactory, err := factories.NewClusterStateFactory( diff --git a/module/metrics/labels.go b/module/metrics/labels.go index 20b66ad7d68..70520a61836 100644 --- a/module/metrics/labels.go +++ b/module/metrics/labels.go @@ -113,6 +113,7 @@ const ( ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel ResourceClusterBlockProposalQueue = "cluster_compliance_proposal_queue" // collection node, compliance engine ResourceTransactionIngestQueue = "ingest_transaction_queue" // collection node, ingest engine + ResourceSubmitCollectionGuaranteesQueue = "submit_col_guarantee_queue" // collection node, pusher engine ResourceBeaconKey = "beacon-key" // consensus node, DKG engine ResourceDKGMessage = "dkg_private_message" // consensus, DKG messaging engine ResourceApprovalQueue = "sealing_approval_queue" // consensus node, sealing engine