Capture RDKafka Statistics as an internal built-in log #6131

Merged 17 commits on Mar 23, 2021
Conversation

@cirego (Contributor) commented Mar 18, 2021:

This PR adds an internal system table to track statistics exposed by rdkafka via the stats callback.



@cirego cirego self-assigned this Mar 18, 2021
@cirego cirego marked this pull request as draft March 18, 2021 00:09
#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum RDKafkaLog {
ConsumerStatistics,
}
Member:

I think this probably just wants to be a KafkaConsumerStatistics variant in the existing Materialized enum! The existing separation for timely and differential logs is because those logs integrate with messages that are already integrated with the timely logging framework. As far as the timely logger is concerned, events generated by librdkafka look identical to events generated by other parts of Materialize, so I think you'll save yourself some trouble if you just combine the two.

Contributor Author:

Thanks for the feedback! I'm not sure if you saw the thread on Slack yesterday -- I originally tried adding this to the Materialized enum for the reasons you stated above. Unfortunately, rdkafka expects the GlueConsumerContext to be Send + Sync, prohibiting the ability to pass the Logger object into the callback's context. Frank advised that I write a new MPSC Logger, as I'll need to implement one that is both Send + Sync safe. By putting this into its own enum, we won't need to modify the Materialized Logger for messages that it will never see.

Do you have thoughts on this? I'd like to keep the implementation simple and coherent with the rest of the codebase. If there's a way to use the Materialized Logger in the GlueConsumerContext, I'd be very happy to do that.

Member:

Oh, hm, I see. I think I understand what Frank is proposing, and it seems nice and general purpose, but it seems like it will require trudging through quite a bit of pain to implement. Personally I'd do something like this:

diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs
index cc567dd71..61419acc1 100644
--- a/src/dataflow/src/source/kafka.rs
+++ b/src/dataflow/src/source/kafka.rs
@@ -62,6 +62,8 @@ pub struct KafkaSourceInfo {
     cached_files: Vec<PathBuf>,
     /// Timely worker logger for source events
     logger: Option<Logger>,
+    /// Receiver for librdkafka statistics events.
+    stats_rx: crossbeam_channel::Receiver<Statistics>,
 }
 
 impl SourceConstructor<Vec<u8>> for KafkaSourceInfo {
@@ -157,6 +159,12 @@ impl SourceInfo<Vec<u8>> for KafkaSourceInfo {
             }
         }
 
+        if let Some(logger) = self.logger {
+            while let Ok(statistics) = self.stats_rx.try_recv() {
+                logger.log(MaterializedEvent::KafkaConsumerStatistics { /* whatever */});
+            }
+        }
+
         let mut next_message = NextMessage::Pending;
         let consumer_count = self.get_partition_consumers_count();
         let mut attempts = 0;
@@ -329,8 +337,12 @@ impl KafkaSourceInfo {
             cluster_id,
             &config_options,
         );
+        let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
         let consumer: BaseConsumer<GlueConsumerContext> = kafka_config
-            .create_with_context(GlueConsumerContext(consumer_activator))
+            .create_with_context(GlueConsumerContext {
+                activator: consumer_activator,
+                stats_tx,
+            })
             .expect("Failed to create Kafka Consumer");
         let cached_files = kc
             .cached_files
