Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offset-based deduplication for Kafka source. #33596

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

tomstepp
Copy link
Contributor

Offset-based deduplication for Kafka source.


See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@tomstepp tomstepp force-pushed the delegated-kafka branch 2 times, most recently from 829cc76 to 0307c30 Compare January 15, 2025 19:34
@tomstepp
Copy link
Contributor Author

R: @scwhittle

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@tomstepp tomstepp force-pushed the delegated-kafka branch 4 times, most recently from 3f19ae6 to 6431783 Compare January 22, 2025 00:07
@tomstepp tomstepp force-pushed the delegated-kafka branch 7 times, most recently from 949eb3d to baca3e2 Compare January 22, 2025 20:58
@tomstepp tomstepp requested a review from scwhittle January 22, 2025 21:30
@tomstepp tomstepp force-pushed the delegated-kafka branch 2 times, most recently from 84dbc7b to 9b30e79 Compare January 23, 2025 20:46
@tomstepp tomstepp force-pushed the delegated-kafka branch 2 times, most recently from 09a16ab to 8578e89 Compare January 24, 2025 17:17
@tomstepp tomstepp force-pushed the delegated-kafka branch 3 times, most recently from 8a5b978 to ef33cba Compare January 24, 2025 19:53
@tomstepp tomstepp requested a review from scwhittle January 24, 2025 20:08
Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would also be good to loop in someone more familiar with the source/translation stuff to make sure we're not missing something.

@tomstepp tomstepp force-pushed the delegated-kafka branch 4 times, most recently from a8317b0 to fc82f02 Compare January 27, 2025 23:05
@tomstepp tomstepp force-pushed the delegated-kafka branch 2 times, most recently from 6bfb8ff to f2d55b0 Compare January 28, 2025 16:51
@tomstepp
Copy link
Contributor Author

R: @chamikaramj and @johnjcasey

Can one of you help review source/translation changes here? I am working on a change that applies to Kafka Read on Java legacy. I am having trouble understanding why the change breaks KafkaIOExternalTest.testConstructKafkaRead and KafkaIOExternalTest.testConstructKafkaReadWithoutMetadata http://screen/3taBLZo28VjePkZ.

@tomstepp tomstepp force-pushed the delegated-kafka branch 2 times, most recently from d4d40e3 to becfb31 Compare January 28, 2025 17:26
@tomstepp
Copy link
Contributor Author

tomstepp commented Jan 28, 2025

@scwhittle if you have any tips, or know who we can ask:

WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.

Expansion result: error: "java.lang.RuntimeException: Failed to build transform from spec urn: 

\"beam:transform:org.apache.beam:kafka_read_with_metadata:v1\"\npayload: \"\\n\\212\\003\\n\\020\\n\\006topics\\032\\006\\032\\004\\n\\002\\020\\a\\n!\\n\\017consumer_config\\032\\n*\\b\\n\\002\\020\\a\\022\\002\\020\\a \\001(\\001\\n\\032\\n\\020key_deserializer\\032\\002\\020\\a \\002(\\002\\n\\034\\n\\022value_deserializer\\032\\002\\020\\a \\003(\\003\\n\\031\\n\\017start_read_time\\032\\002\\020\\004 \\004(\\004\\n#\\n\\031commit_offset_in_finalize\\032\\002\\020\\b \\005(\\005\\n\\032\\n\\020timestamp_policy\\032\\002\\020\\a \\006(\\006\\n\\\"\\n\\030consumer_polling_timeout\\032\\002\\020\\004 \\a(\\a\\n\\037\\n\\025redistribute_num_keys\\032\\002\\020\\003 \\b(\\b\\n\\026\\n\\fredistribute\\032\\002\\020\\b \\t(\\t\\n\\032\\n\\020allow_duplicates\\032\\002\\020\\b \\n(\\n\\n\\036\\n\\024offset_deduplication\\032\\002\\020\\b \\v(\\v\\022$059ad81c-549f-4ab3-adb7-37134f22938e\\022\\372\\002\\f\\000\\000\\000\\000\\002\\006

...

worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)\nCaused by: 

java.lang.IllegalStateException: Missing required properties: offsetDeduplication\n\tat org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read$Builder.build(AutoValue_KafkaIO_Read.java:627) org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$Builder.buildExternal(KafkaIO.java:2139)
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$Builder.buildExternal(KafkaIO.java:2113) org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder.getTransform(ExpansionService.java:335)
... 47 more"

Expected: a collection containing a string matching the pattern '.*KafkaIO-Read.*'
     but: was empty

@chamikaramj
Copy link
Contributor

Probably try changing the property definition to

public abstract @Nullable Boolean isOffsetDeduplication();

@chamikaramj
Copy link
Contributor

Seems like the builder is considering this to be a required parameter and failing since it's not provided through the test schema here:

@tomstepp
Copy link
Contributor Author

Probably try changing the property definition

Done, I think this resolves it.

Seems like the builder is considering this to be a required parameter and failing since it's not provided through the test schema

I think this PR had it added already, done.

Thanks for the feedback!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants