Skip to content

Commit

Permalink
Per-service load balancing algorithm selection
Browse files Browse the repository at this point in the history
This PR implements CFP-34577

Signed-off-by: Katarzyna Lach <[email protected]>
  • Loading branch information
kl52752 authored and pchaigno committed Nov 25, 2024
1 parent dbb4bda commit cf5eeb8
Show file tree
Hide file tree
Showing 29 changed files with 483 additions and 103 deletions.
1 change: 1 addition & 0 deletions Documentation/cmdref/cilium-agent.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bpf/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ LB_OPTIONS = \
-DENABLE_IPV6:-DENCAP_IFINDEX:-DTUNNEL_MODE:-DENABLE_NODEPORT:-DENABLE_NODEPORT_ACCELERATION:-DENABLE_SESSION_AFFINITY:-DENABLE_BANDWIDTH_MANAGER: \
-DENABLE_IPV6:-DENCAP_IFINDEX:-DTUNNEL_MODE:-DENABLE_NODEPORT:-DENABLE_NODEPORT_ACCELERATION:-DENABLE_SESSION_AFFINITY:-DENABLE_BANDWIDTH_MANAGER:-DENABLE_SRC_RANGE_CHECK: \
-DENABLE_IPV6:-DENCAP_IFINDEX:-DTUNNEL_MODE:-DENABLE_NODEPORT:-DENABLE_NODEPORT_ACCELERATION:-DENABLE_SESSION_AFFINITY:-DENABLE_BANDWIDTH_MANAGER:-DENABLE_SRC_RANGE_CHECK:-DLB_SELECTION:-DLB_SELECTION_MAGLEV: \
-DENABLE_IPV6:-DENCAP_IFINDEX:-DTUNNEL_MODE:-DENABLE_NODEPORT:-DENABLE_NODEPORT_ACCELERATION:-DENABLE_SESSION_AFFINITY:-DENABLE_BANDWIDTH_MANAGER:-DENABLE_SRC_RANGE_CHECK:-DLB_SELECTION:-DLB_ALG_PER_SERVICE:-DLB_SELECTION_MAGLEV \
-DENABLE_IPV6:-DENCAP_IFINDEX:-DTUNNEL_MODE:-DENABLE_NODEPORT:-DENABLE_NODEPORT_ACCELERATION:-DENABLE_SESSION_AFFINITY:-DENABLE_BANDWIDTH_MANAGER:-DENABLE_SRC_RANGE_CHECK:-DLB_SELECTION:-DLB_SELECTION_MAGLEV:-DENABLE_SOCKET_LB_HOST_ONLY: \
-DENABLE_IPV6:-DENCAP_IFINDEX:-DTUNNEL_MODE:-DENABLE_NODEPORT:-DENABLE_NODEPORT_ACCELERATION:-DENABLE_SESSION_AFFINITY:-DENABLE_BANDWIDTH_MANAGER:-DENABLE_SRC_RANGE_CHECK:-DLB_SELECTION:-DLB_SELECTION_MAGLEV:-DENABLE_SOCKET_LB_HOST_ONLY:-DENABLE_L7_LB:-DENABLE_SCTP: \
-DENABLE_IPV6:-DENCAP_IFINDEX:-DTUNNEL_MODE:-DENABLE_NODEPORT:-DENABLE_NODEPORT_ACCELERATION:-DENABLE_SESSION_AFFINITY:-DENABLE_BANDWIDTH_MANAGER:-DENABLE_SRC_RANGE_CHECK:-DLB_SELECTION:-DLB_SELECTION_MAGLEV:-DENABLE_SOCKET_LB_HOST_ONLY:-DENABLE_L7_LB:-DENABLE_SCTP:-DENABLE_VTEP: \
Expand Down
16 changes: 16 additions & 0 deletions bpf/lib/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,14 @@ struct lb6_service {
* slots under quarantine (otherwise zero).
*/
__u16 qcount;
#ifdef LB_ALG_PER_SERVICE
/* Load balancer algorithm
* 1 - random
* 2 - maglev
*/
__u8 lb_alg;
__u8 pad[3];
#endif
};

