Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ticdc: Support Vector data type #11620

Merged
merged 45 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
ba1663c
vector support
wk989898 Aug 12, 2024
20d340b
update
wk989898 Aug 12, 2024
2283416
.
wk989898 Aug 15, 2024
e3ec642
add dep
wk989898 Aug 22, 2024
2ce0160
Merge remote-tracking branch 'origin' into vector
wk989898 Aug 22, 2024
aca6b17
update
wk989898 Aug 22, 2024
826a24d
update
wk989898 Aug 26, 2024
173ddcc
Merge branch 'master' of https://github.com/wk989898/tiflow into vector
wk989898 Aug 26, 2024
d018cab
update
wk989898 Aug 27, 2024
352c435
fmt
wk989898 Aug 29, 2024
7686261
fix
wk989898 Aug 30, 2024
d6d5df6
add test
wk989898 Aug 30, 2024
68a0417
update test
wk989898 Sep 3, 2024
473ea11
update
wk989898 Sep 4, 2024
0536332
sink(ticdc): use admin statement to query async ddl status (#11535)
CharlesCheung96 Sep 2, 2024
17dd162
update
wk989898 Sep 5, 2024
3d41f03
chore
wk989898 Sep 6, 2024
3402020
update
wk989898 Sep 6, 2024
fd8d907
.
wk989898 Sep 6, 2024
4a5d301
test(ticdc,dm): modify download-integration-test-binaries script (#11…
wk989898 Sep 5, 2024
059bd0f
update
wk989898 Sep 6, 2024
ed6cf61
add test
wk989898 Sep 6, 2024
8ab796c
fix test
wk989898 Sep 6, 2024
d91e22d
chore
wk989898 Sep 7, 2024
a483fc2
fix
wk989898 Sep 8, 2024
d8ff62b
.
wk989898 Sep 8, 2024
d195be6
lint
wk989898 Sep 8, 2024
0054111
debug
wk989898 Sep 9, 2024
b43a678
revert
wk989898 Sep 9, 2024
0f38fa8
revert
wk989898 Sep 9, 2024
cae2309
update sync_diff_inspector_url
wk989898 Sep 11, 2024
f072df3
update test
wk989898 Sep 11, 2024
d70369d
test(ticdc): fix data inconsistence on integration_tests (#11584)
wk989898 Sep 11, 2024
c134bc0
update
wk989898 Sep 11, 2024
4b3bb17
set HasVectorType default false
wk989898 Sep 12, 2024
c157b20
chore: update print log
wk989898 Sep 12, 2024
d97adf5
fix test
wk989898 Sep 12, 2024
15ddca3
Revert "sink(ticdc): use admin statement to query async ddl status (#…
wk989898 Sep 20, 2024
8fb9173
remove unused code
wk989898 Sep 20, 2024
6f708fa
Merge branch 'master' into vector
wk989898 Sep 23, 2024
c2750a7
Merge branch 'vector' of https://github.com/wk989898/tiflow into vector
wk989898 Sep 23, 2024
9a4b4b6
Merge branch 'master' into vector
wk989898 Sep 23, 2024
acd2679
fix deps
wk989898 Sep 24, 2024
20ca15a
update
wk989898 Sep 24, 2024
1a3136a
update sync_diff_inspector_url
wk989898 Sep 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cdc/entry/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@
byteSize := (ft.GetFlen() + 7) >> 3
datum.SetUint64(0)
datum.SetMysqlBit(types.NewBinaryLiteralFromUint(val, byteSize))
case mysql.TypeTiDBVectorFloat32:
datum.SetVectorFloat32(types.ZeroVectorFloat32)
return datum, nil

Check warning on line 215 in cdc/entry/codec.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/codec.go#L213-L215

Added lines #L213 - L215 were not covered by tests
}
return datum, nil
}
5 changes: 5 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@
return types.NewFloat32Datum(value.(float32)), nil
case mysql.TypeDouble:
return types.NewFloat64Datum(value.(float64)), nil
case mysql.TypeTiDBVectorFloat32:
return types.NewVectorFloat32Datum(value.(types.VectorFloat32)), nil

Check warning on line 596 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L595-L596

Added lines #L595 - L596 were not covered by tests
default:
log.Panic("unexpected mysql type found", zap.Any("type", ft.GetType()))
}
Expand Down Expand Up @@ -888,6 +890,9 @@
}
const sizeOfV = unsafe.Sizeof(v)
return v, int(sizeOfV), warn, nil
case mysql.TypeTiDBVectorFloat32:
b := datum.GetVectorFloat32()
return b, b.Len(), "", nil
default:
// NOTICE: GetValue() may return some types that Go's database/sql does not support, which will cause sink DML to fail.
// Add a specific conversion case above if you need one.
Expand Down
9 changes: 9 additions & 0 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1682,4 +1682,13 @@ func TestFormatColVal(t *testing.T) {
require.NoError(t, err)
require.Equal(t, float32(0), value)
require.NotZero(t, warn)

vector, _ := types.ParseVectorFloat32("[1,2,3,4,5]")
ftTypeVector := types.NewFieldType(mysql.TypeTiDBVectorFloat32)
col = &timodel.ColumnInfo{FieldType: *ftTypeVector}
datum.SetVectorFloat32(vector)
value, _, warn, err = formatColVal(datum, col)
require.NoError(t, err)
require.Equal(t, vector, value)
require.Zero(t, warn)
}
64 changes: 64 additions & 0 deletions cdc/sink/ddlsink/mysql/format_ddl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"bytes"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/pingcap/tidb/pkg/parser/mysql"
"go.uber.org/zap"
)

// visiter is a field-less AST visitor. Its Enter method rewrites column
// definitions of type TypeTiDBVectorFloat32 to TypeLongBlob; formatQuery
// passes it to stmt.Accept.
type visiter struct{}

// Enter inspects node n. If n is a column definition whose field type is
// TypeTiDBVectorFloat32, the field type is rewritten in place to TypeLongBlob
// with an empty charset and collation and a length of -1, and the column's
// option list is replaced by an empty one. Every other node is left untouched.
//
// n itself is always returned, and skipChildren is always false.
func (f *visiter) Enter(n ast.Node) (node ast.Node, skipChildren bool) {
	colDef, isColumnDef := n.(*ast.ColumnDef)
	if !isColumnDef || colDef.Tp == nil {
		return n, false
	}
	if colDef.Tp.GetType() != mysql.TypeTiDBVectorFloat32 {
		return n, false
	}

	fieldType := colDef.Tp
	fieldType.SetType(mysql.TypeLongBlob)
	fieldType.SetCharset("")
	fieldType.SetCollate("")
	fieldType.SetFlen(-1)
	// Replacing the slice drops every option on the column, COMMENT included.
	colDef.Options = []*ast.ColumnOption{}
	return n, false
}

// Leave returns n unchanged and always reports ok as true; all rewriting is
// done in Enter.
func (f *visiter) Leave(n ast.Node) (node ast.Node, ok bool) {
return n, true
}

func formatQuery(sql string) string {
p := parser.New()
stmt, err := p.ParseOneStmt(sql, "", "")
if err != nil {
log.Error("format query parse one stmt failed", zap.Error(err))
}
stmt.Accept(&visiter{})

buf := new(bytes.Buffer)
restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf)
if err = stmt.Restore(restoreCtx); err != nil {
log.Error("format query restore failed", zap.Error(err))
}
return buf.String()

Check warning on line 63 in cdc/sink/ddlsink/mysql/format_ddl.go

View check run for this annotation

Codecov / codecov/patch

cdc/sink/ddlsink/mysql/format_ddl.go#L50-L63

Added lines #L50 - L63 were not covered by tests
}
47 changes: 47 additions & 0 deletions cdc/sink/ddlsink/mysql/format_ddl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"bytes"
"testing"

"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/stretchr/testify/require"
)

// TestFormatQuery verifies that a VECTOR column in a CREATE TABLE statement is
// restored as LONGTEXT, both when visiter is applied by hand and when the
// statement goes through formatQuery.
func TestFormatQuery(t *testing.T) {
	sql := "CREATE TABLE `test` (`id` INT PRIMARY KEY,`data` VECTOR(5))"
	expectSQL := "CREATE TABLE `test` (`id` INT PRIMARY KEY,`data` LONGTEXT)"
	p := parser.New()
	stmt, err := p.ParseOneStmt(sql, "", "")
	require.NoError(t, err)
	stmt.Accept(&visiter{})

	buf := new(bytes.Buffer)
	restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf)
	err = stmt.Restore(restoreCtx)
	require.NoError(t, err)
	// require.Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	require.Equal(t, expectSQL, buf.String())

	// Exercise the production entry point as well.
	require.Equal(t, expectSQL, formatQuery(sql))
}

