Skip to content

Commit ac8595b

Browse files
committed
Make it stop sending maps eventually.
Now it hashes maps and doesn't pass duplicate ones along, except when a new peer comes online. So far, this is experimental, but it seems to work alright.
1 parent 73a45f7 commit ac8595b

File tree

2 files changed

+102
-5
lines changed

2 files changed

+102
-5
lines changed

internal/bloom_reachability_map.go

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,29 @@
11
package internal
22

33
import (
4+
"encoding/base64"
5+
"hash/fnv"
6+
47
"github.com/AutoRoute/bloom"
58
"github.com/AutoRoute/node/types"
69
)
710

811
type BloomReachabilityMap struct {
9-
Filters []*bloom.BloomFilter
10-
Conglomerate *bloom.BloomFilter
12+
Filters []*bloom.BloomFilter
13+
// Allows us to keep track of filters between nodes.
14+
filter_hashes map[string]bool
15+
Conglomerate *bloom.BloomFilter
16+
}
17+
18+
// Generates a unique hash for a particular filter.
19+
// Args:
20+
// filter: The filter to hash.
21+
// Returns:
22+
// The FNV hash of the filter.
23+
func hashFilter(filter *bloom.BloomFilter) string {
24+
hasher := fnv.New64()
25+
filter.WriteTo(hasher)
26+
return base64.URLEncoding.EncodeToString(hasher.Sum(nil))
1127
}
1228

1329
func NewBloomReachabilityMap() *BloomReachabilityMap {
@@ -16,8 +32,13 @@ func NewBloomReachabilityMap() *BloomReachabilityMap {
1632

1733
m := BloomReachabilityMap{
1834
Filters: fs,
35+
filter_hashes: make(map[string]bool),
1936
Conglomerate: fs[0].Copy(),
2037
}
38+
39+
// Hash our initial filter to begin with.
40+
initial_hash := hashFilter(fs[0])
41+
m.filter_hashes[initial_hash] = true
2142
return &m
2243
}
2344

@@ -39,22 +60,69 @@ func (m *BloomReachabilityMap) Increment() {
3960
m.Filters = append(newZeroth, m.Filters...)
4061
}
4162

42-
func (m *BloomReachabilityMap) Merge(n *BloomReachabilityMap) {
63+
// Merges two reachability maps.
64+
// Args:
65+
// n: The map to merge with this one.
66+
// Returns:
67+
// True if the map was modified, false if it wasn't. Practically, it will only
68+
// return false if it is being asked to merge a map whose filters are a subset
69+
// of this one's filters.
70+
func (m *BloomReachabilityMap) Merge(n *BloomReachabilityMap) bool {
71+
modified := false
72+
4373
if len(m.Filters) < len(n.Filters) {
74+
modified = true
4475
for k, v := range m.Filters {
76+
_, found := m.filter_hashes[hashFilter(n.Filters[k])]
77+
if (found) {
78+
// This filter is not new.
79+
continue
80+
}
81+
82+
old_hash := hashFilter(v)
4583
v.Merge(n.Filters[k])
84+
new_hash := hashFilter(v)
85+
if old_hash != new_hash {
86+
delete(m.filter_hashes, old_hash)
87+
m.filter_hashes[new_hash] = true
88+
}
4689
}
4790
// append the remaining Filters
91+
for _, filter := range n.Filters[len(m.Filters):] {
92+
m.filter_hashes[hashFilter(filter)] = true
93+
}
4894
m.Filters = append(m.Filters, n.Filters[len(m.Filters):]...)
4995
} else {
5096
for k, v := range n.Filters {
97+
// Check for an identical filter.
98+
_, found := m.filter_hashes[hashFilter(v)]
99+
if (found) {
100+
// This filter is not new.
101+
continue
102+
}
103+
104+
old_hash := hashFilter(m.Filters[k])
51105
m.Filters[k].Merge(v)
106+
new_hash := hashFilter(m.Filters[k])
107+
if old_hash != new_hash {
108+
delete(m.filter_hashes, old_hash)
109+
m.filter_hashes[new_hash] = true
110+
modified = true
111+
}
52112
}
53113
}
114+
115+
if !modified {
116+
// We didn't add any new filters.
117+
return false
118+
}
119+
54120
// reconstruct the Conglomerate
55121
for _, v := range m.Filters {
56122
m.Conglomerate.Merge(v)
57123
}
124+
125+
return true
58126
}
59127

60128
func (m *BloomReachabilityMap) Copy() *BloomReachabilityMap {

internal/reachability.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,23 @@ package internal
22

33
import (
44
"errors"
5+
"expvar"
56
"log"
67
"sync"
78

89
"github.com/AutoRoute/node/types"
910
)
1011

12+
// Export the last number of possible nexthops, and the destination we were
13+
// trying to reach.
14+
var next_hops *expvar.Int
15+
var destination *expvar.String
16+
17+
func init() {
18+
next_hops = expvar.NewInt("next_hops")
19+
destination = expvar.NewString("destination")
20+
}
21+
1122
// Takes care of maintaining and relaying maps and insures that we know which
1223
// interfaces can reach which addresses.
1324
type reachabilityHandler struct {
@@ -37,8 +48,16 @@ func newReachability(me types.NodeAddress) *reachabilityHandler {
3748
func (m *reachabilityHandler) addMap(address types.NodeAddress, new_map *BloomReachabilityMap) {
3849
m.l.Lock()
3950
defer m.l.Unlock()
40-
m.maps[address].Merge(new_map)
51+
52+
if !m.maps[address].Merge(new_map) {
53+
// If this returns false, then we know we have already seen this map and
54+
// passed it along.
55+
log.Print("Dropping duplicate map.")
56+
return
57+
}
58+
4159
m.merged_map.Merge(new_map)
60+
4261
for addr, conn := range m.conns {
4362
if addr != address {
4463
conn.SendMap(new_map.Copy())
@@ -61,7 +80,7 @@ func (m *reachabilityHandler) AddConnection(id types.NodeAddress, c MapConnectio
6180
defer m.l.Unlock()
6281
err := c.SendMap(initial_map)
6382
if err != nil {
64-
log.Fatal(err)
83+
log.Fatalf("Sending map failed: %s\n", err)
6584
}
6685
}()
6786

@@ -92,14 +111,22 @@ func (m *reachabilityHandler) HandleConnection(id types.NodeAddress, c MapConnec
92111
// All the nodes that we could possibly send the packet to.
93112
func (m *reachabilityHandler) FindPossibleDests(id types.NodeAddress,
94113
src types.NodeAddress) ([]types.NodeAddress, error) {
114+
human_address, err := id.MarshalText()
115+
if err != nil {
116+
log.Printf("Warning: Converting to human-readable address failed: %s\n", err)
117+
}
118+
destination.Set(string(human_address))
119+
95120
m.l.Lock()
96121
defer m.l.Unlock()
97122
_, ok := m.conns[id]
98123
if ok {
124+
next_hops.Set(1)
99125
return []types.NodeAddress{id}, nil
100126
}
101127

102128
if id == m.me {
129+
next_hops.Set(1)
103130
return []types.NodeAddress{id}, nil
104131
}
105132

@@ -116,8 +143,10 @@ func (m *reachabilityHandler) FindPossibleDests(id types.NodeAddress,
116143
}
117144

118145
if len(dests) == 0 {
146+
next_hops.Set(0)
119147
return nil, errors.New("Unable to find host")
120148
}
149+
next_hops.Set(int64(len(dests)))
121150
return dests, nil
122151
}
123152

0 commit comments

Comments
 (0)