diff --git a/arbiter/server.go b/arbiter/server.go
index 7035f1406..fb8d1b294 100644
--- a/arbiter/server.go
+++ b/arbiter/server.go
@@ -287,8 +287,16 @@ func (s *Server) loadStatus() (int, error) {
 func syncBinlogs(ctx context.Context, source <-chan *reader.Message, ld loader.Loader) (err error) {
 	dest := ld.Input()
 	defer ld.Close()
+	var receivedTs int64
 	for msg := range source {
 		log.Debug("recv msg from kafka reader", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset))
+
+		if msg.Binlog.CommitTs <= receivedTs {
+			log.Info("skip repeated binlog", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset))
+			continue
+		}
+		receivedTs = msg.Binlog.CommitTs
+
 		txn, err := loader.SlaveBinlogToTxn(msg.Binlog)
 		if err != nil {
 			log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err))
diff --git a/arbiter/server_test.go b/arbiter/server_test.go
index 97cb0cfa4..8b68a5550 100644
--- a/arbiter/server_test.go
+++ b/arbiter/server_test.go
@@ -380,7 +380,7 @@ type syncBinlogsSuite struct{}
 
 var _ = Suite(&syncBinlogsSuite{})
 
-func (s *syncBinlogsSuite) createMsg(schema, table, sql string) *reader.Message {
+func (s *syncBinlogsSuite) createMsg(schema, table, sql string, commitTs int64) *reader.Message {
 	return &reader.Message{
 		Binlog: &pb.Binlog{
 			DdlData: &pb.DDLData{
@@ -388,6 +388,7 @@ func (s *syncBinlogsSuite) createMsg(schema, table, sql string) *reader.Message
 				TableName:  &table,
 				DdlQuery:   []byte(sql),
 			},
+			CommitTs: commitTs,
 		},
 	}
 }
@@ -395,8 +396,15 @@ func (s *syncBinlogsSuite) createMsg(schema, table, sql string) *reader.Message
 func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) {
 	source := make(chan *reader.Message, 1)
 	msgs := []*reader.Message{
-		s.createMsg("test42", "users", "alter table users add column gender smallint"),
-		s.createMsg("test42", "operations", "alter table operations drop column seq"),
+		s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
+		s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
+		s.createMsg("test42", "operations", "alter table operations drop column seq", 2),
+		s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
+		s.createMsg("test42", "operations", "alter table operations drop column seq", 2),
+	}
+	expectMsgs := []*reader.Message{
+		s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
+		s.createMsg("test42", "operations", "alter table operations drop column seq", 2),
 	}
 	dest := make(chan *loader.Txn, len(msgs))
 	go func() {
@@ -410,8 +418,8 @@ func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) {
 
 	err := syncBinlogs(context.Background(), source, &ld)
 	c.Assert(err, IsNil)
-	c.Assert(len(dest), Equals, 2)
-	for _, m := range msgs {
+	c.Assert(len(dest), Equals, len(expectMsgs))
+	for _, m := range expectMsgs {
 		txn := <-dest
 		c.Assert(txn.Metadata.(*reader.Message), DeepEquals, m)
 	}
@@ -426,7 +434,7 @@ func (s *syncBinlogsSuite) TestShouldQuitWhenSomeErrorOccurs(c *C) {
 		// input is set small to trigger blocking easily
 		input: make(chan *loader.Txn, 1),
 	}
-	msg := s.createMsg("test42", "users", "alter table users add column gender smallint")
+	msg := s.createMsg("test42", "users", "alter table users add column gender smallint", 1)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	// start a routine keep sending msgs to kafka reader
diff --git a/go.mod b/go.mod
index 0f8b0978f..4eff1ca4b 100644
--- a/go.mod
+++ b/go.mod
@@ -19,7 +19,7 @@ require (
 	github.com/pingcap/errors v0.11.4
 	github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0
 	github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
-	github.com/pingcap/parser v0.0.0-20191118062434-7c5018645942
+	github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623
 	github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
 	github.com/pingcap/tidb v1.1.0-beta.0.20191120070053-5a7ecfeb94fd
 	github.com/pingcap/tidb-tools v3.0.6-0.20191125061035-b087739b71f1+incompatible
diff --git a/go.sum b/go.sum
index 683c1d633..14efb01d8 100644
--- a/go.sum
+++ b/go.sum
@@ -211,6 +211,8 @@ github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v1
 github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
 github.com/pingcap/parser v0.0.0-20191118062434-7c5018645942 h1:JAPbnAxPryeAO50UO89/9MDYJK38Ts7mykTDqgUS14k=
 github.com/pingcap/parser v0.0.0-20191118062434-7c5018645942/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
+github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623 h1:/BJjVyJlNKWMMrgPsbzk5Y9VPJWwHKYttj3oWxnFQ9U=
+github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
 github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd/go.mod h1:I7TEby5BHTYIxgHszfsOJSBsk8b2Qt8QrSIgdv5n5QQ=
 github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 h1:GIEq+wZfrl2bcJxpuSrEH4H7/nlf5YdmpS+dU9lNIt8=
 github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0/go.mod h1:G/6rJpnYwM0LKMec2rI82/5Kg6GaZMvlfB+e6/tvYmI=
diff --git a/pump/storage/storage.go b/pump/storage/storage.go
index c06c5f10c..c4549ba7e 100644
--- a/pump/storage/storage.go
+++ b/pump/storage/storage.go
@@ -1085,6 +1085,9 @@ func (a *Append) feedPreWriteValue(cbinlog *pb.Binlog) error {
 
 	cbinlog.StartTs = pbinlog.StartTs
 	cbinlog.PrewriteValue = pbinlog.PrewriteValue
+	cbinlog.DdlQuery = pbinlog.DdlQuery
+	cbinlog.DdlJobId = pbinlog.DdlJobId
+	cbinlog.DdlSchemaState = pbinlog.DdlSchemaState
 
 	return nil
 }
diff --git a/pump/storage/storage_test.go b/pump/storage/storage_test.go
index f5d6140c1..9bbc96a67 100644
--- a/pump/storage/storage_test.go
+++ b/pump/storage/storage_test.go
@@ -450,6 +450,44 @@ func (as *AppendSuit) TestWriteCBinlog(c *check.C) {
 	c.Assert(cBinlog.Tp, check.Equals, pb.BinlogType_Commit)
 }
 
+func (as *AppendSuit) TestFeedPreWriteValue(c *check.C) {
+	a := newAppend(c)
+	defer cleanAppend(a)
+
+	expectPBinlog := &pb.Binlog{
+		Tp:             pb.BinlogType_Prewrite,
+		StartTs:        42,
+		PrewriteKey:    []byte("PrewriteKey"),
+		PrewriteValue:  []byte("PrewriteValue"),
+		DdlQuery:       []byte("create table t(a int);"),
+		DdlJobId:       6,
+		DdlSchemaState: 5,
+	}
+
+	req := a.writeBinlog(expectPBinlog)
+	c.Assert(req.err, check.IsNil)
+
+	cBinlog := &pb.Binlog{
+		Tp:       pb.BinlogType_Commit,
+		StartTs:  42,
+		CommitTs: 50,
+	}
+	req = a.writeBinlog(cBinlog)
+	c.Assert(req.err, check.IsNil)
+
+	err := a.feedPreWriteValue(cBinlog)
+	c.Assert(err, check.IsNil)
+
+	c.Assert(cBinlog.StartTs, check.Equals, expectPBinlog.StartTs)
+	c.Assert(cBinlog.CommitTs, check.Equals, int64(50))
+	c.Assert(cBinlog.Tp, check.Equals, pb.BinlogType_Commit)
+	c.Assert(cBinlog.PrewriteKey, check.IsNil)
+	c.Assert(cBinlog.PrewriteValue, check.BytesEquals, expectPBinlog.PrewriteValue)
+	c.Assert(cBinlog.DdlQuery, check.BytesEquals, expectPBinlog.DdlQuery)
+	c.Assert(cBinlog.DdlJobId, check.Equals, expectPBinlog.DdlJobId)
+	c.Assert(cBinlog.DdlSchemaState, check.Equals, expectPBinlog.DdlSchemaState)
+}
+
 type OpenDBSuit struct {
 	dir string
 }