From 532339e4ef040b9a6246e4673e46953bc8384f57 Mon Sep 17 00:00:00 2001 From: Namoshek Date: Sat, 25 Nov 2023 20:06:43 +0100 Subject: [PATCH] Implement delivery of retain flag (#169) * Implement delivery of retain flag * Randomize tests to prevent overlaps and ensure delivery of messages * Update SonarQube build step --- .github/workflows/tests.yml | 9 +- src/Message.php | 13 +- .../Mqtt31MessageProcessor.php | 20 +- src/MqttClient.php | 7 +- tests/Feature/PublishSubscribeTest.php | 186 ++++++++++++++++-- 5 files changed, 203 insertions(+), 32 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c3301a2..8143bf4 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -27,13 +27,6 @@ jobs: with: fetch-depth: 0 - - name: Setup Java 11 (for keytool) - uses: actions/setup-java@v3 - with: - distribution: 'zulu' - java-version: '11' - java-package: 'jre' - - name: Setup PHP ${{ matrix.php-version }} uses: shivammathur/setup-php@v2 with: @@ -140,7 +133,7 @@ jobs: sed -i "s|$GITHUB_WORKSPACE|/github/workspace|g" phpunit.report-junit.xml - name: Run SonarQube analysis - uses: sonarsource/sonarcloud-github-action@v1.9 + uses: sonarsource/sonarcloud-github-action@v2.0.2 if: matrix.run-sonarqube-analysis env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/src/Message.php b/src/Message.php index 14f4995..40ee9ad 100644 --- a/src/Message.php +++ b/src/Message.php @@ -18,6 +18,7 @@ class Message { private MessageType $type; private int $qualityOfService; + private bool $retained; private ?int $messageId = null; private ?string $topic = null; private ?string $content = null; @@ -30,11 +31,13 @@ class Message * * @param MessageType $type * @param int $qualityOfService + * @param bool $retained */ - public function __construct(MessageType $type, int $qualityOfService = 0) + public function __construct(MessageType $type, int $qualityOfService = 0, bool $retained = false) { $this->type = $type; $this->qualityOfService = $qualityOfService; + $this->retained = $retained; } /** @@ -53,6 +56,14 @@ public function getQualityOfService(): int return $this->qualityOfService; } + /** + * @return bool + */ + public function getRetained(): bool + { + return $this->retained; + } + /** * @return int|null */ diff --git a/src/MessageProcessors/Mqtt31MessageProcessor.php b/src/MessageProcessors/Mqtt31MessageProcessor.php index 7c2c5cb..e8d43de 100644 --- a/src/MessageProcessors/Mqtt31MessageProcessor.php +++ b/src/MessageProcessors/Mqtt31MessageProcessor.php @@ -419,8 +419,9 @@ public function buildPublishCompleteMessage(int $messageId): string public function parseAndValidateMessage(string $message): ?Message { $qualityOfService = 0; + $retained = false; $data = ''; - $result = $this->tryDecodeMessage($message, $command, $qualityOfService, $data); + $result = $this->tryDecodeMessage($message, $command, $qualityOfService, $retained, $data); if ($result === false) { throw new InvalidMessageException('The passed message could not be decoded.'); @@ -440,7 +441,7 @@ public function parseAndValidateMessage(string $message): ?Message throw new ProtocolViolationException('Unexpected connection acknowledgement.'); case 0x03: - return $this->parseAndValidatePublishMessage($data, $qualityOfService); + return $this->parseAndValidatePublishMessage($data, $qualityOfService, $retained); case 0x04: return $this->parseAndValidatePublishAcknowledgementMessage($data); @@ -484,10 +485,17 @@ public function parseAndValidateMessage(string $message): ?Message * @param string $message * @param int|null $command * @param int|null $qualityOfService + * @param bool $retained * @param string|null $data * @return bool */ - protected function tryDecodeMessage(string $message, int &$command = null, int &$qualityOfService = null, string &$data = null): bool + protected function tryDecodeMessage( + string $message, + int &$command = null, + int &$qualityOfService = null, + bool &$retained = false, + string &$data = null + ): bool { // If we received no input, we can return immediately without doing work. if (strlen($message) === 0) { @@ -504,6 +512,7 @@ protected function tryDecodeMessage(string $message, int &$command = null, int & $byte = $message[0]; $command = (int) (ord($byte) / 16); $qualityOfService = (ord($byte) & 0x06) >> 1; + $retained = (bool) (ord($byte) & 0x01); // Read the second byte of a message (remaining length). // If the continuation bit (8) is set on the length byte, another byte will be read as length. @@ -546,15 +555,16 @@ protected function tryDecodeMessage(string $message, int &$command = null, int & * * @param string $data * @param int $qualityOfServiceLevel + * @param bool $retained * @return Message|null */ - protected function parseAndValidatePublishMessage(string $data, int $qualityOfServiceLevel): ?Message + protected function parseAndValidatePublishMessage(string $data, int $qualityOfServiceLevel, bool $retained): ?Message { $topicLength = (ord($data[0]) << 8) + ord($data[1]); $topic = substr($data, 2, $topicLength); $content = substr($data, ($topicLength + 2)); - $message = new Message(MessageType::PUBLISH(), $qualityOfServiceLevel); + $message = new Message(MessageType::PUBLISH(), $qualityOfServiceLevel, $retained); if ($qualityOfServiceLevel > self::QOS_AT_MOST_ONCE) { if (strlen($content) < 2) { diff --git a/src/MqttClient.php b/src/MqttClient.php index a336a0d..45a2279 100644 --- a/src/MqttClient.php +++ b/src/MqttClient.php @@ -757,7 +757,7 @@ protected function handleMessage(Message $message): void $message->getTopic(), $message->getContent(), 2, - false + $message->getRetained() ); $this->repository->addPendingIncomingMessage($pendingMessage); } catch (PendingMessageAlreadyExistsException $e) { @@ -772,7 +772,7 @@ protected function handleMessage(Message $message): void } // For QoS 0 and QoS 1 we can deliver right away. - $this->deliverPublishedMessage($message->getTopic(), $message->getContent(), $message->getQualityOfService()); + $this->deliverPublishedMessage($message->getTopic(), $message->getContent(), $message->getQualityOfService(), $message->getRetained()); return; } @@ -821,7 +821,8 @@ protected function handleMessage(Message $message): void $this->deliverPublishedMessage( $pendingMessage->getTopicName(), $pendingMessage->getMessage(), - $pendingMessage->getQualityOfServiceLevel() + $pendingMessage->getQualityOfServiceLevel(), + $pendingMessage->wantsToBeRetained() ); $this->repository->removePendingIncomingMessage($message->getMessageId()); diff --git a/tests/Feature/PublishSubscribeTest.php b/tests/Feature/PublishSubscribeTest.php index a11b559..3c76dd3 100644 --- a/tests/Feature/PublishSubscribeTest.php +++ b/tests/Feature/PublishSubscribeTest.php @@ -20,22 +20,32 @@ class PublishSubscribeTest extends TestCase { public function publishSubscribeData(): array { - return [ - [false, 'test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []], - [false, 'test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']], - [false, 'test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']], - [false, 'test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']], - [false, 'test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']], - [false, 'test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']], - [false, 'test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message - [true, 'test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []], - [true, 'test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']], - [true, 'test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']], - [true, 'test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']], - [true, 'test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']], - [true, 'test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']], - [true, 'test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message + $data = [ + [false, 'foo/bar/baz', 'foo/bar/baz', 'hello world', []], + [false, 'foo/bar/+', 'foo/bar/baz', 'hello world', ['baz']], + [false, 'foo/+/baz', 'foo/bar/baz', 'hello world', ['bar']], + [false, 'foo/#', 'foo/bar/baz', 'hello world', ['bar/baz']], + [false, 'foo/+/bar/#', 'foo/my/bar/baz', 'hello world', ['my', 'baz']], + [false, 'foo/+/bar/#', 'foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']], + [false, 'foo/bar/baz', 'foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message + [true, 'foo/bar/baz', 'foo/bar/baz', 'hello world', []], + [true, 'foo/bar/+', 'foo/bar/baz', 'hello world', ['baz']], + [true, 'foo/+/baz', 'foo/bar/baz', 'hello world', ['bar']], + [true, 'foo/#', 'foo/bar/baz', 'hello world', ['bar/baz']], + [true, 'foo/+/bar/#', 'foo/my/bar/baz', 'hello world', ['my', 'baz']], + [true, 'foo/+/bar/#', 'foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']], + [true, 'foo/bar/baz', 'foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message ]; + + // Because our tests are run against a real MQTT broker and some messages are retained, + // we need to prevent false-positives by giving each test case its own 'test space' using a random prefix. + for ($i = 0; $i < count($data); $i++) { + $prefix = 'test/' . uniqid('', true) . '/'; + $data[$i][1] = $prefix . $data[$i][1]; + $data[$i][2] = $prefix . $data[$i][2]; + } + + return $data; } /** @@ -84,6 +94,57 @@ function (string $topic, string $message, bool $retained, array $wildcards) use $subscriber->disconnect(); } + /** + * @dataProvider publishSubscribeData + */ + public function test_publishing_and_subscribing_using_quality_of_service_0_with_message_retention_works_as_intended( + bool $useBlockingSocket, + string $subscriptionTopicFilter, + string $publishTopic, + string $publishMessage, + array $matchedTopicWildcards + ): void + { + // We publish a message from the first client, which disconnects before the other client even subscribes. + $publisher = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'publisher'); + $publisher->connect(null, true); + + $publisher->publish($publishTopic, $publishMessage, 0, true); + + $publisher->disconnect(); + + // Because we need to make sure the message reached the broker, we delay the execution for a short period (100ms) intentionally. + // With higher QoS, this is replaced by awaiting delivery of the message. + usleep(100_000); + + // We connect and subscribe to a topic using the second client. + $connectionSettings = (new ConnectionSettings()) + ->useBlockingSocket($useBlockingSocket); + + $subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber'); + $subscriber->connect($connectionSettings, true); + + $subscriber->subscribe( + $subscriptionTopicFilter, + function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $publishTopic, $publishMessage, $matchedTopicWildcards) { + // By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass. + $this->assertEquals($publishTopic, $topic); + $this->assertEquals($publishMessage, $message); + $this->assertTrue($retained); + $this->assertEquals($matchedTopicWildcards, $wildcards); + + $subscriber->interrupt(); // This allows us to exit the test as soon as possible. + }, + 0 + ); + + // Then we loop on the subscriber to (hopefully) receive the published message. + $subscriber->loop(true); + + // Finally, we disconnect for a graceful shutdown on the broker side. + $subscriber->disconnect(); + } + /** * @dataProvider publishSubscribeData */ @@ -130,6 +191,54 @@ function (string $topic, string $message, bool $retained, array $wildcards) use $subscriber->disconnect(); } + /** + * @dataProvider publishSubscribeData + */ + public function test_publishing_and_subscribing_using_quality_of_service_1_with_message_retention_works_as_intended( + bool $useBlockingSocket, + string $subscriptionTopicFilter, + string $publishTopic, + string $publishMessage, + array $matchedTopicWildcards + ): void + { + // We publish a message from the first client, which disconnects before the other client even subscribes. + $publisher = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'publisher'); + $publisher->connect(null, true); + + $publisher->publish($publishTopic, $publishMessage, 1, true); + $publisher->loop(true, true); + + $publisher->disconnect(); + + // We connect and subscribe to a topic using the second client. + $connectionSettings = (new ConnectionSettings()) + ->useBlockingSocket($useBlockingSocket); + + $subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber'); + $subscriber->connect($connectionSettings, true); + + $subscriber->subscribe( + $subscriptionTopicFilter, + function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $publishTopic, $publishMessage, $matchedTopicWildcards) { + // By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass. + $this->assertEquals($publishTopic, $topic); + $this->assertEquals($publishMessage, $message); + $this->assertTrue($retained); + $this->assertEquals($matchedTopicWildcards, $wildcards); + + $subscriber->interrupt(); // This allows us to exit the test as soon as possible. + }, + 1 + ); + + // Then we loop on the subscriber to (hopefully) receive the published message. + $subscriber->loop(true); + + // Finally, we disconnect for a graceful shutdown on the broker side. + $subscriber->disconnect(); + } + /** * @dataProvider publishSubscribeData */ @@ -176,6 +285,53 @@ public function test_publishing_and_subscribing_using_quality_of_service_2_works $subscriber->disconnect(); } + /** + * @dataProvider publishSubscribeData + */ + public function test_publishing_and_subscribing_using_quality_of_service_2_with_message_retention_works_as_intended( + bool $useBlockingSocket, + string $subscriptionTopicFilter, + string $publishTopic, + string $publishMessage, + array $matchedTopicWildcards + ): void + { + // We publish a message from the first client. The loop is called until all QoS 2 handshakes are done. + $publisher = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'publisher'); + $publisher->connect(null, true); + + $publisher->publish($publishTopic, $publishMessage, 2, true); + $publisher->loop(true, true); + + $publisher->disconnect(); + + // We connect and subscribe to a topic using the second client. + $connectionSettings = (new ConnectionSettings()) + ->useBlockingSocket($useBlockingSocket); + + $subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber'); + $subscriber->connect($connectionSettings, true); + + $subscription = function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $subscriptionTopicFilter, $publishTopic, $publishMessage, $matchedTopicWildcards) { + // By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass. + $this->assertEquals($publishTopic, $topic); + $this->assertEquals($publishMessage, $message); + $this->assertTrue($retained); + $this->assertEquals($matchedTopicWildcards, $wildcards); + + $subscriber->unsubscribe($subscriptionTopicFilter); + $subscriber->interrupt(); // This allows us to exit the test as soon as possible. + }; + + $subscriber->subscribe($subscriptionTopicFilter, $subscription, 2); + + // Then we loop on the subscriber to (hopefully) receive the published message until the receive handshake is done. + $subscriber->loop(true, true); + + // Finally, we disconnect for a graceful shutdown on the broker side. + $subscriber->disconnect(); + } + public function test_unsubscribe_stops_receiving_messages_on_topic(): void { // We connect and subscribe to a topic using the first client.