From c69ad8bf947f47689fbf33472046e429273d9fc6 Mon Sep 17 00:00:00 2001 From: Kajetan Date: Sat, 10 Feb 2024 15:46:23 +0100 Subject: [PATCH] Clean up --- pubsubjobs/driver.go | 21 ++++++--- pubsubjobs/listener.go | 10 +--- tests/jobs_test.go | 2 +- tests/php_test_files/jobs/jobs_bad_resp.php | 22 --------- .../jobs/jobs_create_memory.php | 47 ------------------- tests/php_test_files/jobs/jobs_err.php | 5 -- tests/php_test_files/jobs/jobs_ok.php | 5 -- tests/php_test_files/jobs/jobs_ok_pq.php | 7 +-- .../jobs/jobs_ok_queue_name_exist.php | 28 ----------- tests/php_test_files/jobs/jobs_ok_sleep1.php | 25 ---------- tests/php_test_files/jobs/jobs_ok_slow.php | 25 ---------- .../php_test_files/jobs/jobs_ok_slow_rand.php | 29 ------------ tests/php_test_files/jobs/jobs_send.php | 23 --------- 13 files changed, 18 insertions(+), 231 deletions(-) delete mode 100644 tests/php_test_files/jobs/jobs_bad_resp.php delete mode 100644 tests/php_test_files/jobs/jobs_create_memory.php delete mode 100644 tests/php_test_files/jobs/jobs_ok_queue_name_exist.php delete mode 100644 tests/php_test_files/jobs/jobs_ok_sleep1.php delete mode 100644 tests/php_test_files/jobs/jobs_ok_slow.php delete mode 100644 tests/php_test_files/jobs/jobs_ok_slow_rand.php delete mode 100644 tests/php_test_files/jobs/jobs_send.php diff --git a/pubsubjobs/driver.go b/pubsubjobs/driver.go index 77ad086..0e723ee 100644 --- a/pubsubjobs/driver.go +++ b/pubsubjobs/driver.go @@ -58,7 +58,7 @@ type Driver struct { client *pubsub.Client stopped uint64 - pauseCh chan struct{} + stopCh chan struct{} } // FromConfig initializes google_pub_sub_driver_ pipeline @@ -103,7 +103,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip skipDeclare: conf.SkipTopicDeclaration, topic: conf.Topic, pq: pq, - pauseCh: make(chan struct{}, 1), + stopCh: make(chan struct{}, 2), cond: sync.Cond{L: &sync.Mutex{}}, msgInFlightLimit: ptr(conf.Prefetch), msgInFlight: ptr(int64(0)), @@ -156,7 +156,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap. tracer: tracer, log: log, pq: pq, - pauseCh: make(chan struct{}, 1), + stopCh: make(chan struct{}, 2), skipDeclare: pipe.Bool(skipTopicDeclaration, false), topic: pipe.String(topic, "default"), cond: sync.Cond{L: &sync.Mutex{}}, @@ -184,10 +184,17 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap. } func (d *Driver) Push(ctx context.Context, jb jobs.Message) error { + const op = errors.Op("google_pub_sub_push") // check if the pipeline registered ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "google_pub_sub_push") defer span.End() + // load atomic value + pipe := *d.pipeline.Load() + if pipe.Name() != jb.GroupID() { + return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.GroupID(), pipe.Name())) + } + job := fromJob(jb) data, err := json.Marshal(job.Metadata) @@ -274,7 +281,7 @@ func (d *Driver) Pause(ctx context.Context, p string) error { } // stop consume - d.pauseCh <- struct{}{} + d.stopCh <- struct{}{} // if blocked, let 1 item to pass to unblock the listener and close the pipe d.cond.Signal() @@ -333,7 +340,7 @@ func (d *Driver) Stop(ctx context.Context) error { // if blocked, let 1 item to pass to unblock the listener and close the pipe d.cond.Signal() - d.pauseCh <- struct{}{} + d.stopCh <- struct{}{} } d.log.Debug("pipeline was stopped", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", time.Now().UTC()), zap.Duration("elapsed", time.Since(start))) @@ -357,8 +364,8 @@ func (d *Driver) manageTopic(ctx context.Context) error { } _, err = d.client.CreateSubscription(ctx, d.sub, pubsub.SubscriptionConfig{ - Topic: topic, - AckDeadline: 30 * time.Second, + Topic: topic, + AckDeadline: 10 * time.Minute, }) if err != nil { if !strings.Contains(err.Error(), "Subscription already exists") { diff --git a/pubsubjobs/listener.go b/pubsubjobs/listener.go index 41c3bac..6803ebf 100644 --- a/pubsubjobs/listener.go +++ b/pubsubjobs/listener.go @@ -13,7 +13,7 @@ func (d *Driver) listen(ctx context.Context) { go func() { for { select { - case <-d.pauseCh: + case <-d.stopCh: d.log.Debug("listener was stopped") return default: @@ -30,13 +30,7 @@ func (d *Driver) listen(ctx context.Context) { ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(context.Background(), propagation.HeaderCarrier(item.Metadata)), "google_pub_sub_listener") if item.Options.AutoAck { - _, err := message.AckWithResult().Get(ctx) - if err != nil { - d.log.Error("message acknowledge", zap.Error(err)) - span.RecordError(err) - span.End() - return - } + message.Ack() d.log.Debug("auto ack is turned on, message acknowledged") } diff --git a/tests/jobs_test.go b/tests/jobs_test.go index 1f9213c..ca28daa 100644 --- a/tests/jobs_test.go +++ b/tests/jobs_test.go @@ -456,5 +456,5 @@ func TestAutoAck(t *testing.T) { stopCh <- struct{}{} wg.Wait() - require.Equal(t, 2, oLogger.FilterMessageSnippet("auto ack is turned on, message acknowledged").Len()) + require.Equal(t, 4, oLogger.FilterMessageSnippet("auto ack is turned on, message acknowledged").Len()) } diff --git a/tests/php_test_files/jobs/jobs_bad_resp.php b/tests/php_test_files/jobs/jobs_bad_resp.php deleted file mode 100644 index 6233246..0000000 --- a/tests/php_test_files/jobs/jobs_bad_resp.php +++ /dev/null @@ -1,22 +0,0 @@ -waitPayload()) { - try { - $rr->respond(new RoadRunner\Payload('foo')); - } catch (\Throwable $e) { - $rr->error((string)$e); - } -} diff --git a/tests/php_test_files/jobs/jobs_create_memory.php b/tests/php_test_files/jobs/jobs_create_memory.php deleted file mode 100644 index 3b56d69..0000000 --- a/tests/php_test_files/jobs/jobs_create_memory.php +++ /dev/null @@ -1,47 +0,0 @@ -create(new MemoryCreateInfo( - name: 'example', - priority: 10, - prefetch: 100, -)); - -$queue1->resume(); - -// Create task prototype with default headers -$task1 = $queue1->create('ping', '{"site": "https://example.com"}') // Create task with "echo" name - ->withHeader('attempts', 4) // Number of attempts to execute the task - ->withHeader('retry-delay', 10); // Delay between attempts - -// Push "echo" task to the queue -$task1 = $queue1->dispatch($task1); - -// Select "local" pipeline from jobs -$queue2 = $jobs->connect('local'); -$queue2->resume(); - -// Create task prototype with default headers -$task = $queue2->create('ping', '{"site": "https://example.com"}') // Create task with "echo" name - ->withHeader('attempts', 4) // Number of attempts to execute the task - ->withHeader('retry-delay', 10); // Delay between attempts - -// Push "echo" task to the queue -$task = $queue2->dispatch($task); - -$consumer = new Spiral\RoadRunner\Jobs\Consumer(); - -while ($task = $consumer->waitTask()) { - $task->complete(); -} diff --git a/tests/php_test_files/jobs/jobs_err.php b/tests/php_test_files/jobs/jobs_err.php index 73509dc..6f5420c 100644 --- a/tests/php_test_files/jobs/jobs_err.php +++ b/tests/php_test_files/jobs/jobs_err.php @@ -1,9 +1,4 @@ waitTask()) { try { - sleep(15); + sleep(15); $task->complete(); } catch (\Throwable $e) { $rr->error((string)$e); diff --git a/tests/php_test_files/jobs/jobs_ok_queue_name_exist.php b/tests/php_test_files/jobs/jobs_ok_queue_name_exist.php deleted file mode 100644 index a51c488..0000000 --- a/tests/php_test_files/jobs/jobs_ok_queue_name_exist.php +++ /dev/null @@ -1,28 +0,0 @@ -waitTask()) { - try { - if ('unknown' === $task->getQueue()) { - throw new RuntimeException('Queue name was not found'); - } - - $task->complete(); - } catch (\Throwable $e) { - $rr->error((string)$e); - } -} diff --git a/tests/php_test_files/jobs/jobs_ok_sleep1.php b/tests/php_test_files/jobs/jobs_ok_sleep1.php deleted file mode 100644 index 2894b6e..0000000 --- a/tests/php_test_files/jobs/jobs_ok_sleep1.php +++ /dev/null @@ -1,25 +0,0 @@ -waitTask()) { - try { - sleep(1); - $task->complete(); - } catch (\Throwable $e) { - $rr->error((string)$e); - } -} diff --git a/tests/php_test_files/jobs/jobs_ok_slow.php b/tests/php_test_files/jobs/jobs_ok_slow.php deleted file mode 100644 index cc06dab..0000000 --- a/tests/php_test_files/jobs/jobs_ok_slow.php +++ /dev/null @@ -1,25 +0,0 @@ -waitTask()) { - try { - sleep(60); - $task->complete(); - } catch (\Throwable $e) { - $rr->error((string)$e); - } -} diff --git a/tests/php_test_files/jobs/jobs_ok_slow_rand.php b/tests/php_test_files/jobs/jobs_ok_slow_rand.php deleted file mode 100644 index 52a260c..0000000 --- a/tests/php_test_files/jobs/jobs_ok_slow_rand.php +++ /dev/null @@ -1,29 +0,0 @@ -waitTask()) { - try { - $val = random_int(0, 1000); - if ($val > 995) { - sleep(60); - } - - $task->complete(); - } catch (\Throwable $e) { - $rr->error((string)$e); - } -} diff --git a/tests/php_test_files/jobs/jobs_send.php b/tests/php_test_files/jobs/jobs_send.php deleted file mode 100644 index 10f971c..0000000 --- a/tests/php_test_files/jobs/jobs_send.php +++ /dev/null @@ -1,23 +0,0 @@ -connect('test-1'); - -$queue->dispatch( - $queue->create( - 'my-name', - ['foo' => 'bar'], - new KafkaOptions( - topic: 'mytopic', - offset: PartitionOffset::OFFSET_NEWEST - ) - ) -);