Skip to content

Commit

Permalink
handle empty parsed topic (#150)
Browse files Browse the repository at this point in the history
* handle empty parsed topic

* add more context to error

* apply pr suggestions

* rename test

* fix record key bytes to string in error

* fix failing test

---------

Co-authored-by: Guillem <[email protected]>
  • Loading branch information
alarbada and Guillem authored Jun 12, 2024
1 parent 9bfe3e1 commit 9f09470
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
10 changes: 9 additions & 1 deletion destination/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ func (c Config) ParseTopic() (topic string, f TopicFn, err error) {
if err := t.Execute(&buf, r); err != nil {
return "", fmt.Errorf("failed to execute topic template: %w", err)
}
return buf.String(), nil
topic := buf.String()
if topic == "" {
return "", fmt.Errorf(
"topic not found on record %s using template %s",
string(r.Key.Bytes()), c.Topic,
)
}

return topic, nil
}, nil
}
24 changes: 24 additions & 0 deletions destination/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strings"
"testing"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/matryer/is"
)

Expand Down Expand Up @@ -85,3 +86,26 @@ func TestConfig_Validate(t *testing.T) {
})
}
}

func TestConfig_ParseTopic_DoesErrorOnTopicNotFound(t *testing.T) {
is := is.New(t)
template := `{{ index .Metadata "topiccc" }}`

cfg := Config{Topic: template}
topic, getTopic, err := cfg.ParseTopic()
is.NoErr(err)

is.Equal(topic, "")

rec := sdk.Record{
Key: sdk.RawData("testkey"),
Metadata: map[string]string{
"topic": "testtopic",
},
}
topic, err = getTopic(rec)
is.True(err != nil) // expected error on topic not found
is.True(strings.Contains(err.Error(), "topic not found on record")) // expected topic not found error

is.Equal(topic, "")
}

0 comments on commit 9f09470

Please sign in to comment.