Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FetchMessage Why submit offset automatically #1354

Open
yangtongbing opened this issue Dec 5, 2024 · 6 comments
Open

FetchMessage Why submit offset automatically #1354

yangtongbing opened this issue Dec 5, 2024 · 6 comments
Labels

Comments

@yangtongbing
Copy link

I tried several ways, including setting CommitInterval, to commit automatically, and didn't commit offset through CommitMessages as expected. Here is my sample code.

package main

import (
	"context"
	"fmt"
	kafka "github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"127.0.0.1:9092"},
		Topic:    "test",
		GroupID:  "9",
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	defer r.Close()
	ctx := context.Background()
	for {
		m, err := r.FetchMessage(ctx)
		if err != nil {
			break
		}
		fmt.Printf("message at offset %d:%s\n", m.Offset, string(m.Value))
		if shouldCommit(m) {
			err = r.CommitMessages(ctx, m)
			if err != nil {
				fmt.Println("commit failed", err)
			}
		}
	}
}

func shouldCommit(message kafka.Message) bool {
	return false
}

@yangtongbing
Copy link
Author

go version go1.22.4 darwin/arm64
kafka-go version is v0.4.47

@luffyao
Copy link
Contributor

luffyao commented Jan 2, 2025

why did you think the FetchMessage function will submit offset automatically? according to code, it will not submit offset , and also it doesn't appear this issue in our env.

@yangtongbing
Copy link
Author

In my own tests, instead of committing, shouldCommit returning false will still automatically consume the next offset

@oggyunao
Copy link

oggyunao commented Jan 4, 2025

plz use QueueCapacity:1,and try again..

r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"127.0.0.1:9092"},
Topic: "test",
GroupID: "9",
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
QueueCapacity:1,
})

@yangtongbing
Copy link
Author

Ok, I'll try

@yangtongbing
Copy link
Author

I have tested it and configured QueueCapacity equal to 1 to automatically commit offset without CommitMessages being explicitly called

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants