Skip to content

Commit

Permalink
add gateway connector capability config
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidOrchard committed Aug 29, 2024
1 parent 0294e1f commit c639695
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 45 deletions.
22 changes: 22 additions & 0 deletions core/config/capabilities_config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package config

import (
"math/big"

"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/network"
)

type CapabilitiesExternalRegistry interface {
Expand All @@ -11,7 +14,26 @@ type CapabilitiesExternalRegistry interface {
RelayID() types.RelayID
}

type GatewayConnectorConfig interface {
NodeAddress() string
DonId() string
Gateways() []ConnectorGatewayConfig
WsClientConfig() network.WebSocketClientConfig
AuthMinChallengeLen() int
AuthTimestampToleranceSec() uint32
}

type ConnectorGatewayConfig interface {
Id() string
URL() string
}

type WorkflowConnectorConfig interface {
ChainIDForNodeKey() big.Int
GatewayConnectorConfig() GatewayConnectorConfig
}
type Capabilities interface {
Peering() P2P
ExternalRegistry() CapabilitiesExternalRegistry
WorkflowConnectorConfig() WorkflowConnectorConfig
}
66 changes: 64 additions & 2 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"fmt"
"net"
"net/url"
"reflect"
"regexp"
"strings"

"math/big"

"github.com/google/uuid"
"go.uber.org/multierr"
"go.uber.org/zap/zapcore"
Expand All @@ -20,11 +23,13 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/parse"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/network"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/store/dialects"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
"github.com/smartcontractkit/chainlink/v2/core/utils"

configutils "github.com/smartcontractkit/chainlink/v2/core/utils/config"
)

Expand Down Expand Up @@ -1434,14 +1439,71 @@ func (r *ExternalRegistry) setFrom(f *ExternalRegistry) {
}
}

type GatewayConnectorConfig struct {
NodeAddress string
DonId string
Gateways []ConnectorGatewayConfig
WsClientConfig network.WebSocketClientConfig
AuthMinChallengeLen int
AuthTimestampToleranceSec uint32
}

func (r *GatewayConnectorConfig) setFrom(f *GatewayConnectorConfig) {
if f.NodeAddress != "" {
r.NodeAddress = f.NodeAddress
}

if f.DonId != "" {
r.DonId = f.DonId
}

// TODO: verify this copy by reference is ok, or does array need to be copied by value
if f.Gateways != nil {
r.Gateways = f.Gateways
}

if !reflect.ValueOf(f.WsClientConfig).IsZero() {
r.WsClientConfig = f.WsClientConfig
}

if f.AuthMinChallengeLen != 0 {
r.AuthMinChallengeLen = f.AuthMinChallengeLen
}

if f.AuthTimestampToleranceSec != 0 {
r.AuthTimestampToleranceSec = f.AuthTimestampToleranceSec
}
}

type ConnectorGatewayConfig struct {
Id string
URL string
}
type WorkflowConnectorConfig struct {
ChainIDForNodeKey big.Int
GatewayConnectorConfig GatewayConnectorConfig `json:"gatewayConnectorConfig"`
}

func (r *WorkflowConnectorConfig) setFrom(f *WorkflowConnectorConfig) {
if len(f.ChainIDForNodeKey.Bits()) != 0 {
r.ChainIDForNodeKey = f.ChainIDForNodeKey
}

if !reflect.ValueOf(f.GatewayConnectorConfig).IsZero() {
r.GatewayConnectorConfig.setFrom(&f.GatewayConnectorConfig)
}
}

type Capabilities struct {
Peering P2P `toml:",omitempty"`
ExternalRegistry ExternalRegistry `toml:",omitempty"`
Peering P2P `toml:",omitempty"`
ExternalRegistry ExternalRegistry `toml:",omitempty"`
WorkflowConnectorConfig WorkflowConnectorConfig `toml:", omitempty"`
}

func (c *Capabilities) setFrom(f *Capabilities) {
c.Peering.setFrom(&f.Peering)
c.ExternalRegistry.setFrom(&f.ExternalRegistry)
c.WorkflowConnectorConfig.setFrom(&f.WorkflowConnectorConfig)
}

type ThresholdKeyShareSecrets struct {
Expand Down
35 changes: 33 additions & 2 deletions core/services/chainlink/config_capabilities.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package chainlink

import (
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/toml"
"math/big"

"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/toml"
)

var _ config.Capabilities = (*capabilitiesConfig)(nil)
Expand All @@ -23,6 +24,21 @@ func (c *capabilitiesConfig) ExternalRegistry() config.CapabilitiesExternalRegis
}
}

