Skip to content

Commit

Permalink
attach schema to record
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso committed Jun 13, 2024
1 parent b6e7093 commit 061edf8
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 6 deletions.
4 changes: 3 additions & 1 deletion opencdc/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ var (
// ErrUnknownOperation is returned when trying to parse an Operation string
// and encountering an unknown operation.
ErrUnknownOperation = errors.New("unknown operation")

// ErrUnknownSchemaType is returned when trying to parse an Operation string
// and encountering an unknown operation.
ErrUnknownSchemaType = errors.New("unknown schema type")
// ErrInvalidProtoDataType is returned when trying to convert a proto data
// type to raw or structured data.
ErrInvalidProtoDataType = errors.New("invalid proto data type")
Expand Down
19 changes: 15 additions & 4 deletions opencdc/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package opencdc

import (
"fmt"
"github.com/conduitio/conduit-commons/schema"
"strconv"
"time"
)
Expand Down Expand Up @@ -278,13 +279,23 @@ func (m Metadata) SetSchemaVersion(version int) {

// GetSchemaType returns the value for key MetadataSchemaType.
// If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (m Metadata) GetSchemaType() (string, error) {
return m.getValue(MetadataSchemaType)
func (m Metadata) GetSchemaType() (schema.Type, error) {
typeString, err := m.getValue(MetadataSchemaType)
if err != nil {
return 0, err
}

switch typeString {
case schema.TypeAvro.String():
return schema.TypeAvro, nil
default:
return 0, fmt.Errorf("%q: %w", typeString, ErrUnknownSchemaType)
}
}

// SetSchemaType sets the metadata value for key MetadataSchemaType.
func (m Metadata) SetSchemaType(typeStr string) {
m[MetadataSchemaType] = typeStr
func (m Metadata) SetSchemaType(t schema.Type) {
m[MetadataSchemaType] = t.String()
}

// getValue returns the value for a specific key. If the value does not exist or
Expand Down
7 changes: 7 additions & 0 deletions opencdc/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package opencdc
import (
"bytes"
"fmt"
"github.com/conduitio/conduit-commons/schema"

"github.com/goccy/go-json"
)
Expand Down Expand Up @@ -147,3 +148,9 @@ func (r Record) Clone() Record {
}
return clone
}

func (r Record) AttachSchema(s schema.Instance) {
r.Metadata.SetSchemaType(schema.TypeAvro)
r.Metadata.SetSchemaName(s.Name)
r.Metadata.SetSchemaVersion(s.Version)
}
4 changes: 3 additions & 1 deletion schema/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate stringer -type=Type -linecomment

package schema

type Type int32
Expand All @@ -23,7 +25,7 @@ const (
type Instance struct {
ID string
Name string
Version int32
Version int
Type Type
Bytes []byte
}
24 changes: 24 additions & 0 deletions schema/type_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 061edf8

Please sign in to comment.