// BenchmarkFormatQuery measures formatQuery on a CREATE TABLE statement and
// reports allocations per operation.
func BenchmarkFormatQuery(b *testing.B) {
	const createTable = "CREATE TABLE `test` (`id` INT PRIMARY KEY,`data` LONGTEXT)"
	b.ReportAllocs()
	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		formatQuery(createTable)
	}
}
38 changes: 38 additions & 0 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
"net/url"
"time"

"github.com/coreos/go-semver/semver"
lru "github.com/hashicorp/golang-lru"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/dumpling/export"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
Expand All @@ -42,6 +45,8 @@

// networkDriftDuration is used to construct a context timeout for database operations.
networkDriftDuration = 5 * time.Second

defaultSupportVectorVersion = "8.3.0"
)

// GetDBConnImpl is the implementation of pmysql.IDBConnectionFactory.
Expand All @@ -66,6 +71,8 @@
// is running in downstream.
// map: model.TableName -> timodel.ActionType
lastExecutedNormalDDLCache *lru.Cache

needFormat bool
}

// NewDDLSink creates a new DDLSink.
Expand Down Expand Up @@ -102,12 +109,14 @@
if err != nil {
return nil, err
}

m := &DDLSink{
id: changefeedID,
db: db,
cfg: cfg,
statistics: metrics.NewStatistics(changefeedID, sink.TxnSink),
lastExecutedNormalDDLCache: lruCache,
needFormat: needFormatDDL(db, cfg),
}

