From 9e31c4c47eedc765ae35549a1ea461b8f559b0bd Mon Sep 17 00:00:00 2001 From: geyer-za Date: Wed, 13 Mar 2019 12:04:34 +0200 Subject: [PATCH 1/7] pull return immediately, with sleep --- src/GoogleCloudPubSubAdapter.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/GoogleCloudPubSubAdapter.php b/src/GoogleCloudPubSubAdapter.php index 8527efc..03d4c81 100644 --- a/src/GoogleCloudPubSubAdapter.php +++ b/src/GoogleCloudPubSubAdapter.php @@ -198,8 +198,12 @@ public function subscribe($channel, callable $handler) 'timeoutMillis' => null, ], 'maxMessages' => $this->maxMessages, + 'returnImmediately' => true, ]); - + if ( ! $messages || count($messages) < 1) { + usleep(200000); + continue; + } foreach ($messages as $message) { /** @var Message $message */ $payload = Utils::unserializeMessagePayload($message->data()); From b8b32db46741dd814b99d4baf5321769ac1a09d1 Mon Sep 17 00:00:00 2001 From: geyer-za Date: Fri, 15 Mar 2019 15:16:36 +0200 Subject: [PATCH 2/7] Change to configurable flags --- src/GoogleCloudPubSubAdapter.php | 56 +++++++++++++++++++++++++++++--- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/src/GoogleCloudPubSubAdapter.php b/src/GoogleCloudPubSubAdapter.php index 03d4c81..a25ffcd 100644 --- a/src/GoogleCloudPubSubAdapter.php +++ b/src/GoogleCloudPubSubAdapter.php @@ -40,6 +40,16 @@ class GoogleCloudPubSubAdapter implements PubSubAdapterInterface */ protected $maxMessages; + /** + * @var bool + */ + protected $returnImmediately; + + /** + * @var int + */ + protected $returnImmediatelyPause; + /** * @param PubSubClient $client * @param string $clientIdentifier @@ -54,7 +64,9 @@ public function __construct( $autoCreateTopics = true, $autoCreateSubscriptions = true, $backgroundBatching = false, - $maxMessages = 1000 + $maxMessages = 1000, + $returnImmediately = false, + $returnImmediatelyPause = 500000 ) { $this->client = $client; $this->clientIdentifier = $clientIdentifier; @@ -62,6 +74,8 @@ public function __construct( $this->autoCreateSubscriptions = $autoCreateSubscriptions; $this->backgroundBatching = $backgroundBatching; $this->maxMessages = $maxMessages; + $this->returnImmediately = $returnImmediately; + $this->returnImmediatelyPause = $returnImmediatelyPause; } /** @@ -140,6 +154,40 @@ public function areSubscriptionsAutoCreated() return $this->autoCreateSubscriptions; } + /** + * Set if a pull should return immediately if there are no messages + * @param bool $returnImmediately + */ + public function setReturnImmediately($returnImmediately) { + $this->returnImmediately = $returnImmediately; + } + + /** + * Return the return immediately configuration + * @return bool + */ + public function getReturnImmediately() { + return $this->returnImmediately; + } + + /** + * Set the amount of time to pause between attempts to pull messages if return immediately is enabled. + * Value is in microseconds + * + * @param $returnImmediatelyPause + */ + public function setReturnImmediatelyPause($returnImmediatelyPause) { + $this->returnImmediatelyPause = $returnImmediatelyPause; + } + + /** + * Return the return immediately pause configuration + * @return int + */ + public function getReturnImmediatelyPause() { + return $this->returnImmediatelyPause; + } + /** * Set whether or not background batching is enabled. * @@ -198,10 +246,10 @@ public function subscribe($channel, callable $handler) 'timeoutMillis' => null, ], 'maxMessages' => $this->maxMessages, - 'returnImmediately' => true, + 'returnImmediately' => $this->returnImmediately, ]); - if ( ! $messages || count($messages) < 1) { - usleep(200000); + if ($this->returnImmediately && empty($messages)) { + usleep($this->returnImmediatelyPause); continue; } foreach ($messages as $message) { From 85f22cd65c77d32502f69361a0dbb09478ff4861 Mon Sep 17 00:00:00 2001 From: geyer-za Date: Tue, 19 Mar 2019 12:01:00 +0200 Subject: [PATCH 3/7] add unit tests for returnImmediately --- tests/GoogleCloudPubSubAdapterTest.php | 103 +++++++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/tests/GoogleCloudPubSubAdapterTest.php b/tests/GoogleCloudPubSubAdapterTest.php index 149ff67..f95b159 100644 --- a/tests/GoogleCloudPubSubAdapterTest.php +++ b/tests/GoogleCloudPubSubAdapterTest.php @@ -63,6 +63,26 @@ public function testGetSetBackgroundBatching() $this->assertTrue($adapter->isBackgroundBatchingEnabled()); } + public function testGetSetReturnImmediately() + { + $client = Mockery::mock(PubSubClient::class); + $adapter = new GoogleCloudPubSubAdapter($client); + $this->assertFalse($adapter->getReturnImmediately()); + + $adapter->setReturnImmediately(true); + $this->assertTrue($adapter->getReturnImmediately()); + } + + public function testGetSetReturnImmediatelyPause() + { + $client = Mockery::mock(PubSubClient::class); + $adapter = new GoogleCloudPubSubAdapter($client); + $this->assertEquals(500000, $adapter->getReturnImmediatelyPause()); + + $adapter->setReturnImmediatelyPause(1000000); + $this->assertEquals(1000000, $adapter->getReturnImmediatelyPause()); + } + public function testPublishWhenTopicMustBeCreated() { $topic = Mockery::mock(Topic::class); @@ -320,6 +340,7 @@ public function testSubscribeWhenSubscriptionMustBeCreated() 'timeoutMillis' => null, ], 'maxMessages' => 1000, + 'returnImmediately' => false ]) ->once() ->andReturn($messageBatch1); @@ -336,6 +357,7 @@ public function testSubscribeWhenSubscriptionMustBeCreated() 'timeoutMillis' => null, ], 'maxMessages' => 1000, + 'returnImmediately' => false ]) ->once() ->andReturn($messageBatch2); @@ -397,6 +419,7 @@ public function testSubscribeWhenSubscriptionExists() 'timeoutMillis' => null, ], 'maxMessages' => 1000, + 'returnImmediately' => false ]) ->once() ->andReturn($messageBatch1); @@ -412,6 +435,7 @@ public function testSubscribeWhenSubscriptionExists() 'timeoutMillis' => null, ], 'maxMessages' => 1000, + 'returnImmediately' => false ]) ->once() ->andReturn($messageBatch2); @@ -471,6 +495,7 @@ public function testSubscribeWhenAutoTopicCreationIsDisabled() 'timeoutMillis' => null, ], 'maxMessages' => 1000, + 'returnImmediately' => false ]) ->once() ->andReturn($messageBatch1); @@ -486,6 +511,7 @@ public function testSubscribeWhenAutoTopicCreationIsDisabled() 'timeoutMillis' => null, ], 'maxMessages' => 1000, + 'returnImmediately' => false ]) ->once() ->andReturn($messageBatch2); @@ -521,4 +547,81 @@ public function testSubscribeWhenAutoTopicCreationIsDisabled() $adapter->subscribe('channel_name', [$handler1, 'handle']); } + + public function testSubscribeWhenReturnImmediatelyIsEnabled() + { + $message1 = new Message(['data' => '{"hello":"world"}'], ['ackId' => 1]); + $message2 = new Message(['data' => '"this is a string"'], ['ackId' => 2]); + $message3 = new Message(['data' => '"unsubscribe"'], ['ackId' => 3]); + + $messageBatch1 = [ + $message1, + $message2, + ]; + + $messageBatch2 = [ + $message3, + ]; + + $subscription = Mockery::mock(Subscription::class); + $subscription->shouldReceive('exists') + ->once() + ->andReturn(true); + $subscription->shouldNotHaveReceived('create'); + + $expectedPullOptions = [ + 'grpcOptions' => [ + 'timeoutMillis' => null, + ], + 'maxMessages' => 1000, + 'returnImmediately' => true + ]; + + $subscription->shouldReceive('pull') + ->with($expectedPullOptions) + ->once() + ->andReturn($messageBatch1); + $subscription->shouldReceive('acknowledge') + ->with($message1) + ->once(); + $subscription->shouldReceive('acknowledge') + ->with($message2) + ->once(); + + $subscription->shouldReceive('pull') + ->with($expectedPullOptions) + ->once() + ->andReturn($messageBatch2); + $subscription->shouldReceive('acknowledge') + ->with($message3) + ->once(); + + $topic = Mockery::mock(Topic::class); + $topic->shouldReceive('exists') + ->once() + ->andReturn(true); + $topic->shouldNotHaveReceived('create'); + $topic->shouldReceive('subscription') + ->with('default.channel_name') + ->once() + ->andReturn($subscription); + + $client = Mockery::mock(PubSubClient::class); + $client->shouldReceive('topic') + ->with('channel_name') + ->once() + ->andReturn($topic); + + $handler1 = Mockery::mock(\stdClass::class); + $handler1->shouldReceive('handle') + ->with(['hello' => 'world']) + ->once(); + $handler1->shouldReceive('handle') + ->with('this is a string') + ->once(); + + $adapter = new GoogleCloudPubSubAdapter($client); + $adapter->setReturnImmediately(true); + $adapter->subscribe('channel_name', [$handler1, 'handle']); + } } From 021391569f40f5fe05debbaf109acdce3051c386 Mon Sep 17 00:00:00 2001 From: geyer-za Date: Tue, 19 Mar 2019 12:12:05 +0200 Subject: [PATCH 4/7] better loop --- src/GoogleCloudPubSubAdapter.php | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/GoogleCloudPubSubAdapter.php b/src/GoogleCloudPubSubAdapter.php index a25ffcd..587eac9 100644 --- a/src/GoogleCloudPubSubAdapter.php +++ b/src/GoogleCloudPubSubAdapter.php @@ -174,10 +174,10 @@ public function getReturnImmediately() { * Set the amount of time to pause between attempts to pull messages if return immediately is enabled. * Value is in microseconds * - * @param $returnImmediatelyPause + * @param int $returnImmediatelyPause */ public function setReturnImmediatelyPause($returnImmediatelyPause) { - $this->returnImmediatelyPause = $returnImmediatelyPause; + $this->returnImmediatelyPause = (int) $returnImmediatelyPause; } /** @@ -239,6 +239,7 @@ public function subscribe($channel, callable $handler) $subscription = $this->getSubscriptionForChannel($channel); $isSubscriptionLoopActive = true; + $isPauseEnabled = $this->returnImmediately && ($this->returnImmediatelyPause > 0); while ($isSubscriptionLoopActive) { $messages = $subscription->pull([ @@ -248,7 +249,7 @@ public function subscribe($channel, callable $handler) 'maxMessages' => $this->maxMessages, 'returnImmediately' => $this->returnImmediately, ]); - if ($this->returnImmediately && empty($messages)) { + if ($isPauseEnabled && empty($messages)) { usleep($this->returnImmediatelyPause); continue; } From deec6e62828870cf2234a55362d4237eed955a03 Mon Sep 17 00:00:00 2001 From: geyer-za Date: Tue, 19 Mar 2019 12:14:28 +0200 Subject: [PATCH 5/7] better loop --- src/GoogleCloudPubSubAdapter.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/GoogleCloudPubSubAdapter.php b/src/GoogleCloudPubSubAdapter.php index 587eac9..b895ddd 100644 --- a/src/GoogleCloudPubSubAdapter.php +++ b/src/GoogleCloudPubSubAdapter.php @@ -75,7 +75,7 @@ public function __construct( $this->backgroundBatching = $backgroundBatching; $this->maxMessages = $maxMessages; $this->returnImmediately = $returnImmediately; - $this->returnImmediatelyPause = $returnImmediatelyPause; + $this->returnImmediatelyPause = (int) $returnImmediatelyPause; } /** From 3686792be600534b85c2dd17e99636f4f370b59c Mon Sep 17 00:00:00 2001 From: geyer-za Date: Tue, 19 Mar 2019 14:40:58 +0200 Subject: [PATCH 6/7] changelog --- changelog.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/changelog.md b/changelog.md index 5ab7401..ca772b4 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,9 @@ # Changelog +## 5.1.1 - 2019-03-19 + +* Add ability to use returnImmediately flag when pulling messages + ## 5.1.0 - 2019-02-28 * Bump up google/cloud requirement to ^0.95.0 From 70f32ba80e201c95f17a84f223fb55e9f18f6ba0 Mon Sep 17 00:00:00 2001 From: geyer-za Date: Tue, 19 Mar 2019 15:02:29 +0200 Subject: [PATCH 7/7] change to minor --- changelog.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index ca772b4..e88b98f 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,6 @@ # Changelog -## 5.1.1 - 2019-03-19 +## 5.2.0 - 2019-03-19 * Add ability to use returnImmediately flag when pulling messages