Skip to content

Commit

Permalink
Add connection setting for blocking mode of socket (#135)
Browse files Browse the repository at this point in the history
* Add connection setting for blocking mode of socket

* Add warning that blocking socket is potentially dangerous
  • Loading branch information
Namoshek authored Dec 9, 2022
1 parent a0f8186 commit 846f0de
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 11 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ $connectionSettings = (new \PhpMqtt\Client\ConnectionSettings)
// The password used for authentication when connecting to the broker.
->setPassword(null)

// Whether to use a blocking socket or not. By default, the socket is non-blocking,
// which is required when using subscriptions and/or {@see MqttClient::loop()}.
// In rare cases, it might be required to use a blocking socket though. One such example
// is when sending large messages (e.g. binaries) and the broker has a limited receive buffer.
//
// Note: When using a blocking socket, the MQTT client can get stuck if the socket is broken
// or when the broker does not consume the sent data fast enough. Use with caution.
->useBlockingSocket(false)

// The connect timeout defines the maximum amount of seconds the client will try to establish
// a socket connection with the broker. The value cannot be less than 1 second.
->setConnectTimeout(60)
Expand Down
30 changes: 30 additions & 0 deletions src/ConnectionSettings.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ConnectionSettings
{
private ?string $username = null;
private ?string $password = null;
private bool $useBlockingSocket = false;
private int $connectTimeout = 60;
private int $socketTimeout = 5;
private int $resendTimeout = 10;
Expand Down Expand Up @@ -80,6 +81,35 @@ public function getPassword(): ?string
return $this->password;
}

/**
* Whether to use a blocking socket or not. By default, the socket is non-blocking,
* which is required when using subscriptions and/or {@see MqttClient::loop()}.
* In rare cases, it might be required to use a blocking socket though. One such example
* is when sending large messages (e.g. binaries) and the broker has a limited receive buffer.
*
* Note: When using a blocking socket, the MQTT client can get stuck if the socket is broken
* or when the broker does not consume the sent data fast enough. Use with caution.
*
* @param bool $useBlockingSocket
* @return ConnectionSettings
*/
public function useBlockingSocket(bool $useBlockingSocket): ConnectionSettings
{
$copy = clone $this;

$copy->useBlockingModeForSocket = $useBlockingSocket;

return $copy;
}

/**
* @return bool
*/
public function shouldUseBlockingSocket(): bool
{
return $this->useBlockingSocket;
}

/**
* The connect timeout is the maximum amount of seconds the client will try to establish
* a socket connection with the broker. The value cannot be less than 1 second.
Expand Down
2 changes: 1 addition & 1 deletion src/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ protected function establishSocketConnection(): void
}

stream_set_timeout($socket, $this->settings->getSocketTimeout());
stream_set_blocking($socket, false);
stream_set_blocking($socket, $this->settings->shouldUseBlockingSocket());

$this->logger->debug('Socket opened and ready to use.');

Expand Down
40 changes: 30 additions & 10 deletions tests/Feature/PublishSubscribeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace Tests\Feature;

use PhpMqtt\Client\ConnectionSettings;
use PhpMqtt\Client\MqttClient;
use Tests\TestCase;

Expand All @@ -20,29 +21,40 @@ class PublishSubscribeTest extends TestCase
public function publishSubscribeData(): array
{
return [
['test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []],
['test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']],
['test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']],
['test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']],
['test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']],
['test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
['test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
[false, 'test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []],
[false, 'test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']],
[false, 'test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']],
[false, 'test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']],
[false, 'test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']],
[false, 'test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
[false, 'test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
[true, 'test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []],
[true, 'test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']],
[true, 'test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']],
[true, 'test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']],
[true, 'test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']],
[true, 'test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
[true, 'test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
];
}

/**
* @dataProvider publishSubscribeData
*/
public function test_publishing_and_subscribing_using_quality_of_service_0_works_as_intended(
bool $useBlockingSocket,
string $subscriptionTopicFilter,
string $publishTopic,
string $publishMessage,
array $matchedTopicWildcards
): void
{
// We connect and subscribe to a topic using the first client.
$connectionSettings = (new ConnectionSettings())
->useBlockingSocket($useBlockingSocket);

$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
$subscriber->connect(null, true);
$subscriber->connect($connectionSettings, true);

$subscriber->subscribe(
$subscriptionTopicFilter,
Expand Down Expand Up @@ -76,15 +88,19 @@ function (string $topic, string $message, bool $retained, array $wildcards) use
* @dataProvider publishSubscribeData
*/
public function test_publishing_and_subscribing_using_quality_of_service_1_works_as_intended(
bool $useBlockingSocket,
string $subscriptionTopicFilter,
string $publishTopic,
string $publishMessage,
array $matchedTopicWildcards
): void
{
// We connect and subscribe to a topic using the first client.
$connectionSettings = (new ConnectionSettings())
->useBlockingSocket($useBlockingSocket);

$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
$subscriber->connect(null, true);
$subscriber->connect($connectionSettings, true);

$subscriber->subscribe(
$subscriptionTopicFilter,
Expand Down Expand Up @@ -118,15 +134,19 @@ function (string $topic, string $message, bool $retained, array $wildcards) use
* @dataProvider publishSubscribeData
*/
public function test_publishing_and_subscribing_using_quality_of_service_2_works_as_intended(
bool $useBlockingSocket,
string $subscriptionTopicFilter,
string $publishTopic,
string $publishMessage,
array $matchedTopicWildcards
): void
{
// We connect and subscribe to a topic using the first client.
$connectionSettings = (new ConnectionSettings())
->useBlockingSocket($useBlockingSocket);

$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
$subscriber->connect(null, true);
$subscriber->connect($connectionSettings, true);

$subscription = function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $subscriptionTopicFilter, $publishTopic, $publishMessage, $matchedTopicWildcards) {
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
Expand Down

0 comments on commit 846f0de

Please sign in to comment.