func (c *capabilitiesConfig) WorkflowConnectorConfig() config.WorkflowConnectorConfig {
// I don't understand how the mixing of the toml.WorkflowConnectorConfig can be forced to pick up the config.GatewayConnectorConfig.
// when it's coming from the toml. It seems to be intermixing the toml.WorkflowConnectorConfig with the config.WorkflowConnectorConfig
// And I don't understand the use of a reference access with the use of the c: c.c.* syntax
// error described below.
// cannot use &workflowConnectorConfig{…} (value of type *workflowConnectorConfig) as
// "github.com/smartcontractkit/chainlink/v2/core/config".WorkflowConnectorConfig value in return statement:
// *workflowConnectorConfig does not implement "github.com/smartcontractkit/chainlink/v2/core/config".WorkflowConnectorConfig (wrong type for method GatewayConnectorConfig)
// have GatewayConnectorConfig() "github.com/smartcontractkit/chainlink/v2/core/config/toml".GatewayConnectorConfig
// want GatewayConnectorConfig() "github.com/smartcontractkit/chainlink/v2/core/config".GatewayConnectorConfigcompilerInvalidIfaceAssign
return &workflowConnectorConfig{
c: c.c.WorkflowConnectorConfig,
}
}

type capabilitiesExternalRegistry struct {
c toml.ExternalRegistry
}
Expand All @@ -42,3 +58,18 @@ func (c *capabilitiesExternalRegistry) ChainID() string {
func (c *capabilitiesExternalRegistry) Address() string {
return *c.c.Address
}

type workflowConnectorConfig struct {
c toml.WorkflowConnectorConfig
}

func (c *workflowConnectorConfig) ChainIDForNodeKey() big.Int {
return c.c.ChainIDForNodeKey
}

func (c *workflowConnectorConfig) GatewayConnectorConfig() toml.GatewayConnectorConfig {
// invalid operation: cannot indirect
// c.c.GatewayConnectorConfig (variable of type "github.com/smartcontractkit/chainlink/v2/core/config/toml".ConnectorConfig)
// compilerInvalidIndirection
return c.c.GatewayConnectorConfig
}
19 changes: 0 additions & 19 deletions core/services/gateway/connector/config.go

This file was deleted.

40 changes: 22 additions & 18 deletions core/services/gateway/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils/hex"

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/network"
Expand Down Expand Up @@ -46,7 +47,7 @@ type GatewayConnectorHandler interface {
type gatewayConnector struct {
services.StateMachine

config *ConnectorConfig
config config.GatewayConnectorConfig
codec api.Codec
clock clockwork.Clock
nodeAddress []byte
Expand All @@ -71,24 +72,27 @@ func (c *gatewayConnector) Name() string { return c.lggr.Name() }

type gatewayState struct {
conn network.WSConnectionWrapper
config ConnectorGatewayConfig
config config.ConnectorGatewayConfig
url *url.URL
wsClient network.WebSocketClient
}

func NewGatewayConnector(config *ConnectorConfig, signer Signer, handler GatewayConnectorHandler, clock clockwork.Clock, lggr logger.Logger) (GatewayConnector, error) {
if config == nil || signer == nil || handler == nil || clock == nil || lggr == nil {
func NewGatewayConnector(configLocal config.GatewayConnectorConfig, signer Signer, handler GatewayConnectorHandler, clock clockwork.Clock, lggr logger.Logger) (GatewayConnector, error) {
if configLocal == nil || signer == nil || handler == nil || clock == nil || lggr == nil {
return nil, errors.New("nil dependency")
}
if len(config.DonId) == 0 || len(config.DonId) > network.HandshakeDonIdLen {
// configLocal.DonId undefined
// (type *"github.com/smartcontractkit/chainlink/v2/core/config".ConnectorConfig
// is pointer to interface, not interface)compilerMissingFieldOrMethod
if len(configLocal.DonId()) == 0 || len(configLocal.DonId()) > network.HandshakeDonIdLen {
return nil, errors.New("invalid DON ID")
}
addressBytes, err := hex.DecodeString(config.NodeAddress)
addressBytes, err := hex.DecodeString(configLocal.NodeAddress())
if err != nil {
return nil, err
}
connector := &gatewayConnector{
config: config,
config: configLocal,
codec: &api.JsonRPCCodec{},
clock: clock,
nodeAddress: addressBytes,
Expand All @@ -99,26 +103,26 @@ func NewGatewayConnector(config *ConnectorConfig, signer Signer, handler Gateway
}
gateways := make(map[string]*gatewayState)
urlToId := make(map[string]string)
for _, gw := range config.Gateways {
for _, gw := range configLocal.Gateways() {
gw := gw
if _, exists := gateways[gw.Id]; exists {
if _, exists := gateways[gw.Id()]; exists {
return nil, fmt.Errorf("duplicate Gateway ID %s", gw.Id)
}
if _, exists := urlToId[gw.URL]; exists {
if _, exists := urlToId[gw.URL()]; exists {
return nil, fmt.Errorf("duplicate Gateway URL %s", gw.URL)
}
parsedURL, err := url.Parse(gw.URL)
parsedURL, err := url.Parse(gw.URL())
if err != nil {
return nil, err
}
gateway := &gatewayState{
conn: network.NewWSConnectionWrapper(lggr),
config: gw,
url: parsedURL,
wsClient: network.NewWebSocketClient(config.WsClientConfig, connector, lggr),
wsClient: network.NewWebSocketClient(configLocal.WsClientConfig(), connector, lggr),
}
gateways[gw.Id] = gateway
urlToId[gw.URL] = gw.Id
gateways[gw.Id()] = gateway
urlToId[gw.URL()] = gw.Id()
}
connector.gateways = gateways
connector.urlToId = urlToId
Expand Down Expand Up @@ -159,7 +163,7 @@ func (c *gatewayConnector) readLoop(gatewayState *gatewayState) {
c.lggr.Errorw("failed to validate message signature", "id", gatewayState.config.Id, "err", err)
break
}
c.handler.HandleGatewayMessage(ctx, gatewayState.config.Id, msg)
c.handler.HandleGatewayMessage(ctx, gatewayState.config.Id(), msg)
}
}
}
Expand Down Expand Up @@ -229,7 +233,7 @@ func (c *gatewayConnector) NewAuthHeader(url *url.URL) ([]byte, error) {
}
authHeaderElems := &network.AuthHeaderElems{
Timestamp: uint32(c.clock.Now().Unix()),
DonId: c.config.DonId,
DonId: c.config.DonId(),
GatewayId: gatewayId,
}
packedElems := network.PackAuthHeader(authHeaderElems)
Expand All @@ -245,7 +249,7 @@ func (c *gatewayConnector) ChallengeResponse(url *url.URL, challenge []byte) ([]
if err != nil {
return nil, err
}
if len(challengeElems.ChallengeBytes) < c.config.AuthMinChallengeLen {
if len(challengeElems.ChallengeBytes) < c.config.AuthMinChallengeLen() {
return nil, network.ErrChallengeTooShort
}
gatewayId, found := c.urlToId[url.String()]
Expand All @@ -254,7 +258,7 @@ func (c *gatewayConnector) ChallengeResponse(url *url.URL, challenge []byte) ([]
}
nowTs := uint32(c.clock.Now().Unix())
ts := challengeElems.Timestamp
if ts < nowTs-c.config.AuthTimestampToleranceSec || nowTs+c.config.AuthTimestampToleranceSec < ts {
if ts < nowTs-c.config.AuthTimestampToleranceSec() || nowTs+c.config.AuthTimestampToleranceSec() < ts {
return nil, network.ErrAuthInvalidTimestamp
}
return c.signer.Sign(challenge)
Expand Down
12 changes: 8 additions & 4 deletions core/services/gateway/connector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
Expand All @@ -33,18 +34,21 @@ Id = "another_one"
URL = "wss://example.com:8090/node_endpoint"
`

func parseTOMLConfig(t *testing.T, tomlConfig string) *connector.ConnectorConfig {
var cfg connector.ConnectorConfig
func parseTOMLConfig(t *testing.T, tomlConfig string) *config.ConnectorConfig {
var cfg config.ConnectorConfig
err := toml.Unmarshal([]byte(tomlConfig), &cfg)
require.NoError(t, err)
return &cfg
}

func newTestConnector(t *testing.T, config *connector.ConnectorConfig, now time.Time) (connector.GatewayConnector, *mocks.Signer, *mocks.GatewayConnectorHandler) {
func newTestConnector(t *testing.T, configInstance *config.ConnectorConfig, now time.Time) (connector.GatewayConnector, *mocks.Signer, *mocks.GatewayConnectorHandler) {
signer := mocks.NewSigner(t)
handler := mocks.NewGatewayConnectorHandler(t)
clock := clockwork.NewFakeClock()
connector, err := connector.NewGatewayConnector(config, signer, handler, clock, logger.TestLogger(t))
// cannot use configInstance
// (variable of type *"github.com/smartcontractkit/chainlink/v2/core/config".ConnectorConfig)
// as *invalid type value in argument to connector.NewGatewayConnector
connector, err := connector.NewGatewayConnector(configInstance, signer, handler, clock, logger.TestLogger(t))
require.NoError(t, err)
return connector, signer, handler
}
Expand Down

0 comments on commit c639695

Please sign in to comment.