Skip to content

Commit

Permalink
Merge pull request #232 from rabbitmq/query-offset
Browse files Browse the repository at this point in the history
QueryOffset
  • Loading branch information
ablease authored Sep 22, 2023
2 parents 9675818 + 8259676 commit a6cced8
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 7 deletions.
66 changes: 61 additions & 5 deletions pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ const (
)

type Environment struct {
configuration EnvironmentConfiguration
locators []*locator
backOffPolicy func(int) time.Duration
configuration EnvironmentConfiguration
locators []*locator
backOffPolicy func(int) time.Duration
locatorSelectSequential bool
}

func NewEnvironment(ctx context.Context, configuration EnvironmentConfiguration) (*Environment, error) {
Expand Down Expand Up @@ -125,8 +126,15 @@ func (e *Environment) DeleteStream(ctx context.Context, name string) error {
n := len(e.locators)

var lastError error
var l *locator
for i := 0; i < n; i++ {
l := e.pickLocator((i + rn) % n)
if e.locatorSelectSequential {
// round robin / sequential
l = e.locators[i]
} else {
// pick at random
l = e.pickLocator((i + rn) % n)
}

if err := l.maybeInitializeLocator(); err != nil {
logger.Error("locator not available", slog.Any("error", err))
Expand Down Expand Up @@ -182,8 +190,16 @@ func (e *Environment) QueryStreamStats(ctx context.Context, name string) (Stats,
n := len(e.locators)

var lastError error
var l *locator
for i := 0; i < n; i++ {
l := e.pickLocator((i + rn) % n)
if e.locatorSelectSequential {
// round robin / sequential
l = e.locators[i]
} else {
// pick at random
l = e.pickLocator((i + rn) % n)
}

if err := l.maybeInitializeLocator(); err != nil {
lastError = err
logger.Error("error initializing locator", slog.Any("error", err))
Expand All @@ -205,3 +221,43 @@ func (e *Environment) QueryStreamStats(ctx context.Context, name string) (Stats,
}
return Stats{-1, -1}, lastError
}

// QueryOffset retrieves the last consumer offset stored for a given consumer
// name and stream name.
func (e *Environment) QueryOffset(ctx context.Context, consumer, stream string) (uint64, error) {
logger := raw.LoggerFromCtxOrDiscard(ctx)
rn := rand.Intn(100)
n := len(e.locators)

var lastError error
var l *locator
for i := 0; i < n; i++ {
if e.locatorSelectSequential {
// round robin / sequential
l = e.locators[i]
} else {
// pick at random
l = e.pickLocator((i + rn) % n)
}

if err := l.maybeInitializeLocator(); err != nil {
lastError = err
logger.Error("error initializing locator", slog.Any("error", err))
continue
}

result := l.locatorOperation((*locator).operationQueryOffset, ctx, consumer, stream)
if result[1] != nil {
lastError = result[1].(error)
if isNonRetryableError(lastError) {
return uint64(0), lastError
}
logger.Error("locator operation failed", slog.Any("error", lastError))
continue
}

offset := result[0].(uint64)
return offset, nil
}
return uint64(0), lastError
}
102 changes: 100 additions & 2 deletions pkg/stream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ var _ = Describe("Environment", func() {

// marked as flaky because the environment picks a locator randomly
// the test flakes if locator2 is picked first
When("there are multiple locators", FlakeAttempts(3), Label("flaky"), func() {
When("there are multiple locators", func() {
var (
locator2rawClient *stream.MockRawClient
)
Expand All @@ -221,6 +221,7 @@ var _ = Describe("Environment", func() {
locator2rawClient = stream.NewMockRawClient(mockCtrl)
environment.AppendLocatorRawClient(locator2rawClient)
environment.SetBackoffPolicy(backOffPolicyFn)
environment.SetLocatorSelectSequential(true)

mockRawClient.EXPECT().
IsOpen().
Expand All @@ -229,6 +230,7 @@ var _ = Describe("Environment", func() {

It("uses different locators when one fails", func() {
// setup

locator2rawClient.EXPECT().
IsOpen().
Return(true)
Expand Down Expand Up @@ -364,7 +366,7 @@ var _ = Describe("Environment", func() {
})
})

When("there are multiple locators", FlakeAttempts(3), Label("flaky"), func() {
When("there are multiple locators", func() {
var (
locator2rawClient *stream.MockRawClient
)
Expand All @@ -373,6 +375,7 @@ var _ = Describe("Environment", func() {
locator2rawClient = stream.NewMockRawClient(mockCtrl)
environment.AppendLocatorRawClient(locator2rawClient)
environment.SetBackoffPolicy(backOffPolicyFn)
environment.SetLocatorSelectSequential(true)

// have to set server version again because there's a new locator
environment.SetServerVersion("3.11.1")
Expand Down Expand Up @@ -429,4 +432,99 @@ var _ = Describe("Environment", func() {
Eventually(logBuffer).Within(time.Millisecond * 500).Should(gbytes.Say(`"locator operation failed" error="err maybe later"`))
})
})

Context("query offset", func() {
BeforeEach(func() {
mockRawClient.EXPECT().
IsOpen().
Return(true) // from maybeInitializeLocator
})

It("queries offset for a given consumer and stream", func() {
// setup
mockRawClient.EXPECT().
QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")).
Return(uint64(42), nil)

// act
offset, err := environment.QueryOffset(rootCtx, "consumer-with-offset", "stream")
Expect(err).ToNot(HaveOccurred())
Expect(offset).To(BeNumerically("==", 42))
})

When("there is an error", func() {
It("bubbles up the error", func() {
// setup
mockRawClient.EXPECT().
QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")).
Return(uint64(0), errors.New("err not today")).
Times(3)

_, err := environment.QueryOffset(rootCtx, "retryable-error", "stream")
Expect(err).To(MatchError("err not today"))
})
})

When("there are multiple locators", func() {
var (
locator2rawClient *stream.MockRawClient
)

BeforeEach(func() {
locator2rawClient = stream.NewMockRawClient(mockCtrl)
environment.AppendLocatorRawClient(locator2rawClient)
environment.SetBackoffPolicy(backOffPolicyFn)
environment.SetLocatorSelectSequential(true)
})

It("uses different locators when one fails", func() {
// setup
locator2rawClient.EXPECT().
IsOpen().
Return(true)
locator2rawClient.EXPECT().
QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")).
Return(uint64(42), nil)

mockRawClient.EXPECT().
QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")).
Return(uint64(0), errors.New("something went wrong")).
Times(3)

// act
offset, err := environment.QueryOffset(rootCtx, "retried-offset", "stream")
Expect(err).ToNot(HaveOccurred())
Expect(offset).To(BeNumerically("==", 42))
})

It("gives up on non-retryable errors", func() {
// setup
mockRawClient.EXPECT().
QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.Eq("non-retryable"), gomock.AssignableToTypeOf("string")).
Return(uint64(0), raw.ErrStreamDoesNotExist)

// act
_, err := environment.QueryOffset(rootCtx, "non-retryable", "stream")
Expect(err).To(HaveOccurred())
})
})

It("logs intermediate error messages", func() {
// setup
logBuffer := gbytes.NewBuffer()
logger := slog.New(slog.NewTextHandler(logBuffer))
ctx := raw.NewContextWithLogger(context.Background(), *logger)

mockRawClient.EXPECT().
QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")).
Return(uint64(0), errors.New("err maybe later")).
Times(3)

// act
_, err := environment.QueryOffset(ctx, "log-things", "stream")
Expect(err).To(HaveOccurred())

Eventually(logBuffer).Within(time.Millisecond * 500).Should(gbytes.Say(`"locator operation failed" error="err maybe later"`))
})
})
})
8 changes: 8 additions & 0 deletions pkg/stream/locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,11 @@ func (l *locator) operationQueryStreamStats(args ...any) []any {
stats, err := l.client.StreamStats(ctx, name)
return []any{stats, err}
}

func (l *locator) operationQueryOffset(args ...any) []any {
ctx := args[0].(context.Context)
reference := args[1].(string)
stream := args[2].(string)
offset, err := l.client.QueryOffset(ctx, reference, stream)
return []any{offset, err}
}
4 changes: 4 additions & 0 deletions pkg/stream/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ func (e *Environment) SetServerVersion(v string) {
func (e *Environment) SetBackoffPolicy(f func(int) time.Duration) {
e.backOffPolicy = f
}

func (e *Environment) SetLocatorSelectSequential(v bool) {
e.locatorSelectSequential = v
}

0 comments on commit a6cced8

Please sign in to comment.