Skip to content

Commit

Permalink
do not substring on all the line length
Browse files Browse the repository at this point in the history
  • Loading branch information
chazzbg committed Oct 14, 2024
1 parent 4acd8b6 commit 6e4f2c2
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
18 changes: 14 additions & 4 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Connection
private float $pingAt = 0;
private float $pongAt = 0;
private float $prolongateTill = 0;
private int $paketSize = 1024;

private ?Authenticator $authenticator;
private Configuration $config;
Expand Down Expand Up @@ -128,28 +129,32 @@ public function sendMessage(Message $message)

$line = $message->render() . "\r\n";
$length = strlen($line);
$total = 0;

$this->logger?->debug('send ' . $line);

while (strlen($line)) {
while ($total < $length) {
try {
$written = @fwrite($this->socket, $line, 1024);
$written = @fwrite($this->socket, substr($line, $total, $this->paketSize));
if ($written === false) {
throw new LogicException('Error sending data');
}
if ($written === 0) {
throw new LogicException('Broken pipe or closed connection');
}
if ($length == $written) {
$total += $written;

if ($length == $total) {
break;
}
$line = substr($line, $written);
} catch (Throwable $e) {
$this->processException($e);
$line = $message->render() . "\r\n";
}
}

unset($line);

if ($message instanceof Publish) {
if (strpos($message->subject, '$JS.API.CONSUMER.MSG.NEXT.') === 0) {
$prolongate = $message->payload->expires / 1_000_000_000;
Expand Down Expand Up @@ -291,6 +296,11 @@ private function processException(Throwable $e)
}
}

public function setPacketSize(int $size): void
{
$this->paketSize = $size;
}

public function close(): void
{
if ($this->socket) {
Expand Down
27 changes: 27 additions & 0 deletions tests/Performance/PerformanceTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class PerformanceTest extends FunctionalTestCase
{
private int $limit = 500_000;
private int $counter = 0;
private int $bigMessageIterationLimit = 1000;
private int $payloadSize = 1024 * 450; // 900kb

public function testPerformance()
{
Expand Down Expand Up @@ -49,4 +51,29 @@ public function testPerformance()
// at least 5000rps should be enough for test
$this->assertGreaterThan(5000, $this->limit / $processing);
}


public function testPerformanceWithBigMessages()
{
$client = $this->createClient()->setTimeout(0.1)->setDelay(0);
$client->connection->setLogger(null);

$bigPayload = bin2hex(random_bytes($this->payloadSize));
$this->logger?->info('start big message performance test with size: '. strlen($bigPayload) / 1024 .'kb');

$publishing = microtime(true);
for ($i = 0; $i < $this->bigMessageIterationLimit; $i++) {
$client->publish('hello', $bigPayload);
}
$publishing = microtime(true) - $publishing;

$this->logger?->info('publishing', [
'rps' => floor($this->bigMessageIterationLimit / $publishing),
'length' => $this->bigMessageIterationLimit,
'time' => $publishing,
]);

// at least 50rps should be enough for test
$this->assertGreaterThan(500, $this->bigMessageIterationLimit / $publishing);
}
}

0 comments on commit 6e4f2c2

Please sign in to comment.