From 38b603646cb9a18b0d7f8ed51fedbd4db6056da8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Fri, 17 May 2024 11:18:00 +0200 Subject: [PATCH] [Schemas] Add metadata (#54) --- opencdc/metadata.go | 28 +++++++++++ opencdc/metadata_test.go | 2 + proto/opencdc/v1/opencdc.pb.go | 86 +++++++++++++++++++++++++--------- proto/opencdc/v1/opencdc.proto | 8 ++++ 4 files changed, 101 insertions(+), 23 deletions(-) diff --git a/opencdc/metadata.go b/opencdc/metadata.go index 21b4717..0086bbc 100644 --- a/opencdc/metadata.go +++ b/opencdc/metadata.go @@ -43,6 +43,12 @@ const ( // MetadataCollection is a Record.Metadata key for the name of the collection // where the record originated from and/or where it should be stored. MetadataCollection = "opencdc.collection" + // MetadataSchemaID is a Record.Metadata key for the ID of the schema of + // the record's .Payload.After field. + MetadataSchemaID = "opencdc.schema.id" + // MetadataSchemaType is a Record.Metadata key for the type of the schema of + // the record's .Payload.After field. + MetadataSchemaType = "opencdc.schema.type" // MetadataConduitSourcePluginName is a Record.Metadata key for the name of // the source plugin that created this record. @@ -235,6 +241,28 @@ func (m Metadata) SetConduitDLQNackNodeID(id string) { m[MetadataConduitDLQNackNodeID] = id } +// GetSchemaID returns the value for key MetadataSchemaID. +// If the value does not exist or is empty the function returns ErrMetadataFieldNotFound. +func (m Metadata) GetSchemaID() (string, error) { + return m.getValue(MetadataSchemaID) +} + +// SetSchemaID sets the metadata value for key MetadataSchemaID. +func (m Metadata) SetSchemaID(id string) { + m[MetadataSchemaID] = id +} + +// GetSchemaType returns the value for key MetadataSchemaType. +// If the value does not exist or is empty the function returns ErrMetadataFieldNotFound. +func (m Metadata) GetSchemaType() (string, error) { + return m.getValue(MetadataSchemaType) +} + +// SetSchemaType sets the metadata value for key MetadataSchemaType. +func (m Metadata) SetSchemaType(typeStr string) { + m[MetadataSchemaType] = typeStr +} + // getValue returns the value for a specific key. If the value does not exist or // is empty the function returns ErrMetadataFieldNotFound. func (m Metadata) getValue(key string) (string, error) { diff --git a/opencdc/metadata_test.go b/opencdc/metadata_test.go index 869c4d0..ababe79 100644 --- a/opencdc/metadata_test.go +++ b/opencdc/metadata_test.go @@ -30,6 +30,8 @@ func TestMetadataConstants(t *testing.T) { MetadataCreatedAt: opencdcv1.E_MetadataCreatedAt, MetadataReadAt: opencdcv1.E_MetadataReadAt, MetadataCollection: opencdcv1.E_MetadataCollection, + MetadataSchemaID: opencdcv1.E_MetadataSchemaId, + MetadataSchemaType: opencdcv1.E_MetadataSchemaType, MetadataConduitSourcePluginName: metadatav1.E_MetadataConduitSourcePluginName, MetadataConduitSourcePluginVersion: metadatav1.E_MetadataConduitSourcePluginVersion, diff --git a/proto/opencdc/v1/opencdc.pb.go b/proto/opencdc/v1/opencdc.pb.go index bfd18f0..601126b 100644 --- a/proto/opencdc/v1/opencdc.pb.go +++ b/proto/opencdc/v1/opencdc.pb.go @@ -363,6 +363,22 @@ var file_opencdc_v1_opencdc_proto_extTypes = []protoimpl.ExtensionInfo{ Tag: "bytes,10003,opt,name=metadata_collection", Filename: "opencdc/v1/opencdc.proto", }, + { + ExtendedType: (*descriptorpb.FileOptions)(nil), + ExtensionType: (*string)(nil), + Field: 10004, + Name: "opencdc.v1.metadata_schema_id", + Tag: "bytes,10004,opt,name=metadata_schema_id", + Filename: "opencdc/v1/opencdc.proto", + }, + { + ExtendedType: (*descriptorpb.FileOptions)(nil), + ExtensionType: (*string)(nil), + Field: 10005, + Name: "opencdc.v1.metadata_schema_type", + Tag: "bytes,10005,opt,name=metadata_schema_type", + Filename: "opencdc/v1/opencdc.proto", + }, } // Extension fields to descriptorpb.FileOptions. @@ -398,6 +414,16 @@ var ( // // optional string metadata_collection = 10003; E_MetadataCollection = &file_opencdc_v1_opencdc_proto_extTypes[4] + // Metadata field "opencdc.schema.id" contains the ID of the schema of + // the record's .Payload.After field. + // + // optional string metadata_schema_id = 10004; + E_MetadataSchemaId = &file_opencdc_v1_opencdc_proto_extTypes[5] + // Metadata field "opencdc.schema.type" contains the type of the schema of + // the record's .Payload.After field (currently, the only supported value is "avro") + // + // optional string metadata_schema_type = 10005; + E_MetadataSchemaType = &file_opencdc_v1_opencdc_proto_extTypes[6] ) var File_opencdc_v1_opencdc_proto protoreflect.FileDescriptor @@ -472,24 +498,36 @@ var file_opencdc_v1_opencdc_proto_rawDesc = []byte{ 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x93, 0x4e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x12, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x43, - 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0xfe, 0x01, 0xfa, 0xf0, 0x04, 0x02, - 0x76, 0x31, 0x82, 0xf1, 0x04, 0x0f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x76, 0x65, - 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x8a, 0xf1, 0x04, 0x11, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, - 0x2e, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x92, 0xf1, 0x04, 0x0e, 0x6f, 0x70, - 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x72, 0x65, 0x61, 0x64, 0x41, 0x74, 0x9a, 0xf1, 0x04, 0x12, - 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, - 0x76, 0x31, 0x42, 0x0c, 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x50, 0x72, 0x6f, 0x74, 0x6f, - 0x50, 0x01, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, - 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x69, 0x6f, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, - 0x2d, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6f, - 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2f, 0x76, 0x31, 0x3b, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, - 0x63, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x4f, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x4f, 0x70, 0x65, 0x6e, - 0x63, 0x64, 0x63, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0a, 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, - 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x16, 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x5c, 0x56, 0x31, - 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x4f, - 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x3a, 0x4b, 0x0a, 0x12, 0x6d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64, 0x12, + 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x94, 0x4e, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x63, + 0x68, 0x65, 0x6d, 0x61, 0x49, 0x64, 0x3a, 0x4f, 0x0a, 0x14, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x12, 0x1c, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x95, 0x4e, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x12, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x53, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x54, 0x79, 0x70, 0x65, 0x42, 0xaa, 0x02, 0xfa, 0xf0, 0x04, 0x02, 0x76, 0x31, + 0x82, 0xf1, 0x04, 0x0f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x76, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x8a, 0xf1, 0x04, 0x11, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x63, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x92, 0xf1, 0x04, 0x0e, 0x6f, 0x70, 0x65, 0x6e, + 0x63, 0x64, 0x63, 0x2e, 0x72, 0x65, 0x61, 0x64, 0x41, 0x74, 0x9a, 0xf1, 0x04, 0x12, 0x6f, 0x70, + 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0xa2, 0xf1, 0x04, 0x11, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x73, 0x63, 0x68, 0x65, + 0x6d, 0x61, 0x2e, 0x69, 0x64, 0xaa, 0xf1, 0x04, 0x13, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, + 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x0a, 0x0e, 0x63, 0x6f, + 0x6d, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x76, 0x31, 0x42, 0x0c, 0x4f, 0x70, + 0x65, 0x6e, 0x63, 0x64, 0x63, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x3f, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, + 0x69, 0x6f, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2d, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, + 0x6e, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, + 0x2f, 0x76, 0x31, 0x3b, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x76, 0x31, 0xa2, 0x02, 0x03, + 0x4f, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x56, 0x31, + 0xca, 0x02, 0x0a, 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x16, + 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, + 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -528,10 +566,12 @@ var file_opencdc_v1_opencdc_proto_depIdxs = []int32{ 6, // 9: opencdc.v1.metadata_created_at:extendee -> google.protobuf.FileOptions 6, // 10: opencdc.v1.metadata_read_at:extendee -> google.protobuf.FileOptions 6, // 11: opencdc.v1.metadata_collection:extendee -> google.protobuf.FileOptions - 12, // [12:12] is the sub-list for method output_type - 12, // [12:12] is the sub-list for method input_type - 12, // [12:12] is the sub-list for extension type_name - 7, // [7:12] is the sub-list for extension extendee + 6, // 12: opencdc.v1.metadata_schema_id:extendee -> google.protobuf.FileOptions + 6, // 13: opencdc.v1.metadata_schema_type:extendee -> google.protobuf.FileOptions + 14, // [14:14] is the sub-list for method output_type + 14, // [14:14] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 7, // [7:14] is the sub-list for extension extendee 0, // [0:7] is the sub-list for field type_name } @@ -589,7 +629,7 @@ func file_opencdc_v1_opencdc_proto_init() { RawDescriptor: file_opencdc_v1_opencdc_proto_rawDesc, NumEnums: 1, NumMessages: 4, - NumExtensions: 5, + NumExtensions: 7, NumServices: 0, }, GoTypes: file_opencdc_v1_opencdc_proto_goTypes, diff --git a/proto/opencdc/v1/opencdc.proto b/proto/opencdc/v1/opencdc.proto index 14292f1..a5e234d 100644 --- a/proto/opencdc/v1/opencdc.proto +++ b/proto/opencdc/v1/opencdc.proto @@ -13,6 +13,8 @@ option (metadata_created_at) = "opencdc.createdAt"; option (metadata_read_at) = "opencdc.readAt"; option (metadata_version) = "opencdc.version"; option (metadata_collection) = "opencdc.collection"; +option (metadata_schema_id) = "opencdc.schema.id"; +option (metadata_schema_type) = "opencdc.schema.type"; // We are (ab)using custom file options to define constants. // See https://github.com/protocolbuffers/protobuf/issues/3520#issuecomment-323613839 @@ -39,6 +41,12 @@ extend google.protobuf.FileOptions { // Metadata field "opencdc.collection" can contain the name of the collection // where the record originated from and/or where it should be stored. string metadata_collection = 10003; + // Metadata field "opencdc.schema.id" contains the ID of the schema of + // the record's .Payload.After field. + string metadata_schema_id = 10004; + // Metadata field "opencdc.schema.type" contains the type of the schema of + // the record's .Payload.After field (currently, the only supported value is "avro") + string metadata_schema_type = 10005; } // Operation defines what triggered the creation of a record.