Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35299] Respect initial position for new streams #140

Open
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

antssilva96
Copy link

Purpose of the change

According to the javadoc, the STREAM_INITIAL_POSITION property defines where to start reading Kinesis streams from. However, in the current implementation this is only true if there isn't any restore state at all for any streams for that KinesisConsumer, otherwise the new stream is handled the same way a new shard for and existing stream is: start consuming from EARLIEST (same as TRIM_HORIZON initial position).

This MR changes that by making FlinkKinesisConsumer to use STREAM_INITIAL_POSITION config for new streams, which aligns with that is documented.

This behavior is disabled by default to not introduce a breaking change, but can be enabled by setting flink.stream.initpos-for-new-streams to true.

Additionally, a second config was created - flink.stream.initpos-streams - to allow specific streams to be "reset" to whatever the STREAM_INITIAL_POSITION is defined. This is an important addition in this MR because users who notice this bug and want to enable the correct behaviour will want to reset the now recorded offset for the new stream.

Verifying this change

TODO

Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment
  • Added unit tests
  • Manually verified by running the Kinesis connector on a local Flink cluster.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented) docs and JavaDocs where appropiate

Antonio Silva added 3 commits May 6, 2024 18:45
…reams - disabled by default so that this can go in a minor release. Default can be adjusted for a major.
Copy link

boring-cyborg bot commented May 7, 2024

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@antssilva96 antssilva96 changed the title FLINK-35299: Add logic to respect initial position for new streams [FLINK-35299] Add logic to respect initial position for new streams May 7, 2024
@antssilva96 antssilva96 changed the title [FLINK-35299] Add logic to respect initial position for new streams [FLINK-35299] Respect initial position for new streams May 7, 2024
Comment on lines 327 to 340
public static final String APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS =
"flink.stream.initpos-for-new-streams";

/**
* Property that can be used to ignore the restore state for a particular stream and instead use
* the initial position. This is useful to reset a specific stream to consume from TRIM_HORIZON
* or LATEST if needed. Values must be passed in a comma separated list.
*
* <p>If a stream is in this list, it will use initial position regardless of the value of the
* {@link #APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} property.
*/
public static final String STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO =
"flink.stream.initpos-streams";

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really sure about these property names... any suggestions ?

@antssilva96 antssilva96 marked this pull request as ready for review July 8, 2024 10:46

For example, if you configure your application with
```
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if we are able to set different streams with different INITIAL POSITION. Let's say we would add streamA, streamB and streamC as new streams, I want to have streamA and streamB to consume from LATEST and streamC from AT_TIMESTAMP. Is this possible?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not possible before and is still not possible now 😅

Even though it would be possible to do that with minimal changes, I feel like that is another feature request on its own and probably doesn't belong in this one.
The main goal of this change is: allow users to say when INITIAL position should be used regardless of the stored state.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants