Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[to #48] modify TiDB CDC to TiKV CDC #74

Merged
merged 33 commits into from
Apr 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a330434
modify TiCDC to TiKV CDC
zeminzhou Mar 23, 2022
d537173
br: fix CI timeout (#71)
pingyu Mar 23, 2022
6acd595
[to #67] remove unused function and dead code (#69)
zz-jason Mar 23, 2022
8eea41e
fix lint
zeminzhou Mar 23, 2022
2956d83
fix ut
zeminzhou Mar 23, 2022
92a5940
fix ut
zeminzhou Mar 23, 2022
fe8cac1
[to #519] move br component from tikv java client to this repo (#73)
marsishandsome Mar 23, 2022
be501e8
[to #67] remove unused code related to restore (#76)
zz-jason Mar 29, 2022
4cf9276
fix ut
zeminzhou Mar 29, 2022
c829e71
[to #67] remove unused function and dead code (#69)
zz-jason Mar 23, 2022
62c7f00
[to #67] remove unused code related to restore (#76)
zz-jason Mar 29, 2022
6bd0a5c
remove ForceReplicate, IgnoreIneligibleTable, IgnoreTxnStartTs, Mounter
zeminzhou Mar 30, 2022
e0a7e8b
fix the same keyspan id
zeminzhou Mar 30, 2022
bd77c3a
fix the same keyspan id
zeminzhou Mar 30, 2022
d8eec09
namespace ticdc -> tikv_cdc
zeminzhou Mar 30, 2022
a61dd5a
change func name to updateCurrentKeySpansImpl
zeminzhou Mar 30, 2022
f60b0fe
fix hard code
zeminzhou Mar 30, 2022
4380a2f
fix error name
zeminzhou Mar 30, 2022
7f5419a
fix ut
zeminzhou Mar 30, 2022
40c9ecb
add deleted ut TestFixChangefeedSinkProtocol
zeminzhou Mar 30, 2022
687fea1
[to #66] update kvproto to support api v2 (#79)
zz-jason Mar 30, 2022
3363a0b
[to #67] remove unused code related to parser (#78)
zz-jason Mar 30, 2022
c5da5e5
[to #67] clean Makefile (#80)
zz-jason Mar 31, 2022
a878f96
fix keyspan id
zeminzhou Apr 1, 2022
fbdefb7
[to #67] setup backup/restore integration test for rawkv (#82)
zz-jason Apr 6, 2022
be8b5dd
[to#67] remove some unused code (#81)
zz-jason Apr 6, 2022
78ad01b
remove unnecessary tests (#83)
zz-jason Apr 6, 2022
d890254
[to #67] support setting api version in br requests (#75)
zz-jason Apr 13, 2022
36f9ee8
[to #67] remove unnecessary codes (#84)
zz-jason Apr 14, 2022
373086c
[to #67] fix rawkv backup failure (#77)
pingyu Apr 14, 2022
fc51e96
fix comment
zeminzhou Apr 18, 2022
7d0ee78
fix comment
zeminzhou Apr 18, 2022
eede5d2
Merge branch 'main' into tikv-cdc
pingyu Apr 18, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ PACKAGE_LIST := go list ./... | grep -vE 'vendor|proto|cdc\/tests|integration|te
PACKAGES := $$($(PACKAGE_LIST))
FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor|kv_gen|proto|pb\.go|pb\.gw\.go')
TEST_FILES := $$(find . -name '*_test.go' -type f | grep -vE 'vendor|kv_gen|integration|testing_utils')
FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/tikv/migration/cdc/"}|grep -v "github.com/pingcap/migration/cdc/"; done)
FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/tikv/migration/cdc/"}|grep -v "github.com/tikv/migration/cdc/"; done)
FAILPOINT := tools/bin/failpoint-ctl

FAILPOINT_ENABLE := $$(echo $(FAILPOINT_DIR) | xargs $(FAILPOINT) enable >/dev/null)
Expand Down
78 changes: 15 additions & 63 deletions cdc/cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/cdc/owner"
"github.com/tikv/migration/cdc/cdc/processor"
"github.com/tikv/migration/cdc/cdc/processor/pipeline/system"
ssystem "github.com/tikv/migration/cdc/cdc/sorter/leveldb/system"

// ssystem "github.com/tikv/migration/cdc/cdc/sorter/leveldb/system"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just remove it?

Copy link
Contributor Author

@zeminzhou zeminzhou Mar 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sorter module is used to sort transactions by commit ts to ensure that the transactions entered by cdc are in the same order as commit ts. Now rawkv don't it (We also need to add sorter for txnkv later).

"github.com/tikv/migration/cdc/pkg/config"
cdcContext "github.com/tikv/migration/cdc/pkg/context"
cerror "github.com/tikv/migration/cdc/pkg/errors"
Expand Down Expand Up @@ -66,10 +66,10 @@ type Capture struct {
grpcPool kv.GrpcPool
regionCache *tikv.RegionCache
TimeAcquirer pdtime.TimeAcquirer
sorterSystem *ssystem.System
// sorterSystem *ssystem.System

enableNewScheduler bool
tableActorSystem *system.System
// keyspanActorSystem *system.System

// MessageServer is the receiver of the messages from the other nodes.
// It should be recreated each time the capture is restarted.
Expand Down Expand Up @@ -141,40 +141,6 @@ func (c *Capture) reset(ctx context.Context) error {
c.TimeAcquirer.Stop()
}
c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient)

if c.tableActorSystem != nil {
err := c.tableActorSystem.Stop()
if err != nil {
log.Warn("stop table actor system failed", zap.Error(err))
}
}
if conf.Debug.EnableTableActor {
c.tableActorSystem = system.NewSystem()
err = c.tableActorSystem.Start(ctx)
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create table actor system")
}
}
if conf.Debug.EnableDBSorter {
if c.sorterSystem != nil {
err := c.sorterSystem.Stop()
if err != nil {
log.Warn("stop sorter system failed", zap.Error(err))
}
}
// Sorter dir has been set and checked when server starts.
// See https://github.com/tikv/migration/cdc/blob/9dad09/cdc/server.go#L275
sortDir := config.GetGlobalServerConfig().Sorter.SortDir
c.sorterSystem = ssystem.NewSystem(sortDir, conf.Debug.DB)
err = c.sorterSystem.Start(ctx)
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create sorter system")
}
}
if c.grpcPool != nil {
c.grpcPool.Close()
}
Expand Down Expand Up @@ -257,17 +223,17 @@ func (c *Capture) Run(ctx context.Context) error {

func (c *Capture) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
RegionCache: c.regionCache,
TimeAcquirer: c.TimeAcquirer,
TableActorSystem: c.tableActorSystem,
SorterSystem: c.sorterSystem,
MessageServer: c.MessageServer,
MessageRouter: c.MessageRouter,
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
RegionCache: c.regionCache,
TimeAcquirer: c.TimeAcquirer,
// KeySpanActorSystem: c.keyspanActorSystem,
// SorterSystem: c.sorterSystem,
MessageServer: c.MessageServer,
MessageRouter: c.MessageRouter,
})
err := c.register(ctx)
if err != nil {
Expand Down Expand Up @@ -535,20 +501,6 @@ func (c *Capture) AsyncClose() {
c.regionCache.Close()
c.regionCache = nil
}
if c.tableActorSystem != nil {
err := c.tableActorSystem.Stop()
if err != nil {
log.Warn("stop table actor system failed", zap.Error(err))
}
c.tableActorSystem = nil
}
if c.sorterSystem != nil {
err := c.sorterSystem.Stop()
if err != nil {
log.Warn("stop sorter system failed", zap.Error(err))
}
c.sorterSystem = nil
}
if c.enableNewScheduler {
c.grpcService.Reset(nil)

Expand Down
44 changes: 22 additions & 22 deletions cdc/cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,11 @@ func (h *HTTPHandler) GetChangefeed(c *gin.Context) {

taskStatus := make([]model.CaptureTaskStatus, 0, len(processorInfos))
for captureID, status := range processorInfos {
tables := make([]int64, 0)
for tableID := range status.Tables {
tables = append(tables, tableID)
keyspans := make([]uint64, 0)
for keyspanID := range status.KeySpans {
keyspans = append(keyspans, keyspanID)
}
taskStatus = append(taskStatus, model.CaptureTaskStatus{CaptureID: captureID, Tables: tables, Operation: status.Operation})
taskStatus = append(taskStatus, model.CaptureTaskStatus{CaptureID: captureID, KeySpans: keyspans, Operation: status.Operation})
}

changefeedDetail := &model.ChangefeedDetail{
Expand Down Expand Up @@ -424,17 +424,17 @@ func (h *HTTPHandler) RemoveChangefeed(c *gin.Context) {
c.Status(http.StatusAccepted)
}

// RebalanceTable rebalances tables
// @Summary rebalance tables
// @Description rebalance all tables of a changefeed
// RebalanceKeySpan rebalances keyspans
// @Summary rebalance keyspans
// @Description rebalance all keyspans of a changefeed
// @Tags changefeed
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/tables/rebalance_table [post]
func (h *HTTPHandler) RebalanceTable(c *gin.Context) {
// @Router /api/v1/changefeeds/{changefeed_id}/keyspans/rebalance_keyspan [post]
func (h *HTTPHandler) RebalanceKeySpan(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -462,19 +462,19 @@ func (h *HTTPHandler) RebalanceTable(c *gin.Context) {
c.Status(http.StatusAccepted)
}

// MoveTable moves a table to target capture
// @Summary move table
// @Description move one table to the target capture
// MoveKeySpan moves a keyspan to target capture
// @Summary move keyspan
// @Description move one keyspan to the target capture
// @Tags changefeed
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Param table_id body integer true "table_id"
// @Param keyspan_id body integer true "keyspan_id"
// @Param capture_id body string true "capture_id"
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/tables/move_table [post]
func (h *HTTPHandler) MoveTable(c *gin.Context) {
// @Router /api/v1/changefeeds/{changefeed_id}/keyspans/move_keyspan [post]
func (h *HTTPHandler) MoveKeySpan(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand All @@ -495,7 +495,7 @@ func (h *HTTPHandler) MoveTable(c *gin.Context) {

data := struct {
CaptureID string `json:"capture_id"`
TableID int64 `json:"table_id"`
KeySpanID uint64 `json:"keyspan_id"`
}{}
err = c.BindJSON(&data)
if err != nil {
Expand All @@ -509,7 +509,7 @@ func (h *HTTPHandler) MoveTable(c *gin.Context) {
}

_ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.ManualSchedule(changefeedID, data.CaptureID, data.TableID)
owner.ManualSchedule(changefeedID, data.CaptureID, data.KeySpanID)
return nil
})

Expand Down Expand Up @@ -586,7 +586,7 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) {
return
}
position, exist := positions[captureID]
// Note: for the case that no tables are attached to a newly created changefeed,
// Note: for the case that no keyspans are attached to a newly created changefeed,
// we just do not report an error.
var processorDetail model.ProcessorDetail
if exist {
Expand All @@ -596,11 +596,11 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) {
Count: position.Count,
Error: position.Error,
}
tables := make([]int64, 0)
for tableID := range status.Tables {
tables = append(tables, tableID)
keyspans := make([]uint64, 0)
for keyspanID := range status.KeySpans {
keyspans = append(keyspans, keyspanID)
}
processorDetail.Tables = tables
processorDetail.KeySpans = keyspans
}
c.IndentedJSON(http.StatusOK, &processorDetail)
}
Expand Down
82 changes: 15 additions & 67 deletions cdc/cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/r3labs/diff"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/migration/cdc/cdc/entry"
"github.com/tikv/migration/cdc/cdc/kv"
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/cdc/sink"
"github.com/tikv/migration/cdc/pkg/config"
cerror "github.com/tikv/migration/cdc/pkg/errors"
"github.com/tikv/migration/cdc/pkg/filter"

"github.com/tikv/migration/cdc/pkg/txnutil/gc"
"github.com/tikv/migration/cdc/pkg/util"
"github.com/tikv/migration/cdc/pkg/version"
Expand Down Expand Up @@ -80,19 +77,23 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch

// init replicaConfig
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.ForceReplicate = changefeedConfig.ForceReplicate
if changefeedConfig.MounterWorkerNum != 0 {
replicaConfig.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum
}
/*
replicaConfig.ForceReplicate = changefeedConfig.ForceReplicate
if changefeedConfig.MounterWorkerNum != 0 {
replicaConfig.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum
}
*/
if changefeedConfig.SinkConfig != nil {
replicaConfig.Sink = changefeedConfig.SinkConfig
}
if len(changefeedConfig.IgnoreTxnStartTs) != 0 {
replicaConfig.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs
}
if len(changefeedConfig.FilterRules) != 0 {
replicaConfig.Filter.Rules = changefeedConfig.FilterRules
}
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FilterRules are used to filter table.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can modify the filter table to filter keys (or keyspan) to filter keys that users do not need to synchronize.

if len(changefeedConfig.IgnoreTxnStartTs) != 0 {
replicaConfig.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs
}
if len(changefeedConfig.FilterRules) != 0 {
replicaConfig.Filter.Rules = changefeedConfig.FilterRules
}
*/

captureInfos, err := capture.owner.StatusProvider().GetCaptures(ctx)
if err != nil {
Expand Down Expand Up @@ -127,16 +128,6 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
CreatorVersion: version.ReleaseVersion,
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := verifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
if len(ineligibleTables) != 0 {
return nil, cerror.ErrTableIneligible.GenWithStackByArgs(ineligibleTables)
}
}

tz, err := util.GetTimezone(changefeedConfig.TimeZone)
if err != nil {
return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone))
Expand Down Expand Up @@ -164,22 +155,6 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
}

// verify rules
if len(changefeedConfig.FilterRules) != 0 {
newInfo.Config.Filter.Rules = changefeedConfig.FilterRules
_, err = filter.VerifyRules(newInfo.Config)
if err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
}
}

if len(changefeedConfig.IgnoreTxnStartTs) != 0 {
newInfo.Config.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs
}

if changefeedConfig.MounterWorkerNum != 0 {
newInfo.Config.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum
}

if changefeedConfig.SinkConfig != nil {
newInfo.Config.Sink = changefeedConfig.SinkConfig
}
Expand All @@ -198,30 +173,3 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch

return newInfo, nil
}

func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
filter, err := filter.NewFilter(replicaConfig)
if err != nil {
return nil, nil, errors.Trace(err)
}
meta, err := kv.GetSnapshotMeta(storage, startTs)
if err != nil {
return nil, nil, errors.Trace(err)
}
snap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, false /* explicitTables */)
if err != nil {
return nil, nil, errors.Trace(err)
}

for _, tableInfo := range snap.Tables() {
if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
continue
}
if !tableInfo.IsEligible(false /* forceReplicate */) {
ineligibleTables = append(ineligibleTables, tableInfo.TableName)
} else {
eligibleTables = append(eligibleTables, tableInfo.TableName)
}
}
return
}
2 changes: 1 addition & 1 deletion cdc/cdc/capture/http_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
require.Nil(t, newInfo)

// test verify success
changefeedConfig = model.ChangefeedConfig{MounterWorkerNum: 32}
changefeedConfig = model.ChangefeedConfig{SinkConfig: &config.SinkConfig{Protocol: "test"}}
newInfo, err = verifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
require.Nil(t, err)
require.NotNil(t, newInfo)
Expand Down
Loading