Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to seek Kafka transport to timestamp-based offsets #205

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/bgpstream_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#ifndef _BGPSTREAM_LOG_H
#define _BGPSTREAM_LOG_H

#include "config.h"
#include <stdarg.h>

#define BGPSTREAM_LOG_ERR 0
Expand All @@ -37,7 +38,11 @@
#define BGPSTREAM_LOG_VFINE 50
#define BGPSTREAM_LOG_FINEST 60

#ifdef DEBUG
#define BGPSTREAM_LOG_LEVEL BGPSTREAM_LOG_FINE
#else
#define BGPSTREAM_LOG_LEVEL BGPSTREAM_LOG_INFO
#endif

#define bgpstream_log(level, ...) \
do { \
Expand Down
3 changes: 3 additions & 0 deletions lib/bgpstream_resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ typedef enum bgpstream_resource_attr_type {
/** The path toward a local cache */
BGPSTREAM_RESOURCE_ATTR_CACHE_DIR_PATH = 3,

/** Kafka message timestamp to begin from */
BGPSTREAM_RESOURCE_ATTR_KAFKA_TIMESTAMP_FROM = 4,

/** INTERNAL: The total number of attribute types in use */
_BGPSTREAM_RESOURCE_ATTR_CNT,

Expand Down
37 changes: 37 additions & 0 deletions lib/datainterfaces/bsdi_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "config.h"
#include "utils.h"
#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <string.h>
#include <wandio.h>
Expand Down Expand Up @@ -60,6 +61,7 @@ enum {
OPTION_TOPIC, // stored in kafka_topic res attribute
OPTION_CONSUMER_GROUP, // allow multiple BGPStream instances to load-balance
OPTION_OFFSET, // begin, end, committed
OPTION_TIMESTAMP_FROM, // msec since epoch (inclusive)
OPTION_DATA_TYPE, //
OPTION_PROJECT, //
OPTION_COLLECTOR, //
Expand Down Expand Up @@ -95,6 +97,13 @@ static bgpstream_data_interface_option_t options[] = {
"offset", // name
"initial offset (earliest/latest) (default: " BGPSTREAM_TRANSPORT_KAFKA_DEFAULT_OFFSET ")",
},
/* Timestamp to start from */
{
BGPSTREAM_DATA_INTERFACE_KAFKA, // interface ID
OPTION_TIMESTAMP_FROM, // internal ID
"timestamp-from", // name
"start from given timestamp (default: unused)",
},
/* Data type */
{
BGPSTREAM_DATA_INTERFACE_KAFKA, // interface ID
Expand Down Expand Up @@ -140,6 +149,9 @@ typedef struct bsdi_kafka_state {
// Offset
char *offset;

// Seek to message with timestamp
char *timestamp_from;

// explicitly set project name
char *project;

Expand Down Expand Up @@ -198,6 +210,7 @@ int bsdi_kafka_set_option(bsdi_t *di,
const char *option_value)
{
int found = 0;
int32_t i64;

switch (option_type->id) {
case OPTION_BROKERS:
Expand Down Expand Up @@ -241,6 +254,21 @@ int bsdi_kafka_set_option(bsdi_t *di,
}
break;

case OPTION_TIMESTAMP_FROM:
errno = 0;
i64 = strtoll(option_value, NULL, 10);
if (i64 == 0 || errno != 0) {
fprintf(stderr,
"ERROR: Invalid timestamp-from '%s'. Must be msec since epoch\n",
option_value);
return -1;
}
free(STATE->timestamp_from);
if ((STATE->timestamp_from = strdup(option_value)) == NULL) {
return -1;
}
break;

case OPTION_DATA_TYPE:
for (bgpstream_resource_format_type_t i = 0; i < ARR_CNT(type_strs); i++) {
if (strcmp(option_value, type_strs[i]) == 0) {
Expand Down Expand Up @@ -294,6 +322,9 @@ void bsdi_kafka_destroy(bsdi_t *di)
free(STATE->offset);
STATE->offset = NULL;

free(STATE->timestamp_from);
STATE->timestamp_from = NULL;

free(STATE->project);
STATE->project = NULL;

Expand Down Expand Up @@ -343,5 +374,11 @@ int bsdi_kafka_update_resources(bsdi_t *di)
return -1;
}

if (STATE->timestamp_from != NULL &&
bgpstream_resource_set_attr(
res, BGPSTREAM_RESOURCE_ATTR_KAFKA_TIMESTAMP_FROM, STATE->timestamp_from) != 0) {
return -1;
}

return 0;
}
144 changes: 138 additions & 6 deletions lib/transports/bs_transport_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
#define STATE ((state_t *)(transport->state))

#define POLL_TIMEOUT_MSEC 500
#define SEEK_TIMEOUT_MSEC (10 * 1000)

typedef struct state {

// convenience local copies of attrs
char *topic;
char *group;
char *offset;
int64_t timestamp_from;

// rdkafka instance
rd_kafka_t *rk;
Expand All @@ -57,12 +59,17 @@ typedef struct state {
// has a fatal error occured?
int fatal_error;

// have we already performed an initial rebalance
// (used when seeking to timestamp)
int rebalance_done;

} state_t;

static int parse_attrs(bgpstream_transport_t *transport)
{
char buf[1024];
uint64_t ts;
uint64_t u64;
const char *tmpstr;

// Topic Name (required)
if (bgpstream_resource_get_attr(
Expand All @@ -81,9 +88,9 @@ static int parse_attrs(bgpstream_transport_t *transport)
if (bgpstream_resource_get_attr(
transport->res, BGPSTREAM_RESOURCE_ATTR_KAFKA_CONSUMER_GROUP) == NULL) {
// generate a "random" group ID
ts = epoch_msec();
srand(ts);
snprintf(buf, sizeof(buf), "bgpstream-%" PRIx64 "-%x", ts, rand());
u64 = epoch_msec();
srand(u64);
snprintf(buf, sizeof(buf), "bgpstream-%" PRIx64 "-%x", u64, rand());
if ((STATE->group = strdup(buf)) == NULL) {
return -1;
}
Expand Down Expand Up @@ -112,10 +119,18 @@ static int parse_attrs(bgpstream_transport_t *transport)
}
}

// Timestamp-from (optional)
if ((tmpstr = bgpstream_resource_get_attr(
transport->res, BGPSTREAM_RESOURCE_ATTR_KAFKA_TIMESTAMP_FROM)) != NULL) {
STATE->timestamp_from = strtoll(tmpstr, NULL, 10);
}

bgpstream_log(
BGPSTREAM_LOG_FINE,
"Kafka transport: brokers: '%s', topic: '%s', group: '%s', offset: %s",
transport->res->url, STATE->topic, STATE->group, STATE->offset);
"Kafka transport: brokers: '%s', topic: '%s', group: '%s', offset: %s, "
"timestamp-from: %"PRIi64,
transport->res->url, STATE->topic, STATE->group, STATE->offset,
STATE->timestamp_from);
return 0;
}

Expand Down Expand Up @@ -145,6 +160,118 @@ static void kafka_error_callback(rd_kafka_t *rk, int err, const char *reason,
rd_kafka_err2str(err), err, reason);
}

#ifdef DEBUG
static void log_partition_list (const rd_kafka_topic_partition_list_t *partitions)
{
int i;
for (i = 0; i < partitions->cnt; i++) {
bgpstream_log(BGPSTREAM_LOG_FINE, " - %s [%" PRId32 "] offset %" PRId64,
partitions->elems[i].topic,
partitions->elems[i].partition, partitions->elems[i].offset);
}
}
#endif

/** Rewrite each assigned partition's offset so consumption starts at the
 *  first message with timestamp >= STATE->timestamp_from (msec since epoch).
 *
 *  Called from the rebalance callback before rd_kafka_assign(). The lookup
 *  protocol of rd_kafka_offsets_for_times() is to read the target timestamp
 *  from the offset field and replace it with the matching offset.
 *
 *  Failures are non-fatal, but any partition whose offset could not be
 *  resolved is reset to RD_KAFKA_OFFSET_INVALID -- otherwise the raw msec
 *  timestamp left in the offset field would be handed to rd_kafka_assign()
 *  as if it were an absolute offset.
 */
static void seek_timestamp_offset(bgpstream_transport_t *transport,
                                  rd_kafka_topic_partition_list_t *partitions)
{
#ifdef DEBUG
  bgpstream_log(BGPSTREAM_LOG_FINE, "Before seeking offsets to timestamps:");
  log_partition_list(partitions);
#endif
  // first, set all the offsets to our timestamp_from value
  for (int i = 0; i < partitions->cnt; i++) {
    partitions->elems[i].offset = STATE->timestamp_from;
  }

  // now ask for those to be replaced with the appropriate offset
  rd_kafka_resp_err_t ret_err =
    rd_kafka_offsets_for_times(STATE->rk, partitions, SEEK_TIMEOUT_MSEC);

  if (ret_err != RD_KAFKA_RESP_ERR_NO_ERROR) {
    // the whole lookup failed (e.g. timeout, bad args); fall back to the
    // consumer's normal committed-offset / auto.offset.reset behavior
    bgpstream_log(BGPSTREAM_LOG_WARN,
                  "Failed to seek some topics to initial timestamp: %s",
                  rd_kafka_err2str(ret_err));
    for (int i = 0; i < partitions->cnt; i++) {
      partitions->elems[i].offset = RD_KAFKA_OFFSET_INVALID;
    }
  } else {
    // the call can succeed overall while individual partitions fail; those
    // are reported via the per-partition err field
    for (int i = 0; i < partitions->cnt; i++) {
      if (partitions->elems[i].err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        bgpstream_log(BGPSTREAM_LOG_WARN,
                      "Failed to seek %s [%" PRId32 "] to initial timestamp: %s",
                      partitions->elems[i].topic,
                      partitions->elems[i].partition,
                      rd_kafka_err2str(partitions->elems[i].err));
        partitions->elems[i].offset = RD_KAFKA_OFFSET_INVALID;
      }
    }
  }

#ifdef DEBUG
  bgpstream_log(BGPSTREAM_LOG_FINE, "After seeking offsets to timestamps:");
  log_partition_list(partitions);
#endif
}

/** Consumer-group rebalance callback (installed via
 *  rd_kafka_conf_set_rebalance_cb when timestamp_from is set).
 *
 *  On partition assignment, the first time only (rebalance_done guard),
 *  offsets are rewritten to the timestamp-based positions before the
 *  assignment is applied; on revocation the assignment is cleared.
 *  `opaque` is the bgpstream_transport_t set as the conf opaque pointer.
 */
static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *partitions,
                         void *opaque)
{
  bgpstream_transport_t *transport = (bgpstream_transport_t*)opaque;
  // `error` is only set by the (currently disabled) incremental-rebalance
  // paths below; it stays NULL until that librdkafka API is adopted
  rd_kafka_error_t *error = NULL;
  rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;

  // TODO: only seek to start time once per topic
  bgpstream_log(BGPSTREAM_LOG_FINE, "Consumer group rebalanced, assigning offsets ");

  switch (err) {
  case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
#ifdef DEBUG
    bgpstream_log(BGPSTREAM_LOG_FINE, "kafka: assigned (%s):", "TODO");
    // rd_kafka_rebalance_protocol(rk));
    log_partition_list(partitions);
#endif
    // only seek on the first assignment; later rebalances should resume
    // from the group's committed offsets, not jump back to timestamp_from
    if (STATE->rebalance_done == 0) {
      seek_timestamp_offset(transport, partitions);
    }
    STATE->rebalance_done = 1;
    // XXX TODO: fix this for new (as yet unreleased) librdkafka API!!
    //if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE"))
    //  error = rd_kafka_incremental_assign(rk, partitions);
    //else
    ret_err = rd_kafka_assign(rk, partitions);
    break;

  case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
#ifdef DEBUG
    bgpstream_log(BGPSTREAM_LOG_FINE, "kafka: revoked (%s):", "TODO");
    // rd_kafka_rebalance_protocol(rk));
    log_partition_list(partitions);
#endif

    // XXX TODO: fix this for new (as yet unreleased) librdkafka API!!
    //if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
    //  error = rd_kafka_incremental_unassign(rk, partitions);
    //} else {
    // NULL clears the current assignment entirely
    ret_err = rd_kafka_assign(rk, NULL);
    //}
    break;

  default:
    // unexpected rebalance event; log it and drop the assignment
    bgpstream_log(BGPSTREAM_LOG_ERR, "kafka: failed: %s",
                  rd_kafka_err2str(err));
    rd_kafka_assign(rk, NULL);
    break;
  }

  if (error != NULL) {
    bgpstream_log(BGPSTREAM_LOG_ERR, "kafka: incremental assign failure: %s",
                  rd_kafka_error_string(error));
    rd_kafka_error_destroy(error);
  } else if (ret_err) {
    bgpstream_log(BGPSTREAM_LOG_ERR, "kafka: assign failure: %s",
                  rd_kafka_err2str(ret_err));
  }
}

static int init_kafka_config(bgpstream_transport_t *transport,
rd_kafka_conf_t *conf)
{
Expand Down Expand Up @@ -179,6 +306,11 @@ static int init_kafka_config(bgpstream_transport_t *transport,
return -1;
}

// Set up a rebalance callback if we're going to seek to specific offsets
if (STATE->timestamp_from != 0) {
rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
}

// Enable SO_KEEPALIVE in case we're behind a NAT
if (rd_kafka_conf_set(conf, "socket.keepalive.enable", "true", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
Expand Down
2 changes: 1 addition & 1 deletion tools/bgpreader.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ static void dump_if_options()
fprintf(stderr, " [NONE]\n");
} else {
for (i = 0; i < opt_cnt; i++) {
fprintf(stderr, " %-*s", 15, options[i].name);
fprintf(stderr, " %-*s", 16, options[i].name);
wrap(options[i].description, 18, 18);
fprintf(stderr, "\n");
}
Expand Down