Skip to content

Commit

Permalink
DE-111852 Add support for custom SQS message attributes - part 02 (#103)
Browse files Browse the repository at this point in the history
* DE-111852 Add support for custom SQS message attributes - part 02

* DE-111852 Leverage nullsafe operator when dispatching events

* DE-111852 Refactor SqsMessageAttribute into a class
  • Loading branch information
dorrogeray authored Sep 4, 2024
1 parent 761730b commit 34910c9
Show file tree
Hide file tree
Showing 16 changed files with 609 additions and 125 deletions.
15 changes: 13 additions & 2 deletions src/Jobs/JobInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace BE\QueueManagement\Jobs;

use BE\QueueManagement\Jobs\JobDefinitions\JobDefinitionInterface;
use BE\QueueManagement\Queue\AWSSQS\SqsMessageAttribute;
use DateTimeImmutable;

interface JobInterface
Expand Down Expand Up @@ -64,13 +65,23 @@ public function setExecutionPlannedAt(DateTimeImmutable $executionPlannedAt): vo


/**
* @return array<string,array{DataType: string, StringValue?: string, BinaryValue?: string}>
* @return array<string,SqsMessageAttribute>
*/
public function getMessageAttributes(): array;


/**
* @param array<string,array{DataType: string, StringValue?: string, BinaryValue?: string}> $messageAttributes
* @param array<string,SqsMessageAttribute> $messageAttributes
*/
public function setMessageAttributes(array $messageAttributes): void;


public function getMessageAttribute(
string $messageAttributeName,
): SqsMessageAttribute|null;


public function setMessageAttribute(
SqsMessageAttribute $sqsMessageAttribute,
): void;
}
42 changes: 19 additions & 23 deletions src/Jobs/SimpleJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

use BE\QueueManagement\Jobs\Execution\MaximumAttemptsExceededException;
use BE\QueueManagement\Jobs\JobDefinitions\JobDefinitionInterface;
use BE\QueueManagement\Queue\AWSSQS\SqsMessageAttribute;
use BE\QueueManagement\Queue\AWSSQS\SqsMessageAttributeDataType;
use BE\QueueManagement\Queue\AWSSQS\SqsMessageAttributeFields;
use BrandEmbassy\DateTime\DateTimeFormatter;
use DateTimeImmutable;
use Doctrine\Common\Collections\Collection;
Expand All @@ -32,14 +32,14 @@ class SimpleJob implements JobInterface
private ?DateTimeImmutable $executionPlannedAt;

/**
* @var array<string,array{DataType: string, StringValue?: string, BinaryValue?: string}>
* @var array<string,SqsMessageAttribute>
*/
private array $messageAttributes;


/**
* @param Collection<string, mixed> $parameters
* @param array<string, mixed> $messageAttributes
* @param array<string, SqsMessageAttribute> $messageAttributes
*/
public function __construct(
string $uuid,
Expand Down Expand Up @@ -188,36 +188,32 @@ public function setExecutionPlannedAt(DateTimeImmutable $executionPlannedAt): vo
}


