Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read the offset in the correct way #118

Merged
merged 10 commits into from
Sep 18, 2023
Merged

Read the offset in the correct way #118

merged 10 commits into from
Sep 18, 2023

Conversation

Gsantomaggio
Copy link
Member

Fixes #117

Fixes #117

Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio
Copy link
Member Author

@kyrylokovalenko you are right it was a bug.

Do you have a chance to test this PR? It solves the problem

@kyrylokovalenko
Copy link

Hi @Gsantomaggio, now code from #117 works fine, but if I set offset_specification in subscribe call like this:

    await consumer.subscribe(
        stream=STREAM,
        callback=on_message,
        decoder=amqp_decoder,
        offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, 50)
    )

I receive following output:

Got message: hello: 50 from stream my-test-stream, offset 32
Got message: hello: 51 from stream my-test-stream, offset 33
Got message: hello: 52 from stream my-test-stream, offset 34
Got message: hello: 53 from stream my-test-stream, offset 35
Got message: hello: 54 from stream my-test-stream, offset 36
...
...
Got message: hello: 93 from stream my-test-stream, offset 75
Got message: hello: 94 from stream my-test-stream, offset 76
Got message: hello: 95 from stream my-test-stream, offset 77
Got message: hello: 96 from stream my-test-stream, offset 96
Got message: hello: 97 from stream my-test-stream, offset 97
Got message: hello: 98 from stream my-test-stream, offset 98
Got message: hello: 99 from stream my-test-stream, offset 99

@kyrylokovalenko
Copy link

Maybe line 275 of consumer.py

offset = frame.chunk_first_offset

should be

offset = subscriber.offset

?

It seems to solve this issue.

the offset type is OffsetType.OFFSET. The client receives the
chunk of messages that contains the offset specified in the subscribe request.
so here we skip the previous messages until we reach the offset specified.

Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio
Copy link
Member Author

@kyrylokovalenko ok it should be okay now! I'd need to add some test

@kyrylokovalenko
Copy link

@Gsantomaggio yeah, works as intended, many thanks!

Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio merged commit 642ce08 into master Sep 18, 2023
@Gsantomaggio Gsantomaggio deleted the fix_offset_position branch September 18, 2023 09:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Is message_context.offset wrong when messages are created using rstream Producer?
3 participants