codec: clean some code in codec (flowbehappy#255)
hongyunyan committed Sep 4, 2024
1 parent 28d80e9 commit 749abcc
Showing 20 changed files with 208 additions and 427 deletions.
9 changes: 2 additions & 7 deletions downstreamadapter/sink/kafka_sink.go
@@ -25,7 +25,6 @@ import (
 	"github.com/flowbehappy/tigate/downstreamadapter/worker/dmlproducer"
 	"github.com/flowbehappy/tigate/pkg/common"
 	"github.com/flowbehappy/tigate/pkg/sink/codec"
-	"github.com/flowbehappy/tigate/pkg/sink/codec/builder"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
@@ -108,10 +107,6 @@ func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, replicaConf
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig)
-	if err != nil {
-		return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
-	}
 
 	failpointCh := make(chan error, 1)
 	asyncProducer, err := factory.AsyncProducer(ctx, failpointCh)
@@ -121,7 +116,7 @@
 
 	metricsCollector := factory.MetricsCollector(utils.RoleProcessor, adminClient)
 	dmlProducer := dmlproducer.NewKafkaDMLProducer(ctx, changefeedID, asyncProducer, metricsCollector)
-	encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID)
+	encoderGroup := codec.NewEncoderGroup(ctx, replicaConfig.Sink, encoderConfig, changefeedID)
 
 	statistics := timetrics.NewStatistics(changefeedID, sink.RowSink)
 	worker := worker.NewKafkaWorker(changefeedID, protocol, dmlProducer, encoderGroup, statistics)
@@ -201,7 +196,7 @@ func (s *KafkaSink) AddDMLEvent(event *common.TEvent, tableProgress *types.Table
 		return
 	}
 
-	s.worker.GetEventChan() <- common.MQRowEvent{
+	s.worker.GetEventChan() <- &common.MQRowEvent{
 		Key: model.TopicPartitionKey{
 			Topic:     topic,
 			Partition: index,
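Net effect of the kafka_sink.go hunks: the sink no longer constructs a RowEventEncoderBuilder (and drops that builder's error path), because codec.NewEncoderGroup now receives the context and the encoder config and builds encoders internally. A minimal sketch of the new shape, with stand-in types — Config, Encoder, and EncoderGroup below are illustrative assumptions, not the repo's real definitions:

package main

import "context"

type Config struct{ Protocol string }

type Encoder interface{ Name() string }

type jsonEncoder struct{}

func (jsonEncoder) Name() string { return "json" }

// EncoderGroup is a stand-in for codec.EncoderGroup.
type EncoderGroup struct{ encoders []Encoder }

// Before this commit, callers needed two steps and an extra error check:
//
//	encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig)
//	encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID)
//
// After, the group owns encoder construction and dispatches on the config:
func NewEncoderGroup(ctx context.Context, cfg *Config, concurrency int) *EncoderGroup {
	g := &EncoderGroup{}
	for i := 0; i < concurrency; i++ {
		g.encoders = append(g.encoders, jsonEncoder{}) // a real group would switch on cfg.Protocol
	}
	return g
}

func main() {
	_ = NewEncoderGroup(context.Background(), &Config{Protocol: "open-protocol"}, 4)
}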
59 changes: 26 additions & 33 deletions downstreamadapter/worker/kafka_worker.go
@@ -15,6 +15,7 @@ package worker
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"github.com/flowbehappy/tigate/pkg/common"
@@ -29,7 +30,6 @@ import (
 	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
-	"golang.org/x/sync/errgroup"
 )
 
 const (
@@ -48,7 +48,7 @@ type KafkaWorker struct {
 	protocol config.Protocol
 	// msgChan caches the messages to be sent.
 	// It is an unbounded channel.
-	msgChan *chann.DrainableChann[common.MQRowEvent]
+	msgChan *chann.DrainableChann[*common.MQRowEvent]
 	// ticker used to force flush the batched messages when the interval is reached.
 	ticker *time.Ticker
 
@@ -65,6 +65,9 @@ type KafkaWorker struct {
 	metricMQWorkerBatchDuration prometheus.Observer
 	// statistics is used to record DML metrics.
 	statistics *metrics.Statistics
+
+	cancel context.CancelFunc
+	wg     sync.WaitGroup
 }
 
 // newWorker creates a new flush worker.
@@ -75,52 +78,42 @@ func NewKafkaWorker(
 	encoderGroup codec.EncoderGroup,
 	statistics *metrics.Statistics,
 ) *KafkaWorker {
+	ctx, cancel := context.WithCancel(context.Background())
 	w := &KafkaWorker{
 		changeFeedID: id,
 		protocol:     protocol,
-		msgChan:      chann.NewAutoDrainChann[common.MQRowEvent](),
+		msgChan:      chann.NewAutoDrainChann[*common.MQRowEvent](),
 		ticker:       time.NewTicker(batchInterval),
 		encoderGroup: encoderGroup,
 		producer:     producer,
 		metricMQWorkerSendMessageDuration: mq.WorkerSendMessageDuration.WithLabelValues(id.Namespace, id.ID),
 		metricMQWorkerBatchSize:           mq.WorkerBatchSize.WithLabelValues(id.Namespace, id.ID),
 		metricMQWorkerBatchDuration:       mq.WorkerBatchDuration.WithLabelValues(id.Namespace, id.ID),
 		statistics:                        statistics,
+		cancel:                            cancel,
 	}
 
-	return w
-}
-
-func (w *KafkaWorker) GetEventChan() chan<- common.MQRowEvent {
-	return w.msgChan.In()
-}
-
-// run starts a loop that keeps collecting, sorting and sending messages
-// until it encounters an error or is interrupted.
-func (w *KafkaWorker) run(ctx context.Context) (retErr error) {
-	defer func() {
-		w.ticker.Stop()
-		log.Info("MQ sink worker exited", zap.Error(retErr),
-			zap.String("namespace", w.changeFeedID.Namespace),
-			zap.String("changefeed", w.changeFeedID.ID),
-			zap.String("protocol", w.protocol.String()),
-		)
-	}()
-
-	g, ctx := errgroup.WithContext(ctx)
-	g.Go(func() error {
+	w.wg.Add(3)
+	go func() error {
+		defer w.wg.Done()
 		return w.encoderGroup.Run(ctx)
-	})
-	g.Go(func() error {
+	}()
+	go func() error {
+		defer w.wg.Done()
 		if w.protocol.IsBatchEncode() {
 			return w.batchEncodeRun(ctx)
 		}
 		return w.nonBatchEncodeRun(ctx)
-	})
-	g.Go(func() error {
+	}()
+	go func() error {
+		defer w.wg.Done()
 		return w.sendMessages(ctx)
-	})
-	return g.Wait()
+	}()
+	return w
+}
+
+func (w *KafkaWorker) GetEventChan() chan<- *common.MQRowEvent {
+	return w.msgChan.In()
 }

// nonBatchEncodeRun add events to the encoder group immediately.
@@ -156,7 +149,7 @@ func (w *KafkaWorker) batchEncodeRun(ctx context.Context) (retErr error) {
 		zap.String("protocol", w.protocol.String()),
 	)
 
-	msgsBuf := make([]common.MQRowEvent, batchSize)
+	msgsBuf := make([]*common.MQRowEvent, batchSize)
 	for {
 		start := time.Now()
 		msgCount, err := w.batch(ctx, msgsBuf, batchInterval)
@@ -184,7 +177,7 @@
 // batch collects a batch of messages from w.msgChan into buffer.
 // It returns the number of messages collected.
 // Note: It will block until at least one message is received.
-func (w *KafkaWorker) batch(ctx context.Context, buffer []common.MQRowEvent, flushInterval time.Duration) (int, error) {
+func (w *KafkaWorker) batch(ctx context.Context, buffer []*common.MQRowEvent, flushInterval time.Duration) (int, error) {
 	msgCount := 0
 	maxBatchSize := len(buffer)
 	// We need to receive at least one message or be interrupted,
@@ -233,7 +226,7 @@
 }
 
 // group groups messages by its key.
-func (w *KafkaWorker) group(msgs []common.MQRowEvent) map[model.TopicPartitionKey][]*common.RowEvent {
+func (w *KafkaWorker) group(msgs []*common.MQRowEvent) map[model.TopicPartitionKey][]*common.RowEvent {
 	groupedMsgs := make(map[model.TopicPartitionKey][]*common.RowEvent)
 	for _, msg := range msgs {
 		if _, ok := groupedMsgs[msg.Key]; !ok {
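The kafka_worker.go changes swap errgroup supervision for a constructor-owned lifecycle: NewKafkaWorker now derives its own cancelable context, starts the three loops itself, and keeps a cancel/WaitGroup pair; the channel element type also moves from common.MQRowEvent to *common.MQRowEvent, so each send passes a pointer instead of copying the struct. Note that a `go` statement discards the started function's return value, so the loop errors no longer propagate the way `g.Wait()` surfaced them. A standalone sketch of the new lifecycle, assuming a Close method that is not part of the hunks shown here:

package main

import (
	"context"
	"sync"
)

// Worker mirrors the new shape: the constructor owns a private context,
// starts its goroutines immediately, and shutdown is cancel-then-wait.
type Worker struct {
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func NewWorker() *Worker {
	ctx, cancel := context.WithCancel(context.Background())
	w := &Worker{cancel: cancel}
	w.wg.Add(2)
	go func() {
		defer w.wg.Done()
		<-ctx.Done() // stand-in for the encode loop
	}()
	go func() {
		defer w.wg.Done()
		<-ctx.Done() // stand-in for the send loop
	}()
	return w
}

// Close asks every goroutine to stop and blocks until they have.
func (w *Worker) Close() {
	w.cancel()
	w.wg.Wait()
}

func main() {
	w := NewWorker()
	w.Close()
}

Close here is hypothetical; the visible diff only adds the cancel and wg fields, but some cancel-then-wait teardown along these lines is implied by them.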
32 changes: 7 additions & 25 deletions pkg/sink/codec/avro/arvo.go
@@ -24,7 +24,7 @@ import (
 	"strings"
 
 	"github.com/flowbehappy/tigate/pkg/common"
-	"github.com/flowbehappy/tigate/pkg/sink/codec"
+	"github.com/flowbehappy/tigate/pkg/sink/codec/encoder"
 	"github.com/linkedin/goavro/v2"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
@@ -978,6 +978,8 @@ func (a *BatchEncoder) columnToAvroData(
 	}
 }
 
+func (a *BatchEncoder) Clean() {}
+
 const (
 	// avro does not send ddl and checkpoint message, the following 2 field is used to distinguish
 	// TiCDC DDL event and checkpoint event, only used for testing purpose, not for production
@@ -1008,10 +1010,8 @@
 	valueSchemaSuffix = "-value"
 )
 
-// NewBatchEncoderBuilder creates an avro batchEncoderBuilder.
-func NewBatchEncoderBuilder(
-	ctx context.Context, config *ticommon.Config,
-) (codec.RowEventEncoderBuilder, error) {
+// NewAvroEncoder return a avro encoder.
+func NewAvroEncoder(ctx context.Context, config *ticommon.Config) (encoder.RowEventEncoder, error) {
 	var schemaM SchemaManager
 	var err error
 
@@ -1030,30 +1030,12 @@
 	default:
 		return nil, cerror.ErrAvroSchemaAPIError.GenWithStackByArgs(schemaRegistryType)
 	}
-
-	return &batchEncoderBuilder{
-		namespace: config.ChangefeedID.Namespace,
-		config:    config,
-		schemaM:   schemaM,
-	}, nil
-}
-
-// Build an AvroEventBatchEncoder.
-func (b *batchEncoderBuilder) Build() codec.RowEventEncoder {
-	return NewAvroEncoder(b.namespace, b.schemaM, b.config)
-}
-
-// CleanMetrics is a no-op for AvroEventBatchEncoder.
-func (b *batchEncoderBuilder) CleanMetrics() {}
-
-// NewAvroEncoder return a avro encoder.
-func NewAvroEncoder(namespace string, schemaM SchemaManager, config *ticommon.Config) codec.RowEventEncoder {
 	return &BatchEncoder{
-		namespace: namespace,
+		namespace: config.ChangefeedID.Namespace,
 		schemaM:   schemaM,
 		result:    make([]*ticommon.Message, 0, 1),
 		config:    config,
-	}
+	}, nil
 }
 
 // // SetupEncoderAndSchemaRegistry4Testing start a local schema registry for testing.
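The codec side of the same cleanup: the avro builder chain (NewBatchEncoderBuilder → Build → CleanMetrics) collapses into a single constructor returning an encoder.RowEventEncoder, with cleanup moved onto the encoder as Clean. A rough sketch of the resulting shape — the interface's method set below is a guess for illustration, not the repo's actual encoder.RowEventEncoder definition:

package main

import "fmt"

// RowEventEncoder folds cleanup into the encoder itself, so no separate
// builder object with CleanMetrics is needed.
type RowEventEncoder interface {
	EncodeDDLEvent(ddl string) ([]byte, error)
	Clean()
}

type avroEncoder struct{ namespace string }

func (e *avroEncoder) EncodeDDLEvent(ddl string) ([]byte, error) {
	// A real implementation would return a schema-registry-backed message.
	return []byte(e.namespace + ":" + ddl), nil
}

// Clean is a no-op, mirroring the commit's `func (a *BatchEncoder) Clean() {}`.
func (e *avroEncoder) Clean() {}

// NewEncoder replaces the builder chain: one call returns a ready encoder.
func NewEncoder(namespace string) (RowEventEncoder, error) {
	return &avroEncoder{namespace: namespace}, nil
}

func main() {
	enc, err := NewEncoder("default")
	if err != nil {
		panic(err)
	}
	defer enc.Clean()
	msg, _ := enc.EncodeDDLEvent("CREATE TABLE t(id int)")
	fmt.Printf("%s\n", msg)
}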
10 changes: 6 additions & 4 deletions pkg/sink/codec/bootstraper.go
@@ -20,6 +20,7 @@
 	"time"
 
 	"github.com/flowbehappy/tigate/pkg/common"
+	"github.com/flowbehappy/tigate/pkg/sink/codec/encoder"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
@@ -39,7 +40,7 @@
 type bootstrapWorker struct {
 	changefeedID model.ChangeFeedID
 	activeTables sync.Map
-	encoder      RowEventEncoder
+	rowEventEncoder             encoder.RowEventEncoder
 	sendBootstrapInterval       time.Duration
 	sendBootstrapInMsgCount     int32
 	sendBootstrapToAllPartition bool
@@ -52,7 +53,7 @@
 func newBootstrapWorker(
 	changefeedID model.ChangeFeedID,
 	outCh chan<- *future,
-	encoder RowEventEncoder,
+	rowEventEncoder encoder.RowEventEncoder,
 	sendBootstrapInterval int64,
 	sendBootstrapInMsgCount int32,
 	sendBootstrapToAllPartition bool,
@@ -66,7 +67,7 @@
 	return &bootstrapWorker{
 		changefeedID:    changefeedID,
 		outCh:           outCh,
-		encoder:         encoder,
+		rowEventEncoder: rowEventEncoder,
 		activeTables:    sync.Map{},
 		sendBootstrapInterval: time.Duration(sendBootstrapInterval) * time.Second,
 		sendBootstrapInMsgCount: sendBootstrapInMsgCount,
@@ -79,6 +80,7 @@ func (b *bootstrapWorker) run(ctx context.Context) error {
 	sendTicker := time.NewTicker(bootstrapWorkerTickerInterval)
 	gcTicker := time.NewTicker(bootstrapWorkerGCInterval)
 	defer func() {
+		b.rowEventEncoder.Clean()
 		gcTicker.Stop()
 		sendTicker.Stop()
 	}()
@@ -156,7 +158,7 @@ func (b *bootstrapWorker) generateEvents(
 	tableInfo *model.TableInfo,
 ) ([]*future, error) {
 	res := make([]*future, 0, totalPartition)
-	msg, err := b.encoder.EncodeDDLEvent(model.NewBootstrapDDLEvent(tableInfo))
+	msg, err := b.rowEventEncoder.EncodeDDLEvent(model.NewBootstrapDDLEvent(tableInfo))
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
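In bootstraper.go the renamed rowEventEncoder field now has its Clean called in the run loop's defer, so encoder-owned resources are released together with the tickers when the worker exits. The pattern in isolation, as a runnable sketch (the Clean and loop bodies are placeholders, not the repo's code):

package main

import (
	"context"
	"fmt"
	"time"
)

type encoder struct{}

// Clean releases encoder-owned resources (metrics, schema cache, etc.).
func (e *encoder) Clean() { fmt.Println("encoder cleaned") }

// run mirrors the bootstrapWorker loop: everything the loop owns, including
// the encoder, is released in one defer when ctx is canceled.
func run(ctx context.Context, enc *encoder) error {
	sendTicker := time.NewTicker(10 * time.Millisecond)
	gcTicker := time.NewTicker(20 * time.Millisecond)
	defer func() {
		enc.Clean()
		gcTicker.Stop()
		sendTicker.Stop()
	}()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-sendTicker.C: // send bootstrap messages here
		case <-gcTicker.C: // garbage-collect inactive tables here
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	_ = run(ctx, &encoder{})
}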
71 changes: 0 additions & 71 deletions pkg/sink/codec/builder/encoder_builder.go

This file was deleted.

(The remaining 15 changed files are not shown here.)
