Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
Rebase master onto this branch
Browse files Browse the repository at this point in the history
  • Loading branch information
scottjab committed Dec 1, 2016
1 parent eaf6f9c commit e18c85e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
4 changes: 2 additions & 2 deletions serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ func (vs *version) serveError(w http.ResponseWriter, key string, err error) {
w.WriteHeader(http.StatusInternalServerError)
}

func shuffle(vs []string) []string {
func shuffle(vs []string, disappeared []string) []string {
shuffled := make([]string, len(vs))
perm := rand.Perm(len(vs))
for i, v := range perm {
shuffled[v] = vs[i]
}

return shuffled
return append(shuffled, disappeared...)
}
49 changes: 43 additions & 6 deletions sharding/partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Partitions struct {
selected map[int]bool
local map[int]bool
remote map[int][]string
disappeared map[int][]string
numMissing int
readyClosed bool
shouldAdvertise bool
Expand All @@ -55,6 +56,7 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu
replication: replication,
local: make(map[int]bool),
remote: make(map[int][]string),
disappeared: make(map[int][]string, 1024),
}

p.pickLocal()
Expand All @@ -69,6 +71,19 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu
return p
}

// Dedupelicates elements in a slice of strings.
func dedupe(nodes []string) []string {
found := map[string]bool{}
dedupedNodes := make([]string, 0, len(nodes))
for _, node := range nodes {
if !found[node] {
found[node] = true
dedupedNodes = append(dedupedNodes, node)
}
}
return dedupedNodes
}

// pickLocal selects which partitions are local by iterating through
// them all, and checking the hashring to see if this peer is one of the
// replicas.
Expand Down Expand Up @@ -107,17 +122,20 @@ func (p *Partitions) sync(updates chan []string) {
}

// FindPeers returns the list of peers who have the given partition available.
func (p *Partitions) FindPeers(partition int) []string {
if p.peers == nil {
return nil
}

func (p *Partitions) FindPeers(partition int) ([]string, []string) {
p.lock.RLock()
defer p.lock.RUnlock()

disappearedPeers := make([]string, 1024)
copy(disappearedPeers, p.disappeared[partition])

if p.peers == nil {
return nil, disappearedPeers
}

peers := make([]string, len(p.remote[partition]))
copy(peers, p.remote[partition])
return peers
return peers, disappearedPeers
}

// Update updates the list of local partitions to the given list.
Expand Down Expand Up @@ -228,6 +246,25 @@ func (p *Partitions) updateRemote(nodes []string) {
}
}

for partitionId, partition := range p.remote {
disappearedPeers := make([]string, len(partition))
for _, oldPeer := range partition {
found := false
for _, newPeer := range remote[partitionId] {
if newPeer == oldPeer {
found = true
}
}
if !found {
disappearedPeers = append(disappearedPeers, oldPeer)
}
}
p.disappeared[partitionId] = dedupe(append(disappearedPeers, p.disappeared[partitionId]...))
if len(p.disappeared[partitionId]) >= 1024 {
p.disappeared[partitionId] = p.disappeared[partitionId][:1024]
}
}

p.remote = remote
p.updateMissing()
}
Expand Down

0 comments on commit e18c85e

Please sign in to comment.