Skip to content

Commit

Permalink
Merge branch 'main' into will/unknown-addr-2
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored Oct 15, 2024
2 parents f495481 + 6c3cc4d commit 9cdde81
Show file tree
Hide file tree
Showing 115 changed files with 1,792 additions and 1,232 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/santhosh-tekuri/jsonschema/v5 v5.2.0
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0
go.opentelemetry.io/otel v1.28.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 h1:12ijqMM9tvYVEm+nR826WsrNi6zCKpwBhuApq127wHs=
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7/go.mod h1:FX7/bVdoep147QQhsOPkYsPEXhGZjeYx6lBSaSXtZOA=
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7 h1:e38V5FYE7DA1JfKXeD5Buo/7lczALuVXlJ8YNTAUxcw=
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7/go.mod h1:fb1ZDVXACvu4frX3APHZaEBp0xi1DIm34DcA0CwTsZM=
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 h1:NzZGjaqez21I3DU7objl3xExTH4fxYvzTqar8DC6360=
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12/go.mod h1:fb1ZDVXACvu4frX3APHZaEBp0xi1DIm34DcA0CwTsZM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
2 changes: 1 addition & 1 deletion pkg/capabilities/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (e errStopExecution) Error() string {
}

func (e errStopExecution) Is(err error) bool {
return err.Error() == errStopExecutionMsg
return strings.Contains(err.Error(), errStopExecutionMsg)
}

// CapabilityType enum values.
Expand Down
124 changes: 124 additions & 0 deletions pkg/capabilities/consensus/ocr3/aggregators/identical.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package aggregators

import (
"crypto/sha256"
"fmt"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"

ocrcommon "github.com/smartcontractkit/libocr/commontypes"
)

type identicalAggregator struct {
config aggregatorConfig
lggr logger.Logger
}

type aggregatorConfig struct {
// Length of the list of observations that each node is expected to provide.
// Aggregator's output (i.e. EncodableOutcome) will be a values.Map with the same
// number of elements and keyed by indices 0,1,2,... (unless KeyOverrides are provided).
// Defaults to 1.
ExpectedObservationsLen int
// If non-empty, the keys in the outcome map will be replaced with these values.
// If non-empty, must be of length ExpectedObservationsLen.
KeyOverrides []string
}

type counter struct {
fullObservation values.Value
count int
}

var _ types.Aggregator = (*identicalAggregator)(nil)

func (a *identicalAggregator) Aggregate(lggr logger.Logger, _ *types.AggregationOutcome, observations map[ocrcommon.OracleID][]values.Value, f int) (*types.AggregationOutcome, error) {
counters := []map[[32]byte]*counter{}
for i := 0; i < a.config.ExpectedObservationsLen; i++ {
counters = append(counters, map[[32]byte]*counter{})
}
for nodeID, nodeObservations := range observations {
if len(nodeObservations) == 0 || nodeObservations[0] == nil {
lggr.Warnf("node %d contributed with empty observations", nodeID)
continue
}
if len(nodeObservations) != a.config.ExpectedObservationsLen {
lggr.Warnf("node %d contributed with an incorrect number of observations %d - ignoring them", nodeID, len(nodeObservations))
continue
}
for idx, observation := range nodeObservations {
marshalled, err := proto.MarshalOptions{Deterministic: true}.Marshal(values.Proto(observation))
if err != nil {
return nil, err
}
sha := sha256.Sum256(marshalled)
elem, ok := counters[idx][sha]
if !ok {
counters[idx][sha] = &counter{
fullObservation: observation,
count: 1,
}
} else {
elem.count++
}
}
}
return a.collectHighestCounts(counters, f)
}

func (a *identicalAggregator) collectHighestCounts(counters []map[[32]byte]*counter, f int) (*types.AggregationOutcome, error) {
useOverrides := len(a.config.KeyOverrides) == len(counters)
outcome := make(map[string]any)
for idx, shaToCounter := range counters {
highestCount := 0
var highestObservation values.Value
for _, counter := range shaToCounter {
if counter.count > highestCount {
highestCount = counter.count
highestObservation = counter.fullObservation
}
}
if highestCount < 2*f+1 {
return nil, fmt.Errorf("can't reach consensus on observations with index %d", idx)
}
if useOverrides {
outcome[a.config.KeyOverrides[idx]] = highestObservation
} else {
outcome[fmt.Sprintf("%d", idx)] = highestObservation
}
}
valMap, err := values.NewMap(outcome)
if err != nil {
return nil, err
}
return &types.AggregationOutcome{
EncodableOutcome: values.ProtoMap(valMap),
Metadata: nil,
ShouldReport: true,
}, nil
}

