Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
cv65kr committed Feb 10, 2024
1 parent c1dda19 commit c69ad8b
Show file tree
Hide file tree
Showing 13 changed files with 18 additions and 231 deletions.
21 changes: 14 additions & 7 deletions pubsubjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type Driver struct {
client *pubsub.Client

stopped uint64
pauseCh chan struct{}
stopCh chan struct{}
}

// FromConfig initializes google_pub_sub_driver_ pipeline
Expand Down Expand Up @@ -103,7 +103,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
skipDeclare: conf.SkipTopicDeclaration,
topic: conf.Topic,
pq: pq,
pauseCh: make(chan struct{}, 1),
stopCh: make(chan struct{}, 2),
cond: sync.Cond{L: &sync.Mutex{}},
msgInFlightLimit: ptr(conf.Prefetch),
msgInFlight: ptr(int64(0)),
Expand Down Expand Up @@ -156,7 +156,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
tracer: tracer,
log: log,
pq: pq,
pauseCh: make(chan struct{}, 1),
stopCh: make(chan struct{}, 2),
skipDeclare: pipe.Bool(skipTopicDeclaration, false),
topic: pipe.String(topic, "default"),
cond: sync.Cond{L: &sync.Mutex{}},
Expand Down Expand Up @@ -184,10 +184,17 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
}

func (d *Driver) Push(ctx context.Context, jb jobs.Message) error {
const op = errors.Op("google_pub_sub_push")
// check if the pipeline registered
ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "google_pub_sub_push")
defer span.End()

// load atomic value
pipe := *d.pipeline.Load()
if pipe.Name() != jb.GroupID() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.GroupID(), pipe.Name()))
}

job := fromJob(jb)

data, err := json.Marshal(job.Metadata)
Expand Down Expand Up @@ -274,7 +281,7 @@ func (d *Driver) Pause(ctx context.Context, p string) error {
}

// stop consume
d.pauseCh <- struct{}{}
d.stopCh <- struct{}{}
// if blocked, let 1 item to pass to unblock the listener and close the pipe
d.cond.Signal()

Expand Down Expand Up @@ -333,7 +340,7 @@ func (d *Driver) Stop(ctx context.Context) error {
// if blocked, let 1 item to pass to unblock the listener and close the pipe
d.cond.Signal()

d.pauseCh <- struct{}{}
d.stopCh <- struct{}{}
}

d.log.Debug("pipeline was stopped", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", time.Now().UTC()), zap.Duration("elapsed", time.Since(start)))
Expand All @@ -357,8 +364,8 @@ func (d *Driver) manageTopic(ctx context.Context) error {
}

_, err = d.client.CreateSubscription(ctx, d.sub, pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 30 * time.Second,
Topic: topic,
AckDeadline: 10 * time.Minute,
})
if err != nil {
if !strings.Contains(err.Error(), "Subscription already exists") {
Expand Down
10 changes: 2 additions & 8 deletions pubsubjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func (d *Driver) listen(ctx context.Context) {
go func() {
for {
select {
case <-d.pauseCh:
case <-d.stopCh:
d.log.Debug("listener was stopped")
return
default:
Expand All @@ -30,13 +30,7 @@ func (d *Driver) listen(ctx context.Context) {

ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(context.Background(), propagation.HeaderCarrier(item.Metadata)), "google_pub_sub_listener")
if item.Options.AutoAck {
_, err := message.AckWithResult().Get(ctx)
if err != nil {
d.log.Error("message acknowledge", zap.Error(err))
span.RecordError(err)
span.End()
return
}
message.Ack()
d.log.Debug("auto ack is turned on, message acknowledged")
}

Expand Down
2 changes: 1 addition & 1 deletion tests/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,5 +456,5 @@ func TestAutoAck(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()

require.Equal(t, 2, oLogger.FilterMessageSnippet("auto ack is turned on, message acknowledged").Len())
require.Equal(t, 4, oLogger.FilterMessageSnippet("auto ack is turned on, message acknowledged").Len())
}
22 changes: 0 additions & 22 deletions tests/php_test_files/jobs/jobs_bad_resp.php

This file was deleted.

47 changes: 0 additions & 47 deletions tests/php_test_files/jobs/jobs_create_memory.php

This file was deleted.

5 changes: 0 additions & 5 deletions tests/php_test_files/jobs/jobs_err.php
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
<?php

/**
* @var Goridge\RelayInterface $relay
*/

use Spiral\Goridge;
use Spiral\RoadRunner;
use Spiral\Goridge\StreamRelay;
Expand Down
5 changes: 0 additions & 5 deletions tests/php_test_files/jobs/jobs_ok.php
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
<?php

/**
* @var Goridge\RelayInterface $relay
*/

use Spiral\Goridge;
use Spiral\RoadRunner;
use Spiral\Goridge\StreamRelay;
Expand Down
7 changes: 1 addition & 6 deletions tests/php_test_files/jobs/jobs_ok_pq.php
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
<?php

/**
* @var Goridge\RelayInterface $relay
*/

use Spiral\Goridge;
use Spiral\RoadRunner;
use Spiral\Goridge\StreamRelay;
Expand All @@ -17,7 +12,7 @@

while ($task = $consumer->waitTask()) {
try {
sleep(15);
sleep(15);
$task->complete();
} catch (\Throwable $e) {
$rr->error((string)$e);
Expand Down
28 changes: 0 additions & 28 deletions tests/php_test_files/jobs/jobs_ok_queue_name_exist.php

This file was deleted.

25 changes: 0 additions & 25 deletions tests/php_test_files/jobs/jobs_ok_sleep1.php

This file was deleted.

25 changes: 0 additions & 25 deletions tests/php_test_files/jobs/jobs_ok_slow.php

This file was deleted.

29 changes: 0 additions & 29 deletions tests/php_test_files/jobs/jobs_ok_slow_rand.php

This file was deleted.

23 changes: 0 additions & 23 deletions tests/php_test_files/jobs/jobs_send.php

This file was deleted.

0 comments on commit c69ad8b

Please sign in to comment.