Skip to content

Commit

Permalink
add context
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Apr 24, 2024
1 parent add4946 commit f441fe3
Show file tree
Hide file tree
Showing 41 changed files with 195 additions and 291 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/smartcontractkit/chainlink-common

go 1.21

replace github.com/smartcontractkit/libocr => github.com/jmank88/libocr v0.0.0-20240311135254-702bd5bd3727

require (
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/ethereum/go-ethereum v1.13.8
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ github.com/invopop/jsonschema v0.12.0 h1:6ovsNSuvn9wEQVOyc72aycBMVQFKz7cPdMJn10C
github.com/invopop/jsonschema v0.12.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0=
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo=
github.com/jmank88/libocr v0.0.0-20240311135254-702bd5bd3727 h1:G9Djr4klIISnLqRgTOl75ay4PXb7BnxTl+JCCvk0Dm8=
github.com/jmank88/libocr v0.0.0-20240311135254-702bd5bd3727/go.mod h1:SJEZCHgMCAzzBvo9vMV2DQ9onfEcIJCYSViyP4JI6c4=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
Expand Down Expand Up @@ -289,8 +291,6 @@ github.com/smartcontractkit/go-plugin v0.0.0-20240208201424-b3b91517de16 h1:TFe+
github.com/smartcontractkit/go-plugin v0.0.0-20240208201424-b3b91517de16/go.mod h1:lBS5MtSSBZk0SHc66KACcjjlU6WzEVP/8pwz68aMkCI=
github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJif132UCdjo8u43i7iPN1/MFnu49hv7lFGFftCHKU=
github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0=
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052 h1:1WFjrrVrWoQ9UpVMh7Mx4jDpzhmo1h8hFUKd9awIhIU=
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052/go.mod h1:SJEZCHgMCAzzBvo9vMV2DQ9onfEcIJCYSViyP4JI6c4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
2 changes: 1 addition & 1 deletion pkg/capabilities/consensus/ocr3/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newFactory(s *store, c *capability, batchSize int, lggr logger.Logger) (*fa
}, nil
}

