Skip to content

Commit

Permalink
[Schemas] Add name, version to metadata, remove ID
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso committed Jun 12, 2024
1 parent 31b9227 commit b6e7093
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 58 deletions.
40 changes: 32 additions & 8 deletions opencdc/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ const (
// MetadataCollection is a Record.Metadata key for the name of the collection
// where the record originated from and/or where it should be stored.
MetadataCollection = "opencdc.collection"
// MetadataSchemaID is a Record.Metadata key for the ID of the schema of
// MetadataSchemaName is a Record.Metadata key for the name of the schema of
// the record's .Payload.After field.
MetadataSchemaID = "opencdc.schema.id"
MetadataSchemaName = "opencdc.schema.name"
// MetadataSchemaVersion is a Record.Metadata key for the version of the schema of
// the record's .Payload.After field.
MetadataSchemaVersion = "opencdc.schema.version"
// MetadataSchemaType is a Record.Metadata key for the type of the schema of
// the record's .Payload.After field.
MetadataSchemaType = "opencdc.schema.type"
Expand Down Expand Up @@ -241,15 +244,36 @@ func (m Metadata) SetConduitDLQNackNodeID(id string) {
m[MetadataConduitDLQNackNodeID] = id
}

// GetSchemaID returns the value for key MetadataSchemaID.
// GetSchemaName returns the value for key MetadataSchemaName.
// If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (m Metadata) GetSchemaName() (string, error) {
return m.getValue(MetadataSchemaName)
}

// SetSchemaName sets the metadata value for key MetadataSchemaName.
func (m Metadata) SetSchemaName(name string) {
m[MetadataSchemaName] = name
}

// GetSchemaVersion returns the value for key MetadataSchemaVersion.
// If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (m Metadata) GetSchemaID() (string, error) {
return m.getValue(MetadataSchemaID)
func (m Metadata) GetSchemaVersion() (int, error) {
vs, err := m.getValue(MetadataSchemaVersion)
if err != nil {
return 0, err
}

v, err := strconv.Atoi(vs)
if err != nil {
return 0, fmt.Errorf("invalid version %q: %w", vs, err)
}

return v, nil
}

// SetSchemaID sets the metadata value for key MetadataSchemaID.
func (m Metadata) SetSchemaID(id string) {
m[MetadataSchemaID] = id
// SetSchemaVersion sets the metadata value for key MetadataSchemaVersion.
func (m Metadata) SetSchemaVersion(version int) {
m[MetadataSchemaVersion] = strconv.Itoa(version)
}

// GetSchemaType returns the value for key MetadataSchemaType.
Expand Down
3 changes: 2 additions & 1 deletion opencdc/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func TestMetadataConstants(t *testing.T) {
MetadataCreatedAt: opencdcv1.E_MetadataCreatedAt,
MetadataReadAt: opencdcv1.E_MetadataReadAt,
MetadataCollection: opencdcv1.E_MetadataCollection,
MetadataSchemaID: opencdcv1.E_MetadataSchemaId,
MetadataSchemaName: opencdcv1.E_MetadataSchemaName,
MetadataSchemaVersion: opencdcv1.E_MetadataSchemaVersion,
MetadataSchemaType: opencdcv1.E_MetadataSchemaType,

MetadataConduitSourcePluginName: metadatav1.E_MetadataConduitSourcePluginName,
Expand Down
112 changes: 67 additions & 45 deletions proto/opencdc/v1/opencdc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions proto/opencdc/v1/opencdc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ option (metadata_created_at) = "opencdc.createdAt";
option (metadata_read_at) = "opencdc.readAt";
option (metadata_version) = "opencdc.version";
option (metadata_collection) = "opencdc.collection";
option (metadata_schema_id) = "opencdc.schema.id";
option (metadata_schema_name) = "opencdc.schema.name";
option (metadata_schema_version) = "opencdc.schema.version";
option (metadata_schema_type) = "opencdc.schema.type";

// We are (ab)using custom file options to define constants.
Expand Down Expand Up @@ -41,12 +42,15 @@ extend google.protobuf.FileOptions {
// Metadata field "opencdc.collection" can contain the name of the collection
// where the record originated from and/or where it should be stored.
string metadata_collection = 10003;
// Metadata field "opencdc.schema.id" contains the ID of the schema of
// Metadata field "opencdc.schema.name" contains the name of the schema of
// the record's .Payload.After field.
string metadata_schema_id = 10004;
string metadata_schema_name = 10004;

Check failure on line 47 in proto/opencdc/v1/opencdc.proto

View workflow job for this annotation

GitHub Actions / validate

Extension "10004" on message "google.protobuf.FileOptions" changed name from "opencdc.v1.metadata_schema_id" to "opencdc.v1.metadata_schema_name".
// Metadata field "opencdc.schema.version" contains the version of the schema of
// the record's .Payload.After field.
string metadata_schema_version = 10005;

Check failure on line 50 in proto/opencdc/v1/opencdc.proto

View workflow job for this annotation

GitHub Actions / validate

Extension "10005" on message "google.protobuf.FileOptions" changed name from "opencdc.v1.metadata_schema_type" to "opencdc.v1.metadata_schema_version".
// Metadata field "opencdc.schema.type" contains the type of the schema of
// the record's .Payload.After field (currently, the only supported value is "avro")
string metadata_schema_type = 10005;
string metadata_schema_type = 10006;
}

// Operation defines what triggered the creation of a record.
Expand Down

0 comments on commit b6e7093

Please sign in to comment.