diff --git a/source/franz_integration_test.go b/source/franz_integration_test.go index 607f73c..588c850 100644 --- a/source/franz_integration_test.go +++ b/source/franz_integration_test.go @@ -28,13 +28,12 @@ func TestFranzConsumer_Consume_FromBeginning(t *testing.T) { is := is.New(t) ctx := context.Background() - cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true)) + cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false)) cfg.ReadFromBeginning = true records := test.GenerateFranzRecords(1, 6) test.CreateTopics(t, cfg.Servers, cfg.Topic) - test.Produce(t, cfg.Servers, cfg.Topic[0], records[0:3]) - test.Produce(t, cfg.Servers, cfg.Topic[1], records[3:]) + test.Produce(t, cfg.Servers, cfg.Topic[0], records) c, err := NewFranzConsumer(ctx, cfg) is.NoErr(err) @@ -57,7 +56,7 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) { is := is.New(t) ctx := context.Background() - cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true)) + cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false)) cfg.ReadFromBeginning = false records := test.GenerateFranzRecords(1, 6) @@ -78,7 +77,7 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) { is.Equal(got, nil) records = test.GenerateFranzRecords(7, 9) - test.Produce(t, cfg.Servers, cfg.Topic[1], records) + test.Produce(t, cfg.Servers, cfg.Topic[0], records) for i := 0; i < len(records); i++ { ctx, cancel := context.WithTimeout(ctx, time.Second) @@ -88,3 +87,40 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) { is.Equal(got.Key, records[i].Key) } } + +func TestFranzConsumer_Consume_MultipleTopics(t *testing.T) { + t.Parallel() + is := is.New(t) + ctx := context.Background() + + cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true)) + cfg.ReadFromBeginning = true + + records := test.GenerateFranzRecords(1, 6) + test.CreateTopics(t, cfg.Servers, cfg.Topic) + test.Produce(t, cfg.Servers, cfg.Topic[0], records[0:3]) + test.Produce(t, cfg.Servers, cfg.Topic[1], records[3:]) + + c, err := NewFranzConsumer(ctx, cfg) + is.NoErr(err) + defer func() { + err := c.Close(ctx) + is.NoErr(err) + }() + + topic1 := 0 + topic2 := 0 + for i := 0; i < len(records); i++ { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + got, err := c.Consume(ctx) + is.NoErr(err) + if got.Topic == cfg.Topic[0] { + topic1++ + } else if got.Topic == cfg.Topic[1] { + topic2++ + } + } + is.Equal(topic1, 3) + is.Equal(topic2, 3) +}