Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Action Remote Shims Register/Unregister #15232

Merged
merged 7 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 52 additions & 6 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/streams"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -294,12 +294,26 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync
return fmt.Errorf("failed to add trigger shim: %w", err)
}
case capabilities.CapabilityTypeAction:
w.lggr.Warn("no remote client configured for capability type action, skipping configuration")
newActionFn := func(info capabilities.CapabilityInfo) (capabilityService, error) {
client := executable.NewClient(
info,
myDON.DON,
w.dispatcher,
defaultTargetRequestTimeout,
w.lggr,
)
return client, nil
}

err := w.addToRegistryAndSetDispatcher(ctx, capability, remoteDON, newActionFn)
if err != nil {
return fmt.Errorf("failed to add action shim: %w", err)
}
case capabilities.CapabilityTypeConsensus:
w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration")
case capabilities.CapabilityTypeTarget:
newTargetFn := func(info capabilities.CapabilityInfo) (capabilityService, error) {
client := target.NewClient(
client := executable.NewClient(
info,
myDON.DON,
w.dispatcher,
Expand Down Expand Up @@ -419,7 +433,34 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
// continue attempting other capabilities
}
case capabilities.CapabilityTypeAction:
w.lggr.Warn("no remote client configured for capability type action, skipping configuration")
newActionServer := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
actionCapability, ok := (cap).(capabilities.ActionCapability)
if !ok {
return nil, errors.New("capability does not implement ActionCapability")
}

remoteConfig := &capabilities.RemoteExecutableConfig{}
if capabilityConfig.RemoteTargetConfig != nil {
remoteConfig.RequestHashExcludedAttributes = capabilityConfig.RemoteTargetConfig.RequestHashExcludedAttributes
}

return executable.NewServer(
capabilityConfig.RemoteExecutableConfig,
myPeerID,
actionCapability,
info,
don.DON,
idsToDONs,
w.dispatcher,
defaultTargetRequestTimeout,
w.lggr,
), nil
}

err = w.addReceiver(ctx, capability, don, newActionServer)
if err != nil {
return fmt.Errorf("failed to add action server-side receiver: %w", err)
}
case capabilities.CapabilityTypeConsensus:
w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration")
case capabilities.CapabilityTypeTarget:
Expand All @@ -429,8 +470,13 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
return nil, errors.New("capability does not implement TargetCapability")
}

return target.NewServer(
capabilityConfig.RemoteTargetConfig,
remoteConfig := &capabilities.RemoteExecutableConfig{}
if capabilityConfig.RemoteTargetConfig != nil {
remoteConfig.RequestHashExcludedAttributes = capabilityConfig.RemoteTargetConfig.RequestHashExcludedAttributes
}

return executable.NewServer(
remoteConfig,
myPeerID,
targetCapability,
info,
Expand Down
7 changes: 4 additions & 3 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestLauncher(t *testing.T) {
)

dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, dID, mock.AnythingOfType("*target.server")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, dID, mock.AnythingOfType("*executable.server")).Return(nil)

err = launcher.Launch(ctx, state)
require.NoError(t, err)
Expand Down Expand Up @@ -603,7 +603,8 @@ func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) {
)

dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*target.client")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*executable.client")).Return(nil)
dispatcher.On("Ready").Return(nil).Maybe()
awaitRegistrationMessageCh := make(chan struct{})
dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
select {
Expand Down Expand Up @@ -919,7 +920,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) {
)

dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*target.client")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*executable.client")).Return(nil)

err = launcher.Launch(ctx, state)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package target
package executable

import (
"context"
Expand All @@ -8,15 +8,15 @@ import (
"time"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// client is a shim for remote target capabilities.
// client is a shim for remote executable capabilities.
// It translates between capability API calls and network messages.
// Its responsibilities are:
// 1. Transmit capability requests to remote nodes according to a transmission schedule
Expand All @@ -31,25 +31,25 @@ type client struct {
dispatcher types.Dispatcher
requestTimeout time.Duration

messageIDToCallerRequest map[string]*request.ClientRequest
requestIDToCallerRequest map[string]*request.ClientRequest
mutex sync.Mutex
stopCh services.StopChan
wg sync.WaitGroup
}

var _ commoncap.TargetCapability = &client{}
var _ commoncap.ExecutableCapability = &client{}
var _ types.Receiver = &client{}
var _ services.Service = &client{}

func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration, lggr logger.Logger) *client {
return &client{
lggr: lggr.Named("TargetClient"),
lggr: lggr.Named("ExecutableCapabilityClient"),
remoteCapabilityInfo: remoteCapabilityInfo,
localDONInfo: localDonInfo,
dispatcher: dispatcher,
requestTimeout: requestTimeout,
messageIDToCallerRequest: make(map[string]*request.ClientRequest),
requestIDToCallerRequest: make(map[string]*request.ClientRequest),
stopCh: make(services.StopChan),
}
}
Expand All @@ -61,7 +61,13 @@ func (c *client) Start(ctx context.Context) error {
defer c.wg.Done()
c.checkForExpiredRequests()
}()
c.lggr.Info("TargetClient started")
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.checkDispatcherReady()
}()

c.lggr.Info("ExecutableCapability Client started")
return nil
})
}
Expand All @@ -71,11 +77,26 @@ func (c *client) Close() error {
close(c.stopCh)
c.cancelAllRequests(errors.New("client closed"))
c.wg.Wait()
c.lggr.Info("TargetClient closed")
c.lggr.Info("ExecutableCapability closed")
return nil
})
}

