Skip to content

Commit

Permalink
Stream pkg system test
Browse files Browse the repository at this point in the history
Outstanding Todos:
- Include smart producer public api calls when producer is merged
- Ensure the number of messages consumed is the same as the ones
  produced
  • Loading branch information
ablease committed Dec 5, 2023
1 parent db5a7d1 commit fa93b35
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 3 deletions.
42 changes: 39 additions & 3 deletions pkg/stream/stream_suite_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package stream_test

import (
"bytes"
"encoding/binary"
"github.com/onsi/gomega/gbytes"
"github.com/onsi/gomega/gexec"
"log"
Expand All @@ -25,6 +27,40 @@ const (
containerName string = "rabbitmq-stream-go-client"
)

type plainTextMessage struct {
body string
}

func (p *plainTextMessage) UnmarshalBinary(data []byte) error {
var dataLen uint32
err := binary.Read(bytes.NewReader(data), binary.BigEndian, &dataLen)
if err != nil {
return err
}

p.body = string(data[4 : dataLen+4])

return nil
}

func (p *plainTextMessage) MarshalBinary() ([]byte, error) {
buff := new(bytes.Buffer)
err := binary.Write(buff, binary.BigEndian, uint32(len(p.body)))
if err != nil {
return nil, err
}
err = binary.Write(buff, binary.BigEndian, []byte(p.body))
if err != nil {
return nil, err
}

return buff.Bytes(), nil
}

func (p *plainTextMessage) Body() string {
return p.body
}

var _ = SynchronizedBeforeSuite(func() {
// Just once
logger := log.New(GinkgoWriter, "[SBS] ", log.Ldate|log.Lmsgprefix)
Expand All @@ -49,20 +85,20 @@ var _ = SynchronizedBeforeSuite(func() {
session, _ := gexec.Start(cmd, bufErr, bufErr)
session.Wait()
return bufErr
}).WithPolling(time.Second).WithTimeout(time.Second*10).
}).WithPolling(time.Second).WithTimeout(time.Second*60).
Should(gbytes.Say("rabbit"), "expected epmd to report rabbit app as running")

awaitStartArgs := strings.Split("exec --user rabbitmq -i "+containerName+" rabbitmqctl await_startup", " ")
awaitCmd := exec.Command("docker", awaitStartArgs...)
awaitSession, err := gexec.Start(awaitCmd, GinkgoWriter, GinkgoWriter)
Expect(err).ToNot(HaveOccurred())
awaitSession.Wait(time.Second * 10)
awaitSession.Wait(time.Second * 60)

enablePluginArgs := strings.Split("exec --user rabbitmq -i "+containerName+" rabbitmq-plugins enable rabbitmq_stream", " ")
enablePluginCmd := exec.Command("docker", enablePluginArgs...)
enablePluginSession, err := gexec.Start(enablePluginCmd, GinkgoWriter, GinkgoWriter)
Expect(err).ToNot(HaveOccurred())
enablePluginSession.Wait(time.Second * 10)
enablePluginSession.Wait(time.Second * 60)
}, func() {
// All processes
})
Expand Down
125 changes: 125 additions & 0 deletions pkg/stream/system_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,128 @@
//go:build rabbitmq.stream.system_test

package stream_test

import (
"fmt"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gmeasure"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/codecs/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/common"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/stream"
"golang.org/x/exp/slog"
"sync"
"time"
)

var _ = Describe("stream package", func() {
const (
streamName = "stream-system-test"
// 100 byte message
messageBody = "Rabbitmq-is-awesomeRabbitmq-is-awesomeRabbitmq-is-awesomeRabbitmq-is-awesomeRabbitmq-is-awesome!!!!!"
defaultRabbitmqUri = "rabbitmq-stream://guest:guest@localhost/%2F"
)
It("can create and connect to a stream, publish and receive messages", func(ctx SpecContext) {
h := slog.HandlerOptions{Level: slog.LevelDebug}.NewTextHandler(GinkgoWriter)
debugLogger := slog.New(h)
itCtx := raw.NewContextWithLogger(ctx, *debugLogger)

By("creating a new environment")
envConfig := stream.NewEnvironmentConfiguration(
stream.WithUri(defaultRabbitmqUri))
env, err := stream.NewEnvironment(itCtx, envConfig)
Expect(err).NotTo(HaveOccurred())

By("creating a stream")
csOpts := stream.CreateStreamOptions{time.Second * 60, stream.Kilobyte, stream.Megabyte}
Expect(env.CreateStream(itCtx, streamName, csOpts)).To(Succeed())

// See https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/255
// TODO Change this By when stream pkg producer is megered
By("creating a raw client producer")
throughputExp := gmeasure.NewExperiment("100-byte message throughput")
AddReportEntry(throughputExp.Name, throughputExp)
stopWatch := throughputExp.NewStopwatch()

streamClientConfiguration, err := raw.NewClientConfiguration(defaultRabbitmqUri)
Expect(err).ToNot(HaveOccurred())
streamClient, err := raw.DialConfig(itCtx, streamClientConfiguration)
Expect(err).ToNot(HaveOccurred())
Expect(streamClient.IsOpen()).To(BeTrue(), "expected stream client to be open")

const publisherId = 1
Expect(
streamClient.DeclarePublisher(itCtx, publisherId, "e2e-publisher", streamName),
).To(Succeed())

// See https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/255
// TODO Change this By when stream pkg producer is megered
By("receiving confirmations")
c := streamClient.NotifyPublish(make(chan *raw.PublishConfirm, 100))
var wg sync.WaitGroup
wg.Add(1)

const numMessages = 100_000
go func() {
defer GinkgoRecover()
defer wg.Done()
confirmStopwatch := throughputExp.NewStopwatch()
defer confirmStopwatch.Record("publish confirmation")

confirmedIds := make(map[uint64]struct{}, numMessages)
for {
select {
case <-ctx.Done():
Fail(fmt.Sprintf("context timed out: expected to receive 100_000 confirmations: received %d", len(confirmedIds)))
case confirm, ok := <-c:
if !ok {
return
}
ids := confirm.PublishingIds()
for i := 0; i < len(ids); i++ {
confirmedIds[ids[i]] = struct{}{}
}

if len(confirmedIds) == numMessages {
return
}
}
}
}()

// See https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/255
// TODO Change this By when stream pkg producer is megered
By("sending messages")
for i := uint64(0); i < numMessages; i++ {
messageContainer := raw.NewPublishingMessage(i, &plainTextMessage{messageBody})
Expect(
streamClient.Send(itCtx, publisherId, wrap[common.PublishingMessager](messageContainer)),
).To(Succeed())
}
stopWatch.Record("Send").Reset()

wg.Wait()

By("creating a consumer")
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
fmt.Printf("messages: %s\n", message.Data)
}
opts := &stream.ConsumerOptions{}
mutex := &sync.Mutex{}
consumer, err := stream.NewConsumer(streamName, streamClient, handleMessages, opts, mutex)
Expect(err).NotTo(HaveOccurred())
err = consumer.Subscribe(itCtx)
Expect(err).NotTo(HaveOccurred())
// TODO ensure all messages are consumed

By("deleting the stream")
Expect(env.DeleteStream(itCtx, streamName)).To(Succeed())
})
})

func wrap[T any](v T) []T {
r := make([]T, 1)
r[0] = v
return r
}

0 comments on commit fa93b35

Please sign in to comment.