Skip to content

Commit

Permalink
feat!: store wait_time/handle_time in milliseconds
Browse files Browse the repository at this point in the history
  • Loading branch information
kbond committed Nov 15, 2024
1 parent e6a804a commit 49bef2e
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 30 deletions.
18 changes: 9 additions & 9 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

## 0.5.0

Two new `integer` columns were added to the `processed_messages` table:
`wait_time` and `handle_time`. You will need to create a migration to
add these columns to your database. They are not nullable so your
migration will need to account for existing data. You can either
truncate (purge) the `processed_messages` table have your migration
calculate these values based on the existing data.
Two new `bigint` columns were added to the `processed_messages` table:
`wait_time` and `handle_time`. These are milliseconds. You will need to
create a migration to add these columns to your database. They are not
nullable so your migration will need to account for existing data. You
can either truncate (purge) the `processed_messages` table or have your
migration calculate these values based on the existing data.

Here's a calculation example for MySQL:

Expand All @@ -25,13 +25,13 @@ final class VersionXXX extends AbstractMigration
public function up(Schema $schema): void
{
// Add the columns as nullable
$this->addSql('ALTER TABLE processed_messages ADD wait_time INT DEFAULT NULL, ADD handle_time INT DEFAULT NULL');
$this->addSql('ALTER TABLE processed_messages ADD wait_time BIGINT DEFAULT NULL, ADD handle_time BIGINT DEFAULT NULL');

// set the times from existing data
$this->addSql('UPDATE processed_messages SET wait_time = TIMESTAMPDIFF(SECOND, dispatched_at, received_at), handle_time = TIMESTAMPDIFF(SECOND, received_at, finished_at)');
$this->addSql('UPDATE processed_messages SET wait_time = TIMESTAMPDIFF(SECOND, dispatched_at, received_at) * 1000, handle_time = TIMESTAMPDIFF(SECOND, received_at, finished_at) * 1000');

// Make the columns not nullable
$this->addSql('ALTER TABLE processed_messages CHANGE wait_time wait_time INT NOT NULL, CHANGE handle_time handle_time INT NOT NULL');
$this->addSql('ALTER TABLE processed_messages CHANGE wait_time wait_time BIGINT NOT NULL, CHANGE handle_time handle_time BIGINT NOT NULL');
}

public function down(Schema $schema): void
Expand Down
4 changes: 2 additions & 2 deletions config/doctrine/mapping/ProcessedMessage.orm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
<field name="dispatchedAt" column="dispatched_at" type="datetime_immutable" />
<field name="receivedAt" column="received_at" type="datetime_immutable" />
<field name="finishedAt" column="finished_at" type="datetime_immutable" />
<field name="waitTime" column="wait_time" type="integer" />
<field name="handleTime" column="handle_time" type="integer" />
<field name="waitTime" column="wait_time" type="bigint" />
<field name="handleTime" column="handle_time" type="bigint" />
<field name="memoryUsage" column="memory_usage" type="integer" />
<field name="transport" column="transport" />
<field name="tags" column="tags" nullable="true" />
Expand Down
2 changes: 2 additions & 0 deletions src/History/Model/MessageTypeMetric.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ final class MessageTypeMetric

/**
* @param class-string $class
* @param float $averageWaitTime In seconds
* @param float $averageHandlingTime In seconds
*/
public function __construct(
string $class,
Expand Down
25 changes: 17 additions & 8 deletions src/History/Model/ProcessedMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public function __construct(Envelope $envelope, Results $results, ?\Throwable $e
$this->transport = $monitorStamp->transport();
$this->tags = $tags->count() ? (string) $tags : null;
$this->results = $results;
$this->waitTime = \max(0, $this->receivedAt->getTimestamp() - $this->dispatchedAt->getTimestamp());
$this->handleTime = \max(0, $this->finishedAt->getTimestamp() - $this->receivedAt->getTimestamp());
$this->waitTime = (int) \max(0, \round(($this->receivedAt->format('U.u') - $this->dispatchedAt->format('U.u')) * 1000));
$this->handleTime = (int) \max(0, \round(($this->finishedAt->format('U.u') - $this->receivedAt->format('U.u')) * 1000));

if ($retryStamp = $envelope->last(RedeliveryStamp::class)) {
$this->attempt += $retryStamp->getRetryCount();
Expand Down Expand Up @@ -145,19 +145,28 @@ final public function isFailure(): bool
return null !== $this->failureType;
}

final public function timeInQueue(): int
/**
* @return float In seconds
*/
final public function timeInQueue(): float
{
return $this->waitTime;
return $this->waitTime / 1000;
}

final public function timeToHandle(): int
/**
* @return float In seconds
*/
final public function timeToHandle(): float
{
return $this->handleTime;
return $this->handleTime / 1000;
}

final public function timeToProcess(): int
/**
* @return float In seconds
*/
final public function timeToProcess(): float
{
return $this->waitTime + $this->handleTime;
return $this->timeInQueue() + $this->timeToHandle();
}

final public function memoryUsage(): Bytes
Expand Down
9 changes: 9 additions & 0 deletions src/History/Snapshot.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,25 @@ public function failRate(): float
}
}

/**
* @return float In seconds
*/
public function averageWaitTime(): float
{
return $this->averageWaitTime ??= $this->storage->averageWaitTime($this->specification) ?? 0.0;
}

/**
* @return float In seconds
*/
public function averageHandlingTime(): float
{
return $this->averageHandlingTime ??= $this->storage->averageHandlingTime($this->specification) ?? 0.0;
}

