Skip to content

Commit

Permalink
feat(output): add cnosdb_subscription plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
zipper-meng committed Aug 1, 2024
1 parent b389ac4 commit 5cbd388
Show file tree
Hide file tree
Showing 12 changed files with 1,559 additions and 0 deletions.
5 changes: 5 additions & 0 deletions plugins/inputs/all/cnosdb_subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || inputs || inputs.bcache

package all

import _ "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription" // register plugin
21 changes: 21 additions & 0 deletions plugins/inputs/cnosdb_subscription/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# CnosDB Subscription Input Plugin

## Build

To compile this plugin it requires protoc-gen-go and protoc-gen-go-grpc

```shell
# install protoc-gen-go
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
# install protoc-gen-go-grpc
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
```

## Usages

To listen on port 8803:

```toml
[[inputs.cnosdb_subscription]]
service_address = ":8803"
```
87 changes: 87 additions & 0 deletions plugins/inputs/cnosdb_subscription/cnosdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//go:generate ../../../tools/readme_config_includer/generator
package cnosdb_subscription

import (
_ "embed"
"fmt"
"net"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/protos"
"google.golang.org/grpc"
)

func init() {
inputs.Add("cnosdb_subscription", func() telegraf.Input {
return &CnosDbSubscription{
ServiceAddress: ":8803",
}
})
}

//go:embed sample.conf
var sampleConfig string

type CnosDbSubscription struct {
ServiceAddress string `toml:"service_address"`
Timeout config.Duration `toml:"timeout"`

Log telegraf.Logger `toml:"-"`

wg sync.WaitGroup `toml:"-"`

listener net.Listener `toml:"-"`
grpcServer *grpc.Server `toml:"-"`
}

func (*CnosDbSubscription) SampleConfig() string {
return sampleConfig
}

func (c *CnosDbSubscription) Init() error {
c.Log.Info("Initialization completed.")
return nil
}

func (c *CnosDbSubscription) Gather(_ telegraf.Accumulator) error {
return nil
}

func (c *CnosDbSubscription) Start(acc telegraf.Accumulator) error {
c.grpcServer = grpc.NewServer(grpc.MaxRecvMsgSize(10 * 1024 * 1024))
protos.RegisterTSKVServiceServer(c.grpcServer, NewTSKVService(acc))

if c.listener == nil {
listener, err := net.Listen("tcp", c.ServiceAddress)
if err != nil {
return err
}
c.listener = listener
}

c.wg.Add(1)
go func() {
defer c.wg.Done()
if err := c.grpcServer.Serve(c.listener); err != nil {
acc.AddError(fmt.Errorf("failed to stop CnosDbSubscription gRPC service: %w", err))
}
}()

c.Log.Infof("Listening on %s", c.listener.Addr().String())

return nil
}

func (c *CnosDbSubscription) Stop() {
if c.grpcServer != nil {
c.grpcServer.Stop()
}
c.wg.Wait()
}

func (c *CnosDbSubscription) MarkHighPriority() {
// Do nothing
}
124 changes: 124 additions & 0 deletions plugins/inputs/cnosdb_subscription/cnosdb/tskv_table_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package cnosdb

type ColumnType int

const (
ColumnTypeUnknown ColumnType = iota
ColumnTypeTag
ColumnTypeTime
ColumnTypeFieldUnknown
ColumnTypeFieldFloat
ColumnTypeFieldInteger
ColumnTypeFieldUnsigned
ColumnTypeFieldBoolean
ColumnTypeFieldString
ColumnTypeFieldGeometry
)

type TimeUnit int

const (
TimeUnitUnknown TimeUnit = iota
TimeUnitSecond
TimeUnitMillisecond
TimeUnitMicrosecond
TimeUnitNanosecond
)

type TskvTableSchema struct {
Tenant string `json:"tenant"`
Db string `json:"db"`
Name string `json:"name"`
SchemaVersion uint64 `json:"schema_version"`
NextColumnID uint32 `json:"next_column_id"`
Columns []TableColumn `json:"columns"`
ColumnsIndex map[string]uint32 `json:"columns_index"`
}

type TableColumn struct {
ID uint64 `json:"id"`
Name string `json:"name"`
ColumnType interface{} `json:"column_type"`
Encoding interface{} `json:"encoding"`
}

type ColumnTypeUnited struct {
ColumnType ColumnType
TimeUnit TimeUnit
}

func (c *TableColumn) GetColumnTypeUnited() ColumnTypeUnited {
switch columnType := c.ColumnType.(type) {
case string:
if columnType == "Tag" {
// "column_type": "Tag"
return ColumnTypeUnited{
ColumnType: ColumnTypeTag,
TimeUnit: TimeUnitUnknown,
}
}
case map[string]interface{}:
if timeUnitObj := columnType["Time"]; timeUnitObj != nil {
// "column_type": {"Time":"Microsecond"}
if timeUnit, ok := timeUnitObj.(string); ok {
timeUnitCode := TimeUnitUnknown
switch timeUnit {
case "Second":
timeUnitCode = TimeUnitSecond
case "Millisecond":
timeUnitCode = TimeUnitMillisecond
case "Microsecond":
timeUnitCode = TimeUnitMicrosecond
case "Nanosecond":
timeUnitCode = TimeUnitNanosecond
}
return ColumnTypeUnited{
ColumnType: ColumnTypeTime,
TimeUnit: timeUnitCode,
}
}
} else if fieldTypeObj := columnType["Field"]; fieldTypeObj != nil {
fieldTypeCode := ColumnTypeFieldUnknown
switch fieldType := fieldTypeObj.(type) {
case string:
switch fieldType {
case "Float":
// "column_type": {"Field":"Float"}
fieldTypeCode = ColumnTypeFieldFloat
case "Integer":
// "column_type": {"Field":"Integer"}
fieldTypeCode = ColumnTypeFieldInteger
case "Unsigned":
// "column_type": {"Field":"Unsigned"}
fieldTypeCode = ColumnTypeFieldUnsigned
case "Boolean":
// "column_type": {"Field":"Boolean"}
fieldTypeCode = ColumnTypeFieldBoolean
case "String":
// "column_type": {"Field":"String"}
fieldTypeCode = ColumnTypeFieldString
case "Geometry":
// "column_type": {"Field":"Geometry"}
fieldTypeCode = ColumnTypeFieldGeometry
case "Unknown":
// "column_type": {"Field":"Unknown"}
fieldTypeCode = ColumnTypeFieldUnknown
}
case map[string]interface{}:
if geometryInfo := fieldType["Geometry"]; geometryInfo != nil {
// "column_type": {"Field":{"Geometry":{"sub_type":"Point","srid":10}}}
fieldTypeCode = ColumnTypeFieldGeometry
}
}
return ColumnTypeUnited{
ColumnType: fieldTypeCode,
TimeUnit: TimeUnitUnknown,
}
}
}

return ColumnTypeUnited{
ColumnType: ColumnTypeUnknown,
TimeUnit: TimeUnitUnknown,
}
}
Loading

0 comments on commit 5cbd388

Please sign in to comment.