Skip to content

Commit

Permalink
Add consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
cv65kr committed Feb 9, 2024
1 parent 81394dd commit 129ea34
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 17 deletions.
28 changes: 24 additions & 4 deletions pubsubjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Driver struct {
topic string
msgInFlight *int64
msgInFlightLimit *int32
sub string

// if user invoke several resume operations
listeners uint32
Expand Down Expand Up @@ -107,6 +108,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
cond: sync.Cond{L: &sync.Mutex{}},
msgInFlightLimit: ptr(conf.Prefetch),
msgInFlight: ptr(int64(0)),
sub: pipe.Name(),
}

ctx := context.Background()
Expand Down Expand Up @@ -161,6 +163,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
cond: sync.Cond{L: &sync.Mutex{}},
msgInFlightLimit: ptr(int32(pipe.Int(pref, 10))),
msgInFlight: ptr(int64(0)),
sub: pipe.Name(),
}

ctx := context.Background()
Expand Down Expand Up @@ -346,13 +349,30 @@ func (d *Driver) manageTopic(ctx context.Context) error {
return nil
}

_, err := d.client.CreateTopic(ctx, d.topic)
topic, err := d.client.CreateTopic(ctx, d.topic)
if err != nil {
if strings.Contains(err.Error(), "Topic already exists") {
return nil
if !strings.Contains(err.Error(), "Topic already exists") {
return err
}

topic = d.client.Topic(d.topic)
} else {
d.log.Debug("created topic", zap.String("topic", d.topic))
}

_, err = d.client.CreateSubscription(ctx, d.sub, pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 10 * time.Second,
ExpirationPolicy: time.Duration(0),
})
if err != nil {
if !strings.Contains(err.Error(), "Subscription already exists") {
return err
}
return err
}
d.log.Debug("created subscription", zap.String("topic", d.topic), zap.String("subscription", d.sub))

topic.Stop()

return nil
}
Expand Down
1 change: 1 addition & 0 deletions pubsubjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (i *Item) Nack() error {
i.Options.cond.Signal()
atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
}()

