Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build test #15833

Draft
wants to merge 6 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 55 additions & 32 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/platform"
Expand Down Expand Up @@ -76,10 +75,15 @@ var (

var _ capabilities.ActionCapability = (*Compute)(nil)

type FetcherFn func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error)

type FetcherFactory interface {
NewFetcher(log logger.Logger, emitter custmsg.MessageEmitter) FetcherFn
}

type Compute struct {
stopCh services.StopChan
log logger.Logger
metrics *computeMetricsLabeler
stopCh services.StopChan
log logger.Logger

// emitter is used to emit messages from the WASM module to a configured collector.
emitter custmsg.MessageEmitter
Expand All @@ -88,9 +92,9 @@ type Compute struct {

// transformer is used to transform a values.Map into a ParsedConfig struct on each execution
// of a request.
transformer *transformer
outgoingConnectorHandler *webapi.OutgoingConnectorHandler
idGenerator func() string
transformer *transformer

fetcherFactory FetcherFactory

numWorkers int
queue chan request
Expand Down Expand Up @@ -185,7 +189,7 @@ func (c *Compute) execute(ctx context.Context, respCh chan response, req capabil
func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, requestMetadata capabilities.RequestMetadata) (*module, error) {
initStart := time.Now()

cfg.Fetch = c.createFetcher()
cfg.Fetch = c.fetcherFactory.NewFetcher(c.log, c.emitter)
mod, err := host.NewModule(cfg, binary)
if err != nil {
return nil, fmt.Errorf("failed to instantiate WASM module: %w", err)
Expand Down Expand Up @@ -289,7 +293,32 @@ func (c *Compute) Close() error {
return nil
}

func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
type outgoingConnectorFetcherFactory struct {
outgoingConnectorHandler *webapi.OutgoingConnectorHandler
idGenerator func() string
metrics *computeMetricsLabeler
}

func NewOutgoingConnectorFetcherFactory(
outgoingConnectorHandler *webapi.OutgoingConnectorHandler,
idGenerator func() string,
) (FetcherFactory, error) {

metricsLabeler, err := newComputeMetricsLabeler(metrics.NewLabeler().With("capability", CapabilityIDCompute))
if err != nil {
return nil, fmt.Errorf("failed to create compute metrics labeler: %w", err)
}

factory := &outgoingConnectorFetcherFactory{
outgoingConnectorHandler: outgoingConnectorHandler,
idGenerator: idGenerator,
metrics: metricsLabeler,
}

return factory, nil
}

func (f *outgoingConnectorFetcherFactory) NewFetcher(log logger.Logger, emitter custmsg.MessageEmitter) FetcherFn {
return func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowId); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", req.Metadata.WorkflowId, err)
Expand All @@ -298,7 +327,7 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", req.Metadata.WorkflowExecutionId, err)
}

cma := c.emitter.With(
cma := emitter.With(
platform.KeyWorkflowID, req.Metadata.WorkflowId,
platform.KeyWorkflowName, req.Metadata.WorkflowName,
platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner,
Expand All @@ -309,7 +338,7 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
messageID := strings.Join([]string{
req.Metadata.WorkflowExecutionId,
ghcapabilities.MethodComputeAction,
c.idGenerator(),
f.idGenerator(),
}, "/")

fields := req.Headers.GetFields()
Expand All @@ -318,7 +347,7 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
headersReq[k] = v.String()
}

resp, err := c.outgoingConnectorHandler.HandleSingleNodeRequest(ctx, messageID, ghcapabilities.Request{
resp, err := f.outgoingConnectorHandler.HandleSingleNodeRequest(ctx, messageID, ghcapabilities.Request{
URL: req.Url,
Method: req.Method,
Headers: headersReq,
Expand All @@ -329,14 +358,14 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
return nil, err
}

c.log.Debugw("received gateway response", "resp", resp)
log.Debugw("received gateway response", "resp", resp)
var response wasmpb.FetchResponse
err = json.Unmarshal(resp.Body.Payload, &response)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err)
}

c.metrics.with(
f.metrics.with(
"status", strconv.FormatUint(uint64(response.StatusCode), 10),
platform.KeyWorkflowID, req.Metadata.WorkflowId,
platform.KeyWorkflowName, req.Metadata.WorkflowName,
Expand All @@ -348,7 +377,7 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
msg := fmt.Sprintf("compute fetch request failed with status code %d", response.StatusCode)
err = cma.Emit(ctx, msg)
if err != nil {
c.log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
}
}

Expand All @@ -369,32 +398,26 @@ func NewAction(
config Config,
log logger.Logger,
registry coretypes.CapabilitiesRegistry,
handler *webapi.OutgoingConnectorHandler,
idGenerator func() string,
fetcherFactory FetcherFactory,
opts ...func(*Compute),
) (*Compute, error) {
if config.NumWorkers == 0 {
config.NumWorkers = defaultNumWorkers
}
metricsLabeler, err := newComputeMetricsLabeler(metrics.NewLabeler().With("capability", CapabilityIDCompute))
if err != nil {
return nil, fmt.Errorf("failed to create compute metrics labeler: %w", err)
}

var (
lggr = logger.Named(log, "CustomCompute")
labeler = custmsg.NewLabeler()
compute = &Compute{
stopCh: make(services.StopChan),
log: lggr,
emitter: labeler,
metrics: metricsLabeler,
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
transformer: NewTransformer(lggr, labeler),
outgoingConnectorHandler: handler,
idGenerator: idGenerator,
queue: make(chan request),
numWorkers: defaultNumWorkers,
stopCh: make(services.StopChan),
log: lggr,
emitter: labeler,
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
transformer: NewTransformer(lggr, labeler),
fetcherFactory: fetcherFactory,
queue: make(chan request),
numWorkers: defaultNumWorkers,
}
)

Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/compute/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func setup(t *testing.T, config Config) testHarness {
connectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, config.ServiceConfig, ghcapabilities.MethodComputeAction, log)
require.NoError(t, err)

compute, err := NewAction(config, log, registry, connectorHandler, idGeneratorFn)
compute, err := NewAction(config, log, registry, NewOutgoingConnectorFetcherFactory(connectorHandler, idGeneratorFn))
require.NoError(t, err)
compute.modules.clock = clockwork.NewFakeClock()

Expand Down
79 changes: 67 additions & 12 deletions core/capabilities/integration_tests/framework/don.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package framework
import (
"context"
"encoding/hex"
"errors"
"fmt"
"strconv"
"testing"
Expand All @@ -12,6 +13,8 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/durationpb"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/compute"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand Down Expand Up @@ -42,9 +45,12 @@ import (
)

type DonContext struct {
EthBlockchain *EthBlockchain
p2pNetwork *FakeRageP2PNetwork
capabilityRegistry *CapabilitiesRegistry
EthBlockchain *EthBlockchain
p2pNetwork *FakeRageP2PNetwork
capabilityRegistry *CapabilitiesRegistry
workflowRegistry *WorkflowRegistry
syncerFetcherFunc syncer.FetcherFunc
computeFetcherFactory compute.FetcherFactory
}

func CreateDonContext(ctx context.Context, t *testing.T) DonContext {
Expand All @@ -57,6 +63,16 @@ func CreateDonContext(ctx context.Context, t *testing.T) DonContext {
return DonContext{EthBlockchain: ethBlockchain, p2pNetwork: rageP2PNetwork, capabilityRegistry: capabilitiesRegistry}
}

func CreateDonContextWithWorkflowRegistry(ctx context.Context, t *testing.T, syncerFetcherFunc syncer.FetcherFunc,
computeFetcherFactory compute.FetcherFactory) DonContext {
donContext := CreateDonContext(ctx, t)
workflowRegistry := NewWorkflowRegistry(ctx, t, donContext.EthBlockchain)
donContext.workflowRegistry = workflowRegistry
donContext.syncerFetcherFunc = syncerFetcherFunc
donContext.computeFetcherFactory = computeFetcherFactory
return donContext
}

func (c DonContext) WaitForCapabilitiesToBeExposed(t *testing.T, dons ...*DON) {
allExpectedCapabilities := make(map[CapabilityRegistration]bool)
for _, don := range dons {
Expand Down Expand Up @@ -92,12 +108,17 @@ type capabilityNode struct {
type DON struct {
services.StateMachine
t *testing.T
id *uint32
config DonConfiguration
lggr logger.Logger
nodes []*capabilityNode
standardCapabilityJobs []*job.Job
publishedCapabilities []capability
capabilitiesRegistry *CapabilitiesRegistry

initialised bool

capabilitiesRegistry *CapabilitiesRegistry
workflowRegistry *WorkflowRegistry

nodeConfigModifiers []func(c *chainlink.Config, node *capabilityNode)

Expand All @@ -109,7 +130,8 @@ type DON struct {

func NewDON(ctx context.Context, t *testing.T, lggr logger.Logger, donConfig DonConfiguration,
dependentDONs []commoncap.DON, donContext DonContext, supportsOCR bool) *DON {
don := &DON{t: t, lggr: lggr.Named(donConfig.name), config: donConfig, capabilitiesRegistry: donContext.capabilityRegistry}
don := &DON{t: t, lggr: lggr.Named(donConfig.name), config: donConfig, capabilitiesRegistry: donContext.capabilityRegistry,
workflowRegistry: donContext.workflowRegistry}

protocolRoundInterval := 1 * time.Second

Expand Down Expand Up @@ -153,7 +175,7 @@ func NewDON(ctx context.Context, t *testing.T, lggr logger.Logger, donConfig Don
for _, modifier := range don.nodeConfigModifiers {
modifier(c, cn)
}
})
}, donContext.syncerFetcherFunc, donContext.computeFetcherFactory)

require.NoError(t, node.Start(testutils.Context(t)))
cn.TestApplication = node
Expand All @@ -169,6 +191,17 @@ func (d *DON) Initialise() {

//nolint:gosec // disable G115
d.config.DON.ID = uint32(id)
d.id = &d.config.DON.ID

if d.config.AcceptsWorkflows && d.workflowRegistry != nil {
d.workflowRegistry.UpdateAllowedDons([]uint32{d.config.DON.ID})
d.nodeConfigModifiers = append(d.nodeConfigModifiers, func(c *chainlink.Config, node *capabilityNode) {
workflowRegistryAddressStr := d.workflowRegistry.addr.String()
c.Capabilities.WorkflowRegistry.Address = &workflowRegistryAddressStr
})
}
d.initialised = true

}

func (d *DON) GetID() uint32 {
Expand Down Expand Up @@ -343,6 +376,20 @@ func (d *DON) AddJob(ctx context.Context, j *job.Job) error {
return nil
}

func (d *DON) AddWorkflow(workflow Workflow) error {
if !d.config.AcceptsWorkflows {
return errors.New("cannot add workflow to non-workflow DON")
}

if !d.initialised {
return errors.New("cannot add workflow to non-initialised DON")
}

d.workflowRegistry.RegisterWorkflow(workflow, *d.id)

return nil
}

type TriggerFactory interface {
CreateNewTrigger(t *testing.T) commoncap.TriggerCapability
GetTriggerID() string
Expand All @@ -366,12 +413,15 @@ func startNewNode(ctx context.Context,
newOracleFactoryFn standardcapabilities.NewOracleFactoryFn,
keyV2 ethkey.KeyV2,
setupCfg func(c *chainlink.Config),
fetcherFunc syncer.FetcherFunc,
fetcherFactoryFunc compute.FetcherFactory,
) *cltest.TestApplication {
config, _ := heavyweight.FullTestDBV2(t, func(c *chainlink.Config, s *chainlink.Secrets) {
c.Capabilities.ExternalRegistry.ChainID = ptr(fmt.Sprintf("%d", testutils.SimulatedChainID))
c.Capabilities.ExternalRegistry.Address = ptr(capRegistryAddr.String())
c.Capabilities.Peering.V2.Enabled = ptr(true)
c.Feature.FeedsManager = ptr(false)
c.Feature.LogPoller = ptr(true)

if setupCfg != nil {
setupCfg(c)
Expand All @@ -394,7 +444,7 @@ func startNewNode(ctx context.Context,
ethBlockchain.Commit()

return cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, ethBlockchain.Backend, nodeInfo,
dispatcher, peerWrapper, newOracleFactoryFn, localCapabilities, keyV2, lggr)
dispatcher, peerWrapper, newOracleFactoryFn, localCapabilities, keyV2, lggr, fetcherFunc, fetcherFactoryFunc)
}

// Functions below this point are for adding non-standard capabilities to a DON, deliberately verbose. Eventually these
Expand All @@ -416,24 +466,29 @@ func (d *DON) AddOCR3NonStandardCapability() {
})
}

func (d *DON) AddEthereumWriteTargetNonStandardCapability(forwarderAddr common.Address) error {
func (d *DON) AddEthereumWriteTargetNonStandardCapability(forwarderAddr common.Address) (string, error) {
d.nodeConfigModifiers = append(d.nodeConfigModifiers, func(c *chainlink.Config, node *capabilityNode) {
eip55Address := types.EIP55AddressFromAddress(forwarderAddr)
c.EVM[0].Chain.Workflow.ForwarderAddress = &eip55Address
c.EVM[0].Chain.Workflow.FromAddress = &node.key.EIP55Address
})

labelledName := "write_geth-testnet"
version := "1.0.0"

writeChain := kcr.CapabilitiesRegistryCapability{
LabelledName: "write_geth-testnet",
Version: "1.0.0",
LabelledName: labelledName,
Version: version,
CapabilityType: uint8(registrysyncer.ContractCapabilityTypeTarget),
}

capabilityID := fmt.Sprintf("%s@%s", labelledName, version)

targetCapabilityConfig := newCapabilityConfig()

configWithLimit, err := values.WrapMap(map[string]any{"gasLimit": 500000})
if err != nil {
return fmt.Errorf("failed to wrap map: %w", err)
return "", fmt.Errorf("failed to wrap map: %w", err)
}

targetCapabilityConfig.DefaultConfig = values.Proto(configWithLimit).GetMapValue()
Expand All @@ -449,7 +504,7 @@ func (d *DON) AddEthereumWriteTargetNonStandardCapability(forwarderAddr common.A
registryConfig: writeChain,
})

return nil
return capabilityID, nil
}

func addOCR3Capability(ctx context.Context, t *testing.T, lggr logger.Logger, capabilityRegistry *capabilities.Registry,
Expand Down
Loading
Loading