/**
* @return float In seconds
*/
public function averageProcessingTime(): float
{
return $this->averageWaitTime() + $this->averageHandlingTime();
Expand Down
6 changes: 6 additions & 0 deletions src/History/Storage.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@ public function save(Envelope $envelope, Results $results, ?\Throwable $exceptio

public function delete(mixed $id): void;

/**
* @return float|null In seconds
*/
public function averageWaitTime(Specification $specification): ?float;

/**
* @return float|null In seconds
*/
public function averageHandlingTime(Specification $specification): ?float;

public function count(Specification $specification): int;
Expand Down
8 changes: 4 additions & 4 deletions src/History/Storage/ORMStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public function perMessageTypeMetrics(Specification $specification): Collection
->select('m.type')
->addSelect('COUNT(m.type) as total_count')
->addSelect('COUNT(m.failureType) as failure_count')
->addSelect('AVG(m.waitTime) as avg_wait_time')
->addSelect('AVG(m.handleTime) as avg_handling_time')
->addSelect('AVG(m.waitTime) / 1000 AS avg_wait_time')
->addSelect('AVG(m.handleTime) / 1000 AS avg_handling_time')
->groupBy('m.type')
;

Expand Down Expand Up @@ -121,7 +121,7 @@ public function averageWaitTime(Specification $specification): ?float
{
$qb = $this
->queryBuilderFor($specification, false)
->select('AVG(m.waitTime)')
->select('AVG(m.waitTime) / 1000')
;

return (new EntityResult($qb))->asFloat()->first();
Expand All @@ -131,7 +131,7 @@ public function averageHandlingTime(Specification $specification): ?float
{
$qb = $this
->queryBuilderFor($specification, false)
->select('AVG(m.handleTime)')
->select('AVG(m.handleTime) / 1000')
;

return (new EntityResult($qb))->asFloat()->first();
Expand Down
4 changes: 2 additions & 2 deletions tests/Fixture/Factory/ProcessedMessageFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ protected function initialize(): static
->instantiateWith(Instantiator::withoutConstructor()->alwaysForce())
->beforeInstantiate(function(array $attributes) {
if (!isset($attributes['waitTime'])) {
$attributes['waitTime'] = \max(0, $attributes['receivedAt']->getTimestamp() - $attributes['dispatchedAt']->getTimestamp());
$attributes['waitTime'] = \max(0, \round(($attributes['receivedAt']->format('U.u') - $attributes['dispatchedAt']->format('U.u')) * 1000));
}

if (!isset($attributes['processingTime'])) {
$attributes['handleTime'] = \max(0, $attributes['finishedAt']->getTimestamp() - $attributes['receivedAt']->getTimestamp());
$attributes['handleTime'] = \max(0, \round(($attributes['finishedAt']->format('U.u') - $attributes['receivedAt']->format('U.u')) * 1000));
}

return $attributes;
Expand Down
2 changes: 1 addition & 1 deletion tests/Integration/History/Storage/ORMStorageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public function average_wait_time(): void
'receivedAt' => $start->modify('+10 seconds'),
]);

$this->assertSame(15, (int) $this->storage()->averageWaitTime(Specification::new()));
$this->assertSame(15.0, $this->storage()->averageWaitTime(Specification::new()));
}

/**
Expand Down
42 changes: 38 additions & 4 deletions tests/Unit/History/Model/ProcessedMessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ final class ProcessedMessageTest extends TestCase
public function minimal_constructor(): void
{
$start = self::mockTime()->now()->getTimestamp();

$stamp = new MonitorStamp();

Clock::get()->sleep(1);
Expand Down Expand Up @@ -64,9 +65,9 @@ public function id(): string|int|\Stringable|null
$this->assertSame([], $message->tags()->all());
$this->assertSame([], $message->results()->all());
$this->assertSame('foo', $message->transport());
$this->assertSame(1, $message->timeInQueue());
$this->assertSame(2, $message->timeToHandle());
$this->assertSame(3, $message->timeToProcess());
$this->assertSame(1.0, $message->timeInQueue());
$this->assertSame(2.0, $message->timeToHandle());
$this->assertSame(3.0, $message->timeToProcess());
$this->assertFalse($message->isFailure());
$this->assertNull($message->failure());
$this->assertTrue($message->memoryUsage()->isGreaterThan(0));
Expand Down Expand Up @@ -101,7 +102,7 @@ public function id(): string|int|null
$this->assertSame([['exception' => \RuntimeException::class, 'message' => 'failure', 'data' => []]], $message->results()->jsonSerialize());
$this->assertTrue($message->isFailure());
$this->assertSame('RuntimeException', (string) $message->failure());
$this->assertSame('fail', $message->failure()->description());
$this->assertSame('fail', $message->failure()?->description());
}

/**
Expand Down Expand Up @@ -139,4 +140,37 @@ public function id(): string|int|null
$this->assertSame(StringableObject::class, $message->type()->class());
$this->assertSame('description value', $message->description());
}

/**
* @test
*/
public function partial_seconds(): void
{
$start = self::mockTime('2024-10-06')->now();

$stamp = new MonitorStamp();

Clock::get()->sleep(1.1);

$stamp = $stamp->markReceived('foo');

Clock::get()->sleep(2.2);

$stamp = $stamp->markFinished();

$envelope = new Envelope(new \stdClass(), [$stamp]);
$message = new class($envelope, new Results([])) extends ProcessedMessage {
public function id(): string|int|\Stringable|null
{
return null;
}
};

$this->assertEquals($start, $message->dispatchedAt());
$this->assertEquals($start->modify('+1100 milliseconds'), $message->receivedAt());
$this->assertEquals($start->modify('+3300 milliseconds'), $message->finishedAt());
$this->assertSame(1.1, $message->timeInQueue());
$this->assertSame(2.2, $message->timeToHandle());
$this->assertSame(3.3, \round($message->timeToProcess(), 1));
}
}

0 comments on commit 49bef2e

Please sign in to comment.