From e18c85e6ebd165d2ee842cd462ac506423df0b67 Mon Sep 17 00:00:00 2001 From: Jim Scott Date: Thu, 1 Dec 2016 13:39:35 -0800 Subject: [PATCH] Rebase master onto this branch --- serve.go | 4 ++-- sharding/partitions.go | 49 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/serve.go b/serve.go index de09dd6d..d19af787 100644 --- a/serve.go +++ b/serve.go @@ -120,12 +120,12 @@ func (vs *version) serveError(w http.ResponseWriter, key string, err error) { w.WriteHeader(http.StatusInternalServerError) } -func shuffle(vs []string) []string { +func shuffle(vs []string, disappeared []string) []string { shuffled := make([]string, len(vs)) perm := rand.Perm(len(vs)) for i, v := range perm { shuffled[v] = vs[i] } - return shuffled + return append(shuffled, disappeared...) } diff --git a/sharding/partitions.go b/sharding/partitions.go index cc9e3d6c..28ba9851 100644 --- a/sharding/partitions.go +++ b/sharding/partitions.go @@ -33,6 +33,7 @@ type Partitions struct { selected map[int]bool local map[int]bool remote map[int][]string + disappeared map[int][]string numMissing int readyClosed bool shouldAdvertise bool @@ -55,6 +56,7 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu replication: replication, local: make(map[int]bool), remote: make(map[int][]string), + disappeared: make(map[int][]string, 1024), } p.pickLocal() @@ -69,6 +71,19 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu return p } +// Dedupelicates elements in a slice of strings. +func dedupe(nodes []string) []string { + found := map[string]bool{} + dedupedNodes := make([]string, 0, len(nodes)) + for _, node := range nodes { + if !found[node] { + found[node] = true + dedupedNodes = append(dedupedNodes, node) + } + } + return dedupedNodes +} + // pickLocal selects which partitions are local by iterating through // them all, and checking the hashring to see if this peer is one of the // replicas. @@ -107,17 +122,20 @@ func (p *Partitions) sync(updates chan []string) { } // FindPeers returns the list of peers who have the given partition available. -func (p *Partitions) FindPeers(partition int) []string { - if p.peers == nil { - return nil - } - +func (p *Partitions) FindPeers(partition int) ([]string, []string) { p.lock.RLock() defer p.lock.RUnlock() + disappearedPeers := make([]string, 1024) + copy(disappearedPeers, p.disappeared[partition]) + + if p.peers == nil { + return nil, disappearedPeers + } + peers := make([]string, len(p.remote[partition])) copy(peers, p.remote[partition]) - return peers + return peers, disappearedPeers } // Update updates the list of local partitions to the given list. @@ -228,6 +246,25 @@ func (p *Partitions) updateRemote(nodes []string) { } } + for partitionId, partition := range p.remote { + disappearedPeers := make([]string, len(partition)) + for _, oldPeer := range partition { + found := false + for _, newPeer := range remote[partitionId] { + if newPeer == oldPeer { + found = true + } + } + if !found { + disappearedPeers = append(disappearedPeers, oldPeer) + } + } + p.disappeared[partitionId] = dedupe(append(disappearedPeers, p.disappeared[partitionId]...)) + if len(p.disappeared[partitionId]) >= 1024 { + p.disappeared[partitionId] = p.disappeared[partitionId][:1024] + } + } + p.remote = remote p.updateMissing() }