Skip to content

Commit

Permalink
add cilium specific virtual router annotation to nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
iljarotar committed Jul 4, 2024
1 parent c73e25b commit 8879246
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 54 deletions.
26 changes: 13 additions & 13 deletions metal/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func NewCloud(_ io.Reader) (cloudprovider.Interface, error) {
partitionID := os.Getenv(constants.MetalPartitionIDEnvVar)
clusterID := os.Getenv(constants.MetalClusterIDEnvVar)
defaultExternalNetworkID := os.Getenv(constants.MetalDefaultExternalNetworkEnvVar)
loadbalancerType := os.Getenv(constants.Loadbalancer)

var (
additionalNetworksString = os.Getenv(constants.MetalAdditionalNetworks)
Expand Down Expand Up @@ -96,18 +95,7 @@ func NewCloud(_ io.Reader) (cloudprovider.Interface, error) {

instancesController := instances.New(defaultExternalNetworkID)
zonesController := zones.New()

var config loadbalancer.LoadBalancerConfig
switch loadbalancerType {
case "metallb":
config = metallb.NewMetalLBConfig()
case "cilium":
config = cilium.NewCiliumConfig()
default:
config = metallb.NewMetalLBConfig()
}

loadBalancerController := loadbalancer.New(partitionID, projectID, clusterID, defaultExternalNetworkID, additionalNetworks, config)
loadBalancerController := loadbalancer.New(partitionID, projectID, clusterID, defaultExternalNetworkID, additionalNetworks)

klog.Info("initialized cloud controller manager")
return &cloud{
Expand All @@ -123,6 +111,7 @@ func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder,
projectID := os.Getenv(constants.MetalProjectIDEnvVar)
sshPublicKey := os.Getenv(constants.MetalSSHPublicKey)
clusterID := os.Getenv(constants.MetalClusterIDEnvVar)
loadbalancerType := os.Getenv(constants.Loadbalancer)

k8sClientSet := clientBuilder.ClientOrDie("cloud-controller-manager")
k8sRestConfig, err := clientBuilder.Config("cloud-controller-manager")
Expand All @@ -147,13 +136,24 @@ func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder,
klog.Fatalf("unable to create k8s client: %v", err)
}

var config loadbalancer.LoadBalancerConfig
switch loadbalancerType {
case "metallb":
config = metallb.NewMetalLBConfig()
case "cilium":
config = cilium.NewCiliumConfig(k8sClientSet)
default:
config = metallb.NewMetalLBConfig()
}

housekeeper := housekeeping.New(metalclient, stop, c.loadBalancer, k8sClientSet, projectID, sshPublicKey, clusterID)
ms := metal.New(metalclient, k8sClientSet, projectID)

c.instances.MetalService = ms
c.loadBalancer.K8sClientSet = k8sClientSet
c.loadBalancer.K8sClient = k8sClient
c.loadBalancer.MetalService = ms
c.loadBalancer.LoadBalancerConfig = config
c.zones.MetalService = ms

go housekeeper.Run()
Expand Down
80 changes: 60 additions & 20 deletions pkg/controllers/loadbalancer/cilium/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/metal-stack/metal-lib/pkg/pointer"
"github.com/metal-stack/metal-lib/pkg/tag"

"github.com/metal-stack/metal-ccm/pkg/controllers/loadbalancer"
"github.com/metal-stack/metal-ccm/pkg/resources/kubernetes"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"github.com/metal-stack/metal-go/api/models"
Expand All @@ -25,20 +29,21 @@ import (
"sigs.k8s.io/yaml"
)

type CiliumConfig struct {
type ciliumConfig struct {
Peers []*Peer `json:"peers,omitempty" yaml:"peers,omitempty"`
AddressPools []*loadbalancer.AddressPool `json:"address-pools,omitempty" yaml:"address-pools,omitempty"`
k8sClient clientset.Interface
}

func NewCiliumConfig() *CiliumConfig {
return &CiliumConfig{}
func NewCiliumConfig(k8sClient clientset.Interface) *ciliumConfig {
return &ciliumConfig{k8sClient: k8sClient}
}

func (cfg *CiliumConfig) Namespace() string {
func (cfg *ciliumConfig) Namespace() string {
return ""
}

func (cfg *CiliumConfig) CalculateConfig(ips []*models.V1IPResponse, nws sets.Set[string], nodes []v1.Node) error {
func (cfg *ciliumConfig) PrepareConfig(ips []*models.V1IPResponse, nws sets.Set[string], nodes []v1.Node) error {
err := cfg.computeAddressPools(ips, nws)
if err != nil {
return err
Expand All @@ -50,7 +55,7 @@ func (cfg *CiliumConfig) CalculateConfig(ips []*models.V1IPResponse, nws sets.Se
return nil
}

func (cfg *CiliumConfig) computeAddressPools(ips []*models.V1IPResponse, nws sets.Set[string]) error {
func (cfg *ciliumConfig) computeAddressPools(ips []*models.V1IPResponse, nws sets.Set[string]) error {
for _, ip := range ips {
if !nws.Has(*ip.Networkid) {
klog.Infof("skipping ip %q: not part of cluster networks", *ip.Ipaddress)
Expand All @@ -62,17 +67,12 @@ func (cfg *CiliumConfig) computeAddressPools(ips []*models.V1IPResponse, nws set
return nil
}

func (cfg *CiliumConfig) computePeers(nodes []v1.Node) error {
func (cfg *ciliumConfig) computePeers(nodes []v1.Node) error {
cfg.Peers = []*Peer{} // we want an empty array of peers and not nil if there are no nodes
for _, n := range nodes {
labels := n.GetLabels()
asnString, ok := labels[tag.MachineNetworkPrimaryASN]
if !ok {
return fmt.Errorf("node %q misses label: %s", n.GetName(), tag.MachineNetworkPrimaryASN)
}
asn, err := strconv.ParseInt(asnString, 10, 64)
asn, err := getASNFromNodeLabels(n)
if err != nil {
return fmt.Errorf("unable to parse valid integer from asn annotation: %w", err)
return err
}

peer, err := newPeer(n, asn)
Expand All @@ -86,7 +86,7 @@ func (cfg *CiliumConfig) computePeers(nodes []v1.Node) error {
return nil
}

func (cfg *CiliumConfig) getOrCreateAddressPool(poolName string) *loadbalancer.AddressPool {
func (cfg *ciliumConfig) getOrCreateAddressPool(poolName string) *loadbalancer.AddressPool {
for _, pool := range cfg.AddressPools {
if pool.Name == poolName {
return pool
Expand All @@ -99,7 +99,7 @@ func (cfg *CiliumConfig) getOrCreateAddressPool(poolName string) *loadbalancer.A
return pool
}

func (cfg *CiliumConfig) addIPToPool(network string, ip models.V1IPResponse) {
func (cfg *ciliumConfig) addIPToPool(network string, ip models.V1IPResponse) {
t := ip.Type
poolType := models.V1IPBaseTypeEphemeral
if t != nil && *t == models.V1IPBaseTypeStatic {
Expand All @@ -110,15 +110,15 @@ func (cfg *CiliumConfig) addIPToPool(network string, ip models.V1IPResponse) {
pool.AppendIP(*ip.Ipaddress)
}

func (cfg *CiliumConfig) ToYAML() (string, error) {
func (cfg *ciliumConfig) ToYAML() (string, error) {
bb, err := yaml.Marshal(cfg)
if err != nil {
return "", err
}
return string(bb), nil
}

func (cfg *CiliumConfig) WriteCRs(ctx context.Context, c client.Client) error {
func (cfg *ciliumConfig) WriteCRs(ctx context.Context, c client.Client) error {
err := cfg.writeCiliumBGPPeeringPolicies(ctx, c)
if err != nil {
return fmt.Errorf("failed to write ciliumbgppeeringpolicy resources %w", err)
Expand All @@ -132,7 +132,7 @@ func (cfg *CiliumConfig) WriteCRs(ctx context.Context, c client.Client) error {
return nil
}

func (cfg *CiliumConfig) writeCiliumBGPPeeringPolicies(ctx context.Context, c client.Client) error {
func (cfg *ciliumConfig) writeCiliumBGPPeeringPolicies(ctx context.Context, c client.Client) error {
existingPolicies := ciliumv2alpha1.CiliumBGPPeeringPolicyList{}
err := c.List(ctx, &existingPolicies)
if err != nil {
Expand Down Expand Up @@ -205,7 +205,7 @@ func (cfg *CiliumConfig) writeCiliumBGPPeeringPolicies(ctx context.Context, c cl
return nil
}

func (cfg *CiliumConfig) writeCiliumLoadBalancerIPPools(ctx context.Context, c client.Client) error {
func (cfg *ciliumConfig) writeCiliumLoadBalancerIPPools(ctx context.Context, c client.Client) error {
existingPools := ciliumv2alpha1.CiliumLoadBalancerIPPoolList{}
err := c.List(ctx, &existingPools)
if err != nil {
Expand Down Expand Up @@ -261,3 +261,43 @@ func (cfg *CiliumConfig) writeCiliumLoadBalancerIPPools(ctx context.Context, c c

return nil
}

func (cfg *ciliumConfig) writeNodeAnnotations() error {

Check failure on line 265 in pkg/controllers/loadbalancer/cilium/config.go

View workflow job for this annotation

GitHub Actions / Docker Build

func `(*ciliumConfig).writeNodeAnnotations` is unused (unused)
nodes, err := kubernetes.GetNodes(cfg.k8sClient)
if err != nil {
return fmt.Errorf("failed to write node annotations: %w", err)
}
backoff := wait.Backoff{
Steps: 20,
Duration: 50 * time.Millisecond,
Jitter: 1.0,
}
for _, n := range nodes {
asn, err := getASNFromNodeLabels(n)
if err != nil {
return fmt.Errorf("failed to write node annotations for node %s: %w", n.Name, err)
}
annotations := map[string]string{
fmt.Sprintf("cilium.io/bgp-virtual-router.%d", asn): "router-id=127.0.0.1",
}
err = kubernetes.UpdateNodeAnnotationsWithBackoff(cfg.k8sClient, n.Name, annotations, backoff)
if err != nil {
return fmt.Errorf("failed to write node annotations for node %s: %w", n.Name, err)
}
}

return nil
}

func getASNFromNodeLabels(node v1.Node) (int64, error) {
labels := node.GetLabels()
asnString, ok := labels[tag.MachineNetworkPrimaryASN]
if !ok {
return 0, fmt.Errorf("node %q misses label: %s", node.GetName(), tag.MachineNetworkPrimaryASN)
}
asn, err := strconv.ParseInt(asnString, 10, 64)
if err != nil {
return 0, fmt.Errorf("unable to parse valid integer from asn annotation: %w", err)
}
return asn, nil
}
4 changes: 2 additions & 2 deletions pkg/controllers/loadbalancer/cilium/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ func TestCiliumConfig_CalculateConfig(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
cfg := &CiliumConfig{}
cfg := &ciliumConfig{}

err := cfg.CalculateConfig(tt.ips, tt.nws, tt.nodes)
err := cfg.PrepareConfig(tt.ips, tt.nws, tt.nodes)
if diff := cmp.Diff(err, tt.wantErr); diff != "" {
t.Errorf("CiliumConfig.CalculateConfig() error = %v", diff)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/loadbalancer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ import (

type LoadBalancerConfig interface {
Namespace() string
CalculateConfig(ips []*models.V1IPResponse, nws sets.Set[string], nodes []v1.Node) error
PrepareConfig(ips []*models.V1IPResponse, nws sets.Set[string], nodes []v1.Node) error
WriteCRs(ctx context.Context, c client.Client) error
}
5 changes: 2 additions & 3 deletions pkg/controllers/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ type LoadBalancerController struct {
}

// New returns a new load balancer controller that satisfies the kubernetes cloud provider load balancer interface
func New(partitionID, projectID, clusterID, defaultExternalNetworkID string, additionalNetworks []string, config LoadBalancerConfig) *LoadBalancerController {
func New(partitionID, projectID, clusterID, defaultExternalNetworkID string, additionalNetworks []string) *LoadBalancerController {
return &LoadBalancerController{
LoadBalancerConfig: config,
partitionID: partitionID,
projectID: projectID,
clusterID: clusterID,
Expand Down Expand Up @@ -342,7 +341,7 @@ func (l *LoadBalancerController) updateLoadBalancerConfig(ctx context.Context, n
return fmt.Errorf("could not find ips of this project's cluster: %w", err)
}

err = l.LoadBalancerConfig.CalculateConfig(ips, l.additionalNetworks, nodes)
err = l.LoadBalancerConfig.PrepareConfig(ips, l.additionalNetworks, nodes)
if err != nil {
return err
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/controllers/loadbalancer/metallb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,23 @@ const (
metallbNamespace = "metallb-system"
)

// MetalLBConfig is a struct containing a config for metallb
type MetalLBConfig struct {
// metalLBConfig is a struct containing a config for metallb
type metalLBConfig struct {
Peers []*Peer `json:"peers,omitempty" yaml:"peers,omitempty"`
AddressPools []*loadbalancer.AddressPool `json:"address-pools,omitempty" yaml:"address-pools,omitempty"`
namespace string
}

func NewMetalLBConfig() *MetalLBConfig {
return &MetalLBConfig{namespace: metallbNamespace}
func NewMetalLBConfig() *metalLBConfig {
return &metalLBConfig{namespace: metallbNamespace}
}

func (cfg *MetalLBConfig) Namespace() string {
func (cfg *metalLBConfig) Namespace() string {
return cfg.namespace
}

// CalculateConfig computes the metallb config from given parameter input.
func (cfg *MetalLBConfig) CalculateConfig(ips []*models.V1IPResponse, nws sets.Set[string], nodes []v1.Node) error {
func (cfg *metalLBConfig) PrepareConfig(ips []*models.V1IPResponse, nws sets.Set[string], nodes []v1.Node) error {
err := cfg.computeAddressPools(ips, nws)
if err != nil {
return err
Expand All @@ -57,7 +57,7 @@ func (cfg *MetalLBConfig) CalculateConfig(ips []*models.V1IPResponse, nws sets.S
return nil
}

func (cfg *MetalLBConfig) computeAddressPools(ips []*models.V1IPResponse, nws sets.Set[string]) error {
func (cfg *metalLBConfig) computeAddressPools(ips []*models.V1IPResponse, nws sets.Set[string]) error {
for _, ip := range ips {
if !nws.Has(*ip.Networkid) {
klog.Infof("skipping ip %q: not part of cluster networks", *ip.Ipaddress)
Expand All @@ -69,7 +69,7 @@ func (cfg *MetalLBConfig) computeAddressPools(ips []*models.V1IPResponse, nws se
return nil
}

func (cfg *MetalLBConfig) computePeers(nodes []v1.Node) error {
func (cfg *metalLBConfig) computePeers(nodes []v1.Node) error {
cfg.Peers = []*Peer{} // we want an empty array of peers and not nil if there are no nodes
for _, n := range nodes {
labels := n.GetLabels()
Expand All @@ -95,7 +95,7 @@ func (cfg *MetalLBConfig) computePeers(nodes []v1.Node) error {

// getOrCreateAddressPool returns the address pool of the given network.
// It will be created if it does not exist yet.
func (cfg *MetalLBConfig) getOrCreateAddressPool(poolName string) *loadbalancer.AddressPool {
func (cfg *metalLBConfig) getOrCreateAddressPool(poolName string) *loadbalancer.AddressPool {
for _, pool := range cfg.AddressPools {
if pool.Name == poolName {
return pool
Expand All @@ -109,7 +109,7 @@ func (cfg *MetalLBConfig) getOrCreateAddressPool(poolName string) *loadbalancer.
}

// announceIPs appends the given IPs to the network address pools.
func (cfg *MetalLBConfig) addIPToPool(network string, ip models.V1IPResponse) {
func (cfg *metalLBConfig) addIPToPool(network string, ip models.V1IPResponse) {
t := ip.Type
poolType := models.V1IPBaseTypeEphemeral
if t != nil && *t == models.V1IPBaseTypeStatic {
Expand All @@ -121,16 +121,16 @@ func (cfg *MetalLBConfig) addIPToPool(network string, ip models.V1IPResponse) {
}

// ToYAML returns this config in YAML format.
func (cfg *MetalLBConfig) ToYAML() (string, error) {
func (cfg *metalLBConfig) ToYAML() (string, error) {
bb, err := yaml.Marshal(cfg)
if err != nil {
return "", err
}
return string(bb), nil
}

// Write inserts or updates the Metal-LB custom resources.
func (cfg *MetalLBConfig) WriteCRs(ctx context.Context, c client.Client) error {
// WriteCRs inserts or updates the Metal-LB custom resources.
func (cfg *metalLBConfig) WriteCRs(ctx context.Context, c client.Client) error {

// BGPPeers
bgpPeerList := metallbv1beta2.BGPPeerList{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/loadbalancer/metallb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ func TestMetalLBConfig_CalculateConfig(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
cfg := &MetalLBConfig{}
cfg := &metalLBConfig{}

err := cfg.CalculateConfig(tt.ips, tt.nws, tt.nodes)
err := cfg.PrepareConfig(tt.ips, tt.nws, tt.nodes)
if diff := cmp.Diff(err, tt.wantErr); diff != "" {
t.Errorf("MetalLBConfig.CalculateConfig() error = %v", diff)
return
Expand Down
18 changes: 18 additions & 0 deletions pkg/resources/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,24 @@ func UpdateNodeLabelsWithBackoff(client clientset.Interface, nodeName string, la
})
}

// UpdateNodeAnnotationsWithBackoff updates labels on a given node with a given backoff retry.
func UpdateNodeAnnotationsWithBackoff(client clientset.Interface, nodeName string, annotations map[string]string, backoff wait.Backoff) error {
return retry.RetryOnConflict(backoff, func() error {

node, err := client.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
if err != nil {
return err
}

for key, value := range annotations {
node.Annotations[key] = value
}

_, err = client.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{})
return err
})
}

// NodeNamesOfNodes returns the node names of the nodes
func NodeNamesOfNodes(nodes []v1.Node) string {
var nn []string
Expand Down

0 comments on commit 8879246

Please sign in to comment.