/**
* @return array<string,array{DataType: string, StringValue?: string, BinaryValue?: string}>
*/
public function getMessageAttributes(): array
{
return $this->messageAttributes;
public function getMessageAttribute(
string $messageAttributeName,
SqsMessageAttributeDataType $messageAttributeDataType = SqsMessageAttributeDataType::STRING
): ?SqsMessageAttribute {
return $this->messageAttributes[$messageAttributeName] ?? null;
}


public function setMessageAttribute(
string $messageAttributeName,
string $messageAttributeValue,
SqsMessageAttributeDataType $messageAttributeDataType = SqsMessageAttributeDataType::STRING
SqsMessageAttribute $sqsMessageAttribute,
): void {
$valueKey = $messageAttributeDataType === SqsMessageAttributeDataType::BINARY
? SqsMessageAttributeFields::BINARY_VALUE->value
: SqsMessageAttributeFields::STRING_VALUE->value;

/** @var array{DataType: string, StringValue?: string, BinaryValue?: string} $messageAttribute */
$messageAttribute = [
SqsMessageAttributeFields::DATA_TYPE->value => $messageAttributeDataType->value,
$valueKey => $messageAttributeValue,
];
$this->messageAttributes[$sqsMessageAttribute->getName()] = $sqsMessageAttribute;
}

$this->messageAttributes[$messageAttributeName] = $messageAttribute;

/**
* @return array<string,SqsMessageAttribute>
*/
public function getMessageAttributes(): array
{
return $this->messageAttributes;
}


/**
* @param array<string,array{DataType: string, StringValue?: string, BinaryValue?: string}> $messageAttributes
* @param array<string,SqsMessageAttribute> $messageAttributes
*/
public function setMessageAttributes(array $messageAttributes): void
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace BE\QueueManagement\Observability;

class MessageSentEvent
class AfterMessageSentEvent
{
/**
* @param mixed[] $messageAttributes
Expand Down
15 changes: 15 additions & 0 deletions src/Observability/BeforeMessageSentEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php declare(strict_types = 1);

namespace BE\QueueManagement\Observability;

use BE\QueueManagement\Jobs\JobInterface;

class BeforeMessageSentEvent
{
public function __construct(
public readonly JobInterface $job,
public readonly int $delayInSeconds,
public readonly string $prefixedQueueName,
) {
}
}
28 changes: 19 additions & 9 deletions src/Queue/AWSSQS/SqsMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
* AWS SQS API does not provide type for SQSMessage, only type \Aws\Result. This class is simple abstraction over this generic type.
* For details see https://docs.aws.amazon.com/aws-sdk-php/v3/api/class-Aws.Result.html
*
* @phpstan-type TSqsMessage array{
* MessageAttributes: array<string,SqsMessageAttribute>,
* Body: string,
* Attributes: mixed[],
* ReceiptHandle: mixed,
* MessageId: string,
* }
* @final
*/
class SqsMessage
Expand All @@ -18,15 +25,15 @@ class SqsMessage
public const MAX_SQS_SIZE_KB = 256;

/**
* @var mixed[]
* @var TSqsMessage
*/
private array $message;

private string $queueUrl;


/**
* @param array<mixed> $message
* @param TSqsMessage $message
*/
public function __construct(array $message, string $queueUrl)
{
Expand All @@ -35,10 +42,7 @@ public function __construct(array $message, string $queueUrl)
}


/**
* @return mixed
*/
public function getReceiptHandle()
public function getReceiptHandle(): mixed
{
return $this->message[SqsMessageFields::RECEIPT_HANDLE];
}
Expand Down Expand Up @@ -72,6 +76,13 @@ public function getMessageAttributes(): array
}


public function getMessageAttribute(
string $messageAttributeName
): ?SqsMessageAttribute {
return $this->message[SqsMessageFields::MESSAGE_ATTRIBUTES][$messageAttributeName] ?? null;
}


public function getQueueUrl(): string
{
return $this->queueUrl;
Expand All @@ -87,15 +98,14 @@ public function getMessageId(): string
/**
* Returns true if message is bigger than 256 KB (AWS SQS message size limit), false otherwise
*
* @param array<string, array<string, string>> $messageAttributes
* @param array<string, SqsMessageAttribute> $messageAttributes
*/
public static function isTooBig(string $messageBody, array $messageAttributes): bool
{
$messageSize = strlen($messageBody);
foreach ($messageAttributes as $messageAttributeKey => $messageAttribute) {
$messageSize += strlen($messageAttributeKey);
$messageSize += strlen($messageAttribute['DataType'] ?? '');
$messageSize += strlen($messageAttribute['StringValue'] ?? '');
$messageSize += $messageAttribute->getSizeInBytes();
}

return $messageSize > self::MAX_SQS_SIZE_KB * 1024;
Expand Down
75 changes: 75 additions & 0 deletions src/Queue/AWSSQS/SqsMessageAttribute.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php declare(strict_types = 1);

namespace BE\QueueManagement\Queue\AWSSQS;

use function is_numeric;
use function str_contains;
use function strlen;

/**
* Represent SQS Message Attribute
*
* @final
*/
class SqsMessageAttribute
{
public function __construct(
private readonly string $name,
private readonly string $value,
private readonly SqsMessageAttributeDataType $dataType,
) {
}


public function getName(): string
{
return $this->name;
}


public function getValue(): string|int|float
{
if ($this->dataType === SqsMessageAttributeDataType::NUMBER) {
if (is_numeric($this->value)) {
if (str_contains($this->value, '.')) {
return (float)$this->value;
}

return (int)$this->value;
}
}

return $this->value;
}


public function getDataType(): SqsMessageAttributeDataType
{
return $this->dataType;
}


public function getSizeInBytes(): int
{
return strlen($this->dataType->value) + strlen($this->value);
}


/**
* @return array{DataType: 'String'|'Number', StringValue: string}|array{DataType: 'Binary', BinaryValue: string}
*/
public function toArray(): array
{
if ($this->dataType === SqsMessageAttributeDataType::BINARY) {
return [
SqsMessageAttributeFields::DATA_TYPE->value => $this->dataType->value,
SqsMessageAttributeFields::BINARY_VALUE->value => $this->value,
];
}

return [
SqsMessageAttributeFields::DATA_TYPE->value => $this->dataType->value,
SqsMessageAttributeFields::STRING_VALUE->value => $this->value,
];
}
}
26 changes: 26 additions & 0 deletions src/Queue/AWSSQS/SqsMessageAttributeFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php declare(strict_types = 1);

namespace BE\QueueManagement\Queue\AWSSQS;

/**
* @final
*/
class SqsMessageAttributeFactory
{
/**
* @param array{DataType: 'String'|'Number', StringValue: string}|array{DataType: 'Binary', BinaryValue: string} $value
*/
public function createFromArray(string $name, array $value): SqsMessageAttribute
{
$dataType = SqsMessageAttributeDataType::from($value[SqsMessageAttributeFields::DATA_TYPE->value]);
$valueKey = $dataType === SqsMessageAttributeDataType::BINARY
? SqsMessageAttributeFields::BINARY_VALUE->value
: SqsMessageAttributeFields::STRING_VALUE->value;

return new SqsMessageAttribute(
$name,
$value[$valueKey],
$dataType,
);
}
}
Loading

0 comments on commit 34910c9

Please sign in to comment.