func NewIdenticalAggregator(config values.Map) (*identicalAggregator, error) {
parsedConfig, err := ParseConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to parse config (%+v): %w", config, err)
}
return &identicalAggregator{
config: parsedConfig,
}, nil
}

func ParseConfig(config values.Map) (aggregatorConfig, error) {
parsedConfig := aggregatorConfig{}
if err := config.UnwrapTo(&parsedConfig); err != nil {
return aggregatorConfig{}, err
}
if parsedConfig.ExpectedObservationsLen == 0 {
parsedConfig.ExpectedObservationsLen = 1
}
return parsedConfig, nil
}
93 changes: 93 additions & 0 deletions pkg/capabilities/consensus/ocr3/aggregators/identical_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package aggregators_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/commontypes"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/aggregators"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

func TestDataFeedsAggregator_Aggregate(t *testing.T) {
config := getConfig(t, nil)
agg, err := aggregators.NewIdenticalAggregator(*config)
require.NoError(t, err)

observations := map[commontypes.OracleID][]values.Value{
0: {values.NewString("a")},
1: {values.NewString("a")},
2: {values.NewString("a")},
3: {values.NewString("a")},
}
outcome, err := agg.Aggregate(logger.Nop(), nil, observations, 1)
require.NoError(t, err)
require.True(t, outcome.ShouldReport)
require.Equal(t, "", outcome.EncoderName)
require.Nil(t, outcome.EncoderConfig)

m, err := values.FromMapValueProto(outcome.EncodableOutcome)
require.NoError(t, err)

require.Len(t, m.Underlying, 1)
require.Equal(t, m.Underlying["0"], values.NewString("a"))
}

func TestDataFeedsAggregator_Aggregate_OverrideWithKeys(t *testing.T) {
config := getConfig(t, []string{"outcome"})
agg, err := aggregators.NewIdenticalAggregator(*config)
require.NoError(t, err)

observations := map[commontypes.OracleID][]values.Value{
0: {values.NewString("a")},
1: {values.NewString("a")},
2: {values.NewString("a")},
3: {values.NewString("a")},
}
outcome, err := agg.Aggregate(logger.Nop(), nil, observations, 1)
require.NoError(t, err)
require.True(t, outcome.ShouldReport)
require.Equal(t, "", outcome.EncoderName)
require.Nil(t, outcome.EncoderConfig)

m, err := values.FromMapValueProto(outcome.EncodableOutcome)
require.NoError(t, err)

require.Len(t, m.Underlying, 1)
require.Equal(t, m.Underlying["outcome"], values.NewString("a"))
}

func TestDataFeedsAggregator_Aggregate_NoConsensus(t *testing.T) {
config := getConfig(t, []string{"outcome"})
agg, err := aggregators.NewIdenticalAggregator(*config)
require.NoError(t, err)

encoderStr := "evm"
encoderName := values.NewString(encoderStr)
encoderCfg, err := values.NewMap(map[string]any{"foo": "bar"})
require.NoError(t, err)

observations := map[commontypes.OracleID][]values.Value{
0: {values.NewString("a"), encoderName, encoderCfg},
1: {values.NewString("b"), encoderName, encoderCfg},
2: {values.NewString("b"), encoderName, encoderCfg},
3: {values.NewString("a"), encoderName, encoderCfg},
}
outcome, err := agg.Aggregate(logger.Nop(), nil, observations, 1)
require.Nil(t, outcome)
require.ErrorContains(t, err, "can't reach consensus on observations with index 0")
}

func getConfig(t *testing.T, overrideKeys []string) *values.Map {
unwrappedConfig := map[string]any{
"expectedObservationsLen": len(overrideKeys),
"keyOverrides": overrideKeys,
}

config, err := values.NewMap(unwrappedConfig)
require.NoError(t, err)
return config
}
2 changes: 2 additions & 0 deletions pkg/capabilities/consensus/ocr3/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ func (o *capability) queueRequestForProcessing(
WorkflowDonID: metadata.WorkflowDonID,
WorkflowDonConfigVersion: metadata.WorkflowDonConfigVersion,
Observations: i.Observations,
OverriddenEncoderName: i.EncoderName,
OverriddenEncoderConfig: i.EncoderConfig,
KeyID: c.KeyID,
ExpiresAt: o.clock.Now().Add(requestTimeout),
}
Expand Down
Loading

0 comments on commit 9cdde81

Please sign in to comment.