Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MVP of smart producer #255

Merged
merged 24 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/build_test_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: oldstable
go-version: stable
# go-version: oldstable

- name: Build
run: make build
Expand Down Expand Up @@ -50,7 +51,8 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: oldstable
go-version: stable
# go-version: oldstable

- name: E2E Tests
env:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/static-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
with:
version: "2023.1.3"
install-go: false
build-tags: "rabbitmq.stream.test,rabbitmq.stream.e2e"
build-tags: "rabbitmq.stream.test,rabbitmq.stream.e2e,rabbitmq.stream.system_test"

format-check:
name: Check Go Formatting
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ coverage.txt

# Dependency directories (remove the comment below to include it)
# vendor/
/bin/
29 changes: 21 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,27 @@ help:

GO ?= $(shell which go)
GOPATH ?= $(shell $(GO) env GOPATH)
GOBIN ?= $(CURDIR)/bin
$(GOBIN):
mkdir -pv $(GOBIN)

define GO_TOOLS
"github.com/golang/mock/mockgen" \
"go.uber.org/mock/mockgen" \
"github.com/onsi/ginkgo/v2/ginkgo"
endef

GINKGO ?= $(GOPATH)/bin/ginkgo
$(GINKGO):
GINKGO ?= $(GOBIN)/ginkgo
$(GINKGO): | $(GOBIN)
@printf "$(GREEN)Installing ginkgo CLI$(NORMAL)\n"
$(GO) install -mod=mod github.com/onsi/ginkgo/v2/ginkgo
GOBIN="$(GOBIN)" $(GO) install -mod=mod github.com/onsi/ginkgo/v2/ginkgo

.PHONY: ginkgo
ginkgo: | $(GINKGO)

MOCKGEN ?= $(GOPATH)/bin/mockgen
MOCKGEN ?= $(GOBIN)/mockgen
$(MOCKGEN):
@printf "$(GREEN)Installing mockgen CLI$(NORMAL)\n"
$(GO) install -mod=mod go.uber.org/mock/mockgen
GOBIN="$(GOBIN)" $(GO) install -mod=mod go.uber.org/mock/mockgen

.PHONY: mockgen
mockgen: | $(MOCKGEN)
Expand All @@ -50,8 +54,8 @@ install-tools: ## Install tool dependencies for development
### Golang targets

.PHONY: go-mod-tidy
go-mod-tidy: ## Run 'go mod tidy' with compatibility to Go 1.19
$(GO) mod tidy -go=1.19
go-mod-tidy: ## Run 'go mod tidy' with compatibility to Go 1.21
$(GO) mod tidy -go=1.21

.PHONY: go-generate-mocks
go-generate-mocks: | $(MOCKGEN) ## Generate Mocks for testing
Expand Down Expand Up @@ -88,6 +92,15 @@ tests-ci:
--fail-on-pending \
--keep-going

.PHONY: system-tests
system-tests: ## Run system tests. It starts a rabbitmq container. To skip starting the rabbit container, use RABBITMQ_STREAM_SKIP_RABBIT_START="skip"
@printf "$(GREEN)Running system tests in parallel$(NORMAL)\n"
RABBITMQ_STREAM_RUN_SYSTEM_TEST="run" \
$(GINKGO) $(GINKGO_RUN_SHARED_FLAGS) $(GINKGO_RUN_FLAGS) \
--tags="rabbitmq.stream.test,rabbitmq.stream.system_test" \
--focus 'System tests' \
$(GINKGO_EXTRA) ./pkg/stream/


#### e2e test suite accepts the flags -keep-rabbit-container=true and -rabbit-debug-log=true
#### -keep-rabbit-container=true does not delete the rabbit container after the suite run. It is useful to examine rabbit logs after a test failure
Expand Down
29 changes: 15 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
module github.com/rabbitmq/rabbitmq-stream-go-client/v2

go 1.19
go 1.21

toolchain go1.21.0

