diff --git a/newrelic/hooks/messagebroker_confluentkafka.py b/newrelic/hooks/messagebroker_confluentkafka.py index b7c70a129..7efcf1266 100644 --- a/newrelic/hooks/messagebroker_confluentkafka.py +++ b/newrelic/hooks/messagebroker_confluentkafka.py @@ -56,14 +56,18 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs): args = args[1:] else: topic = kwargs.pop("topic", None) + topic = topic or "Default" transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka")) + if hasattr(instance, "_nr_bootstrap_servers"): + for server_name in instance._nr_bootstrap_servers: + transaction.record_custom_metric("MessageBroker/Kafka/Nodes/%s/Produce/%s" % (server_name, topic), 1) with MessageTrace( library="Kafka", operation="Produce", destination_type="Topic", - destination_name=topic or "Default", + destination_name=topic, source=wrapped, ): dt_headers = {k: v.encode("utf-8") for k, v in MessageTrace.generate_request_headers(transaction)} @@ -166,6 +170,11 @@ def wrap_Consumer_poll(wrapped, instance, args, kwargs): name = "Named/%s" % destination_name transaction.record_custom_metric("%s/%s/Received/Bytes" % (group, name), received_bytes) transaction.record_custom_metric("%s/%s/Received/Messages" % (group, name), message_count) + if hasattr(instance, "_nr_bootstrap_servers"): + for server_name in instance._nr_bootstrap_servers: + transaction.record_custom_metric( + "MessageBroker/Kafka/Nodes/%s/Consume/%s" % (server_name, destination_name), 1 + ) transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka")) return record @@ -219,6 +228,32 @@ def wrap_DeserializingConsumer_init(wrapped, instance, args, kwargs): instance._value_deserializer = wrap_serializer("Deserialization/Value", "Message")(instance._value_deserializer) +def wrap_Producer_init(wrapped, instance, args, kwargs): + wrapped(*args, **kwargs) + + # Try to capture the boostrap server info that is passed in in the configuration. + try: + conf = args[0] + servers = conf.get("bootstrap.servers") + if servers: + instance._nr_bootstrap_servers = servers.split(",") + except Exception: + pass + + +def wrap_Consumer_init(wrapped, instance, args, kwargs): + wrapped(*args, **kwargs) + + # Try to capture the boostrap server info that is passed in in the configuration. + try: + conf = args[0] + servers = conf.get("bootstrap.servers") + if servers: + instance._nr_bootstrap_servers = servers.split(",") + except Exception: + pass + + def wrap_immutable_class(module, class_name): # Wrap immutable binary extension class with a mutable Python subclass new_class = type(class_name, (getattr(module, class_name),), {}) @@ -230,10 +265,12 @@ def instrument_confluentkafka_cimpl(module): if hasattr(module, "Producer"): wrap_immutable_class(module, "Producer") wrap_function_wrapper(module, "Producer.produce", wrap_Producer_produce) + wrap_function_wrapper(module, "Producer.__init__", wrap_Producer_init) if hasattr(module, "Consumer"): wrap_immutable_class(module, "Consumer") wrap_function_wrapper(module, "Consumer.poll", wrap_Consumer_poll) + wrap_function_wrapper(module, "Consumer.__init__", wrap_Consumer_init) def instrument_confluentkafka_serializing_producer(module): diff --git a/newrelic/hooks/messagebroker_kafkapython.py b/newrelic/hooks/messagebroker_kafkapython.py index cead5ede4..b5e0fc51d 100644 --- a/newrelic/hooks/messagebroker_kafkapython.py +++ b/newrelic/hooks/messagebroker_kafkapython.py @@ -47,6 +47,7 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs): return wrapped(*args, **kwargs) topic, value, key, headers, partition, timestamp_ms = _bind_send(*args, **kwargs) + topic = topic or "Default" headers = list(headers) if headers else [] transaction.add_messagebroker_info( @@ -57,7 +58,7 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs): library="Kafka", operation="Produce", destination_type="Topic", - destination_name=topic or "Default", + destination_name=topic, source=wrapped, terminal=False, ): @@ -66,6 +67,9 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs): if headers: dt_headers.extend(headers) + if hasattr(instance, "config"): + for server_name in instance.config.get("bootstrap_servers", []): + transaction.record_custom_metric("MessageBroker/Kafka/Nodes/%s/Produce/%s" % (server_name, topic), 1) try: return wrapped( topic, value=value, key=key, headers=dt_headers, partition=partition, timestamp_ms=timestamp_ms @@ -154,6 +158,11 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs): name = "Named/%s" % destination_name transaction.record_custom_metric("%s/%s/Received/Bytes" % (group, name), received_bytes) transaction.record_custom_metric("%s/%s/Received/Messages" % (group, name), message_count) + if hasattr(instance, "config"): + for server_name in instance.config.get("bootstrap_servers", []): + transaction.record_custom_metric( + "MessageBroker/Kafka/Nodes/%s/Consume/%s" % (server_name, destination_name), 1 + ) transaction.add_messagebroker_info( "Kafka-Python", get_package_version("kafka-python") or get_package_version("kafka-python-ng") ) diff --git a/tests/messagebroker_confluentkafka/conftest.py b/tests/messagebroker_confluentkafka/conftest.py index 576ec27f8..64e91046e 100644 --- a/tests/messagebroker_confluentkafka/conftest.py +++ b/tests/messagebroker_confluentkafka/conftest.py @@ -27,7 +27,11 @@ DB_SETTINGS = kafka_settings()[0] -BROKER = "%s:%s" % (DB_SETTINGS["host"], DB_SETTINGS["port"]) + +@pytest.fixture(scope="session") +def broker(): + BROKER = "%s:%s" % (DB_SETTINGS["host"], DB_SETTINGS["port"]) + return BROKER _default_settings = { @@ -58,15 +62,15 @@ def skip_if_not_serializing(client_type): @pytest.fixture(scope="function") -def producer(topic, client_type, json_serializer): +def producer(topic, client_type, json_serializer, broker): from confluent_kafka import Producer, SerializingProducer if client_type == "cimpl": - producer = Producer({"bootstrap.servers": BROKER}) + producer = Producer({"bootstrap.servers": broker}) elif client_type == "serializer_function": producer = SerializingProducer( { - "bootstrap.servers": BROKER, + "bootstrap.servers": broker, "value.serializer": lambda v, c: json.dumps(v).encode("utf-8"), "key.serializer": lambda v, c: json.dumps(v).encode("utf-8") if v is not None else None, } @@ -74,7 +78,7 @@ def producer(topic, client_type, json_serializer): elif client_type == "serializer_object": producer = SerializingProducer( { - "bootstrap.servers": BROKER, + "bootstrap.servers": broker, "value.serializer": json_serializer, "key.serializer": json_serializer, } @@ -87,13 +91,13 @@ def producer(topic, client_type, json_serializer): @pytest.fixture(scope="function") -def consumer(group_id, topic, producer, client_type, json_deserializer): +def consumer(group_id, topic, producer, client_type, json_deserializer, broker): from confluent_kafka import Consumer, DeserializingConsumer if client_type == "cimpl": consumer = Consumer( { - "bootstrap.servers": BROKER, + "bootstrap.servers": broker, "auto.offset.reset": "earliest", "heartbeat.interval.ms": 1000, "group.id": group_id, @@ -102,7 +106,7 @@ def consumer(group_id, topic, producer, client_type, json_deserializer): elif client_type == "serializer_function": consumer = DeserializingConsumer( { - "bootstrap.servers": BROKER, + "bootstrap.servers": broker, "auto.offset.reset": "earliest", "heartbeat.interval.ms": 1000, "group.id": group_id, @@ -113,7 +117,7 @@ def consumer(group_id, topic, producer, client_type, json_deserializer): elif client_type == "serializer_object": consumer = DeserializingConsumer( { - "bootstrap.servers": BROKER, + "bootstrap.servers": broker, "auto.offset.reset": "earliest", "heartbeat.interval.ms": 1000, "group.id": group_id, @@ -168,12 +172,12 @@ def __call__(self, obj, ctx): @pytest.fixture(scope="function") -def topic(): +def topic(broker): from confluent_kafka.admin import AdminClient, NewTopic topic = "test-topic-%s" % str(uuid.uuid4()) - admin = AdminClient({"bootstrap.servers": BROKER}) + admin = AdminClient({"bootstrap.servers": broker}) new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1)] topics = admin.create_topics(new_topics) for _, f in topics.items(): diff --git a/tests/messagebroker_confluentkafka/test_consumer.py b/tests/messagebroker_confluentkafka/test_consumer.py index 31f9478b3..d7780f7c7 100644 --- a/tests/messagebroker_confluentkafka/test_consumer.py +++ b/tests/messagebroker_confluentkafka/test_consumer.py @@ -37,11 +37,11 @@ from newrelic.packages import six -def test_custom_metrics(get_consumer_record, topic): +def test_custom_metrics(get_consumer_record, topic, expected_broker_metrics): custom_metrics = [ ("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, 1), ("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, 1), - ] + ] + expected_broker_metrics @validate_transaction_metrics( "Named/%s" % topic, @@ -65,7 +65,7 @@ def _test(): _test() -def test_custom_metrics_on_existing_transaction(get_consumer_record, topic): +def test_custom_metrics_on_existing_transaction(get_consumer_record, topic, expected_broker_metrics): from confluent_kafka import __version__ as version transaction_name = ( @@ -78,7 +78,8 @@ def test_custom_metrics_on_existing_transaction(get_consumer_record, topic): ("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, 1), ("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, 1), ("Python/MessageBroker/Confluent-Kafka/%s" % version, 1), - ], + ] + + expected_broker_metrics, background_task=True, ) @validate_transaction_count(1) @@ -89,7 +90,7 @@ def _test(): _test() -def test_custom_metrics_inactive_transaction(get_consumer_record, topic): +def test_custom_metrics_inactive_transaction(get_consumer_record, topic, expected_missing_broker_metrics): transaction_name = ( "test_consumer:test_custom_metrics_inactive_transaction.._test" if six.PY3 else "test_consumer:_test" ) @@ -99,7 +100,8 @@ def test_custom_metrics_inactive_transaction(get_consumer_record, topic): custom_metrics=[ ("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, None), ("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, None), - ], + ] + + expected_missing_broker_metrics, background_task=True, ) @validate_transaction_count(1) @@ -148,7 +150,7 @@ def _test(): _test() -def test_distributed_tracing_headers(topic, producer, consumer, serialize): +def test_distributed_tracing_headers(topic, producer, consumer, serialize, expected_broker_metrics): # Produce the messages inside a transaction, making sure to close it. @validate_transaction_count(1) @background_task() @@ -162,7 +164,8 @@ def _produce(): rollup_metrics=[ ("Supportability/DistributedTrace/AcceptPayload/Success", None), ("Supportability/TraceContext/Accept/Success", 1), - ], + ] + + expected_broker_metrics, background_task=True, ) @validate_transaction_count(1) @@ -188,3 +191,13 @@ def _test(): _produce() _consume() + + +@pytest.fixture(scope="function") +def expected_broker_metrics(broker, topic): + return [("MessageBroker/Kafka/Nodes/%s/Consume/%s" % (server, topic), 1) for server in broker.split(",")] + + +@pytest.fixture(scope="function") +def expected_missing_broker_metrics(broker, topic): + return [("MessageBroker/Kafka/Nodes/%s/Consume/%s" % (server, topic), None) for server in broker.split(",")] diff --git a/tests/messagebroker_confluentkafka/test_producer.py b/tests/messagebroker_confluentkafka/test_producer.py index fe33794fa..170edfb38 100644 --- a/tests/messagebroker_confluentkafka/test_producer.py +++ b/tests/messagebroker_confluentkafka/test_producer.py @@ -97,7 +97,7 @@ def producer_callback2(err, msg): assert callback2_called.wait(5), "Callback never called." -def test_trace_metrics(topic, send_producer_message): +def test_trace_metrics(topic, send_producer_message, expected_broker_metrics): from confluent_kafka import __version__ as version scoped_metrics = [("MessageBroker/Kafka/Topic/Produce/Named/%s" % topic, 1)] @@ -108,7 +108,7 @@ def test_trace_metrics(topic, send_producer_message): txn_name, scoped_metrics=scoped_metrics, rollup_metrics=unscoped_metrics, - custom_metrics=[("Python/MessageBroker/Confluent-Kafka/%s" % version, 1)], + custom_metrics=[("Python/MessageBroker/Confluent-Kafka/%s" % version, 1)] + expected_broker_metrics, background_task=True, ) @background_task() @@ -118,7 +118,7 @@ def test(): test() -def test_distributed_tracing_headers(topic, send_producer_message): +def test_distributed_tracing_headers(topic, send_producer_message, expected_broker_metrics): txn_name = "test_producer:test_distributed_tracing_headers..test" if six.PY3 else "test_producer:test" @validate_transaction_metrics( @@ -126,7 +126,8 @@ def test_distributed_tracing_headers(topic, send_producer_message): rollup_metrics=[ ("Supportability/TraceContext/Create/Success", 1), ("Supportability/DistributedTrace/CreatePayload/Success", 1), - ], + ] + + expected_broker_metrics, background_task=True, ) @background_task() @@ -138,13 +139,14 @@ def test(): test() -def test_distributed_tracing_headers_under_terminal(topic, send_producer_message): +def test_distributed_tracing_headers_under_terminal(topic, send_producer_message, expected_broker_metrics): @validate_transaction_metrics( "test_distributed_tracing_headers_under_terminal", rollup_metrics=[ ("Supportability/TraceContext/Create/Success", 1), ("Supportability/DistributedTrace/CreatePayload/Success", 1), - ], + ] + + expected_broker_metrics, background_task=True, ) @background_task(name="test_distributed_tracing_headers_under_terminal") @@ -170,3 +172,8 @@ def test(): producer.flush() test() + + +@pytest.fixture(scope="function") +def expected_broker_metrics(broker, topic): + return [("MessageBroker/Kafka/Nodes/%s/Produce/%s" % (server, topic), 1) for server in broker.split(",")] diff --git a/tests/messagebroker_kafkapython/conftest.py b/tests/messagebroker_kafkapython/conftest.py index 4a692f18a..8d82cef63 100644 --- a/tests/messagebroker_kafkapython/conftest.py +++ b/tests/messagebroker_kafkapython/conftest.py @@ -28,8 +28,11 @@ DB_SETTINGS = kafka_settings()[0] -BOOTSTRAP_SERVER = "%s:%s" % (DB_SETTINGS["host"], DB_SETTINGS["port"]) -BROKER = [BOOTSTRAP_SERVER] + +@pytest.fixture(scope="session") +def broker(): + BOOTSTRAP_SERVER = "%s:%s" % (DB_SETTINGS["host"], DB_SETTINGS["port"]) + return [BOOTSTRAP_SERVER] _default_settings = { @@ -62,24 +65,24 @@ def skip_if_not_serializing(client_type): @pytest.fixture(scope="function") -def producer(client_type, json_serializer, json_callable_serializer): +def producer(client_type, json_serializer, json_callable_serializer, broker): if client_type == "no_serializer": - producer = kafka.KafkaProducer(bootstrap_servers=BROKER) + producer = kafka.KafkaProducer(bootstrap_servers=broker) elif client_type == "serializer_function": producer = kafka.KafkaProducer( - bootstrap_servers=BROKER, + bootstrap_servers=broker, value_serializer=lambda v: json.dumps(v).encode("utf-8") if v else None, key_serializer=lambda v: json.dumps(v).encode("utf-8") if v else None, ) elif client_type == "callable_object": producer = kafka.KafkaProducer( - bootstrap_servers=BROKER, + bootstrap_servers=broker, value_serializer=json_callable_serializer, key_serializer=json_callable_serializer, ) elif client_type == "serializer_object": producer = kafka.KafkaProducer( - bootstrap_servers=BROKER, + bootstrap_servers=broker, value_serializer=json_serializer, key_serializer=json_serializer, ) @@ -89,11 +92,11 @@ def producer(client_type, json_serializer, json_callable_serializer): @pytest.fixture(scope="function") -def consumer(group_id, topic, producer, client_type, json_deserializer, json_callable_deserializer): +def consumer(group_id, topic, producer, client_type, json_deserializer, json_callable_deserializer, broker): if client_type == "no_serializer": consumer = kafka.KafkaConsumer( topic, - bootstrap_servers=BROKER, + bootstrap_servers=broker, auto_offset_reset="earliest", consumer_timeout_ms=100, heartbeat_interval_ms=1000, @@ -102,7 +105,7 @@ def consumer(group_id, topic, producer, client_type, json_deserializer, json_cal elif client_type == "serializer_function": consumer = kafka.KafkaConsumer( topic, - bootstrap_servers=BROKER, + bootstrap_servers=broker, key_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None, value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None, auto_offset_reset="earliest", @@ -113,7 +116,7 @@ def consumer(group_id, topic, producer, client_type, json_deserializer, json_cal elif client_type == "callable_object": consumer = kafka.KafkaConsumer( topic, - bootstrap_servers=BROKER, + bootstrap_servers=broker, key_deserializer=json_callable_deserializer, value_deserializer=json_callable_deserializer, auto_offset_reset="earliest", @@ -124,7 +127,7 @@ def consumer(group_id, topic, producer, client_type, json_deserializer, json_cal elif client_type == "serializer_object": consumer = kafka.KafkaConsumer( topic, - bootstrap_servers=BROKER, + bootstrap_servers=broker, key_deserializer=json_deserializer, value_deserializer=json_deserializer, auto_offset_reset="earliest", @@ -190,13 +193,13 @@ def __call__(self, obj): @pytest.fixture(scope="function") -def topic(): +def topic(broker): from kafka.admin.client import KafkaAdminClient from kafka.admin.new_topic import NewTopic topic = "test-topic-%s" % str(uuid.uuid4()) - admin = KafkaAdminClient(bootstrap_servers=BROKER) + admin = KafkaAdminClient(bootstrap_servers=broker) new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1)] admin.create_topics(new_topics) diff --git a/tests/messagebroker_kafkapython/test_consumer.py b/tests/messagebroker_kafkapython/test_consumer.py index 78ba086c6..93da0f877 100644 --- a/tests/messagebroker_kafkapython/test_consumer.py +++ b/tests/messagebroker_kafkapython/test_consumer.py @@ -37,14 +37,15 @@ from newrelic.packages import six -def test_custom_metrics(get_consumer_record, topic): +def test_custom_metrics(get_consumer_record, topic, expected_broker_metrics): @validate_transaction_metrics( "Named/%s" % topic, group="Message/Kafka/Topic", custom_metrics=[ ("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, 1), ("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, 1), - ], + ] + + expected_broker_metrics, background_task=True, ) def _test(): @@ -62,7 +63,7 @@ def _test(): _test() -def test_custom_metrics_on_existing_transaction(get_consumer_record, topic): +def test_custom_metrics_on_existing_transaction(get_consumer_record, topic, expected_broker_metrics): from kafka.version import __version__ as version transaction_name = ( @@ -75,7 +76,8 @@ def test_custom_metrics_on_existing_transaction(get_consumer_record, topic): ("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, 1), ("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, 1), ("Python/MessageBroker/Kafka-Python/%s" % version, 1), - ], + ] + + expected_broker_metrics, background_task=True, ) @validate_transaction_count(1) @@ -86,7 +88,7 @@ def _test(): _test() -def test_custom_metrics_inactive_transaction(get_consumer_record, topic): +def test_custom_metrics_inactive_transaction(get_consumer_record, topic, expected_missing_broker_metrics): transaction_name = ( "test_consumer:test_custom_metrics_inactive_transaction.._test" if six.PY3 else "test_consumer:_test" ) @@ -96,7 +98,8 @@ def test_custom_metrics_inactive_transaction(get_consumer_record, topic): custom_metrics=[ ("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, None), ("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, None), - ], + ] + + expected_missing_broker_metrics, background_task=True, ) @validate_transaction_count(1) @@ -139,7 +142,7 @@ def _test(): _test() -def test_distributed_tracing_headers(topic, producer, consumer, serialize): +def test_distributed_tracing_headers(topic, producer, consumer, serialize, expected_broker_metrics): # Produce the messages inside a transaction, making sure to close it. @background_task() def _produce(): @@ -152,7 +155,8 @@ def _produce(): rollup_metrics=[ ("Supportability/DistributedTrace/AcceptPayload/Success", None), ("Supportability/TraceContext/Accept/Success", 1), - ], + ] + + expected_broker_metrics, background_task=True, ) @validate_transaction_count(1) @@ -189,3 +193,13 @@ def _poll(*args, **kwargs): consumer.poll = _poll return consumer + + +@pytest.fixture(scope="function") +def expected_broker_metrics(broker, topic): + return [("MessageBroker/Kafka/Nodes/%s/Consume/%s" % (server, topic), 1) for server in broker] + + +@pytest.fixture(scope="function") +def expected_missing_broker_metrics(broker, topic): + return [("MessageBroker/Kafka/Nodes/%s/Consume/%s" % (server, topic), None) for server in broker] diff --git a/tests/messagebroker_kafkapython/test_producer.py b/tests/messagebroker_kafkapython/test_producer.py index 418ea4b40..9f069e003 100644 --- a/tests/messagebroker_kafkapython/test_producer.py +++ b/tests/messagebroker_kafkapython/test_producer.py @@ -30,7 +30,7 @@ from newrelic.packages import six -def test_trace_metrics(topic, send_producer_message): +def test_trace_metrics(topic, send_producer_message, expected_broker_metrics): from kafka.version import __version__ as version scoped_metrics = [("MessageBroker/Kafka/Topic/Produce/Named/%s" % topic, 1)] @@ -41,7 +41,7 @@ def test_trace_metrics(topic, send_producer_message): txn_name, scoped_metrics=scoped_metrics, rollup_metrics=unscoped_metrics, - custom_metrics=[("Python/MessageBroker/Kafka-Python/%s" % version, 1)], + custom_metrics=[("Python/MessageBroker/Kafka-Python/%s" % version, 1)] + expected_broker_metrics, background_task=True, ) @background_task() @@ -51,7 +51,7 @@ def test(): test() -def test_distributed_tracing_headers(topic, send_producer_message): +def test_distributed_tracing_headers(topic, send_producer_message, expected_broker_metrics): txn_name = "test_producer:test_distributed_tracing_headers..test" if six.PY3 else "test_producer:test" @validate_transaction_metrics( @@ -59,7 +59,8 @@ def test_distributed_tracing_headers(topic, send_producer_message): rollup_metrics=[ ("Supportability/TraceContext/Create/Success", 1), ("Supportability/DistributedTrace/CreatePayload/Success", 1), - ], + ] + + expected_broker_metrics, background_task=True, ) @background_task() @@ -71,13 +72,14 @@ def test(): test() -def test_distributed_tracing_headers_under_terminal(topic, send_producer_message): +def test_distributed_tracing_headers_under_terminal(topic, send_producer_message, expected_broker_metrics): @validate_transaction_metrics( "test_distributed_tracing_headers_under_terminal", rollup_metrics=[ ("Supportability/TraceContext/Create/Success", 1), ("Supportability/DistributedTrace/CreatePayload/Success", 1), - ], + ] + + expected_broker_metrics, background_task=True, ) @background_task(name="test_distributed_tracing_headers_under_terminal") @@ -102,3 +104,8 @@ def test(): producer.flush() test() + + +@pytest.fixture(scope="function") +def expected_broker_metrics(broker, topic): + return [("MessageBroker/Kafka/Nodes/%s/Produce/%s" % (server, topic), 1) for server in broker]