Skip to content

Commit

Permalink
feat: support externalname services
Browse files Browse the repository at this point in the history
  • Loading branch information
msvticket committed Feb 20, 2024
1 parent e5fdc89 commit 21588bb
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 152 deletions.
70 changes: 55 additions & 15 deletions pkg/backend/endpoint_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backend
import (
"context"
"fmt"
"net"

awssdk "github.com/aws/aws-sdk-go/aws"
"github.com/go-logr/logr"
Expand All @@ -27,12 +28,17 @@ var ErrNotFound = errors.New("backend not found")
type EndpointResolver interface {
// ResolvePodEndpoints will resolve endpoints backed by pods directly.
// returns resolved podEndpoints and whether there are unready endpoints that can potentially turn ready in future reconciles.
ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString,
opts ...EndpointResolveOption) ([]PodEndpoint, bool, error)
ResolvePodEndpoints(ctx context.Context, svckey types.NamespacedName, svc *corev1.Service, port intstr.IntOrString, opts ...EndpointResolveOption) ([]IpEndpoint, bool, error)

// ResolveNodePortEndpoints will resolve endpoints backed by nodePort.
ResolveNodePortEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString,
opts ...EndpointResolveOption) ([]NodePortEndpoint, error)

// FindService finds a k8s service
FindService(ctx context.Context, svcKey types.NamespacedName) (*corev1.Service, error)

// ResolveExternalNameEndpoints will resolve external name using dns
ResolveExternalNameEndpoints(ctx context.Context, svc *corev1.Service, port intstr.IntOrString) ([]IpEndpoint, error)
}

// NewDefaultEndpointResolver constructs new defaultEndpointResolver
Expand All @@ -42,6 +48,7 @@ func NewDefaultEndpointResolver(k8sClient client.Client, podInfoRepo k8s.PodInfo
podInfoRepo: podInfoRepo,
failOpenEnabled: failOpenEnabled,
endpointSliceEnabled: endpointSliceEnabled,
dnsResolver: net.DefaultResolver,
logger: logger,
}
}
Expand All @@ -58,13 +65,34 @@ type defaultEndpointResolver struct {
// [Pod Endpoint] whether to use endpointSlice instead of endpoints
endpointSliceEnabled bool
logger logr.Logger
// dnsResolver to use for resolving external names
dnsResolver dnsResolver
}

type dnsResolver interface {
LookupHost(ctx context.Context, host string) (addrs []string, err error)
}

func (r *defaultEndpointResolver) ResolveExternalNameEndpoints(ctx context.Context, svc *corev1.Service, port intstr.IntOrString) ([]IpEndpoint, error) {
if port.Type == intstr.String {
return nil, fmt.Errorf("port of target group must be numeric for external name")
}
addrs, err := r.dnsResolver.LookupHost(ctx, svc.Spec.ExternalName)
if err != nil {
return nil, err
}
endpoints := make([]IpEndpoint, len(addrs))
for i, ip := range addrs {
endpoints[i] = IpEndpoint{IP: ip, Port: int64(port.IntVal)}
}
return endpoints, nil
}

func (r *defaultEndpointResolver) ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString, opts ...EndpointResolveOption) ([]PodEndpoint, bool, error) {
func (r *defaultEndpointResolver) ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, svc *corev1.Service, port intstr.IntOrString, opts ...EndpointResolveOption) ([]IpEndpoint, bool, error) {
resolveOpts := defaultEndpointResolveOptions()
resolveOpts.ApplyOptions(opts)

_, svcPort, err := r.findServiceAndServicePort(ctx, svcKey, port)
_, svcPort, err := r.findServicePort(svc, port)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -140,9 +168,9 @@ func (r *defaultEndpointResolver) computeServiceEndpointsData(ctx context.Contex
return endpointsDataList, nil
}

func (r *defaultEndpointResolver) resolvePodEndpointsWithEndpointsData(ctx context.Context, svcKey types.NamespacedName, svcPort corev1.ServicePort, endpointsDataList []EndpointsData, podReadinessGates []corev1.PodConditionType) ([]PodEndpoint, bool, error) {
var readyPodEndpoints []PodEndpoint
var unknownPodEndpoints []PodEndpoint
func (r *defaultEndpointResolver) resolvePodEndpointsWithEndpointsData(ctx context.Context, svcKey types.NamespacedName, svcPort corev1.ServicePort, endpointsDataList []EndpointsData, podReadinessGates []corev1.PodConditionType) ([]IpEndpoint, bool, error) {
var readyPodEndpoints []IpEndpoint
var unknownPodEndpoints []IpEndpoint
containsPotentialReadyEndpoints := false

for _, epsData := range endpointsDataList {
Expand Down Expand Up @@ -170,7 +198,7 @@ func (r *defaultEndpointResolver) resolvePodEndpointsWithEndpointsData(ctx conte
containsPotentialReadyEndpoints = true
continue
}
podEndpoint := buildPodEndpoint(pod, epAddr, epPort)
podEndpoint := buildPodEndpoint(&pod, epAddr, epPort)
if ep.Conditions.Ready != nil && *ep.Conditions.Ready {
readyPodEndpoints = append(readyPodEndpoints, podEndpoint)
continue
Expand Down Expand Up @@ -214,13 +242,14 @@ func (r *defaultEndpointResolver) resolvePodEndpointsWithEndpointsData(ctx conte
}

func (r *defaultEndpointResolver) findServiceAndServicePort(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString) (*corev1.Service, corev1.ServicePort, error) {
svc := &corev1.Service{}
if err := r.k8sClient.Get(ctx, svcKey, svc); err != nil {
if apierrors.IsNotFound(err) {
return nil, corev1.ServicePort{}, fmt.Errorf("%w: %v", ErrNotFound, err.Error())
}
svc, err := r.FindService(ctx, svcKey)
if err != nil {
return nil, corev1.ServicePort{}, err
}
return r.findServicePort(svc, port)
}

func (r *defaultEndpointResolver) findServicePort(svc *corev1.Service, port intstr.IntOrString) (*corev1.Service, corev1.ServicePort, error) {
svcPort, err := k8s.LookupServicePort(svc, port)
if err != nil {
return nil, corev1.ServicePort{}, fmt.Errorf("%w: %v", ErrNotFound, err.Error())
Expand All @@ -229,6 +258,17 @@ func (r *defaultEndpointResolver) findServiceAndServicePort(ctx context.Context,
return svc, svcPort, nil
}

func (r *defaultEndpointResolver) FindService(ctx context.Context, svcKey types.NamespacedName) (*corev1.Service, error) {
svc := &corev1.Service{}
if err := r.k8sClient.Get(ctx, svcKey, svc); err != nil {
if apierrors.IsNotFound(err) {
return nil, fmt.Errorf("%w: %v", ErrNotFound, err.Error())
}
return nil, err
}
return svc, nil
}

// filterNodesByReadyConditionStatus will filter out nodes that matches specified ready condition status
func filterNodesByReadyConditionStatus(nodes []*corev1.Node, readyCondStatus corev1.ConditionStatus) []*corev1.Node {
var nodesWithMatchingReadyStatus []*corev1.Node
Expand Down Expand Up @@ -281,8 +321,8 @@ func buildEndpointsDataFromEndpointSliceList(epsList *discovery.EndpointSliceLis
return endpointsDataList
}

func buildPodEndpoint(pod k8s.PodInfo, epAddr string, port int32) PodEndpoint {
return PodEndpoint{
func buildPodEndpoint(pod *k8s.PodInfo, epAddr string, port int32) IpEndpoint {
return IpEndpoint{
IP: epAddr,
Port: int64(port),
Pod: pod,
Expand Down
Loading

0 comments on commit 21588bb

Please sign in to comment.