@@ -621,17 +633,21 @@ impl PartitionConsumer {
 
 /// An implementation of [`ConsumerContext`] that unparks the wrapped thread
 /// when the message queue switches from nonempty to empty.
-struct GlueConsumerContext(SyncActivator);
+struct GlueConsumerContext {
+    activator: SyncActivator,
+    stats_tx: crossbeam_channel::Sender<Statistics>,
+}
 
 impl ClientContext for GlueConsumerContext {
     fn stats(&self, statistics: Statistics) {
-        info!("Client stats: {:#?}", statistics);
+        self.stats_tx.send(statistics).expect("timely operator hung up while Kafka source active");
+        self.activate();
     }
 }
 
 impl GlueConsumerContext {
     fn activate(&self) {
-        self.0
+        self.activator
             .activate()
             .expect("timely operator hung up while Kafka source active");
     }

Possibly that's a terrible idea for some reason, but seems easy enough just to try out.

Member:

Medium term we can also make rust-rdkafka provide access to the stats events on a thread of your choice, which would also sidestep this issue nicely.

Contributor Author:

That does seem much easier than the route that I was going down -- thank you! I'll give this a shot first thing tomorrow!

Member:

You bet! Probably worth pinging Frank or Eli or someone else on integrations to sanity check this. Maybe there's some reason this is crazy that I'm not seeing.

Contributor:

mind linking the slack thread, @cirego ?

Contributor Author:

It was in the Rust channel. Here's the link to Frank's comment about creating an Arc<EventLinkArc> class: https://materializeinc.slack.com/archives/CMH6PG4CW/p1615938814067100

@antifuchs (Contributor) left a comment:

Looks cool to me! I left a few comments, am interested in what it'll look like with Nikhil's suggestions applied!

@@ -237,6 +238,7 @@ pub(crate) trait SourceConstructor<Out> {
worker_id: usize,
worker_count: usize,
logger: Option<Logger>,
rdkafka_logger: Option<Logger>,
Contributor:

This is turning into a ton of arguments, wow. Sounds like @benesch's suggestion will cut that argument? Otherwise, I guess one day someone should make a constructor with fewer muddled responsibilities (:

Contributor Author (@cirego), Mar 18, 2021:

Yes, this will be removed in the new patchset!

rx_bytes,
tx,
tx_bytes,
),
Contributor:

This seems fine, but as you're not using the tuples below (for, e.g., joins with other dataflows), you can save yourself one repetition of all those names below if you `.give` the packed row directly here (see timely.rs for an example of things doing that).

Contributor Author:

Thanks! I'll look into that!

Instead of trying to create a new Logger class that implements Send +
Sync, use crossbeam to send messages from the callback context into the
source info context. This will activate the source, triggering the
source to read messages from the callback and update the system log
tables. When a source is dropped, the statistics are removed.
kafka_consumer_info_session.give((
(source_name, source_id, consumer_name),
time_ms,
DiffVector::new(vec![rx, rx_bytes, tx, tx_bytes]),
Contributor:

Fwiw, this would be more efficiently implemented as a bunch of DiffPair things. It's a bunch less clean, but compiles down to a 4-tuple, rather than an allocation.

Contributor Author:

When you say a bunch of DiffPair things, do you mean something like DiffPair::new(DiffPair::new(rx, rx_bytes), DiffPair::new(tx, tx_bytes))?

Contributor:

Yeah, unfortunately.

Contributor Author:

Sounds good to me! Thanks for the tip!

@frankmcsherry (Contributor) left a comment:

Looks good to me.

@cirego (Contributor Author) commented Mar 19, 2021:

Last thing on my list to understand is whether or not the ForeignKey annotation actually does anything. It didn't cause any errors even though I had failed to implement drop for KafkaSourceInfo.

@antifuchs (Contributor):

Last thing on my list to understand is whether or not the ForeignKey annotation actually does anything.

It doesn't - decorative only, as far as I know. The primary key annotation is really dreadfully important, though.

@@ -49,6 +62,10 @@ pub struct KafkaSourceReader {
last_offsets: HashMap<i32, i64>,
/// Map from partition -> offset to start reading at
start_offsets: HashMap<i32, i64>,
/// Timely worker logger for source events
logger: Option<Logger>,
Contributor:

hmmm I think this is fine but maybe a better interface would be to add a function called emit_logs or something like that to the SourceReader trait which then ConsistencyInfo or something else that holds onto the logger can actually log.

Contributor:

not something that needs to happen in this pr though!

Contributor:

I guess another nice thing about having a separate function is that we can then amortize the logs and only emit them once per operator execution instead of once per incoming message

Contributor Author:

Perhaps this might be something to explore if we find ourselves adding more source types that want to log "source specific" things?

Contributor:

sure yeah! as long as this seems to pass our load tests without a regression I'm fine with it :D

Contributor Author:

Cool! I'm running the avro upsert benchmark now. Previous testing didn't seem to show much of a delta with this logging enabled. Should have full results tomorrow.

@cirego cirego marked this pull request as ready for review March 22, 2021 23:13
@cirego (Contributor Author) commented Mar 22, 2021:

There are no more TODO items on this PR from my perspective. It now does everything we want, as defined in MaterializeInc/database-issues#1742.

@ruchirK (Contributor) left a comment:

looks good so far! the diffpair stuff seems painful to work with - maybe sticking with a diffvec for now is not so bad?

use crate::source::{NextMessage, SourceMessage, SourceReader};

/// Values recorded from the last rdkafka statistics callback, used to generate a
/// diff of values for logging
pub struct PreviousStats {
Contributor:

if you #[derive(Default)] you don't have to explicitly zero init later

Contributor Author:

Nice! Looks like Option defines its default as None as well, so this should work nicely.

Contributor Author:

Done!

@cirego (Contributor Author) commented Mar 22, 2021:

> looks good so far! the diffpair stuff seems painful to work with - maybe sticking with a diffvec for now is not so bad?

I think this might be "six of one, half a dozen of the other". Eliminating memory allocations is nice and the code has been written / is tested, so perhaps let's keep the DiffPair?

@ruchirK (Contributor) commented Mar 22, 2021:

>> looks good so far! the diffpair stuff seems painful to work with - maybe sticking with a diffvec for now is not so bad?
>
> I think this might be "six of one, half a dozen of the other". Eliminating memory allocations is nice and the code has been written / is tested, so perhaps let's keep the DiffPair?

Works for me! I think it could also be interesting in future work to try to have a SmallVec backed difference type so saving the allocation for statically sized vecs like this can get even easier

@cirego (Contributor Author) commented Mar 22, 2021:

>>> looks good so far! the diffpair stuff seems painful to work with - maybe sticking with a diffvec for now is not so bad?
>>
>> I think this might be "six of one, half a dozen of the other". Eliminating memory allocations is nice and the code has been written / is tested, so perhaps let's keep the DiffPair?
>
> Works for me! I think it could also be interesting in future work to try to have a SmallVec backed difference type so saving the allocation for statically sized vecs like this can get even easier

Awesome! Also, I'm not familiar enough with Rust, but perhaps a variadic type would also do the trick here?

Edit: looks like variadic types are still a work in progress

.send(statistics)
.expect("timely operator hung up while Kafka source active");
self.activate();
// info!("Client stats: {:#?}", statistics);
Contributor:

i think this can be removed now?

Contributor Author:

Yes, good catch. Thank you!

We no longer need the print message (that's what this PR replaces!)
@ruchirK (Contributor) commented Mar 23, 2021:

you could also try this for the diffpair stuff

diff --git a/src/dataflow/src/logging/materialized.rs b/src/dataflow/src/logging/materialized.rs
index 4dd3f4855..cd6d6448b 100644
--- a/src/dataflow/src/logging/materialized.rs
+++ b/src/dataflow/src/logging/materialized.rs
@@ -299,17 +299,39 @@ pub fn construct<A: Allocate>(
         use differential_dataflow::operators::Count;
         let kafka_consumer_info_current = kafka_consumer_info.as_collection().count().map({
             let mut row_packer = repr::RowPacker::new();
-            move |((consumer_name, source_id, partition_id), pairs)| {
+            move |(
+                (consumer_name, source_id, partition_id),
+                DiffPair {
+                    element1:
+                        DiffPair {
+                            element1:
+                                DiffPair {
+                                    element1: rxmsgs,
+                                    element2: rxbytes,
+                                },
+                            element2:
+                                DiffPair {
+                                    element1: txmsgs,
+                                    element2: txbytes,
+                                },
+                        },
+                    element2:
+                        DiffPair {
+                            element1: app_offset,
+                            element2: consumer_lag,
+                        },
+                },
+            )| {
                 row_packer.pack(&[
                     Datum::String(&consumer_name),
                     Datum::String(&source_id.to_string()),
                     Datum::String(&partition_id),
-                    Datum::Int64(pairs.element1.element1.element1),
-                    Datum::Int64(pairs.element1.element1.element2),
-                    Datum::Int64(pairs.element1.element2.element1),
-                    Datum::Int64(pairs.element1.element2.element2),
-                    Datum::Int64(pairs.element2.element1),
-                    Datum::Int64(pairs.element2.element2),
+                    Datum::Int64(rxmsgs),
+                    Datum::Int64(rxbytes),
+                    Datum::Int64(txmsgs),
+                    Datum::Int64(txbytes),
+                    Datum::Int64(app_offset),
+                    Datum::Int64(consumer_lag),
                 ])
             }
         });

its a bit more lines but it makes the structure of the input / meaning of the output more clear and is hopefully more resistant to someone else making a typo in the future

@antifuchs (Contributor) left a comment:

riffing on ruchir's comment, I think a vector might look nicer?

@cirego (Contributor Author) commented Mar 23, 2021:

riffing on ruchir's comment, I think a vector might look nicer?

@krishmanoh2 asked for 4 more metrics to gather. I'm going to go with DiffVector for now and then, ideally, we can use DiffArray once TimelyDataflow/differential-dataflow#318 lands.

Comment on lines +47 to +48
/// Materialize source identifier
source_id: SourceInstanceId,
Contributor:

I think that this will generate a string that has a global_id and instance_id inline as e.g. u0/1 where the global id tracks the source, and the instance ID tracks the per-worker/per-instantiation key.

I think it'd be better to split these out into two columns, because then the source_id here would be directly comparable to the GlobalIDs in other MaterializedEvent variants.

Contributor Author:

I noticed that but didn't see any easy way to split this out. Let me look again!

Contributor:

Both the fields are pub, I think just renaming this to source: GlobalId, source_instance: u64 should do it.

Contributor Author (@cirego), Mar 23, 2021:

Done! I missed that SourceInfo does &id.source_id.to_string(). Updated kafka_consumer_info to do the same.

I also added a test case to verify the join behavior.

2 1

# There should only be metrics from a single partition consumer
> SELECT count(*) FROM mz_kafka_consumer_statistics
Contributor:

is there any way to have a test that actually verifies all the columns look correct?

Contributor Author:

The main issue is that the `rxbytes` field depends on the size of the Avro encoding. Also, the offset stuff might be broker-version dependent.

Contributor Author:

Happy to assert on the results; I'm concerned they might be brittle.

Contributor:

I think having a test that includes as many columns as you feel confident about would be fine, just to be sure that the things that it captures that are expected are correct.

Contributor Author:

Okay, I updated the test with the columns that we know should be correct and added a comment about when the answers might change.

@cirego cirego enabled auto-merge (squash) March 23, 2021 19:45
@@ -315,7 +315,7 @@ pub fn construct<A: Allocate>(
move |((consumer_name, source_id, partition_id), diff_vector)| {
row_packer.pack(&[
Datum::String(&consumer_name),
Datum::String(&source_id.to_string()),
Datum::String(&source_id.source_id.to_string()),
Contributor:

I'm not sure if this is correct now, I think that you need both source_id and dataflow_id to uniquely identify these metrics?

Contributor Author:

Oh, you're right! I just pushed a fix for this.

@cirego cirego disabled auto-merge March 23, 2021 20:50
@cirego cirego merged commit ca39730 into MaterializeInc:main Mar 23, 2021
@cirego cirego deleted the chris/system-table-kafka-metrics branch March 23, 2021 21:50
philip-stoev pushed a commit to philip-stoev/materialize that referenced this pull request Apr 6, 2021
…rializeInc#6131)

Kafka sources can be defined with a statistics_interval_ms option that
defines how often librdkafka / rdkafka should call into Materialize code
with kafka client statistics. Previously, we simply printed these metrics
to the logfile. This commit stashes a select number of metrics in a system
table to give users the ability to query these metrics directly.

Fixes #5666
7 participants