diff --git a/opencdc/metadata.go b/opencdc/metadata.go index 0d83772..21b4717 100644 --- a/opencdc/metadata.go +++ b/opencdc/metadata.go @@ -40,6 +40,9 @@ const ( // read from the 3rd party system. The expected format is a unix timestamp // in nanoseconds. MetadataReadAt = "opencdc.readAt" + // MetadataCollection is a Record.Metadata key for the name of the collection + // where the record originated from and/or where it should be stored. + MetadataCollection = "opencdc.collection" // MetadataConduitSourcePluginName is a Record.Metadata key for the name of // the source plugin that created this record. @@ -57,7 +60,7 @@ const ( MetadataConduitDestinationPluginVersion = "conduit.destination.plugin.version" // MetadataConduitSourceConnectorID is a Record.Metadata key for the ID of - // the source connector that received this record. + // the source connector that produced this record. MetadataConduitSourceConnectorID = "conduit.source.connector.id" // MetadataConduitDLQNackError is a Record.Metadata key for the error that // caused a record to be nacked and pushed to the dead-letter queue. @@ -130,6 +133,17 @@ func (m Metadata) SetReadAt(createdAt time.Time) { m[MetadataReadAt] = strconv.FormatInt(createdAt.UnixNano(), 10) } +// GetCollection returns the value for key MetadataCollection. If the value does +// not exist or is empty the function returns ErrMetadataFieldNotFound. +func (m Metadata) GetCollection() (string, error) { + return m.getValue(MetadataCollection) +} + +// SetCollection sets the metadata value for key MetadataCollection. +func (m Metadata) SetCollection(collection string) { + m[MetadataCollection] = collection +} + // GetConduitSourcePluginName returns the value for key // MetadataConduitSourcePluginName. If the value does not exist or is empty the // function returns ErrMetadataFieldNotFound. diff --git a/opencdc/metadata_test.go b/opencdc/metadata_test.go index 4ba4893..869c4d0 100644 --- a/opencdc/metadata_test.go +++ b/opencdc/metadata_test.go @@ -17,6 +17,7 @@ package opencdc import ( "testing" + metadatav1 "github.com/conduitio/conduit-commons/proto/metadata/v1" opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/runtime/protoimpl" @@ -28,6 +29,15 @@ func TestMetadataConstants(t *testing.T) { MetadataOpenCDCVersion: opencdcv1.E_MetadataVersion, MetadataCreatedAt: opencdcv1.E_MetadataCreatedAt, MetadataReadAt: opencdcv1.E_MetadataReadAt, + MetadataCollection: opencdcv1.E_MetadataCollection, + + MetadataConduitSourcePluginName: metadatav1.E_MetadataConduitSourcePluginName, + MetadataConduitSourcePluginVersion: metadatav1.E_MetadataConduitSourcePluginVersion, + MetadataConduitDestinationPluginName: metadatav1.E_MetadataConduitDestinationPluginName, + MetadataConduitDestinationPluginVersion: metadatav1.E_MetadataConduitDestinationPluginVersion, + MetadataConduitSourceConnectorID: metadatav1.E_MetadataConduitSourceConnectorId, + MetadataConduitDLQNackError: metadatav1.E_MetadataConduitDlqNackError, + MetadataConduitDLQNackNodeID: metadatav1.E_MetadataConduitDlqNackNodeId, } for goConstant, extensionInfo := range wantMapping { protoConstant := proto.GetExtension(extensionInfo.TypeDescriptor().ParentFile().Options(), extensionInfo) diff --git a/proto/metadata/v1/constants.pb.go b/proto/metadata/v1/constants.pb.go new file mode 100644 index 0000000..d83b2a2 --- /dev/null +++ b/proto/metadata/v1/constants.pb.go @@ -0,0 +1,249 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc (unknown) +// source: metadata/v1/constants.proto + +package metadatav1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + descriptorpb "google.golang.org/protobuf/types/descriptorpb" + reflect "reflect" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +var file_metadata_v1_constants_proto_extTypes = []protoimpl.ExtensionInfo{ + { + ExtendedType: (*descriptorpb.FileOptions)(nil), + ExtensionType: (*string)(nil), + Field: 20000, + Name: "metadata.v1.metadata_conduit_source_plugin_name", + Tag: "bytes,20000,opt,name=metadata_conduit_source_plugin_name", + Filename: "metadata/v1/constants.proto", + }, + { + ExtendedType: (*descriptorpb.FileOptions)(nil), + ExtensionType: (*string)(nil), + Field: 20001, + Name: "metadata.v1.metadata_conduit_source_plugin_version", + Tag: "bytes,20001,opt,name=metadata_conduit_source_plugin_version", + Filename: "metadata/v1/constants.proto", + }, + { + ExtendedType: (*descriptorpb.FileOptions)(nil), + ExtensionType: (*string)(nil), + Field: 20002, + Name: "metadata.v1.metadata_conduit_destination_plugin_name", + Tag: "bytes,20002,opt,name=metadata_conduit_destination_plugin_name", + Filename: "metadata/v1/constants.proto", + }, + { + ExtendedType: (*descriptorpb.FileOptions)(nil), + ExtensionType: (*string)(nil), + Field: 20003, + Name: "metadata.v1.metadata_conduit_destination_plugin_version", + Tag: "bytes,20003,opt,name=metadata_conduit_destination_plugin_version", + Filename: "metadata/v1/constants.proto", + }, + { + ExtendedType: (*descriptorpb.FileOptions)(nil), + ExtensionType: (*string)(nil), + Field: 20004, + Name: "metadata.v1.metadata_conduit_source_connector_id", + Tag: "bytes,20004,opt,name=metadata_conduit_source_connector_id", + Filename: "metadata/v1/constants.proto", + }, + { + ExtendedType: (*descriptorpb.FileOptions)(nil), + ExtensionType: (*string)(nil), + Field: 20005, + Name: "metadata.v1.metadata_conduit_dlq_nack_error", + Tag: "bytes,20005,opt,name=metadata_conduit_dlq_nack_error", + Filename: "metadata/v1/constants.proto", + }, + { + ExtendedType: (*descriptorpb.FileOptions)(nil), + ExtensionType: (*string)(nil), + Field: 20006, + Name: "metadata.v1.metadata_conduit_dlq_nack_node_id", + Tag: "bytes,20006,opt,name=metadata_conduit_dlq_nack_node_id", + Filename: "metadata/v1/constants.proto", + }, +} + +// Extension fields to descriptorpb.FileOptions. +var ( + // Metadata field "conduit.source.plugin.name" contains the name of the source + // plugin that created this record. + // + // optional string metadata_conduit_source_plugin_name = 20000; + E_MetadataConduitSourcePluginName = &file_metadata_v1_constants_proto_extTypes[0] + // Metadata field "conduit.source.plugin.version" contains the version of the + // source plugin that created this record. + // + // optional string metadata_conduit_source_plugin_version = 20001; + E_MetadataConduitSourcePluginVersion = &file_metadata_v1_constants_proto_extTypes[1] + // Metadata field "conduit.destination.plugin.name" contains the name of the + // destination plugin that has written this record (only available in records + // once they are written by a destination). + // + // optional string metadata_conduit_destination_plugin_name = 20002; + E_MetadataConduitDestinationPluginName = &file_metadata_v1_constants_proto_extTypes[2] + // Metadata field "conduit.destination.plugin.version" contains the version of + // the destination plugin that has written this record (only available in + // records once they are written by a destination). + // + // optional string metadata_conduit_destination_plugin_version = 20003; + E_MetadataConduitDestinationPluginVersion = &file_metadata_v1_constants_proto_extTypes[3] + // Metadata field "conduit.source.connector.id" contains the ID of the source + // connector that produced this record. + // + // optional string metadata_conduit_source_connector_id = 20004; + E_MetadataConduitSourceConnectorId = &file_metadata_v1_constants_proto_extTypes[4] + // Metadata field "conduit.dlq.nack.error" contains the error that caused a + // record to be nacked and pushed to the dead-letter queue. + // + // optional string metadata_conduit_dlq_nack_error = 20005; + E_MetadataConduitDlqNackError = &file_metadata_v1_constants_proto_extTypes[5] + // Metadata field "conduit.dlq.nack.node.id" contains the ID of the internal + // node that nacked the record. + // + // optional string metadata_conduit_dlq_nack_node_id = 20006; + E_MetadataConduitDlqNackNodeId = &file_metadata_v1_constants_proto_extTypes[6] +) + +var File_metadata_v1_constants_proto protoreflect.FileDescriptor + +var file_metadata_v1_constants_proto_rawDesc = []byte{ + 0x0a, 0x1b, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x2f, 0x76, 0x31, 0x2f, 0x63, 0x6f, + 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x74, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x6d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x76, 0x31, 0x1a, 0x20, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x65, 0x73, 0x63, + 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3a, 0x6c, 0x0a, 0x23, + 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, + 0x5f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x6e, + 0x61, 0x6d, 0x65, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x18, 0xa0, 0x9c, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x1f, 0x6d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x43, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x3a, 0x72, 0x0a, 0x26, 0x6d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x5f, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x18, 0xa1, 0x9c, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x22, 0x6d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x43, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x53, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x3a, 0x76, + 0x0a, 0x28, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x63, 0x6f, 0x6e, 0x64, 0x75, + 0x69, 0x74, 0x5f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x70, + 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x46, 0x69, 0x6c, + 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xa2, 0x9c, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x24, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x43, 0x6f, 0x6e, 0x64, 0x75, 0x69, + 0x74, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6c, 0x75, 0x67, + 0x69, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x3a, 0x7c, 0x0a, 0x2b, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0x5f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x5f, 0x64, 0x65, 0x73, 0x74, 0x69, + 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x18, 0xa3, 0x9c, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x27, 0x6d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x43, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x44, 0x65, 0x73, 0x74, + 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x56, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x3a, 0x6e, 0x0a, 0x24, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x5f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x5f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, + 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x5f, 0x69, 0x64, 0x12, 0x1c, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x46, + 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xa4, 0x9c, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x20, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x43, 0x6f, 0x6e, 0x64, + 0x75, 0x69, 0x74, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, + 0x6f, 0x72, 0x49, 0x64, 0x3a, 0x64, 0x0a, 0x1f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x5f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x5f, 0x64, 0x6c, 0x71, 0x5f, 0x6e, 0x61, 0x63, + 0x6b, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xa5, 0x9c, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x1b, 0x6d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x43, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x44, 0x6c, + 0x71, 0x4e, 0x61, 0x63, 0x6b, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x3a, 0x67, 0x0a, 0x21, 0x6d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x5f, 0x64, + 0x6c, 0x71, 0x5f, 0x6e, 0x61, 0x63, 0x6b, 0x5f, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x69, 0x64, 0x12, + 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xa6, 0x9c, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x1c, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x43, + 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x44, 0x6c, 0x71, 0x4e, 0x61, 0x63, 0x6b, 0x4e, 0x6f, 0x64, + 0x65, 0x49, 0x64, 0x42, 0x8e, 0x03, 0x82, 0xe2, 0x09, 0x1a, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, + 0x74, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, + 0x6e, 0x61, 0x6d, 0x65, 0x8a, 0xe2, 0x09, 0x1d, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2e, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x92, 0xe2, 0x09, 0x1f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, + 0x2e, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x6c, 0x75, + 0x67, 0x69, 0x6e, 0x2e, 0x6e, 0x61, 0x6d, 0x65, 0x9a, 0xe2, 0x09, 0x22, 0x63, 0x6f, 0x6e, 0x64, + 0x75, 0x69, 0x74, 0x2e, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, + 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0xa2, 0xe2, + 0x09, 0x1b, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x69, 0x64, 0xaa, 0xe2, 0x09, + 0x16, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2e, 0x64, 0x6c, 0x71, 0x2e, 0x6e, 0x61, 0x63, + 0x6b, 0x2e, 0x65, 0x72, 0x72, 0x6f, 0x72, 0xb2, 0xe2, 0x09, 0x18, 0x63, 0x6f, 0x6e, 0x64, 0x75, + 0x69, 0x74, 0x2e, 0x64, 0x6c, 0x71, 0x2e, 0x6e, 0x61, 0x63, 0x6b, 0x2e, 0x6e, 0x6f, 0x64, 0x65, + 0x2e, 0x69, 0x64, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, + 0x61, 0x2e, 0x76, 0x31, 0x42, 0x0e, 0x43, 0x6f, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x74, 0x73, 0x50, + 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x69, 0x6f, 0x2f, 0x63, 0x6f, 0x6e, + 0x64, 0x75, 0x69, 0x74, 0x2d, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x73, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x2f, 0x76, 0x31, 0x3b, 0x6d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x4d, 0x58, 0x58, 0xaa, + 0x02, 0x0b, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0b, + 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x17, 0x4d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0c, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var file_metadata_v1_constants_proto_goTypes = []interface{}{ + (*descriptorpb.FileOptions)(nil), // 0: google.protobuf.FileOptions +} +var file_metadata_v1_constants_proto_depIdxs = []int32{ + 0, // 0: metadata.v1.metadata_conduit_source_plugin_name:extendee -> google.protobuf.FileOptions + 0, // 1: metadata.v1.metadata_conduit_source_plugin_version:extendee -> google.protobuf.FileOptions + 0, // 2: metadata.v1.metadata_conduit_destination_plugin_name:extendee -> google.protobuf.FileOptions + 0, // 3: metadata.v1.metadata_conduit_destination_plugin_version:extendee -> google.protobuf.FileOptions + 0, // 4: metadata.v1.metadata_conduit_source_connector_id:extendee -> google.protobuf.FileOptions + 0, // 5: metadata.v1.metadata_conduit_dlq_nack_error:extendee -> google.protobuf.FileOptions + 0, // 6: metadata.v1.metadata_conduit_dlq_nack_node_id:extendee -> google.protobuf.FileOptions + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 0, // [0:7] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_metadata_v1_constants_proto_init() } +func file_metadata_v1_constants_proto_init() { + if File_metadata_v1_constants_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_metadata_v1_constants_proto_rawDesc, + NumEnums: 0, + NumMessages: 0, + NumExtensions: 7, + NumServices: 0, + }, + GoTypes: file_metadata_v1_constants_proto_goTypes, + DependencyIndexes: file_metadata_v1_constants_proto_depIdxs, + ExtensionInfos: file_metadata_v1_constants_proto_extTypes, + }.Build() + File_metadata_v1_constants_proto = out.File + file_metadata_v1_constants_proto_rawDesc = nil + file_metadata_v1_constants_proto_goTypes = nil + file_metadata_v1_constants_proto_depIdxs = nil +} diff --git a/proto/metadata/v1/constants.proto b/proto/metadata/v1/constants.proto new file mode 100644 index 0000000..e1a8196 --- /dev/null +++ b/proto/metadata/v1/constants.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +package metadata.v1; + +import "google/protobuf/descriptor.proto"; + +option go_package = "github.com/conduitio/conduit-commons/proto/metadata/v1"; + +option (metadata_conduit_source_plugin_name) = "conduit.source.plugin.name"; +option (metadata_conduit_source_plugin_version) = "conduit.source.plugin.version"; +option (metadata_conduit_destination_plugin_name) = "conduit.destination.plugin.name"; +option (metadata_conduit_destination_plugin_version) = "conduit.destination.plugin.version"; + +option(metadata_conduit_source_connector_id) = "conduit.source.connector.id"; +option(metadata_conduit_dlq_nack_error) = "conduit.dlq.nack.error"; +option(metadata_conduit_dlq_nack_node_id) = "conduit.dlq.nack.node.id"; + +// We are (ab)using custom file options to define constants. +// See https://github.com/protocolbuffers/protobuf/issues/3520#issuecomment-323613839 +extend google.protobuf.FileOptions { + // Metadata field "conduit.source.plugin.name" contains the name of the source + // plugin that created this record. + string metadata_conduit_source_plugin_name = 20000; + // Metadata field "conduit.source.plugin.version" contains the version of the + // source plugin that created this record. + string metadata_conduit_source_plugin_version = 20001; + // Metadata field "conduit.destination.plugin.name" contains the name of the + // destination plugin that has written this record (only available in records + // once they are written by a destination). + string metadata_conduit_destination_plugin_name = 20002; + // Metadata field "conduit.destination.plugin.version" contains the version of + // the destination plugin that has written this record (only available in + // records once they are written by a destination). + string metadata_conduit_destination_plugin_version = 20003; + + // Metadata field "conduit.source.connector.id" contains the ID of the source + // connector that produced this record. + string metadata_conduit_source_connector_id = 20004; + // Metadata field "conduit.dlq.nack.error" contains the error that caused a + // record to be nacked and pushed to the dead-letter queue. + string metadata_conduit_dlq_nack_error = 20005; + // Metadata field "conduit.dlq.nack.node.id" contains the ID of the internal + // node that nacked the record. + string metadata_conduit_dlq_nack_node_id = 20006; +} diff --git a/proto/opencdc/v1/opencdc.pb.go b/proto/opencdc/v1/opencdc.pb.go index f42e370..bfd18f0 100644 --- a/proto/opencdc/v1/opencdc.pb.go +++ b/proto/opencdc/v1/opencdc.pb.go @@ -355,18 +355,49 @@ var file_opencdc_v1_opencdc_proto_extTypes = []protoimpl.ExtensionInfo{ Tag: "bytes,10002,opt,name=metadata_read_at", Filename: "opencdc/v1/opencdc.proto", }, + { + ExtendedType: (*descriptorpb.FileOptions)(nil), + ExtensionType: (*string)(nil), + Field: 10003, + Name: "opencdc.v1.metadata_collection", + Tag: "bytes,10003,opt,name=metadata_collection", + Filename: "opencdc/v1/opencdc.proto", + }, } // Extension fields to descriptorpb.FileOptions. var ( + // OpenCDC version is a constant that should be used as the value in the + // metadata field opencdc.version. It ensures the OpenCDC format version can be + // easily identified in case the record gets marshaled into a different untyped + // format (e.g. JSON). + // // optional string opencdc_version = 9999; E_OpencdcVersion = &file_opencdc_v1_opencdc_proto_extTypes[0] + // Metadata field "opencdc.version" contains the version of the OpenCDC format + // (e.g. "v1"). This field exists to ensure the OpenCDC format version can be + // easily identified in case the record gets marshaled into a different + // untyped format (e.g. JSON). + // // optional string metadata_version = 10000; E_MetadataVersion = &file_opencdc_v1_opencdc_proto_extTypes[1] + // Metadata field "opencdc.createdAt" can contain the time when the record was + // created in the 3rd party system. The expected format is a unix timestamp in + // nanoseconds. + // // optional string metadata_created_at = 10001; E_MetadataCreatedAt = &file_opencdc_v1_opencdc_proto_extTypes[2] + // Metadata field "opencdc.readAt" can contain the time when the record was + // read from the 3rd party system. The expected format is a unix timestamp in + // nanoseconds. + // // optional string metadata_read_at = 10002; E_MetadataReadAt = &file_opencdc_v1_opencdc_proto_extTypes[3] + // Metadata field "opencdc.collection" can contain the name of the collection + // where the record originated from and/or where it should be stored. + // + // optional string metadata_collection = 10003; + E_MetadataCollection = &file_opencdc_v1_opencdc_proto_extTypes[4] ) var File_opencdc_v1_opencdc_proto protoreflect.FileDescriptor @@ -436,22 +467,29 @@ var file_opencdc_v1_opencdc_proto_rawDesc = []byte{ 0x64, 0x5f, 0x61, 0x74, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x92, 0x4e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x6d, 0x65, 0x74, 0x61, 0x64, - 0x61, 0x74, 0x61, 0x52, 0x65, 0x61, 0x64, 0x41, 0x74, 0x42, 0xe8, 0x01, 0xfa, 0xf0, 0x04, 0x02, + 0x61, 0x74, 0x61, 0x52, 0x65, 0x61, 0x64, 0x41, 0x74, 0x3a, 0x4e, 0x0a, 0x13, 0x6d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x93, + 0x4e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x12, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x43, + 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0xfe, 0x01, 0xfa, 0xf0, 0x04, 0x02, 0x76, 0x31, 0x82, 0xf1, 0x04, 0x0f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x8a, 0xf1, 0x04, 0x11, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x92, 0xf1, 0x04, 0x0e, 0x6f, 0x70, - 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x72, 0x65, 0x61, 0x64, 0x41, 0x74, 0x0a, 0x0e, 0x63, 0x6f, - 0x6d, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x76, 0x31, 0x42, 0x0c, 0x4f, 0x70, - 0x65, 0x6e, 0x63, 0x64, 0x63, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x3f, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, - 0x69, 0x6f, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x2d, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, - 0x6e, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, - 0x2f, 0x76, 0x31, 0x3b, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x76, 0x31, 0xa2, 0x02, 0x03, - 0x4f, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x56, 0x31, - 0xca, 0x02, 0x0a, 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x16, - 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, - 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x72, 0x65, 0x61, 0x64, 0x41, 0x74, 0x9a, 0xf1, 0x04, 0x12, + 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, + 0x76, 0x31, 0x42, 0x0c, 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x50, 0x01, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, + 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, 0x69, 0x6f, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x69, 0x74, + 0x2d, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6f, + 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2f, 0x76, 0x31, 0x3b, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, + 0x63, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x4f, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x4f, 0x70, 0x65, 0x6e, + 0x63, 0x64, 0x63, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0a, 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, + 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x16, 0x4f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x5c, 0x56, 0x31, + 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x4f, + 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -489,10 +527,11 @@ var file_opencdc_v1_opencdc_proto_depIdxs = []int32{ 6, // 8: opencdc.v1.metadata_version:extendee -> google.protobuf.FileOptions 6, // 9: opencdc.v1.metadata_created_at:extendee -> google.protobuf.FileOptions 6, // 10: opencdc.v1.metadata_read_at:extendee -> google.protobuf.FileOptions - 11, // [11:11] is the sub-list for method output_type - 11, // [11:11] is the sub-list for method input_type - 11, // [11:11] is the sub-list for extension type_name - 7, // [7:11] is the sub-list for extension extendee + 6, // 11: opencdc.v1.metadata_collection:extendee -> google.protobuf.FileOptions + 12, // [12:12] is the sub-list for method output_type + 12, // [12:12] is the sub-list for method input_type + 12, // [12:12] is the sub-list for extension type_name + 7, // [7:12] is the sub-list for extension extendee 0, // [0:7] is the sub-list for field type_name } @@ -550,7 +589,7 @@ func file_opencdc_v1_opencdc_proto_init() { RawDescriptor: file_opencdc_v1_opencdc_proto_rawDesc, NumEnums: 1, NumMessages: 4, - NumExtensions: 4, + NumExtensions: 5, NumServices: 0, }, GoTypes: file_opencdc_v1_opencdc_proto_goTypes, diff --git a/proto/opencdc/v1/opencdc.proto b/proto/opencdc/v1/opencdc.proto index 4a1d7a3..14292f1 100644 --- a/proto/opencdc/v1/opencdc.proto +++ b/proto/opencdc/v1/opencdc.proto @@ -7,30 +7,38 @@ import "google/protobuf/struct.proto"; option go_package = "github.com/conduitio/conduit-commons/proto/opencdc/v1"; -// CreatedAt can contain the time when the record was created in the 3rd party -// system. The expected format is a unix timestamp in nanoseconds. +option (opencdc_version) = "v1"; + option (metadata_created_at) = "opencdc.createdAt"; -// ReadAt can contain the time when the record was read from the 3rd party -// system. The expected format is a unix timestamp in nanoseconds. option (metadata_read_at) = "opencdc.readAt"; -// Version contains the version of the OpenCDC format (e.g. "v1"). This field -// exists to ensure the OpenCDC format version can be easily identified in case -// the record gets marshaled into a different untyped format (e.g. JSON). option (metadata_version) = "opencdc.version"; -// OpenCDC version is a constant that should be used as the value in the -// metadata field opencdc.version. It ensures the OpenCDC format version can be -// easily identified in case the record gets marshaled into a different untyped -// format (e.g. JSON). -option (opencdc_version) = "v1"; +option (metadata_collection) = "opencdc.collection"; // We are (ab)using custom file options to define constants. // See https://github.com/protocolbuffers/protobuf/issues/3520#issuecomment-323613839 extend google.protobuf.FileOptions { + // OpenCDC version is a constant that should be used as the value in the + // metadata field opencdc.version. It ensures the OpenCDC format version can be + // easily identified in case the record gets marshaled into a different untyped + // format (e.g. JSON). string opencdc_version = 9999; + // Metadata field "opencdc.version" contains the version of the OpenCDC format + // (e.g. "v1"). This field exists to ensure the OpenCDC format version can be + // easily identified in case the record gets marshaled into a different + // untyped format (e.g. JSON). string metadata_version = 10000; + // Metadata field "opencdc.createdAt" can contain the time when the record was + // created in the 3rd party system. The expected format is a unix timestamp in + // nanoseconds. string metadata_created_at = 10001; + // Metadata field "opencdc.readAt" can contain the time when the record was + // read from the 3rd party system. The expected format is a unix timestamp in + // nanoseconds. string metadata_read_at = 10002; + // Metadata field "opencdc.collection" can contain the name of the collection + // where the record originated from and/or where it should be stored. + string metadata_collection = 10003; } // Operation defines what triggered the creation of a record.