Skip to content

Commit

Permalink
Implement delivery of retain flag (#169)
Browse files Browse the repository at this point in the history
* Implement delivery of retain flag

* Randomize tests to prevent overlaps and ensure delivery of messages

* Update SonarQube build step
  • Loading branch information
Namoshek authored Nov 25, 2023
1 parent 4d6ecec commit 532339e
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 32 deletions.
9 changes: 1 addition & 8 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ jobs:
with:
fetch-depth: 0

- name: Setup Java 11 (for keytool)
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: '11'
java-package: 'jre'

- name: Setup PHP ${{ matrix.php-version }}
uses: shivammathur/setup-php@v2
with:
Expand Down Expand Up @@ -140,7 +133,7 @@ jobs:
sed -i "s|$GITHUB_WORKSPACE|/github/workspace|g" phpunit.report-junit.xml
- name: Run SonarQube analysis
uses: sonarsource/sonarcloud-github-action@v1.9
uses: sonarsource/sonarcloud-github-action@v2.0.2
if: matrix.run-sonarqube-analysis
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Expand Down
13 changes: 12 additions & 1 deletion src/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class Message
{
private MessageType $type;
private int $qualityOfService;
private bool $retained;
private ?int $messageId = null;
private ?string $topic = null;
private ?string $content = null;
Expand All @@ -30,11 +31,13 @@ class Message
*
* @param MessageType $type
* @param int $qualityOfService
* @param bool $retained
*/
public function __construct(MessageType $type, int $qualityOfService = 0)
public function __construct(MessageType $type, int $qualityOfService = 0, bool $retained = false)
{
$this->type = $type;
$this->qualityOfService = $qualityOfService;
$this->retained = $retained;
}

/**
Expand All @@ -53,6 +56,14 @@ public function getQualityOfService(): int
return $this->qualityOfService;
}

/**
* @return bool
*/
public function getRetained(): bool
{
return $this->retained;
}

/**
* @return int|null
*/
Expand Down
20 changes: 15 additions & 5 deletions src/MessageProcessors/Mqtt31MessageProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,9 @@ public function buildPublishCompleteMessage(int $messageId): string
public function parseAndValidateMessage(string $message): ?Message
{
$qualityOfService = 0;
$retained = false;
$data = '';
$result = $this->tryDecodeMessage($message, $command, $qualityOfService, $data);
$result = $this->tryDecodeMessage($message, $command, $qualityOfService, $retained, $data);

if ($result === false) {
throw new InvalidMessageException('The passed message could not be decoded.');
Expand All @@ -440,7 +441,7 @@ public function parseAndValidateMessage(string $message): ?Message
throw new ProtocolViolationException('Unexpected connection acknowledgement.');

case 0x03:
return $this->parseAndValidatePublishMessage($data, $qualityOfService);
return $this->parseAndValidatePublishMessage($data, $qualityOfService, $retained);

case 0x04:
return $this->parseAndValidatePublishAcknowledgementMessage($data);
Expand Down Expand Up @@ -484,10 +485,17 @@ public function parseAndValidateMessage(string $message): ?Message
* @param string $message
* @param int|null $command
* @param int|null $qualityOfService
* @param bool $retained
* @param string|null $data
* @return bool
*/
protected function tryDecodeMessage(string $message, int &$command = null, int &$qualityOfService = null, string &$data = null): bool
protected function tryDecodeMessage(
string $message,
int &$command = null,
int &$qualityOfService = null,
bool &$retained = false,
string &$data = null
): bool
{
// If we received no input, we can return immediately without doing work.
if (strlen($message) === 0) {
Expand All @@ -504,6 +512,7 @@ protected function tryDecodeMessage(string $message, int &$command = null, int &
$byte = $message[0];
$command = (int) (ord($byte) / 16);
$qualityOfService = (ord($byte) & 0x06) >> 1;
$retained = (bool) (ord($byte) & 0x01);

// Read the second byte of a message (remaining length).
// If the continuation bit (8) is set on the length byte, another byte will be read as length.
Expand Down Expand Up @@ -546,15 +555,16 @@ protected function tryDecodeMessage(string $message, int &$command = null, int &
*
* @param string $data
* @param int $qualityOfServiceLevel
* @param bool $retained
* @return Message|null
*/
protected function parseAndValidatePublishMessage(string $data, int $qualityOfServiceLevel): ?Message
protected function parseAndValidatePublishMessage(string $data, int $qualityOfServiceLevel, bool $retained): ?Message
{
$topicLength = (ord($data[0]) << 8) + ord($data[1]);
$topic = substr($data, 2, $topicLength);
$content = substr($data, ($topicLength + 2));

$message = new Message(MessageType::PUBLISH(), $qualityOfServiceLevel);
$message = new Message(MessageType::PUBLISH(), $qualityOfServiceLevel, $retained);

if ($qualityOfServiceLevel > self::QOS_AT_MOST_ONCE) {
if (strlen($content) < 2) {
Expand Down
7 changes: 4 additions & 3 deletions src/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ protected function handleMessage(Message $message): void
$message->getTopic(),
$message->getContent(),
2,
false
$message->getRetained()
);
$this->repository->addPendingIncomingMessage($pendingMessage);
} catch (PendingMessageAlreadyExistsException $e) {
Expand All @@ -772,7 +772,7 @@ protected function handleMessage(Message $message): void
}

// For QoS 0 and QoS 1 we can deliver right away.
$this->deliverPublishedMessage($message->getTopic(), $message->getContent(), $message->getQualityOfService());
$this->deliverPublishedMessage($message->getTopic(), $message->getContent(), $message->getQualityOfService(), $message->getRetained());
return;
}

Expand Down Expand Up @@ -821,7 +821,8 @@ protected function handleMessage(Message $message): void
$this->deliverPublishedMessage(
$pendingMessage->getTopicName(),
$pendingMessage->getMessage(),
$pendingMessage->getQualityOfServiceLevel()
$pendingMessage->getQualityOfServiceLevel(),
$pendingMessage->wantsToBeRetained()
);

$this->repository->removePendingIncomingMessage($message->getMessageId());
Expand Down
186 changes: 171 additions & 15 deletions tests/Feature/PublishSubscribeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,32 @@ class PublishSubscribeTest extends TestCase
{
public function publishSubscribeData(): array
{
return [
[false, 'test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []],
[false, 'test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']],
[false, 'test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']],
[false, 'test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']],
[false, 'test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']],
[false, 'test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
[false, 'test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
[true, 'test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []],
[true, 'test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']],
[true, 'test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']],
[true, 'test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']],
[true, 'test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']],
[true, 'test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
[true, 'test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
$data = [
[false, 'foo/bar/baz', 'foo/bar/baz', 'hello world', []],
[false, 'foo/bar/+', 'foo/bar/baz', 'hello world', ['baz']],
[false, 'foo/+/baz', 'foo/bar/baz', 'hello world', ['bar']],
[false, 'foo/#', 'foo/bar/baz', 'hello world', ['bar/baz']],
[false, 'foo/+/bar/#', 'foo/my/bar/baz', 'hello world', ['my', 'baz']],
[false, 'foo/+/bar/#', 'foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
[false, 'foo/bar/baz', 'foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
[true, 'foo/bar/baz', 'foo/bar/baz', 'hello world', []],
[true, 'foo/bar/+', 'foo/bar/baz', 'hello world', ['baz']],
[true, 'foo/+/baz', 'foo/bar/baz', 'hello world', ['bar']],
[true, 'foo/#', 'foo/bar/baz', 'hello world', ['bar/baz']],
[true, 'foo/+/bar/#', 'foo/my/bar/baz', 'hello world', ['my', 'baz']],
[true, 'foo/+/bar/#', 'foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
[true, 'foo/bar/baz', 'foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
];

// Because our tests are run against a real MQTT broker and some messages are retained,
// we need to prevent false-positives by giving each test case its own 'test space' using a random prefix.
for ($i = 0; $i < count($data); $i++) {
$prefix = 'test/' . uniqid('', true) . '/';
$data[$i][1] = $prefix . $data[$i][1];
$data[$i][2] = $prefix . $data[$i][2];
}

return $data;
}

/**
Expand Down Expand Up @@ -84,6 +94,57 @@ function (string $topic, string $message, bool $retained, array $wildcards) use
$subscriber->disconnect();
}

/**
* @dataProvider publishSubscribeData
*/
public function test_publishing_and_subscribing_using_quality_of_service_0_with_message_retention_works_as_intended(
bool $useBlockingSocket,
string $subscriptionTopicFilter,
string $publishTopic,
string $publishMessage,
array $matchedTopicWildcards
): void
{
// We publish a message from the first client, which disconnects before the other client even subscribes.
$publisher = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'publisher');
$publisher->connect(null, true);

$publisher->publish($publishTopic, $publishMessage, 0, true);

$publisher->disconnect();

// Because we need to make sure the message reached the broker, we delay the execution for a short period (100ms) intentionally.
// With higher QoS, this is replaced by awaiting delivery of the message.
usleep(100_000);

// We connect and subscribe to a topic using the second client.
$connectionSettings = (new ConnectionSettings())
->useBlockingSocket($useBlockingSocket);

$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
$subscriber->connect($connectionSettings, true);

$subscriber->subscribe(
$subscriptionTopicFilter,
function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $publishTopic, $publishMessage, $matchedTopicWildcards) {
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
$this->assertEquals($publishTopic, $topic);
$this->assertEquals($publishMessage, $message);
$this->assertTrue($retained);
$this->assertEquals($matchedTopicWildcards, $wildcards);

$subscriber->interrupt(); // This allows us to exit the test as soon as possible.
},
0
);

// Then we loop on the subscriber to (hopefully) receive the published message.
$subscriber->loop(true);

// Finally, we disconnect for a graceful shutdown on the broker side.
$subscriber->disconnect();
}

/**
* @dataProvider publishSubscribeData
*/
Expand Down Expand Up @@ -130,6 +191,54 @@ function (string $topic, string $message, bool $retained, array $wildcards) use
$subscriber->disconnect();
}

/**
* @dataProvider publishSubscribeData
*/
public function test_publishing_and_subscribing_using_quality_of_service_1_with_message_retention_works_as_intended(
bool $useBlockingSocket,
string $subscriptionTopicFilter,
string $publishTopic,
string $publishMessage,
array $matchedTopicWildcards
): void
{
// We publish a message from the first client, which disconnects before the other client even subscribes.
$publisher = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'publisher');
$publisher->connect(null, true);

$publisher->publish($publishTopic, $publishMessage, 1, true);
$publisher->loop(true, true);

$publisher->disconnect();

// We connect and subscribe to a topic using the second client.
$connectionSettings = (new ConnectionSettings())
->useBlockingSocket($useBlockingSocket);

$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
$subscriber->connect($connectionSettings, true);

$subscriber->subscribe(
$subscriptionTopicFilter,
function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $publishTopic, $publishMessage, $matchedTopicWildcards) {
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
$this->assertEquals($publishTopic, $topic);
$this->assertEquals($publishMessage, $message);
$this->assertTrue($retained);
$this->assertEquals($matchedTopicWildcards, $wildcards);

$subscriber->interrupt(); // This allows us to exit the test as soon as possible.
},
1
);

// Then we loop on the subscriber to (hopefully) receive the published message.
$subscriber->loop(true);

// Finally, we disconnect for a graceful shutdown on the broker side.
$subscriber->disconnect();
}

/**
* @dataProvider publishSubscribeData
*/
Expand Down Expand Up @@ -176,6 +285,53 @@ public function test_publishing_and_subscribing_using_quality_of_service_2_works
$subscriber->disconnect();
}

/**
* @dataProvider publishSubscribeData
*/
public function test_publishing_and_subscribing_using_quality_of_service_2_with_message_retention_works_as_intended(
bool $useBlockingSocket,
string $subscriptionTopicFilter,
string $publishTopic,
string $publishMessage,
array $matchedTopicWildcards
): void
{
// We publish a message from the first client. The loop is called until all QoS 2 handshakes are done.
$publisher = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'publisher');
$publisher->connect(null, true);

$publisher->publish($publishTopic, $publishMessage, 2, true);
$publisher->loop(true, true);

$publisher->disconnect();

// We connect and subscribe to a topic using the second client.
$connectionSettings = (new ConnectionSettings())
->useBlockingSocket($useBlockingSocket);

$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
$subscriber->connect($connectionSettings, true);

$subscription = function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $subscriptionTopicFilter, $publishTopic, $publishMessage, $matchedTopicWildcards) {
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
$this->assertEquals($publishTopic, $topic);
$this->assertEquals($publishMessage, $message);
$this->assertTrue($retained);
$this->assertEquals($matchedTopicWildcards, $wildcards);

$subscriber->unsubscribe($subscriptionTopicFilter);
$subscriber->interrupt(); // This allows us to exit the test as soon as possible.
};

$subscriber->subscribe($subscriptionTopicFilter, $subscription, 2);

// Then we loop on the subscriber to (hopefully) receive the published message until the receive handshake is done.
$subscriber->loop(true, true);

// Finally, we disconnect for a graceful shutdown on the broker side.
$subscriber->disconnect();
}

public function test_unsubscribe_stops_receiving_messages_on_topic(): void
{
// We connect and subscribe to a topic using the first client.
Expand Down

0 comments on commit 532339e

Please sign in to comment.