Skip to content

Commit

Permalink
add multiple topic test
Browse files Browse the repository at this point in the history
  • Loading branch information
maha-hajja committed Apr 12, 2024
1 parent 6908c10 commit 5fff8b0
Showing 1 changed file with 41 additions and 5 deletions.
46 changes: 41 additions & 5 deletions source/franz_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ func TestFranzConsumer_Consume_FromBeginning(t *testing.T) {
is := is.New(t)
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true))
cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false))
cfg.ReadFromBeginning = true

records := test.GenerateFranzRecords(1, 6)
test.CreateTopics(t, cfg.Servers, cfg.Topic)
test.Produce(t, cfg.Servers, cfg.Topic[0], records[0:3])
test.Produce(t, cfg.Servers, cfg.Topic[1], records[3:])
test.Produce(t, cfg.Servers, cfg.Topic[0], records)

c, err := NewFranzConsumer(ctx, cfg)
is.NoErr(err)
Expand All @@ -57,7 +56,7 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) {
is := is.New(t)
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true))
cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false))
cfg.ReadFromBeginning = false

records := test.GenerateFranzRecords(1, 6)
Expand All @@ -78,7 +77,7 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) {
is.Equal(got, nil)

records = test.GenerateFranzRecords(7, 9)
test.Produce(t, cfg.Servers, cfg.Topic[1], records)
test.Produce(t, cfg.Servers, cfg.Topic[0], records)

for i := 0; i < len(records); i++ {
ctx, cancel := context.WithTimeout(ctx, time.Second)
Expand All @@ -88,3 +87,40 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) {
is.Equal(got.Key, records[i].Key)
}
}

func TestFranzConsumer_Consume_MultipleTopics(t *testing.T) {
t.Parallel()
is := is.New(t)
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true))
cfg.ReadFromBeginning = true

records := test.GenerateFranzRecords(1, 6)
test.CreateTopics(t, cfg.Servers, cfg.Topic)
test.Produce(t, cfg.Servers, cfg.Topic[0], records[0:3])
test.Produce(t, cfg.Servers, cfg.Topic[1], records[3:])

c, err := NewFranzConsumer(ctx, cfg)
is.NoErr(err)
defer func() {
err := c.Close(ctx)
is.NoErr(err)
}()

topic1 := 0
topic2 := 0
for i := 0; i < len(records); i++ {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
got, err := c.Consume(ctx)
is.NoErr(err)
if got.Topic == cfg.Topic[0] {
topic1++
} else if got.Topic == cfg.Topic[1] {
topic2++
}
}
is.Equal(topic1, 3)
is.Equal(topic2, 3)
}

0 comments on commit 5fff8b0

Please sign in to comment.