require (
github.com/michaelklishin/rabbit-hole/v2 v2.13.0
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.10
go.uber.org/mock v0.2.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/mod v0.12.0
github.com/michaelklishin/rabbit-hole/v2 v2.15.0
github.com/onsi/ginkgo/v2 v2.13.2
github.com/onsi/gomega v1.30.0
go.uber.org/mock v0.4.0
golang.org/x/mod v0.14.0
)

require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/tools v0.11.0 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.16.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
77 changes: 40 additions & 37 deletions go.sum

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion internal/constants.go

This file was deleted.

36 changes: 36 additions & 0 deletions internal/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,55 @@ type MetadataResponse struct {
streamsMetadata []StreamMetadata
}

func (m *MetadataResponse) Brokers() []Broker {
return m.brokers
}

func (m *MetadataResponse) StreamsMetadata() []StreamMetadata {
return m.streamsMetadata
}

type Broker struct {
reference uint16
host string
port uint32
}

func (b *Broker) Reference() uint16 {
return b.reference
}

func (b *Broker) Host() string {
return b.host
}

func (b *Broker) Port() uint32 {
return b.port
}

type StreamMetadata struct {
streamName string
responseCode uint16
leaderReference uint16
replicasReferences []uint16
}

func (s *StreamMetadata) StreamName() string {
return s.streamName
}

func (s *StreamMetadata) ResponseCode() uint16 {
return s.responseCode
}

func (s *StreamMetadata) LeaderReference() uint16 {
return s.leaderReference
}

func (s *StreamMetadata) ReplicasReferences() []uint16 {
return s.replicasReferences
}