func (o *factory) NewReportingPlugin(config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
func (o *factory) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
rp, err := newReportingPlugin(o.store, o.capability, o.batchSize, config, o.lggr)
info := ocr3types.ReportingPluginInfo{
Name: "OCR3 Capability Plugin",
Expand Down
14 changes: 9 additions & 5 deletions pkg/capabilities/consensus/ocr3/reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ func (r *reportingPlugin) Observation(ctx context.Context, outctx ocr3types.Outc
return proto.MarshalOptions{Deterministic: true}.Marshal(obs)
}

func (r *reportingPlugin) ValidateObservation(outctx ocr3types.OutcomeContext, query types.Query, ao types.AttributedObservation) error {
func (r *reportingPlugin) ValidateObservation(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query, ao types.AttributedObservation) error {
return nil
}

func (r *reportingPlugin) ObservationQuorum(outctx ocr3types.OutcomeContext, query types.Query) (ocr3types.Quorum, error) {
func (r *reportingPlugin) ObservationQuorum(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query) (ocr3types.Quorum, error) {
return ocr3types.QuorumTwoFPlusOne, nil
}

func (r *reportingPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
func (r *reportingPlugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
// execution ID -> oracle ID -> list of observations
m := map[string]map[ocrcommon.OracleID][]values.Value{}
for _, o := range aos {
Expand Down Expand Up @@ -212,7 +212,7 @@ func (r *reportingPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Q
return rawOutcome, err
}

func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportWithInfo[[]byte], error) {
func (r *reportingPlugin) Reports(ctx context.Context, seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportWithInfo[[]byte], error) {
o := &pbtypes.Outcome{}
err := proto.Unmarshal(outcome, o)
if err != nil {
Expand Down Expand Up @@ -244,8 +244,12 @@ func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]oc
}

mv := values.FromMapValueProto(newOutcome.EncodableOutcome)
report, err = enc.Encode(context.TODO(), *mv)
report, err = enc.Encode(ctx, *mv)
if err != nil {
if cerr := ctx.Err(); cerr != nil {
r.lggr.Errorw("report encoding cancelled", "err", cerr)
return nil, cerr
}
r.lggr.Errorw("could not encode report for workflow", "error", err, "workflowID", id.WorkflowId)
continue
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/capabilities/consensus/ocr3/reporting_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestReportingPlugin_Outcome(t *testing.T) {
},
}

outcome, err := rp.Outcome(ocr3types.OutcomeContext{}, qb, aos)
outcome, err := rp.Outcome(tests.Context(t), ocr3types.OutcomeContext{}, qb, aos)
require.NoError(t, err)

opb := &pbtypes.Outcome{}
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestReportingPlugin_Reports_ShouldReportFalse(t *testing.T) {
}
pl, err := proto.Marshal(outcome)
require.NoError(t, err)
reports, err := rp.Reports(sqNr, pl)
reports, err := rp.Reports(tests.Context(t), sqNr, pl)
require.NoError(t, err)

assert.Len(t, reports, 1)
Expand Down Expand Up @@ -315,7 +315,7 @@ func TestReportingPlugin_Reports_ShouldReportTrue(t *testing.T) {
}
pl, err := proto.Marshal(outcome)
require.NoError(t, err)
reports, err := rp.Reports(sqNr, pl)
reports, err := rp.Reports(tests.Context(t), sqNr, pl)
require.NoError(t, err)

assert.Len(t, reports, 1)
Expand Down
73 changes: 0 additions & 73 deletions pkg/loop/adapter.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/loop/ccip_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ func NewExecutionService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec
return &efs
}

func (m *ExecutionFactoryService) NewReportingPlugin(config ocrtypes.ReportingPluginConfig) (ocrtypes.ReportingPlugin, ocrtypes.ReportingPluginInfo, error) {
func (m *ExecutionFactoryService) NewReportingPlugin(ctx context.Context, config ocrtypes.ReportingPluginConfig) (ocrtypes.ReportingPlugin, ocrtypes.ReportingPluginInfo, error) {
if err := m.Wait(); err != nil {
return nil, ocrtypes.ReportingPluginInfo{}, err
}
return m.Service.NewReportingPlugin(config)
return m.Service.NewReportingPlugin(ctx, config)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ func NewReportingPluginFactoryClient(b *net.BrokerExt, cc grpc.ClientConnInterfa
}
}

func (r *ReportingPluginFactoryClient) NewReportingPlugin(config libocr.ReportingPluginConfig) (libocr.ReportingPlugin, libocr.ReportingPluginInfo, error) {
ctx, cancel := r.StopCtx()
defer cancel()
func (r *ReportingPluginFactoryClient) NewReportingPlugin(ctx context.Context, config libocr.ReportingPluginConfig) (libocr.ReportingPlugin, libocr.ReportingPluginInfo, error) {
reply, err := r.grpc.NewReportingPlugin(ctx, &pb.NewReportingPluginRequest{ReportingPluginConfig: &pb.ReportingPluginConfig{
ConfigDigest: config.ConfigDigest[:],
OracleID: uint32(config.OracleID),
Expand Down Expand Up @@ -98,7 +96,7 @@ func (r *ReportingPluginFactoryServer) NewReportingPlugin(ctx context.Context, r
}
copy(cfg.ConfigDigest[:], request.ReportingPluginConfig.ConfigDigest)

rp, rpi, err := r.impl.NewReportingPlugin(cfg)
rp, rpi, err := r.impl.NewReportingPlugin(ctx, cfg)
if err != nil {
return nil, err
}
Expand Down
30 changes: 10 additions & 20 deletions pkg/loop/internal/core/services/reportingplugin/ocr3/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ func newReportingPluginFactoryClient(b *net.BrokerExt, cc grpc.ClientConnInterfa
return &reportingPluginFactoryClient{b.WithName("OCR3ReportingPluginProviderClient"), goplugin.NewServiceClient(b, cc), ocr3.NewReportingPluginFactoryClient(cc)}
}

func (r *reportingPluginFactoryClient) NewReportingPlugin(config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
ctx, cancel := r.StopCtx()
defer cancel()
func (r *reportingPluginFactoryClient) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
reply, err := r.grpc.NewReportingPlugin(ctx, &ocr3.NewReportingPluginRequest{ReportingPluginConfig: &ocr3.ReportingPluginConfig{
ConfigDigest: config.ConfigDigest[:],
OracleID: uint32(config.OracleID),
Expand Down Expand Up @@ -96,7 +94,7 @@ func (r *reportingPluginFactoryServer) NewReportingPlugin(ctx context.Context, r
}
copy(cfg.ConfigDigest[:], request.ReportingPluginConfig.ConfigDigest)

rp, rpi, err := r.impl.NewReportingPlugin(cfg)
rp, rpi, err := r.impl.NewReportingPlugin(ctx, cfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -150,9 +148,7 @@ func (o *reportingPluginClient) Observation(ctx context.Context, outctx ocr3type
return reply.Observation, nil
}

func (o *reportingPluginClient) ValidateObservation(outctx ocr3types.OutcomeContext, query libocr.Query, ao libocr.AttributedObservation) error {
ctx, cancel := o.StopCtx()
defer cancel()
func (o *reportingPluginClient) ValidateObservation(ctx context.Context, outctx ocr3types.OutcomeContext, query libocr.Query, ao libocr.AttributedObservation) error {
_, err := o.grpc.ValidateObservation(ctx, &ocr3.ValidateObservationRequest{
OutcomeContext: pbOutcomeContext(outctx),
Query: query,
Expand All @@ -161,9 +157,7 @@ func (o *reportingPluginClient) ValidateObservation(outctx ocr3types.OutcomeCont
return err
}

func (o *reportingPluginClient) ObservationQuorum(outctx ocr3types.OutcomeContext, query libocr.Query) (ocr3types.Quorum, error) {
ctx, cancel := o.StopCtx()
defer cancel()
func (o *reportingPluginClient) ObservationQuorum(ctx context.Context, outctx ocr3types.OutcomeContext, query libocr.Query) (ocr3types.Quorum, error) {
reply, err := o.grpc.ObservationQuorum(ctx, &ocr3.ObservationQuorumRequest{
OutcomeContext: pbOutcomeContext(outctx),
Query: query,
Expand All @@ -174,9 +168,7 @@ func (o *reportingPluginClient) ObservationQuorum(outctx ocr3types.OutcomeContex
return ocr3types.Quorum(reply.Quorum), nil
}

func (o *reportingPluginClient) Outcome(outctx ocr3types.OutcomeContext, query libocr.Query, aos []libocr.AttributedObservation) (ocr3types.Outcome, error) {
ctx, cancel := o.StopCtx()
defer cancel()
func (o *reportingPluginClient) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, query libocr.Query, aos []libocr.AttributedObservation) (ocr3types.Outcome, error) {
reply, err := o.grpc.Outcome(ctx, &ocr3.OutcomeRequest{
OutcomeContext: pbOutcomeContext(outctx),
Query: query,
Expand All @@ -188,9 +180,7 @@ func (o *reportingPluginClient) Outcome(outctx ocr3types.OutcomeContext, query l
return reply.Outcome, nil
}

func (o *reportingPluginClient) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportWithInfo[[]byte], error) {
ctx, cancel := o.StopCtx()
defer cancel()
func (o *reportingPluginClient) Reports(ctx context.Context, seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportWithInfo[[]byte], error) {
reply, err := o.grpc.Reports(ctx, &ocr3.ReportsRequest{
SeqNr: seqNr,
Outcome: outcome,
Expand Down Expand Up @@ -265,12 +255,12 @@ func (o *reportingPluginServer) ValidateObservation(ctx context.Context, request
if err != nil {
return nil, err
}
err = o.impl.ValidateObservation(outcomeContext(request.OutcomeContext), request.Query, ao)
err = o.impl.ValidateObservation(ctx, outcomeContext(request.OutcomeContext), request.Query, ao)
return new(emptypb.Empty), err
}

func (o *reportingPluginServer) ObservationQuorum(ctx context.Context, request *ocr3.ObservationQuorumRequest) (*ocr3.ObservationQuorumReply, error) {
oq, err := o.impl.ObservationQuorum(outcomeContext(request.OutcomeContext), request.Query)
oq, err := o.impl.ObservationQuorum(ctx, outcomeContext(request.OutcomeContext), request.Query)
if err != nil {
return nil, err
}
Expand All @@ -282,7 +272,7 @@ func (o *reportingPluginServer) Outcome(ctx context.Context, request *ocr3.Outco
if err != nil {
return nil, err
}
out, err := o.impl.Outcome(outcomeContext(request.OutcomeContext), request.Query, aos)
out, err := o.impl.Outcome(ctx, outcomeContext(request.OutcomeContext), request.Query, aos)
if err != nil {
return nil, err
}
Expand All @@ -292,7 +282,7 @@ func (o *reportingPluginServer) Outcome(ctx context.Context, request *ocr3.Outco
}

func (o *reportingPluginServer) Reports(ctx context.Context, request *ocr3.ReportsRequest) (*ocr3.ReportsReply, error) {
ri, err := o.impl.Reports(request.SeqNr, request.Outcome)
ri, err := o.impl.Reports(ctx, request.SeqNr, request.Outcome)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (o ocr3staticPluginFactory) Ready() error { panic("implement me") }

func (o ocr3staticPluginFactory) HealthReport() map[string]error { panic("implement me") }

func (o ocr3staticPluginFactory) NewReportingPlugin(config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
func (o ocr3staticPluginFactory) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
err := o.equalConfig(config)
if err != nil {
return nil, ocr3types.ReportingPluginInfo{}, fmt.Errorf("config mismatch: %w", err)
Expand Down Expand Up @@ -89,7 +89,7 @@ func OCR3ReportingPluginFactory(t *testing.T, factory core.OCR3ReportingPluginFa
expectedFactory := Factory
t.Run("OCR3ReportingPluginFactory", func(t *testing.T) {
ctx := tests.Context(t)
rp, gotRPI, err := factory.NewReportingPlugin(ocr3reportingPluginConfig)
rp, gotRPI, err := factory.NewReportingPlugin(ctx, ocr3reportingPluginConfig)
require.NoError(t, err)
assert.Equal(t, ocr3rpi, gotRPI)
t.Cleanup(func() { assert.NoError(t, rp.Close()) })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (s ocr3staticReportingPlugin) Observation(ctx context.Context, outcomeCtx o
return s.observationResponse.observation, nil
}

func (s ocr3staticReportingPlugin) ValidateObservation(outcomeCtx ocr3types.OutcomeContext, q libocr.Query, a libocr.AttributedObservation) error {
func (s ocr3staticReportingPlugin) ValidateObservation(ctx context.Context, outcomeCtx ocr3types.OutcomeContext, q libocr.Query, a libocr.AttributedObservation) error {
err := s.checkOutCtx(outcomeCtx)
if err != nil {
return err
Expand All @@ -220,7 +220,7 @@ func (s ocr3staticReportingPlugin) ValidateObservation(outcomeCtx ocr3types.Outc
return nil
}

func (s ocr3staticReportingPlugin) ObservationQuorum(outcomeCtx ocr3types.OutcomeContext, q libocr.Query) (ocr3types.Quorum, error) {
func (s ocr3staticReportingPlugin) ObservationQuorum(ctx context.Context, outcomeCtx ocr3types.OutcomeContext, q libocr.Query) (ocr3types.Quorum, error) {
err := s.checkOutCtx(outcomeCtx)
if err != nil {
return ocr3types.Quorum(0), err
Expand All @@ -232,7 +232,7 @@ func (s ocr3staticReportingPlugin) ObservationQuorum(outcomeCtx ocr3types.Outcom
return s.observationQuorumResponse.quorum, nil
}

func (s ocr3staticReportingPlugin) Outcome(outcomeCtx ocr3types.OutcomeContext, q libocr.Query, aos []libocr.AttributedObservation) (ocr3types.Outcome, error) {
func (s ocr3staticReportingPlugin) Outcome(ctx context.Context, outcomeCtx ocr3types.OutcomeContext, q libocr.Query, aos []libocr.AttributedObservation) (ocr3types.Outcome, error) {
err := s.checkOutCtx(outcomeCtx)
if err != nil {
return nil, err
Expand All @@ -247,7 +247,7 @@ func (s ocr3staticReportingPlugin) Outcome(outcomeCtx ocr3types.OutcomeContext,
return s.outcomeResponse.outcome, nil
}

func (s ocr3staticReportingPlugin) Reports(seq uint64, o ocr3types.Outcome) ([]ocr3types.ReportWithInfo[[]byte], error) {
func (s ocr3staticReportingPlugin) Reports(ctx context.Context, seq uint64, o ocr3types.Outcome) ([]ocr3types.ReportWithInfo[[]byte], error) {
if seq != s.reportsRequest.seq {
return nil, fmt.Errorf("expected %x but got %x", s.reportsRequest.seq, seq)
}
Expand Down Expand Up @@ -300,18 +300,18 @@ func (s ocr3staticReportingPlugin) AssertEqual(ctx context.Context, t *testing.T
require.NoError(t, err)
assert.Equal(t, s.observationResponse.observation, gotObs)

err = rp.ValidateObservation(s.validateObservationRequest.outcomeCtx, s.validateObservationRequest.query, s.validateObservationRequest.attributedObservation)
err = rp.ValidateObservation(ctx, s.validateObservationRequest.outcomeCtx, s.validateObservationRequest.query, s.validateObservationRequest.attributedObservation)
require.NoError(t, err)

gotQuorum, err := rp.ObservationQuorum(s.observationQuorumRequest.outcomeCtx, s.observationQuorumRequest.query)
gotQuorum, err := rp.ObservationQuorum(ctx, s.observationQuorumRequest.outcomeCtx, s.observationQuorumRequest.query)
require.NoError(t, err)
assert.Equal(t, s.observationQuorumResponse.quorum, gotQuorum)

gotOutcome, err := rp.Outcome(s.outcomeRequest.outcomeCtx, s.outcomeRequest.query, s.outcomeRequest.observations)
gotOutcome, err := rp.Outcome(ctx, s.outcomeRequest.outcomeCtx, s.outcomeRequest.query, s.outcomeRequest.observations)
require.NoError(t, err)
assert.Equal(t, s.outcomeResponse.outcome, gotOutcome)

gotRI, err := rp.Reports(s.reportsRequest.seq, s.reportsRequest.outcome)
gotRI, err := rp.Reports(ctx, s.reportsRequest.seq, s.reportsRequest.outcome)
require.NoError(t, err)
assert.Equal(t, s.reportsResponse.reportWithInfo, gotRI)

Expand Down
Loading

0 comments on commit f441fe3

Please sign in to comment.