
Apply backpressure per Kafka partition #110

Closed
atamborrino opened this issue Mar 13, 2016 · 17 comments

@atamborrino

Hi,

I'm opening this issue to discuss a potential improvement.

For now, when we create a consumer, back-pressure is applied per Kafka topic. If the processing of a record from partition 1 is slow (via a .mapAsync for instance), this blocks consumption for all other partitions. This can be mitigated by deploying several clients with the same consumer group, but I think we should allow parallel, independent consumption of partitions inside the same client instance.

Consuming the partitions of a Kafka topic in parallel is quite standard in the Kafka world, as message ordering is only guaranteed per partition. Apache Samza and the recent Kafka Streams lib follow this approach by mapping the consumption parallelism level to the number of Kafka partitions (see http://docs.confluent.io/2.1.0-alpha1/streams/architecture.html#parallelism-model). Hence partition processing is totally independent, and a slowly processed partition does not block all the others.

To do the same with reactive-kafka, a first approach could be the following:

consumerSource
  .groupBy(maxNbPartitions, msg => msg.partition)
  .mapAsync(1)(yourAsyncProcessing)
  .mergeSubstreams
  .to(consumer.offsetSink)

However, this approach has the following drawbacks:

  • Independent processing between partitions is limited: once the input buffer for a slowly processed partition is full, all the other partitions are blocked.
  • Non-optimal efficiency: when the processing for a partition begins to be slow, the Kafka consumer will still fetch records for all partitions in a round-robin fashion, even if partition 1's processing is already late and its input buffer is already large. It would be more efficient to propagate this backpressure information to the Kafka client itself, telling it to stop fetching from partition 1 for now and to concentrate throughput on the other partitions.

Thus, I think that if we want truly independent back-pressure between partitions, we could write a new consumer, say "fine-grained", that uses the Kafka 0.9 client's ConsumerRebalanceListener and consumption flow control (using client.pause(partition) and client.resume(partition), see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html).
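
To make the flow-control idea concrete, here is a minimal sketch (not the reactive-kafka API) of a poll loop that pauses partitions whose downstream buffer is full and resumes them once drained; the buffer map, high watermark, topic name and broker address are illustrative assumptions:

import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Hypothetical per-partition buffers standing in for the input buffers of the downstream stages.
val buffers = mutable.Map.empty[TopicPartition, mutable.Queue[ConsumerRecord[String, String]]]
val highWatermark = 1000

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "per-partition-demo")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(java.util.Arrays.asList("my-topic"))

while (true) {
  // Pause partitions whose buffer is over the watermark, resume the ones that have drained.
  val (full, ready) = consumer.assignment().asScala
    .partition(tp => buffers.get(tp).exists(_.size >= highWatermark))
  consumer.pause(full.toSeq: _*)    // 0.9 signature is varargs; 0.10.1+ takes a Collection
  consumer.resume(ready.toSeq: _*)

  val records = consumer.poll(100)
  records.asScala.foreach { rec =>
    val tp = new TopicPartition(rec.topic, rec.partition)
    buffers.getOrElseUpdate(tp, mutable.Queue.empty[ConsumerRecord[String, String]]) += rec
  }
  // A real implementation would drain `buffers` into per-partition downstream stages here.
}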

I see this consumer as a Source[Source[Record]]: the outer source is a source of partitions, and each inner source is a source of records. This makes sense from a Kafka client point of view (with auto-assigned partitions, a Kafka client is really a source of partitions where each partition is a source of records).

The code could look like this:

val perPartitionConsumer = kafka.consumePerPartitionWithOffsetSink(...) // whatever naming
val partitionStream = Source.fromPublisher(perPartitionConsumer.publisher)

partitionStream.map { recordStream =>
    recordStream
      .mapAsync(1)(yourAsyncProcessing)
}
.flatMapMerge(maxNbPartitions, identity)
.to(perPartitionConsumer.offsetSink)

With this, we have true per-partition async backpressure. This also has the benefit of exposing partition assignment/unassignment to library users, in case they want to react to these events.

Do you think it would be interesting to have such a fine-grained consumer in the API? If we agree on it, I'll be happy to code it and submit a PR.

@13h3r (Member) commented Mar 14, 2016

@atamborrino thanks for the idea. Right now we are discussing what the new API for reactive-kafka will look like - akka/akka-meta#14 and akka/akka-meta#13.

In general I like your idea, but first we have to agree on what the very basic API will look like.

Also, I worry about how pause/resume is implemented in the Kafka client. The client prefetches some data, and if a topic/partition is paused it just drops this data, which is inefficient. Maybe this will be fixed in 0.10.

@atamborrino (Author)

@13h3r thanks for your feedback. OK, we'll wait for the new basic API first. In the meantime I'll dig into the Kafka client's pause/resume implementation to see if it really fulfils our needs.

@13h3r (Member) commented Mar 16, 2016

I like an API which can provide Source[(TopicInfo, Source[Messages])], because it also exposes dynamically assigned partitions in a streaming manner.

The problem here is the Kafka client, which is not thread-safe. The partition-related sources have to coordinate somehow with the single materialized Kafka client in order to call pause/resume concurrently.
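
A hypothetical shape for such an API (the names here are illustrative, not an existing reactive-kafka signature):

import akka.NotUsed
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition

object PartitionedConsumerApi {
  // Each outer element is a newly assigned partition; its inner source would complete when
  // the partition is revoked. All inner sources must funnel their demand through the single
  // materialized consumer, since KafkaConsumer is not thread-safe.
  type PartitionedSource[K, V] =
    Source[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed]), NotUsed]
}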

@atamborrino (Author)

Yep, me too. But it would be Source[(PartitionInfo, Source[Messages])], wouldn't it?

Concerning the implementation, when I thought about it in terms of an actor / Reactive Streams Source implementation, I was thinking of a main ActorPublisher[(PartitionInfo, Source[Messages])] which holds the Kafka consumer client and dynamically creates a child ActorPublisher[Messages] per assigned partition. Child actors can send Ack and/or Nack messages to propagate backpressure to their parent actor. As the parent actor is the only one using the Kafka client, thread-safety is ensured.
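
A rough sketch of that coordination with plain actors (the message names, the childFor lookup and the poll interval are assumptions; the point is that only the parent ever touches the non-thread-safe KafkaConsumer):

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object PartitionCoordinator {
  // child -> parent: Ack-style demand for up to n more records from one partition
  final case class RequestRecords(tp: TopicPartition, n: Int)
  // parent -> child: the records fetched for that partition
  final case class Records(batch: Vector[ConsumerRecord[String, String]])
  private case object Poll
}

class PartitionCoordinator(consumer: KafkaConsumer[String, String],
                           childFor: TopicPartition => ActorRef) extends Actor {
  import PartitionCoordinator._
  import context.dispatcher

  private var demand = Map.empty[TopicPartition, Int]

  override def preStart(): Unit =
    context.system.scheduler.schedule(50.millis, 50.millis, self, Poll)

  def receive: Receive = {
    case RequestRecords(tp, n) =>
      demand += tp -> (demand.getOrElse(tp, 0) + n)

    case Poll =>
      // Pause the partitions whose child has no outstanding demand, so the broker fetch
      // concentrates on the partitions that can actually make progress.
      val assigned = consumer.assignment().asScala.toSet
      val (starved, ready) = assigned.partition(tp => demand.getOrElse(tp, 0) == 0)
      consumer.pause(starved.asJava)    // Collection overload; the 0.9 client uses varargs instead
      consumer.resume(ready.asJava)

      val records = consumer.poll(0)
      ready.foreach { tp =>
        val batch = records.records(tp).asScala.toVector
        if (batch.nonEmpty) {
          childFor(tp) ! Records(batch)
          demand += tp -> math.max(0, demand.getOrElse(tp, 0) - batch.size)
        }
      }
  }
}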

But as far as I understand, we prefer to use the GraphStage API to create a Source, right? In that case, I'll have to dig a bit into the GraphStage API to see how we can implement it.

@13h3r (Member) commented Mar 17, 2016

> Yep, me too. But it would be Source[(PartitionInfo, Source[Messages])], wouldn't it?

yes

AFAIU the GraphStage API is for complex shapes (like 3 inlets and 4 outlets with some complex semantics). The actor-based API is another way to implement a Source or Sink. @ktoso, @patriknw, please confirm whether I'm right about this.

I have another thought about backpressure per partition. Would multiple graphs solve this? If you can backpressure each partition, then you should be able to process each partition independently from the others. What if you create multiple graphs and each graph assigns one partition to itself? Would that solve your case?

@patriknw (Member)

yes, we will use GraphStage for the implementation

@danielwegener

I can second this request. Backpressure per partition is important if one wants to merge-sort a view of the whole topic. I had this problem roughly a year ago, when we needed a deterministic, replayable (single) consumer that consumed all partitions in parallel (filtering 99% of the messages away), merge-sorted them by event time, and then grouped/aggregated the realigned stream. Since messages are not always perfectly distributed over all partitions (especially if they are partitioned by a business key), the partitions do not always play back at the same speed. Plus, if one partition's producer dies or becomes slow, all other partitions should be backpressured and wait for the slow producer to catch up.

I guess it's possible to build this with one dedicated consumer per partition (back then I used the simple consumer API), but an API that creates a source per partition would be really helpful in such cases, and it emphasizes the "I am still in a partitioned world" pattern the way Samza and Kafka Streams do.
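
As an illustration, once each partition is exposed as its own backpressured source, the event-time realignment could be sketched with mergeSorted (the Event type and the per-partition sources are hypothetical):

import akka.NotUsed
import akka.stream.scaladsl.Source

object EventTimeMerge {
  // Hypothetical record type, ordered by event time.
  final case class Event(eventTime: Long, payload: String)
  implicit val byEventTime: Ordering[Event] = Ordering.by(_.eventTime)

  // p0, p1, p2 stand for per-partition sources, each already ordered by event time.
  // mergeSorted only emits when it can compare the heads of its inputs, so a slow
  // partition backpressures the merged view instead of being left behind.
  def mergeByEventTime(p0: Source[Event, NotUsed],
                       p1: Source[Event, NotUsed],
                       p2: Source[Event, NotUsed]): Source[Event, NotUsed] =
    p0.mergeSorted(p1).mergeSorted(p2)
}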

@atamborrino (Author)

> I have another thought about backpressure per partition. Would multiple graphs solve this? If you can backpressure each partition, then you should be able to process each partition independently from the others. What if you create multiple graphs and each graph assigns one partition to itself? Would that solve your case?

When you say "multiple graphs", do you mean a graph source with multiple outlets? The problem with this is that the number of outlets is fixed, whereas the assignment/unassignment of Kafka partitions is dynamic. Hence I think a Source of Sources models this behaviour well.

@13h3r (Member) commented Mar 21, 2016

> When you say "multiple graphs", do you mean a graph source with multiple outlets? The problem with this is that the number of outlets is fixed, whereas the assignment/unassignment of Kafka partitions is dynamic. Hence I think a Source of Sources models this behaviour well.

No, I mean running multiple sources with multiple underlying Kafka clients. They are independent, and the multiple Kafka clients will be balanced by Kafka.

Like this:

val myFlow: RunnableGraph[_] = ???
1 to 10 foreach (_ => myFlow.run)

@atamborrino (Author)

I don't think this is a good idea.

If we launch X clients on the first node (X = nbPartitions, say 30), we get backpressure per partition, but I think launching 30 Kafka clients on a single node is an anti-pattern with bad performance.

Even worse, when you add a second node, this second node will not receive any partition to process, as the 30 clients on the first node already have one partition each.

Thus we would lose the elastic scalability of the Kafka client's auto-balancing mechanism.

@13h3r (Member) commented Mar 21, 2016

> Even worse, when you add a second node, this second node will not receive any partition to process, as the 30 clients on the first node already have one partition each.

Agreed. Let's reject this idea.

@13h3r (Member) commented Apr 5, 2016

Also, it may be interesting to expose a stream per partition. Partitions are used to limit processing scope, and it is possible to perform windowed operations on a per-partition basis. Could we expose a SubFlow from the Kafka consumer?
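
For example, a per-partition windowed count over such a SubFlow could look roughly like this (consumerSource and maxNbPartitions are assumptions, as in the snippets above):

import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.consumer.ConsumerRecord

object PerPartitionWindows {
  // groupBy yields a SubFlow scoped to one partition, so groupedWithin windows the records
  // of a single partition without interleaving them with records from other partitions.
  def windowedCounts(consumerSource: Source[ConsumerRecord[String, String], NotUsed],
                     maxNbPartitions: Int): Source[Int, NotUsed] =
    consumerSource
      .groupBy(maxNbPartitions, _.partition)
      .groupedWithin(1000, 1.second)   // per-partition window: up to 1000 records or 1 second
      .map(_.size)                     // e.g. a windowed count per partition
      .mergeSubstreams
}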

@atamborrino (Author)

A SubFlow could do the job instead of a Source[Source], but in that case the API user cannot react to the assignment/unassignment of partitions, can they?

@13h3r (Member) commented Apr 30, 2016

There is a discussion about this in PR #139.

@atamborrino (Author)

Great!

@13h3r (Member) commented Jun 30, 2016

done
