-
Notifications
You must be signed in to change notification settings - Fork 225
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add v3 version of nats jetstream protocol with integration tests and …
…samples Signed-off-by: stephen-totty-hpe <[email protected]>
- Loading branch information
1 parent
2db66e0
commit 0e2da9c
Showing
17 changed files
with
1,641 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
module github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 | ||
|
||
go 1.18 | ||
|
||
replace github.com/cloudevents/sdk-go/v2 => ../../../v2 | ||
|
||
require ( | ||
github.com/cloudevents/sdk-go/v2 v2.15.2 | ||
github.com/nats-io/nats.go v1.37.0 | ||
) | ||
|
||
require ( | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/google/go-cmp v0.5.0 // indirect | ||
github.com/json-iterator/go v1.1.12 // indirect | ||
github.com/klauspost/compress v1.17.9 // indirect | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect | ||
github.com/modern-go/reflect2 v1.0.2 // indirect | ||
github.com/nats-io/nkeys v0.4.7 // indirect | ||
github.com/nats-io/nuid v1.0.1 // indirect | ||
github.com/pmezard/go-difflib v1.0.0 // indirect | ||
github.com/stretchr/testify v1.8.0 // indirect | ||
golang.org/x/crypto v0.27.0 // indirect | ||
golang.org/x/sys v0.25.0 // indirect | ||
golang.org/x/text v0.18.0 // indirect | ||
gopkg.in/yaml.v3 v3.0.1 // indirect | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | ||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= | ||
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= | ||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= | ||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= | ||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= | ||
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= | ||
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= | ||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= | ||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= | ||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= | ||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= | ||
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= | ||
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= | ||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= | ||
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= | ||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= | ||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= | ||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= | ||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= | ||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= | ||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= | ||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= | ||
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= | ||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= | ||
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= | ||
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= | ||
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= | ||
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= | ||
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= | ||
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= | ||
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= | ||
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= | ||
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= | ||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= | ||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | ||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | ||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= | ||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | ||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
/* | ||
Copyright 2024 The CloudEvents Authors | ||
SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package nats_jetstream | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/nats-io/nats.go/jetstream" | ||
|
||
"github.com/cloudevents/sdk-go/v2/binding" | ||
"github.com/cloudevents/sdk-go/v2/binding/format" | ||
"github.com/cloudevents/sdk-go/v2/binding/spec" | ||
) | ||
|
||
const ( | ||
// see https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/nats-protocol-binding.md | ||
prefix = "ce-" | ||
contentTypeHeader = "content-type" | ||
) | ||
|
||
var ( | ||
specs = spec.WithPrefix(prefix) | ||
|
||
// ErrNoVersion returned when no version header is found in the protocol header. | ||
ErrNoVersion = errors.New("message does not contain version header") | ||
) | ||
|
||
// Message implements binding.Message by wrapping an jetstream.Msg. | ||
// This message *can* be read several times safely | ||
type Message struct { | ||
Msg jetstream.Msg | ||
encoding binding.Encoding | ||
} | ||
|
||
// NewMessage wraps an *nats.Msg in a binding.Message. | ||
// The returned message *can* be read several times safely | ||
// The default encoding returned is EncodingStructured unless the NATS message contains a specversion header. | ||
func NewMessage(msg jetstream.Msg) *Message { | ||
encoding := binding.EncodingStructured | ||
if msg.Headers() != nil { | ||
if msg.Headers().Get(specs.PrefixedSpecVersionName()) != "" { | ||
encoding = binding.EncodingBinary | ||
} | ||
} | ||
return &Message{Msg: msg, encoding: encoding} | ||
} | ||
|
||
var _ binding.Message = (*Message)(nil) | ||
|
||
// ReadEncoding return the type of the message Encoding. | ||
func (m *Message) ReadEncoding() binding.Encoding { | ||
return m.encoding | ||
} | ||
|
||
// ReadStructured transfers a structured-mode event to a StructuredWriter. | ||
func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error { | ||
if m.encoding != binding.EncodingStructured { | ||
return binding.ErrNotStructured | ||
} | ||
return encoder.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m.Msg.Data())) | ||
} | ||
|
||
// ReadBinary transfers a binary-mode event to an BinaryWriter. | ||
func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error { | ||
if m.encoding != binding.EncodingBinary { | ||
return binding.ErrNotBinary | ||
} | ||
|
||
version := m.GetVersion() | ||
if version == nil { | ||
return ErrNoVersion | ||
} | ||
|
||
var err error | ||
for k, v := range m.Msg.Headers() { | ||
headerValue := v[0] | ||
if strings.HasPrefix(k, prefix) { | ||
attr := version.Attribute(k) | ||
if attr != nil { | ||
err = encoder.SetAttribute(attr, headerValue) | ||
} else { | ||
err = encoder.SetExtension(strings.TrimPrefix(k, prefix), headerValue) | ||
} | ||
} else if k == contentTypeHeader { | ||
err = encoder.SetAttribute(version.AttributeFromKind(spec.DataContentType), headerValue) | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
if m.Msg.Data() != nil { | ||
err = encoder.SetData(bytes.NewBuffer(m.Msg.Data())) | ||
} | ||
|
||
return err | ||
} | ||
|
||
// Finish *must* be called when message from a Receiver can be forgotten by the receiver. | ||
func (m *Message) Finish(err error) error { | ||
return nil | ||
} | ||
|
||
// GetAttribute implements binding.MessageMetadataReader | ||
func (m *Message) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{}) { | ||
key := withPrefix(attributeKind.String()) | ||
if m.Msg.Headers() != nil { | ||
version := m.GetVersion() | ||
headerValue := m.Msg.Headers().Get(key) | ||
if headerValue != "" { | ||
return version.Attribute(key), headerValue | ||
} | ||
return version.Attribute(key), nil | ||
} | ||
// if the headers are nil, the version is also nil. Therefore return nil. | ||
return nil, nil | ||
} | ||
|
||
// GetExtension implements binding.MessageMetadataReader | ||
func (m *Message) GetExtension(name string) interface{} { | ||
key := withPrefix(name) | ||
if m.Msg.Headers() != nil { | ||
headerValue := m.Msg.Headers().Get(key) | ||
if headerValue != "" { | ||
return headerValue | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// GetVersion looks for specVersion header and returns a Version object | ||
func (m *Message) GetVersion() spec.Version { | ||
if m.Msg.Headers() == nil { | ||
return nil | ||
} | ||
versionValue := m.Msg.Headers().Get(specs.PrefixedSpecVersionName()) | ||
if versionValue == "" { | ||
return nil | ||
} | ||
return specs.Version(versionValue) | ||
} | ||
|
||
// withPrefix prepends the prefix to the attribute name | ||
func withPrefix(attributeName string) string { | ||
return fmt.Sprintf("%s%s", prefix, attributeName) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
/* | ||
Copyright 2024 The CloudEvents Authors | ||
SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package nats_jetstream | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"testing" | ||
|
||
"github.com/cloudevents/sdk-go/v2/binding/spec" | ||
bindingtest "github.com/cloudevents/sdk-go/v2/binding/test" | ||
|
||
"github.com/cloudevents/sdk-go/v2/binding" | ||
"github.com/cloudevents/sdk-go/v2/test" | ||
"github.com/nats-io/nats.go" | ||
"github.com/nats-io/nats.go/jetstream" | ||
) | ||
|
||
type jetStreamMsg struct { | ||
jetstream.Msg | ||
msg *nats.Msg | ||
} | ||
|
||
func (j *jetStreamMsg) Data() []byte { return j.msg.Data } | ||
func (j *jetStreamMsg) Headers() nats.Header { return j.msg.Header } | ||
|
||
var ( | ||
outBinaryMessage = bindingtest.MockBinaryMessage{ | ||
Metadata: map[spec.Attribute]interface{}{}, | ||
Extensions: map[string]interface{}{}, | ||
} | ||
outStructMessage = bindingtest.MockStructuredMessage{} | ||
|
||
testEvent = test.FullEvent() | ||
binaryData, _ = json.Marshal(map[string]string{ | ||
"ce_type": testEvent.Type(), | ||
"ce_source": testEvent.Source(), | ||
"ce_id": testEvent.ID(), | ||
"ce_time": test.Timestamp.String(), | ||
"ce_specversion": "1.0", | ||
"ce_dataschema": test.Schema.String(), | ||
"ce_datacontenttype": "text/json", | ||
"ce_subject": "receiverTopic", | ||
"ce_exta": "someext", | ||
}) | ||
structuredReceiverMessage = &jetStreamMsg{ | ||
msg: &nats.Msg{ | ||
Subject: "hello", | ||
Data: binaryData, | ||
}, | ||
} | ||
binaryReceiverMessage = &jetStreamMsg{ | ||
msg: &nats.Msg{ | ||
Subject: "hello", | ||
Data: testEvent.Data(), | ||
Header: nats.Header{ | ||
"ce-type": {testEvent.Type()}, | ||
"ce-source": {testEvent.Source()}, | ||
"ce-id": {testEvent.ID()}, | ||
"ce-time": {test.Timestamp.String()}, | ||
"ce-specversion": {"1.0"}, | ||
"ce-dataschema": {test.Schema.String()}, | ||
"ce-datacontenttype": {"text/json"}, | ||
"ce-subject": {"receiverTopic"}, | ||
"ce-exta": {"someext"}, | ||
}, | ||
}, | ||
} | ||
) | ||
|
||
func TestNewMessage(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
receiverMessage jetstream.Msg | ||
expectedEncoding binding.Encoding | ||
expectedStructuredError error | ||
expectedBinaryError error | ||
}{ | ||
{ | ||
name: "Structured encoding", | ||
receiverMessage: structuredReceiverMessage, | ||
expectedEncoding: binding.EncodingStructured, | ||
expectedStructuredError: nil, | ||
expectedBinaryError: binding.ErrNotBinary, | ||
}, | ||
{ | ||
name: "Binary encoding", | ||
receiverMessage: binaryReceiverMessage, | ||
expectedEncoding: binding.EncodingBinary, | ||
expectedStructuredError: binding.ErrNotStructured, | ||
expectedBinaryError: nil, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
got := NewMessage(tt.receiverMessage) | ||
if got == nil { | ||
t.Errorf("Error in NewMessage!") | ||
} | ||
err := got.ReadBinary(context.TODO(), &outBinaryMessage) | ||
if err != tt.expectedBinaryError { | ||
t.Errorf("ReadBinary err:%s", err.Error()) | ||
} | ||
err = got.ReadStructured(context.TODO(), &outStructMessage) | ||
if err != tt.expectedStructuredError { | ||
t.Errorf("ReadStructured err:%s", err.Error()) | ||
} | ||
if got.ReadEncoding() != tt.expectedEncoding { | ||
t.Errorf("ExpectedEncoding %s, while got %s", tt.expectedEncoding, got.ReadEncoding()) | ||
} | ||
}) | ||
} | ||
} |
Oops, something went wrong.