diff --git a/cdc/cdc/sink/mq_test.go b/cdc/cdc/sink/mq_test.go index 39699790..c7c4ac08 100644 --- a/cdc/cdc/sink/mq_test.go +++ b/cdc/cdc/sink/mq_test.go @@ -60,6 +60,17 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { opts := map[string]string{} errCh := make(chan error, 1) + newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl + kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) { + // Idempotent requires Kafka version >= 0.11.0.0 + config.Idempotent = false + cfg, err := newSaramaConfigImplBak(ctx, config) + c.Assert(err, check.IsNil) + return cfg, err + } + defer func() { + kafkap.NewSaramaConfigImpl = newSaramaConfigImplBak + }() kafkap.NewAdminClientImpl = kafka.NewMockAdminClient defer func() { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient @@ -146,6 +157,8 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) { + // Idempotent requires Kafka version >= 0.11.0.0 + config.Idempotent = false cfg, err := newSaramaConfigImplBak(ctx, config) c.Assert(err, check.IsNil) cfg.Producer.Flush.MaxMessages = 1 diff --git a/cdc/cdc/sink/producer/kafka/config.go b/cdc/cdc/sink/producer/kafka/config.go index 3293f476..cf8152ef 100644 --- a/cdc/cdc/sink/producer/kafka/config.go +++ b/cdc/cdc/sink/producer/kafka/config.go @@ -50,6 +50,8 @@ type Config struct { SaslScram *security.SaslScram // control whether to create topic AutoCreate bool + // Whether to enable idempotent producer + Idempotent bool } // NewConfig returns a default Kafka configuration @@ -63,6 +65,7 @@ func NewConfig() *Config { Credential: &security.Credential{}, SaslScram: &security.SaslScram{}, AutoCreate: true, + Idempotent: true, } } @@ -231,6 +234,14 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { // and https://github.com/tikv/migration/cdc/issues/3352. config.Metadata.Timeout = 1 * time.Minute + // See: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#enable-idempotence + config.Producer.Idempotent = c.Idempotent + if c.Idempotent { + config.Net.MaxOpenRequests = 1 + } else { + log.Warn("The idempotent producer is disabled, which may cause data reordering") + } + config.Producer.Partitioner = sarama.NewManualPartitioner config.Producer.MaxMessageBytes = c.MaxMessageBytes config.Producer.Return.Successes = true diff --git a/cdc/cdc/sink/producer/kafka/kafka_test.go b/cdc/cdc/sink/producer/kafka/kafka_test.go index ec5c7d69..3b934731 100644 --- a/cdc/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/cdc/sink/producer/kafka/kafka_test.go @@ -93,6 +93,7 @@ func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) { config.Version = "0.9.0.0" config.PartitionNum = int32(2) config.AutoCreate = false + config.Idempotent = false config.BrokerEndpoints = strings.Split(leader.Addr(), ",") newSaramaConfigImplBak := NewSaramaConfigImpl @@ -339,6 +340,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { config.Version = "0.9.0.0" config.PartitionNum = int32(2) config.AutoCreate = false + config.Idempotent = false config.BrokerEndpoints = strings.Split(leader.Addr(), ",") NewAdminClientImpl = kafka.NewMockAdminClient @@ -421,6 +423,7 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { config.Version = "0.9.0.0" config.PartitionNum = int32(2) config.AutoCreate = false + config.Idempotent = false config.BrokerEndpoints = strings.Split(leader.Addr(), ",") NewAdminClientImpl = kafka.NewMockAdminClient