Skip to content

Commit

Permalink
Explicitly set timestamps for kafka record producer (#441)
Browse files Browse the repository at this point in the history
  • Loading branch information
pulltab authored Jun 8, 2018
1 parent fc60805 commit 2920829
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use rdkafka::error::{KafkaError, RDKafkaError};
use rdkafka::message::{Message, OwnedMessage};
use rdkafka::producer::FutureProducer;
use rdkafka::producer::future_producer::DeliveryFuture;
use rdkafka::util::current_time_millis;
use sink::Sink;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -145,7 +146,7 @@ impl KafkaMessageSender for FutureProducer<STFUContext> {
/* partition */ None,
Some(payload),
Some(key),
/* timestamp */ None,
Some(current_time_millis()),
/* block_ms */ 0,
)),
})
Expand Down Expand Up @@ -541,7 +542,7 @@ mod tests {
Some(payload.to_vec())
},
topic.to_owned(),
Timestamp::NotAvailable,
Timestamp::CreateTime(current_time_millis()),
-1,
-1,
);
Expand Down

0 comments on commit 2920829

Please sign in to comment.