Skip to content

Commit

Permalink
fix eds when discovery endabled
Browse files Browse the repository at this point in the history
  • Loading branch information
yuval-k committed Nov 21, 2024
1 parent d613a7d commit 50da791
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 6 deletions.
25 changes: 21 additions & 4 deletions projects/gateway2/krtcollections/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package krtcollections

import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"hash/fnv"
Expand Down Expand Up @@ -128,14 +129,16 @@ type EndpointsForUpstream struct {
Hostname string

LbEpsEqualityHash uint64
upstreamHash uint64
epsEqualityHash uint64
}

func NewEndpointsForUpstream(us UpstreamWrapper, logger *zap.Logger) *EndpointsForUpstream {
// start with a hash of the cluster name. technically we dont need it for krt, as we can compare the upstream name. but it helps later
// to compute the hash we present envoy with.
h := fnv.New64()
h.Write([]byte(us.Inner.GetMetadata().Ref().String()))
lbEpsEqualityHash := h.Sum64()
upstreamHash := h.Sum64()

// add the upstream hash to the clustername, so that if it changes the envoy cluster will become warm again.
clusterName := GetEndpointClusterName(us.Inner)
Expand All @@ -148,12 +151,13 @@ func NewEndpointsForUpstream(us UpstreamWrapper, logger *zap.Logger) *EndpointsF
},
Port: ggv2utils.GetPortForUpstream(us.Inner),
Hostname: ggv2utils.GetHostnameForUpstream(us.Inner),
LbEpsEqualityHash: lbEpsEqualityHash,
LbEpsEqualityHash: upstreamHash,
upstreamHash: upstreamHash,
}
}

func hashEndpoints(l PodLocality, emd EndpointWithMd) uint64 {
hasher := fnv.New64()
hasher := fnv.New64a()
hasher.Write([]byte(l.Region))
hasher.Write([]byte(l.Zone))
hasher.Write([]byte(l.Subzone))
Expand All @@ -163,10 +167,23 @@ func hashEndpoints(l PodLocality, emd EndpointWithMd) uint64 {
return hasher.Sum64()
}

func hash(a, b uint64) uint64 {
hasher := fnv.New64a()
var buf [16]byte
binary.NativeEndian.PutUint64(buf[:8], a)
binary.NativeEndian.PutUint64(buf[8:], b)
hasher.Write(buf[:])
return hasher.Sum64()
}

func (e *EndpointsForUpstream) Add(l PodLocality, emd EndpointWithMd) {
// xor it as we dont care about order - if we have the same endpoints in the same locality
// we are good.
e.LbEpsEqualityHash ^= hashEndpoints(l, emd)
e.epsEqualityHash ^= hashEndpoints(l, emd)
// we can't xor the endpoint hash with the upstream hash, because upstreams with
// different names and similar endpoints will cancel out, so endpoint changes
// won't result in different equality hashes.
e.LbEpsEqualityHash = hash(e.epsEqualityHash, e.upstreamHash)
e.LbEps[l] = append(e.LbEps[l], emd)
}

Expand Down
108 changes: 108 additions & 0 deletions projects/gateway2/krtcollections/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,114 @@ func TestEndpointsForUpstreamOrderDoesntMatter(t *testing.T) {

}

func TestEndpointsForUpstreamWithDiscoveredUpstream(t *testing.T) {
g := gomega.NewWithT(t)

us := UpstreamWrapper{
Inner: &gloov1.Upstream{
Metadata: &core.Metadata{Name: "name", Namespace: "ns"},
UpstreamType: &gloov1.Upstream_Kube{
Kube: &kubernetes.UpstreamSpec{
ServiceName: "svc",
ServiceNamespace: "ns",
ServicePort: 8080,
},
},
},
}
usd := UpstreamWrapper{
Inner: &gloov1.Upstream{
Metadata: &core.Metadata{Name: "discovered-name", Namespace: "ns"},
UpstreamType: &gloov1.Upstream_Kube{
Kube: &kubernetes.UpstreamSpec{
ServiceName: "svc",
ServiceNamespace: "ns",
ServicePort: 8080,
},
},
},
}
// input
emd1 := EndpointWithMd{
LbEndpoint: &endpointv3.LbEndpoint{
HostIdentifier: &endpointv3.LbEndpoint_Endpoint{
Endpoint: &endpointv3.Endpoint{
Address: &envoy_config_core_v3.Address{
Address: &envoy_config_core_v3.Address_SocketAddress{
SocketAddress: &envoy_config_core_v3.SocketAddress{
Address: "1.2.3.4",
PortSpecifier: &envoy_config_core_v3.SocketAddress_PortValue{
PortValue: 8080,
},
},
},
},
},
},
},
EndpointMd: EndpointMetadata{
Labels: map[string]string{
corev1.LabelTopologyRegion: "region",
corev1.LabelTopologyZone: "zone",
},
},
}
emd2 := EndpointWithMd{
LbEndpoint: &endpointv3.LbEndpoint{
HostIdentifier: &endpointv3.LbEndpoint_Endpoint{
Endpoint: &endpointv3.Endpoint{
Address: &envoy_config_core_v3.Address{
Address: &envoy_config_core_v3.Address_SocketAddress{
SocketAddress: &envoy_config_core_v3.SocketAddress{
Address: "1.2.3.5",
PortSpecifier: &envoy_config_core_v3.SocketAddress_PortValue{
PortValue: 8080,
},
},
},
},
},
},
},
EndpointMd: EndpointMetadata{
Labels: map[string]string{
corev1.LabelTopologyRegion: "region",
corev1.LabelTopologyZone: "zone",
},
},
}

result1 := NewEndpointsForUpstream(us, nil)
result1.Add(PodLocality{
Region: "region",
Zone: "zone",
}, emd1)

result2 := NewEndpointsForUpstream(usd, nil)
result2.Add(PodLocality{
Region: "region",
Zone: "zone",
}, emd1)

result3 := NewEndpointsForUpstream(us, nil)
result3.Add(PodLocality{
Region: "region",
Zone: "zone",
}, emd2)

result4 := NewEndpointsForUpstream(usd, nil)
result4.Add(PodLocality{
Region: "region",
Zone: "zone",
}, emd2)

h1 := result1.LbEpsEqualityHash ^ result2.LbEpsEqualityHash
h2 := result3.LbEpsEqualityHash ^ result4.LbEpsEqualityHash

g.Expect(h1).NotTo(Equal(h2), "not expected %v, got %v", h1, h2)

}

func TestEndpoints(t *testing.T) {
testCases := []struct {
name string
Expand Down
4 changes: 2 additions & 2 deletions projects/gateway2/utils/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
)

func HashProto(resource proto.Message) uint64 {
hasher := fnv.New64()
hasher := fnv.New64a()
HashProtoWithHasher(hasher, resource)
return hasher.Sum64()
}

func HashProtoWithHasher(hasher hash.Hash64, resource proto.Message) {
func HashProtoWithHasher(hasher hash.Hash, resource proto.Message) {
var buffer [1024]byte
mo := proto.MarshalOptions{Deterministic: true}
buf := buffer[:0]
Expand Down

0 comments on commit 50da791

Please sign in to comment.