Skip to content

Commit

Permalink
Add bootstrap server metric
Browse files Browse the repository at this point in the history
  • Loading branch information
hmstepanek committed Sep 10, 2024
1 parent 41e5475 commit 7293753
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 55 deletions.
42 changes: 41 additions & 1 deletion newrelic/hooks/messagebroker_confluentkafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from newrelic.api.transaction import current_transaction
from newrelic.common.object_wrapper import function_wrapper, wrap_function_wrapper
from newrelic.common.package_version_utils import get_package_version
from newrelic.common.signature import bind_args

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,14 +57,18 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs):
args = args[1:]
else:
topic = kwargs.pop("topic", None)
topic = topic or "Default"

transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka"))
if hasattr(instance, "_nr_bootstrap_servers"):
for server_name in instance._nr_bootstrap_servers:
transaction.record_custom_metric("MessageBroker/Kafka/Nodes/%s/Produce/%s" % (server_name, topic), 1)

with MessageTrace(
library="Kafka",
operation="Produce",
destination_type="Topic",
destination_name=topic or "Default",
destination_name=topic,
source=wrapped,
):
dt_headers = {k: v.encode("utf-8") for k, v in MessageTrace.generate_request_headers(transaction)}
Expand Down Expand Up @@ -166,6 +171,11 @@ def wrap_Consumer_poll(wrapped, instance, args, kwargs):
name = "Named/%s" % destination_name
transaction.record_custom_metric("%s/%s/Received/Bytes" % (group, name), received_bytes)
transaction.record_custom_metric("%s/%s/Received/Messages" % (group, name), message_count)
if hasattr(instance, "_nr_bootstrap_servers"):
for server_name in instance._nr_bootstrap_servers:
transaction.record_custom_metric(
"MessageBroker/Kafka/Nodes/%s/Consume/%s" % (server_name, destination_name), 1
)
transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka"))

return record
Expand Down Expand Up @@ -219,6 +229,34 @@ def wrap_DeserializingConsumer_init(wrapped, instance, args, kwargs):
instance._value_deserializer = wrap_serializer("Deserialization/Value", "Message")(instance._value_deserializer)


def wrap_Producer_init(wrapped, instance, args, kwargs):
wrapped(*args, **kwargs)

# Try to capture the boostrap server info that is passed in in the configuration.
try:
bound_args = bind_args(wrapped, args, kwargs)
conf = bound_args["args"][0]
servers = conf.get("bootstrap.servers")
if servers:
instance._nr_bootstrap_servers = servers.split(",")
except Exception:
pass


def wrap_Consumer_init(wrapped, instance, args, kwargs):
wrapped(*args, **kwargs)

# Try to capture the boostrap server info that is passed in in the configuration.
try:
bound_args = bind_args(wrapped, args, kwargs)
conf = bound_args["args"][0]
servers = conf.get("bootstrap.servers")
if servers:
instance._nr_bootstrap_servers = servers.split(",")
except Exception:
pass


def wrap_immutable_class(module, class_name):
# Wrap immutable binary extension class with a mutable Python subclass
new_class = type(class_name, (getattr(module, class_name),), {})
Expand All @@ -230,10 +268,12 @@ def instrument_confluentkafka_cimpl(module):
if hasattr(module, "Producer"):
wrap_immutable_class(module, "Producer")
wrap_function_wrapper(module, "Producer.produce", wrap_Producer_produce)
wrap_function_wrapper(module, "Producer.__init__", wrap_Producer_init)

if hasattr(module, "Consumer"):
wrap_immutable_class(module, "Consumer")
wrap_function_wrapper(module, "Consumer.poll", wrap_Consumer_poll)
wrap_function_wrapper(module, "Consumer.__init__", wrap_Consumer_init)


def instrument_confluentkafka_serializing_producer(module):
Expand Down
11 changes: 10 additions & 1 deletion newrelic/hooks/messagebroker_kafkapython.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
return wrapped(*args, **kwargs)

topic, value, key, headers, partition, timestamp_ms = _bind_send(*args, **kwargs)
topic = topic or "Default"
headers = list(headers) if headers else []

transaction.add_messagebroker_info("Kafka-Python", get_package_version("kafka-python"))
Expand All @@ -55,7 +56,7 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
library="Kafka",
operation="Produce",
destination_type="Topic",
destination_name=topic or "Default",
destination_name=topic,
source=wrapped,
terminal=False,
):
Expand All @@ -64,6 +65,9 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
if headers:
dt_headers.extend(headers)

