Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support custom KafkaStreams implementations #3516

Merged
merged 6 commits into from
Sep 27, 2024

Conversation

agavra
Copy link
Contributor

@agavra agavra commented Sep 26, 2024

Fixes #3515. This PR introduces the ability to customize the instantiation of KafkaStreams using the KafkaStreamsCustomizer interface.

Copy link
Contributor

@sobychacko sobychacko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Add a section in the reference docs.
  2. Add a section in the whats-new docs.

@@ -82,7 +82,7 @@ A new `KafkaStreams` is created on each `start()`.
You might also consider using different `StreamsBuilderFactoryBean` instances, if you would like to control the lifecycles for `KStream` instances separately.

You also can specify `KafkaStreams.StateListener`, `Thread.UncaughtExceptionHandler`, and `StateRestoreListener` options on the `StreamsBuilderFactoryBean`, which are delegated to the internal `KafkaStreams` instance.
Also, apart from setting those options indirectly on `StreamsBuilderFactoryBean`, starting with _version 2.1.5_, you can use a `KafkaStreamsCustomizer` callback interface to configure an inner `KafkaStreams` instance.
Also, apart from setting those options indirectly on `StreamsBuilderFactoryBean`, starting with _version 2.1.5_, you can use a `KafkaStreamsCustomizer` callback interface to configure an inner `KafkaStreams` instance and (from _version 3.3.0_) instantiate a custom implementation of `KafkaStreams`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe split them into two lines?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For ease of readability thoughts on this structure?
image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I would go different sentences at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I kept sentences to one line where it made sense but separated some paragraphs so that it it is easier to read (I had actually struggled to read the initial flow as one paragraph has many different concepts mixed together). Here's the new look:

image

Since I'm not 100% sure what the standard is here, feel free to suggest edits or push to this branch directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what I meant was splitting them into two separate sentences.

Copy link
Contributor

@sobychacko sobychacko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still missing some author tags.

1. (from _version 2.1.5_) an inner `KafkaStreams` instance using `customize(KafkaStreams)`
2. (from _version 3.3.0_) instantiate a custom implementation of `KafkaStreams` using `initKafkaStreams(Topology, Properties, KafkaClientSupplier)`

Note that `KafkaStreamsCustomizer` overrides the options provided by `StreamsBuilderFactoryBean`. If you need to perform some `KafkaStreams` operations directly, you can access that internal `KafkaStreams` instance by using `StreamsBuilderFactoryBean.getKafkaStreams()`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second sentence needs to be on a new line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! Thanks for bearing with me while I figure out what's standard with the docs here 🙏

Copy link
Contributor

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't comment on any of the Spring-related parts, but as a Kafka committer this change makes sense to me

@sobychacko
Copy link
Contributor

@agavra The PR build failed due to some checkstyle issues. Can you run a local build and fix them? ./gradlew check. Thanks!

@agavra
Copy link
Contributor Author

agavra commented Sep 26, 2024

Hmm, test failures seem to be unrelated? Have you seen these before @sobychacko?

EnableKafkaIntegrationTests > testProjection() FAILED
    org.opentest4j.AssertionFailedError: 
    Expecting value to be true but was false
        at [email protected]/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at [email protected]/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
        at [email protected]/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at [email protected]/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
        at app//org.springframework.kafka.annotation.EnableKafkaIntegrationTests.testProjection(EnableKafkaIntegrationTests.java:1063)
ReplyingKafkaTemplateTests > testAggregateTimeout() SKIPPED
ProjectingMessageConverterTests > createsProjectedPayloadForInterface() FAILED
    org.springframework.aop.framework.AopConfigException: Could not generate CGLIB subclass of class java.io.ByteArrayInputStream: Common causes of this problem include using a final class or a non-visible class
        at app//org.springframework.aop.framework.CglibAopProxy.buildProxy(CglibAopProxy.java:234)
        at app//org.springframework.aop.framework.CglibAopProxy.getProxy(CglibAopProxy.java:167)
        at app//org.springframework.aop.framework.ProxyFactory.getProxy(ProxyFactory.java:110)
        at app//org.springframework.data.projection.ProxyProjectionFactory.createProjection(ProxyProjectionFactory.java:124)
        at app//org.springframework.data.projection.SpelAwareProxyProjectionFactory.createProjection(SpelAwareProxyProjectionFactory.java:47)
        at app//org.springframework.kafka.support.converter.ProjectingMessageConverter.extractAndConvertValue(ProjectingMessageConverter.java:124)
        at app//org.springframework.kafka.support.converter.ProjectingMessageConverterTests.assertProjectionProxy(ProjectingMessageConverterTests.java:115)
        at app//org.springframework.kafka.support.converter.ProjectingMessageConverterTests.createsProjectedPayloadForInterface(ProjectingMessageConverterTests.java:77)
        Caused by:
        org.springframework.cglib.core.ReflectUtils$1: ClassLoader mismatch for [java.io.ByteArrayInputStream]: JVM should be started with --add-opens=java.base/java.lang=ALL-UNNAMED for ClassLoader.defineClass to be accessible on jdk.internal.loader.ClassLoaders$AppClassLoader; consider co-locating the affected class in that target ClassLoader instead.

@agavra
Copy link
Contributor Author

agavra commented Sep 26, 2024

FYI @sobychacko looks like it's related to spring-aop, pinning spring-aop to 6.1.0 fixes the issue and the test:

implementation 'org.springframework:spring-aop:6.1.0'

@sobychacko
Copy link
Contributor

We will look into it today.

@sobychacko sobychacko merged commit 52019c2 into spring-projects:main Sep 27, 2024
2 of 3 checks passed
@agavra agavra deleted the 3515_custom_kstreams branch September 27, 2024 00:04
@sobychacko
Copy link
Contributor

Thanks for the PR and for bringing the AOP proxy issue to our attention. We downgraded the framework dependency to the recent RC1 version while investigating the root cause. The PR is now merged upstream.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Customize Instantiation of KafkaStreams in StreamsBuilderFactoryBean
4 participants