Skip to content

Commit

Permalink
feat: listeners and post processors
Browse files Browse the repository at this point in the history
  • Loading branch information
lbovet committed Aug 23, 2021
1 parent 66de03e commit b6659c8
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 8 deletions.
2 changes: 1 addition & 1 deletion kobuka-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>kobuka</artifactId>
<groupId>org.swisspush</groupId>
<version>1.0.0</version>
<version>1.0.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion kobuka-gen/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>kobuka</artifactId>
<groupId>org.swisspush</groupId>
<version>1.0.0</version>
<version>1.0.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion kobuka-spring/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>kobuka</artifactId>
<groupId>org.swisspush</groupId>
<version>1.0.0</version>
<version>1.0.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.swisspush.kobuka.spring.internal;

import net.karneim.pojobuilder.GeneratePojoBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ConsumerPostProcessor;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.List;
import java.util.Map;

public class DefaultKafkaConsumerFactoryProvider {
Expand All @@ -13,7 +16,17 @@ public class DefaultKafkaConsumerFactoryProvider {
withGenerationGap = true,
intoPackage = "org.swisspush.kobuka.spring"
)
public static <K, V> DefaultKafkaConsumerFactory<K, V> createDefaultKafkaConsumerFactory(Map<String, Object> configs) {
return new DefaultKafkaConsumerFactory<K, V>(configs);
public static <K, V> DefaultKafkaConsumerFactory<K, V>
createDefaultKafkaConsumerFactory(Map<String, Object> configs,
List<ConsumerFactory.Listener<K, V>> withListeners,
List<ConsumerPostProcessor<K, V>> withPostProcessors) {
DefaultKafkaConsumerFactory<K, V> result = new DefaultKafkaConsumerFactory<>(configs);
if(withListeners != null) {
withListeners.forEach(result::addListener);
}
if(withPostProcessors != null) {
withPostProcessors.forEach(result::addPostProcessor);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import net.karneim.pojobuilder.GeneratePojoBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ProducerPostProcessor;

import java.util.List;
import java.util.Map;

public class DefaultKafkaProducerFactoryProvider {
Expand All @@ -13,7 +16,17 @@ public class DefaultKafkaProducerFactoryProvider {
withGenerationGap = true,
intoPackage = "org.swisspush.kobuka.spring"
)
public static <K, V> DefaultKafkaProducerFactory<K, V> create(Map<String, Object> configs) {
return new DefaultKafkaProducerFactory<K, V>(configs);
public static <K, V> DefaultKafkaProducerFactory<K, V> create(Map<String, Object> configs,
List<ProducerFactory.Listener<K, V>> withListeners,
List<ProducerPostProcessor<K, V>> withPostProcessors)
{
DefaultKafkaProducerFactory<K, V> result = new DefaultKafkaProducerFactory<>(configs);
if(withListeners != null) {
withListeners.forEach(result::addListener);
}
if(withPostProcessors != null) {
withPostProcessors.forEach(result::addPostProcessor);
}
return result;
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>org.swisspush</groupId>
<artifactId>kobuka</artifactId>
<packaging>pom</packaging>
<version>1.0.0</version>
<version>1.0.1</version>

<build>
<plugins>
Expand Down

0 comments on commit b6659c8

Please sign in to comment.