From d53c2b058041e6edc5bf88fc0f7d0019582eba1f Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Tue, 19 May 2020 18:28:44 +0200 Subject: [PATCH] Upgrade to rdkafka v4 --- .github/workflows/ci.yml | 10 ++-- .github/workflows/infection.yml | 2 +- .github/workflows/shepherd.yml | 2 +- composer.json | 9 ++-- src/Clients/Consumer/KafkaConsumer.php | 4 +- src/Clients/Producer/KafkaProducer.php | 64 +++++++++++++++++++++---- src/Clients/Producer/ProducerRecord.php | 35 -------------- tests/Clients/Consumer/TestProducer.php | 16 +++---- 8 files changed, 76 insertions(+), 66 deletions(-) delete mode 100644 src/Clients/Producer/ProducerRecord.php diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d46fe50..09e0807 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,7 +31,7 @@ jobs: php-version: 7.3 coverage: none tools: pecl - extensions: json, mbstring, rdkafka-3.1.2 + extensions: json, mbstring, rdkafka - name: Get Composer Cache Directory id: composer-cache @@ -68,7 +68,7 @@ jobs: php-version: 7.3 coverage: none tools: pecl - extensions: json, mbstring, rdkafka-3.1.2 + extensions: json, mbstring, rdkafka - name: Get Composer Cache Directory id: composer-cache @@ -110,7 +110,7 @@ jobs: php-version: 7.3 coverage: pcov tools: pecl - extensions: json, mbstring, rdkafka-3.1.2 + extensions: json, mbstring, rdkafka - name: Get Composer Cache Directory id: composer-cache @@ -140,7 +140,7 @@ jobs: runs-on: ubuntu-18.04 strategy: matrix: - php: [7.3] + php: [7.3, 7.4] env: [ 'DEPENDENCIES=--prefer-lowest', '', @@ -164,7 +164,7 @@ jobs: php-version: ${{ matrix.php }} coverage: none tools: pecl - extensions: json, mbstring, rdkafka-3.1.2 + extensions: json, mbstring, rdkafka - name: Get Composer Cache Directory id: composer-cache diff --git a/.github/workflows/infection.yml b/.github/workflows/infection.yml index edff49b..aba4a24 100644 --- a/.github/workflows/infection.yml +++ b/.github/workflows/infection.yml @@ -30,7 +30,7 @@ jobs: php-version: 7.3 coverage: pcov tools: pecl - extensions: json, mbstring, rdkafka-3.1.2 + extensions: json, mbstring, rdkafka - name: Install Dependencies run: composer install --prefer-dist --no-progress --no-suggest diff --git a/.github/workflows/shepherd.yml b/.github/workflows/shepherd.yml index 2e04788..a04ea0c 100644 --- a/.github/workflows/shepherd.yml +++ b/.github/workflows/shepherd.yml @@ -27,7 +27,7 @@ jobs: php-version: 7.3 coverage: none tools: pecl - extensions: json, mbstring, rdkafka-3.1.2 + extensions: json, mbstring, rdkafka - name: Install dependencies run: composer install --prefer-dist --no-progress --no-suggest diff --git a/composer.json b/composer.json index 5e86df6..803994c 100644 --- a/composer.json +++ b/composer.json @@ -15,14 +15,13 @@ "require": { "php": "^7.2", "ext-pcntl": "*", - "ext-rdkafka": "*", - "psr/log": "^1.1", - "symfony/polyfill-php73": "^1.11" + "ext-rdkafka": "^4", + "psr/log": "^1.1" }, "require-dev": { "doctrine/coding-standard": "^7.0", "infection/infection": "^0.16.3", - "kwn/php-rdkafka-stubs": "^1.0", + "kwn/php-rdkafka-stubs": "^2.0", "phpstan/extension-installer": "^1.0", "phpstan/phpstan": "0.12.25", "phpstan/phpstan-phpunit": "0.12.8", @@ -30,7 +29,7 @@ "phpunit/phpunit": "^9.1", "psalm/plugin-phpunit": "^0.10.0", "simpod/php-coveralls-mirror": "^3.0", - "vimeo/psalm": "dev-master#47cf69d as 3.11.4" + "vimeo/psalm": "^3.12" }, "suggest": { "kwn/php-rdkafka-stubs": "Support and autocompletion for RDKafka in IDE | require as dev dependency" diff --git a/src/Clients/Consumer/KafkaConsumer.php b/src/Clients/Consumer/KafkaConsumer.php index 6ba0ad7..e23e3ee 100644 --- a/src/Clients/Consumer/KafkaConsumer.php +++ b/src/Clients/Consumer/KafkaConsumer.php @@ -44,7 +44,7 @@ function (RdKafkaConsumer $kafka, int $err, string $reason) : void { } ); - $rebalanceCb = + $rebalanceCallback = /** @param array|null $partitions */ function (RdKafkaConsumer $kafka, int $err, ?array $partitions = null) : void { switch ($err) { @@ -77,7 +77,7 @@ static function (TopicPartition $partition) : string { $kafka->assign(); } }; - $config->getConf()->setRebalanceCb($rebalanceCb); + $config->getConf()->setRebalanceCb($rebalanceCallback); parent::__construct($config->getConf()); } diff --git a/src/Clients/Producer/KafkaProducer.php b/src/Clients/Producer/KafkaProducer.php index 71448fa..e7eb62b 100644 --- a/src/Clients/Producer/KafkaProducer.php +++ b/src/Clients/Producer/KafkaProducer.php @@ -4,30 +4,78 @@ namespace SimPod\Kafka\Clients\Producer; +use InvalidArgumentException; use RdKafka\Producer; +use RdKafka\ProducerTopic; +use RuntimeException; +use function sprintf; use const RD_KAFKA_PARTITION_UA; +use const RD_KAFKA_RESP_ERR_NO_ERROR; class KafkaProducer extends Producer { private const RD_KAFKA_MSG_F_COPY = 0; - public function __construct(ProducerConfig $config) + /** @var callable(KafkaProducer):void|null */ + private $exitCallback; + + /** @param callable(KafkaProducer):void|null $exitCallback */ + public function __construct(ProducerConfig $config, ?callable $exitCallback = null) { + $this->exitCallback = $exitCallback; + parent::__construct($config->getConf()); } - public function produce(ProducerRecord $record) : void + public function __destruct() { - $topic = $this->newTopic($record->topic); - /** @psalm-suppress UndefinedMethod https://github.com/vimeo/psalm/issues/3406 */ - $topic->produce($record->partition ?? RD_KAFKA_PARTITION_UA, self::RD_KAFKA_MSG_F_COPY, $record->value, $record->key); + if ($this->exitCallback === null) { + return; + } + + ($this->exitCallback)($this); + } + + /** @param array|null $headers */ + public function produce( + string $topicName, + ?int $partition, + string $value, + ?string $key = null, + ?array $headers = null, + ?int $timestampMs = null + ) : void { + if ($partition < 0) { + throw new InvalidArgumentException( + sprintf('Invalid partition: %d. Partition number should always be non-negative or null.', $partition) + ); + } + + /** @psalm-var ProducerTopic $topic Psalm thinks this is a Topic https://github.com/vimeo/psalm/issues/3406 */ + $topic = $this->newTopic($topicName); + $topic->producev( + $partition ?? RD_KAFKA_PARTITION_UA, + self::RD_KAFKA_MSG_F_COPY, + $value, + $key, + $headers, + $timestampMs + ); $this->poll(0); } - public function flush() : void + public function flushMessages(int $timeoutMs = 10000) : void { - while ($this->getOutQLen() > 0) { - $this->poll(1); + $result = null; + for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) { + $result = $this->flush($timeoutMs); + if ($result === RD_KAFKA_RESP_ERR_NO_ERROR) { + break; + } + } + + if ($result !== RD_KAFKA_RESP_ERR_NO_ERROR) { + throw new RuntimeException('Was unable to flush, messages might be lost!'); } } } diff --git a/src/Clients/Producer/ProducerRecord.php b/src/Clients/Producer/ProducerRecord.php deleted file mode 100644 index 8201378..0000000 --- a/src/Clients/Producer/ProducerRecord.php +++ /dev/null @@ -1,35 +0,0 @@ -topic = $topic; - $this->partition = $partition; - $this->key = $key; - $this->value = $value; - } -} diff --git a/tests/Clients/Consumer/TestProducer.php b/tests/Clients/Consumer/TestProducer.php index 38c75a9..015cf33 100644 --- a/tests/Clients/Consumer/TestProducer.php +++ b/tests/Clients/Consumer/TestProducer.php @@ -6,7 +6,6 @@ use SimPod\Kafka\Clients\Producer\KafkaProducer; use SimPod\Kafka\Clients\Producer\ProducerConfig; -use SimPod\Kafka\Clients\Producer\ProducerRecord; use function gethostname; final class TestProducer @@ -16,18 +15,17 @@ final class TestProducer public function __construct() { - $this->producer = new KafkaProducer($this->getConfig()); + $this->producer = new KafkaProducer( + $this->getConfig(), + static function (KafkaProducer $producer) : void { + $producer->flushMessages(5000); + } + ); } public function run(string $payload) : void { - $record = new ProducerRecord(KafkaBatchConsumerTest::TOPIC, null, $payload); - $this->producer->produce($record); - } - - public function flush() : void - { - $this->producer->flush(); + $this->producer->produce(KafkaBatchConsumerTest::TOPIC, null, $payload); } private function getConfig() : ProducerConfig