Skip to content

Commit

Permalink
Merge branch 'release-3.0' into release-3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 committed Jan 3, 2020
2 parents de25dbb + f928d44 commit 88c47fb
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 7 deletions.
8 changes: 8 additions & 0 deletions arbiter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,16 @@ func (s *Server) loadStatus() (int, error) {
func syncBinlogs(ctx context.Context, source <-chan *reader.Message, ld loader.Loader) (err error) {
dest := ld.Input()
defer ld.Close()
var receivedTs int64
for msg := range source {
log.Debug("recv msg from kafka reader", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset))

if msg.Binlog.CommitTs <= receivedTs {
log.Info("skip repeated binlog", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset))
continue
}
receivedTs = msg.Binlog.CommitTs

txn, err := loader.SlaveBinlogToTxn(msg.Binlog)
if err != nil {
log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err))
Expand Down
20 changes: 14 additions & 6 deletions arbiter/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,23 +380,31 @@ type syncBinlogsSuite struct{}

var _ = Suite(&syncBinlogsSuite{})

func (s *syncBinlogsSuite) createMsg(schema, table, sql string) *reader.Message {
func (s *syncBinlogsSuite) createMsg(schema, table, sql string, commitTs int64) *reader.Message {
return &reader.Message{
Binlog: &pb.Binlog{
DdlData: &pb.DDLData{
SchemaName: &schema,
TableName: &table,
DdlQuery: []byte(sql),
},
CommitTs: commitTs,
},
}
}

func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) {
source := make(chan *reader.Message, 1)
msgs := []*reader.Message{
s.createMsg("test42", "users", "alter table users add column gender smallint"),
s.createMsg("test42", "operations", "alter table operations drop column seq"),
s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
s.createMsg("test42", "operations", "alter table operations drop column seq", 2),
s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
s.createMsg("test42", "operations", "alter table operations drop column seq", 2),
}
expectMsgs := []*reader.Message{
s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
s.createMsg("test42", "operations", "alter table operations drop column seq", 2),
}
dest := make(chan *loader.Txn, len(msgs))
go func() {
Expand All @@ -410,8 +418,8 @@ func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) {
err := syncBinlogs(context.Background(), source, &ld)
c.Assert(err, IsNil)

c.Assert(len(dest), Equals, 2)
for _, m := range msgs {
c.Assert(len(dest), Equals, len(expectMsgs))
for _, m := range expectMsgs {
txn := <-dest
c.Assert(txn.Metadata.(*reader.Message), DeepEquals, m)
}
Expand All @@ -426,7 +434,7 @@ func (s *syncBinlogsSuite) TestShouldQuitWhenSomeErrorOccurs(c *C) {
// input is set small to trigger blocking easily
input: make(chan *loader.Txn, 1),
}
msg := s.createMsg("test42", "users", "alter table users add column gender smallint")
msg := s.createMsg("test42", "users", "alter table users add column gender smallint", 1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// start a routine keep sending msgs to kafka reader
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20191118062434-7c5018645942
github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
github.com/pingcap/tidb v1.1.0-beta.0.20191120070053-5a7ecfeb94fd
github.com/pingcap/tidb-tools v3.0.6-0.20191125061035-b087739b71f1+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v1
github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20191118062434-7c5018645942 h1:JAPbnAxPryeAO50UO89/9MDYJK38Ts7mykTDqgUS14k=
github.com/pingcap/parser v0.0.0-20191118062434-7c5018645942/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623 h1:/BJjVyJlNKWMMrgPsbzk5Y9VPJWwHKYttj3oWxnFQ9U=
github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd/go.mod h1:I7TEby5BHTYIxgHszfsOJSBsk8b2Qt8QrSIgdv5n5QQ=
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 h1:GIEq+wZfrl2bcJxpuSrEH4H7/nlf5YdmpS+dU9lNIt8=
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0/go.mod h1:G/6rJpnYwM0LKMec2rI82/5Kg6GaZMvlfB+e6/tvYmI=
Expand Down
3 changes: 3 additions & 0 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,9 @@ func (a *Append) feedPreWriteValue(cbinlog *pb.Binlog) error {

cbinlog.StartTs = pbinlog.StartTs
cbinlog.PrewriteValue = pbinlog.PrewriteValue
cbinlog.DdlQuery = pbinlog.DdlQuery
cbinlog.DdlJobId = pbinlog.DdlJobId
cbinlog.DdlSchemaState = pbinlog.DdlSchemaState

return nil
}
Expand Down
38 changes: 38 additions & 0 deletions pump/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,44 @@ func (as *AppendSuit) TestWriteCBinlog(c *check.C) {
c.Assert(cBinlog.Tp, check.Equals, pb.BinlogType_Commit)
}

func (as *AppendSuit) TestFeedPreWriteValue(c *check.C) {
a := newAppend(c)
defer cleanAppend(a)

expectPBinlog := &pb.Binlog{
Tp: pb.BinlogType_Prewrite,
StartTs: 42,
PrewriteKey: []byte("PrewriteKey"),
PrewriteValue: []byte("PrewriteValue"),
DdlQuery: []byte("create table t(a int);"),
DdlJobId: 6,
DdlSchemaState: 5,
}

req := a.writeBinlog(expectPBinlog)
c.Assert(req.err, check.IsNil)

cBinlog := &pb.Binlog{
Tp: pb.BinlogType_Commit,
StartTs: 42,
CommitTs: 50,
}
req = a.writeBinlog(cBinlog)
c.Assert(req.err, check.IsNil)

err := a.feedPreWriteValue(cBinlog)
c.Assert(err, check.IsNil)

c.Assert(cBinlog.StartTs, check.Equals, expectPBinlog.StartTs)
c.Assert(cBinlog.CommitTs, check.Equals, int64(50))
c.Assert(cBinlog.Tp, check.Equals, pb.BinlogType_Commit)
c.Assert(cBinlog.PrewriteKey, check.IsNil)
c.Assert(cBinlog.PrewriteValue, check.BytesEquals, expectPBinlog.PrewriteValue)
c.Assert(cBinlog.DdlQuery, check.BytesEquals, expectPBinlog.DdlQuery)
c.Assert(cBinlog.DdlJobId, check.Equals, expectPBinlog.DdlJobId)
c.Assert(cBinlog.DdlSchemaState, check.Equals, expectPBinlog.DdlSchemaState)
}

type OpenDBSuit struct {
dir string
}
Expand Down

0 comments on commit 88c47fb

Please sign in to comment.