Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execution factory constructor updated #800

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/loop/ccip_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ type ExecutionFactoryService struct {

// NewExecutionService returns a new [*ExecutionFactoryService].
// cmd must return a new exec.Cmd each time it is called.
func NewExecutionService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.CCIPExecProvider) *ExecutionFactoryService {
func NewExecutionService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChain uint32, dstChain uint32, sourceTokenAddress string) *ExecutionFactoryService {
newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, error) {
plug, ok := instance.(types.CCIPExecutionFactoryGenerator)
if !ok {
return nil, fmt.Errorf("expected CCIPExecutionFactoryGenerator but got %T", instance)
}
return plug.NewExecutionFactory(ctx, provider)
return plug.NewExecutionFactory(ctx, srcProvider, dstProvider, int64(srcChain), int64(dstChain), sourceTokenAddress)
}
stopCh := make(chan struct{})
lggr = logger.Named(lggr, "CCIPExecutionService")
Expand Down
12 changes: 9 additions & 3 deletions pkg/loop/ccip_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestExecService(t *testing.T) {

exec := loop.NewExecutionService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
return NewHelperProcessCommand(loop.CCIPExecutionLOOPName, false, 0)
}, cciptest.ExecutionProvider)
}, cciptest.ExecutionProvider, cciptest.ExecutionProvider, 0, 0, "")
hook := exec.PluginService.XXXTestHook()
servicetest.Run(t, exec)

Expand Down Expand Up @@ -64,7 +64,7 @@ func TestExecService_recovery(t *testing.T) {
Limit: int(limit.Add(1)),
}
return h.New()
}, cciptest.ExecutionProvider)
}, cciptest.ExecutionProvider, cciptest.ExecutionProvider, 0, 0, "")
servicetest.Run(t, exec)

reportingplugintest.RunFactory(t, exec)
Expand Down Expand Up @@ -99,7 +99,13 @@ func TestExecLOOP(t *testing.T) {
assert.Contains(t, err.Error(), "BCF-3061")
if err == nil {
// test to run when BCF-3061 is fixed
cciptest.ExecutionLOOPTester{CCIPExecProvider: remoteProvider}.Run(t, remoteExecFactory)
cciptest.ExecutionLOOPTester{
SrcProvider: remoteProvider,
DstProvider: remoteProvider,
SrcChainID: 0,
DstChainID: 0,
SourceTokenAddress: "",
}.Run(t, remoteExecFactory)
}
})
}
Expand Down
526 changes: 284 additions & 242 deletions pkg/loop/internal/pb/ccip/factories.pb.go

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pkg/loop/internal/pb/ccip/factories.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ service ExecutionCustomHandlers {

// NewExecutionFactoryRequest is a gRPC adapter to the factory configuration [https://github.com/smartcontractkit/ccip/core/services/ocr2/plugins/ccip/ccipexec/ExecutionPluginStaticConfig]
message NewExecutionFactoryRequest {
uint32 provider_service_id = 1;
uint32 src_provider_service_id = 1;
uint32 dst_provider_service_id = 2;
uint32 src_chain = 3;
uint32 dst_chain = 4;
string src_token_address = 5;
}

// NewExecutionFactoryResponse is a contains the id of the created execution factory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,35 @@ type execFactoryServer struct {
}

