[DNM] consumer(ticdc): remove order message #11812

Open
wants to merge 23 commits into base: master
41 changes: 41 additions & 0 deletions cdc/sink/dmlsink/mq/worker.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@@ -267,6 +268,8 @@ func (w *worker) sendMessages(ctx context.Context) error {
metricSendMessageDuration := mq.WorkerSendMessageDuration.WithLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)
defer mq.WorkerSendMessageDuration.DeleteLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)

previousMap := make(map[int64]*model.RowChangedEvent)
previousMessageMap := make(map[int64]*common.Message)
var err error
outCh := w.encoderGroup.Output()
for {
@@ -283,7 +286,45 @@
if err = future.Ready(ctx); err != nil {
return errors.Trace(err)
}
for _, event := range future.Events {
previous := previousMap[event.Event.GetTableID()]
if previous != nil {
if event.Event.CommitTs < previous.CommitTs {
log.Panic("commitTs is not monotonically increasing",
zap.String("namespace", w.changeFeedID.Namespace),
zap.String("changefeed", w.changeFeedID.ID),
zap.Any("previous", previous),
zap.Any("event", event.Event))
}
}
previousMap[event.Event.GetTableID()] = event.Event
}
for _, message := range future.Messages {
previousMessage := previousMessageMap[message.TableID]
if previousMessage != nil {
if message.Ts < previousMessage.Ts {
for _, event := range future.Events {
log.Warn("Ts not monotonically increasing",
zap.String("namespace", w.changeFeedID.Namespace),
zap.String("changefeed", w.changeFeedID.ID),
zap.Int32("partition", future.Key.Partition),
zap.Int64("tableID", event.Event.GetTableID()),
zap.Uint64("commitTs", event.Event.CommitTs),
zap.Int("length", len(future.Events)))
}

log.Panic("Ts is not monotonically increasing",
zap.String("namespace", w.changeFeedID.Namespace),
zap.String("changefeed", w.changeFeedID.ID),
zap.Int32("partition", future.Key.Partition),
zap.Int64("tableID", previousMessage.TableID),
zap.Uint64("previousTs", previousMessage.Ts),
zap.Int64("currentTableID", message.TableID),
zap.Uint64("currentTs", message.Ts),
zap.Int("length", len(future.Messages)))
}
}
previousMessageMap[message.TableID] = message
start := time.Now()
if err = w.statistics.RecordBatchExecution(func() (int, int64, error) {
message.SetPartitionKey(future.Key.PartitionKey)
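The hunk above keeps the last event and the last encoded message per table and panics when a commit timestamp moves backwards. Below is a minimal, self-contained sketch of that per-table monotonicity check; `Event` and its fields are simplified stand-ins for tiflow's `model.RowChangedEvent`, not the real types:

```go
package main

import "log"

// Event is a simplified stand-in for tiflow's model.RowChangedEvent.
type Event struct {
	TableID  int64
	CommitTs uint64
}

// monotonicChecker remembers the last event seen for each table and
// rejects any event whose CommitTs falls back for that table.
type monotonicChecker struct {
	previous map[int64]*Event
}

func newMonotonicChecker() *monotonicChecker {
	return &monotonicChecker{previous: make(map[int64]*Event)}
}

// check panics on a commitTs fallback, mirroring the log.Panic in the hunk.
func (c *monotonicChecker) check(e *Event) {
	if prev, ok := c.previous[e.TableID]; ok && e.CommitTs < prev.CommitTs {
		log.Panicf("commitTs not monotonically increasing: table=%d previous=%d current=%d",
			e.TableID, prev.CommitTs, e.CommitTs)
	}
	c.previous[e.TableID] = e
}

func main() {
	c := newMonotonicChecker()
	c.check(&Event{TableID: 1, CommitTs: 100})
	c.check(&Event{TableID: 1, CommitTs: 101}) // ok: increasing per table
	c.check(&Event{TableID: 2, CommitTs: 50})  // ok: a different table
	// c.check(&Event{TableID: 1, CommitTs: 99}) // would panic: fallback
}
```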
6 changes: 4 additions & 2 deletions cmd/kafka-consumer/consumer.go
@@ -133,10 +133,12 @@ func (c *consumer) Consume(ctx context.Context) {

topicPartition, err := c.client.CommitMessage(msg)
if err != nil {
log.Error("commit message failed, just continue", zap.Error(err))
log.Error("commit message failed, just continue",
zap.String("topic", *msg.TopicPartition.Topic), zap.Int32("partition", msg.TopicPartition.Partition),
zap.Any("offset", msg.TopicPartition.Offset), zap.Error(err))
continue
}
log.Info("commit message success",
log.Debug("commit message success",
zap.String("topic", topicPartition[0].String()), zap.Int32("partition", topicPartition[0].Partition),
zap.Any("offset", topicPartition[0].Offset))
}
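For context on the log fields: `CommitMessage` returns the list of committed partitions, which is why the success path prints `topicPartition[0]`. A minimal sketch of the same commit-per-message loop, assuming confluent-kafka-go v2 (the broker, topic, and group id are placeholders); a failed commit is logged with its coordinates and skipped, accepting possible redelivery after a restart:

```go
package main

import (
	"log"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// Placeholder broker/topic/group names, for illustration only.
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092",
		"group.id":           "kafka-consumer-demo",
		"enable.auto.commit": false, // commit manually after each message
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	if err := c.Subscribe("demo-topic", nil); err != nil {
		log.Fatal(err)
	}

	for {
		msg, err := c.ReadMessage(time.Second)
		if err != nil {
			continue // timeouts and transient errors: poll again
		}
		// ... process msg ...

		// Manual commit; on failure log the message coordinates and
		// keep going, tolerating redelivery after a restart.
		tps, err := c.CommitMessage(msg)
		if err != nil {
			log.Printf("commit failed, continue: topic=%s partition=%d offset=%v err=%v",
				*msg.TopicPartition.Topic, msg.TopicPartition.Partition,
				msg.TopicPartition.Offset, err)
			continue
		}
		log.Printf("committed: %s", tps[0].String())
	}
}
```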
17 changes: 0 additions & 17 deletions cmd/kafka-consumer/event_group.go
@@ -14,8 +14,6 @@
package main

import (
"sort"

"github.com/pingcap/tiflow/cdc/model"
)

@@ -35,18 +33,3 @@ func NewEventsGroup() *eventsGroup {
func (g *eventsGroup) Append(e *model.RowChangedEvent) {
g.events = append(g.events, e)
}

// Resolve will get events where CommitTs is less than resolveTs.
func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent {
sort.Slice(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
})

i := sort.Search(len(g.events), func(i int) bool {
return g.events[i].CommitTs > resolveTs
})

result := g.events[:i]
g.events = g.events[i:]
return result
}
179 changes: 95 additions & 84 deletions cmd/kafka-consumer/writer.go
@@ -79,6 +79,13 @@ type partitionProgress struct {

eventGroups map[int64]*eventsGroup
decoder codec.RowEventDecoder

previousMap map[model.TableID]previous
}

type previous struct {
offset kafka.Offset
commitTs uint64
}

type writer struct {
@@ -132,6 +139,7 @@ func newWriter(ctx context.Context, o *option) *writer {
}
w.progresses[i] = &partitionProgress{
eventGroups: make(map[int64]*eventsGroup),
previousMap: make(map[model.TableID]previous),
decoder: decoder,
}
}
@@ -249,7 +257,7 @@ func (w *writer) Write(ctx context.Context, messageType model.MessageType) bool
w.popDDL()
}

if messageType == model.MessageTypeResolved {
if messageType == model.MessageTypeResolved || messageType == model.MessageTypeRow {
w.forEachPartition(func(sink *partitionProgress) {
syncFlushRowChangedEvents(ctx, sink, watermark)
})
@@ -276,15 +284,15 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool

progress := w.progresses[partition]
decoder := progress.decoder
eventGroup := progress.eventGroups
// eventGroup := progress.eventGroups
if err := decoder.AddKeyValue(key, value); err != nil {
log.Panic("add key value to the decoder failed",
zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset),
zap.Error(err))
}
var (
counter int
needFlush bool
counter int
//needFlush bool
messageType model.MessageType
)
for {
@@ -322,32 +330,38 @@
zap.Error(err))
}

if simple, ok := decoder.(*simple.Decoder); ok {
cachedEvents := simple.GetCachedEvents()
for _, row := range cachedEvents {
row.TableInfo.TableName.TableID = row.PhysicalTableID
w.checkPartition(row, partition, message)
if w.checkOldMessage(progress, row.CommitTs, row, partition, message) {
continue
}
group, ok := eventGroup[row.PhysicalTableID]
if !ok {
group = NewEventsGroup()
eventGroup[row.PhysicalTableID] = group
}
group.Append(row)
}
}
// if simple, ok := decoder.(*simple.Decoder); ok {
// cachedEvents := simple.GetCachedEvents()
// for _, row := range cachedEvents {
// row.TableInfo.TableName.TableID = row.PhysicalTableID
// w.checkPartition(row, partition, message)
// if w.checkOldMessage(progress, row.CommitTs, row, partition, message) {
// continue
// }
// group, ok := eventGroup[row.PhysicalTableID]
// if !ok {
// group = NewEventsGroup()
// eventGroup[row.PhysicalTableID] = group
// }
// group.Append(row)
// }
// }

// the Query may be empty when using the simple protocol; it comes from the `bootstrap` event.
if partition == 0 && ddl.Query != "" {
w.appendDDL(ddl)
needFlush = true
log.Info("DDL message received",
zap.Int32("partition", partition),
zap.Any("offset", message.TopicPartition.Offset),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("DDL", ddl.Query))
// DDL can be executed, do it first.
if err = w.ddlSink.WriteDDLEvent(ctx, ddl); err != nil {
log.Panic("write DDL event failed", zap.Error(err),
zap.String("DDL", ddl.Query), zap.Uint64("commitTs", ddl.CommitTs))
}

//w.appendDDL(ddl)
//needFlush = true
//log.Info("DDL message received",
// zap.Int32("partition", partition),
// zap.Any("offset", message.TopicPartition.Offset),
// zap.Uint64("commitTs", ddl.CommitTs),
// zap.String("DDL", ddl.Query))
}
case model.MessageTypeRow:
row, err := decoder.NextRowChangedEvent()
@@ -373,64 +387,61 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool

w.checkPartition(row, partition, message)

if w.checkOldMessage(progress, row.CommitTs, row, partition, message) {
continue
}

group, ok := eventGroup[tableID]
if !ok {
group = NewEventsGroup()
eventGroup[tableID] = group
//if w.checkOldMessage(progress, row.CommitTs, row, partition, message) {
// continue
//}

prev, ok := progress.previousMap[tableID]
if ok {
if prev.commitTs > row.CommitTs && prev.offset < message.TopicPartition.Offset {
watermark := atomic.LoadUint64(&progress.watermark)
log.Panic("row changed event commitTs fallback",
zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
zap.Int64("tableID", tableID), zap.Int32("partition", partition), zap.Uint64("watermark", watermark),
zap.Uint64("previous", prev.commitTs), zap.Uint64("commitTs", row.CommitTs),
zap.Any("previousOffset", prev.offset), zap.Any("offset", message.TopicPartition.Offset))
//if row.CommitTs < atomic.LoadUint64(&progress.watermark) {
// log.Panic("row changed event commitTs fallback",
// zap.Int64("tableID", tableID), zap.Int32("partition", partition), zap.Uint64("watermark", watermark),
// zap.Uint64("previous", prev.commitTs), zap.Uint64("commitTs", row.CommitTs),
// zap.Any("previousOffset", prev.offset), zap.Any("offset", message.TopicPartition.Offset))
//}
//log.Warn("row changed event commitTs fallback, ignore it",
// zap.Int64("tableID", tableID), zap.Int32("partition", partition), zap.Uint64("watermark", watermark),
// zap.Uint64("previous", prev.commitTs), zap.Uint64("commitTs", row.CommitTs),
// zap.Any("previousOffset", prev.offset), zap.Any("offset", message.TopicPartition.Offset))
}
}
group.Append(row)
log.Debug("DML event received",
zap.Int32("partition", partition),
zap.Any("offset", message.TopicPartition.Offset),
zap.Uint64("commitTs", row.CommitTs),
zap.Int64("physicalTableID", row.PhysicalTableID),
zap.Int64("tableID", tableID),
zap.String("schema", row.TableInfo.GetSchemaName()),
zap.String("table", row.TableInfo.GetTableName()))
case model.MessageTypeResolved:
ts, err := decoder.NextResolvedEvent()
if err != nil {
log.Panic("decode message value failed",
zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset),
zap.ByteString("value", value),
zap.Error(err))
progress.previousMap[tableID] = previous{
offset: message.TopicPartition.Offset,
commitTs: row.CommitTs,
}

log.Debug("watermark event received",
// group, ok := eventGroup[tableID]
// if !ok {
// group = NewEventsGroup()
// eventGroup[tableID] = group
// }
// group.Append(row)
log.Info("DML event received",
zap.String("schema", row.TableInfo.GetSchemaName()),
zap.String("table", row.TableInfo.GetTableName()),
zap.Int32("partition", partition),
zap.Any("offset", message.TopicPartition.Offset),
zap.Uint64("watermark", ts))

if w.checkOldMessage(progress, ts, nil, partition, message) {
continue
}
zap.Uint64("commitTs", row.CommitTs), zap.Uint64("es", row.CommitTs>>18))

for tableID, group := range eventGroup {
events := group.Resolve(ts)
if len(events) == 0 {
continue
}
tableSink, ok := progress.tableSinkMap.Load(tableID)
if !ok {
tableSink = w.sinkFactory.CreateTableSinkForConsumer(
model.DefaultChangeFeedID("kafka-consumer"),
spanz.TableIDToComparableSpan(tableID),
events[0].CommitTs,
)
progress.tableSinkMap.Store(tableID, tableSink)
}
tableSink.(tablesink.TableSink).AppendRowChangedEvents(events...)
log.Debug("append row changed events to table sink",
zap.Uint64("resolvedTs", ts), zap.Int64("tableID", tableID), zap.Int("count", len(events)),
zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset))
tableSink, ok := progress.tableSinkMap.Load(tableID)
if !ok {
tableSink = w.sinkFactory.CreateTableSinkForConsumer(
model.DefaultChangeFeedID("kafka-consumer"),
spanz.TableIDToComparableSpan(tableID),
row.CommitTs,
)
progress.tableSinkMap.Store(tableID, tableSink)
}
atomic.StoreUint64(&progress.watermark, ts)
progress.watermarkOffset = message.TopicPartition.Offset
needFlush = true
tableSink.(tablesink.TableSink).AppendRowChangedEvents(row)
syncFlushRowChangedEvents(ctx, progress, row.CommitTs+1)
case model.MessageTypeResolved:
default:
log.Panic("unknown message type", zap.Any("messageType", messageType),
zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset))
@@ -442,12 +453,12 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
zap.Int("max-batch-size", w.option.maxBatchSize), zap.Int("actual-batch-size", counter),
zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset))
}

if !needFlush {
return false
}
// flush when received DDL event or resolvedTs
return w.Write(ctx, messageType)
return true
//if !needFlush {
// return false
//}
//// flush when received DDL event or resolvedTs
//return w.Write(ctx, messageType)
}

func (w *writer) checkPartition(row *model.RowChangedEvent, partition int32, message *kafka.Message) {
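The writer no longer buffers rows per table until a resolved event arrives: each decoded row is appended to its table sink and flushed synchronously at `row.CommitTs+1`, and the fallback check fires only when the Kafka offset moved forward, since a smaller offset indicates a redelivered message rather than real disorder. A simplified sketch of that flow, using stand-in types rather than tiflow's `model.RowChangedEvent` and `tablesink.TableSink`:

```go
package main

import "fmt"

// row is a stand-in for a decoded row changed event.
type row struct {
	tableID  int64
	commitTs uint64
	offset   int64 // kafka offset the row was decoded from
}

type tableSink struct{ tableID int64 }

// appendAndFlush stands in for AppendRowChangedEvents followed by
// syncFlushRowChangedEvents(ctx, progress, row.CommitTs+1); flushing
// at commitTs+1 makes the watermark cover the row itself.
func (s *tableSink) appendAndFlush(r row) {
	fmt.Printf("table %d: flushed row at commitTs=%d (watermark=%d)\n",
		s.tableID, r.commitTs, r.commitTs+1)
}

type previous struct {
	offset   int64
	commitTs uint64
}

type writer struct {
	sinks    map[int64]*tableSink
	previous map[int64]previous
}

func (w *writer) writeRow(r row) {
	// A commitTs fallback only counts when the offset moved forward;
	// a smaller offset means redelivery, not disorder.
	if prev, ok := w.previous[r.tableID]; ok &&
		prev.commitTs > r.commitTs && prev.offset < r.offset {
		panic(fmt.Sprintf("commitTs fallback: table=%d prev=%d cur=%d",
			r.tableID, prev.commitTs, r.commitTs))
	}
	w.previous[r.tableID] = previous{offset: r.offset, commitTs: r.commitTs}

	sink, ok := w.sinks[r.tableID]
	if !ok {
		sink = &tableSink{tableID: r.tableID}
		w.sinks[r.tableID] = sink
	}
	sink.appendAndFlush(r) // no buffering: flush every row immediately
}

func main() {
	w := &writer{sinks: map[int64]*tableSink{}, previous: map[int64]previous{}}
	w.writeRow(row{tableID: 1, commitTs: 100, offset: 7})
	w.writeRow(row{tableID: 1, commitTs: 101, offset: 8})
}
```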
2 changes: 1 addition & 1 deletion pkg/sink/codec/canal/canal_json_message.go
@@ -81,7 +81,7 @@ func (c *JSONMessage) getTable() *string {

// for JSONMessage, we lost the commitTs.
func (c *JSONMessage) getCommitTs() uint64 {
return 0
return uint64(c.ExecutionTime) << 18
}

func (c *JSONMessage) getQuery() string {
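This reconstruction works because TiDB's TSO packs the physical time in milliseconds above an 18-bit logical counter, so shifting canal-json's `ExecutionTime` left by 18 yields an approximate commitTs with the logical part lost; the worker's `row.CommitTs>>18` log field is the inverse. A small sketch of both directions:

```go
package main

import (
	"fmt"
	"time"
)

// TiDB's TSO packs a physical timestamp (milliseconds) in the high bits
// and an 18-bit logical counter in the low bits.
const logicalBits = 18

// approxCommitTs rebuilds an approximate commitTs from canal-json's
// ExecutionTime (milliseconds); the logical component is lost, so two
// transactions in the same millisecond map to the same value.
func approxCommitTs(executionTimeMs int64) uint64 {
	return uint64(executionTimeMs) << logicalBits
}

// physicalTime is the inverse, as used by the worker's "es" log field.
func physicalTime(commitTs uint64) time.Time {
	return time.UnixMilli(int64(commitTs >> logicalBits))
}

func main() {
	es := time.Now().UnixMilli()
	ts := approxCommitTs(es)
	fmt.Println(ts, physicalTime(ts))
}
```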
1 change: 1 addition & 0 deletions pkg/sink/codec/canal/canal_json_row_event_encoder.go
@@ -394,6 +394,7 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent(
Key: nil,
Value: value,
Ts: e.CommitTs,
TableID: e.TableInfo.ID,
Schema: e.TableInfo.GetSchemaNamePtr(),
Table: e.TableInfo.GetTableNamePtr(),
Type: model.MessageTypeRow,
1 change: 1 addition & 0 deletions pkg/sink/codec/common/message.go
@@ -34,6 +34,7 @@ const MaxRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
type Message struct {
Key []byte
Value []byte
TableID int64
Ts uint64 // reserved for possible output sorting
Schema *string // schema
Table *string // table