// message already deleted
if i.Options.AutoAck {
return nil
Expand Down
17 changes: 8 additions & 9 deletions pubsubjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,7 @@ func (d *Driver) listen(ctx context.Context) {
d.log.Debug("listener was stopped")
return
default:
s, err := d.client.Topic(d.topic).Subscriptions(ctx).Next()
if err != nil {
d.log.Error("subscription iteration", zap.Error(err))
continue
}

s.Receive(context.Background(), func(ctx context.Context, message *pubsub.Message) {
d.client.Subscription(d.sub).Receive(ctx, func(ctx context.Context, message *pubsub.Message) {

Check failure on line 20 in pubsubjobs/listener.go

View workflow job for this annotation

GitHub Actions / Golang-CI (lint)

Error return value of `(*cloud.google.com/go/pubsub.Subscription).Receive` is not checked (errcheck)

Check failure on line 20 in pubsubjobs/listener.go

View workflow job for this annotation

GitHub Actions / Golang-CI (lint)

Error return value of `(*cloud.google.com/go/pubsub.Subscription).Receive` is not checked (errcheck)
d.cond.L.Lock()
// lock when we hit the limit
for atomic.LoadInt64(d.msgInFlight) >= int64(atomic.LoadInt32(d.msgInFlightLimit)) {
Expand All @@ -36,9 +30,14 @@ func (d *Driver) listen(ctx context.Context) {

ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(context.Background(), propagation.HeaderCarrier(item.headers)), "google_pub_sub_listener")
if item.Options.AutoAck {
item.Ack()
_, err := message.AckWithResult().Get(ctx)
if err != nil {
d.log.Error("message acknowledge", zap.Error(err))
span.RecordError(err)
span.End()
return
}
d.log.Debug("auto ack is turned on, message acknowledged")
span.End()
}

if item.headers == nil {
Expand Down
45 changes: 45 additions & 0 deletions tests/helpers/helpers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package helpers

import (
"context"
"net"
"net/rpc"
"os"
"strings"
"testing"
"time"

"cloud.google.com/go/pubsub"
"github.com/google/uuid"
jobsProto "github.com/roadrunner-server/api/v4/build/jobs/v1"
jobState "github.com/roadrunner-server/api/v4/plugins/v1/jobs"
Expand Down Expand Up @@ -182,3 +186,44 @@ func DeclarePipe(queue string, address string, pipeline string) func(t *testing.
assert.NoError(t, err)
}
}

func CleanEmulator() error {
os.Setenv("PUBSUB_EMULATOR_HOST", "127.0.0.1:8085")
ctx := context.Background()
client, err := pubsub.NewClient(ctx, "test")
if err != nil {
return err
}

for {
sub, err := client.Subscriptions(ctx).Next()
if err != nil {
if strings.Contains(err.Error(), "no more items in iterator") {
break;
}

return err
}

err = sub.Delete(ctx)
if err != nil {
return err
}
}

for {
topic, err := client.Topics(ctx).Next()
if err != nil {
if strings.Contains(err.Error(), "no more items in iterator") {
return nil
}

return err
}

err = topic.Delete(ctx)
if err != nil {
return err
}
}
}
113 changes: 109 additions & 4 deletions tests/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,16 @@ import (
rpcPlugin "github.com/roadrunner-server/rpc/v4"
"github.com/roadrunner-server/server/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestInit(t *testing.T) {
err := helpers.CleanEmulator()
if err != nil {
assert.FailNow(t, "error", err.Error())
}

cont := endure.New(slog.LevelDebug)

cfg := &config.Plugin{
Expand All @@ -34,7 +40,7 @@ func TestInit(t *testing.T) {
Prefix: "rr",
}

err := cont.RegisterAll(
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
Expand Down Expand Up @@ -103,6 +109,11 @@ func TestInit(t *testing.T) {
}

func TestDeclare(t *testing.T) {
err := helpers.CleanEmulator()
if err != nil {
assert.FailNow(t, "error", err.Error())
}

cont := endure.New(slog.LevelDebug)

cfg := &config.Plugin{
Expand All @@ -111,7 +122,7 @@ func TestDeclare(t *testing.T) {
Prefix: "rr",
}

err := cont.RegisterAll(
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
Expand Down Expand Up @@ -184,6 +195,11 @@ func TestDeclare(t *testing.T) {
}

func TestJobsError(t *testing.T) {
err := helpers.CleanEmulator()
if err != nil {
assert.FailNow(t, "error", err.Error())
}

cont := endure.New(slog.LevelDebug)

cfg := &config.Plugin{
Expand All @@ -192,7 +208,7 @@ func TestJobsError(t *testing.T) {
Prefix: "rr",
}

err := cont.RegisterAll(
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
Expand Down Expand Up @@ -267,6 +283,11 @@ func TestJobsError(t *testing.T) {
}

func TestRemovePQ(t *testing.T) {
err := helpers.CleanEmulator()
if err != nil {
assert.FailNow(t, "error", err.Error())
}

cont := endure.New(slog.LevelDebug)

cfg := &config.Plugin{
Expand All @@ -276,7 +297,7 @@ func TestRemovePQ(t *testing.T) {
}

l, oLogger := mocklogger.ZapTestLogger(zap.DebugLevel)
err := cont.RegisterAll(
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
Expand Down Expand Up @@ -353,3 +374,87 @@ func TestRemovePQ(t *testing.T) {
assert.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("listener was stopped").Len())
}

func TestAutoAck(t *testing.T) {
err := helpers.CleanEmulator()
if err != nil {
assert.FailNow(t, "error", err.Error())
}

cont := endure.New(slog.LevelDebug)

cfg := &config.Plugin{
Version: "2023.3.0",
Path: "configs/.rr-init.yaml",
Prefix: "rr",
}

l, oLogger := mocklogger.ZapTestLogger(zap.DebugLevel)
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
l,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
&googlePubSub.Plugin{},
)
assert.NoError(t, err)

err = cont.Init()
if err != nil {
t.Fatal(err)
}

ch, err := cont.Serve()
if err != nil {
t.Fatal(err)
}

sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

wg := &sync.WaitGroup{}
wg.Add(1)

stopCh := make(chan struct{}, 1)

go func() {
defer wg.Done()
for {
select {
case e := <-ch:
assert.Fail(t, "error", e.Error.Error())
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
case <-sig:
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
return
case <-stopCh:
// timeout
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
return
}
}
}()

time.Sleep(time.Second * 3)
t.Run("PushPipeline", helpers.PushToPipe("test-1", true, "127.0.0.1:6001"))
t.Run("PushPipeline", helpers.PushToPipe("test-2", true, "127.0.0.1:6001"))
time.Sleep(time.Second * 2)
t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6001", "test-1", "test-2"))

stopCh <- struct{}{}
wg.Wait()

require.Equal(t, 2, oLogger.FilterMessageSnippet("auto ack is turned on, message acknowledged").Len())
}

0 comments on commit 129ea34

Please sign in to comment.