Skip to content

Commit

Permalink
Merge pull request #2788 from openziti/detect-non-member-peers
Browse files Browse the repository at this point in the history
Disconnect and event on non-member peers. Fixes #2742
  • Loading branch information
plorenz authored Feb 19, 2025
2 parents 7db54af + 3c3585c commit d7ed6c8
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 10 deletions.
20 changes: 11 additions & 9 deletions controller/event/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
ClusterIsLeaderless ClusterEventType = "state.is_leaderless"
ClusterStateReadOnly ClusterEventType = "state.ro"
ClusterStateReadWrite ClusterEventType = "state.rw"
ClusterPeerNotMember ClusterEventType = "peer.not_member"
)

// A ClusterPeer represents a controller which is a member of the cluster.
Expand Down Expand Up @@ -72,15 +73,16 @@ func (self *ClusterPeer) String() string {

// A ClusterEvent marks a change to the controller HA cluster.
// ClusterEvents can be of the following types:
// - peer.connected
// - peer.disconnected
// - members.changed
// - leadership.gained
// - leadership.lost
// - state.has_leader
// - state.is_leaderless
// - state.ro
// - state.rw
// - peer.connected - a peer connected
// - peer.disconnected - a peer disconnected
// - peer.not_member - a peer connected, but was not a member and didn't join the cluster
// - members.changed - A peer was added to or removed from the cluster
// - leadership.gained - The node became the cluster leader
// - leadership.lost - The node lost cluster leadership
// - state.has_leader - The cluster gained a leader
// - state.is_leaderless - The cluster became leaderless
// - state.ro - The cluster is not accepting state changes, likely due to version mismatches in cluster members
// - state.rw - The cluster is accepting state changes
//
// Example: Cluster Members Changed Event
//
Expand Down
19 changes: 19 additions & 0 deletions controller/raft/mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ type Env interface {
GetClusterId() string
GetVersionProvider() versions.VersionProvider
GetEventDispatcher() event.Dispatcher
IsPeerMember(id string) bool
IsLeader() bool
}

// Mesh provides the networking layer to raft
Expand Down Expand Up @@ -823,6 +825,23 @@ func (self *impl) AcceptUnderlay(underlay channel.Underlay) error {
binding.AddReceiveHandlerF(RaftConnectType, peer.handleReceiveConnect)
binding.AddReceiveHandlerF(RaftDisconnectType, peer.handleReceiveDisconnect)
binding.AddCloseHandler(peer)

if self.env.IsLeader() && !self.env.IsPeerMember(id) {
time.AfterFunc(time.Minute, func() {
if !self.env.IsPeerMember(id) && !binding.GetChannel().IsClosed() {
logger := pfxlog.Logger().WithField("peer", peer.Id)
logger.Info("disconnecting non-member peer after 1 minute")
if err := binding.GetChannel().Close(); err != nil {
log.WithError(err).Error("error closing channel to non-member peer")
}

evt := event.NewClusterEvent(event.ClusterPeerNotMember)
evt.Peers = self.GetEventPeerList(peer)
self.env.GetEventDispatcher().AcceptClusterEvent(evt)
}
})
}

return self.PeerConnected(peer, false)
})

Expand Down
10 changes: 10 additions & 0 deletions controller/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ func (self *Controller) GetEventDispatcher() event.Dispatcher {
return self.env.GetEventDispatcher()
}

// IsPeerMember reports whether id matches the ID of one of the servers listed
// in the state returned by Fsm.GetCurrentState for this controller's Raft
// instance. It returns false when no listed server has that ID.
func (self *Controller) IsPeerMember(id string) bool {
	state := self.Fsm.GetCurrentState(self.Raft)
	for _, server := range state.Servers {
		// Server IDs are a distinct type; compare them as plain strings.
		if id != string(server.ID) {
			continue
		}
		return true
	}
	return false
}

func (self *Controller) GetListenerHeaders() map[int32][]byte {
return map[int32][]byte{
mesh.ClusterIdHeader: []byte(self.clusterId.Load()),
Expand Down
2 changes: 1 addition & 1 deletion zititest/models/smoke/actions/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (a *bootstrapAction) bind(m *model.Model) model.Action {
workflow.AddAction(component.Start(".ctrl"))

if isHA {
workflow.AddAction(semaphore.Sleep(2 * time.Second))
workflow.AddAction(semaphore.Sleep(5 * time.Second))
workflow.AddAction(edge.InitRaftController("#ctrl1"))
}

Expand Down

0 comments on commit d7ed6c8

Please sign in to comment.