diff --git a/pkg/event_handler/consumer/consumer_test.go b/pkg/event_handler/consumer/consumer_test.go index aa39aa1..8b8a4f0 100644 --- a/pkg/event_handler/consumer/consumer_test.go +++ b/pkg/event_handler/consumer/consumer_test.go @@ -18,6 +18,7 @@ type Fixture struct { type MockConsumer struct { pollData kafka.Event + close func() } func (m *MockConsumer) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error) { @@ -36,6 +37,9 @@ func (m *MockConsumer) SubscribeTopics(topics []string, rebalanceCb kafka.Rebala } func (m *MockConsumer) Poll(timeoutMs int) (event kafka.Event) { + defer func() { + m.close() + }() return m.pollData } @@ -50,6 +54,7 @@ func (m *MockConsumer) Close() (err error) { func NewFixture(t *testing.T) *Fixture { mock := &MockConsumer{} consumer, err := NewConsumer(&config.KafkaConfiguration{}, mock) + mock.close = consumer.Close if err != nil { t.Fatalf("Error creating consumer: %v", err) }