Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KS-430] Provide an OracleFactory to StandardCapabilities #14305

Draft
wants to merge 14 commits into
base: develop
Choose a base branch
from
Draft
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/prometheus/client_golang v1.20.0
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.2.3-0.20240925085218-aded1b263ecc
github.com/smartcontractkit/chainlink-common v0.2.3-0.20240926094457-238a5afc96ef
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7
github.com/spf13/cobra v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1083,8 +1083,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240924115754-8858b0423283 h1:f0vdqcOL9kJZwfmWE76roIyEuiZx/R82js0IfXNAvXg=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240924115754-8858b0423283/go.mod h1:KP82vFCqm+M1G1t6Vos5CewGUGYJkxxCEdxnta4uLlE=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20240925085218-aded1b263ecc h1:ALbyaoRzUSXQ2NhGFKVOyJqO22IB5yQjhjKWbIZGbrI=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20240925085218-aded1b263ecc/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20240926094457-238a5afc96ef h1:tZ5R0SJcxGptU0E1DhiDxxVM7HJEamv86dK9T1CYN7I=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20240926094457-238a5afc96ef/go.mod h1:+yz2So1Lp90C3ATfMmqMEJaIr3zfG8e4xHa/3u1kIDk=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q=
Expand Down
23 changes: 14 additions & 9 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,15 +453,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
pipelineRunner,
cfg.JobPipeline(),
),
job.StandardCapabilities: standardcapabilities.NewDelegate(
globalLogger,
opts.DS, jobORM,
opts.CapabilitiesRegistry,
loopRegistrarConfig,
telemetryManager,
pipelineRunner,
opts.RelayerChainInteroperators,
gatewayConnectorWrapper),
}
webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner()
)
Expand Down Expand Up @@ -501,6 +492,20 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
return nil, fmt.Errorf("P2P stack required for OCR or OCR2")
}

// If peer wrapper is initialized, Oracle Factory dependency will be available to standard capabilities
delegates[job.StandardCapabilities] = standardcapabilities.NewDelegate(
globalLogger,
opts.DS, jobORM,
opts.CapabilitiesRegistry,
loopRegistrarConfig,
telemetryManager,
pipelineRunner,
opts.RelayerChainInteroperators,
gatewayConnectorWrapper,
keyStore.OCR2(),
peerWrapper,
)

