Skip to content

Commit

Permalink
Merge branch 'develop' into feature/MERC-114/block-range-guarantee
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav authored Jun 7, 2023
2 parents a548fae + d0d0800 commit b493f4a
Show file tree
Hide file tree
Showing 16 changed files with 113 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package functions

import (
"context"
"crypto/ecdsa"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
)

Expand All @@ -31,9 +32,17 @@ func (h *functionsConnectorHandler) SetConnector(connector connector.GatewayConn
}

func (h *functionsConnectorHandler) Sign(data ...[]byte) ([]byte, error) {
return gateway.SignData(h.signerKey, data...)
return api.SignData(h.signerKey, data...)
}

func (h *functionsConnectorHandler) HandleGatewayMessage(gatewayId string, msg *gateway.Message) {
func (h *functionsConnectorHandler) HandleGatewayMessage(gatewayId string, msg *api.Message) {
h.lggr.Debugw("functionsConnectorHandler: received message from gateway", "id", gatewayId)
}

func (h *functionsConnectorHandler) Start(ctx context.Context) error {
return nil
}

func (h *functionsConnectorHandler) Close() error {
return nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package gateway
package api

// Codec implements (de)serialization of Message objects.
type Codec interface {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package gateway
package api

import (
"encoding/json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package gateway_test
package api_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/services/gateway"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
)

func TestJsonRPCRequest_Decode_Correct(t *testing.T) {
t.Parallel()

input := []byte(`{"jsonrpc": "2.0", "id": "aa-bb", "method": "upload", "params": {"body":{"don_id": "functions_local", "payload": {"field": 123}}}}`)
codec := gateway.JsonRPCCodec{}
codec := api.JsonRPCCodec{}
msg, err := codec.DecodeRequest(input)
require.NoError(t, err)
require.Equal(t, "functions_local", msg.Body.DonId)
Expand All @@ -31,7 +31,7 @@ func TestJsonRPCRequest_Decode_Incorrect(t *testing.T) {
"incorrect rpc version": `{"jsonrpc": "5.1", "id": "abc", "method": "upload", "params": {}}`,
}

codec := gateway.JsonRPCCodec{}
codec := api.JsonRPCCodec{}
for _, input := range testCases {
_, err := codec.DecodeRequest([]byte(input))
require.Error(t, err)
Expand All @@ -41,13 +41,13 @@ func TestJsonRPCRequest_Decode_Incorrect(t *testing.T) {
func TestJsonRPCRequest_Encode(t *testing.T) {
t.Parallel()

var msg gateway.Message
msg.Body = gateway.MessageBody{
var msg api.Message
msg.Body = api.MessageBody{
MessageId: "aA-bB",
Sender: "0x1234",
Method: "upload",
}
codec := gateway.JsonRPCCodec{}
codec := api.JsonRPCCodec{}
bytes, err := codec.EncodeRequest(&msg)
require.NoError(t, err)

Expand All @@ -62,7 +62,7 @@ func TestJsonRPCResponse_Decode(t *testing.T) {
t.Parallel()

input := []byte(`{"jsonrpc": "2.0", "id": "aa-bb", "result": {"body": {"don_id": "functions_local", "payload": {"field": 123}}}}`)
codec := gateway.JsonRPCCodec{}
codec := api.JsonRPCCodec{}
msg, err := codec.DecodeResponse(input)
require.NoError(t, err)
require.Equal(t, "functions_local", msg.Body.DonId)
Expand All @@ -73,13 +73,13 @@ func TestJsonRPCResponse_Decode(t *testing.T) {
func TestJsonRPCResponse_Encode(t *testing.T) {
t.Parallel()

var msg gateway.Message
msg.Body = gateway.MessageBody{
var msg api.Message
msg.Body = api.MessageBody{
MessageId: "aA-bB",
Sender: "0x1234",
Method: "upload",
}
codec := gateway.JsonRPCCodec{}
codec := api.JsonRPCCodec{}
bytes, err := codec.EncodeResponse(&msg)
require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package gateway
package api

import (
"encoding/json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package gateway
package api

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package gateway_test
package api_test

import (
"bytes"
Expand All @@ -7,15 +7,15 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/services/gateway"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

func TestSignatures_MessageSignAndValidate(t *testing.T) {
t.Parallel()

msg := &gateway.Message{
Body: gateway.MessageBody{
msg := &api.Message{
Body: api.MessageBody{
MessageId: "abcd",
Method: "request",
DonId: "donA",
Expand All @@ -27,13 +27,13 @@ func TestSignatures_MessageSignAndValidate(t *testing.T) {
require.NoError(t, err)
address := crypto.PubkeyToAddress(privateKey.PublicKey).Bytes()

signature, err := gateway.SignMessage(&msg.Body, privateKey)
signature, err := api.SignMessage(&msg.Body, privateKey)
require.NoError(t, err)
require.Equal(t, 65, len(signature))

msg.Signature = utils.StringToHex(string(signature))
msg.Body.Sender = utils.StringToHex(string(address))
require.NoError(t, gateway.ValidateMessageSignature(msg))
require.NoError(t, api.ValidateMessageSignature(msg))
}

func TestSignatures_BytesSignAndValidate(t *testing.T) {
Expand All @@ -45,11 +45,11 @@ func TestSignatures_BytesSignAndValidate(t *testing.T) {
require.NoError(t, err)
address := crypto.PubkeyToAddress(privateKey.PublicKey).Bytes()

signature, err := gateway.SignData(privateKey, data)
signature, err := api.SignData(privateKey, data)
require.NoError(t, err)
require.Equal(t, 65, len(signature))

signer, err := gateway.ValidateSignature(signature, data)
signer, err := api.ValidateSignature(signature, data)
require.NoError(t, err)
require.True(t, bytes.Equal(signer, address))
}
13 changes: 7 additions & 6 deletions core/services/gateway/connectionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.uber.org/multierr"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
gw_common "github.com/smartcontractkit/chainlink/v2/core/services/gateway/common"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/network"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
Expand All @@ -30,7 +31,7 @@ type DONConnectionManager interface {
SetHandler(handler Handler)

// Thread-safe.
SendToNode(ctx context.Context, nodeAddress string, msg *Message) error
SendToNode(ctx context.Context, nodeAddress string, msg *api.Message) error
}

type connectionManager struct {
Expand All @@ -50,7 +51,7 @@ type donConnectionManager struct {
donConfig *DONConfig
nodes map[string]*nodeState
handler Handler
codec Codec
codec api.Codec
closeWait sync.WaitGroup
shutdownCh chan struct{}
lggr logger.Logger
Expand All @@ -71,7 +72,7 @@ type connAttempt struct {
}

func NewConnectionManager(config *GatewayConfig, clock gw_common.Clock, lggr logger.Logger) (ConnectionManager, error) {
codec := &JsonRPCCodec{}
codec := &api.JsonRPCCodec{}
dons := make(map[string]*donConnectionManager)
for _, donConfig := range config.Dons {
donConfig := donConfig
Expand Down Expand Up @@ -187,7 +188,7 @@ func (m *connectionManager) parseAuthHeader(authHeader []byte) (nodeAddress stri
return "", nil, errors.New("unable to parse auth header")
}
signature := authHeader[n-network.HandshakeSignatureLen:]
signer, err := ValidateSignature(signature, authHeader[:n-network.HandshakeSignatureLen])
signer, err := api.ValidateSignature(signature, authHeader[:n-network.HandshakeSignatureLen])
nodeAddress = "0x" + hex.EncodeToString(signer)
return
}
Expand All @@ -214,7 +215,7 @@ func (m *connectionManager) FinalizeHandshake(attemptId string, response []byte,
if !ok {
return errors.New("connection attempt not found")
}
signer, err := ValidateSignature(response, attempt.challenge)
signer, err := api.ValidateSignature(response, attempt.challenge)
if err != nil {
return errors.New("invalid challenge response")
}
Expand Down Expand Up @@ -242,7 +243,7 @@ func (m *donConnectionManager) SetHandler(handler Handler) {
m.handler = handler
}

func (m *donConnectionManager) SendToNode(ctx context.Context, nodeAddress string, msg *Message) error {
func (m *donConnectionManager) SendToNode(ctx context.Context, nodeAddress string, msg *api.Message) error {
data, err := m.codec.EncodeRequest(msg)
if err != nil {
return fmt.Errorf("error encoding request for node %s: %v", nodeAddress, err)
Expand Down
19 changes: 12 additions & 7 deletions core/services/gateway/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/gorilla/websocket"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/common"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/network"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
Expand All @@ -23,7 +23,7 @@ type GatewayConnector interface {
job.ServiceCtx
network.ConnectionInitiator

SendToGateway(ctx context.Context, gatewayId string, msg *gateway.Message) error
SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error
}

// Signer implementation needs to be provided by a GatewayConnector user (node)
Expand All @@ -39,14 +39,16 @@ type Signer interface {

//go:generate mockery --quiet --name GatewayConnectorHandler --output ./mocks/ --case=underscore
type GatewayConnectorHandler interface {
HandleGatewayMessage(gatewayId string, msg *gateway.Message)
job.ServiceCtx

HandleGatewayMessage(gatewayId string, msg *api.Message)
}

type gatewayConnector struct {
utils.StartStopOnce

config *ConnectorConfig
codec gateway.Codec
codec api.Codec
clock common.Clock
nodeAddress []byte
signer Signer
Expand Down Expand Up @@ -77,7 +79,7 @@ func NewGatewayConnector(config *ConnectorConfig, signer Signer, handler Gateway
}
connector := &gatewayConnector{
config: config,
codec: &gateway.JsonRPCCodec{},
codec: &api.JsonRPCCodec{},
clock: clock,
nodeAddress: addressBytes,
signer: signer,
Expand Down Expand Up @@ -107,7 +109,7 @@ func NewGatewayConnector(config *ConnectorConfig, signer Signer, handler Gateway
return connector, nil
}

func (c *gatewayConnector) SendToGateway(ctx context.Context, gatewayId string, msg *gateway.Message) error {
func (c *gatewayConnector) SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error {
data, err := c.codec.EncodeResponse(msg)
if err != nil {
return fmt.Errorf("error encoding response for gateway %s: %v", gatewayId, err)
Expand Down Expand Up @@ -166,6 +168,9 @@ func (c *gatewayConnector) reconnectLoop(gatewayState *gatewayState) {
func (c *gatewayConnector) Start(ctx context.Context) error {
return c.StartOnce("GatewayConnector", func() error {
c.lggr.Info("starting gateway connector")
if err := c.handler.Start(ctx); err != nil {
return err
}
c.closeWait.Add(2 * len(c.gateways))
for _, gatewayState := range c.gateways {
gatewayState := gatewayState
Expand All @@ -185,7 +190,7 @@ func (c *gatewayConnector) Close() error {
gatewayState.conn.Close()
}
c.closeWait.Wait()
return nil
return c.handler.Close()
})
}

Expand Down
35 changes: 33 additions & 2 deletions core/services/gateway/connector/mocks/gateway_connector_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b493f4a

Please sign in to comment.