Skip to content

Commit

Permalink
Update how DLQ subscriber is instantiated
Browse files Browse the repository at this point in the history
  • Loading branch information
TheByronHimes committed Jan 14, 2025
1 parent 536c1ee commit fcc3c0d
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 79 deletions.
71 changes: 22 additions & 49 deletions src/hexkit/providers/akafka/provider/eventsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,10 +566,8 @@ class KafkaDLQSubscriber:
DLQEventProcessor definition.
"""

@classmethod
@asynccontextmanager
async def construct( # noqa: PLR0913
cls,
def __init__( # noqa: PLR0913
self,
*,
config: KafkaConfig,
dlq_topic: str,
Expand Down Expand Up @@ -628,48 +626,6 @@ async def construct( # noqa: PLR0913
max_partition_fetch_bytes=config.kafka_max_message_size,
)

await consumer.start()
try:
yield cls(
dlq_topic=dlq_topic,
dlq_publisher=dlq_publisher,
consumer=consumer,
process_dlq_event=process_dlq_event,
timeout_ms=timeout_ms,
)
finally:
await consumer.stop()

def __init__(
self,
*,
dlq_topic: str,
dlq_publisher: EventPublisherProtocol,
consumer: KafkaConsumerCompatible,
process_dlq_event: DLQEventProcessor,
timeout_ms: int,
):
"""Please do not call directly! Should be called by the `construct` method.
Args:
- `dlq_topic`:
The name of the DLQ topic to subscribe to. Has the format
"{original_topic}.{service_name}-dlq".
- `dlq_publisher`:
A running instance of a publishing provider that implements the
EventPublisherProtocol, such as KafkaEventPublisher.
- `consumer`:
hands over a started AIOKafkaConsumer.
- `process_dlq_event`:
An async callable adhering to the DLQEventProcessor definition that provides
validation and processing for events from the DLQ. It should return _either_
the event to publish to the retry topic (which may be altered) or `None` to
discard the event. The `KafkaDLQSubscriber` will log and interpret
`DLQValidationError` as a signal to discard/ignore the event, and all other
errors will be re-raised as a `DLQProcessingError`.
- `timeout_ms`:
The maximum time in milliseconds to spend reading from the DLQ topic.
"""
self._consumer = consumer
self._publisher = dlq_publisher
self._dlq_topic = dlq_topic
Expand All @@ -683,6 +639,23 @@ def __init__(
self._retry_topic = service_name + "-retry"
self._process_dlq_event = process_dlq_event

async def __aenter__(self) -> "KafkaDLQSubscriber":
"""Start consuming events from the DLQ topic."""
await self.start()
return self

async def __aexit__(self, *args) -> None:
"""Stop consuming events from the DLQ topic."""
await self.stop()

async def start(self) -> None:
"""Start consuming events from the DLQ topic."""
await self._consumer.start()

async def stop(self) -> None:
"""Stop consuming events from the DLQ topic."""
await self._consumer.stop()

async def _publish_to_retry(self, *, event: ExtractedEventInfo) -> None:
"""Publish the event to the retry topic."""
correlation_id = event.headers["correlation_id"]
Expand Down Expand Up @@ -722,9 +695,9 @@ async def _handle_dlq_event_manual(
# the wrong data to resolve the DLQ event (e.g. copy/paste error)
if dlq_event.headers["correlation_id"] != override.headers["correlation_id"]:
msg = (
"Cannot manually resolve DLQ event due to correlation ID mismatch.\n"
+ f"User-supplied event ID: {override.headers['correlation_id']}\n"
+ f"DLQ event ID: {dlq_event.headers['correlation_id']}"
"Cannot manually resolve DLQ event due to correlation ID mismatch."
+ f"\nExpected {dlq_event.headers['correlation_id']} but user"
+ f" gave {override.headers['correlation_id']}"
)
raise RuntimeError(msg)

Expand Down
50 changes: 20 additions & 30 deletions tests/integration/test_dlqsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
caplog_debug_fixture, # noqa: F401
)

pytestmark = pytest.mark.asyncio()
DEFAULT_SERVICE_NAME = "test_publisher" # see KafkaConfig instance in akafka.testutils
TEST_TOPIC = "test-topic"
TEST_TYPE = "test_type"
Expand Down Expand Up @@ -215,7 +216,7 @@ def make_config(


@pytest.mark.parametrize("max_retries", [-1, 0, 1])
def test_config_validation(max_retries: int):
async def test_config_validation(max_retries: int):
"""Test for config validation.
Errors should occur:
Expand All @@ -225,7 +226,6 @@ def test_config_validation(max_retries: int):
make_config(max_retries=max_retries)