if cfg.OCR().Enabled() {
delegates[job.OffchainReporting] = ocr.NewDelegate(
opts.DS,
Expand Down
11 changes: 6 additions & 5 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,11 +928,12 @@ func (w *WorkflowSpec) SDKSpec(ctx context.Context) (sdk.WorkflowSpec, error) {
}

type StandardCapabilitiesSpec struct {
ID int32
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
Command string `toml:"command"`
Config string `toml:"config"`
ID int32
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
Command string `toml:"command"`
Config string `toml:"config"`
OracleFactory JSONConfig `toml:"oracleFactory"`
Copy link
Contributor

@cedric-cordenier cedric-cordenier Sep 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DeividasK OracleFactoryConfig?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put a comment here to document where we define what's expected in OracleConfig?

}

func (w *StandardCapabilitiesSpec) GetID() string {
Expand Down
123 changes: 123 additions & 0 deletions core/services/ocr2/plugins/generic/oracle_factory_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package generic

import (
"context"
"time"

"github.com/pkg/errors"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

type memoryDb struct {
// The ID is used for logging and error messages
// A single standard capabilities spec can instantiate multiple oracles
// TODO: NewOracle should take a unique identifier for the oracle
specID int32
lggr logger.SugaredLogger
config *ocrtypes.ContractConfig
states map[ocrtypes.ConfigDigest]*ocrtypes.PersistentState
pendingTransmissions map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission
protocolStates map[ocrtypes.ConfigDigest]map[string][]byte
}

var (
_ ocrtypes.Database = &memoryDb{}
)

// NewDB returns a new DB scoped to this instanceID
func NewMemoryDB(specID int32, lggr logger.Logger) *memoryDb {
return &memoryDb{
specID: specID,
lggr: logger.Sugared(lggr.Named("OracleFactoryMemoryDb")),
}
}

func (md *memoryDb) ReadState(ctx context.Context, cd ocrtypes.ConfigDigest) (ps *ocrtypes.PersistentState, err error) {
ps, ok := md.states[cd]
if !ok {
return nil, errors.Errorf("state not found for standard capabilities spec ID %d, config digest %s", md.specID, cd)
}

return ps, nil
}

func (md *memoryDb) WriteState(ctx context.Context, cd ocrtypes.ConfigDigest, state ocrtypes.PersistentState) error {
md.states[cd] = &state
return nil
}

func (md *memoryDb) ReadConfig(ctx context.Context) (c *ocrtypes.ContractConfig, err error) {
if md.config == nil {
return nil, errors.Errorf("config not found for standard capabilities spec ID %d", md.specID)
}
return md.config, nil
}

func (md *memoryDb) WriteConfig(ctx context.Context, c ocrtypes.ContractConfig) error {
md.config = &c
return nil
}

func (md *memoryDb) StorePendingTransmission(ctx context.Context, t ocrtypes.ReportTimestamp, tx ocrtypes.PendingTransmission) error {
md.pendingTransmissions[t] = tx
return nil
}

func (md *memoryDb) PendingTransmissionsWithConfigDigest(ctx context.Context, cd ocrtypes.ConfigDigest) (map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission, error) {
m := make(map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission)
for k, v := range md.pendingTransmissions {
if k.ConfigDigest == cd {
m[k] = v
}
}

return m, nil
}

func (md *memoryDb) DeletePendingTransmission(ctx context.Context, t ocrtypes.ReportTimestamp) error {
delete(md.pendingTransmissions, t)
return nil
}

func (md *memoryDb) DeletePendingTransmissionsOlderThan(ctx context.Context, t time.Time) error {
for k, v := range md.pendingTransmissions {
if v.Time.Before(t) {
delete(md.pendingTransmissions, k)
}
}

return nil
}

func (md *memoryDb) ReadProtocolState(
ctx context.Context,
configDigest ocrtypes.ConfigDigest,
key string,
) ([]byte, error) {
value, ok := md.protocolStates[configDigest][key]
if !ok {
// Previously implementation returned nil if the state is not found
// TODO: Should this return nil, nil?
return nil, nil
}
return value, nil
}

func (md *memoryDb) WriteProtocolState(
ctx context.Context,
configDigest ocrtypes.ConfigDigest,
key string,
value []byte,
) error {
if value == nil {
delete(md.protocolStates[configDigest], key)
} else {
if md.protocolStates[configDigest] == nil {
md.protocolStates[configDigest] = make(map[string][]byte)
}
md.protocolStates[configDigest][key] = value
}
return nil
}
135 changes: 135 additions & 0 deletions core/services/ocr2/plugins/generic/oraclefactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package generic

import (
"context"
"encoding/json"
"fmt"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/smartcontractkit/libocr/commontypes"
ocr "github.com/smartcontractkit/libocr/offchainreporting2plus"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"

"github.com/smartcontractkit/chainlink-common/pkg/types/core"
)

type oracleFactoryConfig struct {
Enabled bool
TraceLogging bool
BootstrapPeers []commontypes.BootstrapperLocator
}

func NewOracleFactoryConfig(config job.JSONConfig) (*oracleFactoryConfig, error) {
var ofc struct {
Enabled bool `json:"enabled"`
TraceLogging bool `json:"traceLogging"`
BootstrapPeers []string `json:"bootstrapPeers"`
}
err := json.Unmarshal(config.Bytes(), &ofc)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal oracle factory config")
}

if !ofc.Enabled {
return &oracleFactoryConfig{}, nil
}

// If Oracle Factory is enabled, it must have at least one bootstrap peer
if len(ofc.BootstrapPeers) == 0 {
return nil, errors.New("no bootstrap peers found")
}

bootstrapPeers, err := ocrcommon.ParseBootstrapPeers(ofc.BootstrapPeers)
if err != nil {
return nil, errors.Wrap(err, "failed to parse bootstrap peers")
}

return &oracleFactoryConfig{
Enabled: true,
TraceLogging: ofc.TraceLogging,
BootstrapPeers: bootstrapPeers,
}, nil
}

type oracleFactory struct {
database ocr3types.Database
jobID int32
jobName string
jobORM job.ORM
kb ocr2key.KeyBundle
lggr logger.Logger
config *oracleFactoryConfig
peerWrapper *ocrcommon.SingletonPeerWrapper
}

type OracleFactoryParams struct {
JobID int32
JobName string
JobORM job.ORM
Kb ocr2key.KeyBundle
Logger logger.Logger
Config *oracleFactoryConfig
PeerWrapper *ocrcommon.SingletonPeerWrapper
}

func NewOracleFactory(params OracleFactoryParams) (core.OracleFactory, error) {
return &oracleFactory{
database: NewMemoryDB(params.JobID, params.Logger),
jobID: params.JobID,
jobName: params.JobName,
jobORM: params.JobORM,
kb: params.Kb,
lggr: params.Logger,
config: params.Config,
peerWrapper: params.PeerWrapper,
}, nil
}

func (of *oracleFactory) NewOracle(ctx context.Context, args core.OracleArgs) (core.Oracle, error) {
if !of.peerWrapper.IsStarted() {
return nil, errors.New("peer wrapper not started")
}
oracle, err := ocr.NewOracle(ocr.OCR3OracleArgs[[]byte]{
LocalConfig: args.LocalConfig,
ContractConfigTracker: args.ContractConfigTracker,
ContractTransmitter: args.ContractTransmitter,
OffchainConfigDigester: args.OffchainConfigDigester,
ReportingPluginFactory: args.ReportingPluginFactoryService,
BinaryNetworkEndpointFactory: of.peerWrapper.Peer2,
V2Bootstrappers: of.config.BootstrapPeers,
Database: of.database,
Logger: ocrcommon.NewOCRWrapper(of.lggr, of.config.TraceLogging, func(ctx context.Context, msg string) {
logger.Sugared(of.lggr).ErrorIf(of.jobORM.RecordError(ctx, of.jobID, msg), "unable to record error")
}),
// TODO?
MonitoringEndpoint: &telemetry.NoopAgent{},
OffchainKeyring: of.kb,
OnchainKeyring: ocrcommon.NewOCR3OnchainKeyringAdapter(of.kb),
MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": of.jobName}, prometheus.DefaultRegisterer),
})

if err != nil {
return nil, fmt.Errorf("%w: failed to create new OCR oracle", err)
}

return &adaptedOracle{oracle: oracle}, nil
}

type adaptedOracle struct {
oracle ocr.Oracle
}

func (a *adaptedOracle) Start(ctx context.Context) error {
return a.oracle.Start()
}

func (a *adaptedOracle) Close(ctx context.Context) error {
return a.oracle.Close()
}
Loading