Skip to content

Commit

Permalink
Use in-memory DB for OCR persistance
Browse files Browse the repository at this point in the history
  • Loading branch information
DeividasK committed Sep 30, 2024
1 parent 5f2fc40 commit 0d24b8b
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 4 deletions.
123 changes: 123 additions & 0 deletions core/services/ocr2/plugins/generic/oracle_factory_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package generic

import (
"context"
"time"

"github.com/pkg/errors"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

type memoryDb struct {
// The ID is used for logging and error messages
// A single standard capabilities spec can instantiate multiple oracles
// TODO: NewOracle should take a unique identifier for the oracle
specID int32
lggr logger.SugaredLogger
config *ocrtypes.ContractConfig
states map[ocrtypes.ConfigDigest]*ocrtypes.PersistentState
pendingTransmissions map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission
protocolStates map[ocrtypes.ConfigDigest]map[string][]byte
}

var (
_ ocrtypes.Database = &memoryDb{}
)

// NewDB returns a new DB scoped to this instanceID
func NewMemoryDB(specID int32, lggr logger.Logger) *memoryDb {
return &memoryDb{
specID: specID,
lggr: logger.Sugared(lggr.Named("OracleFactoryMemoryDb")),
}
}

func (md *memoryDb) ReadState(ctx context.Context, cd ocrtypes.ConfigDigest) (ps *ocrtypes.PersistentState, err error) {
ps, ok := md.states[cd]
if !ok {
return nil, errors.Errorf("state not found for standard capabilities spec ID %d, config digest %s", md.specID, cd)
}

return ps, nil
}

func (md *memoryDb) WriteState(ctx context.Context, cd ocrtypes.ConfigDigest, state ocrtypes.PersistentState) error {
md.states[cd] = &state
return nil
}

func (md *memoryDb) ReadConfig(ctx context.Context) (c *ocrtypes.ContractConfig, err error) {
if md.config == nil {
return nil, errors.Errorf("config not found for standard capabilities spec ID %d", md.specID)
}
return md.config, nil
}

func (md *memoryDb) WriteConfig(ctx context.Context, c ocrtypes.ContractConfig) error {
md.config = &c
return nil
}

func (md *memoryDb) StorePendingTransmission(ctx context.Context, t ocrtypes.ReportTimestamp, tx ocrtypes.PendingTransmission) error {
md.pendingTransmissions[t] = tx
return nil
}

func (md *memoryDb) PendingTransmissionsWithConfigDigest(ctx context.Context, cd ocrtypes.ConfigDigest) (map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission, error) {
m := make(map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission)
for k, v := range md.pendingTransmissions {
if k.ConfigDigest == cd {
m[k] = v
}
}

return m, nil
}

func (md *memoryDb) DeletePendingTransmission(ctx context.Context, t ocrtypes.ReportTimestamp) error {
delete(md.pendingTransmissions, t)
return nil
}

func (md *memoryDb) DeletePendingTransmissionsOlderThan(ctx context.Context, t time.Time) error {
for k, v := range md.pendingTransmissions {
if v.Time.Before(t) {
delete(md.pendingTransmissions, k)
}
}

return nil
}

func (md *memoryDb) ReadProtocolState(
ctx context.Context,
configDigest ocrtypes.ConfigDigest,
key string,
) ([]byte, error) {
value, ok := md.protocolStates[configDigest][key]
if !ok {
// Previously implementation returned nil if the state is not found
// TODO: Should this return nil, nil?
return nil, nil
}
return value, nil
}

func (md *memoryDb) WriteProtocolState(
ctx context.Context,
configDigest ocrtypes.ConfigDigest,
key string,
value []byte,
) error {
if value == nil {
delete(md.protocolStates[configDigest], key)
} else {
if md.protocolStates[configDigest] == nil {
md.protocolStates[configDigest] = make(map[string][]byte)
}
md.protocolStates[configDigest][key] = value
}
return nil
}
3 changes: 1 addition & 2 deletions core/services/ocr2/plugins/generic/oraclefactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type oracleFactory struct {
}

type OracleFactoryParams struct {
Database ocr3types.Database
JobID int32
JobName string
JobORM job.ORM
Expand All @@ -82,7 +81,7 @@ type OracleFactoryParams struct {

func NewOracleFactory(params OracleFactoryParams) (core.OracleFactory, error) {
return &oracleFactory{
database: params.Database,
database: NewMemoryDB(params.JobID, params.Logger),
jobID: params.JobID,
jobName: params.JobName,
jobORM: params.JobORM,
Expand Down
22 changes: 20 additions & 2 deletions core/services/standardcapabilities/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package standardcapabilities

import (
"context"
"encoding/json"
"fmt"

"github.com/google/uuid"
Expand All @@ -19,7 +20,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/chaintype"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/generic"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
Expand Down Expand Up @@ -123,6 +123,25 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
keyBundle = keyBundles[0]
}

keyBundleBytes, err := json.Marshal(struct {
PeerID string
PublicKey []byte
OffchainPublicKey [32]byte
ConfigEncryptionPublicKey [32]byte
}{
PeerID: d.peerWrapper.Peer2.PeerID(),
PublicKey: keyBundle.PublicKey(),
OffchainPublicKey: keyBundle.OffchainPublicKey(),
ConfigEncryptionPublicKey: keyBundle.ConfigEncryptionPublicKey(),
})
if err != nil {
return nil, errors.Wrap(err, "failed to marshal key bundle")
}
log.Debug("string(keyBundleBytes): ", string(keyBundleBytes))

spec.StandardCapabilitiesSpec.Config = spec.StandardCapabilitiesSpec.Config + string(keyBundleBytes)
log.Debug("Config: ", spec.StandardCapabilitiesSpec.Config)

oracleFactoryConfig, err := generic.NewOracleFactoryConfig(spec.StandardCapabilitiesSpec.OracleFactory)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal oracle factory config")
Expand All @@ -137,7 +156,6 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
JobORM: d.jobORM,
JobID: spec.ID,
JobName: spec.Name.ValueOrZero(),
Database: ocr2.NewDB(d.ds, spec.ID, 0, log),
Kb: keyBundle,
Config: oracleFactoryConfig,
PeerWrapper: d.peerWrapper,
Expand Down

0 comments on commit 0d24b8b

Please sign in to comment.