diff --git a/cdc/entry/codec.go b/cdc/entry/codec.go index 7224957d4d1..7828b7a1665 100644 --- a/cdc/entry/codec.go +++ b/cdc/entry/codec.go @@ -210,6 +210,9 @@ func unflatten(datum types.Datum, ft *types.FieldType, loc *time.Location) (type byteSize := (ft.GetFlen() + 7) >> 3 datum.SetUint64(0) datum.SetMysqlBit(types.NewBinaryLiteralFromUint(val, byteSize)) + case mysql.TypeTiDBVectorFloat32: + datum.SetVectorFloat32(types.ZeroVectorFloat32) + return datum, nil } return datum, nil } diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 3d1fe7ac631..b351291ca5f 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -592,6 +592,8 @@ func newDatum(value interface{}, ft types.FieldType) (types.Datum, error) { return types.NewFloat32Datum(value.(float32)), nil case mysql.TypeDouble: return types.NewFloat64Datum(value.(float64)), nil + case mysql.TypeTiDBVectorFloat32: + return types.NewVectorFloat32Datum(value.(types.VectorFloat32)), nil default: log.Panic("unexpected mysql type found", zap.Any("type", ft.GetType())) } @@ -888,6 +890,9 @@ func formatColVal(datum types.Datum, col *timodel.ColumnInfo) ( } const sizeOfV = unsafe.Sizeof(v) return v, int(sizeOfV), warn, nil + case mysql.TypeTiDBVectorFloat32: + b := datum.GetVectorFloat32() + return b, b.Len(), "", nil default: // NOTICE: GetValue() may return some types that go sql not support, which will cause sink DML fail // Make specified convert upper if you need diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index e563ba9c533..831607f25ca 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -1682,4 +1682,13 @@ func TestFormatColVal(t *testing.T) { require.NoError(t, err) require.Equal(t, float32(0), value) require.NotZero(t, warn) + + vector, _ := types.ParseVectorFloat32("[1,2,3,4,5]") + ftTypeVector := types.NewFieldType(mysql.TypeTiDBVectorFloat32) + col = &timodel.ColumnInfo{FieldType: *ftTypeVector} + datum.SetVectorFloat32(vector) + value, _, warn, err = formatColVal(datum, col) + require.NoError(t, err) + require.Equal(t, vector, value) + require.Zero(t, warn) } diff --git a/cdc/sink/ddlsink/mysql/format_ddl.go b/cdc/sink/ddlsink/mysql/format_ddl.go new file mode 100644 index 00000000000..8332d18ff28 --- /dev/null +++ b/cdc/sink/ddlsink/mysql/format_ddl.go @@ -0,0 +1,64 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package mysql
+
+import (
+	"bytes"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/pkg/parser"
+	"github.com/pingcap/tidb/pkg/parser/ast"
+	"github.com/pingcap/tidb/pkg/parser/format"
+	"github.com/pingcap/tidb/pkg/parser/mysql"
+	"go.uber.org/zap"
+)
+
+type visiter struct{}
+
+func (f *visiter) Enter(n ast.Node) (node ast.Node, skipChildren bool) {
+	switch v := n.(type) {
+	case *ast.ColumnDef:
+		if v.Tp != nil {
+			switch v.Tp.GetType() {
+			case mysql.TypeTiDBVectorFloat32:
+				v.Tp.SetType(mysql.TypeLongBlob)
+				v.Tp.SetCharset("")
+				v.Tp.SetCollate("")
+				v.Tp.SetFlen(-1)
+				v.Options = []*ast.ColumnOption{} // clear COMMENT
+			}
+		}
+	}
+	return n, false
+}
+
+func (f *visiter) Leave(n ast.Node) (node ast.Node, ok bool) {
+	return n, true
+}
+
+func formatQuery(sql string) string {
+	p := parser.New()
+	stmt, err := p.ParseOneStmt(sql, "", "")
+	if err != nil {
+		// Keep the original query untouched if it cannot be parsed,
+		// instead of dereferencing a nil statement below.
+		log.Error("format query parse one stmt failed", zap.Error(err))
+		return sql
+	}
+	stmt.Accept(&visiter{})
+
+	buf := new(bytes.Buffer)
+	restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf)
+	if err = stmt.Restore(restoreCtx); err != nil {
+		log.Error("format query restore failed", zap.Error(err))
+	}
+	return buf.String()
+}
diff --git a/cdc/sink/ddlsink/mysql/format_ddl_test.go b/cdc/sink/ddlsink/mysql/format_ddl_test.go
new file mode 100644
index 00000000000..2059877ce79
--- /dev/null
+++ b/cdc/sink/ddlsink/mysql/format_ddl_test.go
@@ -0,0 +1,47 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package mysql + +import ( + "bytes" + "testing" + + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/format" + "github.com/stretchr/testify/require" +) + +func TestFormatQuery(t *testing.T) { + sql := "CREATE TABLE `test` (`id` INT PRIMARY KEY,`data` VECTOR(5))" + expectSQL := "CREATE TABLE `test` (`id` INT PRIMARY KEY,`data` LONGTEXT)" + p := parser.New() + stmt, err := p.ParseOneStmt(sql, "", "") + require.NoError(t, err) + stmt.Accept(&visiter{}) + + buf := new(bytes.Buffer) + restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf) + err = stmt.Restore(restoreCtx) + require.NoError(t, err) + require.Equal(t, buf.String(), expectSQL) +} + +func BenchmarkFormatQuery(b *testing.B) { + sql := "CREATE TABLE `test` (`id` INT PRIMARY KEY,`data` LONGTEXT)" + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + formatQuery(sql) + } +} diff --git a/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go b/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go index f65c1c6aaa7..e35251163ed 100644 --- a/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go +++ b/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go @@ -20,9 +20,12 @@ import ( "net/url" "time" + "github.com/coreos/go-semver/semver" lru "github.com/hashicorp/golang-lru" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/version" + "github.com/pingcap/tidb/dumpling/export" timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" @@ -42,6 +45,8 @@ const ( // networkDriftDuration is used to construct a context timeout for database operations. networkDriftDuration = 5 * time.Second + + defaultSupportVectorVersion = "8.3.0" ) // GetDBConnImpl is the implementation of pmysql.IDBConnectionFactory. @@ -66,6 +71,8 @@ type DDLSink struct { // is running in downstream. // map: model.TableName -> timodel.ActionType lastExecutedNormalDDLCache *lru.Cache + + needFormat bool } // NewDDLSink creates a new DDLSink. 
@@ -102,12 +109,14 @@ func NewDDLSink(
 	if err != nil {
 		return nil, err
 	}
+
 	m := &DDLSink{
 		id:                         changefeedID,
 		db:                         db,
 		cfg:                        cfg,
 		statistics:                 metrics.NewStatistics(changefeedID, sink.TxnSink),
 		lastExecutedNormalDDLCache: lruCache,
+		needFormat:                 needFormatDDL(db, cfg),
 	}
 
 	log.Info("MySQL DDL sink is created",
@@ -195,6 +204,14 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
 
 	shouldSwitchDB := needSwitchDB(ddl)
 
+	// Rewrite the vector type to a string type for downstreams that do not support it.
+	if m.needFormat {
+		if newQuery := formatQuery(ddl.Query); newQuery != ddl.Query {
+			log.Warn("format ddl query", zap.String("newQuery", newQuery), zap.String("query", ddl.Query), zap.String("collate", ddl.Collate), zap.String("charset", ddl.Charset))
+			ddl.Query = newQuery
+		}
+	}
+
 	failpoint.Inject("MySQLSinkExecDDLDelay", func() {
 		select {
 		case <-ctx.Done():
@@ -268,6 +285,27 @@ func needSwitchDB(ddl *model.DDLEvent) bool {
 	return true
}
 
+// needFormatDDL returns true if DDL queries need to be rewritten because the downstream cannot handle the vector type.
+func needFormatDDL(db *sql.DB, cfg *pmysql.Config) bool {
+	if !cfg.HasVectorType {
+		log.Warn("please set `has-vector-type` to true if a column is vector type and the downstream is not TiDB, or the TiDB version is below the minimum supported version",
+			zap.Any("hasVectorType", cfg.HasVectorType), zap.Any("supportVectorVersion", defaultSupportVectorVersion))
+		return false
+	}
+	versionInfo, err := export.SelectVersion(db)
+	if err != nil {
+		log.Warn("failed to get the downstream version", zap.Error(err), zap.Bool("isTiDB", cfg.IsTiDB))
+		return false
+	}
+	serverInfo := version.ParseServerInfo(versionInfo)
+	supportVersion := semver.New(defaultSupportVectorVersion)
+	if !cfg.IsTiDB || serverInfo.ServerVersion.LessThan(*supportVersion) {
+		log.Error("the downstream does not support the vector type, it will be converted to longtext", zap.String("version", serverInfo.ServerVersion.String()), zap.String("supportVectorVersion", defaultSupportVectorVersion), zap.Bool("isTiDB", cfg.IsTiDB))
+		return true
+	}
+	return false
+}
+
 // WriteCheckpointTs does nothing.
 func (m *DDLSink) WriteCheckpointTs(_ context.Context, _ uint64, _ []*model.TableInfo) error {
 	// Only for RowSink for now.
diff --git a/cdc/sink/dmlsink/txn/mysql/dml.go b/cdc/sink/dmlsink/txn/mysql/dml.go
index da29618908f..a82d7fc65c4 100644
--- a/cdc/sink/dmlsink/txn/mysql/dml.go
+++ b/cdc/sink/dmlsink/txn/mysql/dml.go
@@ -17,6 +17,7 @@ import (
 	"strings"
 
 	"github.com/pingcap/tidb/pkg/parser/charset"
+	"github.com/pingcap/tidb/pkg/types"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/quotes"
 )
@@ -109,16 +110,16 @@ func prepareReplace(
 	// will automatically set `_binary` charset for that column, which is not expected.
// See https://github.com/go-sql-driver/mysql/blob/ce134bfc/connection.go#L267 func appendQueryArgs(args []interface{}, col *model.Column) []interface{} { - if col.Charset != "" && col.Charset != charset.CharsetBin { - colValBytes, ok := col.Value.([]byte) - if ok { - args = append(args, string(colValBytes)) - } else { - args = append(args, col.Value) + switch v := col.Value.(type) { + case []byte: + if col.Charset != "" && col.Charset != charset.CharsetBin { + args = append(args, string(v)) + return args } - } else { - args = append(args, col.Value) + case types.VectorFloat32: + col.Value = v.String() } + args = append(args, col.Value) return args } diff --git a/cdc/sink/dmlsink/txn/mysql/dml_test.go b/cdc/sink/dmlsink/txn/mysql/dml_test.go index dab7c4b104f..ffc3c846982 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml_test.go +++ b/cdc/sink/dmlsink/txn/mysql/dml_test.go @@ -18,7 +18,9 @@ import ( "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -248,6 +250,37 @@ func TestPrepareUpdate(t *testing.T) { expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? AND `b` = ? LIMIT 1", expectedArgs: []interface{}{2, "世界", 1, "你好"}, }, + { + quoteTable: "`test`.`t1`", + preCols: []*model.Column{ + { + Name: "a", + Type: mysql.TypeLong, + Flag: model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }, + { + Name: "b", + Type: mysql.TypeTiDBVectorFloat32, + Value: util.Must(types.ParseVectorFloat32("[1.0,-2,0.33,-4.4,55]")), + }, + }, + cols: []*model.Column{ + { + Name: "a", + Type: mysql.TypeLong, + Flag: model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }, + { + Name: "b", + Type: mysql.TypeTiDBVectorFloat32, + Value: util.Must(types.ParseVectorFloat32("[1,2,3,4,5]")), + }, + }, + expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? 
LIMIT 1", + expectedArgs: []interface{}{1, "[1,2,3,4,5]", 1}, + }, } for _, tc := range testCases { query, args := prepareUpdate(tc.quoteTable, tc.preCols, tc.cols, false) @@ -709,6 +742,23 @@ func TestMapReplace(t *testing.T) { []byte("你好,世界"), }, }, + { + quoteTable: "`test`.`t1`", + cols: []*model.Column{ + { + Name: "a", + Type: mysql.TypeTiDBVectorFloat32, + Value: util.Must(types.ParseVectorFloat32("[1.0,-2,0.3,-4.4,55]")), + }, + { + Name: "b", + Type: mysql.TypeTiDBVectorFloat32, + Value: util.Must(types.ParseVectorFloat32("[1,2,3,4,5]")), + }, + }, + expectedQuery: "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES ", + expectedArgs: []interface{}{"[1,-2,0.3,-4.4,55]", "[1,2,3,4,5]"}, + }, } for _, tc := range testCases { // multiple times to verify the stability of column sequence in query string diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 27fddeeafa7..66d819cb5ab 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/metrics" @@ -335,17 +336,19 @@ func convert2RowChanges( return res } -func convertBinaryToString(cols []*model.ColumnData, tableInfo *model.TableInfo) { +func convertValue(cols []*model.ColumnData, tableInfo *model.TableInfo) { for i, col := range cols { if col == nil { continue } - colInfo := tableInfo.ForceGetColumnInfo(col.ColumnID) - if colInfo.GetCharset() != "" && colInfo.GetCharset() != charset.CharsetBin { - colValBytes, ok := col.Value.([]byte) - if ok { - cols[i].Value = string(colValBytes) + switch v := col.Value.(type) { + case []byte: + colInfo := tableInfo.ForceGetColumnInfo(col.ColumnID) + if colInfo.GetCharset() != "" && colInfo.GetCharset() != charset.CharsetBin { + cols[i].Value = string(v) } + case types.VectorFloat32: + cols[i].Value = v.String() } } } @@ -364,8 +367,8 @@ func (s *mysqlBackend) groupRowsByType( deleteRow := make([]*sqlmodel.RowChange, 0, preAllocateSize) for _, row := range event.Event.Rows { - convertBinaryToString(row.Columns, tableInfo) - convertBinaryToString(row.PreColumns, tableInfo) + convertValue(row.Columns, tableInfo) + convertValue(row.PreColumns, tableInfo) if row.IsInsert() { insertRow = append( diff --git a/cdc/sink/dmlsink/txn/mysql/mysql_test.go b/cdc/sink/dmlsink/txn/mysql/mysql_test.go index 702fc75347c..945c9da5f4b 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql_test.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql_test.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/metrics" @@ -41,6 +42,7 @@ import ( "github.com/pingcap/tiflow/pkg/sink" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" "github.com/pingcap/tiflow/pkg/sqlmodel" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" @@ -112,6 +114,17 @@ func TestPrepareDML(t *testing.T) { }, }, [][]int{{1, 2}}) + tableInfoVector := model.BuildTableInfo("common_1", "uk_without_pk", []*model.Column{ + nil, { + Name: "a1", + Type: mysql.TypeLong, 
+ Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + }, { + Name: "a3", + Type: mysql.TypeTiDBVectorFloat32, + }, + }, [][]int{{1, 2}}) + testCases := []struct { input []*model.RowChangedEvent expected *preparedDMLs @@ -181,6 +194,35 @@ func TestPrepareDML(t *testing.T) { approximateSize: 63, }, }, + // vector type + { + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813518, + CommitTs: 418658114257813519, + TableInfo: tableInfoVector, + Columns: model.Columns2ColumnDatas( + []*model.Column{ + nil, { + Name: "a1", + Type: mysql.TypeLong, + Value: 1, + }, { + Name: "a3", + Type: mysql.TypeTiDBVectorFloat32, + Value: util.Must(types.ParseVectorFloat32("[1.1,-2,3.33,-4.12,-5]")), + }, + }, tableInfoVector), + }, + }, + expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813518}, + sqls: []string{"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?)"}, + values: [][]interface{}{{1, "[1.1,-2,3.33,-4.12,-5]"}}, + rowCount: 1, + approximateSize: 63, + }, + }, } ms := newMySQLBackendWithoutDB() @@ -1104,6 +1146,14 @@ func TestPrepareBatchDMLs(t *testing.T) { Charset: charset.CharsetGBK, Flag: model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, }}, [][]int{{0, 1}}) + tableInfoWithVector := model.BuildTableInfo("common_1", "uk_without_pk", []*model.Column{{ + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + }, { + Name: "a3", + Type: mysql.TypeTiDBVectorFloat32, + }}, [][]int{{0, 1}}) testCases := []struct { isTiDB bool input []*model.RowChangedEvent @@ -1418,6 +1468,37 @@ func TestPrepareBatchDMLs(t *testing.T) { approximateSize: 204, }, }, + + // inser vector data + { + isTiDB: true, + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + TableInfo: tableInfoWithVector, + Columns: model.Columns2ColumnDatas([]*model.Column{{ + Name: "a1", + Value: 1, + }, { + Name: "a3", + Value: util.Must(types.ParseVectorFloat32("[1,2,3,4,5]")), + }}, tableInfoWithVector), + ApproximateDataSize: 10, + }, + }, + expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813516}, + sqls: []string{ + "INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?)", + }, + values: [][]interface{}{ + {1, "[1,2,3,4,5]"}, + }, + rowCount: 1, + approximateSize: 73, + }, + }, } ms := newMySQLBackendWithoutDB() diff --git a/dm/tests/download-compatibility-test-binaries.sh b/dm/tests/download-compatibility-test-binaries.sh index 2ec96afaee8..df29fa3e303 100755 --- a/dm/tests/download-compatibility-test-binaries.sh +++ b/dm/tests/download-compatibility-test-binaries.sh @@ -44,6 +44,19 @@ download() { wget --no-verbose --retry-connrefused --waitretry=1 -t 3 -O "${file_path}" "${url}" } +function get_sha1() { + local repo="$1" + local branch="$2" + file_server_url="http://fileserver.pingcap.net" + sha1=$(curl -s "${file_server_url}/download/refs/pingcap/${repo}/${branch}/sha1") + if [ $? -ne 0 ] || echo "$sha1" | grep -q "Error"; then + echo "Failed to get sha1 with repo ${repo} branch ${branch}: $sha1. 
use branch master to instead" >&2 + branch=master + sha1=$(curl -s "${file_server_url}/download/refs/pingcap/${repo}/${branch}/sha1") + fi + echo $sha1 +} + # Extract function extract() { local file_name=$1 diff --git a/go.mod b/go.mod index c011031157f..30b5d279c03 100644 --- a/go.mod +++ b/go.mod @@ -398,7 +398,7 @@ require ( ) // Fix https://github.com/pingcap/tiflow/issues/4961 -replace github.com/benbjohnson/clock v1.3.0 => github.com/benbjohnson/clock v1.1.0 +replace github.com/benbjohnson/clock v1.3.5 => github.com/benbjohnson/clock v1.1.0 // copy from TiDB replace go.opencensus.io => go.opencensus.io v0.23.1-0.20220331163232-052120675fac @@ -410,8 +410,6 @@ replace github.com/tildeleb/hashland => leb.io/hashland v0.1.5 replace github.com/chaos-mesh/go-sqlsmith => github.com/PingCAP-QE/go-sqlsmith v0.0.0-20231213065948-336e064b488d -replace gorm.io/driver/mysql v1.4.5 => gorm.io/driver/mysql v1.3.3 - // TODO: `sourcegraph.com/sourcegraph/appdash` has been archived, and the original host has been removed. // Please remove these dependencies. replace sourcegraph.com/sourcegraph/appdash => github.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 diff --git a/go.sum b/go.sum index d2b21c92f05..446bf8b7f89 100644 --- a/go.sum +++ b/go.sum @@ -152,9 +152,9 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.20.1 h1:U7h9CPoyMfVoN5jUglB0LglCMP10 github.com/aws/aws-sdk-go-v2/service/sts v1.20.1/go.mod h1:BUHusg4cOA1TFGegj7x8/eoWrbdHzJfoMrXcbMQAG0k= github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= -github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bits-and-blooms/bitset v1.4.0 h1:+YZ8ePm+He2pU3dZlIZiOeAKfrBkXi1lSrXJ/Xzgbu8= diff --git a/pkg/sink/codec/avro/avro.go b/pkg/sink/codec/avro/avro.go index 36291ca0248..d7e646b1f5d 100644 --- a/pkg/sink/codec/avro/avro.go +++ b/pkg/sink/codec/avro/avro.go @@ -359,30 +359,31 @@ const ( ) var type2TiDBType = map[byte]string{ - mysql.TypeTiny: "INT", - mysql.TypeShort: "INT", - mysql.TypeInt24: "INT", - mysql.TypeLong: "INT", - mysql.TypeLonglong: "BIGINT", - mysql.TypeFloat: "FLOAT", - mysql.TypeDouble: "DOUBLE", - mysql.TypeBit: "BIT", - mysql.TypeNewDecimal: "DECIMAL", - mysql.TypeTinyBlob: "TEXT", - mysql.TypeMediumBlob: "TEXT", - mysql.TypeBlob: "TEXT", - mysql.TypeLongBlob: "TEXT", - mysql.TypeVarchar: "TEXT", - mysql.TypeVarString: "TEXT", - mysql.TypeString: "TEXT", - mysql.TypeEnum: "ENUM", - mysql.TypeSet: "SET", - mysql.TypeJSON: "JSON", - mysql.TypeDate: "DATE", - mysql.TypeDatetime: "DATETIME", - mysql.TypeTimestamp: "TIMESTAMP", - mysql.TypeDuration: "TIME", - mysql.TypeYear: "YEAR", + mysql.TypeTiny: "INT", + mysql.TypeShort: "INT", + mysql.TypeInt24: "INT", + mysql.TypeLong: "INT", + mysql.TypeLonglong: "BIGINT", + mysql.TypeFloat: "FLOAT", + mysql.TypeDouble: "DOUBLE", + mysql.TypeBit: "BIT", + mysql.TypeNewDecimal: "DECIMAL", + mysql.TypeTinyBlob: "TEXT", 
+ mysql.TypeMediumBlob: "TEXT", + mysql.TypeBlob: "TEXT", + mysql.TypeLongBlob: "TEXT", + mysql.TypeVarchar: "TEXT", + mysql.TypeVarString: "TEXT", + mysql.TypeString: "TEXT", + mysql.TypeEnum: "ENUM", + mysql.TypeSet: "SET", + mysql.TypeJSON: "JSON", + mysql.TypeDate: "DATE", + mysql.TypeDatetime: "DATETIME", + mysql.TypeTimestamp: "TIMESTAMP", + mysql.TypeDuration: "TIME", + mysql.TypeYear: "YEAR", + mysql.TypeTiDBVectorFloat32: "TiDBVECTORFloat32", } func getTiDBTypeFromColumn(col *model.Column) string { @@ -439,6 +440,8 @@ func mysqlTypeFromTiDBType(tidbType string) byte { result = mysql.TypeDuration case "YEAR": result = mysql.TypeYear + case "TiDBVECTORFloat32": + result = mysql.TypeTiDBVectorFloat32 default: log.Panic("this should not happen, unknown TiDB type", zap.String("type", tidbType)) } @@ -812,6 +815,11 @@ func (a *BatchEncoder) columnToAvroSchema( Type: "int", Parameters: map[string]string{tidbType: tt}, }, nil + case mysql.TypeTiDBVectorFloat32: + return avroSchema{ + Type: "string", + Parameters: map[string]string{tidbType: tt}, + }, nil default: log.Error("unknown mysql type", zap.Any("mysqlType", col.Type)) return nil, cerror.ErrAvroEncodeFailed.GenWithStack("unknown mysql type") @@ -971,6 +979,11 @@ func (a *BatchEncoder) columnToAvroData( return int32(n), "int", nil } return int32(col.Value.(int64)), "int", nil + case mysql.TypeTiDBVectorFloat32: + if vec, ok := col.Value.(types.VectorFloat32); ok { + return vec.String(), "string", nil + } + return nil, "", cerror.ErrAvroEncodeFailed default: log.Error("unknown mysql type", zap.Any("value", col.Value), zap.Any("mysqlType", col.Type)) return nil, "", cerror.ErrAvroEncodeFailed.GenWithStack("unknown mysql type") diff --git a/pkg/sink/codec/canal/canal_entry.go b/pkg/sink/codec/canal/canal_entry.go index 0b7ed26dac9..bcb4c7c09b7 100644 --- a/pkg/sink/codec/canal/canal_entry.go +++ b/pkg/sink/codec/canal/canal_entry.go @@ -24,6 +24,7 @@ import ( mm "github.com/pingcap/tidb/pkg/meta/model" timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec/common" @@ -109,6 +110,8 @@ func (b *canalEntryBuilder) formatValue(value interface{}, isBinary bool) (resul } else { result = string(v) } + case types.VectorFloat32: + result = v.String() default: result = fmt.Sprintf("%v", v) } diff --git a/pkg/sink/codec/canal/canal_entry_test.go b/pkg/sink/codec/canal/canal_entry_test.go index 385c595dfcf..43ef22bec0c 100644 --- a/pkg/sink/codec/canal/canal_entry_test.go +++ b/pkg/sink/codec/canal/canal_entry_test.go @@ -35,10 +35,11 @@ func TestInsert(t *testing.T) { name varchar(32), tiny tinyint, comment text, - bb blob)` + bb blob, + vec vector(5))` _ = helper.DDL2Event(sql) - event := helper.DML2Event(`insert into test.t values(1, "Bob", 127, "测试", "测试blob")`, "test", "t") + event := helper.DML2Event(`insert into test.t values(1, "Bob", 127, "测试", "测试blob", '[1,2,3,4,5]')`, "test", "t") codecConfig := common.NewConfig(config.ProtocolCanalJSON) builder := newCanalEntryBuilder(codecConfig) @@ -97,6 +98,13 @@ func TestInsert(t *testing.T) { require.NoError(t, err) require.Equal(t, "测试blob", s) require.Equal(t, "blob", col.GetMysqlType()) + case "vec": + require.Equal(t, int32(internal.JavaSQLTypeVARCHAR), col.GetSqlType()) + require.False(t, col.GetIsKey()) + require.False(t, col.GetIsNull()) + require.NoError(t, err) + require.Equal(t, 
"[1,2,3,4,5]", col.GetValue()) + require.Equal(t, "vector", col.GetMysqlType()) } } } diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index 8467f7cb6f0..c4ce63f6ddb 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -287,6 +287,7 @@ func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string) if err != nil { log.Panic("invalid column value for double", zap.Any("col", result), zap.Error(err)) } + case mysql.TypeTiDBVectorFloat32: } result.Value = value diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index 9a6fafd22ba..d8682314733 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "testing" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/compression" @@ -88,7 +89,12 @@ func TestDMLE2E(t *testing.T) { colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } err = encoder.AppendRowChangedEvent(ctx, "", updateEvent, func() {}) @@ -256,7 +262,12 @@ func TestCanalJSONClaimCheckE2E(t *testing.T) { colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } } } @@ -667,7 +678,12 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) { colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := obtainedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } } @@ -722,7 +738,12 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } _, hasNext, _ = decoder.HasNext() diff --git a/pkg/sink/codec/common/verify_checksum.go b/pkg/sink/codec/common/verify_checksum.go index 3e91a7804ba..589ae8680d7 100644 --- a/pkg/sink/codec/common/verify_checksum.go +++ b/pkg/sink/codec/common/verify_checksum.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/log" timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" @@ -215,6 +216,9 @@ func buildChecksumBytes(buf []byte, value interface{}, mysqlType byte) ([]byte, // this should not happen, does not take into the checksum calculation. 
 	case mysql.TypeNull, mysql.TypeGeometry:
 		// do nothing
+	case mysql.TypeTiDBVectorFloat32:
+		vec, _ := types.ParseVectorFloat32(value.(string))
+		buf = vec.SerializeTo(buf)
 	default:
 		return buf, errors.New("invalid type for the checksum calculation")
 	}
diff --git a/pkg/sink/codec/craft/message_decoder.go b/pkg/sink/codec/craft/message_decoder.go
index 03798826955..2ea6632c301 100644
--- a/pkg/sink/codec/craft/message_decoder.go
+++ b/pkg/sink/codec/craft/message_decoder.go
@@ -21,6 +21,7 @@ import (
 	"github.com/pingcap/errors"
 	pmodel "github.com/pingcap/tidb/pkg/meta/model"
 	"github.com/pingcap/tidb/pkg/parser/mysql"
+	"github.com/pingcap/tidb/pkg/types"
 	"github.com/pingcap/tiflow/cdc/model"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 )
@@ -353,6 +354,10 @@ func DecodeTiDBType(ty byte, flag model.ColumnFlagType, bits []byte) (interface{
 		fallthrough
 	case mysql.TypeGeometry:
 		return nil, nil
+	case mysql.TypeTiDBVectorFloat32:
+		if val, err := types.ParseVectorFloat32(string(bits)); err == nil {
+			return val, nil
+		}
 	}
 	return nil, nil
 }
diff --git a/pkg/sink/codec/craft/message_encoder.go b/pkg/sink/codec/craft/message_encoder.go
index 92fd3b3d5e4..8f887226bdc 100644
--- a/pkg/sink/codec/craft/message_encoder.go
+++ b/pkg/sink/codec/craft/message_encoder.go
@@ -19,6 +19,7 @@ import (
 	"unsafe"
 
 	"github.com/pingcap/tidb/pkg/parser/mysql"
+	"github.com/pingcap/tidb/pkg/types"
 	"github.com/pingcap/tiflow/cdc/model"
 )
@@ -220,6 +221,9 @@ func EncodeTiDBType(allocator *SliceAllocator, ty byte, flag model.ColumnFlagTyp
 		fallthrough
 	case mysql.TypeGeometry:
 		return nil
+	case mysql.TypeTiDBVectorFloat32:
+		vec := value.(types.VectorFloat32)
+		return []byte(vec.String())
 	}
 	return nil
 }
diff --git a/pkg/sink/codec/csv/csv_message.go b/pkg/sink/codec/csv/csv_message.go
index 673cf499ede..7b39b1f247f 100644
--- a/pkg/sink/codec/csv/csv_message.go
+++ b/pkg/sink/codec/csv/csv_message.go
@@ -362,6 +362,11 @@ func fromColValToCsvVal(csvConfig *common.Config, col *model.Column, ft *types.F
 			return nil, cerror.WrapError(cerror.ErrCSVEncodeFailed, err)
 		}
 		return setVar.Name, nil
+	case mysql.TypeTiDBVectorFloat32:
+		if vec, ok := col.Value.(types.VectorFloat32); ok {
+			return vec.String(), nil
+		}
+		return nil, cerror.ErrCSVEncodeFailed
 	default:
 		return col.Value, nil
 	}
@@ -440,7 +445,6 @@ func csvMsg2RowChangedEvent(csvConfig *common.Config, csvMsg *csvMessage, tableI
 	if err != nil {
 		return nil, err
 	}
-
 	return e, nil
 }
diff --git a/pkg/sink/codec/csv/csv_message_test.go b/pkg/sink/codec/csv/csv_message_test.go
index 57fe5f96010..3ce686bcb33 100644
--- a/pkg/sink/codec/csv/csv_message_test.go
+++ b/pkg/sink/codec/csv/csv_message_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/sink/codec/common"
+	"github.com/pingcap/tiflow/pkg/util"
 	"github.com/stretchr/testify/require"
 )
@@ -629,6 +630,19 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{
 			config.BinaryEncodingBase64,
 		},
 	},
+	{
+		{
+			model.Column{Name: "vectorfloat32", Value: util.Must(types.ParseVectorFloat32("[1,2,3,4,5]")), Type: mysql.TypeTiDBVectorFloat32},
+			rowcodec.ColInfo{
+				ID:            37,
+				IsPKHandle:    false,
+				VirtualGenCol: false,
+				Ft:            types.NewFieldType(mysql.TypeTiDBVectorFloat32),
+			},
+			"[1,2,3,4,5]",
+			config.BinaryEncodingBase64,
+		},
+	},
 }
 
 func setBinChsClnFlag(ft *types.FieldType) *types.FieldType {
diff --git a/pkg/sink/codec/debezium/codec.go b/pkg/sink/codec/debezium/codec.go
index 22a9bca7940..75c3ada818a 100644
--- a/pkg/sink/codec/debezium/codec.go
+++ b/pkg/sink/codec/debezium/codec.go @@ -240,6 +240,13 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteStringField("field", col.Name) }) + case mysql.TypeTiDBVectorFloat32: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "string") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("name", "io.debezium.data.TiDBVectorFloat32") + writer.WriteStringField("field", col.Name) + }) default: log.Warn( "meet unsupported field type", @@ -502,6 +509,11 @@ func (c *dbzCodec) writeDebeziumFieldValue( return nil } + case mysql.TypeTiDBVectorFloat32: + v := col.Value.(types.VectorFloat32).String() + writer.WriteStringField(col.Name, v) + return nil + // Note: Although Debezium's doc claims to use INT32 for INT, but it // actually uses INT64. Debezium also uses INT32 for SMALLINT. // So we only handle with TypeLonglong here. diff --git a/pkg/sink/codec/internal/column.go b/pkg/sink/codec/internal/column.go index 5aefe9d9a16..0070f3f138c 100644 --- a/pkg/sink/codec/internal/column.go +++ b/pkg/sink/codec/internal/column.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" "go.uber.org/zap" ) @@ -61,6 +62,8 @@ func (c *Column) FromRowChangeColumn(col *model.Column) { str = str[1 : len(str)-1] } c.Value = str + case mysql.TypeTiDBVectorFloat32: + c.Value = col.Value.(types.VectorFloat32).String() default: c.Value = col.Value } @@ -98,8 +101,8 @@ func (c *Column) ToRowChangeColumn(name string) *model.Column { zap.Any("col", c), zap.Error(err)) } col.Value = uint64(val) + case mysql.TypeTiDBVectorFloat32: default: - col.Value = c.Value } return col } diff --git a/pkg/sink/codec/internal/java.go b/pkg/sink/codec/internal/java.go index 38ec2e33f6c..b25421fb6ba 100644 --- a/pkg/sink/codec/internal/java.go +++ b/pkg/sink/codec/internal/java.go @@ -143,6 +143,9 @@ func MySQLType2JavaType(mysqlType byte, isBinary bool) JavaSQLType { case mysql.TypeJSON: return JavaSQLTypeVARCHAR + case mysql.TypeTiDBVectorFloat32: + return JavaSQLTypeVARCHAR + default: return JavaSQLTypeVARCHAR } diff --git a/pkg/sink/codec/maxwell/maxwell_message.go b/pkg/sink/codec/maxwell/maxwell_message.go index 985bbb49244..f673ad5b4f1 100644 --- a/pkg/sink/codec/maxwell/maxwell_message.go +++ b/pkg/sink/codec/maxwell/maxwell_message.go @@ -18,6 +18,7 @@ import ( model2 "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec/internal" @@ -84,6 +85,8 @@ func rowChangeToMaxwellMsg(e *model.RowChangedEvent, onlyHandleKeyColumns bool) } else { value.Old[colName] = string(v.Value.([]byte)) } + case mysql.TypeTiDBVectorFloat32: + value.Old[colName] = v.Value.(types.VectorFloat32).String() default: value.Old[colName] = v.Value } @@ -102,6 +105,8 @@ func rowChangeToMaxwellMsg(e *model.RowChangedEvent, onlyHandleKeyColumns bool) } else { value.Data[colName] = string(v.Value.([]byte)) } + case mysql.TypeTiDBVectorFloat32: + value.Data[colName] = v.Value.(types.VectorFloat32).String() default: value.Data[colName] = v.Value } @@ -129,6 +134,11 @@ func rowChangeToMaxwellMsg(e *model.RowChangedEvent, onlyHandleKeyColumns bool) value.Old[colName] = string(v.Value.([]byte)) } } + case mysql.TypeTiDBVectorFloat32: + val := v.Value.(types.VectorFloat32).String() + if 
value.Old[colName] != val { + value.Old[colName] = val + } default: if value.Data[colName] != v.Value { value.Old[colName] = v.Value @@ -282,6 +292,8 @@ func columnToMaxwellType(columnType byte) (string, error) { return "float", nil case mysql.TypeNewDecimal: return "decimal", nil + case mysql.TypeTiDBVectorFloat32: + return "string", nil default: return "", cerror.ErrMaxwellInvalidData.GenWithStack("unsupported column type - %v", columnType) } diff --git a/pkg/sink/codec/open/open_protocol_encoder_test.go b/pkg/sink/codec/open/open_protocol_encoder_test.go index f1833795999..5ad82506bd2 100644 --- a/pkg/sink/codec/open/open_protocol_encoder_test.go +++ b/pkg/sink/codec/open/open_protocol_encoder_test.go @@ -18,6 +18,7 @@ import ( "database/sql" "testing" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/compression" @@ -420,7 +421,12 @@ func TestE2EClaimCheckMessage(t *testing.T) { colName := insertEvent.TableInfo.ForceGetColumnName(column.ColumnID) decodedColumn, ok := decodedColumns[colName] require.True(t, ok) - require.Equal(t, column.Value, decodedColumn.Value, colName) + switch v := column.Value.(type) { + case types.VectorFloat32: + require.Equal(t, v.String(), decodedColumn.Value, colName) + default: + require.Equal(t, v, decodedColumn.Value, colName) + } } } diff --git a/pkg/sink/codec/simple/encoder_test.go b/pkg/sink/codec/simple/encoder_test.go index 946a5bb2975..b26dece1aff 100644 --- a/pkg/sink/codec/simple/encoder_test.go +++ b/pkg/sink/codec/simple/encoder_test.go @@ -27,6 +27,7 @@ import ( "github.com/golang/mock/gomock" timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/compression" @@ -1387,7 +1388,12 @@ func TestEncodeLargeEventsNormal(t *testing.T) { colName := event.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } decodedPreviousColumns := make(map[string]*model.ColumnData, len(decodedRow.PreColumns)) @@ -1399,7 +1405,12 @@ func TestEncodeLargeEventsNormal(t *testing.T) { colName := event.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedPreviousColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } } } @@ -1541,7 +1552,12 @@ func TestLargerMessageHandleClaimCheck(t *testing.T) { colName := updateEvent.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value, colName) + default: + require.EqualValues(t, v, decoded.Value, colName) + } } for _, column := range decodedRow.PreColumns { @@ -1552,7 +1568,12 @@ func TestLargerMessageHandleClaimCheck(t *testing.T) { colName := updateEvent.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedColumns[colName] require.True(t, ok) - 
require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value, colName) + default: + require.EqualValues(t, v, decoded.Value, colName) + } } } } @@ -1723,11 +1744,18 @@ func TestLargeMessageHandleKeyOnly(t *testing.T) { case []byte: length := len(decoded.Value.([]uint8)) require.Equal(t, v[:length], decoded.Value, colName) + case types.VectorFloat32: + require.Equal(t, v.String(), decoded.Value, colName) default: - require.EqualValues(t, col.Value, decoded.Value, colName) + require.Equal(t, col.Value, decoded.Value, colName) } } else { - require.EqualValues(t, col.Value, decoded.Value, colName) + switch v := col.Value.(type) { + case []byte: + require.Equal(t, string(v), decoded.Value, colName) + default: + require.Equal(t, v, decoded.Value, colName) + } } } @@ -1746,11 +1774,20 @@ func TestLargeMessageHandleKeyOnly(t *testing.T) { case []byte: length := len(decoded.Value.([]uint8)) require.Equal(t, v[:length], decoded.Value, colName) + case types.VectorFloat32: + require.Equal(t, v.String(), decoded.Value, colName) default: - require.EqualValues(t, col.Value, decoded.Value, colName) + require.Equal(t, col.Value, decoded.Value, colName) } } else { - require.EqualValues(t, col.Value, decoded.Value, colName) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.Equal(t, v.String(), decoded.Value, colName) + case []byte: + require.Equal(t, string(v), decoded.Value, colName) + default: + require.Equal(t, v, decoded.Value, colName) + } } } } diff --git a/pkg/sink/codec/simple/message.go b/pkg/sink/codec/simple/message.go index 26afea31298..3a275cba45d 100644 --- a/pkg/sink/codec/simple/message.go +++ b/pkg/sink/codec/simple/message.go @@ -403,7 +403,7 @@ func buildRowChangedEvent( err := common.VerifyChecksum(result, db) if err != nil || msg.Checksum.Corrupted { log.Warn("consumer detect checksum corrupted", - zap.String("schema", msg.Schema), zap.String("table", msg.Table)) + zap.String("schema", msg.Schema), zap.String("table", msg.Table), zap.Error(err)) return nil, cerror.ErrDecodeFailed.GenWithStackByArgs("checksum corrupted") } @@ -639,6 +639,8 @@ func (a *avroMarshaller) encodeValue4Avro( return v, "double" case string: return v, "string" + case tiTypes.VectorFloat32: + return v.String(), "string" default: log.Panic("unexpected type for avro value", zap.Any("value", value)) } @@ -651,7 +653,6 @@ func encodeValue( if value == nil { return nil } - var err error switch ft.GetType() { case mysql.TypeBit: @@ -715,6 +716,8 @@ func encodeValue( } else { result = string(v) } + case tiTypes.VectorFloat32: + result = v.String() default: result = fmt.Sprintf("%v", v) } @@ -755,7 +758,18 @@ func decodeColumn(value interface{}, id int64, fieldType *types.FieldType) *mode case int64: value = uint64(v) } - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeInt24, mysql.TypeYear: + case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeInt24: + switch v := value.(type) { + case string: + if mysql.HasUnsignedFlag(fieldType.GetFlag()) { + value, err = strconv.ParseUint(v, 10, 64) + } else { + value, err = strconv.ParseInt(v, 10, 64) + } + default: + value = v + } + case mysql.TypeYear: switch v := value.(type) { case string: value, err = strconv.ParseInt(v, 10, 64) @@ -765,9 +779,10 @@ func decodeColumn(value interface{}, id int64, fieldType *types.FieldType) *mode case mysql.TypeLonglong: switch v := value.(type) { case string: - value, err = strconv.ParseInt(v, 10, 
64) - if err != nil { + if mysql.HasUnsignedFlag(fieldType.GetFlag()) { value, err = strconv.ParseUint(v, 10, 64) + } else { + value, err = strconv.ParseInt(v, 10, 64) } case map[string]interface{}: value = uint64(v["value"].(int64)) @@ -777,7 +792,9 @@ func decodeColumn(value interface{}, id int64, fieldType *types.FieldType) *mode case mysql.TypeFloat: switch v := value.(type) { case string: - value, err = strconv.ParseFloat(v, 32) + var val float64 + val, err = strconv.ParseFloat(v, 32) + value = float32(val) default: value = v } diff --git a/pkg/sink/codec/utils/test_utils.go b/pkg/sink/codec/utils/test_utils.go index 4385b411d18..18c38c89186 100644 --- a/pkg/sink/codec/utils/test_utils.go +++ b/pkg/sink/codec/utils/test_utils.go @@ -81,6 +81,7 @@ func NewLargeEvent4Test(t *testing.T, replicaConfig *config.ReplicaConfig) (*mod enumT enum('a', 'b', 'c') default 'b', setT set('a', 'b', 'c') default 'c', bitT bit(10) default b'1010101010', + vectorT vector(5), jsonT json)` ddlEvent := helper.DDL2Event(sql) @@ -138,6 +139,7 @@ func NewLargeEvent4Test(t *testing.T, replicaConfig *config.ReplicaConfig) (*mod 'a', 'b', 65, + '[1,2,3,4,5]', '{"key1": "value1"}')` insert := helper.DML2Event(sql, "test", "t") @@ -217,5 +219,6 @@ var LargeTableColumns = map[string]interface{}{ "enumT": []uint8("a"), "setT": []uint8("b"), "bitT": []uint8{65}, + "vectorT": []uint8("[1,2,3,4,5]"), "jsonT": []uint8("{\"key1\": \"value1\"}"), } diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index 4adf68639a8..9e9f149eee3 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -77,6 +77,8 @@ const ( // defaultcachePrepStmts is the default value of cachePrepStmts defaultCachePrepStmts = true + + defaultHasVectorType = false ) type urlConfig struct { @@ -96,6 +98,7 @@ type urlConfig struct { EnableBatchDML *bool `form:"batch-dml-enable"` EnableMultiStatement *bool `form:"multi-stmt-enable"` EnableCachePreparedStatement *bool `form:"cache-prep-stmts"` + HasVectorType *bool `form:"has-vector-type"` } // Config is the configs for MySQL backend. @@ -117,6 +120,7 @@ type Config struct { // IsBDRModeSupported is true if the downstream is TiDB and write source is existed. // write source exists when the downstream is TiDB and version is greater than or equal to v6.5.0. 
IsWriteSourceExisted bool + HasVectorType bool // HasVectorType is true if the column is vector type SourceID uint64 BatchDMLEnable bool @@ -140,6 +144,7 @@ func NewConfig() *Config { MultiStmtEnable: defaultMultiStmtEnable, CachePrepStmts: defaultCachePrepStmts, SourceID: config.DefaultTiDBSourceID, + HasVectorType: defaultHasVectorType, } } @@ -197,6 +202,7 @@ func (c *Config) Apply( } getBatchDMLEnable(urlParameter, &c.BatchDMLEnable) + getHasVectorType(urlParameter, &c.HasVectorType) getMultiStmtEnable(urlParameter, &c.MultiStmtEnable) getCachePrepStmts(urlParameter, &c.CachePrepStmts) c.ForceReplicate = replicaConfig.ForceReplicate @@ -446,6 +452,12 @@ func getBatchDMLEnable(values *urlConfig, batchDMLEnable *bool) { } } +func getHasVectorType(values *urlConfig, hasVectorType *bool) { + if values.HasVectorType != nil { + *hasVectorType = *values.HasVectorType + } +} + func getMultiStmtEnable(values *urlConfig, multiStmtEnable *bool) { if values.EnableMultiStatement != nil { *multiStmtEnable = *values.EnableMultiStatement diff --git a/scripts/download-integration-test-binaries.sh b/scripts/download-integration-test-binaries.sh index df5fac810c6..234ec4d787d 100755 --- a/scripts/download-integration-test-binaries.sh +++ b/scripts/download-integration-test-binaries.sh @@ -147,7 +147,7 @@ download_binaries() { local minio_download_url="${FILE_SERVER_URL}/download/minio.tar.gz" local go_ycsb_download_url="${FILE_SERVER_URL}/download/builds/pingcap/go-ycsb/test-br/go-ycsb" local etcd_download_url="${FILE_SERVER_URL}/download/builds/pingcap/cdc/etcd-v3.4.7-linux-amd64.tar.gz" - local sync_diff_inspector_url="${FILE_SERVER_URL}/download/builds/pingcap/cdc/sync_diff_inspector_hash-79f1fd1e_linux-amd64.tar.gz" + local sync_diff_inspector_url="${FILE_SERVER_URL}/download/builds/pingcap/cdc/sync_diff_inspector_hash-a129f096_linux-amd64.tar.gz" local jq_download_url="${FILE_SERVER_URL}/download/builds/pingcap/test/jq-1.6/jq-linux64" local schema_registry_url="${FILE_SERVER_URL}/download/builds/pingcap/cdc/schema-registry.tar.gz" diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 991ba3d188a..df2ab20ed4d 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -42,7 +42,7 @@ groups=( # G04 'foreign_key ddl_puller_lag ddl_only_block_related_table changefeed_auto_stop' # G05 - 'charset_gbk ddl_manager multi_source' + 'charset_gbk ddl_manager multi_source vector' # G06 'sink_retry changefeed_error ddl_sequence resourcecontrol' # G07 pulsar oauth2 authentication enabled diff --git a/tests/integration_tests/vector/conf/diff_config.toml b/tests/integration_tests/vector/conf/diff_config.toml new file mode 100644 index 00000000000..d37f42f32c8 --- /dev/null +++ b/tests/integration_tests/vector/conf/diff_config.toml @@ -0,0 +1,30 @@ +# diff Configuration. 
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/vector/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/vector/data/data.sql b/tests/integration_tests/vector/data/data.sql new file mode 100644 index 00000000000..daa94d721ee --- /dev/null +++ b/tests/integration_tests/vector/data/data.sql @@ -0,0 +1,34 @@ +DROP DATABASE IF EXISTS test; +CREATE DATABASE test; +use test; +DROP TABLE IF EXISTS test.simple1; +DROP TABLE IF EXISTS test.simple2; + +CREATE TABLE test.simple1(id int primary key, data VECTOR(5)); +-- CREATE VECTOR INDEX idx_name1 USING HNSW ON test.simple1(VEC_COSINE_DISTANCE(data)) ; +INSERT INTO test.simple1(id, data) VALUES (1, "[1,2,3,4,5]"); +INSERT INTO test.simple1(id, data) VALUES (2, '[2,3,4,5,6]'); +INSERT INTO test.simple1(id, data) VALUES (3, '[0.1,0.2,0.3,0.4,0.5]'); +INSERT INTO test.simple1(id, data) VALUES (4, '[0,-0.1,-2,2,0.1]'); + + +CREATE TABLE test.simple2(id int primary key, data VECTOR(5), embedding VECTOR(5) COMMENT "hnsw(distance=cosine)"); +INSERT INTO test.simple2(id, data, embedding) VALUES (1, '[1,2,3,4,5]','[1,2,3,4,5]'); +INSERT INTO test.simple2(id, data, embedding) VALUES (2, '[2,3,4,5,6]','[1,2,3,4,5]'); +INSERT INTO test.simple2(id, data, embedding) VALUES (3, '[0.1,0.2,0.3,0.4,0.5]','[1,2,3,4,5]'); +INSERT INTO test.simple2(id, data, embedding) VALUES (4, '[0,-0.1,-2,2,0.1]','[1,2,3,4,5]'); + +DELETE FROM test.simple1 where id=1; +DELETE FROM test.simple2 where id=1; +DELETE FROM test.simple1 where id=2; +DELETE FROM test.simple2 where id=2; + +UPDATE test.simple1 SET data = '[0,-0.1,-2,2.0,0.1]' WHERE id = 3; +UPDATE test.simple2 SET data = '[0,-0.1,-2,2.0,0.1]' WHERE id = 3; + +ALTER TABLE test.simple1 ADD column embedding VECTOR(3) COMMENT "hnsw(distance=cosine)"; +INSERT INTO test.simple1 (id, data, embedding) VALUES (5, '[1,2,3,4,5]', '[1,2,3]'); + +ALTER TABLE test.simple2 DROP column embedding; + +CREATE TABLE test.finish_mark(id int primary key); \ No newline at end of file diff --git a/tests/integration_tests/vector/run.sh b/tests/integration_tests/vector/run.sh new file mode 100755 index 00000000000..aedcd2bccec --- /dev/null +++ b/tests/integration_tests/vector/run.sh @@ -0,0 +1,44 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + TOPIC_NAME="vector-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR $SINK_URI ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; 
+ esac + run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + sleep 3 + # sync_diff can't check non-exist table, so we check expected tables are created in downstream first + check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
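
Reviewer sketch (not part of the patch): the new `has-vector-type` sink parameter is what gates the DDL rewrite introduced in format_ddl.go. A changefeed would opt in with something like --sink-uri="mysql://user:pass@127.0.0.1:3306/?has-vector-type=true" (host and credentials here are hypothetical). The test below is a minimal illustration of the expected end-to-end effect of formatQuery on a table shaped like test.simple2 from the new integration test; it assumes it is placed next to format_ddl_test.go in package mysql so the unexported formatQuery is visible, and it only checks properties the visitor guarantees rather than an exact restored string.

package mysql

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestFormatQueryVectorSketch is illustrative only: a VECTOR column is
// rewritten to LONGTEXT and its index-hint COMMENT is dropped, so the
// statement can be replayed on a downstream without vector support.
func TestFormatQueryVectorSketch(t *testing.T) {
	ddl := "CREATE TABLE `test`.`simple2` (" +
		"`id` INT PRIMARY KEY," +
		"`embedding` VECTOR(5) COMMENT 'hnsw(distance=cosine)')"
	got := formatQuery(ddl)
	require.NotContains(t, got, "VECTOR")
	require.Contains(t, got, "LONGTEXT")
	require.NotContains(t, got, "hnsw")
}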