-
Hello, I am in fact blocking the event loop thread, but this is exactly what I want to accomplish. Here is what I am doing:

    @Incoming("copy-ready-topic")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public Subscriber<IncomingKafkaRecord<String, PlayerCopyReady>> copyTopicConsumer() {
        return new BaseSubscriber<>() {
            @Override
            public void hookOnSubscribe(Subscription subscription) {
                request(1);
            }

            @Override
            public void hookOnNext(IncomingKafkaRecord<String, PlayerCopyReady> message) {
                boolean capacityIsFull;
                // insert payload in queue
                // `insert` returns true when payload was inserted
                do {
                    capacityIsFull = !keyFinderCache.insert(message);
                } while (capacityIsFull);
                request(1);
            }
        };
    }

The idea is simple, I am inserting all messages in a queue object in the
-
Your logic is blocking, so you need to run it on a worker thread. You can't loop like this on the event loop. However, here are a few other suggestions. Also, if you do not request, the kafka connector will automatically pause the consumption and so, you may not have to do anything. Finally, if you need to delay some action, you can use vertx.setTimer(1000, x -> ...). This will not block the event loop.
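As a rough illustration of the timer suggestion, the busy loop could be replaced by a retry that is rescheduled after a delay. The sketch below is only an assumption about how that might look: it uses a plain JDK `ScheduledExecutorService` as a stand-in for `vertx.setTimer`, and `TimedRetryInsert`, its bounded queue, and `insertWithRetry` are hypothetical names that do not come from the original code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimedRetryInsert {
    // Hypothetical stand-in for keyFinderCache: offer() returns false when the queue is full.
    private final BlockingQueue<String> cache;
    // Stand-in for the Vert.x timer; any scheduler that runs a task after a delay would do.
    private final ScheduledExecutorService timer;

    public TimedRetryInsert(int capacity, ScheduledExecutorService timer) {
        this.cache = new ArrayBlockingQueue<>(capacity);
        this.timer = timer;
    }

    /**
     * Tries to insert once. If the cache is full, schedules another attempt
     * after retryDelayMillis instead of spinning on the calling thread.
     */
    public void insertWithRetry(String payload, long retryDelayMillis, Runnable onInserted) {
        if (cache.offer(payload)) {
            // In the subscriber this is where request(1) would go.
            onInserted.run();
        } else {
            timer.schedule(
                    () -> insertWithRetry(payload, retryDelayMillis, onInserted),
                    retryDelayMillis,
                    TimeUnit.MILLISECONDS);
        }
    }

    /** Removes and returns the oldest payload, or null if the cache is empty. */
    public String poll() {
        return cache.poll();
    }
}
```

The calling thread returns immediately in both branches, so nothing is blocked while the cache is full. The next message is only requested once `onInserted` runs.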
-
I found a solution following your advice ("if you do not request, the kafka connector will automatically pause the consumption and so, you may not have to do anything"). Basically my solution is:

    // create a subscriber and assign it to a variable
    @PostConstruct
    void init() {
        copyReadySubscriber = new BaseSubscriber<>() {
            @Override
            public void hookOnNext(IncomingKafkaRecord<String, PlayerCopyReady> message) {
                // subscriber inserts payloads in cache (cache from this class or any other class)
                keyFinderCache.insert(message.getPayload());
            }
        };
    }

    // just return this subscriber here
    @Incoming("copy-ready-topic")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public Subscriber<IncomingKafkaRecord<String, PlayerCopyReady>> copyTopicConsumer() {
        return copyReadySubscriber;
    }

    // then you can use the subscriber reference anywhere to keep requesting. For example:
    @Scheduled(
        every = "1s",
        concurrentExecution = SKIP
    )
    void pushCopyReadyRequests() {
        // any condition you want
        if (keyFinderCache.hasCapacity()) {
            copyReadySubscriber.request(batchNumber);
        }
    }
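To make the mechanism explicit, here is a minimal, framework-free sketch of a subscriber whose demand is driven entirely from outside, written against the JDK's `java.util.concurrent.Flow` interfaces rather than the Kafka connector. `PullOnDemandSubscriber`, `requestMore`, and `received` are hypothetical names used only for illustration. One thing that may be worth checking in the real code: if `BaseSubscriber` above is Reactor's `reactor.core.publisher.BaseSubscriber`, its default `hookOnSubscribe` requests unbounded demand, so it would probably need to be overridden to request nothing (or a small initial batch) for the scheduled `request(batchNumber)` calls to be the only source of demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

public class PullOnDemandSubscriber<T> implements Flow.Subscriber<T> {
    private final AtomicReference<Flow.Subscription> subscription = new AtomicReference<>();
    private final List<T> received = new ArrayList<>();

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // Deliberately request nothing here: all demand comes from requestMore().
        subscription.set(s);
    }

    @Override
    public void onNext(T item) {
        // Stand-in for inserting the payload into a cache.
        synchronized (received) {
            received.add(item);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        // Error handling is omitted in this sketch.
    }

    @Override
    public void onComplete() {
        // Nothing to clean up in this sketch.
    }

    /** Called from outside, for example by a scheduled job, when there is room for n more items. */
    public void requestMore(long n) {
        Flow.Subscription s = subscription.get();
        if (s != null) {
            s.request(n);
        }
    }

    /** Returns a snapshot of everything received so far. */
    public List<T> received() {
        synchronized (received) {
            return new ArrayList<>(received);
        }
    }
}
```

Until `requestMore` is called, a well-behaved publisher emits nothing. That is the behavior the scheduled method relies on to pause consumption while the cache is full.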