log.Info("MySQL DDL sink is created",
Expand Down Expand Up @@ -195,6 +204,14 @@

shouldSwitchDB := needSwitchDB(ddl)

// Convert the vector type to a string type for downstream databases that do not support it
if m.needFormat {
if newQuery := formatQuery(ddl.Query); newQuery != ddl.Query {
log.Warn("format ddl query", zap.String("newQuery", newQuery), zap.String("query", ddl.Query), zap.String("collate", ddl.Collate), zap.String("charset", ddl.Charset))
ddl.Query = newQuery
}

Check warning on line 212 in cdc/sink/ddlsink/mysql/mysql_ddl_sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/sink/ddlsink/mysql/mysql_ddl_sink.go#L209-L212

Added lines #L209 - L212 were not covered by tests
}

failpoint.Inject("MySQLSinkExecDDLDelay", func() {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -268,6 +285,27 @@
return true
}

// needFormatDDL checks vector type support
func needFormatDDL(db *sql.DB, cfg *pmysql.Config) bool {
if !cfg.HasVectorType {
log.Warn("please set `has-vector-type` to be true if a column is vector type when the downstream is not TiDB or TiDB version less than specify version",
zap.Any("hasVectorType", cfg.HasVectorType), zap.Any("supportVectorVersion", defaultSupportVectorVersion))
return false
}
versionInfo, err := export.SelectVersion(db)
if err != nil {
log.Warn("fail to get version", zap.Error(err), zap.Bool("isTiDB", cfg.IsTiDB))
return false
}
serverInfo := version.ParseServerInfo(versionInfo)
version := semver.New(defaultSupportVectorVersion)
if !cfg.IsTiDB || serverInfo.ServerVersion.LessThan(*version) {
log.Error("downstream unsupport vector type. it will be converted to longtext", zap.String("version", serverInfo.ServerVersion.String()), zap.String("supportVectorVersion", defaultSupportVectorVersion), zap.Bool("isTiDB", cfg.IsTiDB))
return true
}
return false

Check warning on line 306 in cdc/sink/ddlsink/mysql/mysql_ddl_sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/sink/ddlsink/mysql/mysql_ddl_sink.go#L295-L306

Added lines #L295 - L306 were not covered by tests
}

// WriteCheckpointTs does nothing.
func (m *DDLSink) WriteCheckpointTs(_ context.Context, _ uint64, _ []*model.TableInfo) error {
// Only for RowSink for now.
Expand Down
17 changes: 9 additions & 8 deletions cdc/sink/dmlsink/txn/mysql/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strings"

"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/quotes"
)
Expand Down Expand Up @@ -109,16 +110,16 @@ func prepareReplace(
// will automatically set `_binary` charset for that column, which is not expected.
// See https://github.com/go-sql-driver/mysql/blob/ce134bfc/connection.go#L267
func appendQueryArgs(args []interface{}, col *model.Column) []interface{} {
if col.Charset != "" && col.Charset != charset.CharsetBin {
colValBytes, ok := col.Value.([]byte)
if ok {
args = append(args, string(colValBytes))
} else {
args = append(args, col.Value)
switch v := col.Value.(type) {
case []byte:
if col.Charset != "" && col.Charset != charset.CharsetBin {
args = append(args, string(v))
return args
}
} else {
args = append(args, col.Value)
case types.VectorFloat32:
col.Value = v.String()
}
args = append(args, col.Value)

return args
}
Expand Down
50 changes: 50 additions & 0 deletions cdc/sink/dmlsink/txn/mysql/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (

"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -248,6 +250,37 @@ func TestPrepareUpdate(t *testing.T) {
expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? AND `b` = ? LIMIT 1",
expectedArgs: []interface{}{2, "世界", 1, "你好"},
},
{
quoteTable: "`test`.`t1`",
preCols: []*model.Column{
{
Name: "a",
Type: mysql.TypeLong,
Flag: model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 1,
},
{
Name: "b",
Type: mysql.TypeTiDBVectorFloat32,
Value: util.Must(types.ParseVectorFloat32("[1.0,-2,0.33,-4.4,55]")),
},
},
cols: []*model.Column{
{
Name: "a",
Type: mysql.TypeLong,
Flag: model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 1,
},
{
Name: "b",
Type: mysql.TypeTiDBVectorFloat32,
Value: util.Must(types.ParseVectorFloat32("[1,2,3,4,5]")),
},
},
expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? LIMIT 1",
expectedArgs: []interface{}{1, "[1,2,3,4,5]", 1},
},
}
for _, tc := range testCases {
query, args := prepareUpdate(tc.quoteTable, tc.preCols, tc.cols, false)
Expand Down Expand Up @@ -709,6 +742,23 @@ func TestMapReplace(t *testing.T) {
[]byte("你好,世界"),
},
},
{
quoteTable: "`test`.`t1`",
cols: []*model.Column{
{
Name: "a",
Type: mysql.TypeTiDBVectorFloat32,
Value: util.Must(types.ParseVectorFloat32("[1.0,-2,0.3,-4.4,55]")),
},
{
Name: "b",
Type: mysql.TypeTiDBVectorFloat32,
Value: util.Must(types.ParseVectorFloat32("[1,2,3,4,5]")),
},
},
expectedQuery: "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES ",
expectedArgs: []interface{}{"[1,-2,0.3,-4.4,55]", "[1,2,3,4,5]"},
},
}
for _, tc := range testCases {
// multiple times to verify the stability of column sequence in query string
Expand Down
19 changes: 11 additions & 8 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/metrics"
Expand Down Expand Up @@ -335,17 +336,19 @@
return res
}

func convertBinaryToString(cols []*model.ColumnData, tableInfo *model.TableInfo) {
func convertValue(cols []*model.ColumnData, tableInfo *model.TableInfo) {
for i, col := range cols {
if col == nil {
continue
}
colInfo := tableInfo.ForceGetColumnInfo(col.ColumnID)
if colInfo.GetCharset() != "" && colInfo.GetCharset() != charset.CharsetBin {
colValBytes, ok := col.Value.([]byte)
if ok {
cols[i].Value = string(colValBytes)
switch v := col.Value.(type) {
case []byte:
colInfo := tableInfo.ForceGetColumnInfo(col.ColumnID)
if colInfo.GetCharset() != "" && colInfo.GetCharset() != charset.CharsetBin {
cols[i].Value = string(v)
}
case types.VectorFloat32:
cols[i].Value = v.String()

Check warning on line 351 in cdc/sink/dmlsink/txn/mysql/mysql.go

View check run for this annotation

Codecov / codecov/patch

cdc/sink/dmlsink/txn/mysql/mysql.go#L350-L351

Added lines #L350 - L351 were not covered by tests
}
}
}
Expand All @@ -364,8 +367,8 @@
deleteRow := make([]*sqlmodel.RowChange, 0, preAllocateSize)

for _, row := range event.Event.Rows {
convertBinaryToString(row.Columns, tableInfo)
convertBinaryToString(row.PreColumns, tableInfo)
convertValue(row.Columns, tableInfo)
convertValue(row.PreColumns, tableInfo)

if row.IsInsert() {
insertRow = append(
Expand Down
Loading
Loading