/* See lb4_backend comments */
Expand Down Expand Up @@ -1088,6 +1096,14 @@ struct lb4_service {
* slots under quarantine (otherwise zero).
*/
__u16 qcount;
#ifdef LB_ALG_PER_SERVICE
/* Load balancer algorithm
* 1 - random
* 2 - maglev
*/
__u8 lb_alg;
__u8 pad[3];
#endif
};

struct lb4_backend {
Expand Down
98 changes: 74 additions & 24 deletions bpf/lib/lb.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ struct {
} LB6_HEALTH_MAP __section_maps_btf;
#endif

#if LB_SELECTION == LB_SELECTION_MAGLEV
#if defined(LB_ALG_PER_SERVICE) || LB_SELECTION == LB_SELECTION_MAGLEV
struct {
__uint(type, BPF_MAP_TYPE_HASH_OF_MAPS);
__type(key, __u16);
Expand Down Expand Up @@ -689,25 +689,26 @@ struct lb6_service *lb6_lookup_backend_slot(struct __ctx_buff *ctx __maybe_unuse
return NULL;
}

/* Backend slot 0 is always reserved for the service frontend. */
#if LB_SELECTION == LB_SELECTION_RANDOM
#if defined(LB_ALG_PER_SERVICE) || LB_SELECTION == LB_SELECTION_RANDOM
static __always_inline __u32
lb6_select_backend_id(struct __ctx_buff *ctx,
struct lb6_key *key,
const struct ipv6_ct_tuple *tuple __maybe_unused,
const struct lb6_service *svc)
lb6_select_backend_id_random(struct __ctx_buff *ctx,
struct lb6_key *key,
const struct ipv6_ct_tuple *tuple __maybe_unused,
const struct lb6_service *svc)
{
__u16 slot = (get_prandom_u32() % svc->count) + 1;
struct lb6_service *be = lb6_lookup_backend_slot(ctx, key, slot);

return be ? be->backend_id : 0;
}
#elif LB_SELECTION == LB_SELECTION_MAGLEV
#endif /* defined(LB_ALG_PER_SERVICE) || LB_SELECTION == LB_SELECTION_RANDOM */

#if defined(LB_ALG_PER_SERVICE) || LB_SELECTION == LB_SELECTION_MAGLEV
static __always_inline __u32
lb6_select_backend_id(struct __ctx_buff *ctx __maybe_unused,
struct lb6_key *key __maybe_unused,
const struct ipv6_ct_tuple *tuple,
const struct lb6_service *svc)
lb6_select_backend_id_maglev(struct __ctx_buff *ctx __maybe_unused,
struct lb6_key *key __maybe_unused,
const struct ipv6_ct_tuple *tuple,
const struct lb6_service *svc)
{
__u32 zero = 0, index = svc->rev_nat_index;
__u32 *backend_ids;
Expand All @@ -722,8 +723,31 @@ lb6_select_backend_id(struct __ctx_buff *ctx __maybe_unused,
return 0;

index = hash_from_tuple_v6(tuple) % LB_MAGLEV_LUT_SIZE;
return map_array_get_32(backend_ids, index, (LB_MAGLEV_LUT_SIZE - 1) << 2);
return map_array_get_32(backend_ids, index, (LB_MAGLEV_LUT_SIZE - 1) << 2);
}
#endif /* defined(LB_ALG_PER_SERVICE) || LB_SELECTION == LB_SELECTION_MAGLEV */

#ifdef LB_ALG_PER_SERVICE
/* Per-service backend selection for IPv6: dispatch on the algorithm stored
 * in the service entry (svc->lb_alg). Maglev is used only when the entry
 * explicitly requests it; every other value falls through to random
 * selection.
 */
static __always_inline __u32
lb6_select_backend_id(struct __ctx_buff *ctx,
		      struct lb6_key *key,
		      const struct ipv6_ct_tuple *tuple __maybe_unused,
		      const struct lb6_service *svc)
{
	if (svc->lb_alg != LB_SELECTION_MAGLEV)
		return lb6_select_backend_id_random(ctx, key, tuple, svc);

	return lb6_select_backend_id_maglev(ctx, key, tuple, svc);
}
#elif LB_SELECTION == LB_SELECTION_RANDOM
/* Backend slot 0 is always reserved for the service frontend. */

#define lb6_select_backend_id lb6_select_backend_id_random

#elif LB_SELECTION == LB_SELECTION_MAGLEV

#define lb6_select_backend_id lb6_select_backend_id_maglev

#elif LB_SELECTION == LB_SELECTION_FIRST
/* Backend selection for tests that always chooses first slot. */
static __always_inline __u32
Expand Down Expand Up @@ -1370,25 +1394,27 @@ struct lb4_service *lb4_lookup_backend_slot(struct __ctx_buff *ctx __maybe_unuse
return NULL;
}

#if defined(LB_ALG_PER_SERVICE) || LB_SELECTION == LB_SELECTION_RANDOM
/* Backend slot 0 is always reserved for the service frontend. */
#if LB_SELECTION == LB_SELECTION_RANDOM
static __always_inline __u32
lb4_select_backend_id(struct __ctx_buff *ctx,
struct lb4_key *key,
const struct ipv4_ct_tuple *tuple __maybe_unused,
const struct lb4_service *svc)
lb4_select_backend_id_random(struct __ctx_buff *ctx,
struct lb4_key *key,
const struct ipv4_ct_tuple *tuple __maybe_unused,
const struct lb4_service *svc)
{
__u16 slot = (get_prandom_u32() % svc->count) + 1;
struct lb4_service *be = lb4_lookup_backend_slot(ctx, key, slot);

return be ? be->backend_id : 0;
}
#elif LB_SELECTION == LB_SELECTION_MAGLEV
#endif /* LB_ALG_PER_SERVICE || LB_SELECTION == LB_SELECTION_RANDOM */

#if defined(LB_ALG_PER_SERVICE) || LB_SELECTION == LB_SELECTION_MAGLEV
static __always_inline __u32
lb4_select_backend_id(struct __ctx_buff *ctx __maybe_unused,
struct lb4_key *key __maybe_unused,
const struct ipv4_ct_tuple *tuple,
const struct lb4_service *svc)
lb4_select_backend_id_maglev(struct __ctx_buff *ctx __maybe_unused,
struct lb4_key *key __maybe_unused,
const struct ipv4_ct_tuple *tuple,
const struct lb4_service *svc)
{
__u32 zero = 0, index = svc->rev_nat_index;
__u32 *backend_ids;
Expand All @@ -1403,8 +1429,32 @@ lb4_select_backend_id(struct __ctx_buff *ctx __maybe_unused,
return 0;

index = hash_from_tuple_v4(tuple) % LB_MAGLEV_LUT_SIZE;
return map_array_get_32(backend_ids, index, (LB_MAGLEV_LUT_SIZE - 1) << 2);
return map_array_get_32(backend_ids, index, (LB_MAGLEV_LUT_SIZE - 1) << 2);
}
#endif /* LB_ALG_PER_SERVICE || LB_SELECTION == LB_SELECTION_MAGLEV */

#ifdef LB_ALG_PER_SERVICE

/* Per-service backend selection for IPv4: dispatch on the algorithm stored
 * in the service entry (svc->lb_alg). Maglev is used only when the entry
 * explicitly requests it; every other value falls through to random
 * selection.
 */
static __always_inline __u32
lb4_select_backend_id(struct __ctx_buff *ctx,
		      struct lb4_key *key,
		      const struct ipv4_ct_tuple *tuple __maybe_unused,
		      const struct lb4_service *svc)
{
	if (svc->lb_alg != LB_SELECTION_MAGLEV)
		return lb4_select_backend_id_random(ctx, key, tuple, svc);

	return lb4_select_backend_id_maglev(ctx, key, tuple, svc);
}

#elif LB_SELECTION == LB_SELECTION_RANDOM

#define lb4_select_backend_id lb4_select_backend_id_random

#elif LB_SELECTION == LB_SELECTION_MAGLEV

#define lb4_select_backend_id lb4_select_backend_id_maglev

#elif LB_SELECTION == LB_SELECTION_FIRST
/* Backend selection for tests that always chooses first slot. */
static __always_inline __u32
Expand Down
3 changes: 3 additions & 0 deletions daemon/cmd/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ func InitGlobalFlags(cmd *cobra.Command, vp *viper.Viper) {
flags.String(option.LoadBalancerMode, option.NodePortModeSNAT, "BPF load balancing mode (\"snat\", \"dsr\", \"hybrid\")")
option.BindEnv(vp, option.LoadBalancerMode)

flags.Bool(option.LoadBalancerAlgAnnotation, false, "Tells whether controller should check service level annotation for configuring bpf load balancing algorithm.")
option.BindEnv(vp, option.LoadBalancerAlgAnnotation)

flags.String(option.LoadBalancerAlg, option.NodePortAlgRandom, "BPF load balancing algorithm (\"random\", \"maglev\")")
option.BindEnv(vp, option.LoadBalancerAlg)

Expand Down
3 changes: 2 additions & 1 deletion daemon/cmd/datapath.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ func (d *Daemon) initMaps() error {
}
}

if option.Config.NodePortAlg == option.NodePortAlgMaglev {
if option.Config.NodePortAlg == option.NodePortAlgMaglev ||
option.Config.LoadBalancerAlgAnnotation {
if err := lbmap.InitMaglevMaps(option.Config.EnableIPv4, option.Config.EnableIPv6, uint32(option.Config.MaglevTableSize)); err != nil {
return fmt.Errorf("initializing maglev maps: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion daemon/cmd/kube_proxy_replacement.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ func initKubeProxyReplacementOptions(sysctl sysctl.Sysctl, tunnelConfig tunnel.C
log.Warning("NodePort BPF configured without bind(2) protection against service ports")
}

if option.Config.NodePortAlg == option.NodePortAlgMaglev {
if option.Config.NodePortAlg == option.NodePortAlgMaglev ||
option.Config.LoadBalancerAlgAnnotation {
// "Let N be the size of a VIP's backend pool." [...] "In practice, we choose M to be
// larger than 100 x N to ensure at most a 1% difference in hash space assigned to
// backends." (from Maglev paper, page 6)
Expand Down
3 changes: 3 additions & 0 deletions daemon/cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ func (d *Daemon) getKubeProxyReplacementStatus() *models.KubeProxyReplacement {
features.NodePort.Algorithm = models.KubeProxyReplacementFeaturesNodePortAlgorithmMaglev
features.NodePort.LutSize = int64(option.Config.MaglevTableSize)
}
if option.Config.LoadBalancerAlgAnnotation {
features.NodePort.LutSize = int64(option.Config.MaglevTableSize)
}
if option.Config.NodePortAcceleration == option.NodePortAccelerationGeneric {
features.NodePort.Acceleration = models.KubeProxyReplacementFeaturesNodePortAccelerationGeneric
} else {
Expand Down
8 changes: 8 additions & 0 deletions pkg/annotation/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ const (
ServiceAffinity = ServicePrefix + "/affinity"
ServiceAffinityAlias = Prefix + "/service-affinity"

// ServiceLoadBalancingAlgo indicates which backend selection algorithm to
// use for a given Service. This annotation overrides the default value
// set in bpf-lb-algorithm.
// Allowed values:
// - random
// - maglev
ServiceLoadBalancingAlgo = ServicePrefix + "/lb-algorithm"

// ServiceNodeExposure is the label name used to mark a service to only a
// subset of the nodes which match the same value. For all other nodes, this
// service is ignored and not installed into their datapath.
Expand Down
16 changes: 14 additions & 2 deletions pkg/datapath/alignchecker/alignchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ func CheckStructAlignments(path string) error {
return check.CheckStructAlignments(path, toCheckSizes, false)
}

func RegisterLbStructsToCheck(isPerSvcLb bool) {
if isPerSvcLb {
registerToCheck(map[string][]any{
"lb4_service": {lbmap.Service4ExtendedValue{}},
"lb6_service": {lbmap.Service6ExtendedValue{}},
})
} else {
registerToCheck(map[string][]any{
"lb4_service": {lbmap.Service4Value{}},
"lb6_service": {lbmap.Service6Value{}},
})
}
}

func init() {
registerToCheck(map[string][]any{
"ipv4_ct_tuple": {ctmap.CtKey4{}, ctmap.CtKey4Global{}},
Expand All @@ -68,10 +82,8 @@ func init() {
"ipcache_key": {ipcachemap.Key{}},
"remote_endpoint_info": {ipcachemap.RemoteEndpointInfo{}},
"lb4_key": {lbmap.Service4Key{}},
"lb4_service": {lbmap.Service4Value{}},
"lb4_backend": {lbmap.Backend4ValueV3{}},
"lb6_key": {lbmap.Service6Key{}},
"lb6_service": {lbmap.Service6Value{}},
"lb6_backend": {lbmap.Backend6ValueV3{}},
"endpoint_info": {lxcmap.EndpointInfo{}},
"metrics_key": {metricsmap.Key{}},
Expand Down
9 changes: 9 additions & 0 deletions pkg/datapath/linux/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,19 @@ func (h *HeaderfileWriter) WriteNodeConfig(w io.Writer, cfg *datapath.LocalNodeC
)
cDefinesMap["LB_SELECTION_RANDOM"] = fmt.Sprintf("%d", selectionRandom)
cDefinesMap["LB_SELECTION_MAGLEV"] = fmt.Sprintf("%d", selectionMaglev)
if option.Config.LoadBalancerAlgAnnotation {
cDefinesMap["LB_ALG_PER_SERVICE"] = "1"
}
if option.Config.NodePortAlg == option.NodePortAlgRandom {
cDefinesMap["LB_SELECTION"] = fmt.Sprintf("%d", selectionRandom)
} else if option.Config.NodePortAlg == option.NodePortAlgMaglev {
cDefinesMap["LB_SELECTION"] = fmt.Sprintf("%d", selectionMaglev)
}

// Define the maglev tables when the load balancer algorithm is maglev or
// when it can be set per Service via annotation.
if option.Config.LoadBalancerAlgAnnotation ||
option.Config.NodePortAlg == option.NodePortAlgMaglev {
cDefinesMap["LB_MAGLEV_LUT_SIZE"] = fmt.Sprintf("%d", option.Config.MaglevTableSize)
if option.Config.EnableIPv6 {
cDefinesMap["LB6_MAGLEV_MAP_OUTER"] = lbmap.MaglevOuter6MapName
Expand Down
1 change: 1 addition & 0 deletions pkg/datapath/loader/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ func (l *loader) Reinitialize(ctx context.Context, cfg *datapath.LocalNodeConfig
log.WithError(err).Fatal("alignchecker compile failed")
}
// Validate alignments of C and Go equivalent structs
alignchecker.RegisterLbStructsToCheck(option.Config.LoadBalancerAlgAnnotation)
if err := alignchecker.CheckStructAlignments(defaults.AlignCheckerName); err != nil {
log.WithError(err).Fatal("C and Go structs alignment check failed")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/datapath/maps/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ func (ms *MapSweeper) RemoveDisabledMaps() {
maps = append(maps, lbmap.HealthProbe6MapName, lbmap.HealthProbe4MapName)
}

if option.Config.NodePortAlg != option.NodePortAlgMaglev {
if option.Config.NodePortAlg != option.NodePortAlgMaglev &&
!option.Config.LoadBalancerAlgAnnotation {
maps = append(maps, lbmap.MaglevOuter6MapName, lbmap.MaglevOuter4MapName)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/datapath/types/lbmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type UpsertServiceParams struct {
L7LBProxyPort uint16 // Non-zero for L7 LB services
Name loadbalancer.ServiceName // Fully qualified name of the service
LoopbackHostport bool
LoadBalancingAlgo loadbalancer.SVCLoadBalancingAlgo
}

// GetOrderedBackends returns an ordered list of backends with all the sorted
Expand Down
4 changes: 4 additions & 0 deletions pkg/k8s/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,11 @@ func Test_ParseNodeAddressType(t *testing.T) {
func TestParseNodeWithService(t *testing.T) {
prevAnnotateK8sNode := option.Config.AnnotateK8sNode
option.Config.AnnotateK8sNode = false
oldDefaultLbAlg := option.Config.NodePortAlg
option.Config.NodePortAlg = option.NodePortAlgRandom
defer func() {
option.Config.AnnotateK8sNode = prevAnnotateK8sNode
option.Config.NodePortAlg = oldDefaultLbAlg
}()

k8sNode := &slim_corev1.Node{
Expand Down Expand Up @@ -410,6 +413,7 @@ func TestParseNodeWithService(t *testing.T) {
Ports: map[loadbalancer.FEPortName]*loadbalancer.L4Addr{},
NodePorts: map[loadbalancer.FEPortName]NodePortToFrontend{},
LoadBalancerSourceRanges: map[string]*cidr.CIDR{},
LoadBalancerAlgo: loadbalancer.SVCLoadBalancingAlgoRandom,
Type: loadbalancer.SVCTypeClusterIP,
ForwardingMode: loadbalancer.SVCForwardingModeSNAT,
}, svc)
Expand Down
24 changes: 24 additions & 0 deletions pkg/k8s/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ func getAnnotationServiceForwardingMode(svc *slim_corev1.Service) loadbalancer.S

return loadbalancer.SVCForwardingModeSNAT
}
// getAnnotationServiceLoadBalancingAlgo returns the backend selection
// algorithm requested for svc through the ServiceLoadBalancingAlgo
// annotation. The annotation value is matched case-insensitively.
//
// If the annotation is absent, the globally configured algorithm
// (option.Config.NodePortAlg) is returned with a nil error. If the annotation
// is present but its value is not recognized, the globally configured
// algorithm is returned together with a non-nil error naming the rejected
// value, so that callers can log it and continue with the fallback.
func getAnnotationServiceLoadBalancingAlgo(svc *slim_corev1.Service) (loadbalancer.SVCLoadBalancingAlgo, error) {
	value, ok := annotation.Get(svc, annotation.ServiceLoadBalancingAlgo)
	if !ok {
		return loadbalancer.ToSVCLoadBalancingAlgo(option.Config.NodePortAlg), nil
	}

	algo := loadbalancer.ToSVCLoadBalancingAlgo(strings.ToLower(value))
	if algo != loadbalancer.SVCLoadBalancingAlgoUndef {
		return algo, nil
	}

	// Report the raw annotation string: algo is always the Undef value on
	// this path and would hide what the user actually wrote.
	return loadbalancer.ToSVCLoadBalancingAlgo(option.Config.NodePortAlg),
		fmt.Errorf("Value %q is not supported for %q", value, annotation.ServiceLoadBalancingAlgo)
}

func getTopologyAware(svc *slim_corev1.Service) bool {
return getAnnotationTopologyAwareHints(svc) ||
Expand Down Expand Up @@ -250,6 +261,16 @@ func ParseService(svc *slim_corev1.Service, nodePortAddrs []netip.Addr) (Service
svcInfo.IncludeExternal = getAnnotationIncludeExternal(svc)
svcInfo.ServiceAffinity = getAnnotationServiceAffinity(svc)

if option.Config.LoadBalancerAlgAnnotation {
var err error
svcInfo.LoadBalancerAlgo, err = getAnnotationServiceLoadBalancingAlgo(svc)
if err != nil {
scopedLog.WithError(err).Warnf("Ignoring %q annotation, applying global configuration: %v", annotation.ServiceLoadBalancingAlgo, svcInfo.LoadBalancerAlgo)
}
} else {
svcInfo.LoadBalancerAlgo = loadbalancer.ToSVCLoadBalancingAlgo(option.Config.NodePortAlg)
}

if svc.Spec.SessionAffinity == slim_corev1.ServiceAffinityClientIP {
svcInfo.SessionAffinity = true
if cfg := svc.Spec.SessionAffinityConfig; cfg != nil && cfg.ClientIP != nil && cfg.ClientIP.TimeoutSeconds != nil {
Expand Down Expand Up @@ -426,6 +447,9 @@ type Service struct {
// +deepequal-gen=false
K8sExternalIPs map[string]net.IP

// LoadBalancerAlgo indicates which backend selection algorithm to use.
LoadBalancerAlgo loadbalancer.SVCLoadBalancingAlgo

// LoadBalancerIPs stores LB IPs assigned to the service (string(IP) => IP).
//
// Until deepequal-gen adds support for net.IP we need to compare this field
Expand Down
Loading

0 comments on commit cf5eeb8

Please sign in to comment.