if hasattr(instance, "config"):
for server_name in instance.config.get("bootstrap_servers", []):
transaction.record_custom_metric("MessageBroker/Kafka/Nodes/%s/Produce/%s" % (server_name, topic), 1)
try:
return wrapped(
topic, value=value, key=key, headers=dt_headers, partition=partition, timestamp_ms=timestamp_ms
Expand Down Expand Up @@ -152,6 +156,11 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
name = "Named/%s" % destination_name
transaction.record_custom_metric("%s/%s/Received/Bytes" % (group, name), received_bytes)
transaction.record_custom_metric("%s/%s/Received/Messages" % (group, name), message_count)
if hasattr(instance, "config"):
for server_name in instance.config.get("bootstrap_servers", []):
transaction.record_custom_metric(
"MessageBroker/Kafka/Nodes/%s/Consume/%s" % (server_name, destination_name), 1
)
transaction.add_messagebroker_info("Kafka-Python", get_package_version("kafka-python"))

return record
Expand Down
26 changes: 15 additions & 11 deletions tests/messagebroker_confluentkafka/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@

DB_SETTINGS = kafka_settings()[0]

BROKER = "%s:%s" % (DB_SETTINGS["host"], DB_SETTINGS["port"])

@pytest.fixture(scope="session")
def broker():
BROKER = "%s:%s" % (DB_SETTINGS["host"], DB_SETTINGS["port"])
return BROKER


_default_settings = {
Expand Down Expand Up @@ -58,23 +62,23 @@ def skip_if_not_serializing(client_type):


@pytest.fixture(scope="function")
def producer(topic, client_type, json_serializer):
def producer(topic, client_type, json_serializer, broker):
from confluent_kafka import Producer, SerializingProducer

if client_type == "cimpl":
producer = Producer({"bootstrap.servers": BROKER})
producer = Producer({"bootstrap.servers": broker})
elif client_type == "serializer_function":
producer = SerializingProducer(
{
"bootstrap.servers": BROKER,
"bootstrap.servers": broker,
"value.serializer": lambda v, c: json.dumps(v).encode("utf-8"),
"key.serializer": lambda v, c: json.dumps(v).encode("utf-8") if v is not None else None,
}
)
elif client_type == "serializer_object":
producer = SerializingProducer(
{
"bootstrap.servers": BROKER,
"bootstrap.servers": broker,
"value.serializer": json_serializer,
"key.serializer": json_serializer,
}
Expand All @@ -87,13 +91,13 @@ def producer(topic, client_type, json_serializer):


@pytest.fixture(scope="function")
def consumer(group_id, topic, producer, client_type, json_deserializer):
def consumer(group_id, topic, producer, client_type, json_deserializer, broker):
from confluent_kafka import Consumer, DeserializingConsumer

if client_type == "cimpl":
consumer = Consumer(
{
"bootstrap.servers": BROKER,
"bootstrap.servers": broker,
"auto.offset.reset": "earliest",
"heartbeat.interval.ms": 1000,
"group.id": group_id,
Expand All @@ -102,7 +106,7 @@ def consumer(group_id, topic, producer, client_type, json_deserializer):
elif client_type == "serializer_function":
consumer = DeserializingConsumer(
{
"bootstrap.servers": BROKER,
"bootstrap.servers": broker,
"auto.offset.reset": "earliest",
"heartbeat.interval.ms": 1000,
"group.id": group_id,
Expand All @@ -113,7 +117,7 @@ def consumer(group_id, topic, producer, client_type, json_deserializer):
elif client_type == "serializer_object":
consumer = DeserializingConsumer(
{
"bootstrap.servers": BROKER,
"bootstrap.servers": broker,
"auto.offset.reset": "earliest",
"heartbeat.interval.ms": 1000,
"group.id": group_id,
Expand Down Expand Up @@ -168,12 +172,12 @@ def __call__(self, obj, ctx):


@pytest.fixture(scope="function")
def topic():
def topic(broker):
from confluent_kafka.admin import AdminClient, NewTopic

topic = "test-topic-%s" % str(uuid.uuid4())

admin = AdminClient({"bootstrap.servers": BROKER})
admin = AdminClient({"bootstrap.servers": broker})
new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1)]
topics = admin.create_topics(new_topics)
for _, f in topics.items():
Expand Down
29 changes: 21 additions & 8 deletions tests/messagebroker_confluentkafka/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
from newrelic.packages import six


def test_custom_metrics(get_consumer_record, topic):
def test_custom_metrics(get_consumer_record, topic, expected_broker_metrics):
custom_metrics = [
("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, 1),
("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, 1),
]
] + expected_broker_metrics

@validate_transaction_metrics(
"Named/%s" % topic,
Expand All @@ -65,7 +65,7 @@ def _test():
_test()


def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
def test_custom_metrics_on_existing_transaction(get_consumer_record, topic, expected_broker_metrics):
from confluent_kafka import __version__ as version

transaction_name = (
Expand All @@ -78,7 +78,8 @@ def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, 1),
("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, 1),
("Python/MessageBroker/Confluent-Kafka/%s" % version, 1),
],
]
+ expected_broker_metrics,
background_task=True,
)
@validate_transaction_count(1)
Expand All @@ -89,7 +90,7 @@ def _test():
_test()


def test_custom_metrics_inactive_transaction(get_consumer_record, topic):
def test_custom_metrics_inactive_transaction(get_consumer_record, topic, expected_missing_broker_metrics):
transaction_name = (
"test_consumer:test_custom_metrics_inactive_transaction.<locals>._test" if six.PY3 else "test_consumer:_test"
)
Expand All @@ -99,7 +100,8 @@ def test_custom_metrics_inactive_transaction(get_consumer_record, topic):
custom_metrics=[
("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, None),
("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, None),
],
]
+ expected_missing_broker_metrics,
background_task=True,
)
@validate_transaction_count(1)
Expand Down Expand Up @@ -148,7 +150,7 @@ def _test():
_test()


def test_distributed_tracing_headers(topic, producer, consumer, serialize):
def test_distributed_tracing_headers(topic, producer, consumer, serialize, expected_broker_metrics):
# Produce the messages inside a transaction, making sure to close it.
@validate_transaction_count(1)
@background_task()
Expand All @@ -162,7 +164,8 @@ def _produce():
rollup_metrics=[
("Supportability/DistributedTrace/AcceptPayload/Success", None),
("Supportability/TraceContext/Accept/Success", 1),
],
]
+ expected_broker_metrics,
background_task=True,
)
@validate_transaction_count(1)
Expand All @@ -188,3 +191,13 @@ def _test():

_produce()
_consume()


@pytest.fixture(scope="function")
def expected_broker_metrics(broker, topic):
return [("MessageBroker/Kafka/Nodes/%s/Consume/%s" % (server, topic), 1) for server in broker.split(",")]


@pytest.fixture(scope="function")
def expected_missing_broker_metrics(broker, topic):
return [("MessageBroker/Kafka/Nodes/%s/Consume/%s" % (server, topic), None) for server in broker.split(",")]
19 changes: 13 additions & 6 deletions tests/messagebroker_confluentkafka/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def producer_callback2(err, msg):
assert callback2_called.wait(5), "Callback never called."


def test_trace_metrics(topic, send_producer_message):
def test_trace_metrics(topic, send_producer_message, expected_broker_metrics):
from confluent_kafka import __version__ as version

scoped_metrics = [("MessageBroker/Kafka/Topic/Produce/Named/%s" % topic, 1)]
Expand All @@ -108,7 +108,7 @@ def test_trace_metrics(topic, send_producer_message):
txn_name,
scoped_metrics=scoped_metrics,
rollup_metrics=unscoped_metrics,
custom_metrics=[("Python/MessageBroker/Confluent-Kafka/%s" % version, 1)],
custom_metrics=[("Python/MessageBroker/Confluent-Kafka/%s" % version, 1)] + expected_broker_metrics,
background_task=True,
)
@background_task()
Expand All @@ -118,15 +118,16 @@ def test():
test()


def test_distributed_tracing_headers(topic, send_producer_message):
def test_distributed_tracing_headers(topic, send_producer_message, expected_broker_metrics):
txn_name = "test_producer:test_distributed_tracing_headers.<locals>.test" if six.PY3 else "test_producer:test"

@validate_transaction_metrics(
txn_name,
rollup_metrics=[
("Supportability/TraceContext/Create/Success", 1),
("Supportability/DistributedTrace/CreatePayload/Success", 1),
],
]
+ expected_broker_metrics,
background_task=True,
)
@background_task()
Expand All @@ -138,13 +139,14 @@ def test():
test()


def test_distributed_tracing_headers_under_terminal(topic, send_producer_message):
def test_distributed_tracing_headers_under_terminal(topic, send_producer_message, expected_broker_metrics):
@validate_transaction_metrics(
"test_distributed_tracing_headers_under_terminal",
rollup_metrics=[
("Supportability/TraceContext/Create/Success", 1),
("Supportability/DistributedTrace/CreatePayload/Success", 1),
],
]
+ expected_broker_metrics,
background_task=True,
)
@background_task(name="test_distributed_tracing_headers_under_terminal")
Expand All @@ -170,3 +172,8 @@ def test():
producer.flush()

test()


@pytest.fixture(scope="function")
def expected_broker_metrics(broker, topic):
return [("MessageBroker/Kafka/Nodes/%s/Produce/%s" % (server, topic), 1) for server in broker.split(",")]
Loading

0 comments on commit 7293753

Please sign in to comment.