Skip to content

Commit

Permalink
[occm] ensure octavia monitor is always updated (#2422)
Browse files Browse the repository at this point in the history
  • Loading branch information
kayrus authored Oct 11, 2023
1 parent 6c1f7e7 commit 74e30ac
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 73 deletions.
158 changes: 85 additions & 73 deletions pkg/openstack/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ const (
// See https://nip.io
defaultProxyHostnameSuffix = "nip.io"
ServiceAnnotationLoadBalancerID = "loadbalancer.openstack.org/load-balancer-id"

// Octavia resources name formats
lbFormat = "%s%s_%s_%s"
listenerFormat = "listener_%d_%s"
poolFormat = "pool_%d_%s"
monitorFormat = "monitor_%d_%s"
)

// LbaasV2 is a LoadBalancer implementation based on Octavia
Expand Down Expand Up @@ -514,20 +520,17 @@ func (lbaas *LbaasV2) createFullyPopulatedOctaviaLoadBalancer(name, clusterName
}

for portIndex, port := range service.Spec.Ports {
listenerCreateOpt := lbaas.buildListenerCreateOpt(port, svcConf)
listenerCreateOpt.Name = cutString(fmt.Sprintf("listener_%d_%s", portIndex, name))
listenerCreateOpt := lbaas.buildListenerCreateOpt(port, svcConf, cpoutil.Sprintf255(listenerFormat, portIndex, name))
members, newMembers, err := lbaas.buildBatchUpdateMemberOpts(port, nodes, svcConf)
if err != nil {
return nil, err
}
poolCreateOpt := lbaas.buildPoolCreateOpt(string(listenerCreateOpt.Protocol), service, svcConf)
poolCreateOpt := lbaas.buildPoolCreateOpt(string(listenerCreateOpt.Protocol), service, svcConf, cpoutil.Sprintf255(poolFormat, portIndex, name))
poolCreateOpt.Members = members
// Pool name must be provided to create fully populated loadbalancer
poolCreateOpt.Name = cutString(fmt.Sprintf("pool_%d_%s", portIndex, name))
var withHealthMonitor string
if svcConf.enableMonitor {
opts := lbaas.buildMonitorCreateOpts(svcConf, port)
opts.Name = cutString(fmt.Sprintf("monitor_%d_%s", port.Port, name))
opts := lbaas.buildMonitorCreateOpts(svcConf, port, cpoutil.Sprintf255(monitorFormat, portIndex, name))
poolCreateOpt.Monitor = &opts
withHealthMonitor = " with healthmonitor"
}
Expand Down Expand Up @@ -598,24 +601,14 @@ func (lbaas *LbaasV2) GetLoadBalancer(ctx context.Context, clusterName string, s

// GetLoadBalancerName returns the constructed load balancer name.
func (lbaas *LbaasV2) GetLoadBalancerName(_ context.Context, clusterName string, service *corev1.Service) string {
name := fmt.Sprintf("%s%s_%s_%s", servicePrefix, clusterName, service.Namespace, service.Name)
return cutString(name)
return cpoutil.Sprintf255(lbFormat, servicePrefix, clusterName, service.Namespace, service.Name)
}

// getLoadBalancerLegacyName returns the legacy load balancer name for backward compatibility.
func (lbaas *LbaasV2) getLoadBalancerLegacyName(_ context.Context, _ string, service *corev1.Service) string {
return cloudprovider.DefaultLoadBalancerName(service)
}

// cutString makes sure the string length doesn't exceed 255, which is usually the maximum string length in OpenStack.
func cutString(original string) string {
ret := original
if len(original) > 255 {
ret = original[:255]
}
return ret
}

// The LB needs to be configured with instance addresses on the same
// subnet as the LB (aka opts.SubnetID). Currently, we're just
// guessing that the node's InternalIP is the right address.
Expand Down Expand Up @@ -1079,55 +1072,56 @@ func (lbaas *LbaasV2) ensureFloatingIP(clusterName string, service *corev1.Servi
func (lbaas *LbaasV2) ensureOctaviaHealthMonitor(lbID string, name string, pool *v2pools.Pool, port corev1.ServicePort, svcConf *serviceConfig) error {
monitorID := pool.MonitorID

if monitorID != "" {
monitor, err := openstackutil.GetHealthMonitor(lbaas.lb, monitorID)
if err != nil {
return err
}
//Recreate health monitor with correct protocol if externalTrafficPolicy was changed
createOpts := lbaas.buildMonitorCreateOpts(svcConf, port)
if createOpts.Type != monitor.Type {
klog.InfoS("Recreating health monitor for the pool", "pool", pool.ID, "oldMonitor", monitorID)
if err := openstackutil.DeleteHealthMonitor(lbaas.lb, monitorID, lbID); err != nil {
return err
}
monitorID = ""
}
if svcConf.healthMonitorDelay != monitor.Delay ||
svcConf.healthMonitorTimeout != monitor.Timeout ||
svcConf.healthMonitorMaxRetries != monitor.MaxRetries ||
svcConf.healthMonitorMaxRetriesDown != monitor.MaxRetriesDown {
updateOpts := v2monitors.UpdateOpts{
Delay: svcConf.healthMonitorDelay,
Timeout: svcConf.healthMonitorTimeout,
MaxRetries: svcConf.healthMonitorMaxRetries,
MaxRetriesDown: svcConf.healthMonitorMaxRetriesDown,
}
klog.Infof("Updating health monitor %s updateOpts %+v", monitorID, updateOpts)
if err := openstackutil.UpdateHealthMonitor(lbaas.lb, monitorID, updateOpts); err != nil {
return err
}
if monitorID == "" {
// do nothing
if !svcConf.enableMonitor {
return nil
}
}
if monitorID == "" && svcConf.enableMonitor {

// a new monitor must be created
klog.V(2).Infof("Creating monitor for pool %s", pool.ID)
createOpts := lbaas.buildMonitorCreateOpts(svcConf, port, name)
return lbaas.createOctaviaHealthMonitor(createOpts, pool.ID, lbID)
}

createOpts := lbaas.buildMonitorCreateOpts(svcConf, port)
// Populate PoolID, attribute is omitted for consumption of the createOpts for fully populated Loadbalancer
createOpts.PoolID = pool.ID
createOpts.Name = name
monitor, err := openstackutil.CreateHealthMonitor(lbaas.lb, createOpts, lbID)
if err != nil {
return err
}
monitorID = monitor.ID
klog.Infof("Health monitor %s for pool %s created.", monitorID, pool.ID)
} else if monitorID != "" && !svcConf.enableMonitor {
// an existing monitor must be deleted
if !svcConf.enableMonitor {
klog.Infof("Deleting health monitor %s for pool %s", monitorID, pool.ID)
return openstackutil.DeleteHealthMonitor(lbaas.lb, monitorID, lbID)
}

// get an existing monitor status
monitor, err := openstackutil.GetHealthMonitor(lbaas.lb, monitorID)
if err != nil {
// return err on 404 is ok, since we get monitorID dynamically from the pool
return err
}

// recreate health monitor with a new type
createOpts := lbaas.buildMonitorCreateOpts(svcConf, port, name)
if createOpts.Type != monitor.Type {
klog.InfoS("Recreating health monitor for the pool", "pool", pool.ID, "oldMonitor", monitorID)
if err := openstackutil.DeleteHealthMonitor(lbaas.lb, monitorID, lbID); err != nil {
return err
}
return lbaas.createOctaviaHealthMonitor(createOpts, pool.ID, lbID)
}

// update new monitor parameters
if name != monitor.Name ||
svcConf.healthMonitorDelay != monitor.Delay ||
svcConf.healthMonitorTimeout != monitor.Timeout ||
svcConf.healthMonitorMaxRetries != monitor.MaxRetries ||
svcConf.healthMonitorMaxRetriesDown != monitor.MaxRetriesDown {
updateOpts := v2monitors.UpdateOpts{
Name: &name,
Delay: svcConf.healthMonitorDelay,
Timeout: svcConf.healthMonitorTimeout,
MaxRetries: svcConf.healthMonitorMaxRetries,
MaxRetriesDown: svcConf.healthMonitorMaxRetriesDown,
}
klog.Infof("Updating health monitor %s updateOpts %+v", monitorID, updateOpts)
return openstackutil.UpdateHealthMonitor(lbaas.lb, monitorID, updateOpts)
}

return nil
Expand All @@ -1137,7 +1131,9 @@ func (lbaas *LbaasV2) canUseHTTPMonitor(port corev1.ServicePort) bool {
if lbaas.opts.LBProvider == "ovn" {
// ovn-octavia-provider doesn't support HTTP monitors at all. We got to avoid creating it with ovn.
return false
} else if port.Protocol == corev1.ProtocolUDP {
}

if port.Protocol == corev1.ProtocolUDP {
// Older Octavia versions or OVN provider doesn't support HTTP monitors on UDP pools. We got to check if that's the case.
return openstackutil.IsOctaviaFeatureSupported(lbaas.lb, openstackutil.OctaviaFeatureHTTPMonitorsOnUDP, lbaas.opts.LBProvider)
}
Expand All @@ -1146,8 +1142,9 @@ func (lbaas *LbaasV2) canUseHTTPMonitor(port corev1.ServicePort) bool {
}

// buildMonitorCreateOpts returns a v2monitors.CreateOpts without PoolID for consumption of both, fully popuplated Loadbalancers and Monitors.
func (lbaas *LbaasV2) buildMonitorCreateOpts(svcConf *serviceConfig, port corev1.ServicePort) v2monitors.CreateOpts {
func (lbaas *LbaasV2) buildMonitorCreateOpts(svcConf *serviceConfig, port corev1.ServicePort, name string) v2monitors.CreateOpts {
opts := v2monitors.CreateOpts{
Name: name,
Type: string(port.Protocol),
Delay: svcConf.healthMonitorDelay,
Timeout: svcConf.healthMonitorTimeout,
Expand All @@ -1166,6 +1163,18 @@ func (lbaas *LbaasV2) buildMonitorCreateOpts(svcConf *serviceConfig, port corev1
return opts
}

func (lbaas *LbaasV2) createOctaviaHealthMonitor(createOpts v2monitors.CreateOpts, poolID, lbID string) error {
// populate PoolID, attribute is omitted for consumption of the createOpts for fully populated Loadbalancer
createOpts.PoolID = poolID
monitor, err := openstackutil.CreateHealthMonitor(lbaas.lb, createOpts, lbID)
if err != nil {
return err
}
klog.Infof("Health monitor %s for pool %s created.", monitor.ID, poolID)

return nil
}

// Make sure the pool is created for the Service, nodes are added as pool members.
func (lbaas *LbaasV2) ensureOctaviaPool(lbID string, name string, listener *listeners.Listener, service *corev1.Service, port corev1.ServicePort, nodes []*corev1.Node, svcConf *serviceConfig) (*v2pools.Pool, error) {
pool, err := openstackutil.GetPoolByListener(lbaas.lb, lbID, listener.ID)
Expand Down Expand Up @@ -1193,9 +1202,8 @@ func (lbaas *LbaasV2) ensureOctaviaPool(lbID string, name string, listener *list
}

if pool == nil {
createOpt := lbaas.buildPoolCreateOpt(listener.Protocol, service, svcConf)
createOpt := lbaas.buildPoolCreateOpt(listener.Protocol, service, svcConf, name)
createOpt.ListenerID = listener.ID
createOpt.Name = name

klog.InfoS("Creating pool", "listenerID", listener.ID, "protocol", createOpt.Protocol)
pool, err = openstackutil.CreatePool(lbaas.lb, createOpt, lbID)
Expand Down Expand Up @@ -1230,7 +1238,7 @@ func (lbaas *LbaasV2) ensureOctaviaPool(lbID string, name string, listener *list
return pool, nil
}

func (lbaas *LbaasV2) buildPoolCreateOpt(listenerProtocol string, service *corev1.Service, svcConf *serviceConfig) v2pools.CreateOpts {
func (lbaas *LbaasV2) buildPoolCreateOpt(listenerProtocol string, service *corev1.Service, svcConf *serviceConfig, name string) v2pools.CreateOpts {
// By default, use the protocol of the listener
poolProto := v2pools.Protocol(listenerProtocol)
if svcConf.enableProxyProtocol {
Expand All @@ -1257,6 +1265,7 @@ func (lbaas *LbaasV2) buildPoolCreateOpt(listenerProtocol string, service *corev

lbmethod := v2pools.LBMethod(lbaas.opts.LBMethod)
return v2pools.CreateOpts{
Name: name,
Protocol: poolProto,
LBMethod: lbmethod,
Persistence: persistence,
Expand Down Expand Up @@ -1309,9 +1318,8 @@ func (lbaas *LbaasV2) ensureOctaviaListener(lbID string, name string, curListene
Port: int(port.Port),
}]
if !isPresent {
listenerCreateOpt := lbaas.buildListenerCreateOpt(port, svcConf)
listenerCreateOpt := lbaas.buildListenerCreateOpt(port, svcConf, name)
listenerCreateOpt.LoadbalancerID = lbID
listenerCreateOpt.Name = name

klog.V(2).Infof("Creating listener for port %d using protocol %s", int(port.Port), listenerCreateOpt.Protocol)

Expand Down Expand Up @@ -1396,11 +1404,10 @@ func (lbaas *LbaasV2) ensureOctaviaListener(lbID string, name string, curListene
}

// buildListenerCreateOpt returns listeners.CreateOpts for a specific Service port and configuration
func (lbaas *LbaasV2) buildListenerCreateOpt(port corev1.ServicePort, svcConf *serviceConfig) listeners.CreateOpts {
listenerProtocol := listeners.Protocol(port.Protocol)

func (lbaas *LbaasV2) buildListenerCreateOpt(port corev1.ServicePort, svcConf *serviceConfig, name string) listeners.CreateOpts {
listenerCreateOpt := listeners.CreateOpts{
Protocol: listenerProtocol,
Name: name,
Protocol: listeners.Protocol(port.Protocol),
ProtocolPort: int(port.Port),
ConnLimit: &svcConf.connLimit,
}
Expand Down Expand Up @@ -1999,17 +2006,17 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName
}

for portIndex, port := range service.Spec.Ports {
listener, err := lbaas.ensureOctaviaListener(loadbalancer.ID, cutString(fmt.Sprintf("listener_%d_%s", portIndex, lbName)), curListenerMapping, port, svcConf, service)
listener, err := lbaas.ensureOctaviaListener(loadbalancer.ID, cpoutil.Sprintf255(listenerFormat, portIndex, lbName), curListenerMapping, port, svcConf, service)
if err != nil {
return nil, err
}

pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cutString(fmt.Sprintf("pool_%d_%s", portIndex, lbName)), listener, service, port, nodes, svcConf)
pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cpoutil.Sprintf255(poolFormat, portIndex, lbName), listener, service, port, nodes, svcConf)
if err != nil {
return nil, err
}

if err := lbaas.ensureOctaviaHealthMonitor(loadbalancer.ID, cutString(fmt.Sprintf("monitor_%d_%s", portIndex, lbName)), pool, port, svcConf); err != nil {
if err := lbaas.ensureOctaviaHealthMonitor(loadbalancer.ID, cpoutil.Sprintf255(monitorFormat, portIndex, lbName), pool, port, svcConf); err != nil {
return nil, err
}

Expand Down Expand Up @@ -2199,7 +2206,12 @@ func (lbaas *LbaasV2) updateOctaviaLoadBalancer(ctx context.Context, clusterName
return fmt.Errorf("loadbalancer %s does not contain required listener for port %d and protocol %s", loadbalancer.ID, port.Port, port.Protocol)
}

_, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cutString(fmt.Sprintf("pool_%d_%s", portIndex, loadbalancer.Name)), &listener, service, port, nodes, svcConf)
pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cpoutil.Sprintf255(poolFormat, portIndex, loadbalancer.Name), &listener, service, port, nodes, svcConf)
if err != nil {
return err
}

err = lbaas.ensureOctaviaHealthMonitor(loadbalancer.ID, cpoutil.Sprintf255(monitorFormat, portIndex, loadbalancer.Name), pool, port, svcConf)
if err != nil {
return err
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ import (
"k8s.io/klog/v2"
)

// CutString255 makes sure the string length doesn't exceed 255, which is usually the maximum string length in OpenStack.
func CutString255(original string) string {
ret := original
if len(original) > 255 {
ret = original[:255]
}
return ret
}

// Sprintf255 formats according to a format specifier and returns the resulting string with a maximum length of 255 characters.
func Sprintf255(format string, args ...interface{}) string {
return CutString255(fmt.Sprintf(format, args...))
}

// MyDuration is the encoding.TextUnmarshaler interface for time.Duration
type MyDuration struct {
time.Duration
Expand Down

0 comments on commit 74e30ac

Please sign in to comment.