func (c *client) checkDispatcherReady() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
if err := c.dispatcher.Ready(); err != nil {
c.cancelAllRequests(fmt.Errorf("dispatcher not ready: %w", err))
}
}
}
}

func (c *client) checkForExpiredRequests() {
ticker := time.NewTicker(c.requestTimeout)
defer ticker.Stop()
Expand All @@ -93,18 +114,23 @@ func (c *client) expireRequests() {
c.mutex.Lock()
defer c.mutex.Unlock()

for messageID, req := range c.messageIDToCallerRequest {
for messageID, req := range c.requestIDToCallerRequest {
if req.Expired() {
req.Cancel(errors.New("request expired"))
delete(c.messageIDToCallerRequest, messageID)
delete(c.requestIDToCallerRequest, messageID)
}

if c.dispatcher.Ready() != nil {
c.cancelAllRequests(errors.New("dispatcher not ready"))
return
}
}
}

func (c *client) cancelAllRequests(err error) {
c.mutex.Lock()
defer c.mutex.Unlock()
for _, req := range c.messageIDToCallerRequest {
for _, req := range c.requestIDToCallerRequest {
req.Cancel(err)
}
}
Expand All @@ -113,49 +139,80 @@ func (c *client) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
return c.remoteCapabilityInfo, nil
}

func (c *client) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error {
// do nothing
return nil
}
func (c *client) RegisterToWorkflow(ctx context.Context, registerRequest commoncap.RegisterToWorkflowRequest) error {
req, err := request.NewClientRegisterToWorkflowRequest(ctx, c.lggr, registerRequest, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)

if err != nil {
return fmt.Errorf("failed to create client request: %w", err)
}

if err = c.sendRequest(req); err != nil {
return fmt.Errorf("failed to send request: %w", err)
}

func (c *client) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error {
// do nothing
resp := <-req.ResponseChan()
if resp.Err != nil {
return fmt.Errorf("error executing request: %w", resp.Err)
}
return nil
}

func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) {
req, err := c.executeRequest(ctx, capReq)
func (c *client) UnregisterFromWorkflow(ctx context.Context, unregisterRequest commoncap.UnregisterFromWorkflowRequest) error {
req, err := request.NewClientUnregisterFromWorkflowRequest(ctx, c.lggr, unregisterRequest, c.remoteCapabilityInfo,
c.localDONInfo, c.dispatcher, c.requestTimeout)

if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to execute request: %w", err)
return fmt.Errorf("failed to create client request: %w", err)
}

if err = c.sendRequest(req); err != nil {
return fmt.Errorf("failed to send request: %w", err)
}

resp := <-req.ResponseChan()
return resp.CapabilityResponse, resp.Err
if resp.Err != nil {
return fmt.Errorf("error executing request: %w", resp.Err)
}
return nil
}

func (c *client) executeRequest(ctx context.Context, capReq commoncap.CapabilityRequest) (*request.ClientRequest, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

messageID, err := GetMessageIDForRequest(capReq)
func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) {
req, err := request.NewClientExecuteRequest(ctx, c.lggr, capReq, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)
if err != nil {
return nil, fmt.Errorf("failed to get message ID for request: %w", err)
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to create client request: %w", err)
}

c.lggr.Debugw("executing remote target", "messageID", messageID)
if err = c.sendRequest(req); err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to send request: %w", err)
}

if _, ok := c.messageIDToCallerRequest[messageID]; ok {
return nil, fmt.Errorf("request for message ID %s already exists", messageID)
resp := <-req.ResponseChan()
if resp.Err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("error executing request: %w", resp.Err)
}

req, err := request.NewClientRequest(ctx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)
capabilityResponse, err := pb.UnmarshalCapabilityResponse(resp.Result)
if err != nil {
return nil, fmt.Errorf("failed to create client request: %w", err)
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to unmarshal capability response: %w", err)
}

return capabilityResponse, nil
}

func (c *client) sendRequest(req *request.ClientRequest) error {
c.mutex.Lock()
defer c.mutex.Unlock()

c.lggr.Debugw("executing remote execute capability", "requestID", req.ID())

if _, ok := c.requestIDToCallerRequest[req.ID()]; ok {
return fmt.Errorf("request for ID %s already exists", req.ID())
}

c.messageIDToCallerRequest[messageID] = req
return req, nil
c.requestIDToCallerRequest[req.ID()] = req
return nil
}

func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
Expand All @@ -168,9 +225,9 @@ func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
return
}

c.lggr.Debugw("Remote client target receiving message", "messageID", messageID)
c.lggr.Debugw("Remote client executable receiving message", "messageID", messageID)

req := c.messageIDToCallerRequest[messageID]
req := c.requestIDToCallerRequest[messageID]
if req == nil {
c.lggr.Warnw("received response for unknown message ID ", "messageID", messageID)
return
Expand All @@ -181,18 +238,6 @@ func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
}
}

func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) {
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowID); err != nil {
return "", fmt.Errorf("workflow ID is invalid: %w", err)
}

if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowExecutionID); err != nil {
return "", fmt.Errorf("workflow execution ID is invalid: %w", err)
}

return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil
}

func (c *client) Ready() error {
return nil
}
Expand Down
Loading
Loading