Skip to content

Commit

Permalink
[Schemas] Add metadata (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored May 17, 2024
1 parent 9ee66f9 commit 38b6036
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 23 deletions.
28 changes: 28 additions & 0 deletions opencdc/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ const (
// MetadataCollection is a Record.Metadata key for the name of the collection
// where the record originated from and/or where it should be stored.
MetadataCollection = "opencdc.collection"
// MetadataSchemaID is a Record.Metadata key for the ID of the schema of
// the record's .Payload.After field.
MetadataSchemaID = "opencdc.schema.id"
// MetadataSchemaType is a Record.Metadata key for the type of the schema of
// the record's .Payload.After field.
MetadataSchemaType = "opencdc.schema.type"

// MetadataConduitSourcePluginName is a Record.Metadata key for the name of
// the source plugin that created this record.
Expand Down Expand Up @@ -235,6 +241,28 @@ func (m Metadata) SetConduitDLQNackNodeID(id string) {
m[MetadataConduitDLQNackNodeID] = id
}

// GetSchemaID returns the value for key MetadataSchemaID.
// If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (m Metadata) GetSchemaID() (string, error) {
return m.getValue(MetadataSchemaID)
}

// SetSchemaID sets the metadata value for key MetadataSchemaID.
func (m Metadata) SetSchemaID(id string) {
m[MetadataSchemaID] = id
}

// GetSchemaType returns the value for key MetadataSchemaType.
// If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (m Metadata) GetSchemaType() (string, error) {
return m.getValue(MetadataSchemaType)
}

// SetSchemaType sets the metadata value for key MetadataSchemaType.
func (m Metadata) SetSchemaType(typeStr string) {
m[MetadataSchemaType] = typeStr
}

// getValue returns the value for a specific key. If the value does not exist or
// is empty the function returns ErrMetadataFieldNotFound.
func (m Metadata) getValue(key string) (string, error) {
Expand Down
2 changes: 2 additions & 0 deletions opencdc/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func TestMetadataConstants(t *testing.T) {
MetadataCreatedAt: opencdcv1.E_MetadataCreatedAt,
MetadataReadAt: opencdcv1.E_MetadataReadAt,
MetadataCollection: opencdcv1.E_MetadataCollection,
MetadataSchemaID: opencdcv1.E_MetadataSchemaId,
MetadataSchemaType: opencdcv1.E_MetadataSchemaType,

MetadataConduitSourcePluginName: metadatav1.E_MetadataConduitSourcePluginName,
MetadataConduitSourcePluginVersion: metadatav1.E_MetadataConduitSourcePluginVersion,
Expand Down
86 changes: 63 additions & 23 deletions proto/opencdc/v1/opencdc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions proto/opencdc/v1/opencdc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ option (metadata_created_at) = "opencdc.createdAt";
option (metadata_read_at) = "opencdc.readAt";
option (metadata_version) = "opencdc.version";
option (metadata_collection) = "opencdc.collection";
option (metadata_schema_id) = "opencdc.schema.id";
option (metadata_schema_type) = "opencdc.schema.type";

// We are (ab)using custom file options to define constants.
// See https://github.com/protocolbuffers/protobuf/issues/3520#issuecomment-323613839
Expand All @@ -39,6 +41,12 @@ extend google.protobuf.FileOptions {
// Metadata field "opencdc.collection" can contain the name of the collection
// where the record originated from and/or where it should be stored.
string metadata_collection = 10003;
// Metadata field "opencdc.schema.id" contains the ID of the schema of
// the record's .Payload.After field.
string metadata_schema_id = 10004;
// Metadata field "opencdc.schema.type" contains the type of the schema of
// the record's .Payload.After field (currently, the only supported value is "avro")
string metadata_schema_type = 10005;
}

// Operation defines what triggered the creation of a record.
Expand Down

0 comments on commit 38b6036

Please sign in to comment.