From ca30bc6630fcd04999b2c69e6fc49cfbb5903aba Mon Sep 17 00:00:00 2001 From: engnke Date: Fri, 20 Jun 2025 12:26:14 -0400 Subject: [PATCH 1/5] Update chip-ingress client to accept and extract additional attributes --- pkg/beholder/chip_ingress_emitter.go | 18 +++++- pkg/beholder/chip_ingress_emitter_test.go | 67 ++++++++++++++++++++++- pkg/chipingress/client.go | 27 ++++++++- 3 files changed, 109 insertions(+), 3 deletions(-) diff --git a/pkg/beholder/chip_ingress_emitter.go b/pkg/beholder/chip_ingress_emitter.go index 0ca3e991a..f1e6d2a0f 100644 --- a/pkg/beholder/chip_ingress_emitter.go +++ b/pkg/beholder/chip_ingress_emitter.go @@ -27,7 +27,12 @@ func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...a return err } - event, err := chipingress.NewEvent(sourceDomain, entityType, body) + attributes, err := ExtractAttributes(attrKVs...) + if err != nil { + return err + } + + event, err := chipingress.NewEvent(sourceDomain, entityType, body, attributes) if err != nil { return err } @@ -73,3 +78,14 @@ func ExtractSourceAndType(attrKVs ...any) (string, string, error) { return sourceDomain, entityType, nil } + +func ExtractAttributes(attrKVs ...any) (map[string]any, error) { + attributes := newAttributes(attrKVs...) + + attributesMap := make(map[string]any) + for key, value := range attributes { + attributesMap[key] = value + } + + return attributesMap, nil +} diff --git a/pkg/beholder/chip_ingress_emitter_test.go b/pkg/beholder/chip_ingress_emitter_test.go index 9798b6fff..dbdf4e1c9 100644 --- a/pkg/beholder/chip_ingress_emitter_test.go +++ b/pkg/beholder/chip_ingress_emitter_test.go @@ -2,6 +2,7 @@ package beholder_test import ( "testing" + "time" "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" @@ -30,6 +31,12 @@ func TestChipIngressEmit(t *testing.T) { body := []byte("test body") domain := "test-domain" entity := "test-entity" + attributes := map[string]any{ + "datacontenttype": "application/protobuf", + "dataschema": "/schemas/ids/1001", + "subject": "example-subject", + "time": time.Now(), + } t.Run("happy path", func(t *testing.T) { @@ -42,7 +49,7 @@ func TestChipIngressEmit(t *testing.T) { emitter, err := beholder.NewChipIngressEmitter(clientMock) require.NoError(t, err) - err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity) + err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity, attributes) require.NoError(t, err) clientMock.AssertExpectations(t) @@ -217,3 +224,61 @@ func TestExtractSourceAndType(t *testing.T) { }) } } + +func TestExtractAttributes(t *testing.T) { + now := time.Now() + tests := []struct { + name string + attrs []any + wantAttributes map[string]any + wantErr bool + expectedError string + }{ + { + name: "valid attributes with specific keys", + attrs: []any{map[string]any{ + "datacontenttype": "application/protobuf", + "dataschema": "/schemas/ids/1001", + "subject": "example-subject", + "time": now, + }}, + wantAttributes: map[string]any{ + "datacontenttype": "application/protobuf", + "dataschema": "/schemas/ids/1001", + "subject": "example-subject", + "time": now, + }, + wantErr: false, + }, + { + name: "happy path - empty attributes", + attrs: []any{}, + wantAttributes: map[string]any{}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotAttributes, err := beholder.ExtractAttributes(tt.attrs...) + + if tt.wantErr { + if err == nil { + t.Errorf("extractAttributes() error = nil, want error") + return + } + if tt.expectedError != "" && tt.expectedError != err.Error() { + t.Errorf("extractAttributes() error = %v, want %v", err, tt.expectedError) + } + return + } + + if err != nil { + t.Errorf("extractAttributes() unexpected error = %v", err) + return + } + + assert.Equal(t, tt.wantAttributes, gotAttributes) + }) + } +} diff --git a/pkg/chipingress/client.go b/pkg/chipingress/client.go index 7236c27fd..da7bee74b 100644 --- a/pkg/chipingress/client.go +++ b/pkg/chipingress/client.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "fmt" "strings" + t "time" "github.com/google/uuid" "go.uber.org/zap" @@ -248,13 +249,37 @@ func validateEvents(events []ce.Event) error { return nil } -func NewEvent(domain, entity string, payload []byte) (ce.Event, error) { +func NewEvent(domain, entity string, payload []byte, attributes map[string]any) (ce.Event, error) { event := ce.NewEvent() event.SetSource(domain) event.SetType(entity) event.SetID(uuid.New().String()) + now := t.Now().UTC() + event.SetExtension("recordedtime", ce.Timestamp{Time: now}) + + // Set optional attributes if provided + if attributes == nil { + attributes = make(map[string]any) + } + + if val, ok := attributes["time"].(t.Time); ok { + event.SetTime(val.UTC()) + } else { + event.SetTime(now) + } + + if val, ok := attributes["datacontenttype"].(string); ok { + event.SetDataContentType(val) + } + if val, ok := attributes["dataschema"].(string); ok { + event.SetDataSchema(val) + } + if val, ok := attributes["subject"].(string); ok { + event.SetSubject(val) + } + err := event.SetData(ceformat.ContentTypeProtobuf, payload) if err != nil { return ce.Event{}, fmt.Errorf("could not set data on event: %w", err) From a8dc24cdcb6f29bbef38e8025a2a768f5237a9c4 Mon Sep 17 00:00:00 2001 From: engnke Date: Fri, 20 Jun 2025 12:39:49 -0400 Subject: [PATCH 2/5] update test cases to include attributes --- pkg/chipingress/client_test.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/chipingress/client_test.go b/pkg/chipingress/client_test.go index 5639a32a7..8a609ec00 100644 --- a/pkg/chipingress/client_test.go +++ b/pkg/chipingress/client_test.go @@ -3,6 +3,7 @@ package chipingress import ( "context" "testing" + "time" ce "github.com/cloudevents/sdk-go/v2" "github.com/stretchr/testify/assert" @@ -67,7 +68,7 @@ func TestClient(t *testing.T) { testProto := pb.PingResponse{Message: "testing"} protoBytes, err := proto.Marshal(&testProto) require.NoError(t, err) - event, err := NewEvent("some-domain_here", "platform.on_chain.forwarder.ReportProcessed", protoBytes) + event, err := NewEvent("some-domain_here", "platform.on_chain.forwarder.ReportProcessed", protoBytes, nil) require.NoError(t, err) // Publish event @@ -160,8 +161,14 @@ func TestNewEvent(t *testing.T) { // Create new event testProto := pb.PingResponse{Message: "testing"} protoBytes, err := proto.Marshal(&testProto) + attributes := map[string]any{ + "datacontenttype": "application/protobuf", + "dataschema": "https://example.com/schema", + "subject": "example-subject", + "time": time.Now(), + } assert.NoError(t, err) - event, err := NewEvent("some-domain_here", "platform.on_chain.forwarder.ReportProcessed", protoBytes) + event, err := NewEvent("some-domain_here", "platform.on_chain.forwarder.ReportProcessed", protoBytes, attributes) assert.NoError(t, err) // There should be no validation errors @@ -172,6 +179,11 @@ func TestNewEvent(t *testing.T) { assert.Equal(t, "some-domain_here", event.Source()) assert.Equal(t, "platform.on_chain.forwarder.ReportProcessed", event.Type()) assert.NotEmpty(t, event.ID()) + assert.Equal(t, "application/protobuf", event.DataContentType()) + assert.Equal(t, "https://example.com/schema", event.DataSchema()) + assert.Equal(t, "example-subject", event.Subject()) + assert.Equal(t, attributes["time"].(time.Time).UTC(), event.Time()) + assert.NotEmpty(t, event.Extensions()["recordedtime"]) // Assert the event data was set as expected var resultProto pb.PingResponse @@ -220,13 +232,13 @@ func TestPublishBatch(t *testing.T) { testProto1 := pb.PingResponse{Message: "testing1"} protoBytes1, err := proto.Marshal(&testProto1) require.NoError(t, err) - event1, err := NewEvent("domain1", "entity.event1", protoBytes1) + event1, err := NewEvent("domain1", "entity.event1", protoBytes1, nil) require.NoError(t, err) testProto2 := pb.PingResponse{Message: "testing2"} protoBytes2, err := proto.Marshal(&testProto2) require.NoError(t, err) - event2, err := NewEvent("domain2", "entity.event2", protoBytes2) + event2, err := NewEvent("domain2", "entity.event2", protoBytes2, nil) require.NoError(t, err) events := []ce.Event{event1, event2} @@ -270,7 +282,7 @@ func TestPublishBatch(t *testing.T) { testProto := pb.PingResponse{Message: "testing"} protoBytes, err := proto.Marshal(&testProto) require.NoError(t, err) - event, err := NewEvent("domain", "entity.event", protoBytes) + event, err := NewEvent("domain", "entity.event", protoBytes, nil) require.NoError(t, err) events := []ce.Event{event} From 63733ce3b687ea783986f26d1d228ec30904b16f Mon Sep 17 00:00:00 2001 From: engnke Date: Tue, 24 Jun 2025 09:42:00 -0400 Subject: [PATCH 3/5] minor refactoring --- pkg/beholder/chip_ingress_emitter.go | 9 +++------ pkg/beholder/chip_ingress_emitter_test.go | 18 +----------------- 2 files changed, 4 insertions(+), 23 deletions(-) diff --git a/pkg/beholder/chip_ingress_emitter.go b/pkg/beholder/chip_ingress_emitter.go index f1e6d2a0f..41bc27ebc 100644 --- a/pkg/beholder/chip_ingress_emitter.go +++ b/pkg/beholder/chip_ingress_emitter.go @@ -27,10 +27,7 @@ func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...a return err } - attributes, err := ExtractAttributes(attrKVs...) - if err != nil { - return err - } + attributes := ExtractAttributes(attrKVs...) event, err := chipingress.NewEvent(sourceDomain, entityType, body, attributes) if err != nil { @@ -79,7 +76,7 @@ func ExtractSourceAndType(attrKVs ...any) (string, string, error) { return sourceDomain, entityType, nil } -func ExtractAttributes(attrKVs ...any) (map[string]any, error) { +func ExtractAttributes(attrKVs ...any) map[string]any { attributes := newAttributes(attrKVs...) attributesMap := make(map[string]any) @@ -87,5 +84,5 @@ func ExtractAttributes(attrKVs ...any) (map[string]any, error) { attributesMap[key] = value } - return attributesMap, nil + return attributesMap } diff --git a/pkg/beholder/chip_ingress_emitter_test.go b/pkg/beholder/chip_ingress_emitter_test.go index dbdf4e1c9..9c572d493 100644 --- a/pkg/beholder/chip_ingress_emitter_test.go +++ b/pkg/beholder/chip_ingress_emitter_test.go @@ -260,23 +260,7 @@ func TestExtractAttributes(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotAttributes, err := beholder.ExtractAttributes(tt.attrs...) - - if tt.wantErr { - if err == nil { - t.Errorf("extractAttributes() error = nil, want error") - return - } - if tt.expectedError != "" && tt.expectedError != err.Error() { - t.Errorf("extractAttributes() error = %v, want %v", err, tt.expectedError) - } - return - } - - if err != nil { - t.Errorf("extractAttributes() unexpected error = %v", err) - return - } + gotAttributes := beholder.ExtractAttributes(tt.attrs...) assert.Equal(t, tt.wantAttributes, gotAttributes) }) From 2984fb1da9b4dd10d270594e92834317b5a04025 Mon Sep 17 00:00:00 2001 From: engnke Date: Thu, 26 Jun 2025 13:57:15 -0400 Subject: [PATCH 4/5] update to accept recordedtime attribute --- pkg/beholder/chip_ingress_emitter_test.go | 2 ++ pkg/chipingress/client.go | 12 ++++++------ pkg/chipingress/client_test.go | 2 ++ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/beholder/chip_ingress_emitter_test.go b/pkg/beholder/chip_ingress_emitter_test.go index 9c572d493..c632b9b00 100644 --- a/pkg/beholder/chip_ingress_emitter_test.go +++ b/pkg/beholder/chip_ingress_emitter_test.go @@ -241,12 +241,14 @@ func TestExtractAttributes(t *testing.T) { "dataschema": "/schemas/ids/1001", "subject": "example-subject", "time": now, + "recordedtime": now, }}, wantAttributes: map[string]any{ "datacontenttype": "application/protobuf", "dataschema": "/schemas/ids/1001", "subject": "example-subject", "time": now, + "recordedtime": now, }, wantErr: false, }, diff --git a/pkg/chipingress/client.go b/pkg/chipingress/client.go index da7bee74b..4fb4ec2b8 100644 --- a/pkg/chipingress/client.go +++ b/pkg/chipingress/client.go @@ -256,20 +256,20 @@ func NewEvent(domain, entity string, payload []byte, attributes map[string]any) event.SetType(entity) event.SetID(uuid.New().String()) - now := t.Now().UTC() - event.SetExtension("recordedtime", ce.Timestamp{Time: now}) - // Set optional attributes if provided if attributes == nil { attributes = make(map[string]any) } - if val, ok := attributes["time"].(t.Time); ok { - event.SetTime(val.UTC()) + if val, ok := attributes["recordedtime"].(t.Time); ok { + event.SetExtension("recordedtime", val) } else { - event.SetTime(now) + event.SetExtension("recordedtime", ce.Timestamp{Time: t.Now().UTC()}) } + if val, ok := attributes["time"].(t.Time); ok { + event.SetTime(val.UTC()) + } if val, ok := attributes["datacontenttype"].(string); ok { event.SetDataContentType(val) } diff --git a/pkg/chipingress/client_test.go b/pkg/chipingress/client_test.go index 8a609ec00..d8af63361 100644 --- a/pkg/chipingress/client_test.go +++ b/pkg/chipingress/client_test.go @@ -184,6 +184,8 @@ func TestNewEvent(t *testing.T) { assert.Equal(t, "example-subject", event.Subject()) assert.Equal(t, attributes["time"].(time.Time).UTC(), event.Time()) assert.NotEmpty(t, event.Extensions()["recordedtime"]) + assert.NotEmpty(t, event.Extensions()["recordedtime"]) + assert.True(t, event.Extensions()["recordedtime"].(ce.Timestamp).Time.After(attributes["time"].(time.Time))) // Assert the event data was set as expected var resultProto pb.PingResponse From f45f81cc197a5fea87f42fbf973854ceb5d968b2 Mon Sep 17 00:00:00 2001 From: engnke Date: Thu, 26 Jun 2025 21:50:37 -0400 Subject: [PATCH 5/5] time attributes in utc --- pkg/chipingress/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/chipingress/client.go b/pkg/chipingress/client.go index 4fb4ec2b8..fe8fd70f0 100644 --- a/pkg/chipingress/client.go +++ b/pkg/chipingress/client.go @@ -262,7 +262,7 @@ func NewEvent(domain, entity string, payload []byte, attributes map[string]any) } if val, ok := attributes["recordedtime"].(t.Time); ok { - event.SetExtension("recordedtime", val) + event.SetExtension("recordedtime", val.UTC()) } else { event.SetExtension("recordedtime", ce.Timestamp{Time: t.Now().UTC()}) }