Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: Optimizing the output of consume cli #1723

Merged
merged 2 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion hstream-kafka/HStream/Kafka/Client/Cli.hs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ data ConsumeCommandOpts = ConsumeCommandOpts
, offsetReset :: Maybe OffsetReset
, eof :: Bool
, autoCommit :: Bool
, verbose :: Bool
} deriving (Show, Eq)

-- librdkafka doesn't support set `allow.auto.create.topics` any more for consumer since v1.6
Expand All @@ -467,6 +468,7 @@ consumeCommandParser = ConsumeCommandOpts
<> help "Exit consumer when last message in partition has been received."
)
<*> flag True False (long "no-auto-commit" <> help "disable auto commit")
<*> switch (long "verbose" <> short 'v' <> help "print record key and create timestamp.")

handleConsumeCommand :: Options -> ConsumeCommandOpts -> IO ()
handleConsumeCommand Options{..} cmdopts = do
Expand All @@ -483,6 +485,7 @@ handleConsumeCommand Options{..} cmdopts = do
Just OffsetResetLatest -> "latest"
cAutoCommit = if cmdopts.autoCommit then 1 else 0
ceof = if cmdopts.eof then 1 else 0
cVerbose = if cmdopts.verbose then 1 else 0
consumer <-
HsForeign.withByteString brokers $ \brokers' brokers_size ->
HsForeign.withByteString groupIdBs $ \groupid' groupid_size ->
Expand All @@ -499,7 +502,7 @@ handleConsumeCommand Options{..} cmdopts = do

HsForeign.withByteStringList (map encodeUtf8 topics) $ \tds' tss' tl -> do
(errmsg, ret) <- unsafeWithStdString $
hs_consumer_consume consumer tds' tss' tl
hs_consumer_consume consumer tds' tss' tl cVerbose
when (ret /= 0) $ errorWithoutStackTrace $
"Consume failed: " <> (Text.unpack $ decodeUtf8 errmsg)

Expand All @@ -519,6 +522,7 @@ foreign import ccall interruptible "hs_consumer_consume"
hs_consumer_consume
:: Ptr HsConsumer
-> Ptr (Ptr Word8) -> Ptr Int -> Int -- topics
-> CBool -- verbose
-> Ptr HsForeign.StdString
-> IO Int

Expand Down
57 changes: 29 additions & 28 deletions hstream-kafka/cbits/hs_kafka_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
*/
#include <HsFFI.h>
#include <csignal>
#include <iomanip>
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
#include <sstream>
#include <string>
#include <sys/time.h>
#include <unistd.h>

static int verbosity = 1;
static volatile sig_atomic_t run = 1;
static void sigterm(int sig) { run = 0; }

Expand Down Expand Up @@ -110,9 +111,6 @@ class HsRebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb(RdKafka::KafkaConsumer* consumer, RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*>& partitions) {
std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
part_list_print(partitions);

RdKafka::Error* error = NULL;
RdKafka::ErrorCode ret_err = RdKafka::ERR_NO_ERROR;

Expand Down Expand Up @@ -141,38 +139,41 @@ class HsRebalanceCb : public RdKafka::RebalanceCb {
}
};

void msg_consume(RdKafka::Message* message, void* opaque) {
void msg_consume(RdKafka::Message* message, bool verbose, void* opaque) {
switch (message->err()) {
case RdKafka::ERR__TIMED_OUT:
break;

case RdKafka::ERR_NO_ERROR:
case RdKafka::ERR_NO_ERROR: {
/* Real message */
consumer_msg_cnt++;
consumer_msg_bytes += message->len();
if (verbosity >= 3)
std::cerr << "Read msg at offset " << message->offset() << std::endl;
RdKafka::MessageTimestamp ts;
ts = message->timestamp();
if (verbosity >= 2 &&
ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
std::string tsname = "?";
if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)
tsname = "create time";
else if (ts.type ==
RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME)
tsname = "log append time";
std::cout << "Timestamp: " << tsname << " " << ts.timestamp
<< std::endl;
}
if (verbosity >= 2 && message->key()) {
std::cout << "Key: " << *message->key() << std::endl;
}
if (verbosity >= 1) {
printf("%.*s\n", static_cast<int>(message->len()),
static_cast<const char*>(message->payload()));
std::ostringstream ss;
if (verbose) {
ss << "CreateTimestamp: ";
ss << std::left << std::setw(15) << std::setfill(' ');
if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {
ss << ts.timestamp;
} else {
ss << "";
}
ss << " ";
ss << "Key: ";
ss << std::left << std::setw(20) << std::setfill(' ');
if (message->key()) {
ss << *message->key();
} else {
ss << "";
}
ss << " ";
}
break;

ss << std::string(static_cast<const char*>(message->payload()),
message->len());
std::cout << ss.str() << std::endl;
} break;

case RdKafka::ERR__PARTITION_EOF:
/* Last message */
Expand Down Expand Up @@ -352,7 +353,7 @@ HsConsumer* hs_new_consumer(const char* brokers_, HsInt brokers_size_,
}

HsInt hs_consumer_consume(HsConsumer* c, const char** topic_datas,
HsInt* topic_sizes, HsInt topics_len,
HsInt* topic_sizes, HsInt topics_len, bool verbose,
std::string* errstr) {
std::signal(SIGINT, sigterm);
std::signal(SIGTERM, sigterm);
Expand All @@ -369,7 +370,7 @@ HsInt hs_consumer_consume(HsConsumer* c, const char** topic_datas,

while (run) {
RdKafka::Message* msg = c->consumer->consume(1000);
msg_consume(msg, NULL);
msg_consume(msg, verbose, NULL);
delete msg;
}

Expand Down