Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition when expanding replication topic #34

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions check/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type ZkConnection interface {
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
Children(path string) ([]string, *zk.Stat, error)
Get(path string) ([]byte, *zk.Stat, error)
NewLock(path string, acl []zk.ACL) (ZkLock, error)
}

// Actual implementation based on samuel/go-zookeeper/zk
Expand Down Expand Up @@ -104,3 +105,22 @@ func (zkConn *zkConnection) Children(path string) ([]string, *zk.Stat, error) {
// Get delegates to the wrapped connection's Get for the given path and
// passes its three results through unchanged.
func (zkConn *zkConnection) Get(path string) ([]byte, *zk.Stat, error) {
	data, stat, err := zkConn.connection.Get(path)
	return data, stat, err
}

// ZkLock is the lock handle returned by ZkConnection.NewLock. Its only
// operation is Unlock.
type ZkLock interface {
// Unlock gives the lock back and reports any error from doing so.
Unlock() error
}

// zkLock is the ZkLock implementation that wraps a *zk.Lock.
type zkLock struct {
// lock is the underlying lock that Unlock delegates to.
lock *zk.Lock
}

// NewLock creates a lock on path with the given ACL and acquires it before
// returning, so a successfully returned ZkLock is already held and must be
// released with Unlock.
//
// If acquiring the lock fails, the error is returned together with a nil
// ZkLock, so callers never receive a handle to a lock they do not hold.
func (zkConn *zkConnection) NewLock(path string, acl []zk.ACL) (ZkLock, error) {
	l := zk.NewLock(zkConn.connection, path, acl)
	if err := l.Lock(); err != nil {
		return nil, err
	}
	return &zkLock{lock: l}, nil
}

// Unlock releases the wrapped lock and returns whatever error the underlying
// Unlock call reports.
func (l *zkLock) Unlock() error {
	err := l.lock.Unlock()
	return err
}
42 changes: 42 additions & 0 deletions check/connectors_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,45 @@ func (_m *MockZkConnection) Get(path string) ([]byte, *zk.Stat, error) {
// Get registers an expected call to Get on the owning mock and returns the
// recorded *gomock.Call.
func (_mr *_MockZkConnectionRecorder) Get(arg0 interface{}) *gomock.Call {
	owner := _mr.mock
	return owner.ctrl.RecordCall(owner, "Get", arg0)
}

// NewLock is the mocked ZkConnection.NewLock: it forwards the call to the
// gomock controller and converts the values the controller hands back.
func (_m *MockZkConnection) NewLock(path string, acl []zk.ACL) (ZkLock, error) {
	results := _m.ctrl.Call(_m, "NewLock", path, acl)
	// The comma-ok assertions leave nil in place when a result is nil or of
	// another type, instead of panicking.
	lock, _ := results[0].(ZkLock)
	err, _ := results[1].(error)
	return lock, err
}

// NewLock registers an expected call to NewLock on the owning mock and
// returns the recorded *gomock.Call.
func (_mr *_MockZkConnectionRecorder) NewLock(arg0, arg1 interface{}) *gomock.Call {
	owner := _mr.mock
	return owner.ctrl.RecordCall(owner, "NewLock", arg0, arg1)
}

// MockZkLock is a gomock-based mock of the ZkLock interface.
type MockZkLock struct {
// ctrl is the controller that mocked method calls are dispatched to.
ctrl *gomock.Controller
// recorder is handed out by EXPECT for registering expected calls.
recorder *_MockZkLockRecorder
}

// _MockZkLockRecorder is the recorder for MockZkLock (not exported).
type _MockZkLockRecorder struct {
// mock is the MockZkLock whose expected calls this recorder registers.
mock *MockZkLock
}

// NewMockZkLock builds a MockZkLock bound to ctrl and wires up its recorder
// so that EXPECT can be used immediately.
func NewMockZkLock(ctrl *gomock.Controller) *MockZkLock {
	m := &MockZkLock{ctrl: ctrl}
	// The recorder points back at the mock it records expectations for.
	m.recorder = &_MockZkLockRecorder{mock: m}
	return m
}

// EXPECT returns the recorder on which expected calls for this mock are
// registered.
func (_m *MockZkLock) EXPECT() *_MockZkLockRecorder {
	rec := _m.recorder
	return rec
}

// Unlock is the mocked ZkLock.Unlock: it forwards the call to the gomock
// controller and returns the error value the controller hands back.
func (_m *MockZkLock) Unlock() error {
	results := _m.ctrl.Call(_m, "Unlock")
	// The comma-ok assertion yields a nil error when the result is nil.
	err, _ := results[0].(error)
	return err
}

// Unlock registers an expected call to Unlock on the owning mock and returns
// the recorded *gomock.Call.
func (_mr *_MockZkLockRecorder) Unlock() *gomock.Call {
	owner := _mr.mock
	return owner.ctrl.RecordCall(owner, "Unlock")
}
2 changes: 1 addition & 1 deletion check/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (check *HealthCheck) CheckHealth(brokerUpdates chan<- Update, clusterUpdate
func newUpdate(report StatusReport, name string) Update {
data, err := report.Json()
if err != nil {
log.Warn("Error while marshaling %s status: %s", name, err.Error())
log.Warnf("Error while marshaling %s status: %s", name, err.Error())
data = simpleStatus(report.Summary())
}
return Update{report.Summary(), data}
Expand Down
2 changes: 1 addition & 1 deletion check/parse_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (check *HealthCheck) ParseCommandLineArguments() {
}
if check.config.replicationTopicName == "" {
check.config.replicationTopicName = "broker-replication-check"
logr.Println("using topic", check.config.topicName, "for broker", check.config.brokerID, "replication check")
logr.Println("using topic", check.config.replicationTopicName, "for broker", check.config.brokerID, "replication check")
}
check.config.retryInterval = check.config.CheckInterval / 2
}
Expand Down
53 changes: 40 additions & 13 deletions check/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (check *HealthCheck) findPartitionID(topicName string, forHealthCheck bool,
}

if ok {
return 0, fmt.Errorf(`Unable to find broker's parition in topic "%s" in metadata`, topicName)
return 0, fmt.Errorf(`Unable to find broker's partition in topic "%s" in metadata`, topicName)
} else {
return 0, fmt.Errorf(`Unable to find broker's topic "%s" in metadata`, topicName)
}
Expand Down Expand Up @@ -156,6 +156,13 @@ func (check *HealthCheck) createTopic(name string, forHealthCheck bool) (err err
}
defer zkConn.Close()

// Create a distributed lock to prevent race conditions when multiple health check instances expand replication
lock, err := zkConn.NewLock(chroot+"/kafka-health-check", zk.WorldACL(zk.PermAll))
if err != nil {
return errors.Wrap(err, "Unable to aquire ZK lock")
}
defer lock.Unlock()

topicPath := chroot + "/config/topics/" + name

exists := false
Expand Down Expand Up @@ -194,9 +201,11 @@ func (check *HealthCheck) createTopic(name string, forHealthCheck bool) (err err

}

func maybeExpandReplicationTopic(zk ZkConnection, brokerID, partitionID int32, topicName, chroot string) error {
func maybeExpandReplicationTopic(zkConn ZkConnection, brokerID, partitionID int32, topicName, chroot string) error {
topic := ZkTopic{Name: topicName}
err := zkPartitions(&topic, zk, topicName, chroot)
// Wait so that we get up-to-date partition info
waitForPartitionReassignmentDone(zkConn, chroot)
err := zkPartitions(&topic, zkConn, topicName, chroot)
if err != nil {
return errors.Wrap(err, "Unable to determine if replication topic should be expanded")
}
Expand All @@ -210,23 +219,24 @@ func maybeExpandReplicationTopic(zk ZkConnection, brokerID, partitionID int32, t
log.Info("Expanding replication check topic to include broker ", brokerID)
replicas = append(replicas, brokerID)

return reassignPartition(zk, partitionID, replicas, topicName, chroot)
return reassignPartition(zkConn, partitionID, replicas, topicName, chroot)
}
return nil
}

func reassignPartition(zk ZkConnection, partitionID int32, replicas []int32, topicName, chroot string) (err error) {

func waitForPartitionReassignmentDone(zk ZkConnection, chroot string) {
repeat := true
for repeat {
time.Sleep(1 * time.Second)
exists, _, rp_err := zk.Exists(chroot + "/admin/reassign_partitions")
if rp_err != nil {
exists, _, err := zk.Exists(chroot + "/admin/reassign_partitions")
if err != nil {
log.Warn("Error while checking if reassign_partitions node exists", err)
}
repeat = exists || err != nil
}
}

func reassignPartition(zk ZkConnection, partitionID int32, replicas []int32, topicName, chroot string) (err error) {
var replicasStr []string
for _, ID := range replicas {
replicasStr = append(replicasStr, fmt.Sprintf("%d", ID))
Expand All @@ -235,12 +245,14 @@ func reassignPartition(zk ZkConnection, partitionID int32, replicas []int32, top
reassign := fmt.Sprintf(`{"version":1,"partitions":[{"topic":"%s","partition":%d,"replicas":[%s]}]}`,
topicName, partitionID, strings.Join(replicasStr, ","))

repeat = true
// Start new partition reassignment process
repeat := true
for repeat {
log.Info("Creating reassign partition node")
err = createZkNode(zk, chroot+"/admin/reassign_partitions", reassign, true)
if err != nil {
log.Warn("Error while creating reassignment node", err)
log.Warn("Error while creating reassignment node, retrying in 1 second...", err)
time.Sleep(1 * time.Second)
}
repeat = err != nil
}
Expand Down Expand Up @@ -281,15 +293,30 @@ func (check *HealthCheck) closeConnection(deleteTopicIfPresent bool) {
}
defer zkConn.Close()

check.deleteTopic(zkConn, chroot, check.config.topicName, check.partitionID)
check.deleteTopic(zkConn, chroot, check.config.replicationTopicName, check.replicationPartitionID)
err = check.deleteTopic(zkConn, chroot, check.config.topicName, check.partitionID)
if err != nil {
log.Warnf(`Unable to delete topic "%s"`, check.config.topicName)
}
err = check.deleteTopic(zkConn, chroot, check.config.replicationTopicName, check.replicationPartitionID)
if err != nil {
log.Warnf(`Unable to delete topic "%s"`, check.config.replicationTopicName)
}
}
check.broker.Close()
}

func (check *HealthCheck) deleteTopic(zkConn ZkConnection, chroot, name string, partitionID int32) error {
// Create a distributed lock to prevent race conditions when multiple health check instances shrink replication
lock, err := zkConn.NewLock(chroot+"/kafka-health-check", zk.WorldACL(zk.PermAll))
if err != nil {
return errors.Wrap(err, "Unable to aquire ZK lock")
}
defer lock.Unlock()

topic := ZkTopic{Name: name}
err := zkPartitions(&topic, zkConn, name, chroot)
// Wait so that we get up-to-date partition info
waitForPartitionReassignmentDone(zkConn, chroot)
err = zkPartitions(&topic, zkConn, name, chroot)
if err != nil {
return err
}
Expand Down
26 changes: 20 additions & 6 deletions check/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func Test_findPartitionID_WhenTopicDoesNotExistAndMayCreateIt_CreatesTopic(t *te
metadata := metadataWithoutTopic()

zookeeper.EXPECT().Connect([]string{"localhost:2181"}, gomock.Any()).Return(nil, nil)
zookeeper.mockLock("/kafka-health-check", ctrl)
zookeeper.mockSuccessfulPathCreation("/config/topics/health-check")
zookeeper.mockSuccessfulPathCreation("/brokers/topics/health-check")
zookeeper.EXPECT().Close()
Expand Down Expand Up @@ -167,6 +168,7 @@ func Test_createHealthCheckTopic_WhenTopicCreationsSuccessful_ReturnsNoError(t *
check.zookeeper = zookeeper

zookeeper.EXPECT().Connect([]string{"localhost:2181", "localhost:2182"}, gomock.Any()).Return(nil, nil)
zookeeper.mockLock("/kafka-health-check", ctrl)
zookeeper.mockSuccessfulPathCreation("/config/topics/health-check")
zookeeper.mockSuccessfulPathCreation("/brokers/topics/health-check")
zookeeper.EXPECT().Close()
Expand Down Expand Up @@ -220,6 +222,7 @@ func Test_createTopic_WhenCreatingTopicConfigFails_ReturnsError(t *testing.T) {

check, zookeeper := newZkTestCheck(ctrl)
zookeeper.EXPECT().Connect([]string{"localhost:2181"}, gomock.Any()).Return(nil, nil)
zookeeper.mockLock("/kafka-health-check", ctrl)
zookeeper.mockFailingPathCreation("/config/topics/health-check")
zookeeper.EXPECT().Close()

Expand All @@ -236,6 +239,7 @@ func Test_createTopic_WhenCreatingTopicPartitionsFails_ReturnsError(t *testing.T

check, zookeeper := newZkTestCheck(ctrl)
zookeeper.EXPECT().Connect([]string{"localhost:2181"}, gomock.Any()).Return(nil, nil)
zookeeper.mockLock("/kafka-health-check", ctrl)
zookeeper.mockSuccessfulPathCreation("/config/topics/health-check")
zookeeper.mockFailingPathCreation("/brokers/topics/health-check")
zookeeper.EXPECT().Close()
Expand All @@ -252,9 +256,13 @@ func Test_deleteHealthCheckTopic_WhenDeleteSucceeds_ReturnsNoError(t *testing.T)
defer ctrl.Finish()

check, zookeeper := newZkTestCheck(ctrl)
zookeeper.mockTopicGet("health-check")
zookeeper.mockSuccessfulPathCreation("/admin/delete_topics/health-check")
zookeeper.EXPECT().Exists("/admin/delete_topics/health-check").Return(true, nil, nil).Return(false, nil, nil)
gomock.InOrder(
zookeeper.mockLock("/kafka-health-check", ctrl),
zookeeper.EXPECT().Exists("/admin/reassign_partitions").Return(false, nil, nil),
zookeeper.mockTopicGet("health-check"),
zookeeper.mockSuccessfulPathCreation("/admin/delete_topics/health-check"),
zookeeper.EXPECT().Exists("/admin/delete_topics/health-check").Return(true, nil, nil).Return(false, nil, nil),
)

err := check.deleteTopic(zookeeper, "", "health-check", 0)

Expand All @@ -268,6 +276,8 @@ func Test_deleteTopic_WhenCreateDeleteNodeFails_ReturnsError(t *testing.T) {
defer ctrl.Finish()

check, zookeeper := newZkTestCheck(ctrl)
zookeeper.mockLock("/kafka-health-check", ctrl)
zookeeper.EXPECT().Exists("/admin/reassign_partitions").Return(false, nil, nil)
zookeeper.mockTopicGet("health-check")
zookeeper.mockFailingPathCreation("/admin/delete_topics/health-check")

Expand All @@ -283,9 +293,13 @@ func Test_deleteTopic_WhenExistsFails_ReturnsError(t *testing.T) {
defer ctrl.Finish()

check, zookeeper := newZkTestCheck(ctrl)
zookeeper.mockTopicGet("health-check")
zookeeper.mockSuccessfulPathCreation("/admin/delete_topics/health-check")
zookeeper.EXPECT().Exists("/admin/delete_topics/health-check").Return(false, nil, errors.New("test error"))
gomock.InOrder(
zookeeper.mockLock("/kafka-health-check", ctrl),
zookeeper.EXPECT().Exists("/admin/reassign_partitions").Return(false, nil, nil),
zookeeper.mockTopicGet("health-check"),
zookeeper.mockSuccessfulPathCreation("/admin/delete_topics/health-check"),
zookeeper.EXPECT().Exists("/admin/delete_topics/health-check").Return(false, nil, errors.New("test error")),
)

err := check.deleteTopic(zookeeper, "", "health-check", 0)

Expand Down
29 changes: 19 additions & 10 deletions check/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,22 +343,24 @@ func newZkTestCheck(ctrl *gomock.Controller) (check *HealthCheck, zookeeper *Moc
return
}

func (zookeeper *MockZkConnection) mockGet(path, data string) {
zookeeper.EXPECT().Get(path).Return([]byte(data), nil, nil)
func (zookeeper *MockZkConnection) mockGet(path, data string) *gomock.Call {
return zookeeper.EXPECT().Get(path).Return([]byte(data), nil, nil)
}

func (zookeeper *MockZkConnection) mockTopicGet(name string) {
zookeeper.EXPECT().Get("/brokers/topics/"+name).Return([]byte(`{"version":1,"partitions":{"0":[1]}}`), nil, nil)
func (zookeeper *MockZkConnection) mockTopicGet(name string) *gomock.Call {
return zookeeper.EXPECT().Get("/brokers/topics/"+name).Return([]byte(`{"version":1,"partitions":{"0":[1]}}`), nil, nil)
}

func (zookeeper *MockZkConnection) mockSuccessfulPathCreation(path string) {
zookeeper.EXPECT().Exists(path).Return(false, nil, nil)
zookeeper.EXPECT().Create(path, gomock.Any(), int32(0), gomock.Any()).Return(path, nil)
func (zookeeper *MockZkConnection) mockSuccessfulPathCreation(path string) *gomock.Call {
before := zookeeper.EXPECT().Exists(path).Return(false, nil, nil)
zookeeper.EXPECT().Create(path, gomock.Any(), int32(0), gomock.Any()).Return(path, nil).After(before)
return before
}

func (zookeeper *MockZkConnection) mockFailingPathCreation(path string) {
zookeeper.EXPECT().Exists(path).Return(false, nil, nil)
zookeeper.EXPECT().Create(path, gomock.Any(), int32(0), gomock.Any()).Return("", errors.New("Test error"))
func (zookeeper *MockZkConnection) mockFailingPathCreation(path string) *gomock.Call {
before := zookeeper.EXPECT().Exists(path).Return(false, nil, nil)
zookeeper.EXPECT().Create(path, gomock.Any(), int32(0), gomock.Any()).Return("", errors.New("Test error")).After(before)
return before
}

func (zk *MockZkConnection) mockHealthyMetadata(topics ...string) {
Expand All @@ -370,3 +372,10 @@ func (zk *MockZkConnection) mockHealthyMetadata(topics ...string) {
}
zk.EXPECT().Close()
}

// mockLock sets up the expectations for one lock/unlock cycle on path:
// NewLock is expected with any ACL and returns a fresh MockZkLock, whose
// Unlock is expected only after that NewLock call. The NewLock expectation
// is returned so callers can order further expectations relative to it.
func (zookeeper *MockZkConnection) mockLock(path string, ctrl *gomock.Controller) *gomock.Call {
	mockedLock := NewMockZkLock(ctrl)
	acquire := zookeeper.EXPECT().NewLock(path, gomock.Any()).Return(mockedLock, nil)
	release := mockedLock.EXPECT().Unlock()
	release.After(acquire)
	return acquire
}