Skip to content

Commit

Permalink
feat(output): cnosdb_subscription plugin add cnosdbv2.4.0 support
Browse files Browse the repository at this point in the history
  • Loading branch information
zipper-meng committed Oct 29, 2024
1 parent 63efb6b commit d528c50
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 46 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (s TSKVServiceServerImpl) WriteSubscription(server kv_service.TSKVService_W

var tableSchema cnosdb_v4.TskvTableSchema
if err := json.Unmarshal(req.TableSchema, &tableSchema); err != nil {
s.accumulator.AddError(fmt.Errorf("failed to parse TskvTableSchema: %w", err))
return server.Send(&kv_service.SubscriptionResponse{})
}

Expand Down
77 changes: 63 additions & 14 deletions plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package v4

type ColumnType uint32
type ColumnTypeCode uint32

const (
ColumnTypeUnknown ColumnType = iota
ColumnTypeUnknown ColumnTypeCode = iota
ColumnTypeTag
ColumnTypeTime
ColumnTypeFieldUnknown
Expand All @@ -15,36 +15,37 @@ const (
ColumnTypeFieldGeometry
)

type TimeUnit uint32
type TimeUnitCode uint32

const (
TimeUnitUnknown TimeUnit = iota
TimeUnitUnknown TimeUnitCode = iota
TimeUnitSecond
TimeUnitMillisecond
TimeUnitMicrosecond
TimeUnitNanosecond
)

type TskvTableSchema struct {
Tenant string `json:"tenant"`
Db string `json:"db"`
Name string `json:"name"`
SchemaVersion uint64 `json:"schema_version"`
NextColumnID uint32 `json:"next_column_id"`
Columns []TableColumn `json:"columns"`
ColumnsIndex map[string]uint32 `json:"columns_index"`
// Tenant string `json:"tenant"`
// Db string `json:"db"`
Name string `json:"name"`
// SchemaId uint64 `json:"schema_id"` // v2.4.0 field
// SchemaVersion uint64 `json:"schema_version"` // v2.4.1 field
// NextColumnID uint32 `json:"next_column_id"`
Columns []TableColumn `json:"columns"`
// ColumnsIndex map[string]uint32 `json:"columns_index"`
}

type TableColumn struct {
ID uint64 `json:"id"`
Name string `json:"name"`
ColumnType interface{} `json:"column_type"`
Encoding interface{} `json:"encoding"`
// Encoding interface{} `json:"encoding"`
}

type ColumnTypeUnited struct {
ColumnType ColumnType
TimeUnit TimeUnit
ColumnType ColumnTypeCode
TimeUnit TimeUnitCode
}

func (c *TableColumn) GetColumnTypeUnited() ColumnTypeUnited {
Expand All @@ -56,6 +57,54 @@ func (c *TableColumn) GetColumnTypeUnited() ColumnTypeUnited {
ColumnType: ColumnTypeTag,
TimeUnit: TimeUnitUnknown,
}
} else {
// In cnosdb-v2.4.0, columnType is string
// After cnosdb-v2.4.0, columnType is string or object
columnTypeCode := ColumnTypeUnknown
timeUnitCode := TimeUnitUnknown
switch columnType {
case "TAG_STRING":
// "column_type": "TAG_STRING"
return ColumnTypeUnited{
ColumnType: ColumnTypeTag,
TimeUnit: TimeUnitUnknown,
}
case "FIELD_STRING":
// "column_type": "FIELD_STRING"
columnTypeCode = ColumnTypeFieldString
case "FIELD_BIGINT":
// "column_type": "FIELD_BIGINT""
columnTypeCode = ColumnTypeFieldInteger
case "FIELD_BIGINT UNSIGNED":
// "column_type": "FIELD_BIGINT UNSIGNED""
columnTypeCode = ColumnTypeFieldUnsigned
case "FIELD_DOUBLE":
// "column_type": "FIELD_STRING"
columnTypeCode = ColumnTypeFieldFloat
case "FIELD_BOOLEAN":
// "column_type": "FIELD_BOOLEAN""
columnTypeCode = ColumnTypeFieldBoolean
case "TIME_TIMESTAMP(SECOND)":
// "column_type": "TIME_TIMESTAMP(SECOND)""
columnTypeCode = ColumnTypeTime
timeUnitCode = TimeUnitSecond
case "TIME_TIMESTAMP(MILLISECOND)":
// "column_type": "TIME_TIMESTAMP(MILLISECOND)""
columnTypeCode = ColumnTypeTime
timeUnitCode = TimeUnitMillisecond
case "TIME_TIMESTAMP(MICROSECOND)":
// "column_type": "TIME_TIMESTAMP(MICROSECOND)""
columnTypeCode = ColumnTypeTime
timeUnitCode = TimeUnitMicrosecond
case "TIME_TIMESTAMP(NANOSECOND)":
// "column_type": "TIME_TIMESTAMP(NANOSECOND)""
columnTypeCode = ColumnTypeTime
timeUnitCode = TimeUnitNanosecond
}
return ColumnTypeUnited{
ColumnType: columnTypeCode,
TimeUnit: timeUnitCode,
}
}
case map[string]interface{}:
if timeUnitObj := columnType["Time"]; timeUnitObj != nil {
Expand Down
8 changes: 5 additions & 3 deletions plugins/inputs/cnosdb_subscription/protocol/kv_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ message WritePointsResponse {

// CnosDB subscription v4 message.
message SubscriptionRequest {
uint32 precision = 1;
bytes table_schema = 2;
bytes record_data = 3;
string tenant = 1;
string db = 2;
bytes table_schema = 3;
bytes record_data = 4;
uint32 precision = 5;
}

// CnosDB subscription v4 message.
Expand Down

0 comments on commit d528c50

Please sign in to comment.