diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service.pb.go b/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service.pb.go index 63995840fd434..f503f2ae1f872 100644 --- a/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service.pb.go +++ b/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service.pb.go @@ -202,9 +202,13 @@ type SubscriptionRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Precision uint32 `protobuf:"varint,1,opt,name=precision,proto3" json:"precision,omitempty"` - TableSchema []byte `protobuf:"bytes,2,opt,name=table_schema,json=tableSchema,proto3" json:"table_schema,omitempty"` - RecordData []byte `protobuf:"bytes,3,opt,name=record_data,json=recordData,proto3" json:"record_data,omitempty"` + // Deprecated: Marked as deprecated in kv_service.proto. + Tenant string `protobuf:"bytes,1,opt,name=tenant,proto3" json:"tenant,omitempty"` + // Deprecated: Marked as deprecated in kv_service.proto. + Db string `protobuf:"bytes,2,opt,name=db,proto3" json:"db,omitempty"` + TableSchema []byte `protobuf:"bytes,3,opt,name=table_schema,json=tableSchema,proto3" json:"table_schema,omitempty"` + RecordData []byte `protobuf:"bytes,4,opt,name=record_data,json=recordData,proto3" json:"record_data,omitempty"` + Precision uint32 `protobuf:"varint,5,opt,name=precision,proto3" json:"precision,omitempty"` } func (x *SubscriptionRequest) Reset() { @@ -239,11 +243,20 @@ func (*SubscriptionRequest) Descriptor() ([]byte, []int) { return file_kv_service_proto_rawDescGZIP(), []int{3} } -func (x *SubscriptionRequest) GetPrecision() uint32 { +// Deprecated: Marked as deprecated in kv_service.proto. +func (x *SubscriptionRequest) GetTenant() string { if x != nil { - return x.Precision + return x.Tenant } - return 0 + return "" +} + +// Deprecated: Marked as deprecated in kv_service.proto. +func (x *SubscriptionRequest) GetDb() string { + if x != nil { + return x.Db + } + return "" } func (x *SubscriptionRequest) GetTableSchema() []byte { @@ -260,6 +273,13 @@ func (x *SubscriptionRequest) GetRecordData() []byte { return nil } +func (x *SubscriptionRequest) GetPrecision() uint32 { + if x != nil { + return x.Precision + } + return 0 +} + // CnosDB subscription v4 message. type SubscriptionResponse struct { state protoimpl.MessageState @@ -321,30 +341,33 @@ var file_kv_service_proto_rawDesc = []byte{ 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x70, 0x6f, 0x69, 0x6e, - 0x74, 0x73, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x22, 0x77, 0x0a, 0x13, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x1c, 0x0a, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0d, 0x52, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, - 0x0c, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, - 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x44, 0x61, 0x74, - 0x61, 0x22, 0x16, 0x0a, 0x14, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, - 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xc1, 0x01, 0x0a, 0x0b, 0x54, 0x53, - 0x4b, 0x56, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x54, 0x0a, 0x0b, 0x57, 0x72, 0x69, - 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x12, 0x1e, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, - 0x5c, 0x0a, 0x11, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, - 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, + 0x74, 0x73, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x22, 0xa7, 0x01, 0x0a, 0x13, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x1a, 0x0a, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x42, 0x02, 0x18, 0x01, 0x52, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x12, 0x12, 0x0a, 0x02, + 0x64, 0x62, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x42, 0x02, 0x18, 0x01, 0x52, 0x02, 0x64, 0x62, + 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x64, 0x61, + 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, + 0x44, 0x61, 0x74, 0x61, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, 0x69, 0x6f, + 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, 0x69, + 0x6f, 0x6e, 0x22, 0x16, 0x0a, 0x14, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xc1, 0x01, 0x0a, 0x0b, 0x54, + 0x53, 0x4b, 0x56, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x54, 0x0a, 0x0b, 0x57, 0x72, + 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x12, 0x1e, 0x2e, 0x6b, 0x76, 0x5f, 0x73, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, + 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6b, 0x76, 0x5f, 0x73, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, + 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, + 0x12, 0x5c, 0x0a, 0x11, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0e, 0x5a, - 0x0c, 0x2e, 0x3b, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0e, + 0x5a, 0x0c, 0x2e, 0x3b, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/kv_service_impl.go b/plugins/inputs/cnosdb_subscription/cnosdb/kv_service_impl.go index 72a1e9578fc79..29881e8a3cab2 100644 --- a/plugins/inputs/cnosdb_subscription/cnosdb/kv_service_impl.go +++ b/plugins/inputs/cnosdb_subscription/cnosdb/kv_service_impl.go @@ -152,6 +152,7 @@ func (s TSKVServiceServerImpl) WriteSubscription(server kv_service.TSKVService_W var tableSchema cnosdb_v4.TskvTableSchema if err := json.Unmarshal(req.TableSchema, &tableSchema); err != nil { + s.accumulator.AddError(fmt.Errorf("failed to parse TskvTableSchema: %w", err)) return server.Send(&kv_service.SubscriptionResponse{}) } diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema.go b/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema.go index 32c6c51cb5c2d..43d454dff1aa7 100644 --- a/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema.go +++ b/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema.go @@ -1,9 +1,9 @@ package v4 -type ColumnType uint32 +type ColumnTypeCode uint32 const ( - ColumnTypeUnknown ColumnType = iota + ColumnTypeUnknown ColumnTypeCode = iota ColumnTypeTag ColumnTypeTime ColumnTypeFieldUnknown @@ -15,10 +15,10 @@ const ( ColumnTypeFieldGeometry ) -type TimeUnit uint32 +type TimeUnitCode uint32 const ( - TimeUnitUnknown TimeUnit = iota + TimeUnitUnknown TimeUnitCode = iota TimeUnitSecond TimeUnitMillisecond TimeUnitMicrosecond @@ -26,25 +26,26 @@ const ( ) type TskvTableSchema struct { - Tenant string `json:"tenant"` - Db string `json:"db"` - Name string `json:"name"` - SchemaVersion uint64 `json:"schema_version"` - NextColumnID uint32 `json:"next_column_id"` - Columns []TableColumn `json:"columns"` - ColumnsIndex map[string]uint32 `json:"columns_index"` + // Tenant string `json:"tenant"` + // Db string `json:"db"` + Name string `json:"name"` + // SchemaId uint64 `json:"schema_id"` // v2.4.0 field + // SchemaVersion uint64 `json:"schema_version"` // v2.4.1 field + // NextColumnID uint32 `json:"next_column_id"` + Columns []TableColumn `json:"columns"` + // ColumnsIndex map[string]uint32 `json:"columns_index"` } type TableColumn struct { ID uint64 `json:"id"` Name string `json:"name"` ColumnType interface{} `json:"column_type"` - Encoding interface{} `json:"encoding"` + // Encoding interface{} `json:"encoding"` } type ColumnTypeUnited struct { - ColumnType ColumnType - TimeUnit TimeUnit + ColumnType ColumnTypeCode + TimeUnit TimeUnitCode } func (c *TableColumn) GetColumnTypeUnited() ColumnTypeUnited { @@ -56,6 +57,54 @@ func (c *TableColumn) GetColumnTypeUnited() ColumnTypeUnited { ColumnType: ColumnTypeTag, TimeUnit: TimeUnitUnknown, } + } else { + // In cnosdb-v2.4.0, columnType is string + // After cnosdb-v2.4.0, columnType is string or object + columnTypeCode := ColumnTypeUnknown + timeUnitCode := TimeUnitUnknown + switch columnType { + case "TAG_STRING": + // "column_type": "TAG_STRING" + return ColumnTypeUnited{ + ColumnType: ColumnTypeTag, + TimeUnit: TimeUnitUnknown, + } + case "FIELD_STRING": + // "column_type": "FIELD_STRING" + columnTypeCode = ColumnTypeFieldString + case "FIELD_BIGINT": + // "column_type": "FIELD_BIGINT"" + columnTypeCode = ColumnTypeFieldInteger + case "FIELD_BIGINT UNSIGNED": + // "column_type": "FIELD_BIGINT UNSIGNED"" + columnTypeCode = ColumnTypeFieldUnsigned + case "FIELD_DOUBLE": + // "column_type": "FIELD_STRING" + columnTypeCode = ColumnTypeFieldFloat + case "FIELD_BOOLEAN": + // "column_type": "FIELD_BOOLEAN"" + columnTypeCode = ColumnTypeFieldBoolean + case "TIME_TIMESTAMP(SECOND)": + // "column_type": "TIME_TIMESTAMP(SECOND)"" + columnTypeCode = ColumnTypeTime + timeUnitCode = TimeUnitSecond + case "TIME_TIMESTAMP(MILLISECOND)": + // "column_type": "TIME_TIMESTAMP(MILLISECOND)"" + columnTypeCode = ColumnTypeTime + timeUnitCode = TimeUnitMillisecond + case "TIME_TIMESTAMP(MICROSECOND)": + // "column_type": "TIME_TIMESTAMP(MICROSECOND)"" + columnTypeCode = ColumnTypeTime + timeUnitCode = TimeUnitMicrosecond + case "TIME_TIMESTAMP(NANOSECOND)": + // "column_type": "TIME_TIMESTAMP(NANOSECOND)"" + columnTypeCode = ColumnTypeTime + timeUnitCode = TimeUnitNanosecond + } + return ColumnTypeUnited{ + ColumnType: columnTypeCode, + TimeUnit: timeUnitCode, + } } case map[string]interface{}: if timeUnitObj := columnType["Time"]; timeUnitObj != nil { diff --git a/plugins/inputs/cnosdb_subscription/protocol/kv_service.proto b/plugins/inputs/cnosdb_subscription/protocol/kv_service.proto index 4f2dc3213cf6a..1d1a2a31a332d 100644 --- a/plugins/inputs/cnosdb_subscription/protocol/kv_service.proto +++ b/plugins/inputs/cnosdb_subscription/protocol/kv_service.proto @@ -24,9 +24,11 @@ message WritePointsResponse { // CnosDB subscription v4 message. message SubscriptionRequest { - uint32 precision = 1; - bytes table_schema = 2; - bytes record_data = 3; + string tenant = 1; + string db = 2; + bytes table_schema = 3; + bytes record_data = 4; + uint32 precision = 5; } // CnosDB subscription v4 message.