diff --git a/plugins/inputs/all/cnosdb_subscription.go b/plugins/inputs/all/cnosdb_subscription.go new file mode 100644 index 0000000000000..a654a503234ee --- /dev/null +++ b/plugins/inputs/all/cnosdb_subscription.go @@ -0,0 +1,5 @@ +//go:build !custom || inputs || inputs.bcache + +package all + +import _ "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription" // register plugin diff --git a/plugins/inputs/cnosdb_subscription/README.md b/plugins/inputs/cnosdb_subscription/README.md new file mode 100644 index 0000000000000..34ea32250e731 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/README.md @@ -0,0 +1,21 @@ +# CnosDB Subscription Input Plugin + +## Build + +To compile this plugin it requires protoc-gen-go and protoc-gen-go-grpc + +```shell +# install protoc-gen-go +go install google.golang.org/protobuf/cmd/protoc-gen-go@latest +# install protoc-gen-go-grpc +go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest +``` + +## Usages + +To listen on port 8803: + +```toml +[[inputs.cnosdb_subscription]] +service_address = ":8803" +``` diff --git a/plugins/inputs/cnosdb_subscription/cnosdb.go b/plugins/inputs/cnosdb_subscription/cnosdb.go new file mode 100644 index 0000000000000..82b8b5b544332 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/cnosdb.go @@ -0,0 +1,87 @@ +//go:generate ../../../tools/readme_config_includer/generator +package cnosdb_subscription + +import ( + _ "embed" + "fmt" + "net" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/protos" + "google.golang.org/grpc" +) + +func init() { + inputs.Add("cnosdb_subscription", func() telegraf.Input { + return &CnosDbSubscription{ + ServiceAddress: ":8803", + } + }) +} + +//go:embed sample.conf +var sampleConfig string + +type CnosDbSubscription struct { + ServiceAddress string `toml:"service_address"` + Timeout config.Duration `toml:"timeout"` + + Log telegraf.Logger `toml:"-"` + + wg sync.WaitGroup `toml:"-"` + + listener net.Listener `toml:"-"` + grpcServer *grpc.Server `toml:"-"` +} + +func (*CnosDbSubscription) SampleConfig() string { + return sampleConfig +} + +func (c *CnosDbSubscription) Init() error { + c.Log.Info("Initialization completed.") + return nil +} + +func (c *CnosDbSubscription) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (c *CnosDbSubscription) Start(acc telegraf.Accumulator) error { + c.grpcServer = grpc.NewServer(grpc.MaxRecvMsgSize(10 * 1024 * 1024)) + protos.RegisterTSKVServiceServer(c.grpcServer, NewTSKVService(acc)) + + if c.listener == nil { + listener, err := net.Listen("tcp", c.ServiceAddress) + if err != nil { + return err + } + c.listener = listener + } + + c.wg.Add(1) + go func() { + defer c.wg.Done() + if err := c.grpcServer.Serve(c.listener); err != nil { + acc.AddError(fmt.Errorf("failed to stop CnosDbSubscription gRPC service: %w", err)) + } + }() + + c.Log.Infof("Listening on %s", c.listener.Addr().String()) + + return nil +} + +func (c *CnosDbSubscription) Stop() { + if c.grpcServer != nil { + c.grpcServer.Stop() + } + c.wg.Wait() +} + +func (c *CnosDbSubscription) MarkHighPriority() { + // Do nothing +} diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/tskv_table_schema.go b/plugins/inputs/cnosdb_subscription/cnosdb/tskv_table_schema.go new file mode 100644 index 0000000000000..7196c4276c0e6 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/cnosdb/tskv_table_schema.go @@ -0,0 +1,124 @@ +package cnosdb + +type ColumnType int + +const ( + ColumnTypeUnknown ColumnType = iota + ColumnTypeTag + ColumnTypeTime + ColumnTypeFieldUnknown + ColumnTypeFieldFloat + ColumnTypeFieldInteger + ColumnTypeFieldUnsigned + ColumnTypeFieldBoolean + ColumnTypeFieldString + ColumnTypeFieldGeometry +) + +type TimeUnit int + +const ( + TimeUnitUnknown TimeUnit = iota + TimeUnitSecond + TimeUnitMillisecond + TimeUnitMicrosecond + TimeUnitNanosecond +) + +type TskvTableSchema struct { + Tenant string `json:"tenant"` + Db string `json:"db"` + Name string `json:"name"` + SchemaVersion uint64 `json:"schema_version"` + NextColumnID uint32 `json:"next_column_id"` + Columns []TableColumn `json:"columns"` + ColumnsIndex map[string]uint32 `json:"columns_index"` +} + +type TableColumn struct { + ID uint64 `json:"id"` + Name string `json:"name"` + ColumnType interface{} `json:"column_type"` + Encoding interface{} `json:"encoding"` +} + +type ColumnTypeUnited struct { + ColumnType ColumnType + TimeUnit TimeUnit +} + +func (c *TableColumn) GetColumnTypeUnited() ColumnTypeUnited { + switch columnType := c.ColumnType.(type) { + case string: + if columnType == "Tag" { + // "column_type": "Tag" + return ColumnTypeUnited{ + ColumnType: ColumnTypeTag, + TimeUnit: TimeUnitUnknown, + } + } + case map[string]interface{}: + if timeUnitObj := columnType["Time"]; timeUnitObj != nil { + // "column_type": {"Time":"Microsecond"} + if timeUnit, ok := timeUnitObj.(string); ok { + timeUnitCode := TimeUnitUnknown + switch timeUnit { + case "Second": + timeUnitCode = TimeUnitSecond + case "Millisecond": + timeUnitCode = TimeUnitMillisecond + case "Microsecond": + timeUnitCode = TimeUnitMicrosecond + case "Nanosecond": + timeUnitCode = TimeUnitNanosecond + } + return ColumnTypeUnited{ + ColumnType: ColumnTypeTime, + TimeUnit: timeUnitCode, + } + } + } else if fieldTypeObj := columnType["Field"]; fieldTypeObj != nil { + fieldTypeCode := ColumnTypeFieldUnknown + switch fieldType := fieldTypeObj.(type) { + case string: + switch fieldType { + case "Float": + // "column_type": {"Field":"Float"} + fieldTypeCode = ColumnTypeFieldFloat + case "Integer": + // "column_type": {"Field":"Integer"} + fieldTypeCode = ColumnTypeFieldInteger + case "Unsigned": + // "column_type": {"Field":"Unsigned"} + fieldTypeCode = ColumnTypeFieldUnsigned + case "Boolean": + // "column_type": {"Field":"Boolean"} + fieldTypeCode = ColumnTypeFieldBoolean + case "String": + // "column_type": {"Field":"String"} + fieldTypeCode = ColumnTypeFieldString + case "Geometry": + // "column_type": {"Field":"Geometry"} + fieldTypeCode = ColumnTypeFieldGeometry + case "Unknown": + // "column_type": {"Field":"Unknown"} + fieldTypeCode = ColumnTypeFieldUnknown + } + case map[string]interface{}: + if geometryInfo := fieldType["Geometry"]; geometryInfo != nil { + // "column_type": {"Field":{"Geometry":{"sub_type":"Point","srid":10}}} + fieldTypeCode = ColumnTypeFieldGeometry + } + } + return ColumnTypeUnited{ + ColumnType: fieldTypeCode, + TimeUnit: TimeUnitUnknown, + } + } + } + + return ColumnTypeUnited{ + ColumnType: ColumnTypeUnknown, + TimeUnit: TimeUnitUnknown, + } +} diff --git a/plugins/inputs/cnosdb_subscription/kv_service_impl.go b/plugins/inputs/cnosdb_subscription/kv_service_impl.go new file mode 100644 index 0000000000000..919750e572428 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/kv_service_impl.go @@ -0,0 +1,187 @@ +package cnosdb_subscription + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "time" + + "github.com/apache/arrow/go/v15/arrow" + "github.com/apache/arrow/go/v15/arrow/array" + "github.com/apache/arrow/go/v15/arrow/ipc" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/cnosdb" + "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/protos" +) + +//go:generate flatc -o ./protos --go --go-namespace protos --gen-onefile ./protos/flatbuffers/models.fbs +//go:generate protoc --go_out=./protos --go-grpc_out=./protos ./protos/protobuf/kv_service.proto + +var _ protos.TSKVServiceServer = (*TSKVServiceServerImpl)(nil) + +type TSKVServiceServerImpl struct { + accumulator telegraf.Accumulator + + protos.UnimplementedTSKVServiceServer +} + +func NewTSKVService(acc telegraf.Accumulator) protos.TSKVServiceServer { + return TSKVServiceServerImpl{ + accumulator: acc, + } +} + +// WriteSubscription receive subscription request that has a CnosDB table schema and an Arrow RecordBatch, +// parse them into metrics and write to accumulator. +func (s TSKVServiceServerImpl) WriteSubscription(server protos.TSKVService_WriteSubscriptionServer) error { + for { + req, err := server.Recv() + if err == io.EOF { + break + } + if err != nil { + s.accumulator.AddError(fmt.Errorf("failed to receive SubscriptionRequest: %w", err)) + return server.Send(&protos.SubscriptionResponse{}) + } + + var tableSchema cnosdb.TskvTableSchema + if err := json.Unmarshal(req.TableSchema, &tableSchema); err != nil { + return server.Send(&protos.SubscriptionResponse{}) + } + + recordReader, err := ipc.NewReader(bufio.NewReader(bytes.NewReader(req.RecordData))) + if err != nil { + s.accumulator.AddError(fmt.Errorf("failed to read record data: %w", err)) + return server.Send(&protos.SubscriptionResponse{}) + } + + columnIndexToType := make([]cnosdb.ColumnTypeUnited, len(tableSchema.Columns)) + columnIndexToName := make([]string, len(tableSchema.Columns)) + + for _, col := range tableSchema.Columns { + colType := col.GetColumnTypeUnited() + if colType.ColumnType != cnosdb.ColumnTypeUnknown && colType.ColumnType != cnosdb.ColumnTypeFieldUnknown { + columnIndexToType[col.ID] = colType + } else { + s.accumulator.AddError(fmt.Errorf("column '%s': type is unknown: ", col.Name)) + return server.Send(&protos.SubscriptionResponse{}) + } + columnIndexToName[col.ID] = col.Name + } + for { + r, err := recordReader.Read() + if err == io.EOF { + break + } + if err != nil { + s.accumulator.AddError(fmt.Errorf("failed to read record data: %w", err)) + continue + } + + numRows := r.NumRows() + numCols := r.NumCols() + colArrays := make([]arrow.Array, len(tableSchema.Columns)) + + for j, col := range r.Columns() { + colType := columnIndexToType[j] + switch colType.ColumnType { + case cnosdb.ColumnTypeTag: + colArrays[j] = array.NewStringData(col.Data()) + case cnosdb.ColumnTypeTime: + if colType.TimeUnit == cnosdb.TimeUnitUnknown { + colName := columnIndexToName[j] + s.accumulator.AddError(fmt.Errorf("column '%s': parsed time unit(%d) is unknown", colName, colType)) + return server.Send(&protos.SubscriptionResponse{}) + } + colArrays[j] = array.NewTime64Data(col.Data()) + case cnosdb.ColumnTypeFieldFloat: + colArrays[j] = array.NewFloat64Data(col.Data()) + case cnosdb.ColumnTypeFieldInteger: + colArrays[j] = array.NewInt64Data(col.Data()) + case cnosdb.ColumnTypeFieldUnsigned: + colArrays[j] = array.NewUint64Data(col.Data()) + case cnosdb.ColumnTypeFieldBoolean: + colArrays[j] = array.NewBooleanData(col.Data()) + case cnosdb.ColumnTypeFieldString, cnosdb.ColumnTypeFieldGeometry: + colArrays[j] = array.NewStringData(col.Data()) + default: + colName := columnIndexToName[j] + s.accumulator.AddError(fmt.Errorf("column '%s': parsed type(%d) is unknown", colName, colType)) + return server.Send(&protos.SubscriptionResponse{}) + } + } + + for i := 0; i < int(numRows); i++ { + tags := make(map[string]string) + fields := make(map[string]interface{}) + var timestamp time.Time + hasTimestamp := false + for j := 0; j < int(numCols); j++ { + if colArrays[j].IsNull(i) { + continue + } + colType := columnIndexToType[j] + switch colType.ColumnType { + case cnosdb.ColumnTypeTag: + tags[columnIndexToName[j]] = colArrays[j].(*array.String).Value(i) + case cnosdb.ColumnTypeTime: + switch colType.TimeUnit { + case cnosdb.TimeUnitSecond: + hasTimestamp = true + timestamp = time.Unix(int64(colArrays[j].(*array.Time64).Value(i)), 0) + case cnosdb.TimeUnitMillisecond: + hasTimestamp = true + timestamp = time.UnixMilli(int64(colArrays[j].(*array.Time64).Value(i))) + case cnosdb.TimeUnitMicrosecond: + hasTimestamp = true + timestamp = time.UnixMicro(int64(colArrays[j].(*array.Time64).Value(i))) + case cnosdb.TimeUnitNanosecond: + hasTimestamp = true + timestamp = time.Unix(0, int64(colArrays[j].(*array.Time64).Value(i))) + default: + colName := columnIndexToName[j] + s.accumulator.AddError(fmt.Errorf("column '%s': parsed time unit(%d) is unknown", colName, colType)) + return server.Send(&protos.SubscriptionResponse{}) + } + case cnosdb.ColumnTypeFieldFloat: + fields[columnIndexToName[j]] = colArrays[j].(*array.Float64).Value(i) + case cnosdb.ColumnTypeFieldInteger: + fields[columnIndexToName[j]] = colArrays[j].(*array.Int64).Value(i) + case cnosdb.ColumnTypeFieldUnsigned: + fields[columnIndexToName[j]] = colArrays[j].(*array.Uint64).Value(i) + case cnosdb.ColumnTypeFieldBoolean: + fields[columnIndexToName[j]] = colArrays[j].(*array.Boolean).Value(i) + case cnosdb.ColumnTypeFieldString, cnosdb.ColumnTypeFieldGeometry: + fields[columnIndexToName[j]] = colArrays[j].(*array.String).Value(i) + default: + colName := columnIndexToName[j] + s.accumulator.AddError(fmt.Errorf("column '%s': parsed type(%d) is unknown", colName, colType)) + return server.Send(&protos.SubscriptionResponse{}) + } + } + switch acc := s.accumulator.(type) { + case telegraf.HighPriorityAccumulator: + if hasTimestamp { + if err = acc.AddMetricHighPriority(metric.New(tableSchema.Name, tags, fields, timestamp)); err != nil { + acc.AddError(fmt.Errorf("writing data to output failed: %w", err)) + } + } else { + if err = acc.AddMetricHighPriority(metric.New(tableSchema.Name, tags, fields, time.Now())); err != nil { + acc.AddError(fmt.Errorf("writing data to output failed: %w", err)) + } + } + default: + if hasTimestamp { + acc.AddMetric(metric.New(tableSchema.Name, tags, fields, timestamp)) + } else { + acc.AddFields(tableSchema.Name, fields, tags) + } + } + } + } + } + return nil +} diff --git a/plugins/inputs/cnosdb_subscription/kv_service_impl_test.go b/plugins/inputs/cnosdb_subscription/kv_service_impl_test.go new file mode 100644 index 0000000000000..413777ac9d9ce --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/kv_service_impl_test.go @@ -0,0 +1,66 @@ +package cnosdb_subscription + +import ( + "encoding/json" + "testing" + + "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/cnosdb" +) + +func TestParseColumnTypeUnited(t *testing.T) { + var jsonStrings = []string{ + `{ "column_type": "tag" }`, + `{ "column_type": "Tag" }`, + `{ "column_type": { "time": "Second" } }`, + `{ "column_type": { "Time": "sec" } }`, + `{ "column_type": { "Time": "Second" } }`, + `{ "column_type": { "Time": "Millisecond" } }`, + `{ "column_type": { "Time": "Microsecond" } }`, + `{ "column_type": { "Time": "Nanosecond" } }`, + `{ "column_type": { "field": "Float" } }`, + `{ "column_type": { "Field": "f64" } }`, + `{ "column_type": { "Field": "Float" } }`, + `{ "column_type": { "Field": "Integer" } }`, + `{ "column_type": { "Field": "Unsigned" } }`, + `{ "column_type": { "Field": "Boolean" } }`, + `{ "column_type": { "Field": "String" } }`, + `{ "column_type": { "Field": { "Geometry": { "sub_type": "Point", "srid": 10 } } } }`, + `{ "column_type": { "Field": { "Geometry": { } } } }`, + } + var expectedColType = []cnosdb.ColumnTypeUnited{ + {ColumnType: cnosdb.ColumnTypeUnknown, TimeUnit: cnosdb.TimeUnitUnknown}, + {ColumnType: cnosdb.ColumnTypeTag, TimeUnit: cnosdb.TimeUnitUnknown}, + {ColumnType: cnosdb.ColumnTypeUnknown, TimeUnit: cnosdb.TimeUnitUnknown}, + {ColumnType: cnosdb.ColumnTypeTime, TimeUnit: cnosdb.TimeUnitUnknown}, + {ColumnType: cnosdb.ColumnTypeTime, TimeUnit: cnosdb.TimeUnitSecond}, + {ColumnType: cnosdb.ColumnTypeTime, TimeUnit: cnosdb.TimeUnitMillisecond}, + {ColumnType: cnosdb.ColumnTypeTime, TimeUnit: cnosdb.TimeUnitMicrosecond}, + {ColumnType: cnosdb.ColumnTypeTime, TimeUnit: cnosdb.TimeUnitNanosecond}, + {ColumnType: cnosdb.ColumnTypeUnknown, TimeUnit: cnosdb.TimeUnitUnknown}, + {ColumnType: cnosdb.ColumnTypeFieldUnknown, TimeUnit: cnosdb.TimeUnitUnknown}, + {ColumnType: cnosdb.ColumnTypeFieldFloat, TimeUnit: cnosdb.TimeUnitUnknown}, + {ColumnType: cnosdb.ColumnTypeFieldInteger, TimeUnit: cnosdb.TimeUnitUnknown}, + {ColumnType: cnosdb.ColumnTypeFieldUnsigned, TimeUnit: cnosdb.TimeUnitUnknown}, + {ColumnType: cnosdb.ColumnTypeFieldBoolean, TimeUnit: cnosdb.TimeUnitUnknown}, + {ColumnType: cnosdb.ColumnTypeFieldString, TimeUnit: cnosdb.TimeUnitUnknown}, + {ColumnType: cnosdb.ColumnTypeFieldGeometry, TimeUnit: cnosdb.TimeUnitUnknown}, + {ColumnType: cnosdb.ColumnTypeFieldGeometry, TimeUnit: cnosdb.TimeUnitUnknown}, + } + + if len(jsonStrings) != len(expectedColType) { + t.Fatal("Incorrect init of test case") + } + + for i := 0; i < len(jsonStrings); i++ { + jsonStr := jsonStrings[i] + obj := cnosdb.TableColumn{} + if err := json.Unmarshal([]byte(jsonStr), &obj); err != nil { + t.Error(err) + } + colType := obj.GetColumnTypeUnited() + expColType := expectedColType[i] + if obj.GetColumnTypeUnited() != expColType { + t.Fatal("expected", expColType, "got", colType) + } + } +} diff --git a/plugins/inputs/cnosdb_subscription/protos/flatbuffers/models.fbs b/plugins/inputs/cnosdb_subscription/protos/flatbuffers/models.fbs new file mode 100644 index 0000000000000..7e3ed698947e7 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/protos/flatbuffers/models.fbs @@ -0,0 +1,48 @@ +namespace protos; + +table PingBody { + payload: [ubyte]; +} + +enum FieldType: int { + Unknown = -1, + Float, + Integer, + Unsigned, + Boolean, + String, +} + +enum ColumnType : int { + Unknown = -1, + Time, + Tag, + Field, +} + +table Values { + float_value: [float64]; + int_value: [int64]; + uint_value: [uint64]; + bool_value: [bool]; + string_value: [string]; +} + +table Column { + name: string; + column_type: ColumnType; + field_type: FieldType; + col_values: Values; + nullbits: [ubyte]; +} + +table Table { + tab: string; + columns: [Column]; + num_rows: uint64; +} + +table Points { + db: string; + tables: [Table]; +} diff --git a/plugins/inputs/cnosdb_subscription/protos/kv_service.pb.go b/plugins/inputs/cnosdb_subscription/protos/kv_service.pb.go new file mode 100644 index 0000000000000..18b85edd3dbcd --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/protos/kv_service.pb.go @@ -0,0 +1,225 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.2 +// protoc v5.27.1 +// source: protos/protobuf/kv_service.proto + +package protos + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type SubscriptionRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Precision uint32 `protobuf:"varint,1,opt,name=precision,proto3" json:"precision,omitempty"` + TableSchema []byte `protobuf:"bytes,2,opt,name=table_schema,json=tableSchema,proto3" json:"table_schema,omitempty"` + RecordData []byte `protobuf:"bytes,3,opt,name=record_data,json=recordData,proto3" json:"record_data,omitempty"` +} + +func (x *SubscriptionRequest) Reset() { + *x = SubscriptionRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_protobuf_kv_service_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscriptionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscriptionRequest) ProtoMessage() {} + +func (x *SubscriptionRequest) ProtoReflect() protoreflect.Message { + mi := &file_protos_protobuf_kv_service_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscriptionRequest.ProtoReflect.Descriptor instead. +func (*SubscriptionRequest) Descriptor() ([]byte, []int) { + return file_protos_protobuf_kv_service_proto_rawDescGZIP(), []int{0} +} + +func (x *SubscriptionRequest) GetPrecision() uint32 { + if x != nil { + return x.Precision + } + return 0 +} + +func (x *SubscriptionRequest) GetTableSchema() []byte { + if x != nil { + return x.TableSchema + } + return nil +} + +func (x *SubscriptionRequest) GetRecordData() []byte { + if x != nil { + return x.RecordData + } + return nil +} + +type SubscriptionResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *SubscriptionResponse) Reset() { + *x = SubscriptionResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_protobuf_kv_service_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscriptionResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscriptionResponse) ProtoMessage() {} + +func (x *SubscriptionResponse) ProtoReflect() protoreflect.Message { + mi := &file_protos_protobuf_kv_service_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscriptionResponse.ProtoReflect.Descriptor instead. +func (*SubscriptionResponse) Descriptor() ([]byte, []int) { + return file_protos_protobuf_kv_service_proto_rawDescGZIP(), []int{1} +} + +var File_protos_protobuf_kv_service_proto protoreflect.FileDescriptor + +var file_protos_protobuf_kv_service_proto_rawDesc = []byte{ + 0x0a, 0x20, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2f, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x0a, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0x77, + 0x0a, 0x13, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, 0x69, + 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, + 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, + 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, + 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x72, 0x65, 0x63, + 0x6f, 0x72, 0x64, 0x44, 0x61, 0x74, 0x61, 0x22, 0x16, 0x0a, 0x14, 0x53, 0x75, 0x62, 0x73, 0x63, + 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, + 0x6b, 0x0a, 0x0b, 0x54, 0x53, 0x4b, 0x56, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5c, + 0x0a, 0x11, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0a, 0x5a, 0x08, + 0x2e, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_protos_protobuf_kv_service_proto_rawDescOnce sync.Once + file_protos_protobuf_kv_service_proto_rawDescData = file_protos_protobuf_kv_service_proto_rawDesc +) + +func file_protos_protobuf_kv_service_proto_rawDescGZIP() []byte { + file_protos_protobuf_kv_service_proto_rawDescOnce.Do(func() { + file_protos_protobuf_kv_service_proto_rawDescData = protoimpl.X.CompressGZIP(file_protos_protobuf_kv_service_proto_rawDescData) + }) + return file_protos_protobuf_kv_service_proto_rawDescData +} + +var file_protos_protobuf_kv_service_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_protos_protobuf_kv_service_proto_goTypes = []any{ + (*SubscriptionRequest)(nil), // 0: kv_service.SubscriptionRequest + (*SubscriptionResponse)(nil), // 1: kv_service.SubscriptionResponse +} +var file_protos_protobuf_kv_service_proto_depIdxs = []int32{ + 0, // 0: kv_service.TSKVService.WriteSubscription:input_type -> kv_service.SubscriptionRequest + 1, // 1: kv_service.TSKVService.WriteSubscription:output_type -> kv_service.SubscriptionResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_protos_protobuf_kv_service_proto_init() } +func file_protos_protobuf_kv_service_proto_init() { + if File_protos_protobuf_kv_service_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_protos_protobuf_kv_service_proto_msgTypes[0].Exporter = func(v any, i int) any { + switch v := v.(*SubscriptionRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protos_protobuf_kv_service_proto_msgTypes[1].Exporter = func(v any, i int) any { + switch v := v.(*SubscriptionResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_protos_protobuf_kv_service_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_protos_protobuf_kv_service_proto_goTypes, + DependencyIndexes: file_protos_protobuf_kv_service_proto_depIdxs, + MessageInfos: file_protos_protobuf_kv_service_proto_msgTypes, + }.Build() + File_protos_protobuf_kv_service_proto = out.File + file_protos_protobuf_kv_service_proto_rawDesc = nil + file_protos_protobuf_kv_service_proto_goTypes = nil + file_protos_protobuf_kv_service_proto_depIdxs = nil +} diff --git a/plugins/inputs/cnosdb_subscription/protos/kv_service_grpc.pb.go b/plugins/inputs/cnosdb_subscription/protos/kv_service_grpc.pb.go new file mode 100644 index 0000000000000..a82d3295c2838 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/protos/kv_service_grpc.pb.go @@ -0,0 +1,115 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v5.27.1 +// source: protos/protobuf/kv_service.proto + +package protos + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + TSKVService_WriteSubscription_FullMethodName = "/kv_service.TSKVService/WriteSubscription" +) + +// TSKVServiceClient is the client API for TSKVService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type TSKVServiceClient interface { + WriteSubscription(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[SubscriptionRequest, SubscriptionResponse], error) +} + +type tSKVServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewTSKVServiceClient(cc grpc.ClientConnInterface) TSKVServiceClient { + return &tSKVServiceClient{cc} +} + +func (c *tSKVServiceClient) WriteSubscription(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[SubscriptionRequest, SubscriptionResponse], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &TSKVService_ServiceDesc.Streams[0], TSKVService_WriteSubscription_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[SubscriptionRequest, SubscriptionResponse]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type TSKVService_WriteSubscriptionClient = grpc.BidiStreamingClient[SubscriptionRequest, SubscriptionResponse] + +// TSKVServiceServer is the server API for TSKVService service. +// All implementations must embed UnimplementedTSKVServiceServer +// for forward compatibility. +type TSKVServiceServer interface { + WriteSubscription(grpc.BidiStreamingServer[SubscriptionRequest, SubscriptionResponse]) error + mustEmbedUnimplementedTSKVServiceServer() +} + +// UnimplementedTSKVServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedTSKVServiceServer struct{} + +func (UnimplementedTSKVServiceServer) WriteSubscription(grpc.BidiStreamingServer[SubscriptionRequest, SubscriptionResponse]) error { + return status.Errorf(codes.Unimplemented, "method WriteSubscription not implemented") +} +func (UnimplementedTSKVServiceServer) mustEmbedUnimplementedTSKVServiceServer() {} +func (UnimplementedTSKVServiceServer) testEmbeddedByValue() {} + +// UnsafeTSKVServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TSKVServiceServer will +// result in compilation errors. +type UnsafeTSKVServiceServer interface { + mustEmbedUnimplementedTSKVServiceServer() +} + +func RegisterTSKVServiceServer(s grpc.ServiceRegistrar, srv TSKVServiceServer) { + // If the following call pancis, it indicates UnimplementedTSKVServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&TSKVService_ServiceDesc, srv) +} + +func _TSKVService_WriteSubscription_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(TSKVServiceServer).WriteSubscription(&grpc.GenericServerStream[SubscriptionRequest, SubscriptionResponse]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type TSKVService_WriteSubscriptionServer = grpc.BidiStreamingServer[SubscriptionRequest, SubscriptionResponse] + +// TSKVService_ServiceDesc is the grpc.ServiceDesc for TSKVService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var TSKVService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "kv_service.TSKVService", + HandlerType: (*TSKVServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "WriteSubscription", + Handler: _TSKVService_WriteSubscription_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "protos/protobuf/kv_service.proto", +} diff --git a/plugins/inputs/cnosdb_subscription/protos/models_generated.go b/plugins/inputs/cnosdb_subscription/protos/models_generated.go new file mode 100644 index 0000000000000..c5d6b972ef287 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/protos/models_generated.go @@ -0,0 +1,661 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package protos + +import ( + flatbuffers "github.com/google/flatbuffers/go" + "strconv" +) + +type FieldType int32 + +const ( + FieldTypeUnknown FieldType = -1 + FieldTypeFloat FieldType = 0 + FieldTypeInteger FieldType = 1 + FieldTypeUnsigned FieldType = 2 + FieldTypeBoolean FieldType = 3 + FieldTypeString FieldType = 4 +) + +var EnumNamesFieldType = map[FieldType]string{ + FieldTypeUnknown: "Unknown", + FieldTypeFloat: "Float", + FieldTypeInteger: "Integer", + FieldTypeUnsigned: "Unsigned", + FieldTypeBoolean: "Boolean", + FieldTypeString: "String", +} + +var EnumValuesFieldType = map[string]FieldType{ + "Unknown": FieldTypeUnknown, + "Float": FieldTypeFloat, + "Integer": FieldTypeInteger, + "Unsigned": FieldTypeUnsigned, + "Boolean": FieldTypeBoolean, + "String": FieldTypeString, +} + +func (v FieldType) String() string { + if s, ok := EnumNamesFieldType[v]; ok { + return s + } + return "FieldType(" + strconv.FormatInt(int64(v), 10) + ")" +} + +type ColumnType int32 + +const ( + ColumnTypeUnknown ColumnType = -1 + ColumnTypeTime ColumnType = 0 + ColumnTypeTag ColumnType = 1 + ColumnTypeField ColumnType = 2 +) + +var EnumNamesColumnType = map[ColumnType]string{ + ColumnTypeUnknown: "Unknown", + ColumnTypeTime: "Time", + ColumnTypeTag: "Tag", + ColumnTypeField: "Field", +} + +var EnumValuesColumnType = map[string]ColumnType{ + "Unknown": ColumnTypeUnknown, + "Time": ColumnTypeTime, + "Tag": ColumnTypeTag, + "Field": ColumnTypeField, +} + +func (v ColumnType) String() string { + if s, ok := EnumNamesColumnType[v]; ok { + return s + } + return "ColumnType(" + strconv.FormatInt(int64(v), 10) + ")" +} + +type PingBody struct { + _tab flatbuffers.Table +} + +func GetRootAsPingBody(buf []byte, offset flatbuffers.UOffsetT) *PingBody { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &PingBody{} + x.Init(buf, n+offset) + return x +} + +func FinishPingBodyBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsPingBody(buf []byte, offset flatbuffers.UOffsetT) *PingBody { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &PingBody{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedPingBodyBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *PingBody) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *PingBody) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *PingBody) Payload(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *PingBody) PayloadLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *PingBody) PayloadBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *PingBody) MutatePayload(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func PingBodyStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func PingBodyAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(payload), 0) +} +func PingBodyStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func PingBodyEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} + +type Values struct { + _tab flatbuffers.Table +} + +func GetRootAsValues(buf []byte, offset flatbuffers.UOffsetT) *Values { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Values{} + x.Init(buf, n+offset) + return x +} + +func FinishValuesBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsValues(buf []byte, offset flatbuffers.UOffsetT) *Values { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Values{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedValuesBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *Values) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Values) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Values) FloatValue(j int) float64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetFloat64(a + flatbuffers.UOffsetT(j*8)) + } + return 0 +} + +func (rcv *Values) FloatValueLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Values) MutateFloatValue(j int, n float64) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateFloat64(a+flatbuffers.UOffsetT(j*8), n) + } + return false +} + +func (rcv *Values) IntValue(j int) int64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetInt64(a + flatbuffers.UOffsetT(j*8)) + } + return 0 +} + +func (rcv *Values) IntValueLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Values) MutateIntValue(j int, n int64) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateInt64(a+flatbuffers.UOffsetT(j*8), n) + } + return false +} + +func (rcv *Values) UintValue(j int) uint64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetUint64(a + flatbuffers.UOffsetT(j*8)) + } + return 0 +} + +func (rcv *Values) UintValueLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Values) MutateUintValue(j int, n uint64) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateUint64(a+flatbuffers.UOffsetT(j*8), n) + } + return false +} + +func (rcv *Values) BoolValue(j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetBool(a + flatbuffers.UOffsetT(j*1)) + } + return false +} + +func (rcv *Values) BoolValueLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Values) MutateBoolValue(j int, n bool) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateBool(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func (rcv *Values) StringValue(j int) []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4)) + } + return nil +} + +func (rcv *Values) StringValueLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func ValuesStart(builder *flatbuffers.Builder) { + builder.StartObject(5) +} +func ValuesAddFloatValue(builder *flatbuffers.Builder, floatValue flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(floatValue), 0) +} +func ValuesStartFloatValueVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(8, numElems, 8) +} +func ValuesAddIntValue(builder *flatbuffers.Builder, intValue flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(intValue), 0) +} +func ValuesStartIntValueVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(8, numElems, 8) +} +func ValuesAddUintValue(builder *flatbuffers.Builder, uintValue flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(uintValue), 0) +} +func ValuesStartUintValueVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(8, numElems, 8) +} +func ValuesAddBoolValue(builder *flatbuffers.Builder, boolValue flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(boolValue), 0) +} +func ValuesStartBoolValueVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func ValuesAddStringValue(builder *flatbuffers.Builder, stringValue flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(stringValue), 0) +} +func ValuesStartStringValueVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func ValuesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} + +type Column struct { + _tab flatbuffers.Table +} + +func GetRootAsColumn(buf []byte, offset flatbuffers.UOffsetT) *Column { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Column{} + x.Init(buf, n+offset) + return x +} + +func FinishColumnBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsColumn(buf []byte, offset flatbuffers.UOffsetT) *Column { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Column{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedColumnBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *Column) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Column) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Column) Name() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Column) ColumnType() ColumnType { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return ColumnType(rcv._tab.GetInt32(o + rcv._tab.Pos)) + } + return 0 +} + +func (rcv *Column) MutateColumnType(n ColumnType) bool { + return rcv._tab.MutateInt32Slot(6, int32(n)) +} + +func (rcv *Column) FieldType() FieldType { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return FieldType(rcv._tab.GetInt32(o + rcv._tab.Pos)) + } + return 0 +} + +func (rcv *Column) MutateFieldType(n FieldType) bool { + return rcv._tab.MutateInt32Slot(8, int32(n)) +} + +func (rcv *Column) ColValues(obj *Values) *Values { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + x := rcv._tab.Indirect(o + rcv._tab.Pos) + if obj == nil { + obj = new(Values) + } + obj.Init(rcv._tab.Bytes, x) + return obj + } + return nil +} + +func (rcv *Column) Nullbits(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *Column) NullbitsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Column) NullbitsBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Column) MutateNullbits(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func ColumnStart(builder *flatbuffers.Builder) { + builder.StartObject(5) +} +func ColumnAddName(builder *flatbuffers.Builder, name flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(name), 0) +} +func ColumnAddColumnType(builder *flatbuffers.Builder, columnType ColumnType) { + builder.PrependInt32Slot(1, int32(columnType), 0) +} +func ColumnAddFieldType(builder *flatbuffers.Builder, fieldType FieldType) { + builder.PrependInt32Slot(2, int32(fieldType), 0) +} +func ColumnAddColValues(builder *flatbuffers.Builder, colValues flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(colValues), 0) +} +func ColumnAddNullbits(builder *flatbuffers.Builder, nullbits flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(nullbits), 0) +} +func ColumnStartNullbitsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func ColumnEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} + +type Table struct { + _tab flatbuffers.Table +} + +func GetRootAsTable(buf []byte, offset flatbuffers.UOffsetT) *Table { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Table{} + x.Init(buf, n+offset) + return x +} + +func FinishTableBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsTable(buf []byte, offset flatbuffers.UOffsetT) *Table { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Table{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedTableBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *Table) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Table) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Table) Tab() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Table) Columns(obj *Column, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Table) ColumnsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Table) NumRows() uint64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.GetUint64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *Table) MutateNumRows(n uint64) bool { + return rcv._tab.MutateUint64Slot(8, n) +} + +func TableStart(builder *flatbuffers.Builder) { + builder.StartObject(3) +} +func TableAddTab(builder *flatbuffers.Builder, tab flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(tab), 0) +} +func TableAddColumns(builder *flatbuffers.Builder, columns flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(columns), 0) +} +func TableStartColumnsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func TableAddNumRows(builder *flatbuffers.Builder, numRows uint64) { + builder.PrependUint64Slot(2, numRows, 0) +} +func TableEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} + +type Points struct { + _tab flatbuffers.Table +} + +func GetRootAsPoints(buf []byte, offset flatbuffers.UOffsetT) *Points { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Points{} + x.Init(buf, n+offset) + return x +} + +func FinishPointsBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsPoints(buf []byte, offset flatbuffers.UOffsetT) *Points { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Points{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedPointsBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *Points) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Points) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Points) Db() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Points) Tables(obj *Table, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Points) TablesLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func PointsStart(builder *flatbuffers.Builder) { + builder.StartObject(2) +} +func PointsAddDb(builder *flatbuffers.Builder, db flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(db), 0) +} +func PointsAddTables(builder *flatbuffers.Builder, tables flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(tables), 0) +} +func PointsStartTablesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func PointsEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/plugins/inputs/cnosdb_subscription/protos/protobuf/kv_service.proto b/plugins/inputs/cnosdb_subscription/protos/protobuf/kv_service.proto new file mode 100644 index 0000000000000..fd137c35e131c --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/protos/protobuf/kv_service.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package kv_service; +option go_package = ".;protos"; + +message SubscriptionRequest { + uint32 precision = 1; + bytes table_schema = 2; + bytes record_data = 3; +} + +message SubscriptionResponse {} + +service TSKVService { + rpc WriteSubscription(stream SubscriptionRequest) returns (stream SubscriptionResponse){}; +} diff --git a/plugins/inputs/cnosdb_subscription/sample.conf b/plugins/inputs/cnosdb_subscription/sample.conf new file mode 100644 index 0000000000000..1ea7fd35b0cdc --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/sample.conf @@ -0,0 +1,4 @@ +# CnosDB subscription gRPC service to receive CnosDB subscription push requests. +[[inputs.cnosdb_subscription]] + # Address and port to host a listener on + service_address = ":8803"