Skip to content

Commit

Permalink
Add Db syncing for registry syncer (#13756)
Browse files Browse the repository at this point in the history
* Adds migration for syncer state data

* Implements syncer ORM

* Implements syncing with local registry and properly updating it

* Fixes db sync channel

* Connects DB to syncer constructor

* Adds changeset

* Fixes changeset

* Uses custom marshal JSON call on DB store call

* Fixes errors from merge conflicts

* Fixes existing tests

* Prevents tests from hanging

* Fixes lint issue

* Fixes setup on `New()` call

* Implements marshal/unnmarshal mechanics on syncer state

* Fixes migration name

* Fixes lint

* Keeps the latest 10 records on `registry_syncer_states`

* Adds ORM tests

* Prevents possible flake

* Fixes errors from conflict

* Fixes syncer tests

* Fixes syncer ORM tests

* Fixes linter

* Fixes tests

* Fixes tests

* Review changes

* Fixes tests

* Fixes lint

* Improves implementation

* Removes unused `to32Byte` func

* fixes errors on `Close()`

* Adds more custom types to avoid data races

* Update tests

* fixes lint

* Removes unnecessary comments

* Sends deep copy of local registry to launchers

* Fixes linter

* Uses interface for syncer ORM

* Mocks the ORM for tests

* Improves `syncer.Close()` mechanism

* Fixes merge conflicts

* Fixes test

* Fixes linter

* Fixes race condition with `syncer.reader`

* Fixes race condition in test

* Fixes race condition in test

* Fixes test

---------

Co-authored-by: Bolek <[email protected]>
  • Loading branch information
vyzaldysanchez and bolekk authored Aug 15, 2024
1 parent a41b353 commit c92a721
Show file tree
Hide file tree
Showing 12 changed files with 837 additions and 36 deletions.
5 changes: 5 additions & 0 deletions .changeset/many-knives-play.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#updated Adds DB syncing for registry syncer
5 changes: 4 additions & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -570,4 +570,7 @@ packages:
interfaces:
Reader:
config:
mockname: "Mock{{ .InterfaceName }}"
mockname: "Mock{{ .InterfaceName }}"
github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer:
interfaces:
ORM:
2 changes: 2 additions & 0 deletions core/capabilities/ccip/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/smartcontractkit/chainlink-common/pkg/loop"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/common"
configsevm "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/configs/evm"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/launcher"
Expand Down Expand Up @@ -119,6 +120,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services
},
relayer,
cfg.ExternalRegistry().Address(),
registrysyncer.NewORM(d.ds, d.lggr),
)
if err != nil {
return nil, fmt.Errorf("could not configure syncer: %w", err)
Expand Down
3 changes: 3 additions & 0 deletions core/capabilities/ccip/launcher/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

it "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/ccip_integration_tests/integrationhelpers"
cctypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"

"github.com/onsi/gomega"
Expand All @@ -30,12 +31,14 @@ func TestIntegration_Launcher(t *testing.T) {
p2pIDs := it.P2pIDsFromInts(arr)
uni.AddCapability(p2pIDs)

db := pgtest.NewSqlxDB(t)
regSyncer, err := registrysyncer.New(lggr,
func() (p2ptypes.PeerID, error) {
return p2pIDs[0], nil
},
uni,
uni.CapReg.Address().String(),
registrysyncer.NewORM(db, lggr),
)
require.NoError(t, err)

Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
},
relayer,
registryAddress,
registrysyncer.NewORM(opts.DS, globalLogger),
)
if err != nil {
return nil, fmt.Errorf("could not configure syncer: %w", err)
Expand Down
16 changes: 16 additions & 0 deletions core/services/registrysyncer/local_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ type LocalRegistry struct {
IDsToCapabilities map[string]Capability
}

func NewLocalRegistry(
lggr logger.Logger,
getPeerID func() (p2ptypes.PeerID, error),
IDsToDONs map[DonID]DON,
IDsToNodes map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo,
IDsToCapabilities map[string]Capability,
) LocalRegistry {
return LocalRegistry{
lggr: lggr.Named("LocalRegistry"),
getPeerID: getPeerID,
IDsToDONs: IDsToDONs,
IDsToNodes: IDsToNodes,
IDsToCapabilities: IDsToCapabilities,
}
}

func (l *LocalRegistry) LocalNode(ctx context.Context) (capabilities.Node, error) {
// Load the current nodes PeerWrapper, this gets us the current node's
// PeerID, allowing us to contextualize registry information in terms of DON ownership
Expand Down
142 changes: 142 additions & 0 deletions core/services/registrysyncer/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

167 changes: 167 additions & 0 deletions core/services/registrysyncer/orm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package registrysyncer

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"math/big"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

type capabilitiesRegistryNodeInfo struct {
NodeOperatorId uint32 `json:"nodeOperatorId"`
ConfigCount uint32 `json:"configCount"`
WorkflowDONId uint32 `json:"workflowDONId"`
Signer p2ptypes.PeerID `json:"signer"`
P2pId p2ptypes.PeerID `json:"p2pId"`
HashedCapabilityIds []p2ptypes.PeerID `json:"hashedCapabilityIds"`
CapabilitiesDONIds []string `json:"capabilitiesDONIds"`
}

func (l *LocalRegistry) MarshalJSON() ([]byte, error) {
idsToNodes := make(map[p2ptypes.PeerID]capabilitiesRegistryNodeInfo)
for k, v := range l.IDsToNodes {
hashedCapabilityIds := make([]p2ptypes.PeerID, len(v.HashedCapabilityIds))
for i, id := range v.HashedCapabilityIds {
hashedCapabilityIds[i] = p2ptypes.PeerID(id[:])
}
capabilitiesDONIds := make([]string, len(v.CapabilitiesDONIds))
for i, id := range v.CapabilitiesDONIds {
capabilitiesDONIds[i] = id.String()
}
idsToNodes[k] = capabilitiesRegistryNodeInfo{
NodeOperatorId: v.NodeOperatorId,
ConfigCount: v.ConfigCount,
WorkflowDONId: v.WorkflowDONId,
Signer: p2ptypes.PeerID(v.Signer[:]),
P2pId: p2ptypes.PeerID(v.P2pId[:]),
HashedCapabilityIds: hashedCapabilityIds,
CapabilitiesDONIds: capabilitiesDONIds,
}
}

b, err := json.Marshal(&struct {
IDsToDONs map[DonID]DON
IDsToNodes map[p2ptypes.PeerID]capabilitiesRegistryNodeInfo
IDsToCapabilities map[string]Capability
}{
IDsToDONs: l.IDsToDONs,
IDsToNodes: idsToNodes,
IDsToCapabilities: l.IDsToCapabilities,
})
if err != nil {
return []byte{}, err
}
return b, nil
}

func (l *LocalRegistry) UnmarshalJSON(data []byte) error {
temp := struct {
IDsToDONs map[DonID]DON
IDsToNodes map[p2ptypes.PeerID]capabilitiesRegistryNodeInfo
IDsToCapabilities map[string]Capability
}{
IDsToDONs: make(map[DonID]DON),
IDsToNodes: make(map[p2ptypes.PeerID]capabilitiesRegistryNodeInfo),
IDsToCapabilities: make(map[string]Capability),
}

if err := json.Unmarshal(data, &temp); err != nil {
return fmt.Errorf("failed to unmarshal state: %w", err)
}

l.IDsToDONs = temp.IDsToDONs

l.IDsToNodes = make(map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo)
for peerID, v := range temp.IDsToNodes {
hashedCapabilityIds := make([][32]byte, len(v.HashedCapabilityIds))
for i, id := range v.HashedCapabilityIds {
copy(hashedCapabilityIds[i][:], id[:])
}

capabilitiesDONIds := make([]*big.Int, len(v.CapabilitiesDONIds))
for i, id := range v.CapabilitiesDONIds {
bigInt := new(big.Int)
bigInt.SetString(id, 10)
capabilitiesDONIds[i] = bigInt
}
l.IDsToNodes[peerID] = kcr.CapabilitiesRegistryNodeInfo{
NodeOperatorId: v.NodeOperatorId,
ConfigCount: v.ConfigCount,
WorkflowDONId: v.WorkflowDONId,
Signer: v.Signer,
P2pId: v.P2pId,
HashedCapabilityIds: hashedCapabilityIds,
CapabilitiesDONIds: capabilitiesDONIds,
}
}

l.IDsToCapabilities = temp.IDsToCapabilities

return nil
}

type ORM interface {
AddLocalRegistry(ctx context.Context, localRegistry LocalRegistry) error
LatestLocalRegistry(ctx context.Context) (*LocalRegistry, error)
}

type orm struct {
ds sqlutil.DataSource
lggr logger.Logger
}

var _ ORM = (*orm)(nil)

func NewORM(ds sqlutil.DataSource, lggr logger.Logger) orm {
namedLogger := lggr.Named("RegistrySyncerORM")
return orm{
ds: ds,
lggr: namedLogger,
}
}

func (orm orm) AddLocalRegistry(ctx context.Context, localRegistry LocalRegistry) error {
return sqlutil.TransactDataSource(ctx, orm.ds, nil, func(tx sqlutil.DataSource) error {
localRegistryJSON, err := localRegistry.MarshalJSON()
if err != nil {
return err
}
hash := sha256.Sum256(localRegistryJSON)
_, err = tx.ExecContext(
ctx,
`INSERT INTO registry_syncer_states (data, data_hash) VALUES ($1, $2) ON CONFLICT (data_hash) DO NOTHING`,
localRegistryJSON, fmt.Sprintf("%x", hash[:]),
)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, `DELETE FROM registry_syncer_states
WHERE data_hash NOT IN (
SELECT data_hash FROM registry_syncer_states
ORDER BY id DESC
LIMIT 10
);`)
return err
})
}

func (orm orm) LatestLocalRegistry(ctx context.Context) (*LocalRegistry, error) {
var localRegistry LocalRegistry
var localRegistryJSON string
err := orm.ds.GetContext(ctx, &localRegistryJSON, `SELECT data FROM registry_syncer_states ORDER BY id DESC LIMIT 1`)
if err != nil {
return nil, err
}
err = localRegistry.UnmarshalJSON([]byte(localRegistryJSON))
if err != nil {
return nil, err
}
return &localRegistry, nil
}
Loading

0 comments on commit c92a721

Please sign in to comment.