diff --git a/core/capabilities/integration_tests/framework/capabilities_registry.go b/core/capabilities/integration_tests/framework/capabilities_registry.go index 838303a9f16..5c23d2ebc1a 100644 --- a/core/capabilities/integration_tests/framework/capabilities_registry.go +++ b/core/capabilities/integration_tests/framework/capabilities_registry.go @@ -64,6 +64,8 @@ func (r *CapabilitiesRegistry) getAddress() common.Address { type capability struct { donCapabilityConfig *pb.CapabilityConfig registryConfig kcr.CapabilitiesRegistryCapability + // internalOnly is true if the capability is published in the registry but not made available outside the DON in which it runs + internalOnly bool } // SetupDON sets up a new DON with the given capabilities and returns the DON ID diff --git a/core/capabilities/integration_tests/framework/don.go b/core/capabilities/integration_tests/framework/don.go index 0c0284e53d3..999966bdc1d 100644 --- a/core/capabilities/integration_tests/framework/don.go +++ b/core/capabilities/integration_tests/framework/don.go @@ -2,6 +2,7 @@ package framework import ( "context" + "encoding/hex" "fmt" "strconv" "testing" @@ -13,6 +14,8 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" @@ -40,13 +43,13 @@ import ( type DonContext struct { EthBlockchain *EthBlockchain - p2pNetwork *MockRageP2PNetwork + p2pNetwork *FakeRageP2PNetwork capabilityRegistry *CapabilitiesRegistry } func CreateDonContext(ctx context.Context, t *testing.T) DonContext { ethBlockchain := NewEthBlockchain(t, 1000, 1*time.Second) - rageP2PNetwork := NewMockRageP2PNetwork(t, 1000) + rageP2PNetwork := NewFakeRageP2PNetwork(ctx, t, 1000) capabilitiesRegistry := NewCapabilitiesRegistry(ctx, t, ethBlockchain) servicetest.Run(t, rageP2PNetwork) @@ -54,6 +57,29 @@ func CreateDonContext(ctx context.Context, t *testing.T) DonContext { return DonContext{EthBlockchain: ethBlockchain, p2pNetwork: rageP2PNetwork, capabilityRegistry: capabilitiesRegistry} } +func (c DonContext) WaitForCapabilitiesToBeExposed(t *testing.T, dons ...*DON) { + allExpectedCapabilities := make(map[CapabilityRegistration]bool) + for _, don := range dons { + caps, err := don.GetExternalCapabilities() + require.NoError(t, err) + for k, v := range caps { + allExpectedCapabilities[k] = v + } + } + + require.Eventually(t, func() bool { + registrations := c.p2pNetwork.GetCapabilityRegistrations() + + for k := range allExpectedCapabilities { + if _, ok := registrations[k]; !ok { + return false + } + } + + return true + }, 1*time.Minute, 1*time.Second, "timeout waiting for capabilities to be exposed") +} + type capabilityNode struct { *cltest.TestApplication registry *capabilities.Registry @@ -64,12 +90,13 @@ type capabilityNode struct { } type DON struct { + services.StateMachine t *testing.T config DonConfiguration lggr logger.Logger nodes []*capabilityNode standardCapabilityJobs []*job.Job - externalCapabilities []capability + publishedCapabilities []capability capabilitiesRegistry *CapabilitiesRegistry nodeConfigModifiers []func(c *chainlink.Config, node *capabilityNode) @@ -84,10 +111,13 @@ func NewDON(ctx context.Context, t *testing.T, lggr logger.Logger, donConfig Don dependentDONs []commoncap.DON, donContext DonContext, supportsOCR bool) *DON { don := &DON{t: t, lggr: lggr.Named(donConfig.name), config: donConfig, capabilitiesRegistry: donContext.capabilityRegistry} + protocolRoundInterval := 1 * time.Second + var newOracleFactoryFn standardcapabilities.NewOracleFactoryFn - var libOcr *MockLibOCR + var libOcr *FakeLibOCR if supportsOCR { - libOcr = NewMockLibOCR(t, lggr, donConfig.F, 1*time.Second) + // This is required to support the non standard OCR3 capability - will be removed when required OCR3 behaviour is implemented as standard capabilities + libOcr = NewFakeLibOCR(t, lggr, donConfig.F, protocolRoundInterval) servicetest.Run(t, libOcr) } @@ -110,7 +140,8 @@ func NewDON(ctx context.Context, t *testing.T, lggr logger.Logger, donConfig Don don.nodes = append(don.nodes, cn) if supportsOCR { - factory := newMockLibOcrOracleFactory(libOcr, donConfig.KeyBundles[i], len(donConfig.Members), int(donConfig.F)) + factory := newFakeOracleFactoryFactory(t, lggr, donConfig.KeyBundles[i], len(donConfig.Members), donConfig.F, + protocolRoundInterval) newOracleFactoryFn = factory.NewOracleFactory } @@ -134,12 +165,10 @@ func NewDON(ctx context.Context, t *testing.T, lggr logger.Logger, donConfig Don // Initialise must be called after all capabilities have been added to the DONs and before Start is called func (d *DON) Initialise() { - if len(d.externalCapabilities) > 0 { - id := d.capabilitiesRegistry.setupDON(d.config, d.externalCapabilities) + id := d.capabilitiesRegistry.setupDON(d.config, d.publishedCapabilities) - //nolint:gosec // disable G115 - d.config.DON.ID = uint32(id) - } + //nolint:gosec // disable G115 + d.config.DON.ID = uint32(id) } func (d *DON) GetID() uint32 { @@ -150,6 +179,29 @@ func (d *DON) GetID() uint32 { return d.config.ID } +func (d *DON) GetExternalCapabilities() (map[CapabilityRegistration]bool, error) { + result := map[CapabilityRegistration]bool{} + for _, publishedCapability := range d.publishedCapabilities { + if publishedCapability.internalOnly { + continue + } + + for _, node := range d.nodes { + peerIDBytes, err := peerIDToBytes(node.peerID.PeerID) + if err != nil { + return nil, fmt.Errorf("failed to convert peer ID to bytes: %w", err) + } + result[CapabilityRegistration{ + nodePeerID: hex.EncodeToString(peerIDBytes[:]), + capabilityID: publishedCapability.registryConfig.LabelledName + "@" + publishedCapability.registryConfig.Version, + capabilityDonID: d.GetID(), + }] = true + } + } + + return result, nil +} + func (d *DON) GetConfigVersion() uint32 { return d.config.ConfigVersion } @@ -162,20 +214,22 @@ func (d *DON) GetPeerIDs() []peer { return d.config.peerIDs } -func (d *DON) Start(ctx context.Context, t *testing.T) { +func (d *DON) Start(ctx context.Context) error { for _, triggerFactory := range d.triggerFactories { for _, node := range d.nodes { - trigger := triggerFactory.CreateNewTrigger(t) - err := node.registry.Add(ctx, trigger) - require.NoError(t, err) + trigger := triggerFactory.CreateNewTrigger(d.t) + if err := node.registry.Add(ctx, trigger); err != nil { + return fmt.Errorf("failed to add trigger: %w", err) + } } } for _, targetFactory := range d.targetFactories { for _, node := range d.nodes { - target := targetFactory.CreateNewTarget(t) - err := node.registry.Add(ctx, target) - require.NoError(t, err) + target := targetFactory.CreateNewTarget(d.t) + if err := node.registry.Add(ctx, target); err != nil { + return fmt.Errorf("failed to add target: %w", err) + } } } @@ -184,18 +238,31 @@ func (d *DON) Start(ctx context.Context, t *testing.T) { } if d.addOCR3NonStandardCapability { - libocr := NewMockLibOCR(t, d.lggr, d.config.F, 1*time.Second) - servicetest.Run(t, libocr) + libocr := NewFakeLibOCR(d.t, d.lggr, d.config.F, 1*time.Second) + servicetest.Run(d.t, libocr) for _, node := range d.nodes { - addOCR3Capability(ctx, t, d.lggr, node.registry, libocr, d.config.F, node.KeyBundle) + addOCR3Capability(ctx, d.t, d.lggr, node.registry, libocr, d.config.F, node.KeyBundle) } } for _, capabilityJob := range d.standardCapabilityJobs { - err := d.AddJob(ctx, capabilityJob) - require.NoError(t, err) + if err := d.AddJob(ctx, capabilityJob); err != nil { + return fmt.Errorf("failed to add standard capability job: %w", err) + } } + + return nil +} + +func (d *DON) Close() error { + for _, node := range d.nodes { + if err := node.Stop(); err != nil { + return fmt.Errorf("failed to stop node: %w", err) + } + } + + return nil } const StandardCapabilityTemplateJobSpec = ` @@ -203,7 +270,7 @@ type = "standardcapabilities" schemaVersion = 1 name = "%s" command="%s" -config="%s" +config=%s ` func (d *DON) AddStandardCapability(name string, command string, config string) { @@ -214,11 +281,30 @@ func (d *DON) AddStandardCapability(name string, command string, config string) d.standardCapabilityJobs = append(d.standardCapabilityJobs, &capabilitiesSpecJob) } +func (d *DON) AddPublishedStandardCapability(name string, command string, config string, + defaultCapabilityRequestConfig *pb.CapabilityConfig, + registryConfig kcr.CapabilitiesRegistryCapability) { + spec := fmt.Sprintf(StandardCapabilityTemplateJobSpec, name, command, config) + capabilitiesSpecJob, err := standardcapabilities.ValidatedStandardCapabilitiesSpec(spec) + require.NoError(d.t, err) + + d.standardCapabilityJobs = append(d.standardCapabilityJobs, &capabilitiesSpecJob) + + d.publishedCapabilities = append(d.publishedCapabilities, capability{ + donCapabilityConfig: defaultCapabilityRequestConfig, + registryConfig: registryConfig, + }) +} + // TODO - add configuration for remote support - do this for each capability as an option func (d *DON) AddTargetCapability(targetFactory TargetFactory) { d.targetFactories = append(d.targetFactories, targetFactory) } +func (d *DON) AddTriggerCapability(triggerFactory TriggerFactory) { + d.triggerFactories = append(d.triggerFactories, triggerFactory) +} + func (d *DON) AddExternalTriggerCapability(triggerFactory TriggerFactory) { d.triggerFactories = append(d.triggerFactories, triggerFactory) @@ -243,7 +329,7 @@ func (d *DON) AddExternalTriggerCapability(triggerFactory TriggerFactory) { }, } - d.externalCapabilities = append(d.externalCapabilities, triggerCapability) + d.publishedCapabilities = append(d.publishedCapabilities, triggerCapability) } func (d *DON) AddJob(ctx context.Context, j *job.Job) error { @@ -323,9 +409,10 @@ func (d *DON) AddOCR3NonStandardCapability() { CapabilityType: uint8(registrysyncer.ContractCapabilityTypeConsensus), } - d.externalCapabilities = append(d.externalCapabilities, capability{ + d.publishedCapabilities = append(d.publishedCapabilities, capability{ donCapabilityConfig: newCapabilityConfig(), registryConfig: ocr, + internalOnly: true, }) } @@ -357,7 +444,7 @@ func (d *DON) AddEthereumWriteTargetNonStandardCapability(forwarderAddr common.A }, } - d.externalCapabilities = append(d.externalCapabilities, capability{ + d.publishedCapabilities = append(d.publishedCapabilities, capability{ donCapabilityConfig: targetCapabilityConfig, registryConfig: writeChain, }) @@ -366,7 +453,7 @@ func (d *DON) AddEthereumWriteTargetNonStandardCapability(forwarderAddr common.A } func addOCR3Capability(ctx context.Context, t *testing.T, lggr logger.Logger, capabilityRegistry *capabilities.Registry, - libocr *MockLibOCR, donF uint8, ocr2KeyBundle ocr2key.KeyBundle) { + libocr *FakeLibOCR, donF uint8, ocr2KeyBundle ocr2key.KeyBundle) { requestTimeout := 10 * time.Minute cfg := ocr3.Config{ Logger: lggr, @@ -394,6 +481,6 @@ func addOCR3Capability(ctx context.Context, t *testing.T, lggr logger.Logger, ca libocr.AddNode(plugin, transmitter, ocr2KeyBundle) } -func Context(tb testing.TB) context.Context { - return testutils.Context(tb) +func Context(tb testing.TB) (ctx context.Context, cancel func()) { + return context.WithCancel(testutils.Context(tb)) } diff --git a/core/capabilities/integration_tests/framework/mock_dispatcher.go b/core/capabilities/integration_tests/framework/fake_dispatcher.go similarity index 62% rename from core/capabilities/integration_tests/framework/mock_dispatcher.go rename to core/capabilities/integration_tests/framework/fake_dispatcher.go index f208933f1f1..cc6655a035c 100644 --- a/core/capabilities/integration_tests/framework/mock_dispatcher.go +++ b/core/capabilities/integration_tests/framework/fake_dispatcher.go @@ -2,11 +2,15 @@ package framework import ( "context" + "encoding/hex" + "errors" "fmt" "sync" "testing" "time" + "github.com/smartcontractkit/libocr/ragep2p/types" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" @@ -16,11 +20,12 @@ import ( "google.golang.org/protobuf/proto" ) -// MockRageP2PNetwork backs the dispatchers created for each node in the test and effectively +// FakeRageP2PNetwork backs the dispatchers created for each node in the test and effectively // acts as the rageP2P network layer. -type MockRageP2PNetwork struct { +type FakeRageP2PNetwork struct { services.StateMachine - t *testing.T + t *testing.T + readyError error chanBufferSize int stopCh services.StopChan @@ -28,34 +33,56 @@ type MockRageP2PNetwork struct { peerIDToBrokerNode map[p2ptypes.PeerID]*brokerNode + capabilityRegistrations map[CapabilityRegistration]bool + mux sync.Mutex } -func NewMockRageP2PNetwork(t *testing.T, chanBufferSize int) *MockRageP2PNetwork { - return &MockRageP2PNetwork{ - t: t, - stopCh: make(services.StopChan), - chanBufferSize: chanBufferSize, - peerIDToBrokerNode: make(map[p2ptypes.PeerID]*brokerNode), +func NewFakeRageP2PNetwork(ctx context.Context, t *testing.T, chanBufferSize int) *FakeRageP2PNetwork { + network := &FakeRageP2PNetwork{ + t: t, + stopCh: make(services.StopChan), + chanBufferSize: chanBufferSize, + peerIDToBrokerNode: make(map[p2ptypes.PeerID]*brokerNode), + capabilityRegistrations: make(map[CapabilityRegistration]bool), } + + go func() { + <-ctx.Done() + network.SetReadyError(errors.New("context done")) + }() + + return network } -func (a *MockRageP2PNetwork) Start(ctx context.Context) error { - return a.StartOnce("MockRageP2PNetwork", func() error { +func (a *FakeRageP2PNetwork) Start(ctx context.Context) error { + return a.StartOnce("FakeRageP2PNetwork", func() error { return nil }) } -func (a *MockRageP2PNetwork) Close() error { - return a.StopOnce("MockRageP2PNetwork", func() error { +func (a *FakeRageP2PNetwork) Close() error { + return a.StopOnce("FakeRageP2PNetwork", func() error { close(a.stopCh) a.wg.Wait() return nil }) } +func (a *FakeRageP2PNetwork) Ready() error { + a.mux.Lock() + defer a.mux.Unlock() + return a.readyError +} + +func (a *FakeRageP2PNetwork) SetReadyError(err error) { + a.mux.Lock() + defer a.mux.Unlock() + a.readyError = err +} + // NewDispatcherForNode creates a new dispatcher for a node with the given peer ID. -func (a *MockRageP2PNetwork) NewDispatcherForNode(nodePeerID p2ptypes.PeerID) remotetypes.Dispatcher { +func (a *FakeRageP2PNetwork) NewDispatcherForNode(nodePeerID p2ptypes.PeerID) remotetypes.Dispatcher { return &brokerDispatcher{ callerPeerID: nodePeerID, broker: a, @@ -63,18 +90,41 @@ func (a *MockRageP2PNetwork) NewDispatcherForNode(nodePeerID p2ptypes.PeerID) re } } -func (a *MockRageP2PNetwork) HealthReport() map[string]error { +func (a *FakeRageP2PNetwork) HealthReport() map[string]error { return nil } -func (a *MockRageP2PNetwork) Name() string { - return "MockRageP2PNetwork" +func (a *FakeRageP2PNetwork) Name() string { + return "FakeRageP2PNetwork" +} + +type CapabilityRegistration struct { + nodePeerID string + capabilityID string + capabilityDonID uint32 } -func (a *MockRageP2PNetwork) registerReceiverNode(nodePeerID p2ptypes.PeerID, capabilityID string, capabilityDonID uint32, receiver remotetypes.Receiver) { +func (a *FakeRageP2PNetwork) GetCapabilityRegistrations() map[CapabilityRegistration]bool { a.mux.Lock() defer a.mux.Unlock() + copiedRegistrations := make(map[CapabilityRegistration]bool) + for k, v := range a.capabilityRegistrations { + copiedRegistrations[k] = v + } + return copiedRegistrations +} + +func (a *FakeRageP2PNetwork) registerReceiverNode(nodePeerID p2ptypes.PeerID, capabilityID string, capabilityDonID uint32, receiver remotetypes.Receiver) { + a.mux.Lock() + defer a.mux.Unlock() + + a.capabilityRegistrations[CapabilityRegistration{ + nodePeerID: hex.EncodeToString(nodePeerID[:]), + capabilityID: capabilityID, + capabilityDonID: capabilityDonID, + }] = true + node, ok := a.peerIDToBrokerNode[nodePeerID] if !ok { node = a.newNode() @@ -90,9 +140,10 @@ func (a *MockRageP2PNetwork) registerReceiverNode(nodePeerID p2ptypes.PeerID, ca } } -func (a *MockRageP2PNetwork) Send(msg *remotetypes.MessageBody) { +func (a *FakeRageP2PNetwork) Send(msg *remotetypes.MessageBody) { peerID := toPeerID(msg.Receiver) - node, ok := a.peerIDToBrokerNode[peerID] + + node, ok := a.getNodeForPeerID(peerID) if !ok { panic(fmt.Sprintf("node not found for peer ID %v", peerID)) } @@ -100,6 +151,13 @@ func (a *MockRageP2PNetwork) Send(msg *remotetypes.MessageBody) { node.receiveCh <- msg } +func (a *FakeRageP2PNetwork) getNodeForPeerID(peerID types.PeerID) (*brokerNode, bool) { + a.mux.Lock() + defer a.mux.Unlock() + node, ok := a.peerIDToBrokerNode[peerID] + return node, ok +} + type brokerNode struct { registerReceiverCh chan *registerReceiverRequest receiveCh chan *remotetypes.MessageBody @@ -115,7 +173,7 @@ type registerReceiverRequest struct { receiver remotetypes.Receiver } -func (a *MockRageP2PNetwork) newNode() *brokerNode { +func (a *FakeRageP2PNetwork) newNode() *brokerNode { n := &brokerNode{ receiveCh: make(chan *remotetypes.MessageBody, a.chanBufferSize), registerReceiverCh: make(chan *registerReceiverRequest, a.chanBufferSize), @@ -155,6 +213,7 @@ func toPeerID(id []byte) p2ptypes.PeerID { type broker interface { Send(msg *remotetypes.MessageBody) + Ready() error } type brokerDispatcher struct { @@ -190,7 +249,7 @@ func (t *brokerDispatcher) SetReceiver(capabilityId string, donId uint32, receiv } t.receivers[k] = receiver - t.broker.(*MockRageP2PNetwork).registerReceiverNode(t.callerPeerID, capabilityId, donId, receiver) + t.broker.(*FakeRageP2PNetwork).registerReceiverNode(t.callerPeerID, capabilityId, donId, receiver) return nil } func (t *brokerDispatcher) RemoveReceiver(capabilityId string, donId uint32) {} @@ -202,7 +261,7 @@ func (t *brokerDispatcher) Close() error { } func (t *brokerDispatcher) Ready() error { - return nil + return t.broker.Ready() } func (t *brokerDispatcher) HealthReport() map[string]error { @@ -210,5 +269,5 @@ func (t *brokerDispatcher) HealthReport() map[string]error { } func (t *brokerDispatcher) Name() string { - return "mockDispatcher" + return "fakeDispatcher" } diff --git a/core/capabilities/integration_tests/framework/mock_libocr.go b/core/capabilities/integration_tests/framework/fake_libocr.go similarity index 64% rename from core/capabilities/integration_tests/framework/mock_libocr.go rename to core/capabilities/integration_tests/framework/fake_libocr.go index 39705031f55..0f378a39129 100644 --- a/core/capabilities/integration_tests/framework/mock_libocr.go +++ b/core/capabilities/integration_tests/framework/fake_libocr.go @@ -24,56 +24,105 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" ) +type oracleContext struct { + t *testing.T + lggr logger.Logger + key ocr2key.KeyBundle + N int + F uint8 + protocolRoundInterval time.Duration + mux sync.Mutex + pluginNameToFakeOcr map[string]*FakeLibOCR +} + +func (m *oracleContext) addPlugin(ctx context.Context, info ocr3types.ReportingPluginInfo, plugin ocr3types.ReportingPlugin[[]byte], + args coretypes.OracleArgs) error { + m.mux.Lock() + defer m.mux.Unlock() + + libOcr := m.pluginNameToFakeOcr[info.Name] + if libOcr == nil { + libOcr = NewFakeLibOCR(m.t, m.lggr, m.F, m.protocolRoundInterval) + m.pluginNameToFakeOcr[info.Name] = libOcr + } + + libOcr.AddNode(plugin, args.ContractTransmitter, m.key) + + if libOcr.GetNodeCount() == m.N { + err := libOcr.Start(ctx) + if err != nil { + return fmt.Errorf("failed to start fake lib ocr: %w", err) + } + } + return nil +} + +func (m *oracleContext) Close() error { + m.mux.Lock() + defer m.mux.Unlock() + + for _, libOcr := range m.pluginNameToFakeOcr { + if err := libOcr.Close(); err != nil { + return fmt.Errorf("failed to close fake lib ocr: %w", err) + } + } + return nil +} + type oracleFactoryFactory struct { - mockLibOCr *MockLibOCR - key ocr2key.KeyBundle - N int - F int + oracleContext *oracleContext } -func newMockLibOcrOracleFactory(mockLibOCr *MockLibOCR, key ocr2key.KeyBundle, N int, F int) *oracleFactoryFactory { +func newFakeOracleFactoryFactory(t *testing.T, lggr logger.Logger, key ocr2key.KeyBundle, n int, f uint8, protocolRoundInterval time.Duration) *oracleFactoryFactory { return &oracleFactoryFactory{ - mockLibOCr: mockLibOCr, - key: key, - N: N, - F: F, + oracleContext: &oracleContext{ + t: t, + lggr: lggr, + key: key, + N: n, + F: f, + protocolRoundInterval: protocolRoundInterval, + pluginNameToFakeOcr: make(map[string]*FakeLibOCR), + }, } } func (o *oracleFactoryFactory) NewOracleFactory(params generic.OracleFactoryParams) (coretypes.OracleFactory, error) { - return &mockOracleFactory{o}, nil + return &fakeOracleFactory{o.oracleContext}, nil } -type mockOracle struct { - *mockOracleFactory - args coretypes.OracleArgs - libocrNodeID string +type fakeOracleFactory struct { + oracleContext *oracleContext } -func (m *mockOracle) Start(ctx context.Context) error { - plugin, _, err := m.args.ReportingPluginFactoryService.NewReportingPlugin(ctx, ocr3types.ReportingPluginConfig{ - F: m.F, - N: m.N, +func (m *fakeOracleFactory) NewOracle(ctx context.Context, args coretypes.OracleArgs) (coretypes.Oracle, error) { + return &fakeOracle{oracleContext: m.oracleContext, args: args}, nil +} + +type fakeOracle struct { + oracleContext *oracleContext + args coretypes.OracleArgs +} + +func (m *fakeOracle) Start(ctx context.Context) error { + plugin, info, err := m.args.ReportingPluginFactoryService.NewReportingPlugin(ctx, ocr3types.ReportingPluginConfig{ + F: int(m.oracleContext.F), + N: m.oracleContext.N, }) + if err != nil { return fmt.Errorf("failed to create reporting plugin: %w", err) } - m.libocrNodeID = m.mockLibOCr.AddNode(plugin, m.args.ContractTransmitter, m.key) - return nil -} + if err = m.oracleContext.addPlugin(ctx, info, plugin, m.args); err != nil { + return fmt.Errorf("failed to add plugin: %w", err) + } -func (m *mockOracle) Close(ctx context.Context) error { - m.mockLibOCr.RemoveNode(m.libocrNodeID) return nil } -type mockOracleFactory struct { - *oracleFactoryFactory -} - -func (m *mockOracleFactory) NewOracle(ctx context.Context, args coretypes.OracleArgs) (coretypes.Oracle, error) { - return &mockOracle{mockOracleFactory: m, args: args}, nil +func (m *fakeOracle) Close(ctx context.Context) error { + return m.oracleContext.Close() } type libocrNode struct { @@ -83,9 +132,9 @@ type libocrNode struct { key ocr2key.KeyBundle } -// MockLibOCR is a mock libocr implementation for testing purposes that simulates libocr protocol rounds without having +// FakeLibOCR is a fake libocr implementation for testing purposes that simulates libocr protocol rounds without having // to setup the libocr network -type MockLibOCR struct { +type FakeLibOCR struct { services.StateMachine t *testing.T lggr logger.Logger @@ -102,8 +151,8 @@ type MockLibOCR struct { wg sync.WaitGroup } -func NewMockLibOCR(t *testing.T, lggr logger.Logger, f uint8, protocolRoundInterval time.Duration) *MockLibOCR { - return &MockLibOCR{ +func NewFakeLibOCR(t *testing.T, lggr logger.Logger, f uint8, protocolRoundInterval time.Duration) *FakeLibOCR { + return &FakeLibOCR{ t: t, lggr: lggr, f: f, outcomeCtx: ocr3types.OutcomeContext{ @@ -117,8 +166,8 @@ func NewMockLibOCR(t *testing.T, lggr logger.Logger, f uint8, protocolRoundInter } } -func (m *MockLibOCR) Start(ctx context.Context) error { - return m.StartOnce("MockLibOCR", func() error { +func (m *FakeLibOCR) Start(ctx context.Context) error { + return m.StartOnce("FakeLibOCR", func() error { m.wg.Add(1) go func() { defer m.wg.Done() @@ -144,15 +193,15 @@ func (m *MockLibOCR) Start(ctx context.Context) error { }) } -func (m *MockLibOCR) Close() error { - return m.StopOnce("MockLibOCR", func() error { +func (m *FakeLibOCR) Close() error { + return m.StopOnce("FakeLibOCR", func() error { close(m.stopCh) m.wg.Wait() return nil }) } -func (m *MockLibOCR) AddNode(plugin ocr3types.ReportingPlugin[[]byte], transmitter ocr3types.ContractTransmitter[[]byte], key ocr2key.KeyBundle) string { +func (m *FakeLibOCR) AddNode(plugin ocr3types.ReportingPlugin[[]byte], transmitter ocr3types.ContractTransmitter[[]byte], key ocr2key.KeyBundle) string { m.mux.Lock() defer m.mux.Unlock() node := &libocrNode{uuid.New().String(), plugin, transmitter, key} @@ -160,7 +209,13 @@ func (m *MockLibOCR) AddNode(plugin ocr3types.ReportingPlugin[[]byte], transmitt return node.id } -func (m *MockLibOCR) RemoveNode(id string) { +func (m *FakeLibOCR) GetNodeCount() int { + m.mux.Lock() + defer m.mux.Unlock() + return len(m.nodes) +} + +func (m *FakeLibOCR) RemoveNode(id string) { m.mux.Lock() defer m.mux.Unlock() @@ -174,7 +229,7 @@ func (m *MockLibOCR) RemoveNode(id string) { m.nodes = updatedNodes } -func (m *MockLibOCR) simulateProtocolRound(ctx context.Context) error { +func (m *FakeLibOCR) simulateProtocolRound(ctx context.Context) error { m.mux.Lock() defer m.mux.Unlock() if len(m.nodes) == 0 { diff --git a/core/capabilities/integration_tests/framework/mock_target.go b/core/capabilities/integration_tests/framework/fake_target.go similarity index 80% rename from core/capabilities/integration_tests/framework/mock_target.go rename to core/capabilities/integration_tests/framework/fake_target.go index e9c03deaca2..442d35e3595 100644 --- a/core/capabilities/integration_tests/framework/mock_target.go +++ b/core/capabilities/integration_tests/framework/fake_target.go @@ -9,7 +9,7 @@ import ( ) var ( - _ capabilities.ActionCapability = &mockTarget{} + _ capabilities.ActionCapability = &fakeTarget{} ) type TargetSink struct { @@ -18,7 +18,7 @@ type TargetSink struct { targetName string version string - targets []mockTarget + targets []fakeTarget Sink chan capabilities.CapabilityRequest } @@ -56,7 +56,7 @@ func (ts *TargetSink) Close() error { } func (ts *TargetSink) CreateNewTarget(t *testing.T) capabilities.TargetCapability { - target := mockTarget{ + target := fakeTarget{ t: t, targetID: ts.targetID, ch: ts.Sink, @@ -65,29 +65,29 @@ func (ts *TargetSink) CreateNewTarget(t *testing.T) capabilities.TargetCapabilit return &target } -type mockTarget struct { +type fakeTarget struct { t *testing.T targetID string ch chan capabilities.CapabilityRequest } -func (mt *mockTarget) Execute(ctx context.Context, rawRequest capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { +func (mt *fakeTarget) Execute(ctx context.Context, rawRequest capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { mt.ch <- rawRequest return capabilities.CapabilityResponse{}, nil } -func (mt *mockTarget) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { +func (mt *fakeTarget) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { return capabilities.MustNewCapabilityInfo( mt.targetID, capabilities.CapabilityTypeTarget, - "mock target for target ID "+mt.targetID, + "fake target for target ID "+mt.targetID, ), nil } -func (mt *mockTarget) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { +func (mt *fakeTarget) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } -func (mt *mockTarget) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error { +func (mt *fakeTarget) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error { return nil } diff --git a/core/capabilities/integration_tests/framework/mock_trigger.go b/core/capabilities/integration_tests/framework/fake_trigger.go similarity index 84% rename from core/capabilities/integration_tests/framework/mock_trigger.go rename to core/capabilities/integration_tests/framework/fake_trigger.go index afc874af6c3..4274eddf5ca 100644 --- a/core/capabilities/integration_tests/framework/mock_trigger.go +++ b/core/capabilities/integration_tests/framework/fake_trigger.go @@ -20,7 +20,7 @@ type TriggerSink struct { triggerName string version string - triggers []mockTrigger + triggers []fakeTrigger stopCh services.StopChan wg sync.WaitGroup @@ -80,12 +80,12 @@ func (r *TriggerSink) SendOutput(outputs *values.Map) { } func (r *TriggerSink) CreateNewTrigger(t *testing.T) capabilities.TriggerCapability { - trigger := newMockTrigger(t, r.triggerID, &r.wg, r.stopCh) + trigger := newFakeTrigger(t, r.triggerID, &r.wg, r.stopCh) r.triggers = append(r.triggers, trigger) return &trigger } -type mockTrigger struct { +type fakeTrigger struct { t *testing.T triggerID string cancel context.CancelFunc @@ -95,8 +95,8 @@ type mockTrigger struct { stopCh services.StopChan } -func newMockTrigger(t *testing.T, triggerID string, wg *sync.WaitGroup, stopCh services.StopChan) mockTrigger { - return mockTrigger{ +func newFakeTrigger(t *testing.T, triggerID string, wg *sync.WaitGroup, stopCh services.StopChan) fakeTrigger { + return fakeTrigger{ t: t, triggerID: triggerID, toSend: make(chan capabilities.TriggerResponse, 1000), @@ -105,19 +105,19 @@ func newMockTrigger(t *testing.T, triggerID string, wg *sync.WaitGroup, stopCh s } } -func (s *mockTrigger) sendResponse(resp capabilities.TriggerResponse) { +func (s *fakeTrigger) sendResponse(resp capabilities.TriggerResponse) { s.toSend <- resp } -func (s *mockTrigger) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { +func (s *fakeTrigger) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { return capabilities.MustNewCapabilityInfo( s.triggerID, capabilities.CapabilityTypeTrigger, - "mock trigger for trigger id "+s.triggerID, + "fake trigger for trigger id "+s.triggerID, ), nil } -func (s *mockTrigger) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { +func (s *fakeTrigger) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { if s.cancel != nil { s.t.Fatal("trigger already registered") } @@ -144,7 +144,7 @@ func (s *mockTrigger) RegisterTrigger(ctx context.Context, request capabilities. return responseCh, nil } -func (s *mockTrigger) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error { +func (s *fakeTrigger) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error { if s.cancel == nil { s.t.Fatal("trigger not registered") } diff --git a/core/capabilities/integration_tests/keystone/keystone_test.go b/core/capabilities/integration_tests/keystone/keystone_test.go index 033bb8a2c76..17bfde7cda9 100644 --- a/core/capabilities/integration_tests/keystone/keystone_test.go +++ b/core/capabilities/integration_tests/keystone/keystone_test.go @@ -17,7 +17,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/integration_tests/framework" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/feeds_consumer" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" reporttypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/types" ) @@ -31,7 +30,9 @@ func Test_OneAtATimeTransmissionSchedule(t *testing.T) { } func testTransmissionSchedule(t *testing.T, deltaStage string, schedule string) { - ctx := testutils.Context(t) + ctx, cancel := framework.Context(t) + defer cancel() + lggr := logger.TestLogger(t) lggr.SetLogLevel(zapcore.InfoLevel) @@ -107,7 +108,7 @@ func waitForConsumerReports(ctx context.Context, t *testing.T, consumer *feeds_c for { select { case <-ctxWithTimeout.Done(): - t.Fatalf("timed out waiting for feed reports, expected %d, received %d", len(triggerFeedReports), feedCount) + t.Fatalf("timed out waiting for feeds reports, expected %d, received %d", len(triggerFeedReports), feedCount) case err := <-feedsSub.Err(): require.NoError(t, err) case feed := <-feedsReceived: diff --git a/core/capabilities/integration_tests/keystone/setup.go b/core/capabilities/integration_tests/keystone/setup.go index f90b582d0ee..b9b98baaf7e 100644 --- a/core/capabilities/integration_tests/keystone/setup.go +++ b/core/capabilities/integration_tests/keystone/setup.go @@ -11,6 +11,8 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams" v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" @@ -46,9 +48,11 @@ func setupKeystoneDons(ctx context.Context, t *testing.T, lggr logger.SugaredLog triggerDon := createKeystoneTriggerDon(ctx, t, lggr, triggerDonInfo, donContext, trigger) - workflowDon.Start(ctx, t) - triggerDon.Start(ctx, t) - writeTargetDon.Start(ctx, t) + servicetest.Run(t, workflowDon) + servicetest.Run(t, triggerDon) + servicetest.Run(t, writeTargetDon) + + donContext.WaitForCapabilitiesToBeExposed(t, workflowDon, triggerDon, writeTargetDon) return workflowDon, consumer } diff --git a/core/services/registrysyncer/monitoring.go b/core/services/registrysyncer/monitoring.go index 97fd181515c..027d8a953d8 100644 --- a/core/services/registrysyncer/monitoring.go +++ b/core/services/registrysyncer/monitoring.go @@ -12,39 +12,38 @@ import ( localMonitoring "github.com/smartcontractkit/chainlink/v2/core/monitoring" ) -var remoteRegistrySyncFailureCounter metric.Int64Counter -var launcherFailureCounter metric.Int64Counter +// syncerMetricLabeler wraps monitoring.MetricsLabeler to provide workflow specific utilities +// for monitoring resources +type syncerMetricLabeler struct { + metrics.Labeler + remoteRegistrySyncFailureCounter metric.Int64Counter + launcherFailureCounter metric.Int64Counter +} -func initMonitoringResources() (err error) { - remoteRegistrySyncFailureCounter, err = beholder.GetMeter().Int64Counter("platform_registrysyncer_sync_failures") +func newSyncerMetricLabeler() (*syncerMetricLabeler, error) { + remoteRegistrySyncFailureCounter, err := beholder.GetMeter().Int64Counter("platform_registrysyncer_sync_failures") if err != nil { - return fmt.Errorf("failed to register sync failure counter: %w", err) + return nil, fmt.Errorf("failed to register sync failure counter: %w", err) } - launcherFailureCounter, err = beholder.GetMeter().Int64Counter("platform_registrysyncer_launch_failures") + launcherFailureCounter, err := beholder.GetMeter().Int64Counter("platform_registrysyncer_launch_failures") if err != nil { - return fmt.Errorf("failed to register launcher failure counter: %w", err) + return nil, fmt.Errorf("failed to register launcher failure counter: %w", err) } - return nil -} - -// syncerMetricLabeler wraps monitoring.MetricsLabeler to provide workflow specific utilities -// for monitoring resources -type syncerMetricLabeler struct { - metrics.Labeler + return &syncerMetricLabeler{remoteRegistrySyncFailureCounter: remoteRegistrySyncFailureCounter, launcherFailureCounter: launcherFailureCounter}, nil } -func (c syncerMetricLabeler) with(keyValues ...string) syncerMetricLabeler { - return syncerMetricLabeler{c.With(keyValues...)} +func (c *syncerMetricLabeler) with(keyValues ...string) syncerMetricLabeler { + return syncerMetricLabeler{c.With(keyValues...), c.remoteRegistrySyncFailureCounter, c.launcherFailureCounter} } -func (c syncerMetricLabeler) incrementRemoteRegistryFailureCounter(ctx context.Context) { +func (c *syncerMetricLabeler) incrementRemoteRegistryFailureCounter(ctx context.Context) { otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) - remoteRegistrySyncFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) + c.remoteRegistrySyncFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c syncerMetricLabeler) incrementLauncherFailureCounter(ctx context.Context) { +func (c *syncerMetricLabeler) incrementLauncherFailureCounter(ctx context.Context) { otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) - launcherFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) + c.launcherFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } diff --git a/core/services/registrysyncer/monitoring_test.go b/core/services/registrysyncer/monitoring_test.go index 1ddb6c57997..30d773aa976 100644 --- a/core/services/registrysyncer/monitoring_test.go +++ b/core/services/registrysyncer/monitoring_test.go @@ -9,11 +9,12 @@ import ( ) func Test_InitMonitoringResources(t *testing.T) { - require.NoError(t, initMonitoringResources()) + _, err := newSyncerMetricLabeler() + require.NoError(t, err) } func Test_SyncerMetricsLabeler(t *testing.T) { - testSyncerMetricLabeler := syncerMetricLabeler{metrics.NewLabeler()} + testSyncerMetricLabeler := syncerMetricLabeler{metrics.NewLabeler(), nil, nil} testSyncerMetricLabeler2 := testSyncerMetricLabeler.with("foo", "baz") require.EqualValues(t, testSyncerMetricLabeler2.Labels["foo"], "baz") } diff --git a/core/services/registrysyncer/syncer.go b/core/services/registrysyncer/syncer.go index 5fc241ad249..461824b403b 100644 --- a/core/services/registrysyncer/syncer.go +++ b/core/services/registrysyncer/syncer.go @@ -44,7 +44,7 @@ type RegistrySyncer interface { type registrySyncer struct { services.StateMachine - metrics syncerMetricLabeler + metrics *syncerMetricLabeler stopCh services.StopChan launchers []Launcher reader types.ContractReader @@ -76,7 +76,14 @@ func New( registryAddress string, orm ORM, ) (RegistrySyncer, error) { + + metricLabeler, err := newSyncerMetricLabeler() + if err != nil { + return nil, fmt.Errorf("failed to create syncer metric labeler: %w", err) + } + return ®istrySyncer{ + metrics: metricLabeler, stopCh: make(services.StopChan), updateChan: make(chan *LocalRegistry), lggr: lggr.Named("RegistrySyncer"), @@ -131,11 +138,6 @@ func newReader(ctx context.Context, lggr logger.Logger, relayer ContractReaderFa func (s *registrySyncer) Start(ctx context.Context) error { return s.StartOnce("RegistrySyncer", func() error { - err := initMonitoringResources() - if err != nil { - return err - } - s.wg.Add(1) go func() { defer s.wg.Done()