@pytest.mark.asyncio()
async def test_original_topic_is_preserved(kafka: KafkaFixture):
"""Ensure the original topic is preserved when it comes back to the subscriber.
Expand All @@ -251,7 +251,7 @@ async def test_original_topic_is_preserved(kafka: KafkaFixture):
await event_subscriber.run(forever=False)

# Run the DLQ subscriber, telling it to publish the event to the retry topic
async with KafkaDLQSubscriber.construct(
async with KafkaDLQSubscriber(
config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=kafka.publisher
) as dlq_subscriber:
await dlq_subscriber.process()
Expand All @@ -267,7 +267,6 @@ async def test_original_topic_is_preserved(kafka: KafkaFixture):
assert translator.successes == [TEST_EVENT]


@pytest.mark.asyncio()
async def test_invalid_retries_left(kafka: KafkaFixture, caplog_debug):
"""Ensure that the proper error is raised when retries_left is invalid."""
config = make_config(kafka.config, max_retries=2)
Expand Down Expand Up @@ -298,7 +297,6 @@ async def test_invalid_retries_left(kafka: KafkaFixture, caplog_debug):

@pytest.mark.parametrize("max_retries", [0, 1, 2])
@pytest.mark.parametrize("enable_dlq", [True, False])
@pytest.mark.asyncio()
async def test_retries_exhausted(
kafka: KafkaFixture, max_retries: int, enable_dlq: bool, caplog_debug
):
Expand Down Expand Up @@ -385,7 +383,6 @@ async def test_retries_exhausted(
assert parsed_log.endswith("(DLQ is disabled)")


@pytest.mark.asyncio()
async def test_send_to_retry(kafka: KafkaFixture, caplog_debug):
"""Ensure the event is sent to the retry topic when the DLQ subscriber is instructed
to do so. This would occur in whatever service or app is resolving DLQ events.
Expand All @@ -396,7 +393,7 @@ async def test_send_to_retry(kafka: KafkaFixture, caplog_debug):

