From 2526d4bcaab77340dc9f07077105fa59c22989c8 Mon Sep 17 00:00:00 2001 From: zipper-meng Date: Fri, 2 Aug 2024 13:44:46 +0800 Subject: [PATCH] feat(output): cnosdb_subscription plugin add cnosdbv2.3 support --- plugins/inputs/cnosdb_subscription/README.md | 30 +- .../generated/kv_service/kv_service.pb.go | 469 ++++++++++ .../kv_service}/kv_service_grpc.pb.go | 44 +- .../models/v3/models.v3_generated.go | 846 ++++++++++++++++++ .../models/v4/models.v4_generated.go} | 2 +- .../cnosdb/kv_service_impl.go | 290 ++++++ .../cnosdb_subscription/cnosdb/v3/bitset.go | 15 + .../cnosdb/v3/bitset_test.go | 50 ++ .../cnosdb/{ => v4}/tskv_table_schema.go | 8 +- .../cnosdb/v4/tskv_table_schema_test.go | 64 ++ .../{cnosdb.go => cnosdb_sbuscription.go} | 6 +- .../inputs/cnosdb_subscription/generate.go | 5 + .../cnosdb_subscription/kv_service_impl.go | 187 ---- .../kv_service_impl_test.go | 66 -- .../protocol/kv_service.proto | 41 + .../protocol/models.v3.fbs | 49 + .../models.fbs => protocol/models.v4.fbs} | 2 +- .../protos/kv_service.pb.go | 225 ----- .../protos/protobuf/kv_service.proto | 16 - 19 files changed, 1899 insertions(+), 516 deletions(-) create mode 100644 plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service.pb.go rename plugins/inputs/cnosdb_subscription/{protos => cnosdb/generated/kv_service}/kv_service_grpc.pb.go (68%) create mode 100644 plugins/inputs/cnosdb_subscription/cnosdb/generated/models/v3/models.v3_generated.go rename plugins/inputs/cnosdb_subscription/{protos/models_generated.go => cnosdb/generated/models/v4/models.v4_generated.go} (99%) create mode 100644 plugins/inputs/cnosdb_subscription/cnosdb/kv_service_impl.go create mode 100644 plugins/inputs/cnosdb_subscription/cnosdb/v3/bitset.go create mode 100644 plugins/inputs/cnosdb_subscription/cnosdb/v3/bitset_test.go rename plugins/inputs/cnosdb_subscription/cnosdb/{ => v4}/tskv_table_schema.go (96%) create mode 100644 plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema_test.go rename plugins/inputs/cnosdb_subscription/{cnosdb.go => cnosdb_sbuscription.go} (88%) create mode 100644 plugins/inputs/cnosdb_subscription/generate.go delete mode 100644 plugins/inputs/cnosdb_subscription/kv_service_impl.go delete mode 100644 plugins/inputs/cnosdb_subscription/kv_service_impl_test.go create mode 100644 plugins/inputs/cnosdb_subscription/protocol/kv_service.proto create mode 100644 plugins/inputs/cnosdb_subscription/protocol/models.v3.fbs rename plugins/inputs/cnosdb_subscription/{protos/flatbuffers/models.fbs => protocol/models.v4.fbs} (97%) delete mode 100644 plugins/inputs/cnosdb_subscription/protos/kv_service.pb.go delete mode 100644 plugins/inputs/cnosdb_subscription/protos/protobuf/kv_service.proto diff --git a/plugins/inputs/cnosdb_subscription/README.md b/plugins/inputs/cnosdb_subscription/README.md index 34ea32250e731..1618a0c6d7723 100644 --- a/plugins/inputs/cnosdb_subscription/README.md +++ b/plugins/inputs/cnosdb_subscription/README.md @@ -1,21 +1,31 @@ # CnosDB Subscription Input Plugin -## Build +## Usages -To compile this plugin it requires protoc-gen-go and protoc-gen-go-grpc +To listen on port 8803: -```shell -# install protoc-gen-go -go install google.golang.org/protobuf/cmd/protoc-gen-go@latest -# install protoc-gen-go-grpc -go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest +```toml +[[inputs.cnosdb_subscription]] +service_address = ":8803" ``` -## Usages - -To listen on port 8803: +If you want to get response from output plugins immediately, add config `high_priority_io = true`: ```toml [[inputs.cnosdb_subscription]] service_address = ":8803" +high_priority_io = true +``` + +## Develop + +If you want to edit protocol files (in directory `protocol/`), +you need **protoc-gen-go** and **protoc-gen-go-grpc** to re-compile them. +See [generate.go](./generate.go). + +```shell +# install protoc-gen-go +go install google.golang.org/protobuf/cmd/protoc-gen-go@latest +# install protoc-gen-go-grpc +go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest ``` diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service.pb.go b/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service.pb.go new file mode 100644 index 0000000000000..63995840fd434 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service.pb.go @@ -0,0 +1,469 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.2 +// protoc v5.27.1 +// source: kv_service.proto + +package kv_service + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// CnosDB subscription v3 message. +type Meta struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Tenant string `protobuf:"bytes,1,opt,name=tenant,proto3" json:"tenant,omitempty"` + User *string `protobuf:"bytes,2,opt,name=user,proto3,oneof" json:"user,omitempty"` + Password *string `protobuf:"bytes,3,opt,name=password,proto3,oneof" json:"password,omitempty"` +} + +func (x *Meta) Reset() { + *x = Meta{} + if protoimpl.UnsafeEnabled { + mi := &file_kv_service_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Meta) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Meta) ProtoMessage() {} + +func (x *Meta) ProtoReflect() protoreflect.Message { + mi := &file_kv_service_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Meta.ProtoReflect.Descriptor instead. +func (*Meta) Descriptor() ([]byte, []int) { + return file_kv_service_proto_rawDescGZIP(), []int{0} +} + +func (x *Meta) GetTenant() string { + if x != nil { + return x.Tenant + } + return "" +} + +func (x *Meta) GetUser() string { + if x != nil && x.User != nil { + return *x.User + } + return "" +} + +func (x *Meta) GetPassword() string { + if x != nil && x.Password != nil { + return *x.Password + } + return "" +} + +// CnosDB subscription v3 message. +type WritePointsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Version uint64 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` + Meta *Meta `protobuf:"bytes,2,opt,name=meta,proto3" json:"meta,omitempty"` + Points []byte `protobuf:"bytes,3,opt,name=points,proto3" json:"points,omitempty"` +} + +func (x *WritePointsRequest) Reset() { + *x = WritePointsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_kv_service_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WritePointsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WritePointsRequest) ProtoMessage() {} + +func (x *WritePointsRequest) ProtoReflect() protoreflect.Message { + mi := &file_kv_service_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WritePointsRequest.ProtoReflect.Descriptor instead. +func (*WritePointsRequest) Descriptor() ([]byte, []int) { + return file_kv_service_proto_rawDescGZIP(), []int{1} +} + +func (x *WritePointsRequest) GetVersion() uint64 { + if x != nil { + return x.Version + } + return 0 +} + +func (x *WritePointsRequest) GetMeta() *Meta { + if x != nil { + return x.Meta + } + return nil +} + +func (x *WritePointsRequest) GetPoints() []byte { + if x != nil { + return x.Points + } + return nil +} + +// CnosDB subscription v3 message. +type WritePointsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + PointsNumber uint64 `protobuf:"varint,1,opt,name=points_number,json=pointsNumber,proto3" json:"points_number,omitempty"` +} + +func (x *WritePointsResponse) Reset() { + *x = WritePointsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_kv_service_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WritePointsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WritePointsResponse) ProtoMessage() {} + +func (x *WritePointsResponse) ProtoReflect() protoreflect.Message { + mi := &file_kv_service_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WritePointsResponse.ProtoReflect.Descriptor instead. +func (*WritePointsResponse) Descriptor() ([]byte, []int) { + return file_kv_service_proto_rawDescGZIP(), []int{2} +} + +func (x *WritePointsResponse) GetPointsNumber() uint64 { + if x != nil { + return x.PointsNumber + } + return 0 +} + +// CnosDB subscription v4 message. +type SubscriptionRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Precision uint32 `protobuf:"varint,1,opt,name=precision,proto3" json:"precision,omitempty"` + TableSchema []byte `protobuf:"bytes,2,opt,name=table_schema,json=tableSchema,proto3" json:"table_schema,omitempty"` + RecordData []byte `protobuf:"bytes,3,opt,name=record_data,json=recordData,proto3" json:"record_data,omitempty"` +} + +func (x *SubscriptionRequest) Reset() { + *x = SubscriptionRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_kv_service_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscriptionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscriptionRequest) ProtoMessage() {} + +func (x *SubscriptionRequest) ProtoReflect() protoreflect.Message { + mi := &file_kv_service_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscriptionRequest.ProtoReflect.Descriptor instead. +func (*SubscriptionRequest) Descriptor() ([]byte, []int) { + return file_kv_service_proto_rawDescGZIP(), []int{3} +} + +func (x *SubscriptionRequest) GetPrecision() uint32 { + if x != nil { + return x.Precision + } + return 0 +} + +func (x *SubscriptionRequest) GetTableSchema() []byte { + if x != nil { + return x.TableSchema + } + return nil +} + +func (x *SubscriptionRequest) GetRecordData() []byte { + if x != nil { + return x.RecordData + } + return nil +} + +// CnosDB subscription v4 message. +type SubscriptionResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *SubscriptionResponse) Reset() { + *x = SubscriptionResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_kv_service_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscriptionResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscriptionResponse) ProtoMessage() {} + +func (x *SubscriptionResponse) ProtoReflect() protoreflect.Message { + mi := &file_kv_service_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscriptionResponse.ProtoReflect.Descriptor instead. +func (*SubscriptionResponse) Descriptor() ([]byte, []int) { + return file_kv_service_proto_rawDescGZIP(), []int{4} +} + +var File_kv_service_proto protoreflect.FileDescriptor + +var file_kv_service_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x0a, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0x6e, + 0x0a, 0x04, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x12, 0x17, + 0x0a, 0x04, 0x75, 0x73, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, + 0x75, 0x73, 0x65, 0x72, 0x88, 0x01, 0x01, 0x12, 0x1f, 0x0a, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, + 0x6f, 0x72, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x08, 0x70, 0x61, 0x73, + 0x73, 0x77, 0x6f, 0x72, 0x64, 0x88, 0x01, 0x01, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x75, 0x73, 0x65, + 0x72, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x22, 0x6c, + 0x0a, 0x12, 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x24, + 0x0a, 0x04, 0x6d, 0x65, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6b, + 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x04, + 0x6d, 0x65, 0x74, 0x61, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x22, 0x3a, 0x0a, 0x13, + 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x5f, 0x6e, 0x75, + 0x6d, 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x70, 0x6f, 0x69, 0x6e, + 0x74, 0x73, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x22, 0x77, 0x0a, 0x13, 0x53, 0x75, 0x62, 0x73, + 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x1c, 0x0a, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, + 0x0c, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, + 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x44, 0x61, 0x74, + 0x61, 0x22, 0x16, 0x0a, 0x14, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, + 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xc1, 0x01, 0x0a, 0x0b, 0x54, 0x53, + 0x4b, 0x56, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x54, 0x0a, 0x0b, 0x57, 0x72, 0x69, + 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x12, 0x1e, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, + 0x5c, 0x0a, 0x11, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0e, 0x5a, + 0x0c, 0x2e, 0x3b, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_kv_service_proto_rawDescOnce sync.Once + file_kv_service_proto_rawDescData = file_kv_service_proto_rawDesc +) + +func file_kv_service_proto_rawDescGZIP() []byte { + file_kv_service_proto_rawDescOnce.Do(func() { + file_kv_service_proto_rawDescData = protoimpl.X.CompressGZIP(file_kv_service_proto_rawDescData) + }) + return file_kv_service_proto_rawDescData +} + +var file_kv_service_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_kv_service_proto_goTypes = []any{ + (*Meta)(nil), // 0: kv_service.Meta + (*WritePointsRequest)(nil), // 1: kv_service.WritePointsRequest + (*WritePointsResponse)(nil), // 2: kv_service.WritePointsResponse + (*SubscriptionRequest)(nil), // 3: kv_service.SubscriptionRequest + (*SubscriptionResponse)(nil), // 4: kv_service.SubscriptionResponse +} +var file_kv_service_proto_depIdxs = []int32{ + 0, // 0: kv_service.WritePointsRequest.meta:type_name -> kv_service.Meta + 1, // 1: kv_service.TSKVService.WritePoints:input_type -> kv_service.WritePointsRequest + 3, // 2: kv_service.TSKVService.WriteSubscription:input_type -> kv_service.SubscriptionRequest + 2, // 3: kv_service.TSKVService.WritePoints:output_type -> kv_service.WritePointsResponse + 4, // 4: kv_service.TSKVService.WriteSubscription:output_type -> kv_service.SubscriptionResponse + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_kv_service_proto_init() } +func file_kv_service_proto_init() { + if File_kv_service_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_kv_service_proto_msgTypes[0].Exporter = func(v any, i int) any { + switch v := v.(*Meta); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_kv_service_proto_msgTypes[1].Exporter = func(v any, i int) any { + switch v := v.(*WritePointsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_kv_service_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*WritePointsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_kv_service_proto_msgTypes[3].Exporter = func(v any, i int) any { + switch v := v.(*SubscriptionRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_kv_service_proto_msgTypes[4].Exporter = func(v any, i int) any { + switch v := v.(*SubscriptionResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_kv_service_proto_msgTypes[0].OneofWrappers = []any{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_kv_service_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_kv_service_proto_goTypes, + DependencyIndexes: file_kv_service_proto_depIdxs, + MessageInfos: file_kv_service_proto_msgTypes, + }.Build() + File_kv_service_proto = out.File + file_kv_service_proto_rawDesc = nil + file_kv_service_proto_goTypes = nil + file_kv_service_proto_depIdxs = nil +} diff --git a/plugins/inputs/cnosdb_subscription/protos/kv_service_grpc.pb.go b/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service_grpc.pb.go similarity index 68% rename from plugins/inputs/cnosdb_subscription/protos/kv_service_grpc.pb.go rename to plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service_grpc.pb.go index a82d3295c2838..062119d311074 100644 --- a/plugins/inputs/cnosdb_subscription/protos/kv_service_grpc.pb.go +++ b/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service_grpc.pb.go @@ -2,9 +2,9 @@ // versions: // - protoc-gen-go-grpc v1.5.1 // - protoc v5.27.1 -// source: protos/protobuf/kv_service.proto +// source: kv_service.proto -package protos +package kv_service import ( context "context" @@ -19,6 +19,7 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( + TSKVService_WritePoints_FullMethodName = "/kv_service.TSKVService/WritePoints" TSKVService_WriteSubscription_FullMethodName = "/kv_service.TSKVService/WriteSubscription" ) @@ -26,6 +27,9 @@ const ( // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type TSKVServiceClient interface { + // CnosDB subscription v3 API. + WritePoints(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[WritePointsRequest, WritePointsResponse], error) + // CnosDB subscription v4 API. WriteSubscription(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[SubscriptionRequest, SubscriptionResponse], error) } @@ -37,9 +41,22 @@ func NewTSKVServiceClient(cc grpc.ClientConnInterface) TSKVServiceClient { return &tSKVServiceClient{cc} } +func (c *tSKVServiceClient) WritePoints(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[WritePointsRequest, WritePointsResponse], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &TSKVService_ServiceDesc.Streams[0], TSKVService_WritePoints_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[WritePointsRequest, WritePointsResponse]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type TSKVService_WritePointsClient = grpc.BidiStreamingClient[WritePointsRequest, WritePointsResponse] + func (c *tSKVServiceClient) WriteSubscription(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[SubscriptionRequest, SubscriptionResponse], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - stream, err := c.cc.NewStream(ctx, &TSKVService_ServiceDesc.Streams[0], TSKVService_WriteSubscription_FullMethodName, cOpts...) + stream, err := c.cc.NewStream(ctx, &TSKVService_ServiceDesc.Streams[1], TSKVService_WriteSubscription_FullMethodName, cOpts...) if err != nil { return nil, err } @@ -54,6 +71,9 @@ type TSKVService_WriteSubscriptionClient = grpc.BidiStreamingClient[Subscription // All implementations must embed UnimplementedTSKVServiceServer // for forward compatibility. type TSKVServiceServer interface { + // CnosDB subscription v3 API. + WritePoints(grpc.BidiStreamingServer[WritePointsRequest, WritePointsResponse]) error + // CnosDB subscription v4 API. WriteSubscription(grpc.BidiStreamingServer[SubscriptionRequest, SubscriptionResponse]) error mustEmbedUnimplementedTSKVServiceServer() } @@ -65,6 +85,9 @@ type TSKVServiceServer interface { // pointer dereference when methods are called. type UnimplementedTSKVServiceServer struct{} +func (UnimplementedTSKVServiceServer) WritePoints(grpc.BidiStreamingServer[WritePointsRequest, WritePointsResponse]) error { + return status.Errorf(codes.Unimplemented, "method WritePoints not implemented") +} func (UnimplementedTSKVServiceServer) WriteSubscription(grpc.BidiStreamingServer[SubscriptionRequest, SubscriptionResponse]) error { return status.Errorf(codes.Unimplemented, "method WriteSubscription not implemented") } @@ -89,6 +112,13 @@ func RegisterTSKVServiceServer(s grpc.ServiceRegistrar, srv TSKVServiceServer) { s.RegisterService(&TSKVService_ServiceDesc, srv) } +func _TSKVService_WritePoints_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(TSKVServiceServer).WritePoints(&grpc.GenericServerStream[WritePointsRequest, WritePointsResponse]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type TSKVService_WritePointsServer = grpc.BidiStreamingServer[WritePointsRequest, WritePointsResponse] + func _TSKVService_WriteSubscription_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(TSKVServiceServer).WriteSubscription(&grpc.GenericServerStream[SubscriptionRequest, SubscriptionResponse]{ServerStream: stream}) } @@ -104,6 +134,12 @@ var TSKVService_ServiceDesc = grpc.ServiceDesc{ HandlerType: (*TSKVServiceServer)(nil), Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ + { + StreamName: "WritePoints", + Handler: _TSKVService_WritePoints_Handler, + ServerStreams: true, + ClientStreams: true, + }, { StreamName: "WriteSubscription", Handler: _TSKVService_WriteSubscription_Handler, @@ -111,5 +147,5 @@ var TSKVService_ServiceDesc = grpc.ServiceDesc{ ClientStreams: true, }, }, - Metadata: "protos/protobuf/kv_service.proto", + Metadata: "kv_service.proto", } diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/generated/models/v3/models.v3_generated.go b/plugins/inputs/cnosdb_subscription/cnosdb/generated/models/v3/models.v3_generated.go new file mode 100644 index 0000000000000..3d289f105c7f4 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/cnosdb/generated/models/v3/models.v3_generated.go @@ -0,0 +1,846 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package v3 + +import ( + flatbuffers "github.com/google/flatbuffers/go" + "strconv" +) + +type FieldType int32 + +const ( + FieldTypeUnknown FieldType = -1 + FieldTypeFloat FieldType = 0 + FieldTypeInteger FieldType = 1 + FieldTypeUnsigned FieldType = 2 + FieldTypeBoolean FieldType = 3 + FieldTypeString FieldType = 4 +) + +var EnumNamesFieldType = map[FieldType]string{ + FieldTypeUnknown: "Unknown", + FieldTypeFloat: "Float", + FieldTypeInteger: "Integer", + FieldTypeUnsigned: "Unsigned", + FieldTypeBoolean: "Boolean", + FieldTypeString: "String", +} + +var EnumValuesFieldType = map[string]FieldType{ + "Unknown": FieldTypeUnknown, + "Float": FieldTypeFloat, + "Integer": FieldTypeInteger, + "Unsigned": FieldTypeUnsigned, + "Boolean": FieldTypeBoolean, + "String": FieldTypeString, +} + +func (v FieldType) String() string { + if s, ok := EnumNamesFieldType[v]; ok { + return s + } + return "FieldType(" + strconv.FormatInt(int64(v), 10) + ")" +} + +type PingBody struct { + _tab flatbuffers.Table +} + +func GetRootAsPingBody(buf []byte, offset flatbuffers.UOffsetT) *PingBody { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &PingBody{} + x.Init(buf, n+offset) + return x +} + +func FinishPingBodyBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsPingBody(buf []byte, offset flatbuffers.UOffsetT) *PingBody { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &PingBody{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedPingBodyBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *PingBody) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *PingBody) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *PingBody) Payload(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *PingBody) PayloadLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *PingBody) PayloadBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *PingBody) MutatePayload(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func PingBodyStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func PingBodyAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(payload), 0) +} +func PingBodyStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func PingBodyEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} + +type Tag struct { + _tab flatbuffers.Table +} + +func GetRootAsTag(buf []byte, offset flatbuffers.UOffsetT) *Tag { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Tag{} + x.Init(buf, n+offset) + return x +} + +func FinishTagBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsTag(buf []byte, offset flatbuffers.UOffsetT) *Tag { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Tag{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedTagBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *Tag) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Tag) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Tag) Value(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *Tag) ValueLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Tag) ValueBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Tag) MutateValue(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func TagStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func TagAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(value), 0) +} +func TagStartValueVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func TagEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} + +type Field struct { + _tab flatbuffers.Table +} + +func GetRootAsField(buf []byte, offset flatbuffers.UOffsetT) *Field { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Field{} + x.Init(buf, n+offset) + return x +} + +func FinishFieldBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsField(buf []byte, offset flatbuffers.UOffsetT) *Field { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Field{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedFieldBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *Field) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Field) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Field) Value(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *Field) ValueLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Field) ValueBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Field) MutateValue(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func FieldStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func FieldAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(value), 0) +} +func FieldStartValueVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func FieldEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} + +type Schema struct { + _tab flatbuffers.Table +} + +func GetRootAsSchema(buf []byte, offset flatbuffers.UOffsetT) *Schema { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Schema{} + x.Init(buf, n+offset) + return x +} + +func FinishSchemaBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsSchema(buf []byte, offset flatbuffers.UOffsetT) *Schema { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Schema{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedSchemaBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *Schema) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Schema) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Schema) TagName(j int) []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4)) + } + return nil +} + +func (rcv *Schema) TagNameLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Schema) FieldName(j int) []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4)) + } + return nil +} + +func (rcv *Schema) FieldNameLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Schema) FieldType(j int) FieldType { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + a := rcv._tab.Vector(o) + return FieldType(rcv._tab.GetInt32(a + flatbuffers.UOffsetT(j*4))) + } + return 0 +} + +func (rcv *Schema) FieldTypeLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Schema) MutateFieldType(j int, n FieldType) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateInt32(a+flatbuffers.UOffsetT(j*4), int32(n)) + } + return false +} + +func SchemaStart(builder *flatbuffers.Builder) { + builder.StartObject(3) +} +func SchemaAddTagName(builder *flatbuffers.Builder, tagName flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(tagName), 0) +} +func SchemaStartTagNameVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func SchemaAddFieldName(builder *flatbuffers.Builder, fieldName flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(fieldName), 0) +} +func SchemaStartFieldNameVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func SchemaAddFieldType(builder *flatbuffers.Builder, fieldType flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(fieldType), 0) +} +func SchemaStartFieldTypeVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func SchemaEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} + +type Point struct { + _tab flatbuffers.Table +} + +func GetRootAsPoint(buf []byte, offset flatbuffers.UOffsetT) *Point { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Point{} + x.Init(buf, n+offset) + return x +} + +func FinishPointBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsPoint(buf []byte, offset flatbuffers.UOffsetT) *Point { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Point{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedPointBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *Point) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Point) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Point) Timestamp() int64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.GetInt64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *Point) MutateTimestamp(n int64) bool { + return rcv._tab.MutateInt64Slot(4, n) +} + +func (rcv *Point) Tags(obj *Tag, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Point) TagsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Point) TagsNullbit(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *Point) TagsNullbitLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Point) TagsNullbitBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Point) MutateTagsNullbit(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func (rcv *Point) Fields(obj *Field, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Point) FieldsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Point) FieldsNullbit(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *Point) FieldsNullbitLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Point) FieldsNullbitBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Point) MutateFieldsNullbit(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func PointStart(builder *flatbuffers.Builder) { + builder.StartObject(5) +} +func PointAddTimestamp(builder *flatbuffers.Builder, timestamp int64) { + builder.PrependInt64Slot(0, timestamp, 0) +} +func PointAddTags(builder *flatbuffers.Builder, tags flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(tags), 0) +} +func PointStartTagsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func PointAddTagsNullbit(builder *flatbuffers.Builder, tagsNullbit flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(tagsNullbit), 0) +} +func PointStartTagsNullbitVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func PointAddFields(builder *flatbuffers.Builder, fields flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(fields), 0) +} +func PointStartFieldsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func PointAddFieldsNullbit(builder *flatbuffers.Builder, fieldsNullbit flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(fieldsNullbit), 0) +} +func PointStartFieldsNullbitVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func PointEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} + +type Table struct { + _tab flatbuffers.Table +} + +func GetRootAsTable(buf []byte, offset flatbuffers.UOffsetT) *Table { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Table{} + x.Init(buf, n+offset) + return x +} + +func FinishTableBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsTable(buf []byte, offset flatbuffers.UOffsetT) *Table { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Table{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedTableBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *Table) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Table) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Table) Tab(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *Table) TabLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Table) TabBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Table) MutateTab(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func (rcv *Table) Schema(obj *Schema) *Schema { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + x := rcv._tab.Indirect(o + rcv._tab.Pos) + if obj == nil { + obj = new(Schema) + } + obj.Init(rcv._tab.Bytes, x) + return obj + } + return nil +} + +func (rcv *Table) Points(obj *Point, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Table) PointsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Table) NumRows() uint64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.GetUint64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *Table) MutateNumRows(n uint64) bool { + return rcv._tab.MutateUint64Slot(10, n) +} + +func TableStart(builder *flatbuffers.Builder) { + builder.StartObject(4) +} +func TableAddTab(builder *flatbuffers.Builder, tab flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(tab), 0) +} +func TableStartTabVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func TableAddSchema(builder *flatbuffers.Builder, schema flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(schema), 0) +} +func TableAddPoints(builder *flatbuffers.Builder, points flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(points), 0) +} +func TableStartPointsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func TableAddNumRows(builder *flatbuffers.Builder, numRows uint64) { + builder.PrependUint64Slot(3, numRows, 0) +} +func TableEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} + +type Points struct { + _tab flatbuffers.Table +} + +func GetRootAsPoints(buf []byte, offset flatbuffers.UOffsetT) *Points { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Points{} + x.Init(buf, n+offset) + return x +} + +func FinishPointsBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsPoints(buf []byte, offset flatbuffers.UOffsetT) *Points { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Points{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedPointsBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *Points) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Points) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Points) Db(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *Points) DbLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Points) DbBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Points) MutateDb(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func (rcv *Points) Tables(obj *Table, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Points) TablesLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func PointsStart(builder *flatbuffers.Builder) { + builder.StartObject(2) +} +func PointsAddDb(builder *flatbuffers.Builder, db flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(db), 0) +} +func PointsStartDbVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func PointsAddTables(builder *flatbuffers.Builder, tables flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(tables), 0) +} +func PointsStartTablesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func PointsEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/plugins/inputs/cnosdb_subscription/protos/models_generated.go b/plugins/inputs/cnosdb_subscription/cnosdb/generated/models/v4/models.v4_generated.go similarity index 99% rename from plugins/inputs/cnosdb_subscription/protos/models_generated.go rename to plugins/inputs/cnosdb_subscription/cnosdb/generated/models/v4/models.v4_generated.go index c5d6b972ef287..1fd811d885c63 100644 --- a/plugins/inputs/cnosdb_subscription/protos/models_generated.go +++ b/plugins/inputs/cnosdb_subscription/cnosdb/generated/models/v4/models.v4_generated.go @@ -1,6 +1,6 @@ // Code generated by the FlatBuffers compiler. DO NOT EDIT. -package protos +package v4 import ( flatbuffers "github.com/google/flatbuffers/go" diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/kv_service_impl.go b/plugins/inputs/cnosdb_subscription/cnosdb/kv_service_impl.go new file mode 100644 index 0000000000000..72a1e9578fc79 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/cnosdb/kv_service_impl.go @@ -0,0 +1,290 @@ +package cnosdb + +import ( + "bufio" + "bytes" + "encoding/binary" + "encoding/json" + "fmt" + flatbuffers "github.com/google/flatbuffers/go" + "google.golang.org/grpc" + "io" + "math" + "time" + + "github.com/apache/arrow/go/v15/arrow" + "github.com/apache/arrow/go/v15/arrow/array" + "github.com/apache/arrow/go/v15/arrow/ipc" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service" + models_v3 "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/cnosdb/generated/models/v3" + cnosdb_v3 "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/cnosdb/v3" + cnosdb_v4 "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/cnosdb/v4" +) + +func RegisterTSKVServiceServer(s grpc.ServiceRegistrar, srv kv_service.TSKVServiceServer) { + kv_service.RegisterTSKVServiceServer(s, srv) +} + +var _ kv_service.TSKVServiceServer = (*TSKVServiceServerImpl)(nil) + +type TSKVServiceServerImpl struct { + accumulator telegraf.Accumulator + + kv_service.UnimplementedTSKVServiceServer +} + +func NewTSKVService(acc telegraf.Accumulator) kv_service.TSKVServiceServer { + return TSKVServiceServerImpl{ + accumulator: acc, + } +} + +// WritePoints receive subscription requests with flat-buffers message `Points`, +// parse them into metrics and write to accumulator. +func (s TSKVServiceServerImpl) WritePoints(server kv_service.TSKVService_WritePointsServer) error { + for { + req, err := server.Recv() + if err == io.EOF { + break + } + if err != nil { + s.accumulator.AddError(fmt.Errorf("failed to receive WritePointsRequest: %w", err)) + return server.Send(&kv_service.WritePointsResponse{ + PointsNumber: 0, + }) + } + + var points models_v3.Points + flatbuffers.GetRootAs(req.Points, 0, &points) + + var fbTable models_v3.Table + var fbSchema models_v3.Schema + var fbPoint models_v3.Point + var fbTag models_v3.Tag + var fbField models_v3.Field + for tableIndex := 0; tableIndex < points.TablesLength(); tableIndex++ { + if !points.Tables(&fbTable, tableIndex) { + continue + } + fbTable.Schema(&fbSchema) + table := string(fbTable.TabBytes()) + for pointIndex := 0; pointIndex < fbTable.PointsLength(); pointIndex++ { + if !fbTable.Points(&fbPoint, pointIndex) { + continue + } + + tags := make(map[string]string) + fields := make(map[string]interface{}) + + tagsBitSet := cnosdb_v3.BitSet{ + Buf: fbPoint.TagsNullbitBytes(), + Len: fbSchema.TagNameLength(), + } + if fbPoint.TagsLength() > 0 { + for tagIndex := 0; tagIndex < fbPoint.TagsLength(); tagIndex++ { + if !tagsBitSet.Get(tagIndex) { + continue + } + if !fbPoint.Tags(&fbTag, tagIndex) { + continue + } + tags[string(fbSchema.TagName(tagIndex))] = string(fbTag.ValueBytes()) + } + } + fieldsBitSet := cnosdb_v3.BitSet{ + Buf: fbPoint.FieldsNullbitBytes(), + Len: fbSchema.FieldNameLength(), + } + if fbPoint.FieldsLength() > 0 { + for fieldIndex := 0; fieldIndex < fbPoint.FieldsLength(); fieldIndex++ { + if !fieldsBitSet.Get(fieldIndex) { + continue + } + if !fbPoint.Fields(&fbField, fieldIndex) { + continue + } + switch fbSchema.FieldType(fieldIndex) { + case models_v3.FieldTypeInteger: + v := binary.BigEndian.Uint64(fbField.ValueBytes()) + fields[string(fbSchema.FieldName(fieldIndex))] = int64(v) + case models_v3.FieldTypeUnsigned: + v := binary.BigEndian.Uint64(fbField.ValueBytes()) + fields[string(fbSchema.FieldName(fieldIndex))] = v + case models_v3.FieldTypeFloat: + tmp := binary.BigEndian.Uint64(fbField.ValueBytes()) + v := math.Float64frombits(tmp) + fields[string(fbSchema.FieldName(fieldIndex))] = v + case models_v3.FieldTypeBoolean: + v := fbField.ValueBytes() + fields[string(fbSchema.FieldName(fieldIndex))] = v[0] == byte(1) + case models_v3.FieldTypeString: + v := string(fbField.ValueBytes()) + fields[string(fbSchema.FieldName(fieldIndex))] = v + default: + // Do nothing ? + } + } + } + s.accumulator.AddFields(table, fields, tags) + } + + } + + } + + return nil +} + +// WriteSubscription receive subscription request that has a CnosDB table schema and an Arrow RecordBatch, +// parse them into metrics and write to accumulator. +func (s TSKVServiceServerImpl) WriteSubscription(server kv_service.TSKVService_WriteSubscriptionServer) error { + for { + req, err := server.Recv() + if err == io.EOF { + break + } + if err != nil { + s.accumulator.AddError(fmt.Errorf("failed to receive SubscriptionRequest: %w", err)) + return server.Send(&kv_service.SubscriptionResponse{}) + } + + var tableSchema cnosdb_v4.TskvTableSchema + if err := json.Unmarshal(req.TableSchema, &tableSchema); err != nil { + return server.Send(&kv_service.SubscriptionResponse{}) + } + + recordReader, err := ipc.NewReader(bufio.NewReader(bytes.NewReader(req.RecordData))) + if err != nil { + s.accumulator.AddError(fmt.Errorf("failed to read record data: %w", err)) + return server.Send(&kv_service.SubscriptionResponse{}) + } + + columnIndexToType := make([]cnosdb_v4.ColumnTypeUnited, len(tableSchema.Columns)) + columnIndexToName := make([]string, len(tableSchema.Columns)) + + for _, col := range tableSchema.Columns { + colType := col.GetColumnTypeUnited() + if colType.ColumnType != cnosdb_v4.ColumnTypeUnknown && colType.ColumnType != cnosdb_v4.ColumnTypeFieldUnknown { + columnIndexToType[col.ID] = colType + } else { + s.accumulator.AddError(fmt.Errorf("column '%s': type is unknown: ", col.Name)) + return server.Send(&kv_service.SubscriptionResponse{}) + } + columnIndexToName[col.ID] = col.Name + } + for { + r, err := recordReader.Read() + if err == io.EOF { + break + } + if err != nil { + s.accumulator.AddError(fmt.Errorf("failed to read record data: %w", err)) + continue + } + + numRows := r.NumRows() + numCols := r.NumCols() + colArrays := make([]arrow.Array, len(tableSchema.Columns)) + + for j, col := range r.Columns() { + colType := columnIndexToType[j] + switch colType.ColumnType { + case cnosdb_v4.ColumnTypeTag: + colArrays[j] = array.NewStringData(col.Data()) + case cnosdb_v4.ColumnTypeTime: + if colType.TimeUnit == cnosdb_v4.TimeUnitUnknown { + colName := columnIndexToName[j] + s.accumulator.AddError(fmt.Errorf("column '%s': parsed time unit(%d) is unknown", colName, colType)) + return server.Send(&kv_service.SubscriptionResponse{}) + } + colArrays[j] = array.NewTime64Data(col.Data()) + case cnosdb_v4.ColumnTypeFieldFloat: + colArrays[j] = array.NewFloat64Data(col.Data()) + case cnosdb_v4.ColumnTypeFieldInteger: + colArrays[j] = array.NewInt64Data(col.Data()) + case cnosdb_v4.ColumnTypeFieldUnsigned: + colArrays[j] = array.NewUint64Data(col.Data()) + case cnosdb_v4.ColumnTypeFieldBoolean: + colArrays[j] = array.NewBooleanData(col.Data()) + case cnosdb_v4.ColumnTypeFieldString, cnosdb_v4.ColumnTypeFieldGeometry: + colArrays[j] = array.NewStringData(col.Data()) + default: + colName := columnIndexToName[j] + s.accumulator.AddError(fmt.Errorf("column '%s': parsed type(%d) is unknown", colName, colType)) + return server.Send(&kv_service.SubscriptionResponse{}) + } + } + + for i := 0; i < int(numRows); i++ { + tags := make(map[string]string) + fields := make(map[string]interface{}) + var timestamp time.Time + hasTimestamp := false + for j := 0; j < int(numCols); j++ { + if colArrays[j].IsNull(i) { + continue + } + colType := columnIndexToType[j] + switch colType.ColumnType { + case cnosdb_v4.ColumnTypeTag: + tags[columnIndexToName[j]] = colArrays[j].(*array.String).Value(i) + case cnosdb_v4.ColumnTypeTime: + switch colType.TimeUnit { + case cnosdb_v4.TimeUnitSecond: + hasTimestamp = true + timestamp = time.Unix(int64(colArrays[j].(*array.Time64).Value(i)), 0) + case cnosdb_v4.TimeUnitMillisecond: + hasTimestamp = true + timestamp = time.UnixMilli(int64(colArrays[j].(*array.Time64).Value(i))) + case cnosdb_v4.TimeUnitMicrosecond: + hasTimestamp = true + timestamp = time.UnixMicro(int64(colArrays[j].(*array.Time64).Value(i))) + case cnosdb_v4.TimeUnitNanosecond: + hasTimestamp = true + timestamp = time.Unix(0, int64(colArrays[j].(*array.Time64).Value(i))) + default: + colName := columnIndexToName[j] + s.accumulator.AddError(fmt.Errorf("column '%s': parsed time unit(%d) is unknown", colName, colType)) + return server.Send(&kv_service.SubscriptionResponse{}) + } + case cnosdb_v4.ColumnTypeFieldFloat: + fields[columnIndexToName[j]] = colArrays[j].(*array.Float64).Value(i) + case cnosdb_v4.ColumnTypeFieldInteger: + fields[columnIndexToName[j]] = colArrays[j].(*array.Int64).Value(i) + case cnosdb_v4.ColumnTypeFieldUnsigned: + fields[columnIndexToName[j]] = colArrays[j].(*array.Uint64).Value(i) + case cnosdb_v4.ColumnTypeFieldBoolean: + fields[columnIndexToName[j]] = colArrays[j].(*array.Boolean).Value(i) + case cnosdb_v4.ColumnTypeFieldString, cnosdb_v4.ColumnTypeFieldGeometry: + fields[columnIndexToName[j]] = colArrays[j].(*array.String).Value(i) + default: + colName := columnIndexToName[j] + s.accumulator.AddError(fmt.Errorf("column '%s': parsed type(%d) is unknown", colName, colType)) + return server.Send(&kv_service.SubscriptionResponse{}) + } + } + switch acc := s.accumulator.(type) { + case telegraf.HighPriorityAccumulator: + if hasTimestamp { + if err = acc.AddMetricHighPriority(metric.New(tableSchema.Name, tags, fields, timestamp)); err != nil { + acc.AddError(fmt.Errorf("writing data to output failed: %w", err)) + } + } else { + if err = acc.AddMetricHighPriority(metric.New(tableSchema.Name, tags, fields, time.Now())); err != nil { + acc.AddError(fmt.Errorf("writing data to output failed: %w", err)) + } + } + default: + if hasTimestamp { + acc.AddMetric(metric.New(tableSchema.Name, tags, fields, timestamp)) + } else { + acc.AddFields(tableSchema.Name, fields, tags) + } + } + } + } + } + return nil +} diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/v3/bitset.go b/plugins/inputs/cnosdb_subscription/cnosdb/v3/bitset.go new file mode 100644 index 0000000000000..cc7381fc58dfc --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/cnosdb/v3/bitset.go @@ -0,0 +1,15 @@ +package v3 + +type BitSet struct { + Buf []byte + Len int +} + +func (s *BitSet) Get(idx int) bool { + if idx > s.Len { + return false + } + byteIdx := idx >> 3 + bitIdx := idx & 7 + return (s.Buf[byteIdx]>>bitIdx)&1 != 0 +} diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/v3/bitset_test.go b/plugins/inputs/cnosdb_subscription/cnosdb/v3/bitset_test.go new file mode 100644 index 0000000000000..93ce36e178b32 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/cnosdb/v3/bitset_test.go @@ -0,0 +1,50 @@ +package v3 + +import "testing" + +func TestBitset(t *testing.T) { + var b BitSet + var exp []bool + + b = BitSet{ + Buf: []byte{0xFF}, // 1111_1111 + Len: 8, + } + exp = []bool{true, true, true, true, true, true, true, true} + for i := 0; i < 8; i++ { + if b.Get(i) != exp[i] { + t.Errorf("BitSet.Get(%d) should return %v", i, exp[i]) + } + } + if b.Get(9) { + t.Errorf("BitSet.Get(9) should return false") + } + + b = BitSet{ + Buf: []byte{0xA3}, // 1010_0011 + Len: 8, + } + exp = []bool{true, true, false, false, false, true, false, true} + for i := 0; i < 8; i++ { + if b.Get(i) != exp[i] { + t.Errorf("BitSet.Get(%d) should return %v", i, exp[i]) + } + } + if b.Get(9) { + t.Errorf("BitSet.Get(9) should return false") + } + + b = BitSet{ + Buf: []byte{0x00}, // 0000_0000 + Len: 8, + } + exp = []bool{false, false, false, false, false, false, false, false} + for i := 0; i < 8; i++ { + if b.Get(i) != exp[i] { + t.Errorf("BitSet.Get(%d) should return %v", i, exp[i]) + } + } + if b.Get(9) { + t.Errorf("BitSet.Get(9) should return false") + } +} diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/tskv_table_schema.go b/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema.go similarity index 96% rename from plugins/inputs/cnosdb_subscription/cnosdb/tskv_table_schema.go rename to plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema.go index 7196c4276c0e6..32c6c51cb5c2d 100644 --- a/plugins/inputs/cnosdb_subscription/cnosdb/tskv_table_schema.go +++ b/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema.go @@ -1,6 +1,6 @@ -package cnosdb +package v4 -type ColumnType int +type ColumnType uint32 const ( ColumnTypeUnknown ColumnType = iota @@ -15,7 +15,7 @@ const ( ColumnTypeFieldGeometry ) -type TimeUnit int +type TimeUnit uint32 const ( TimeUnitUnknown TimeUnit = iota @@ -106,7 +106,7 @@ func (c *TableColumn) GetColumnTypeUnited() ColumnTypeUnited { } case map[string]interface{}: if geometryInfo := fieldType["Geometry"]; geometryInfo != nil { - // "column_type": {"Field":{"Geometry":{"sub_type":"Point","srid":10}}} + // "column_type": {"Field":{"Geometry":{"sub_type":"Point"}}} fieldTypeCode = ColumnTypeFieldGeometry } } diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema_test.go b/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema_test.go new file mode 100644 index 0000000000000..101e67ea1b651 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema_test.go @@ -0,0 +1,64 @@ +package v4 + +import ( + "encoding/json" + "testing" +) + +func TestParseColumnTypeUnited(t *testing.T) { + var jsonStrings = []string{ + `{ "column_type": "tag" }`, + `{ "column_type": "Tag" }`, + `{ "column_type": { "time": "Second" } }`, + `{ "column_type": { "Time": "sec" } }`, + `{ "column_type": { "Time": "Second" } }`, + `{ "column_type": { "Time": "Millisecond" } }`, + `{ "column_type": { "Time": "Microsecond" } }`, + `{ "column_type": { "Time": "Nanosecond" } }`, + `{ "column_type": { "field": "Float" } }`, + `{ "column_type": { "Field": "f64" } }`, + `{ "column_type": { "Field": "Float" } }`, + `{ "column_type": { "Field": "Integer" } }`, + `{ "column_type": { "Field": "Unsigned" } }`, + `{ "column_type": { "Field": "Boolean" } }`, + `{ "column_type": { "Field": "String" } }`, + `{ "column_type": { "Field": { "Geometry": { "sub_type": "Point" } } } }`, + `{ "column_type": { "Field": { "Geometry": { } } } }`, + } + var expectedColType = []ColumnTypeUnited{ + {ColumnType: ColumnTypeUnknown, TimeUnit: TimeUnitUnknown}, + {ColumnType: ColumnTypeTag, TimeUnit: TimeUnitUnknown}, + {ColumnType: ColumnTypeUnknown, TimeUnit: TimeUnitUnknown}, + {ColumnType: ColumnTypeTime, TimeUnit: TimeUnitUnknown}, + {ColumnType: ColumnTypeTime, TimeUnit: TimeUnitSecond}, + {ColumnType: ColumnTypeTime, TimeUnit: TimeUnitMillisecond}, + {ColumnType: ColumnTypeTime, TimeUnit: TimeUnitMicrosecond}, + {ColumnType: ColumnTypeTime, TimeUnit: TimeUnitNanosecond}, + {ColumnType: ColumnTypeUnknown, TimeUnit: TimeUnitUnknown}, + {ColumnType: ColumnTypeFieldUnknown, TimeUnit: TimeUnitUnknown}, + {ColumnType: ColumnTypeFieldFloat, TimeUnit: TimeUnitUnknown}, + {ColumnType: ColumnTypeFieldInteger, TimeUnit: TimeUnitUnknown}, + {ColumnType: ColumnTypeFieldUnsigned, TimeUnit: TimeUnitUnknown}, + {ColumnType: ColumnTypeFieldBoolean, TimeUnit: TimeUnitUnknown}, + {ColumnType: ColumnTypeFieldString, TimeUnit: TimeUnitUnknown}, + {ColumnType: ColumnTypeFieldGeometry, TimeUnit: TimeUnitUnknown}, + {ColumnType: ColumnTypeFieldGeometry, TimeUnit: TimeUnitUnknown}, + } + + if len(jsonStrings) != len(expectedColType) { + t.Fatal("Incorrect init of test case") + } + + for i := 0; i < len(jsonStrings); i++ { + jsonStr := jsonStrings[i] + obj := TableColumn{} + if err := json.Unmarshal([]byte(jsonStr), &obj); err != nil { + t.Error(err) + } + colType := obj.GetColumnTypeUnited() + expColType := expectedColType[i] + if obj.GetColumnTypeUnited() != expColType { + t.Fatal("expected", expColType, "got", colType) + } + } +} diff --git a/plugins/inputs/cnosdb_subscription/cnosdb.go b/plugins/inputs/cnosdb_subscription/cnosdb_sbuscription.go similarity index 88% rename from plugins/inputs/cnosdb_subscription/cnosdb.go rename to plugins/inputs/cnosdb_subscription/cnosdb_sbuscription.go index 82b8b5b544332..b274cf647f8e6 100644 --- a/plugins/inputs/cnosdb_subscription/cnosdb.go +++ b/plugins/inputs/cnosdb_subscription/cnosdb_sbuscription.go @@ -10,7 +10,8 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/protos" + "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/cnosdb" + "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service" "google.golang.org/grpc" ) @@ -27,6 +28,7 @@ var sampleConfig string type CnosDbSubscription struct { ServiceAddress string `toml:"service_address"` + ServiceVersion string `toml:"service_version"` Timeout config.Duration `toml:"timeout"` Log telegraf.Logger `toml:"-"` @@ -52,7 +54,7 @@ func (c *CnosDbSubscription) Gather(_ telegraf.Accumulator) error { func (c *CnosDbSubscription) Start(acc telegraf.Accumulator) error { c.grpcServer = grpc.NewServer(grpc.MaxRecvMsgSize(10 * 1024 * 1024)) - protos.RegisterTSKVServiceServer(c.grpcServer, NewTSKVService(acc)) + kv_service.RegisterTSKVServiceServer(c.grpcServer, cnosdb.NewTSKVService(acc)) if c.listener == nil { listener, err := net.Listen("tcp", c.ServiceAddress) diff --git a/plugins/inputs/cnosdb_subscription/generate.go b/plugins/inputs/cnosdb_subscription/generate.go new file mode 100644 index 0000000000000..a5cfe31b8cb61 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/generate.go @@ -0,0 +1,5 @@ +package cnosdb_subscription + +//go:generate flatc -o ./cnosdb/generated/models/v3 --go --go-namespace v3 --gen-onefile ./protocol/models.v3.fbs +//go:generate flatc -o ./cnosdb/generated/models/v4 --go --go-namespace v4 --gen-onefile ./protocol/models.v4.fbs +//go:generate protoc --go_out=./cnosdb/generated/kv_service --go-grpc_out=./cnosdb/generated/kv_service --proto_path ./protocol ./protocol/kv_service.proto diff --git a/plugins/inputs/cnosdb_subscription/kv_service_impl.go b/plugins/inputs/cnosdb_subscription/kv_service_impl.go deleted file mode 100644 index 919750e572428..0000000000000 --- a/plugins/inputs/cnosdb_subscription/kv_service_impl.go +++ /dev/null @@ -1,187 +0,0 @@ -package cnosdb_subscription - -import ( - "bufio" - "bytes" - "encoding/json" - "fmt" - "io" - "time" - - "github.com/apache/arrow/go/v15/arrow" - "github.com/apache/arrow/go/v15/arrow/array" - "github.com/apache/arrow/go/v15/arrow/ipc" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/cnosdb" - "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/protos" -) - -//go:generate flatc -o ./protos --go --go-namespace protos --gen-onefile ./protos/flatbuffers/models.fbs -//go:generate protoc --go_out=./protos --go-grpc_out=./protos ./protos/protobuf/kv_service.proto - -var _ protos.TSKVServiceServer = (*TSKVServiceServerImpl)(nil) - -type TSKVServiceServerImpl struct { - accumulator telegraf.Accumulator - - protos.UnimplementedTSKVServiceServer -} - -func NewTSKVService(acc telegraf.Accumulator) protos.TSKVServiceServer { - return TSKVServiceServerImpl{ - accumulator: acc, - } -} - -// WriteSubscription receive subscription request that has a CnosDB table schema and an Arrow RecordBatch, -// parse them into metrics and write to accumulator. -func (s TSKVServiceServerImpl) WriteSubscription(server protos.TSKVService_WriteSubscriptionServer) error { - for { - req, err := server.Recv() - if err == io.EOF { - break - } - if err != nil { - s.accumulator.AddError(fmt.Errorf("failed to receive SubscriptionRequest: %w", err)) - return server.Send(&protos.SubscriptionResponse{}) - } - - var tableSchema cnosdb.TskvTableSchema - if err := json.Unmarshal(req.TableSchema, &tableSchema); err != nil { - return server.Send(&protos.SubscriptionResponse{}) - } - - recordReader, err := ipc.NewReader(bufio.NewReader(bytes.NewReader(req.RecordData))) - if err != nil { - s.accumulator.AddError(fmt.Errorf("failed to read record data: %w", err)) - return server.Send(&protos.SubscriptionResponse{}) - } - - columnIndexToType := make([]cnosdb.ColumnTypeUnited, len(tableSchema.Columns)) - columnIndexToName := make([]string, len(tableSchema.Columns)) - - for _, col := range tableSchema.Columns { - colType := col.GetColumnTypeUnited() - if colType.ColumnType != cnosdb.ColumnTypeUnknown && colType.ColumnType != cnosdb.ColumnTypeFieldUnknown { - columnIndexToType[col.ID] = colType - } else { - s.accumulator.AddError(fmt.Errorf("column '%s': type is unknown: ", col.Name)) - return server.Send(&protos.SubscriptionResponse{}) - } - columnIndexToName[col.ID] = col.Name - } - for { - r, err := recordReader.Read() - if err == io.EOF { - break - } - if err != nil { - s.accumulator.AddError(fmt.Errorf("failed to read record data: %w", err)) - continue - } - - numRows := r.NumRows() - numCols := r.NumCols() - colArrays := make([]arrow.Array, len(tableSchema.Columns)) - - for j, col := range r.Columns() { - colType := columnIndexToType[j] - switch colType.ColumnType { - case cnosdb.ColumnTypeTag: - colArrays[j] = array.NewStringData(col.Data()) - case cnosdb.ColumnTypeTime: - if colType.TimeUnit == cnosdb.TimeUnitUnknown { - colName := columnIndexToName[j] - s.accumulator.AddError(fmt.Errorf("column '%s': parsed time unit(%d) is unknown", colName, colType)) - return server.Send(&protos.SubscriptionResponse{}) - } - colArrays[j] = array.NewTime64Data(col.Data()) - case cnosdb.ColumnTypeFieldFloat: - colArrays[j] = array.NewFloat64Data(col.Data()) - case cnosdb.ColumnTypeFieldInteger: - colArrays[j] = array.NewInt64Data(col.Data()) - case cnosdb.ColumnTypeFieldUnsigned: - colArrays[j] = array.NewUint64Data(col.Data()) - case cnosdb.ColumnTypeFieldBoolean: - colArrays[j] = array.NewBooleanData(col.Data()) - case cnosdb.ColumnTypeFieldString, cnosdb.ColumnTypeFieldGeometry: - colArrays[j] = array.NewStringData(col.Data()) - default: - colName := columnIndexToName[j] - s.accumulator.AddError(fmt.Errorf("column '%s': parsed type(%d) is unknown", colName, colType)) - return server.Send(&protos.SubscriptionResponse{}) - } - } - - for i := 0; i < int(numRows); i++ { - tags := make(map[string]string) - fields := make(map[string]interface{}) - var timestamp time.Time - hasTimestamp := false - for j := 0; j < int(numCols); j++ { - if colArrays[j].IsNull(i) { - continue - } - colType := columnIndexToType[j] - switch colType.ColumnType { - case cnosdb.ColumnTypeTag: - tags[columnIndexToName[j]] = colArrays[j].(*array.String).Value(i) - case cnosdb.ColumnTypeTime: - switch colType.TimeUnit { - case cnosdb.TimeUnitSecond: - hasTimestamp = true - timestamp = time.Unix(int64(colArrays[j].(*array.Time64).Value(i)), 0) - case cnosdb.TimeUnitMillisecond: - hasTimestamp = true - timestamp = time.UnixMilli(int64(colArrays[j].(*array.Time64).Value(i))) - case cnosdb.TimeUnitMicrosecond: - hasTimestamp = true - timestamp = time.UnixMicro(int64(colArrays[j].(*array.Time64).Value(i))) - case cnosdb.TimeUnitNanosecond: - hasTimestamp = true - timestamp = time.Unix(0, int64(colArrays[j].(*array.Time64).Value(i))) - default: - colName := columnIndexToName[j] - s.accumulator.AddError(fmt.Errorf("column '%s': parsed time unit(%d) is unknown", colName, colType)) - return server.Send(&protos.SubscriptionResponse{}) - } - case cnosdb.ColumnTypeFieldFloat: - fields[columnIndexToName[j]] = colArrays[j].(*array.Float64).Value(i) - case cnosdb.ColumnTypeFieldInteger: - fields[columnIndexToName[j]] = colArrays[j].(*array.Int64).Value(i) - case cnosdb.ColumnTypeFieldUnsigned: - fields[columnIndexToName[j]] = colArrays[j].(*array.Uint64).Value(i) - case cnosdb.ColumnTypeFieldBoolean: - fields[columnIndexToName[j]] = colArrays[j].(*array.Boolean).Value(i) - case cnosdb.ColumnTypeFieldString, cnosdb.ColumnTypeFieldGeometry: - fields[columnIndexToName[j]] = colArrays[j].(*array.String).Value(i) - default: - colName := columnIndexToName[j] - s.accumulator.AddError(fmt.Errorf("column '%s': parsed type(%d) is unknown", colName, colType)) - return server.Send(&protos.SubscriptionResponse{}) - } - } - switch acc := s.accumulator.(type) { - case telegraf.HighPriorityAccumulator: - if hasTimestamp { - if err = acc.AddMetricHighPriority(metric.New(tableSchema.Name, tags, fields, timestamp)); err != nil { - acc.AddError(fmt.Errorf("writing data to output failed: %w", err)) - } - } else { - if err = acc.AddMetricHighPriority(metric.New(tableSchema.Name, tags, fields, time.Now())); err != nil { - acc.AddError(fmt.Errorf("writing data to output failed: %w", err)) - } - } - default: - if hasTimestamp { - acc.AddMetric(metric.New(tableSchema.Name, tags, fields, timestamp)) - } else { - acc.AddFields(tableSchema.Name, fields, tags) - } - } - } - } - } - return nil -} diff --git a/plugins/inputs/cnosdb_subscription/kv_service_impl_test.go b/plugins/inputs/cnosdb_subscription/kv_service_impl_test.go deleted file mode 100644 index 413777ac9d9ce..0000000000000 --- a/plugins/inputs/cnosdb_subscription/kv_service_impl_test.go +++ /dev/null @@ -1,66 +0,0 @@ -package cnosdb_subscription - -import ( - "encoding/json" - "testing" - - "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/cnosdb" -) - -func TestParseColumnTypeUnited(t *testing.T) { - var jsonStrings = []string{ - `{ "column_type": "tag" }`, - `{ "column_type": "Tag" }`, - `{ "column_type": { "time": "Second" } }`, - `{ "column_type": { "Time": "sec" } }`, - `{ "column_type": { "Time": "Second" } }`, - `{ "column_type": { "Time": "Millisecond" } }`, - `{ "column_type": { "Time": "Microsecond" } }`, - `{ "column_type": { "Time": "Nanosecond" } }`, - `{ "column_type": { "field": "Float" } }`, - `{ "column_type": { "Field": "f64" } }`, - `{ "column_type": { "Field": "Float" } }`, - `{ "column_type": { "Field": "Integer" } }`, - `{ "column_type": { "Field": "Unsigned" } }`, - `{ "column_type": { "Field": "Boolean" } }`, - `{ "column_type": { "Field": "String" } }`, - `{ "column_type": { "Field": { "Geometry": { "sub_type": "Point", "srid": 10 } } } }`, - `{ "column_type": { "Field": { "Geometry": { } } } }`, - } - var expectedColType = []cnosdb.ColumnTypeUnited{ - {ColumnType: cnosdb.ColumnTypeUnknown, TimeUnit: cnosdb.TimeUnitUnknown}, - {ColumnType: cnosdb.ColumnTypeTag, TimeUnit: cnosdb.TimeUnitUnknown}, - {ColumnType: cnosdb.ColumnTypeUnknown, TimeUnit: cnosdb.TimeUnitUnknown}, - {ColumnType: cnosdb.ColumnTypeTime, TimeUnit: cnosdb.TimeUnitUnknown}, - {ColumnType: cnosdb.ColumnTypeTime, TimeUnit: cnosdb.TimeUnitSecond}, - {ColumnType: cnosdb.ColumnTypeTime, TimeUnit: cnosdb.TimeUnitMillisecond}, - {ColumnType: cnosdb.ColumnTypeTime, TimeUnit: cnosdb.TimeUnitMicrosecond}, - {ColumnType: cnosdb.ColumnTypeTime, TimeUnit: cnosdb.TimeUnitNanosecond}, - {ColumnType: cnosdb.ColumnTypeUnknown, TimeUnit: cnosdb.TimeUnitUnknown}, - {ColumnType: cnosdb.ColumnTypeFieldUnknown, TimeUnit: cnosdb.TimeUnitUnknown}, - {ColumnType: cnosdb.ColumnTypeFieldFloat, TimeUnit: cnosdb.TimeUnitUnknown}, - {ColumnType: cnosdb.ColumnTypeFieldInteger, TimeUnit: cnosdb.TimeUnitUnknown}, - {ColumnType: cnosdb.ColumnTypeFieldUnsigned, TimeUnit: cnosdb.TimeUnitUnknown}, - {ColumnType: cnosdb.ColumnTypeFieldBoolean, TimeUnit: cnosdb.TimeUnitUnknown}, - {ColumnType: cnosdb.ColumnTypeFieldString, TimeUnit: cnosdb.TimeUnitUnknown}, - {ColumnType: cnosdb.ColumnTypeFieldGeometry, TimeUnit: cnosdb.TimeUnitUnknown}, - {ColumnType: cnosdb.ColumnTypeFieldGeometry, TimeUnit: cnosdb.TimeUnitUnknown}, - } - - if len(jsonStrings) != len(expectedColType) { - t.Fatal("Incorrect init of test case") - } - - for i := 0; i < len(jsonStrings); i++ { - jsonStr := jsonStrings[i] - obj := cnosdb.TableColumn{} - if err := json.Unmarshal([]byte(jsonStr), &obj); err != nil { - t.Error(err) - } - colType := obj.GetColumnTypeUnited() - expColType := expectedColType[i] - if obj.GetColumnTypeUnited() != expColType { - t.Fatal("expected", expColType, "got", colType) - } - } -} diff --git a/plugins/inputs/cnosdb_subscription/protocol/kv_service.proto b/plugins/inputs/cnosdb_subscription/protocol/kv_service.proto new file mode 100644 index 0000000000000..4f2dc3213cf6a --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/protocol/kv_service.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; + +package kv_service; +option go_package = ".;kv_service"; + +// CnosDB subscription v3 message. +message Meta { + string tenant = 1; + optional string user = 2; + optional string password = 3; +} + +// CnosDB subscription v3 message. +message WritePointsRequest { + uint64 version = 1; + Meta meta = 2; + bytes points = 3; +} + +// CnosDB subscription v3 message. +message WritePointsResponse { + uint64 points_number = 1; +} + +// CnosDB subscription v4 message. +message SubscriptionRequest { + uint32 precision = 1; + bytes table_schema = 2; + bytes record_data = 3; +} + +// CnosDB subscription v4 message. +message SubscriptionResponse {} + +service TSKVService { + // CnosDB subscription v3 API. + rpc WritePoints(stream WritePointsRequest) returns (stream WritePointsResponse) {}; + + // CnosDB subscription v4 API. + rpc WriteSubscription(stream SubscriptionRequest) returns (stream SubscriptionResponse){}; +} diff --git a/plugins/inputs/cnosdb_subscription/protocol/models.v3.fbs b/plugins/inputs/cnosdb_subscription/protocol/models.v3.fbs new file mode 100644 index 0000000000000..0695ecb750eab --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/protocol/models.v3.fbs @@ -0,0 +1,49 @@ +namespace models; + +table PingBody { + payload: [ubyte]; +} + +enum FieldType: int { + Unknown = -1, + Float, + Integer, + Unsigned, + Boolean, + String, +} + +table Tag { + value: [ubyte]; +} + +table Field { + value: [ubyte]; +} + +table Schema { + tag_name: [string]; + // field column 1, name field_name[1], type field_type[1] + field_name: [string]; + field_type: [FieldType]; +} + +table Point { + timestamp: int64; + tags: [Tag]; + tags_nullbit: [ubyte]; + fields: [Field]; + fields_nullbit: [ubyte]; +} + +table Table { + tab: [ubyte]; + schema: Schema; + points: [Point]; + num_rows: uint64; +} + +table Points { + db: [ubyte]; + tables: [Table]; +} diff --git a/plugins/inputs/cnosdb_subscription/protos/flatbuffers/models.fbs b/plugins/inputs/cnosdb_subscription/protocol/models.v4.fbs similarity index 97% rename from plugins/inputs/cnosdb_subscription/protos/flatbuffers/models.fbs rename to plugins/inputs/cnosdb_subscription/protocol/models.v4.fbs index 7e3ed698947e7..be45b3033aa32 100644 --- a/plugins/inputs/cnosdb_subscription/protos/flatbuffers/models.fbs +++ b/plugins/inputs/cnosdb_subscription/protocol/models.v4.fbs @@ -1,4 +1,4 @@ -namespace protos; +namespace models; table PingBody { payload: [ubyte]; diff --git a/plugins/inputs/cnosdb_subscription/protos/kv_service.pb.go b/plugins/inputs/cnosdb_subscription/protos/kv_service.pb.go deleted file mode 100644 index 18b85edd3dbcd..0000000000000 --- a/plugins/inputs/cnosdb_subscription/protos/kv_service.pb.go +++ /dev/null @@ -1,225 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.34.2 -// protoc v5.27.1 -// source: protos/protobuf/kv_service.proto - -package protos - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type SubscriptionRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Precision uint32 `protobuf:"varint,1,opt,name=precision,proto3" json:"precision,omitempty"` - TableSchema []byte `protobuf:"bytes,2,opt,name=table_schema,json=tableSchema,proto3" json:"table_schema,omitempty"` - RecordData []byte `protobuf:"bytes,3,opt,name=record_data,json=recordData,proto3" json:"record_data,omitempty"` -} - -func (x *SubscriptionRequest) Reset() { - *x = SubscriptionRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_protos_protobuf_kv_service_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *SubscriptionRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SubscriptionRequest) ProtoMessage() {} - -func (x *SubscriptionRequest) ProtoReflect() protoreflect.Message { - mi := &file_protos_protobuf_kv_service_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SubscriptionRequest.ProtoReflect.Descriptor instead. -func (*SubscriptionRequest) Descriptor() ([]byte, []int) { - return file_protos_protobuf_kv_service_proto_rawDescGZIP(), []int{0} -} - -func (x *SubscriptionRequest) GetPrecision() uint32 { - if x != nil { - return x.Precision - } - return 0 -} - -func (x *SubscriptionRequest) GetTableSchema() []byte { - if x != nil { - return x.TableSchema - } - return nil -} - -func (x *SubscriptionRequest) GetRecordData() []byte { - if x != nil { - return x.RecordData - } - return nil -} - -type SubscriptionResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *SubscriptionResponse) Reset() { - *x = SubscriptionResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_protos_protobuf_kv_service_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *SubscriptionResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SubscriptionResponse) ProtoMessage() {} - -func (x *SubscriptionResponse) ProtoReflect() protoreflect.Message { - mi := &file_protos_protobuf_kv_service_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SubscriptionResponse.ProtoReflect.Descriptor instead. -func (*SubscriptionResponse) Descriptor() ([]byte, []int) { - return file_protos_protobuf_kv_service_proto_rawDescGZIP(), []int{1} -} - -var File_protos_protobuf_kv_service_proto protoreflect.FileDescriptor - -var file_protos_protobuf_kv_service_proto_rawDesc = []byte{ - 0x0a, 0x20, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2f, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x12, 0x0a, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0x77, - 0x0a, 0x13, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, 0x69, - 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, - 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x63, 0x68, - 0x65, 0x6d, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, - 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, - 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x72, 0x65, 0x63, - 0x6f, 0x72, 0x64, 0x44, 0x61, 0x74, 0x61, 0x22, 0x16, 0x0a, 0x14, 0x53, 0x75, 0x62, 0x73, 0x63, - 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, - 0x6b, 0x0a, 0x0b, 0x54, 0x53, 0x4b, 0x56, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5c, - 0x0a, 0x11, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, - 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0a, 0x5a, 0x08, - 0x2e, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_protos_protobuf_kv_service_proto_rawDescOnce sync.Once - file_protos_protobuf_kv_service_proto_rawDescData = file_protos_protobuf_kv_service_proto_rawDesc -) - -func file_protos_protobuf_kv_service_proto_rawDescGZIP() []byte { - file_protos_protobuf_kv_service_proto_rawDescOnce.Do(func() { - file_protos_protobuf_kv_service_proto_rawDescData = protoimpl.X.CompressGZIP(file_protos_protobuf_kv_service_proto_rawDescData) - }) - return file_protos_protobuf_kv_service_proto_rawDescData -} - -var file_protos_protobuf_kv_service_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_protos_protobuf_kv_service_proto_goTypes = []any{ - (*SubscriptionRequest)(nil), // 0: kv_service.SubscriptionRequest - (*SubscriptionResponse)(nil), // 1: kv_service.SubscriptionResponse -} -var file_protos_protobuf_kv_service_proto_depIdxs = []int32{ - 0, // 0: kv_service.TSKVService.WriteSubscription:input_type -> kv_service.SubscriptionRequest - 1, // 1: kv_service.TSKVService.WriteSubscription:output_type -> kv_service.SubscriptionResponse - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_protos_protobuf_kv_service_proto_init() } -func file_protos_protobuf_kv_service_proto_init() { - if File_protos_protobuf_kv_service_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_protos_protobuf_kv_service_proto_msgTypes[0].Exporter = func(v any, i int) any { - switch v := v.(*SubscriptionRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_protos_protobuf_kv_service_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*SubscriptionResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_protos_protobuf_kv_service_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_protos_protobuf_kv_service_proto_goTypes, - DependencyIndexes: file_protos_protobuf_kv_service_proto_depIdxs, - MessageInfos: file_protos_protobuf_kv_service_proto_msgTypes, - }.Build() - File_protos_protobuf_kv_service_proto = out.File - file_protos_protobuf_kv_service_proto_rawDesc = nil - file_protos_protobuf_kv_service_proto_goTypes = nil - file_protos_protobuf_kv_service_proto_depIdxs = nil -} diff --git a/plugins/inputs/cnosdb_subscription/protos/protobuf/kv_service.proto b/plugins/inputs/cnosdb_subscription/protos/protobuf/kv_service.proto deleted file mode 100644 index fd137c35e131c..0000000000000 --- a/plugins/inputs/cnosdb_subscription/protos/protobuf/kv_service.proto +++ /dev/null @@ -1,16 +0,0 @@ -syntax = "proto3"; - -package kv_service; -option go_package = ".;protos"; - -message SubscriptionRequest { - uint32 precision = 1; - bytes table_schema = 2; - bytes record_data = 3; -} - -message SubscriptionResponse {} - -service TSKVService { - rpc WriteSubscription(stream SubscriptionRequest) returns (stream SubscriptionResponse){}; -}