Skip to content

Commit

Permalink
Merge pull request salebab#7 in PH/phpkafka from intern-master to master
Browse files Browse the repository at this point in the history
* commit 'e52e0c8cadd1766404e2d834681863c4043db7ab':
  Do not pass timeout of 0 ms (== NOWAIT kafka)
  Test new methods
  Add method en enable/disable logging
  Add getTopics method -> use meta-api
  Add setBrokers method -> switch connection on instance
  • Loading branch information
Elias Van Ootegem committed Apr 7, 2015
2 parents 19178c4 + e52e0c8 commit 6e19b95
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 65 deletions.
146 changes: 93 additions & 53 deletions kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "librdkafka/rdkafka.h"

static int run = 1;
static int log_level = 1;
static rd_kafka_t *rk;
static rd_kafka_type_t rk_type;
static int exit_eof = 1; //Exit consumer when last message
Expand All @@ -42,6 +43,11 @@ void kafka_connect(char *brokers)
kafka_setup(brokers);
}

void kafka_set_log_level( int ll )
{
log_level = ll;
}

//return 1 if rd is not NULL
int kafka_is_connected( void )
{
Expand All @@ -63,18 +69,19 @@ void kafka_stop(int sig) {
}

void kafka_err_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - ERROR CALLBACK: %s: %s: %s\n",
if (log_level) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - ERROR CALLBACK: %s: %s: %s\n",
rd_kafka_name(rk), rd_kafka_err2str(err), reason);

}
kafka_stop(err);
}

void kafka_msg_delivered (rd_kafka_t *rk,
void *payload, size_t len,
int error_code,
void *opaque, void *msg_opaque) {
if (error_code) {
if (error_code && log_level) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - Message delivery failed: %s",
rd_kafka_err2str(error_code));
Expand Down Expand Up @@ -107,14 +114,18 @@ static void kafka_init( rd_kafka_type_t type )
char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (!(rk = rd_kafka_new(type, conf, errstr, sizeof(errstr)))) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - failed to create new producer: %s", errstr);
if (log_level) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - failed to create new producer: %s", errstr);
}
exit(1);
}
/* Add brokers */
if (rd_kafka_brokers_add(rk, brokers) == 0) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "php kafka - No valid brokers specified");
if (log_level) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "php kafka - No valid brokers specified");
}
exit(1);
}
/* Set up a message delivery report callback.
Expand All @@ -123,8 +134,10 @@ static void kafka_init( rd_kafka_type_t type )
rd_kafka_conf_set_dr_cb(conf, kafka_msg_delivered);
rd_kafka_conf_set_error_cb(conf, kafka_err_cb);

openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - using: %s", brokers);
if (log_level) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - using: %s", brokers);
}
}
}

Expand Down Expand Up @@ -157,52 +170,75 @@ void kafka_produce(char* topic, char* msg, int msg_len)
* delivery report callback as
* msg_opaque. */
NULL) == -1) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - %% Failed to produce to topic %s "
"partition %i: %s",
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(
rd_kafka_errno2err(errno)));
rd_kafka_poll(rk, 0);
if (log_level) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - %% Failed to produce to topic %s "
"partition %i: %s",
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(
rd_kafka_errno2err(errno)));
}
rd_kafka_poll(rk, 1);
}

/* Poll to handle delivery reports */
rd_kafka_poll(rk, 0);
rd_kafka_poll(rk, 1);

/* Wait for messages to be delivered */
while (run && rd_kafka_outq_len(rk) > 0)
rd_kafka_poll(rk, 100);
rd_kafka_poll(rk, 10);

rd_kafka_topic_destroy(rkt);
}

static rd_kafka_message_t *msg_consume(rd_kafka_message_t *rkmessage,
void *opaque) {
if (rkmessage->err) {
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO,
"phpkafka - %% Consumer reached end of %s [%"PRId32"] "
"message queue at offset %"PRId64"\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition, rkmessage->offset);
if (exit_eof)
run = 0;
return NULL;
if (rkmessage->err) {
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
if (log_level) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO,
"phpkafka - %% Consumer reached end of %s [%"PRId32"] "
"message queue at offset %"PRId64"\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition, rkmessage->offset);
}
if (exit_eof)
run = 0;
return NULL;
}
if (log_level) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - %% Consume error for topic \"%s\" [%"PRId32"] "
"offset %"PRId64": %s\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition,
rkmessage->offset,
rd_kafka_message_errstr(rkmessage)
);
return NULL;
}
}

openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - %% Consume error for topic \"%s\" [%"PRId32"] "
"offset %"PRId64": %s\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition,
rkmessage->offset,
rd_kafka_message_errstr(rkmessage));
return NULL;
}
return rkmessage;
}

//php_printf("%.*s\n", (int)rkmessage->len, (char *)rkmessage->payload);
return rkmessage;
//get topics + partition count
void kafka_get_topics(zval *return_value)
{
int i;
const struct rd_kafka_metadata *meta;
kafka_init(RD_KAFKA_CONSUMER);
if (RD_KAFKA_RESP_ERR_NO_ERROR == rd_kafka_metadata(rk, 1, NULL, &meta, 200)) {
for (i=0;i<meta->topic_cnt;++i) {
add_assoc_long(
return_value,
meta->topics[i].topic,
(long) meta->topics[i].partition_cnt
);
}
}
rd_kafka_metadata_destroy(meta);
}

//get the available partitions for a given topic
Expand All @@ -211,8 +247,7 @@ void kafka_get_partitions(zval *return_value, char *topic)
rd_kafka_topic_t *rkt;
rd_kafka_topic_conf_t *conf;
int i;//C89 compliant
//connect if required
//preserve current type
//connect as consumer if required
kafka_init(RD_KAFKA_CONSUMER);
/* Topic configuration */
conf = rd_kafka_topic_conf_new();
Expand All @@ -226,8 +261,8 @@ void kafka_get_partitions(zval *return_value, char *topic)
for (i=0;i<meta[0]->topics->partition_cnt;++i) {
add_next_index_long(return_value, i);
}
rd_kafka_metadata_destroy(meta[0]);
}
rd_kafka_metadata_destroy(meta[0]);
}

void kafka_consume(zval* return_value, char* topic, char* offset, int item_count)
Expand Down Expand Up @@ -257,16 +292,19 @@ void kafka_consume(zval* return_value, char* topic, char* offset, int item_count

/* Create topic */
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
if (log_level) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - start_offset: %"PRId64" and offset passed: %s", start_offset, offset);

openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - start_offset: %"PRId64" and offset passed: %s", start_offset, offset);

}
/* Start consuming */
if (rd_kafka_consume_start(rkt, partition, start_offset) == -1) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - %% Failed to start consuming: %s",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
exit(1);
if (log_level) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - %% Failed to start consuming: %s",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
}
exit(1);
}

