Skip to content

Commit

Permalink
Merge pull request #50 from Xsxdot/master
Browse files Browse the repository at this point in the history
重写了ClusterCanalConnector,增加client高可用
  • Loading branch information
withlin authored Mar 23, 2020
2 parents 96bf8b8 + 5df0df2 commit d552e74
Show file tree
Hide file tree
Showing 5 changed files with 366 additions and 140 deletions.
313 changes: 234 additions & 79 deletions client/cluster_canal_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package client

import (
"errors"
"fmt"
"github.com/samuel/go-zookeeper/zk"
"log"
"sort"
"strings"
"time"

pb "github.com/CanalClient/canal-go/protocol"
Expand All @@ -16,127 +20,278 @@ type ClusterCanalConnector struct {
filter string

RetryTimes int32
currentSequence string
zkVersion int32
}

const (
path = "/canal-consumer"
runningFlag = byte(0)
notRunningFlag = byte(0)
)
func NewClusterCanalConnector(canalNode *CanalClusterNode, username string, password string, destination string,
soTimeOut int32, idleTimeOut int32) *ClusterCanalConnector {
soTimeOut int32, idleTimeOut int32) (*ClusterCanalConnector,error) {

err := checkRootPath(canalNode.zkClient)
if err != nil {
return nil, err
}

cc := &ClusterCanalConnector{
canalNode: canalNode, username: username, password: password,
destination: destination, soTimeOut: soTimeOut, idleTimeOut: idleTimeOut,
RetryTimes: 3,
currentSequence, err := createEphemeralSequence(canalNode.zkClient)
if err != nil {
return nil,err
}

return cc
cluster := &ClusterCanalConnector{
canalNode: canalNode,
username: username,
password: password,
destination: destination,
soTimeOut: soTimeOut,
idleTimeOut: idleTimeOut,
RetryTimes: 0,
currentSequence:currentSequence,
zkVersion:0,
}

return cluster, nil
}

// 重试失败后重新连接
func (cc *ClusterCanalConnector) reTryWithConn(name string, do func() error) error {
return cc.reTry(name, func() error {
if cc.conn == nil {
cc.Connect()
}
if cc.conn == nil {
return errors.New("canal connect fail")
}
if err := do(); err != nil {
cc.Connect()
return err
}
return nil
})
func (cc *ClusterCanalConnector) Connect() error {
log.Println("connecting canal server...")
err := cc.reconnect()
return err
}

func (cc *ClusterCanalConnector) reTry(name string, do func() error) (err error) {
for times := int32(0); times < cc.RetryTimes; times++ {
if err = do(); err != nil {
log.Printf("%s err:%v, reTry", name, err)
time.Sleep(time.Second * 5)
} else {
return nil
func (cc *ClusterCanalConnector) reconnect() error {
err := cc.doConnect()
if err != nil {
log.Println("connect canal-server failed ",err)
cc.RetryTimes ++
if cc.RetryTimes < 5 {
time.Sleep(5*time.Second)
return cc.reconnect()
}
return err
}
return
cc.RetryTimes =0

return nil
}

func (cc *ClusterCanalConnector) Connect() error {
log.Printf("Connect")

return cc.reTry("Connect", func() error {
var (
addr string
port int
err error
)
cc.DisConnection()
if addr, port, err = cc.canalNode.GetNode(); err != nil {
log.Printf("canalNode.GetNode addr=%s, port=%d, err=%v\n", addr, port, err)
return err
}
func (cc *ClusterCanalConnector) doConnect() error {

conn := NewSimpleCanalConnector(addr, port, cc.username, cc.password,
cc.destination, cc.soTimeOut, cc.idleTimeOut)
log.Println("connecting...")

if cc.filter != "" {
conn.Filter = cc.filter
}
//等待成为最小的节点,最小的节点去运行
err := cc.waitBecomeFirst()
if err != nil {
return fmt.Errorf("error wait become first zk node %s", err.Error())
}

if err = conn.Connect(); err != nil {
log.Printf("conn.Connect err:%v", err)
conn.DisConnection()
return err
}
cc.conn = conn
return nil
})
_, err = cc.canalNode.zkClient.Set(path+"/"+cc.currentSequence, []byte{runningFlag}, cc.zkVersion)
if err != nil {
return fmt.Errorf("error set running flag %s", err.Error())
}
cc.zkVersion++

addr, port, err := cc.canalNode.GetNode()
if err != nil {
return fmt.Errorf("error get zk current node path %s", err.Error())
}

connector := NewSimpleCanalConnector(addr, port, cc.username, cc.password, cc.destination, cc.soTimeOut, cc.idleTimeOut)
cc.conn = connector

err = connector.Connect()
if err != nil {
return err
}

log.Println("connected to ",addr," success")

return nil
}

func (cc *ClusterCanalConnector) DisConnection() {

func (cc *ClusterCanalConnector) DisConnection() error {
if cc.conn != nil {
cc.conn.DisConnection()
_, stat, _ := cc.canalNode.zkClient.Get(path + "/" + cc.currentSequence)
err := cc.canalNode.zkClient.Delete(path+"/"+cc.currentSequence, stat.Version)
if err != nil {
return fmt.Errorf("error delete temp consumer path %s", err.Error())
}
cc.conn = nil
}

return nil
}

func (cc *ClusterCanalConnector) Subscribe(filter string) error {
return cc.reTryWithConn("Subscribe", func() (err error) {
if err = cc.conn.Subscribe(filter); err == nil {
cc.filter = filter
err := cc.conn.Subscribe(filter)
if err != nil {
err = cc.reconnect()
if err != nil {
return err
}
return
})
return cc.Subscribe(filter)
}

return nil
}

func (cc *ClusterCanalConnector) UnSubscribe() error {
return cc.reTryWithConn("UnSubscribe", func() error {
return cc.conn.UnSubscribe()
})
err := cc.conn.UnSubscribe()
if err != nil {
err = cc.reconnect()
if err != nil {
return err
}
return cc.UnSubscribe()
}

return nil
}

func (cc *ClusterCanalConnector) GetWithOutAck(batchSize int32, timeOut *int64, units *int32) (msg *pb.Message, err error) {
_ = cc.reTryWithConn("GetWithOutAck", func() error {
msg, err = cc.conn.GetWithOutAck(batchSize, timeOut, units)
return err
})
msg, err = cc.conn.GetWithOutAck(batchSize, timeOut, units)
if err != nil {
err = cc.reconnect()
if err != nil {
return nil,err
}
return cc.conn.GetWithOutAck(batchSize,timeOut,units)
}

return
}

func (cc *ClusterCanalConnector) Get(batchSize int32, timeOut *int64, units *int32) (msg *pb.Message, err error) {
_ = cc.reTryWithConn("Get", func() error {
msg, err = cc.conn.Get(batchSize, timeOut, units)
return err
})
msg, err = cc.conn.Get(batchSize, timeOut, units)
if err != nil {
err = cc.reconnect()
if err != nil {
return nil,err
}
return cc.conn.Get(batchSize,timeOut,units)
}

return
}

func (cc *ClusterCanalConnector) Ack(batchId int64) error {
return cc.reTryWithConn("Ack", func() error {
return cc.conn.Ack(batchId)
})
err := cc.conn.Ack(batchId)
if err != nil {
err = cc.reconnect()
if err != nil {
return err
}
return cc.Ack(batchId)
}

return nil
}

func (cc *ClusterCanalConnector) RollBack(batchId int64) error {
return cc.reTryWithConn("RollBack", func() error {
return cc.conn.RollBack(batchId)
})
err := cc.conn.RollBack(batchId)
if err != nil {
err = cc.reconnect()
if err != nil {
return err
}
return cc.RollBack(batchId)
}

return nil
}

func createEphemeralSequence(zkClient *zk.Conn) (string, error) {
node, err := zkClient.Create(path+"/", []byte{notRunningFlag}, zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
if err != nil {
return "", err
}
split := strings.Split(node, "/")
currentSequence := split[len(split)-1]
return currentSequence, nil
}

func checkRootPath(zkClient *zk.Conn) error {
exists, _, err := zkClient.Exists(path)
if err != nil {
return err
}
if !exists {
_, err := zkClient.Create(path, []byte{}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
}
return nil
}

func (cc *ClusterCanalConnector) waitBecomeFirst() error {
zkClient := cc.canalNode.zkClient
children, _, err := zkClient.Children(path)
if err != nil {
return err
}

if len(children) == 0 {
return errors.New("没有子节点")
}

sort.Strings(children)

if cc.currentSequence != children[0] {
noSelf := true

for i, child := range children {
if cc.currentSequence == child {
noSelf = false
previousPath := path + "/" + children[i-1]
//阻塞等待上一个比他小的节点删除
log.Println("waiting")
err := waitDelete(zkClient, previousPath)
if err != nil {
return err
}
log.Println("waited")

return cc.waitBecomeFirst()
}
}

if noSelf {
//以防万一
cc.currentSequence, err = createEphemeralSequence(zkClient)
if err != nil {
return err
}
return cc.waitBecomeFirst()
}
}

return nil
}

//等待上一个比他小的节点失联,失联后等待10秒,10秒后还没恢复则确认已被删除
func waitDelete(zkClient *zk.Conn, previousPath string) error {
existsW, _, events, err := zkClient.ExistsW(previousPath)
if err != nil {
return err
}

if existsW {
event := <-events
if event.Type != zk.EventNodeDeleted {
return waitDelete(zkClient,previousPath)
}else {
//等待10秒再查看监听的节点是否确实不存在了,以防只是网络延迟造成的掉线
time.Sleep(10*time.Second)
return waitDelete(zkClient,previousPath)
}
}

return nil
}
Loading

0 comments on commit d552e74

Please sign in to comment.