Skip to content

Commit

Permalink
*: use chunk to decode rawKv in eventService (flowbehappy#253)
Browse files Browse the repository at this point in the history
* use chunk to decode rawKv in eventService

Signed-off-by: dongmen <[email protected]>
  • Loading branch information
asddongmen committed Sep 4, 2024
1 parent 28f3186 commit db88d35
Show file tree
Hide file tree
Showing 22 changed files with 963 additions and 269 deletions.
38 changes: 18 additions & 20 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Dispatcher struct {

resolvedTs *TsWithMutex // 用来记 中目前收到的 event 中收到的最大的 commitTs - 1,不代表 dispatcher 的 checkpointTs

ddlPendingEvent *common.TxnEvent
ddlPendingEvent *common.DDLEvent
isRemoving atomic.Bool

tableProgress *types.TableProgress
Expand Down Expand Up @@ -124,11 +124,11 @@ func NewDispatcher(id common.DispatcherID, tableSpan *heartbeatpb.TableSpan, sin
// 2.2 maintainer 通知自己可以 write 或者 pass event
//
// TODO:特殊处理有 add index 的逻辑
func (d *Dispatcher) addDDLEventToSinkWhenAvailable(event *common.TxnEvent) {
func (d *Dispatcher) addDDLEventToSinkWhenAvailable(event *common.DDLEvent) {
// 根据 filter 过滤 query 中不需要 send to downstream 的数据
// 但应当不出现整个 query 都不需要 send to downstream 的 ddl,这种 ddl 不应该发给 dispatcher
// TODO: ddl 影响到的 tableSpan 也在 filter 中过滤一遍
err := d.filter.FilterDDLEvent(event.GetDDLEvent())
err := d.filter.FilterDDLEvent(event)
if err != nil {
log.Error("filter ddl query failed", zap.Error(err))
// 这里怎么处理更合适呢?有错然后反上去让 changefeed 报错
Expand Down Expand Up @@ -164,7 +164,7 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat

action := dispatcherStatus.GetAction()
if action != nil {
if action.CommitTs == d.ddlPendingEvent.CommitTs {
if action.CommitTs == d.ddlPendingEvent.CommitTS {
if action.Action == heartbeatpb.Action_Write {
d.sink.AddDDLAndSyncPointEvent(d.ddlPendingEvent, d.tableProgress)
} else {
Expand All @@ -181,29 +181,27 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
}

ack := dispatcherStatus.GetAck()
if ack != nil && ack.CommitTs == d.ddlPendingEvent.CommitTs {
if ack != nil && ack.CommitTs == d.ddlPendingEvent.CommitTS {
d.CancelResendTask()
}
}

func (d *Dispatcher) HandleEvent(event common.Event) (block bool) {
switch event.GetType() {
case common.TypeTxnEvent:
event := event.(*common.TxnEvent)
if event.IsDMLEvent() {
d.sink.AddDMLEvent(event, d.tableProgress)
return false
} else if event.IsDDLEvent() {
event.PostTxnFlushed = append(event.PostTxnFlushed, func() {
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
})
d.addDDLEventToSinkWhenAvailable(event)
return true
}
case common.TypeResolvedEvent:
d.resolvedTs.Set(event.(common.ResolvedEvent).ResolvedTs)
return false
case common.TypeTEvent:
d.sink.AddDMLEvent(event.(*common.TEvent), d.tableProgress)
return false
case common.TypeDDLEvent:
event := event.(*common.DDLEvent)
event.AddPostFlushFunc(func() {
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
})
d.addDDLEventToSinkWhenAvailable(event)
return true
}
log.Panic("invalid event type", zap.Any("event", event))
return false
Expand All @@ -220,7 +218,7 @@ func (d *Dispatcher) DealWithDDLWhenProgressEmpty() {
ComponentStatus: heartbeatpb.ComponentState_Working,
State: &heartbeatpb.State{
IsBlocked: false,
BlockTs: d.ddlPendingEvent.CommitTs,
BlockTs: d.ddlPendingEvent.CommitTS,
NeedDroppedTables: d.ddlPendingEvent.GetNeedDroppedTables().ToPB(),
NeedAddedTables: common.ToTablesPB(d.ddlPendingEvent.GetNeedAddedTables()),
},
Expand All @@ -234,7 +232,7 @@ func (d *Dispatcher) DealWithDDLWhenProgressEmpty() {
ComponentStatus: heartbeatpb.ComponentState_Working,
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: d.ddlPendingEvent.CommitTs,
BlockTs: d.ddlPendingEvent.CommitTS,
BlockTables: d.ddlPendingEvent.GetBlockedTables().ToPB(),
NeedDroppedTables: d.ddlPendingEvent.GetNeedDroppedTables().ToPB(),
NeedAddedTables: common.ToTablesPB(d.ddlPendingEvent.GetNeedAddedTables()),
Expand Down
12 changes: 5 additions & 7 deletions downstreamadapter/sink/helper/eventrouter/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,9 @@ func NewEventRouter(cfg *config.ReplicaConfig, protocol config.Protocol, default
}

// GetTopicForRowChange returns the target topic for row changes.
func (s *EventRouter) GetTopicForRowChange(row *common.RowChangedEvent) string {
schema := row.TableInfo.GetSchemaName()
table := row.TableInfo.GetTableName()
topicGenerator := s.matchTopicGenerator(schema, table)
return topicGenerator.Substitute(schema, table)
func (s *EventRouter) GetTopicForRowChange(tableInfo *common.TableInfo) string {
topicGenerator := s.matchTopicGenerator(tableInfo.TableName.Schema, tableInfo.TableName.Table)
return topicGenerator.Substitute(tableInfo.TableName.Schema, tableInfo.TableName.Table)
}

// GetTopicForDDL returns the target topic for DDL.
Expand Down Expand Up @@ -110,8 +108,8 @@ func (s *EventRouter) GetPartitionForRowChange(row *common.RowChangedEvent, part
}

// GetPartitionForRowChange returns the target partition for row changes.
func (s *EventRouter) GetPartitionGeneratorForRowChange(row *common.RowChangedEvent) partition.PartitionGenerator {
return s.GetPartitionDispatcher(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName())
func (s *EventRouter) GetPartitionGeneratorForRowChange(tableInfo *common.TableInfo) partition.PartitionGenerator {
return s.GetPartitionDispatcher(tableInfo.GetSchemaName(), tableInfo.GetTableName())
}

// GetPartitionDispatcher returns the partition dispatcher for a specific table.
Expand Down
23 changes: 14 additions & 9 deletions downstreamadapter/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,20 @@ func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, replicaConf
}, nil
}

func (s *KafkaSink) AddDMLEvent(event *common.TxnEvent, tableProgress *types.TableProgress) {
if len(event.GetRows()) == 0 {
func (s *KafkaSink) AddDMLEvent(event *common.TEvent, tableProgress *types.TableProgress) {
if event.Len() == 0 {
return
}
tableProgress.Add(event)

firstRow := event.GetRows()[0]
topic := s.eventRouter.GetTopicForRowChange(firstRow)
topic := s.eventRouter.GetTopicForRowChange(event.TableInfo)
partitionNum, err := s.topicManager.GetPartitionNum(s.ctx, topic)
if err != nil {
s.cancel(err)
log.Error("failed to get partition number for topic", zap.String("topic", topic), zap.Error(err))
return
}
partitonGenerator := s.eventRouter.GetPartitionGeneratorForRowChange(firstRow)
partitonGenerator := s.eventRouter.GetPartitionGeneratorForRowChange(event.TableInfo)

toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
var calledCount atomic.Uint64
Expand All @@ -178,10 +177,16 @@ func (s *KafkaSink) AddDMLEvent(event *common.TxnEvent, tableProgress *types.Tab
}
}

rowsCount := uint64(len(event.GetRows()))
rowsCount := uint64(event.Len())
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)

for _, row := range event.GetRows() {
for {
_, ok := event.GetNextRow()
if !ok {
break
}
// FIXME: pass a real row
row := &common.RowChangedEvent{}
err = s.columnSelector.Apply(row)
if err != nil {
s.cancel(err)
Expand Down Expand Up @@ -212,11 +217,11 @@ func (s *KafkaSink) AddDMLEvent(event *common.TxnEvent, tableProgress *types.Tab

}

func (s *KafkaSink) PassDDLAndSyncPointEvent(event *common.TxnEvent, tableProgress *types.TableProgress) {
func (s *KafkaSink) PassDDLAndSyncPointEvent(event *common.DDLEvent, tableProgress *types.TableProgress) {
tableProgress.Pass(event)
}

func (s *KafkaSink) AddDDLAndSyncPointEvent(event *common.TxnEvent, tableProgress *types.TableProgress) {
func (s *KafkaSink) AddDDLAndSyncPointEvent(event *common.DDLEvent, tableProgress *types.TableProgress) {
}

func (s *KafkaSink) Close() {
Expand Down
34 changes: 17 additions & 17 deletions downstreamadapter/sink/mysql_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,22 @@ type MysqlSink struct {
wg sync.WaitGroup
cancel context.CancelFunc

eventChans []chan *common.TxnEvent
ddlEventChan chan *common.TxnEvent
eventChans []chan *common.TEvent
ddlEventChan chan *common.DDLEvent
workerCount int
}

// event dispatcher manager 初始化的时候创建 mysqlSink 对象
func NewMysqlSink(changefeedID model.ChangeFeedID, workerCount int, cfg *writer.MysqlConfig, db *sql.DB) *MysqlSink {
mysqlSink := MysqlSink{
changefeedID: changefeedID,
eventChans: make([]chan *common.TxnEvent, workerCount),
ddlEventChan: make(chan *common.TxnEvent, 16),
eventChans: make([]chan *common.TEvent, workerCount),
ddlEventChan: make(chan *common.DDLEvent, 16),
workerCount: workerCount,
}

for i := 0; i < workerCount; i++ {
mysqlSink.eventChans[i] = make(chan *common.TxnEvent, 16)
mysqlSink.eventChans[i] = make(chan *common.TEvent, 16)
}

mysqlSink.initWorker(workerCount, cfg, db)
Expand All @@ -76,11 +76,11 @@ func (s *MysqlSink) initWorker(workerCount int, cfg *writer.MysqlConfig, db *sql
for i := 0; i < workerCount; i++ {
s.wg.Add(1)
workerId := i
go func(ctx context.Context, eventChan chan *common.TxnEvent, db *sql.DB, config *writer.MysqlConfig, maxRows int) {
go func(ctx context.Context, eventChan chan *common.TEvent, db *sql.DB, config *writer.MysqlConfig, maxRows int) {
defer s.wg.Done()
totalStart := time.Now()
worker := worker.NewMysqlWorker(eventChan, db, config, workerId, s.changefeedID)
events := make([]*common.TxnEvent, 0)
events := make([]*common.TEvent, 0)
rows := 0
for {
needFlush := false
Expand All @@ -89,7 +89,7 @@ func (s *MysqlSink) initWorker(workerCount int, cfg *writer.MysqlConfig, db *sql
return
case txnEvent := <-worker.GetEventChan():
events = append(events, txnEvent)
rows += len(txnEvent.Rows)
rows += txnEvent.Len()
if rows > maxRows {
needFlush = true
}
Expand All @@ -98,9 +98,9 @@ func (s *MysqlSink) initWorker(workerCount int, cfg *writer.MysqlConfig, db *sql
for !needFlush {
select {
case txnEvent := <-worker.GetEventChan():
worker.MetricWorkerHandledRows.Add(float64(len(txnEvent.Rows)))
worker.MetricWorkerHandledRows.Add(float64(txnEvent.Len()))
events = append(events, txnEvent)
rows += len(txnEvent.Rows)
rows += txnEvent.Len()
if rows > maxRows {
needFlush = true
}
Expand All @@ -117,7 +117,7 @@ func (s *MysqlSink) initWorker(workerCount int, cfg *writer.MysqlConfig, db *sql
}
}
start := time.Now()
err := worker.GetMysqlWriter().Flush(events, workerId, rows)
err := worker.GetMysqlWriter().Flush(events, workerId)
if err != nil {
log.Error("Failed to flush events", zap.Error(err), zap.Any("workerID", workerId), zap.Any("events", events))
return
Expand All @@ -139,7 +139,7 @@ func (s *MysqlSink) initWorker(workerCount int, cfg *writer.MysqlConfig, db *sql

// ddl flush goroutine
s.wg.Add(1)
go func(ctx context.Context, ddleEventChan chan *common.TxnEvent) {
go func(ctx context.Context, ddleEventChan chan *common.DDLEvent) {
defer s.wg.Done()
for {
select {
Expand All @@ -152,23 +152,23 @@ func (s *MysqlSink) initWorker(workerCount int, cfg *writer.MysqlConfig, db *sql
}(ctx, s.ddlEventChan)
}

func (s *MysqlSink) AddDMLEvent(event *common.TxnEvent, tableProgress *types.TableProgress) {
if len(event.GetRows()) == 0 {
func (s *MysqlSink) AddDMLEvent(event *common.TEvent, tableProgress *types.TableProgress) {
if event.Len() == 0 {
return
}

tableProgress.Add(event)

// TODO:后续再优化这里的逻辑,目前有个问题是 physical table id 好像都是偶数?这个后面改个能见人的方法
index := event.GetRows()[0].PhysicalTableID % int64(s.workerCount)
index := int64(event.PhysicalTableID) % int64(s.workerCount)
s.eventChans[index] <- event
}

func (s *MysqlSink) PassDDLAndSyncPointEvent(event *common.TxnEvent, tableProgress *types.TableProgress) {
func (s *MysqlSink) PassDDLAndSyncPointEvent(event *common.DDLEvent, tableProgress *types.TableProgress) {
tableProgress.Pass(event)
}

func (s *MysqlSink) AddDDLAndSyncPointEvent(event *common.TxnEvent, tableProgress *types.TableProgress) {
func (s *MysqlSink) AddDDLAndSyncPointEvent(event *common.DDLEvent, tableProgress *types.TableProgress) {
// TODO:这个 ddl 可以并发写么?如果不行的话,后面还要加锁或者排队
tableProgress.Add(event)
//s.ddlWorker.GetMysqlWriter().FlushDDLEvent(event)
Expand Down
6 changes: 3 additions & 3 deletions downstreamadapter/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
)

type Sink interface {
AddDMLEvent(event *common.TxnEvent, tableProgress *types.TableProgress)
AddDDLAndSyncPointEvent(event *common.TxnEvent, tableProgress *types.TableProgress)
PassDDLAndSyncPointEvent(event *common.TxnEvent, tableProgress *types.TableProgress)
AddDMLEvent(event *common.TEvent, tableProgress *types.TableProgress)
AddDDLAndSyncPointEvent(event *common.DDLEvent, tableProgress *types.TableProgress)
PassDDLAndSyncPointEvent(event *common.DDLEvent, tableProgress *types.TableProgress)
// IsEmpty(tableSpan *common.TableSpan) bool
// AddTableSpan(tableSpan *common.TableSpan)
// RemoveTableSpan(tableSpan *common.TableSpan)
Expand Down
17 changes: 8 additions & 9 deletions downstreamadapter/sink/types/table_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,19 @@ func NewTableProgress() *TableProgress {
return tableProgress
}

func (p *TableProgress) Add(event *common.TxnEvent) {
ts := Ts{startTs: event.StartTs, commitTs: event.CommitTs}
func (p *TableProgress) Add(event common.FlushableEvent) {
ts := Ts{startTs: event.GetStartTs(), commitTs: event.GetCommitTs()}
p.mutex.Lock()
defer p.mutex.Unlock()
elem := p.list.PushBack(ts)
p.elemMap[ts] = elem
p.maxCommitTs = event.CommitTs

event.PostTxnFlushed = append(event.PostTxnFlushed, func() { p.Remove(event) })
p.maxCommitTs = event.GetCommitTs()
event.AddPostFlushFunc(func() { p.Remove(event) })
}

// 而且删除可以认为是批量的?但要不要做成批量可以后面再看
func (p *TableProgress) Remove(event *common.TxnEvent) {
ts := Ts{startTs: event.StartTs, commitTs: event.CommitTs}
func (p *TableProgress) Remove(event common.Event) {
ts := Ts{startTs: event.GetStartTs(), commitTs: event.GetCommitTs()}
p.mutex.Lock()
defer p.mutex.Unlock()
if elem, ok := p.elemMap[ts]; ok {
Expand All @@ -77,10 +76,10 @@ func (p *TableProgress) Empty() bool {
return p.list.Len() == 0
}

func (p *TableProgress) Pass(event *common.TxnEvent) {
func (p *TableProgress) Pass(event *common.DDLEvent) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.maxCommitTs = event.CommitTs
p.maxCommitTs = event.CommitTS
}

// 返回当前 tableSpan 中最大的 checkpointTs,也就是最大的 ts,并且 <= ts 之前的数据都已经成功写下去了
Expand Down
8 changes: 4 additions & 4 deletions downstreamadapter/worker/mysql_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (

// MysqlWorker is use to flush the event downstream
type MysqlWorker struct {
eventChan <-chan *common.TxnEvent // 获取到能往下游写的 events
mysqlWriter *writer.MysqlWriter // 实际负责做 flush 操作
eventChan <-chan *common.TEvent // 获取到能往下游写的 events
mysqlWriter *writer.MysqlWriter // 实际负责做 flush 操作
id int
// Metrics.
MetricConflictDetectDuration prometheus.Observer
Expand All @@ -38,7 +38,7 @@ type MysqlWorker struct {
MetricWorkerHandledRows prometheus.Counter
}

func NewMysqlWorker(eventChan <-chan *common.TxnEvent, db *sql.DB, config *writer.MysqlConfig, id int, changefeedID model.ChangeFeedID) *MysqlWorker {
func NewMysqlWorker(eventChan <-chan *common.TEvent, db *sql.DB, config *writer.MysqlConfig, id int, changefeedID model.ChangeFeedID) *MysqlWorker {
wid := strconv.Itoa(id)

return &MysqlWorker{
Expand All @@ -54,7 +54,7 @@ func NewMysqlWorker(eventChan <-chan *common.TxnEvent, db *sql.DB, config *write
}
}

func (t *MysqlWorker) GetEventChan() <-chan *common.TxnEvent {
func (t *MysqlWorker) GetEventChan() <-chan *common.TEvent {
return t.eventChan
}

Expand Down
12 changes: 0 additions & 12 deletions downstreamadapter/writer/dml_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,3 @@ func buildColumnList(names []string) string {
}

// placeHolder returns a string separated by comma
// n must be greater or equal than 1, or the function will panic
func placeHolder(n int) string {
var builder strings.Builder
builder.Grow((n-1)*2 + 1)
for i := 0; i < n; i++ {
if i > 0 {
builder.WriteString(",")
}
builder.WriteString("?")
}
return builder.String()
}
Loading

0 comments on commit db88d35

Please sign in to comment.