diff --git a/.travis.yml b/.travis.yml index b12489d..17c1e75 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,8 +4,16 @@ php: - 5.6 - 7.0 - 7.1 + - 7.2 + - 7.3 + - hhvm - nightly +matrix: + allow_failures: + - php: hhvm + - php: nightly + before_script: - composer install diff --git a/src/PubSubConnectionFactory.php b/src/PubSubConnectionFactory.php index e0e97fd..111c3ce 100644 --- a/src/PubSubConnectionFactory.php +++ b/src/PubSubConnectionFactory.php @@ -82,21 +82,39 @@ protected function makeRedisAdapter(array $config) */ protected function makeKafkaAdapter(array $config) { - // create producer - $producer = $this->container->makeWith('pubsub.kafka.producer'); - $producer->addBrokers($config['brokers']); - - // create consumer + // create default topic $topicConf = $this->container->makeWith('pubsub.kafka.topic_conf'); $topicConf->set('auto.offset.reset', 'smallest'); + // create config $conf = $this->container->makeWith('pubsub.kafka.conf'); $conf->set('group.id', array_get($config, 'consumer_group_id', 'php-pubsub')); $conf->set('metadata.broker.list', $config['brokers']); $conf->set('enable.auto.commit', 'false'); $conf->set('offset.store.method', 'broker'); + + if (array_key_exists('security_protocol', $config)) { + switch ($config['security_protocol']) { + case 'SASL_SSL': + case 'SASL_PAINTEXT': + $conf->set('security.protocol', array_get($config, 'security_protocol', 'SASL_SSL')); + $conf->set('sasl.username', array_get($config, 'sasl_username', 'sasl_username')); + $conf->set('sasl.password', array_get($config, 'sasl_password', 'sasl_password')); + $conf->set('sasl.mechanisms', array_get($config, 'sasl_mechanisms', 'PLAIN')); + break; + + default: + break; + } + } + $conf->setDefaultTopicConf($topicConf); + // create producer + $producer = $this->container->makeWith('pubsub.kafka.producer', ['conf' => $conf]); + $producer->addBrokers($config['brokers']); + + // create consumer $consumer = $this->container->makeWith('pubsub.kafka.consumer', ['conf' => $conf]); return new KafkaPubSubAdapter($producer, $consumer); diff --git a/src/PubSubServiceProvider.php b/src/PubSubServiceProvider.php index 2058b9b..cbfe163 100644 --- a/src/PubSubServiceProvider.php +++ b/src/PubSubServiceProvider.php @@ -64,8 +64,8 @@ protected function registerAdapterDependencies() return new \RdKafka\TopicConf(); }); - $this->app->bind('pubsub.kafka.producer', function () { - return new \RdKafka\Producer(); + $this->app->bind('pubsub.kafka.producer', function ($app, $parameters) { + return new \RdKafka\Producer($parameters['conf']); }); $this->app->bind('pubsub.kafka.conf', function () {