// NewExecutionFactory implements types.CCIPExecFactoryGenerator.
func (e execFactoryServer) NewExecutionFactory(ctx context.Context, provider types.CCIPExecProvider) (types.ReportingPluginFactory, error) {
err := e.provider.Evaluate(ctx, provider)
func (e execFactoryServer) NewExecutionFactory(ctx context.Context, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, sourceTokenAddress string) (types.ReportingPluginFactory, error) {
err := e.provider.Evaluate(ctx, srcProvider)
if err != nil {
return nil, err
}

err2 := e.provider.Evaluate(ctx, dstProvider)
if err2 != nil {
return nil, err2
}
return reportingplugintest.Factory, nil
}

func RunExecutionLOOP(t *testing.T, p types.CCIPExecutionFactoryGenerator) {
ExecutionLOOPTester{ExecutionProvider}.Run(t, p)
ExecutionLOOPTester{SrcProvider: ExecutionProvider, DstProvider: ExecutionProvider, SrcChainID: 0, DstChainID: 0}.Run(t, p)
}

type ExecutionLOOPTester struct {
types.CCIPExecProvider
SrcProvider types.CCIPExecProvider
DstProvider types.CCIPExecProvider
SrcChainID int64
DstChainID int64
SourceTokenAddress string
}

func (e ExecutionLOOPTester) Run(t *testing.T, p types.CCIPExecutionFactoryGenerator) {
t.Run("ExecutionLOOP", func(t *testing.T) {
ctx := tests.Context(t)
factory, err := p.NewExecutionFactory(ctx, e.CCIPExecProvider)
factory, err := p.NewExecutionFactory(ctx, e.SrcProvider, e.DstProvider, e.SrcChainID, e.DstChainID, e.SourceTokenAddress)
require.NoError(t, err)

runExecReportingPluginFactory(t, factory)
Expand Down
60 changes: 45 additions & 15 deletions pkg/loop/internal/reportingplugin/ccip/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,55 @@ func NewExecutionLOOPClient(broker net.Broker, brokerCfg net.BrokerConfig, conn
// is run as an external process via hashicorp plugin. If the given provider is a GRPCClientConn, then the provider is proxied to the
// to the relayer, which is its own process via hashicorp plugin. If the provider is not a GRPCClientConn, then the provider is a local
// to the core node. The core must wrap the provider in a grpc server and serve it locally.
func (c *ExecutionLOOPClient) NewExecutionFactory(ctx context.Context, provider types.CCIPExecProvider) (types.ReportingPluginFactory, error) {
func (c *ExecutionLOOPClient) NewExecutionFactory(ctx context.Context, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, srcTokenAddress string) (types.ReportingPluginFactory, error) {
newExecClientFn := func(ctx context.Context) (id uint32, deps net.Resources, err error) {
// TODO are there any local resources that need to be passed to the executor and started as a server?

// the proxyable resources are the Provider, which may or may not be local to the client process. (legacy vs loopp)
var (
providerID uint32
providerResource net.Resource
srcProviderID uint32
srcProviderResource net.Resource
dstProviderID uint32
dstProviderResource net.Resource
)
if grpcProvider, ok := provider.(goplugin.GRPCClientConn); ok {
if srcGrpcProvider, ok := srcProvider.(goplugin.GRPCClientConn); ok {
// TODO: BCF-3061 ccip provider can create new services. the proxying needs to be augmented
// to intercept and route to the created services. also, need to prevent leaks.
providerID, providerResource, err = c.Serve("ExecProvider", proxy.NewProxy(grpcProvider.ClientConn()))
srcProviderID, srcProviderResource, err = c.Serve("ExecProvider", proxy.NewProxy(srcGrpcProvider.ClientConn()))
} else {
// loop client runs in the core node. if the provider is not a grpc client conn, then we are in legacy mode
// and need to serve all the required services locally.
providerID, providerResource, err = c.ServeNew("ExecProvider", func(s *grpc.Server) {
ccipprovider.RegisterExecutionProviderServices(s, provider, c.BrokerExt)
srcProviderID, srcProviderResource, err = c.ServeNew("ExecProvider", func(s *grpc.Server) {
ccipprovider.RegisterExecutionProviderServices(s, srcProvider, c.BrokerExt)
})
}
if err != nil {
return 0, nil, err
}
deps.Add(providerResource)
deps.Add(srcProviderResource)

if dstGrpcProvider, ok := dstProvider.(goplugin.GRPCClientConn); ok {
// TODO: BCF-3061 ccip provider can create new services. the proxying needs to be augmented
// to intercept and route to the created services. also, need to prevent leaks.
dstProviderID, dstProviderResource, err = c.Serve("ExecProvider", proxy.NewProxy(dstGrpcProvider.ClientConn()))
} else {
// loop client runs in the core node. if the provider is not a grpc client conn, then we are in legacy mode
// and need to serve all the required services locally.
dstProviderID, dstProviderResource, err = c.ServeNew("ExecProvider", func(s *grpc.Server) {
ccipprovider.RegisterExecutionProviderServices(s, dstProvider, c.BrokerExt)
})
}
if err != nil {
return 0, nil, err
}
deps.Add(dstProviderResource)

resp, err := c.generator.NewExecutionFactory(ctx, &ccippb.NewExecutionFactoryRequest{
ProviderServiceId: providerID,
SrcProviderServiceId: srcProviderID,
DstProviderServiceId: dstProviderID,
SrcChain: uint32(srcChainID),
DstChain: uint32(dstChainID),
SrcTokenAddress: srcTokenAddress,
})
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -115,15 +137,23 @@ func (r *ExecutionLOOPServer) NewExecutionFactory(ctx context.Context, request *
}
}()

// lookup the provider service
providerConn, err := r.Dial(request.ProviderServiceId)
// lookup the source provider service
srcProviderConn, err := r.Dial(request.SrcProviderServiceId)
if err != nil {
return nil, net.ErrConnDial{Name: "ExecProvider", ID: request.SrcProviderServiceId, Err: err}
}
deps.Add(net.Resource{Closer: srcProviderConn, Name: "ExecProvider"})
srcProvider := ccipprovider.NewExecProviderClient(r.BrokerExt, srcProviderConn)

// lookup the dest provider service
dstProviderConn, err := r.Dial(request.DstProviderServiceId)
if err != nil {
return nil, net.ErrConnDial{Name: "ExecProvider", ID: request.ProviderServiceId, Err: err}
return nil, net.ErrConnDial{Name: "ExecProvider", ID: request.DstProviderServiceId, Err: err}
}
deps.Add(net.Resource{Closer: providerConn, Name: "ExecProvider"})
provider := ccipprovider.NewExecProviderClient(r.BrokerExt, providerConn)
deps.Add(net.Resource{Closer: dstProviderConn, Name: "ExecProvider"})
dstProvider := ccipprovider.NewExecProviderClient(r.BrokerExt, dstProviderConn)

factory, err := r.impl.NewExecutionFactory(ctx, provider)
factory, err := r.impl.NewExecutionFactory(ctx, srcProvider, dstProvider, int64(request.SrcChain), int64(request.DstChain), request.SrcTokenAddress)
if err != nil {
return nil, fmt.Errorf("failed to create new execution factory: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/types/provider_ccip.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type CCIPCommitFactoryGenerator interface {
}

type CCIPExecutionFactoryGenerator interface {
NewExecutionFactory(ctx context.Context, provider CCIPExecProvider) (ReportingPluginFactory, error)
NewExecutionFactory(ctx context.Context, srcProvider CCIPExecProvider, dstProvider CCIPExecProvider, srcChainID int64, dstChainID int64, sourceTokenAddress string) (ReportingPluginFactory, error)
}
type CCIPFactoryGenerator interface {
CCIPCommitFactoryGenerator
Expand Down
Loading