Skip to content

Commit

Permalink
Close airtai#2060
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniil Dumchenko committed Feb 6, 2025
1 parent 8c96127 commit 3bfe927
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
12 changes: 12 additions & 0 deletions faststream/confluent/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,10 @@ def subscriber(
bool,
Doc("Whetever to include operation in AsyncAPI schema or not."),
] = True,
max_workers: Annotated[
int,
Doc("Number of workers to process messages concurrently."),
] = 1,
) -> "AsyncAPIBatchSubscriber": ...

@overload
Expand Down Expand Up @@ -627,6 +631,10 @@ def subscriber(
bool,
Doc("Whetever to include operation in AsyncAPI schema or not."),
] = True,
max_workers: Annotated[
int,
Doc("Number of workers to process messages concurrently."),
] = 1,
) -> "AsyncAPIDefaultSubscriber": ...

@overload
Expand Down Expand Up @@ -909,6 +917,10 @@ def subscriber(
bool,
Doc("Whetever to include operation in AsyncAPI schema or not."),
] = True,
max_workers: Annotated[
int,
Doc("Number of workers to process messages concurrently."),
] = 1,
) -> Union[
"AsyncAPIDefaultSubscriber",
"AsyncAPIBatchSubscriber",
Expand Down
33 changes: 33 additions & 0 deletions faststream/kafka/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,17 @@ def subscriber(
Sequence["SubscriberMiddleware[KafkaMessage]"],
Doc("Subscriber middlewares to wrap incoming message processing."),
] = (),
max_workers: Annotated[
int,
Doc(
"Maximum number of messages being processed concurrently. With "
"`auto_commit=False` processing is concurrent between partitions and "
"sequential within a partition. With `auto_commit=False` maximum "
"concurrency is achieved when total number of workers across all "
"application instances running workers in the same consumer group "
"is equal to the number of partitions in the topic."
),
] = 1,
filter: Annotated[
"Filter[KafkaMessage]",
Doc(
Expand Down Expand Up @@ -791,6 +802,17 @@ def subscriber(
Sequence["SubscriberMiddleware[KafkaMessage]"],
Doc("Subscriber middlewares to wrap incoming message processing."),
] = (),
max_workers: Annotated[
int,
Doc(
"Maximum number of messages being processed concurrently. With "
"`auto_commit=False` processing is concurrent between partitions and "
"sequential within a partition. With `auto_commit=False` maximum "
"concurrency is achieved when total number of workers across all "
"application instances running workers in the same consumer group "
"is equal to the number of partitions in the topic."
),
] = 1,
filter: Annotated[
"Filter[KafkaMessage]",
Doc(
Expand Down Expand Up @@ -1172,6 +1194,17 @@ def subscriber(
Sequence["SubscriberMiddleware[KafkaMessage]"],
Doc("Subscriber middlewares to wrap incoming message processing."),
] = (),
max_workers: Annotated[
int,
Doc(
"Maximum number of messages being processed concurrently. With "
"`auto_commit=False` processing is concurrent between partitions and "
"sequential within a partition. With `auto_commit=False` maximum "
"concurrency is achieved when total number of workers across all "
"application instances running workers in the same consumer group "
"is equal to the number of partitions in the topic."
),
] = 1,
filter: Annotated[
"Filter[KafkaMessage]",
Doc(
Expand Down

0 comments on commit 3bfe927

Please sign in to comment.