diff --git a/internal/bloom_reachability_map.go b/internal/bloom_reachability_map.go index 2a90be8..3b40e65 100644 --- a/internal/bloom_reachability_map.go +++ b/internal/bloom_reachability_map.go @@ -1,13 +1,29 @@ package internal import ( + "encoding/base64" + "hash/fnv" + "github.com/AutoRoute/bloom" "github.com/AutoRoute/node/types" ) type BloomReachabilityMap struct { - Filters []*bloom.BloomFilter - Conglomerate *bloom.BloomFilter + Filters []*bloom.BloomFilter + // Allows us to keep track of filters between nodes. + filter_hashes map[string]bool + Conglomerate *bloom.BloomFilter +} + +// Generates a unique hash for a particular filter. +// Args: +// filter: The filter to hash. +// Returns: +// The FNV hash of the filter. +func hashFilter(filter *bloom.BloomFilter) string { + hasher := fnv.New64() + filter.WriteTo(hasher) + return base64.URLEncoding.EncodeToString(hasher.Sum(nil)) } func NewBloomReachabilityMap() *BloomReachabilityMap { @@ -16,8 +32,13 @@ func NewBloomReachabilityMap() *BloomReachabilityMap { m := BloomReachabilityMap{ Filters: fs, + filter_hashes: make(map[string]bool), Conglomerate: fs[0].Copy(), } + + // Hash our initial filter to begin with. + initial_hash := hashFilter(fs[0]) + m.filter_hashes[initial_hash] = true return &m } @@ -39,22 +60,69 @@ func (m *BloomReachabilityMap) Increment() { m.Filters = append(newZeroth, m.Filters...) } -func (m *BloomReachabilityMap) Merge(n *BloomReachabilityMap) { +// Merges two reachability maps. +// Args: +// n: The map to merge with this one. +// Returns: +// True if the map was modified, false if it wasn't. Practically, it will only +// return false if it is being asked to merge a map whose filters are a subset +// of this one's filters. +func (m *BloomReachabilityMap) Merge(n *BloomReachabilityMap) bool { + modified := false + if len(m.Filters) < len(n.Filters) { + modified = true for k, v := range m.Filters { + _, found := m.filter_hashes[hashFilter(n.Filters[k])] + if (found) { + // This filter is not new. + continue + } + + old_hash := hashFilter(v) v.Merge(n.Filters[k]) + new_hash := hashFilter(v) + if old_hash != new_hash { + delete(m.filter_hashes, old_hash) + m.filter_hashes[new_hash] = true + } } // append the remaining Filters + for _, filter := range n.Filters[len(m.Filters):] { + m.filter_hashes[hashFilter(filter)] = true + } m.Filters = append(m.Filters, n.Filters[len(m.Filters):]...) } else { for k, v := range n.Filters { + // Check for an identical filter. + _, found := m.filter_hashes[hashFilter(v)] + if (found) { + // This filter is not new. + continue + } + + old_hash := hashFilter(m.Filters[k]) m.Filters[k].Merge(v) + new_hash := hashFilter(m.Filters[k]) + if old_hash != new_hash { + delete(m.filter_hashes, old_hash) + m.filter_hashes[new_hash] = true + modified = true + } } } + + if !modified { + // We didn't add any new filters. + return false + } + // reconstruct the Conglomerate for _, v := range m.Filters { m.Conglomerate.Merge(v) } + + return true } func (m *BloomReachabilityMap) Copy() *BloomReachabilityMap { diff --git a/internal/reachability.go b/internal/reachability.go index e2ea1c0..3863d73 100644 --- a/internal/reachability.go +++ b/internal/reachability.go @@ -2,12 +2,23 @@ package internal import ( "errors" + "expvar" "log" "sync" "github.com/AutoRoute/node/types" ) +// Export the last number of possible nexthops, and the destination we were +// trying to reach. +var next_hops *expvar.Int +var destination *expvar.String + +func init() { + next_hops = expvar.NewInt("next_hops") + destination = expvar.NewString("destination") +} + // Takes care of maintaining and relaying maps and insures that we know which // interfaces can reach which addresses. type reachabilityHandler struct { @@ -37,8 +48,16 @@ func newReachability(me types.NodeAddress) *reachabilityHandler { func (m *reachabilityHandler) addMap(address types.NodeAddress, new_map *BloomReachabilityMap) { m.l.Lock() defer m.l.Unlock() - m.maps[address].Merge(new_map) + + if !m.maps[address].Merge(new_map) { + // If this returns false, then we know we have already seen this map and + // passed it along. + log.Print("Dropping duplicate map.") + return + } + m.merged_map.Merge(new_map) + for addr, conn := range m.conns { if addr != address { conn.SendMap(new_map.Copy()) @@ -61,7 +80,7 @@ func (m *reachabilityHandler) AddConnection(id types.NodeAddress, c MapConnectio defer m.l.Unlock() err := c.SendMap(initial_map) if err != nil { - log.Fatal(err) + log.Fatalf("Sending map failed: %s\n", err) } }() @@ -92,14 +111,17 @@ func (m *reachabilityHandler) HandleConnection(id types.NodeAddress, c MapConnec // All the nodes that we could possibly send the packet to. func (m *reachabilityHandler) FindPossibleDests(id types.NodeAddress, src types.NodeAddress) ([]types.NodeAddress, error) { + human_address, err := id.MarshalText() + if err != nil { + log.Printf("Warning: Converting to human-readable address failed: %s\n", err) + } + destination.Set(string(human_address)) + m.l.Lock() defer m.l.Unlock() _, ok := m.conns[id] - if ok { - return []types.NodeAddress{id}, nil - } - - if id == m.me { + if ok || id == m.me { + next_hops.Set(1) return []types.NodeAddress{id}, nil } @@ -116,8 +138,10 @@ func (m *reachabilityHandler) FindPossibleDests(id types.NodeAddress, } if len(dests) == 0 { + next_hops.Set(0) return nil, errors.New("Unable to find host") } + next_hops.Set(int64(len(dests))) return dests, nil }