Skip to content

Commit

Permalink
fix: prevent ConsumerStoppedError exception when engine or a stream i…
Browse files Browse the repository at this point in the history
…s stopped. Related to #234
  • Loading branch information
marcosschroh committed Nov 15, 2024
1 parent 08691d8 commit 497a809
Show file tree
Hide file tree
Showing 18 changed files with 857 additions and 28 deletions.
3 changes: 2 additions & 1 deletion examples/fastapi-webapp/fastapi_webapp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from kstreams.streams_utils import StreamErrorPolicy

from .resources import stream_engine
from .streams import consume
from .streams import consume, consume_2
from .views import router

app = FastAPI()
Expand All @@ -21,6 +21,7 @@ async def shutdown_event():


stream_engine.add_stream(consume, error_policy=StreamErrorPolicy.STOP_APPLICATION)
stream_engine.add_stream(consume_2, error_policy=StreamErrorPolicy.RESTART)
app.include_router(router)
app.add_middleware(PrometheusMiddleware, filter_unhandled_paths=True)
app.add_api_route("/metrics", metrics) # type: ignore
8 changes: 8 additions & 0 deletions examples/fastapi-webapp/fastapi_webapp/streams.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

import asyncio
from kstreams import ConsumerRecord, stream

logger = logging.getLogger(__name__)
Expand All @@ -11,3 +12,10 @@ async def consume(cr: ConsumerRecord):

if cr.value == b"error":
raise ValueError("error....")


@stream("local--hello-world", group_id="example-group-2")
async def consume_2(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {cr}")
await asyncio.sleep(10)
raise ValueError
9 changes: 5 additions & 4 deletions examples/fastapi-webapp/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import aiorun

import kstreams
import kstreams.streams_utils

logger = logging.getLogger(__name__)

Expand Down
11 changes: 6 additions & 5 deletions examples/graceful-shutdown-example/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Empty file.
35 changes: 35 additions & 0 deletions examples/rqlite-example/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
version: '3'
services:
zookeeper:
image: "confluentinc/cp-zookeeper:7.7.0"
hostname: zookeeper
container_name: kstream_zookeeper
ports:
- 32181:32181
environment:
- ZOOKEEPER_CLIENT_PORT=32181
kafka:
image: confluentinc/cp-kafka:7.7.0
hostname: kafka
container_name: kstream_kafka
ports:
- 9092:9092
- 9093:9093
depends_on:
- zookeeper
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT2:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,PLAINTEXT2://kafka:9093
- KAFKA_BROKER_ID=1
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
- CONFLUENT_METRICS_ENABLE=true
- CONFLUENT_SUPPORT_CUSTOMER_ID=anonymous
- KAFKA_AUTO_CREATE_TOPICS_ENABL="true"
rqlite:
image: rqlite/rqlite:8.28.4
hostname: rqlite
container_name: kstream_rqlite
ports:
- 4001:4001
Loading

0 comments on commit 497a809

Please sign in to comment.