Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
Refactor how we keep track of old nodes and add dedupe.
Browse files Browse the repository at this point in the history
  • Loading branch information
scottjab committed Nov 7, 2016
1 parent 7908efa commit 1f05169
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type partitions struct {
selected map[int]bool
local map[int]bool
remote map[int][]string
old []map[int][]string
old map[int][]string
numMissing int
ready chan bool
readyClosed bool
Expand All @@ -45,7 +45,7 @@ func watchPartitions(zkWatcher *zkWatcher, peers *peers, db, version string, num
replication: replication,
local: make(map[int]bool),
remote: make(map[int][]string),
old: make([]map[int][]string, 1024),
old: make(map[int][]string, 1024),
ready: make(chan bool),
}

Expand Down Expand Up @@ -132,11 +132,24 @@ func (p *partitions) updateRemotePartitions(nodes []string) {
}

// Keep track of old peers in case zookeeper goes away.
if len(p.old) >= 1024 {
p.old = p.old[:1024]
for partitionId, partition := range remote {
newPartition := make([]string, len(partition))
copy(newPartition, partition)

unDedupedPartition := append(newPartition, p.old[partitionId]...)
found := map[string]bool{}

// Shitty dedupe, iterate though the remote peers
for _, node := range unDedupedPartition {
if !found[node] {
found[node] = true
p.old[partitionId] = append([]string{node}, p.old[partitionId]...)
}
}
if len(p.old[partitionId]) >= 1024 {
p.old[partitionId] = p.old[partitionId][:1024]
}
}
oldRemote := []map[int][]string{remote}
p.old = append(oldRemote, p.old...)

p.remote = remote
p.updateMissing()
Expand Down Expand Up @@ -241,11 +254,11 @@ func (p *partitions) getPeers(partition int) []string {
peers := make([]string, len(p.remote[partition]))
copy(peers, p.remote[partition])

oldPeers := make([]string, 1024)
copy(oldPeers, p.old[partition])
// Append old peers to peer list, in case of Zookeeper issues.
for _, oldPeer := range p.old {
peers = append(peers, oldPeer[partition]...)
}

peers = append(peers, oldPeers...)

return peers
}

Expand Down

0 comments on commit 1f05169

Please sign in to comment.