Skip to content

Commit

Permalink
Merge pull request #7 from simPod/rdk4
Browse files Browse the repository at this point in the history
Upgrade to rdkafka v4
  • Loading branch information
simPod authored Jul 3, 2020
2 parents a8bfaf6 + d53c2b0 commit 95697d6
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 66 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
php-version: 7.3
coverage: none
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2
extensions: json, mbstring, rdkafka

- name: Get Composer Cache Directory
id: composer-cache
Expand Down Expand Up @@ -68,7 +68,7 @@ jobs:
php-version: 7.3
coverage: none
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2
extensions: json, mbstring, rdkafka

- name: Get Composer Cache Directory
id: composer-cache
Expand Down Expand Up @@ -110,7 +110,7 @@ jobs:
php-version: 7.3
coverage: pcov
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2
extensions: json, mbstring, rdkafka

- name: Get Composer Cache Directory
id: composer-cache
Expand Down Expand Up @@ -140,7 +140,7 @@ jobs:
runs-on: ubuntu-18.04
strategy:
matrix:
php: [7.3]
php: [7.3, 7.4]
env: [
'DEPENDENCIES=--prefer-lowest',
'',
Expand All @@ -164,7 +164,7 @@ jobs:
php-version: ${{ matrix.php }}
coverage: none
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2
extensions: json, mbstring, rdkafka

- name: Get Composer Cache Directory
id: composer-cache
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/infection.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
php-version: 7.3
coverage: pcov
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2
extensions: json, mbstring, rdkafka

- name: Install Dependencies
run: composer install --prefer-dist --no-progress --no-suggest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/shepherd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
php-version: 7.3
coverage: none
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2
extensions: json, mbstring, rdkafka

- name: Install dependencies
run: composer install --prefer-dist --no-progress --no-suggest
Expand Down
9 changes: 4 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,21 @@
"require": {
"php": "^7.2",
"ext-pcntl": "*",
"ext-rdkafka": "*",
"psr/log": "^1.1",
"symfony/polyfill-php73": "^1.11"
"ext-rdkafka": "^4",
"psr/log": "^1.1"
},
"require-dev": {
"doctrine/coding-standard": "^7.0",
"infection/infection": "^0.16.3",
"kwn/php-rdkafka-stubs": "^1.0",
"kwn/php-rdkafka-stubs": "^2.0",
"phpstan/extension-installer": "^1.0",
"phpstan/phpstan": "0.12.25",
"phpstan/phpstan-phpunit": "0.12.8",
"phpstan/phpstan-strict-rules": "0.12.2",
"phpunit/phpunit": "^9.1",
"psalm/plugin-phpunit": "^0.10.0",
"simpod/php-coveralls-mirror": "^3.0",
"vimeo/psalm": "dev-master#47cf69d as 3.11.4"
"vimeo/psalm": "^3.12"
},
"suggest": {
"kwn/php-rdkafka-stubs": "Support and autocompletion for RDKafka in IDE | require as dev dependency"
Expand Down
4 changes: 2 additions & 2 deletions src/Clients/Consumer/KafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ function (RdKafkaConsumer $kafka, int $err, string $reason) : void {
}
);

$rebalanceCb =
$rebalanceCallback =
/** @param array<string, TopicPartition>|null $partitions */
function (RdKafkaConsumer $kafka, int $err, ?array $partitions = null) : void {
switch ($err) {
Expand Down Expand Up @@ -77,7 +77,7 @@ static function (TopicPartition $partition) : string {
$kafka->assign();
}
};
$config->getConf()->setRebalanceCb($rebalanceCb);
$config->getConf()->setRebalanceCb($rebalanceCallback);

parent::__construct($config->getConf());
}
Expand Down
64 changes: 56 additions & 8 deletions src/Clients/Producer/KafkaProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,78 @@

namespace SimPod\Kafka\Clients\Producer;

use InvalidArgumentException;
use RdKafka\Producer;
use RdKafka\ProducerTopic;
use RuntimeException;
use function sprintf;
use const RD_KAFKA_PARTITION_UA;
use const RD_KAFKA_RESP_ERR_NO_ERROR;

class KafkaProducer extends Producer
{
private const RD_KAFKA_MSG_F_COPY = 0;

public function __construct(ProducerConfig $config)
/** @var callable(KafkaProducer):void|null */
private $exitCallback;

/** @param callable(KafkaProducer):void|null $exitCallback */
public function __construct(ProducerConfig $config, ?callable $exitCallback = null)
{
$this->exitCallback = $exitCallback;

parent::__construct($config->getConf());
}

public function produce(ProducerRecord $record) : void
public function __destruct()
{
$topic = $this->newTopic($record->topic);
/** @psalm-suppress UndefinedMethod https://github.com/vimeo/psalm/issues/3406 */
$topic->produce($record->partition ?? RD_KAFKA_PARTITION_UA, self::RD_KAFKA_MSG_F_COPY, $record->value, $record->key);
if ($this->exitCallback === null) {
return;
}

($this->exitCallback)($this);
}

/** @param array<string, string>|null $headers */
public function produce(
string $topicName,
?int $partition,
string $value,
?string $key = null,
?array $headers = null,
?int $timestampMs = null
) : void {
if ($partition < 0) {
throw new InvalidArgumentException(
sprintf('Invalid partition: %d. Partition number should always be non-negative or null.', $partition)
);
}

/** @psalm-var ProducerTopic $topic Psalm thinks this is a Topic https://github.com/vimeo/psalm/issues/3406 */
$topic = $this->newTopic($topicName);
$topic->producev(
$partition ?? RD_KAFKA_PARTITION_UA,
self::RD_KAFKA_MSG_F_COPY,
$value,
$key,
$headers,
$timestampMs
);
$this->poll(0);
}

public function flush() : void
public function flushMessages(int $timeoutMs = 10000) : void
{
while ($this->getOutQLen() > 0) {
$this->poll(1);
$result = null;
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $this->flush($timeoutMs);
if ($result === RD_KAFKA_RESP_ERR_NO_ERROR) {
break;
}
}

if ($result !== RD_KAFKA_RESP_ERR_NO_ERROR) {
throw new RuntimeException('Was unable to flush, messages might be lost!');
}
}
}
35 changes: 0 additions & 35 deletions src/Clients/Producer/ProducerRecord.php

This file was deleted.

16 changes: 7 additions & 9 deletions tests/Clients/Consumer/TestProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use SimPod\Kafka\Clients\Producer\KafkaProducer;
use SimPod\Kafka\Clients\Producer\ProducerConfig;
use SimPod\Kafka\Clients\Producer\ProducerRecord;
use function gethostname;

final class TestProducer
Expand All @@ -16,18 +15,17 @@ final class TestProducer

public function __construct()
{
$this->producer = new KafkaProducer($this->getConfig());
$this->producer = new KafkaProducer(
$this->getConfig(),
static function (KafkaProducer $producer) : void {
$producer->flushMessages(5000);
}
);
}

public function run(string $payload) : void
{
$record = new ProducerRecord(KafkaBatchConsumerTest::TOPIC, null, $payload);
$this->producer->produce($record);
}

public function flush() : void
{
$this->producer->flush();
$this->producer->produce(KafkaBatchConsumerTest::TOPIC, null, $payload);
}

private function getConfig() : ProducerConfig
Expand Down

0 comments on commit 95697d6

Please sign in to comment.