From 129ea3437302ea1d0e9b61e3bc2cb8b07a6f93f1 Mon Sep 17 00:00:00 2001 From: Kajetan Date: Fri, 9 Feb 2024 19:48:04 +0100 Subject: [PATCH] Add consumer --- pubsubjobs/driver.go | 28 ++++++++-- pubsubjobs/item.go | 1 + pubsubjobs/listener.go | 17 +++--- tests/helpers/helpers.go | 45 ++++++++++++++++ tests/jobs_test.go | 113 +++++++++++++++++++++++++++++++++++++-- 5 files changed, 187 insertions(+), 17 deletions(-) diff --git a/pubsubjobs/driver.go b/pubsubjobs/driver.go index 0504540..2f1f7ac 100644 --- a/pubsubjobs/driver.go +++ b/pubsubjobs/driver.go @@ -48,6 +48,7 @@ type Driver struct { topic string msgInFlight *int64 msgInFlightLimit *int32 + sub string // if user invoke several resume operations listeners uint32 @@ -107,6 +108,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip cond: sync.Cond{L: &sync.Mutex{}}, msgInFlightLimit: ptr(conf.Prefetch), msgInFlight: ptr(int64(0)), + sub: pipe.Name(), } ctx := context.Background() @@ -161,6 +163,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap. cond: sync.Cond{L: &sync.Mutex{}}, msgInFlightLimit: ptr(int32(pipe.Int(pref, 10))), msgInFlight: ptr(int64(0)), + sub: pipe.Name(), } ctx := context.Background() @@ -346,13 +349,30 @@ func (d *Driver) manageTopic(ctx context.Context) error { return nil } - _, err := d.client.CreateTopic(ctx, d.topic) + topic, err := d.client.CreateTopic(ctx, d.topic) if err != nil { - if strings.Contains(err.Error(), "Topic already exists") { - return nil + if !strings.Contains(err.Error(), "Topic already exists") { + return err + } + + topic = d.client.Topic(d.topic) + } else { + d.log.Debug("created topic", zap.String("topic", d.topic)) + } + + _, err = d.client.CreateSubscription(ctx, d.sub, pubsub.SubscriptionConfig{ + Topic: topic, + AckDeadline: 10 * time.Second, + ExpirationPolicy: time.Duration(0), + }) + if err != nil { + if !strings.Contains(err.Error(), "Subscription already exists") { + return err } - return err } + d.log.Debug("created subscription", zap.String("topic", d.topic), zap.String("subscription", d.sub)) + + topic.Stop() return nil } diff --git a/pubsubjobs/item.go b/pubsubjobs/item.go index 88065e8..82b9c83 100644 --- a/pubsubjobs/item.go +++ b/pubsubjobs/item.go @@ -127,6 +127,7 @@ func (i *Item) Nack() error { i.Options.cond.Signal() atomic.AddInt64(i.Options.msgInFlight, ^int64(0)) }() + // message already deleted if i.Options.AutoAck { return nil diff --git a/pubsubjobs/listener.go b/pubsubjobs/listener.go index 590ed0b..2d66ac2 100644 --- a/pubsubjobs/listener.go +++ b/pubsubjobs/listener.go @@ -17,13 +17,7 @@ func (d *Driver) listen(ctx context.Context) { d.log.Debug("listener was stopped") return default: - s, err := d.client.Topic(d.topic).Subscriptions(ctx).Next() - if err != nil { - d.log.Error("subscription iteration", zap.Error(err)) - continue - } - - s.Receive(context.Background(), func(ctx context.Context, message *pubsub.Message) { + d.client.Subscription(d.sub).Receive(ctx, func(ctx context.Context, message *pubsub.Message) { d.cond.L.Lock() // lock when we hit the limit for atomic.LoadInt64(d.msgInFlight) >= int64(atomic.LoadInt32(d.msgInFlightLimit)) { @@ -36,9 +30,14 @@ func (d *Driver) listen(ctx context.Context) { ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(context.Background(), propagation.HeaderCarrier(item.headers)), "google_pub_sub_listener") if item.Options.AutoAck { - item.Ack() + _, err := message.AckWithResult().Get(ctx) + if err != nil { + d.log.Error("message acknowledge", zap.Error(err)) + span.RecordError(err) + span.End() + return + } d.log.Debug("auto ack is turned on, message acknowledged") - span.End() } if item.headers == nil { diff --git a/tests/helpers/helpers.go b/tests/helpers/helpers.go index 0b93b27..4fb6da2 100644 --- a/tests/helpers/helpers.go +++ b/tests/helpers/helpers.go @@ -1,11 +1,15 @@ package helpers import ( + "context" "net" "net/rpc" + "os" + "strings" "testing" "time" + "cloud.google.com/go/pubsub" "github.com/google/uuid" jobsProto "github.com/roadrunner-server/api/v4/build/jobs/v1" jobState "github.com/roadrunner-server/api/v4/plugins/v1/jobs" @@ -182,3 +186,44 @@ func DeclarePipe(queue string, address string, pipeline string) func(t *testing. assert.NoError(t, err) } } + +func CleanEmulator() error { + os.Setenv("PUBSUB_EMULATOR_HOST", "127.0.0.1:8085") + ctx := context.Background() + client, err := pubsub.NewClient(ctx, "test") + if err != nil { + return err + } + + for { + sub, err := client.Subscriptions(ctx).Next() + if err != nil { + if strings.Contains(err.Error(), "no more items in iterator") { + break; + } + + return err + } + + err = sub.Delete(ctx) + if err != nil { + return err + } + } + + for { + topic, err := client.Topics(ctx).Next() + if err != nil { + if strings.Contains(err.Error(), "no more items in iterator") { + return nil + } + + return err + } + + err = topic.Delete(ctx) + if err != nil { + return err + } + } +} diff --git a/tests/jobs_test.go b/tests/jobs_test.go index 66e8e13..1f9213c 100644 --- a/tests/jobs_test.go +++ b/tests/jobs_test.go @@ -22,10 +22,16 @@ import ( rpcPlugin "github.com/roadrunner-server/rpc/v4" "github.com/roadrunner-server/server/v4" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" ) func TestInit(t *testing.T) { + err := helpers.CleanEmulator() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + cont := endure.New(slog.LevelDebug) cfg := &config.Plugin{ @@ -34,7 +40,7 @@ func TestInit(t *testing.T) { Prefix: "rr", } - err := cont.RegisterAll( + err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, @@ -103,6 +109,11 @@ func TestInit(t *testing.T) { } func TestDeclare(t *testing.T) { + err := helpers.CleanEmulator() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + cont := endure.New(slog.LevelDebug) cfg := &config.Plugin{ @@ -111,7 +122,7 @@ func TestDeclare(t *testing.T) { Prefix: "rr", } - err := cont.RegisterAll( + err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, @@ -184,6 +195,11 @@ func TestDeclare(t *testing.T) { } func TestJobsError(t *testing.T) { + err := helpers.CleanEmulator() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + cont := endure.New(slog.LevelDebug) cfg := &config.Plugin{ @@ -192,7 +208,7 @@ func TestJobsError(t *testing.T) { Prefix: "rr", } - err := cont.RegisterAll( + err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, @@ -267,6 +283,11 @@ func TestJobsError(t *testing.T) { } func TestRemovePQ(t *testing.T) { + err := helpers.CleanEmulator() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + cont := endure.New(slog.LevelDebug) cfg := &config.Plugin{ @@ -276,7 +297,7 @@ func TestRemovePQ(t *testing.T) { } l, oLogger := mocklogger.ZapTestLogger(zap.DebugLevel) - err := cont.RegisterAll( + err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, @@ -353,3 +374,87 @@ func TestRemovePQ(t *testing.T) { assert.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len()) assert.Equal(t, 2, oLogger.FilterMessageSnippet("listener was stopped").Len()) } + +func TestAutoAck(t *testing.T) { + err := helpers.CleanEmulator() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + + cont := endure.New(slog.LevelDebug) + + cfg := &config.Plugin{ + Version: "2023.3.0", + Path: "configs/.rr-init.yaml", + Prefix: "rr", + } + + l, oLogger := mocklogger.ZapTestLogger(zap.DebugLevel) + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + l, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &googlePubSub.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + t.Run("PushPipeline", helpers.PushToPipe("test-1", true, "127.0.0.1:6001")) + t.Run("PushPipeline", helpers.PushToPipe("test-2", true, "127.0.0.1:6001")) + time.Sleep(time.Second * 2) + t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6001", "test-1", "test-2")) + + stopCh <- struct{}{} + wg.Wait() + + require.Equal(t, 2, oLogger.FilterMessageSnippet("auto ack is turned on, message acknowledged").Len()) +}