if (item_count != 0) {
Expand All @@ -276,8 +314,10 @@ void kafka_consume(zval* return_value, char* topic, char* offset, int item_count
while (run) {
if (item_count != 0 && read_counter >= 0) {
read_counter--;
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - read_counter: %d", read_counter);
if (log_level) {
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_INFO, "phpkafka - read_counter: %d", read_counter);
}
if (read_counter == -1) {
run = 0;
continue;//so continue, or we'll get a segfault
Expand Down
2 changes: 2 additions & 0 deletions kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
#define __KAFKA_H__

void kafka_setup(char *brokers);
void kafka_set_log_level(int ll);
void kafka_set_partition(int partition);
void kafka_produce(char* topic, char* msg, int msg_len);
int kafka_is_connected( void );
void kafka_consume(zval* return_value, char* topic, char* offset, int item_count);
void kafka_get_partitions(zval *return_value, char *topic);
void kafka_get_topics(zval *return_value);
void kafka_destroy();

#endif
70 changes: 67 additions & 3 deletions php_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ static zend_function_entry kafka_functions[] = {
PHP_ME(Kafka, __destruct, NULL, ZEND_ACC_DTOR | ZEND_ACC_PUBLIC)
PHP_ME(Kafka, set_partition, NULL, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED)
PHP_ME(Kafka, setPartition, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Kafka, setLogLevel, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Kafka, getPartitionsForTopic, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Kafka, setBrokers, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Kafka, getTopics, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Kafka, disconnect, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Kafka, isConnected, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Kafka, produce, NULL, ZEND_ACC_PUBLIC)
Expand All @@ -57,9 +60,10 @@ zend_module_entry kafka_module_entry = {
#ifdef COMPILE_DL_KAFKA
ZEND_GET_MODULE(kafka)
#endif

#define REGISTER_KAFKA_CLASS_CONST_STRING(ce, name, value) \
zend_declare_class_constant_stringl(ce, name, sizeof(name)-1, value, sizeof(value)-1)
#define REGISTER_KAFKA_CLASS_CONST_LONG(ce, name, value) \
zend_declare_class_constant_long(ce, name, sizeof(name)-1, value)

#ifndef BASE_EXCEPTION
#if (PHP_MAJOR_VERSION < 5) || ( ( PHP_MAJOR_VERSION == 5 ) && (PHP_MINOR_VERSION < 2) )
Expand All @@ -79,6 +83,8 @@ PHP_MINIT_FUNCTION(kafka)
zend_declare_property_null(kafka_ce, "partition", sizeof("partition") -1, ZEND_ACC_PRIVATE TSRMLS_CC);
REGISTER_KAFKA_CLASS_CONST_STRING(kafka_ce, "OFFSET_BEGIN", PHP_KAFKA_OFFSET_BEGIN);
REGISTER_KAFKA_CLASS_CONST_STRING(kafka_ce, "OFFSET_END", PHP_KAFKA_OFFSET_END);
REGISTER_KAFKA_CLASS_CONST_LONG(kafka_ce, "LOG_ON", PHP_KAFKA_LOGLEVEL_ON);
REGISTER_KAFKA_CLASS_CONST_LONG(kafka_ce, "LOG_OFF", PHP_KAFKA_LOGLEVEL_OFF);
return SUCCESS;
}
PHP_RSHUTDOWN_FUNCTION(kafka) { return SUCCESS; }
Expand Down Expand Up @@ -136,7 +142,7 @@ PHP_METHOD(Kafka, set_partition)
zval *partition;

if (
zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|z", &partition) == FAILURE
zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z", &partition) == FAILURE
||
Z_TYPE_P(partition) != IS_LONG
) {
Expand All @@ -150,6 +156,33 @@ PHP_METHOD(Kafka, set_partition)
}
/* }}} end Kafka::set_partition */

/* {{{ proto Kafka Kafka::setLogLevel( mixed $logLevel )
toggle syslogging on or off use Kafka::LOG_* constants
*/
PHP_METHOD(Kafka, setLogLevel)
{
zval *log_level;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z", &log_level) == FAILURE)
{
return;//?
}
if (Z_TYPE_P(log_level) != IS_LONG) {
zend_throw_exception(BASE_EXCEPTION, "Kafka::setLogLevel expects argument to be an int", 0 TSRMLS_CC);
return;
}
if (
Z_LVAL_P(log_level) != PHP_KAFKA_LOGLEVEL_ON
&&
Z_LVAL_P(log_level) != PHP_KAFKA_LOGLEVEL_OFF
) {
zend_throw_exception(BASE_EXCEPTION, "Invalid argument, use Kafka::LOG_* constants", 0 TSRMLS_CC);
return;
}
kafka_set_log_level(Z_LVAL_P(log_level));
RETURN_ZVAL(getThis(), 1, 0);
}
/* }}} end Kafka::setLogLevel */

/* {{{ proto Kafka Kafka::setPartition( int $partition );
Set partition to use for Kafka::consume calls
*/
Expand All @@ -158,7 +191,7 @@ PHP_METHOD(Kafka, setPartition)
zval *partition;

if (
zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|z", &partition) == FAILURE
zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z", &partition) == FAILURE
||
Z_TYPE_P(partition) != IS_LONG
) {
Expand All @@ -172,6 +205,37 @@ PHP_METHOD(Kafka, setPartition)
}
/* }}} end Kafka::setPartition */

/* {{{ proto array Kafka::getTopics( void )
Get all existing topics
*/
PHP_METHOD(Kafka, getTopics)
{
array_init(return_value);
kafka_get_topics(return_value);
}
/* }}} end Kafka::getTopics */

/* {{{ proto Kafka Kafka::setBrokers ( string $brokers)
Set brokers on-the-fly
*/
PHP_METHOD(Kafka, setBrokers)
{
zval *brokers;

if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z",
&brokers) == FAILURE) {
return;
}
if (Z_TYPE_P(brokers) != IS_STRING) {
zend_throw_exception(BASE_EXCEPTION, "Kafka::setBrokers expects argument to be a string", 0 TSRMLS_CC);
return;
}
kafka_destroy();
kafka_connect(Z_STRVAL_P(brokers));
RETURN_ZVAL(getThis(), 1, 0);
}
/* }}} end Kafka::setBrokers */

/* {{{ proto array Kafka::getPartitionsForTopic( string $topic )
Get an array of available partitions for a given topic
*/
Expand Down
Loading

0 comments on commit 6e19b95

Please sign in to comment.