Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
Address most comments and don't shuffle disappered nodes.
Browse files Browse the repository at this point in the history
  • Loading branch information
scottjab committed Nov 8, 2016
1 parent 1f05169 commit 03bb861
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 29 deletions.
54 changes: 28 additions & 26 deletions partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type partitions struct {
selected map[int]bool
local map[int]bool
remote map[int][]string
old map[int][]string
disappeared map[int][]string
numMissing int
ready chan bool
readyClosed bool
Expand All @@ -45,7 +45,7 @@ func watchPartitions(zkWatcher *zkWatcher, peers *peers, db, version string, num
replication: replication,
local: make(map[int]bool),
remote: make(map[int][]string),
old: make(map[int][]string, 1024),
disappeared: make(map[int][]string, 1024),
ready: make(chan bool),
}

Expand Down Expand Up @@ -113,6 +113,19 @@ func (p *partitions) updateLocalPartitions(local map[int]bool) {
}
}

func (p *partitions) deDupe(nodes []string) []string {
found := map[string]bool{}
dedupedNodes := make([]string, len(nodes))
for _, node := range nodes {
if !found[node] {
found[node] = true

dedupedNodes = append(dedupedNodes, node)
}
}
return dedupedNodes
}

func (p *partitions) updateRemotePartitions(nodes []string) {
if p.peers == nil {
return
Expand All @@ -131,23 +144,14 @@ func (p *partitions) updateRemotePartitions(nodes []string) {
}
}

// Keep track of old peers in case zookeeper goes away.
for partitionId, partition := range remote {
newPartition := make([]string, len(partition))
copy(newPartition, partition)

unDedupedPartition := append(newPartition, p.old[partitionId]...)
found := map[string]bool{}

// Shitty dedupe, iterate though the remote peers
for _, node := range unDedupedPartition {
if !found[node] {
found[node] = true
p.old[partitionId] = append([]string{node}, p.old[partitionId]...)
}
}
if len(p.old[partitionId]) >= 1024 {
p.old[partitionId] = p.old[partitionId][:1024]
unDedupedPartition := append(newPartition, p.disappeared[partitionId]...)
p.disappeared[partitionId] = p.deDupe(unDedupedPartition)
if len(p.disappeared[partitionId]) >= 1024 {
p.disappeared[partitionId] = p.disappeared[partitionId][:1024]
}
}

Expand Down Expand Up @@ -243,23 +247,21 @@ func (p *partitions) partitionZKNode(partition int) string {
}

// getPeers returns the list of peers who have the given partition available.
func (p *partitions) getPeers(partition int) []string {
if p.peers == nil {
return nil
}

func (p *partitions) getPeers(partition int) ([]string, []string) {
p.lock.RLock()
defer p.lock.RUnlock()

disappearedPeers := make([]string, 1024)
copy(disappearedPeers, p.disappeared[partition])

if p.peers == nil {
return nil, disappearedPeers
}

peers := make([]string, len(p.remote[partition]))
copy(peers, p.remote[partition])

oldPeers := make([]string, 1024)
copy(oldPeers, p.old[partition])
// Append old peers to peer list, in case of Zookeeper issues.
peers = append(peers, oldPeers...)

return peers
return peers, disappearedPeers
}

// partitionId returns a string id for the given partition, to be used for the
Expand Down
5 changes: 2 additions & 3 deletions serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,11 @@ func (vs *version) serveError(w http.ResponseWriter, key string, err error) {
w.WriteHeader(http.StatusInternalServerError)
}

func shuffle(vs []string) []string {
func shuffle(vs []string, disappeared []string) []string {
shuffled := make([]string, len(vs))
perm := rand.Perm(len(vs))
for i, v := range perm {
shuffled[v] = vs[i]
}

return shuffled
return append(shuffled, disappeared...)
}

0 comments on commit 03bb861

Please sign in to comment.