# Set up dummies and consume the event with the DLQ Subscriber
dummy_publisher = DummyPublisher()
async with KafkaDLQSubscriber.construct(
async with KafkaDLQSubscriber(
config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher
) as dlq_subscriber:
assert not dummy_publisher.published
Expand All @@ -419,7 +416,6 @@ async def test_send_to_retry(kafka: KafkaFixture, caplog_debug):
assert dummy_publisher.published == [dlq_event]


@pytest.mark.asyncio()
async def test_consume_retry_without_og_topic(kafka: KafkaFixture, caplog_debug):
"""If the original topic is missing when consuming an event from the retry queue,
the event should be ignored and the offset committed. The information should be logged.
Expand Down Expand Up @@ -460,7 +456,6 @@ async def test_consume_retry_without_og_topic(kafka: KafkaFixture, caplog_debug)
assert parsed_log.endswith("errors: topic is empty")


@pytest.mark.asyncio()
async def test_dlq_subscriber_ignore(kafka: KafkaFixture, caplog_debug):
"""Test what happens when a DLQ Subscriber is instructed to ignore an event."""
config = make_config(kafka.config)
Expand All @@ -479,7 +474,7 @@ async def test_dlq_subscriber_ignore(kafka: KafkaFixture, caplog_debug):

# Set up dummies and consume the event with the DLQ Subscriber
dummy_publisher = DummyPublisher()
async with KafkaDLQSubscriber.construct(
async with KafkaDLQSubscriber(
config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher
) as dlq_subscriber:
assert not dummy_publisher.published
Expand All @@ -499,7 +494,6 @@ async def test_dlq_subscriber_ignore(kafka: KafkaFixture, caplog_debug):
assert not dummy_publisher.published


@pytest.mark.asyncio()
async def test_no_retries_no_dlq_original_error(kafka: KafkaFixture, caplog_debug):
"""Test that not using the DLQ and configuring 0 retries results in failures that
propagate the underlying error to the provider.
Expand Down Expand Up @@ -541,7 +535,6 @@ async def test_no_retries_no_dlq_original_error(kafka: KafkaFixture, caplog_debu


@pytest.mark.parametrize("event_type", ["upserted", "deleted"])
@pytest.mark.asyncio()
async def test_outbox_with_dlq(kafka: KafkaFixture, event_type: str):
"""Ensure that the DLQ lifecycle works with the KafkaOutboxSubscriber."""
config = make_config(kafka.config)
Expand All @@ -566,7 +559,7 @@ async def test_outbox_with_dlq(kafka: KafkaFixture, event_type: str):

# Consume event from the DLQ topic, publish to retry topic
dlq_topic = f"users.{config.service_name}-dlq"
async with KafkaDLQSubscriber.construct(
async with KafkaDLQSubscriber(
config=config, dlq_topic=dlq_topic, dlq_publisher=kafka.publisher
) as dlq_subscriber:
await dlq_subscriber.process()
Expand All @@ -579,7 +572,6 @@ async def test_outbox_with_dlq(kafka: KafkaFixture, event_type: str):
assert list_to_check == [event] if event_type == "upserted" else [event.key]


@pytest.mark.asyncio
async def test_kafka_event_subscriber_construction(caplog):
"""Test construction of the KafkaEventSubscriber, ensuring an error is raised if
the DLQ is enabled but no provider is used.
Expand All @@ -603,7 +595,6 @@ async def test_kafka_event_subscriber_construction(caplog):
@pytest.mark.parametrize(
"validation_error", [True, False], ids=["validation_error", "no_validation_error"]
)
@pytest.mark.asyncio
async def test_default_dlq_processor(
kafka: KafkaFixture, caplog, validation_error: bool
):
Expand All @@ -619,7 +610,7 @@ async def test_default_dlq_processor(
await kafka.publish_event(**vars(event))

dummy_publisher = DummyPublisher()
async with KafkaDLQSubscriber.construct(
async with KafkaDLQSubscriber(
config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher
) as dlq_subscriber:
assert not dummy_publisher.published
Expand All @@ -636,7 +627,6 @@ async def test_default_dlq_processor(
@pytest.mark.parametrize(
"processing_error", [True, False], ids=["processing_error", "no_processing_error"]
)
@pytest.mark.asyncio
async def test_custom_dlq_processors(kafka: KafkaFixture, processing_error: bool):
"""Test that a custom DLQ processor can be used with the KafkaDLQSubscriber."""

Expand All @@ -662,7 +652,7 @@ async def process(self, event: ConsumerEvent) -> Optional[ExtractedEventInfo]:

# Create custom processor instance and consume with the KafkaDLQSubscriber
custom_processor = CustomDLQProcessor()
async with KafkaDLQSubscriber.construct(
async with KafkaDLQSubscriber(
config=config,
dlq_topic=TEST_DLQ_TOPIC,
dlq_publisher=DummyPublisher(),
Expand All @@ -678,7 +668,6 @@ async def process(self, event: ConsumerEvent) -> Optional[ExtractedEventInfo]:
assert event == TEST_DLQ_EVENT


@pytest.mark.asyncio()
async def test_preview_repeatability(kafka: KafkaFixture):
"""Make sure the preview functionality works as expected.
Also make sure that previewing events doesn't impact the offset of the
Expand All @@ -697,7 +686,7 @@ async def test_preview_repeatability(kafka: KafkaFixture):

# Set up the DLQ Subscriber and preview/process the events
dummy_publisher = DummyPublisher()
async with KafkaDLQSubscriber.construct(
async with KafkaDLQSubscriber(
config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher
) as dlq_sub:
assert not dummy_publisher.published
Expand All @@ -723,7 +712,6 @@ async def test_preview_repeatability(kafka: KafkaFixture):


@pytest.mark.parametrize("event_published", [True, False])
@pytest.mark.asyncio()
async def test_preview_when_limit_exceeds_available_records(
kafka: KafkaFixture, event_published: bool
):
Expand All @@ -736,7 +724,7 @@ async def test_preview_when_limit_exceeds_available_records(

# Set up the DLQ Subscriber and preview the events
dummy_publisher = DummyPublisher()
async with KafkaDLQSubscriber.construct(
async with KafkaDLQSubscriber(
config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher
) as dlq_sub:
assert not dummy_publisher.published
Expand All @@ -747,7 +735,6 @@ async def test_preview_when_limit_exceeds_available_records(
assert results == [] if event_published else [TEST_DLQ_EVENT]


@pytest.mark.asyncio()
async def test_preview_pagination(kafka: KafkaFixture):
"""Test that the preview pagination works as expected."""
config = make_config(kafka.config)
Expand All @@ -762,7 +749,7 @@ async def test_preview_pagination(kafka: KafkaFixture):
await kafka.publisher.publish(**vars(dlq_event2))

# spin up the DLQ subscribers and preview events
async with KafkaDLQSubscriber.construct(
async with KafkaDLQSubscriber(
config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=DummyPublisher()
) as dlq_subscriber:
results1 = await dlq_subscriber.preview()
Expand All @@ -775,7 +762,6 @@ async def test_preview_pagination(kafka: KafkaFixture):
assert not results3


@pytest.mark.asyncio()
async def test_empty_dlq_timeout(kafka: KafkaFixture):
"""Test that fetching from an empty DLQ topic does not hang.
Expand All @@ -787,7 +773,7 @@ async def test_empty_dlq_timeout(kafka: KafkaFixture):
async def custom_processor(event: ConsumerEvent) -> Optional[ExtractedEventInfo]:
raise RuntimeError("This should not be called.")

async with KafkaDLQSubscriber.construct(
async with KafkaDLQSubscriber(
config=config,
dlq_topic=TEST_DLQ_TOPIC,
dlq_publisher=DummyPublisher(),
Expand Down Expand Up @@ -819,7 +805,7 @@ async def custom_processor(event: ConsumerEvent) -> Optional[ExtractedEventInfo]
dummy_publisher = DummyPublisher()

# Create the DLQ subscriber and manually resolve the DLQ event
async with KafkaDLQSubscriber.construct(
async with KafkaDLQSubscriber(
config=config,
dlq_topic=TEST_DLQ_TOPIC,
dlq_publisher=dummy_publisher,
Expand Down Expand Up @@ -856,11 +842,15 @@ async def test_process_override_different_cid(kafka: KafkaFixture):
)

# Create the DLQ subscriber and manually resolve the DLQ event, expecting an error
msg = "Cannot manually resolve DLQ event because the correlation ID doesn't match."
async with KafkaDLQSubscriber.construct(
msg = (
"Cannot manually resolve DLQ event due to correlation ID mismatch."
+ f"\nExpected {TEST_CORRELATION_ID} but user gave {headers['correlation_id']}"
)
async with KafkaDLQSubscriber(
config=config,
dlq_topic=TEST_DLQ_TOPIC,
dlq_publisher=DummyPublisher(),
) as dlq_subscriber:
with pytest.raises(ValueError, match=msg):
with pytest.raises(DLQProcessingError) as err:
await dlq_subscriber.process(override=override_event)
assert err.exconly().endswith(msg)

0 comments on commit fcc3c0d

Please sign in to comment.