func NewMetadataResponse(correlationId,
port uint32,
brokerReference,
Expand Down
74 changes: 55 additions & 19 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,77 +3,107 @@ package main
import (
"bufio"
"context"
"flag"
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/codecs/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/common"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/constants"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/stream"
"golang.org/x/exp/slog"
"log/slog"
"os"
"time"
)

func main() {
runSmartClient()
runRawClient()
runRawClientFlag := flag.Bool("run-raw-client", false, "set it to run raw client")
runSmartClientFlag := flag.Bool("run-smart-client", false, "set it to run raw client")
flag.Parse()
if *runRawClientFlag {
runRawClient()
}
if *runSmartClientFlag {
runSmartClient()
}
}

func runSmartClient() {
h := slog.HandlerOptions{
Level: slog.LevelDebug,
slogOpts := &slog.HandlerOptions{
Level: slog.LevelInfo,
}
log := slog.New(h.NewTextHandler(os.Stdout))
log := slog.New(slog.NewTextHandler(os.Stdout, slogOpts))

// FIXME: producer manager does not register notify publish
ctx := raw.NewContextWithLogger(context.Background(), *log)

c := stream.NewEnvironmentConfiguration(
stream.WithLazyInitialization(false),
stream.WithUri("rabbitmq-stream://localhost:5552"),
stream.WithAddressResolver(func(_ string, _ int) (_ string, _ int) {
return "localhost", 5552
}),
)

env, err := stream.NewEnvironment(ctx, c)
if err != nil {
panic(err)
}
//defer env.Close()

err = env.CreateStream(ctx, "my-stream", stream.CreateStreamOptions{})
if err != nil {
panic(err)
}

sc := bufio.NewScanner(os.Stdin)
fmt.Print("Close the connection and press enter")
sc.Scan()

err = env.DeleteStream(ctx, "my-stream")
var nConfirm int
producer, err := env.CreateProducer(ctx, "my-stream", &stream.ProducerOptions{
MaxInFlight: 1_000,
MaxBufferedMessages: 100,
ConfirmationHandler: func(c *stream.MessageConfirmation) {
if c.Status() == stream.Confirmed {
nConfirm += 1
if nConfirm%1_000 == 0 {
log.Info("received confirmations", slog.Int("confirm-count", nConfirm))
}
} else {
log.Warn("message not confirmed", slog.Int("confirm-status", int(c.Status())))
}
},
})
if err != nil {
panic(err)
}

err = env.CreateStream(ctx, "other-stream", stream.CreateStreamOptions{})
if err != nil {
panic(err)
for i := 0; i < 1_000_000; i++ {
err = producer.Send(context.Background(), amqp.Message{Data: []byte(fmt.Sprintf("Message #%d", i))})
if err != nil {
log.Warn("failed to send a message", slog.Int("message-n", i))
}
if i%1_000 == 0 {
log.Info("sent messages", slog.Int("send-count", i))
}
}

fmt.Print("Life good! Press enter to exit")
sc := bufio.NewScanner(os.Stdin)
fmt.Print("Press enter to continue and exit")
sc.Scan()
err = env.DeleteStream(ctx, "other-stream")

err = env.DeleteStream(ctx, "my-stream")
if err != nil {
panic(err)
}

env.Close(ctx)
}

func runRawClient() {
log := slog.New(slog.NewTextHandler(os.Stdout))
log := slog.New(slog.NewTextHandler(os.Stdout, nil))
streamName := "test-streamName"
config, err := raw.NewClientConfiguration("rabbitmq-stream://guest:guest@localhost:5552")
if err != nil {
panic(err)
}

config.SetConnectionName("test-connection")
config.ConnectionName = "test-connection"
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
rabbitmqCtx := raw.NewContextWithLogger(ctx, *log)
Expand Down Expand Up @@ -104,6 +134,12 @@ func runRawClient() {
panic(err)
}

metadata, err := streamClient.MetadataQuery(ctx, []string{streamName})
if err != nil {
panic(err)
}
log.Info("metadata query success", slog.Any("metadata", *metadata))

const batchSize = 100
const iterations = 1000
const totalMessages = iterations * batchSize
Expand Down
14 changes: 7 additions & 7 deletions pkg/e2e/end_to_end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/common"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/constants"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw"
"golang.org/x/exp/slog"
"io"
"log/slog"
"os"
"sync"
"time"
)

var e2eLogger = slog.New(slog.NewTextHandler(GinkgoWriter))
var e2eLogger = slog.New(slog.NewTextHandler(GinkgoWriter, nil))

var _ = Describe("E2E", Serial, Label("e2e"), func() {
const (
Expand Down Expand Up @@ -134,8 +134,8 @@ var _ = Describe("E2E", Serial, Label("e2e"), func() {

// Send and Recveive Messages, assert messages received are valid.
It("sends, and receives messages", Label("behaviour"), func(ctx SpecContext) {
h := slog.HandlerOptions{Level: slog.LevelDebug}.NewTextHandler(GinkgoWriter)
debugLogger := slog.New(h)
opts := &slog.HandlerOptions{Level: slog.LevelDebug}
debugLogger := slog.New(slog.NewTextHandler(GinkgoWriter, opts))
itCtx := raw.NewContextWithLogger(ctx, *debugLogger)
streamClientConfiguration, err := raw.NewClientConfiguration(rabbitmqUri)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -243,13 +243,13 @@ var _ = Describe("E2E", Serial, Label("e2e"), func() {
// With the HTTP API, we can check the connection name and kill it.
// The client has to notify the disconnection.
It("connection name and notify disconnection", Label("behaviour"), func(ctx SpecContext) {
h := slog.HandlerOptions{Level: slog.LevelDebug}.NewTextHandler(GinkgoWriter)
debugLogger := slog.New(h)
options := &slog.HandlerOptions{Level: slog.LevelDebug}
debugLogger := slog.New(slog.NewTextHandler(GinkgoWriter, options))
itCtx := raw.NewContextWithLogger(ctx, *debugLogger)
streamClientConfiguration, err := raw.NewClientConfiguration(rabbitmqUri)
Expect(err).ToNot(HaveOccurred())
connectionName := "notify-disconnection-test-1"
streamClientConfiguration.SetConnectionName(connectionName)
streamClientConfiguration.ConnectionName = connectionName

By("preparing the environment")
streamClient, err := raw.DialConfig(itCtx, streamClientConfiguration)
Expand Down
Loading