Skip to content

Support reactive KafkaEntity #3862

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.Import;
import org.springframework.kafka.entity.reactive.KafkaEntityDefaultConfiguration;
import org.springframework.kafka.entity.reactive.KafkaEntityProcessor;
import org.springframework.kafka.entity.reactive.KafkaEntityPublisher;
import org.springframework.kafka.entity.reactive.KafkaEntitySubscriber;

/**
* Enable Kafka Entities annotated fields. To be used on
* {@link org.springframework.context.annotation.Configuration Configuration} classes.
*
* Note that all fields in beans annotated with fields in Beans annotated
* with @{@link KafkaEntityProcessor} @{@link KafkaEntitySubscriber} @{@link KafkaEntityPublisher} will be detected.
*
* @author Popovics Boglarka
*
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaEntityDefaultConfiguration.class)
public @interface EnableKafkaEntity {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.entity;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* define per topic one class or record and mark with @KafkaEntity (mark a field
* with @KafkaEntityKey)
* Topic must exists.
* Topic name will be the name of the entity class with included packagename but
* you can use custom Topic name like
* this @KafkaEntity(customTopicName="PRODUCT")
*
* @author Popovics Boglarka
*/
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaEntity {

// readonly true/false

/**
* Topic name will be the name of the entity class with included packagename
* unless you use this property.
*
* @return custom topic name
*/
String customTopicName() default "";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.entity;

/**
* Thrown by spring-kafka-extensions library if there is a problem creating a
* Kafka Entity Bean.
*
* @author Popovics Boglarka
*/
public class KafkaEntityException extends Exception {

private static final long serialVersionUID = 8485773596430780144L;

/**
* The name of the Kafka Entity Bean.
*/
private final String beanName;

/**
* Constructs a new exception with the specified message and a name of the Kafka
* Entity Bean, which had the problem.
*
* @param beanName name of the Kafka Entity Bean
* @param message problem bei creating the Kafka Entity Bean
*/
public KafkaEntityException(String beanName, String message) {
super(message);
this.beanName = beanName;
}

/**
* Constructs a new exception with the specified cause and a name of the Kafka
* Entity Bean, which had the problem.
*
* @param beanName name of the Kafka Entity Bean
* @param e problem bei creating the Kafka Entity Bean
*/
public KafkaEntityException(String beanName, Exception e) {
super(e);
this.beanName = beanName;
}

/**
* Getter to the name of the Kafka Entity Bean, which had the problem.
*
* @return name of the Kafka Entity Bean
*/
public String getBeanName() {
return this.beanName;
}

@Override
public String getMessage() {
return this.beanName + ": " + super.getMessage();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.entity;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Define per topic one class or record and mark with @KafkaEntity (mark a field
* with @KafkaEntityKey) .
*
* @author Popovics Boglarka
*/
@Target({ ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaEntityKey {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.entity;

import io.micrometer.common.util.StringUtils;

/**
* Kafka Entity Utility Class.
*
* @author Popovics Boglarka
*/
public final class KafkaEntityUtil {

private KafkaEntityUtil() {
super();
}

/**
* Gets the name of the topic to a @KafkaEntity.
*
* @param entity class which is marked with @KafkaEntity
*
* @return the name of the entity class with included packagename but you can
* use custom Topic name like
* this @KafkaEntity(customTopicName="PRODUCT")
*/
public static String getTopicName(Class<?> entity) {
KafkaEntity topic = extractKafkaEntity(entity);
return getTopicName(entity, topic);
}

private static String getTopicName(Class<?> entity, KafkaEntity topic) {
if (!StringUtils.isEmpty(topic.customTopicName())) {
return topic.customTopicName();
}
return entity.getName();
}

private static KafkaEntity extractKafkaEntity(Class<?> entity) {
return (KafkaEntity) entity.getAnnotation(KafkaEntity.class);
}
}
Loading