diff --git a/destination/config.go b/destination/config.go index 67622c2..9107fc5 100644 --- a/destination/config.go +++ b/destination/config.go @@ -149,6 +149,14 @@ func (c Config) ParseTopic() (topic string, f TopicFn, err error) { if err := t.Execute(&buf, r); err != nil { return "", fmt.Errorf("failed to execute topic template: %w", err) } - return buf.String(), nil + topic := buf.String() + if topic == "" { + return "", fmt.Errorf( + "topic not found on record %s using template %s", + string(r.Key.Bytes()), c.Topic, + ) + } + + return topic, nil }, nil } diff --git a/destination/config_test.go b/destination/config_test.go index d55a0b0..fde7e49 100644 --- a/destination/config_test.go +++ b/destination/config_test.go @@ -18,6 +18,7 @@ import ( "strings" "testing" + sdk "github.com/conduitio/conduit-connector-sdk" "github.com/matryer/is" ) @@ -85,3 +86,26 @@ func TestConfig_Validate(t *testing.T) { }) } } + +func TestConfig_ParseTopic_DoesErrorOnTopicNotFound(t *testing.T) { + is := is.New(t) + template := `{{ index .Metadata "topiccc" }}` + + cfg := Config{Topic: template} + topic, getTopic, err := cfg.ParseTopic() + is.NoErr(err) + + is.Equal(topic, "") + + rec := sdk.Record{ + Key: sdk.RawData("testkey"), + Metadata: map[string]string{ + "topic": "testtopic", + }, + } + topic, err = getTopic(rec) + is.True(err != nil) // expected error on topic not found + is.True(strings.Contains(err.Error(), "topic not found on record")) // expected topic not found error + + is.Equal(topic, "") +}