From 846f0de5c5713d947892998f868e6b28607fc0a0 Mon Sep 17 00:00:00 2001 From: Namoshek Date: Fri, 9 Dec 2022 18:02:12 +0100 Subject: [PATCH] Add connection setting for blocking mode of socket (#135) * Add connection setting for blocking mode of socket * Add warning that blocking socket is potentially dangerous --- README.md | 9 ++++++ src/ConnectionSettings.php | 30 +++++++++++++++++++ src/MqttClient.php | 2 +- tests/Feature/PublishSubscribeTest.php | 40 +++++++++++++++++++------- 4 files changed, 70 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 863da48..17dbb0e 100644 --- a/README.md +++ b/README.md @@ -142,6 +142,15 @@ $connectionSettings = (new \PhpMqtt\Client\ConnectionSettings) // The password used for authentication when connecting to the broker. ->setPassword(null) + // Whether to use a blocking socket or not. By default, the socket is non-blocking, + // which is required when using subscriptions and/or {@see MqttClient::loop()}. + // In rare cases, it might be required to use a blocking socket though. One such example + // is when sending large messages (e.g. binaries) and the broker has a limited receive buffer. + // + // Note: When using a blocking socket, the MQTT client can get stuck if the socket is broken + // or when the broker does not consume the sent data fast enough. Use with caution. + ->useBlockingSocket(false) + // The connect timeout defines the maximum amount of seconds the client will try to establish // a socket connection with the broker. The value cannot be less than 1 second. ->setConnectTimeout(60) diff --git a/src/ConnectionSettings.php b/src/ConnectionSettings.php index e0c4af6..a448e16 100644 --- a/src/ConnectionSettings.php +++ b/src/ConnectionSettings.php @@ -13,6 +13,7 @@ class ConnectionSettings { private ?string $username = null; private ?string $password = null; + private bool $useBlockingSocket = false; private int $connectTimeout = 60; private int $socketTimeout = 5; private int $resendTimeout = 10; @@ -80,6 +81,35 @@ public function getPassword(): ?string return $this->password; } + /** + * Whether to use a blocking socket or not. By default, the socket is non-blocking, + * which is required when using subscriptions and/or {@see MqttClient::loop()}. + * In rare cases, it might be required to use a blocking socket though. One such example + * is when sending large messages (e.g. binaries) and the broker has a limited receive buffer. + * + * Note: When using a blocking socket, the MQTT client can get stuck if the socket is broken + * or when the broker does not consume the sent data fast enough. Use with caution. + * + * @param bool $useBlockingSocket + * @return ConnectionSettings + */ + public function useBlockingSocket(bool $useBlockingSocket): ConnectionSettings + { + $copy = clone $this; + + $copy->useBlockingModeForSocket = $useBlockingSocket; + + return $copy; + } + + /** + * @return bool + */ + public function shouldUseBlockingSocket(): bool + { + return $this->useBlockingSocket; + } + /** * The connect timeout is the maximum amount of seconds the client will try to establish * a socket connection with the broker. The value cannot be less than 1 second. diff --git a/src/MqttClient.php b/src/MqttClient.php index 4070568..4134ad2 100644 --- a/src/MqttClient.php +++ b/src/MqttClient.php @@ -290,7 +290,7 @@ protected function establishSocketConnection(): void } stream_set_timeout($socket, $this->settings->getSocketTimeout()); - stream_set_blocking($socket, false); + stream_set_blocking($socket, $this->settings->shouldUseBlockingSocket()); $this->logger->debug('Socket opened and ready to use.'); diff --git a/tests/Feature/PublishSubscribeTest.php b/tests/Feature/PublishSubscribeTest.php index cc46bfe..a11b559 100644 --- a/tests/Feature/PublishSubscribeTest.php +++ b/tests/Feature/PublishSubscribeTest.php @@ -7,6 +7,7 @@ namespace Tests\Feature; +use PhpMqtt\Client\ConnectionSettings; use PhpMqtt\Client\MqttClient; use Tests\TestCase; @@ -20,13 +21,20 @@ class PublishSubscribeTest extends TestCase public function publishSubscribeData(): array { return [ - ['test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []], - ['test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']], - ['test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']], - ['test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']], - ['test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']], - ['test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']], - ['test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message + [false, 'test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []], + [false, 'test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']], + [false, 'test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']], + [false, 'test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']], + [false, 'test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']], + [false, 'test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']], + [false, 'test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message + [true, 'test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []], + [true, 'test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']], + [true, 'test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']], + [true, 'test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']], + [true, 'test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']], + [true, 'test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']], + [true, 'test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message ]; } @@ -34,6 +42,7 @@ public function publishSubscribeData(): array * @dataProvider publishSubscribeData */ public function test_publishing_and_subscribing_using_quality_of_service_0_works_as_intended( + bool $useBlockingSocket, string $subscriptionTopicFilter, string $publishTopic, string $publishMessage, @@ -41,8 +50,11 @@ public function test_publishing_and_subscribing_using_quality_of_service_0_works ): void { // We connect and subscribe to a topic using the first client. + $connectionSettings = (new ConnectionSettings()) + ->useBlockingSocket($useBlockingSocket); + $subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber'); - $subscriber->connect(null, true); + $subscriber->connect($connectionSettings, true); $subscriber->subscribe( $subscriptionTopicFilter, @@ -76,6 +88,7 @@ function (string $topic, string $message, bool $retained, array $wildcards) use * @dataProvider publishSubscribeData */ public function test_publishing_and_subscribing_using_quality_of_service_1_works_as_intended( + bool $useBlockingSocket, string $subscriptionTopicFilter, string $publishTopic, string $publishMessage, @@ -83,8 +96,11 @@ public function test_publishing_and_subscribing_using_quality_of_service_1_works ): void { // We connect and subscribe to a topic using the first client. + $connectionSettings = (new ConnectionSettings()) + ->useBlockingSocket($useBlockingSocket); + $subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber'); - $subscriber->connect(null, true); + $subscriber->connect($connectionSettings, true); $subscriber->subscribe( $subscriptionTopicFilter, @@ -118,6 +134,7 @@ function (string $topic, string $message, bool $retained, array $wildcards) use * @dataProvider publishSubscribeData */ public function test_publishing_and_subscribing_using_quality_of_service_2_works_as_intended( + bool $useBlockingSocket, string $subscriptionTopicFilter, string $publishTopic, string $publishMessage, @@ -125,8 +142,11 @@ public function test_publishing_and_subscribing_using_quality_of_service_2_works ): void { // We connect and subscribe to a topic using the first client. + $connectionSettings = (new ConnectionSettings()) + ->useBlockingSocket($useBlockingSocket); + $subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber'); - $subscriber->connect(null, true); + $subscriber->connect($connectionSettings, true); $subscription = function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $subscriptionTopicFilter, $publishTopic, $publishMessage, $matchedTopicWildcards) { // By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.