From 8179501dd56dde02a5eafa4560f2354a0174d858 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Sch=C3=A4fer?= Date: Wed, 11 May 2022 11:58:03 +0200 Subject: [PATCH] Rip out the port cache For a while now (potentially since ever), ch-k8s-lbaas has been known for losing floating IPs. We were so far unable to track it down---the logs were always inconclusive and it wasn't fully clear where the issue might come from. As "cache invalidation" is one of the two hard problems in computer science (next to naming things and off-by-one errors), it seems most sensible to rip out the port cache. With the typical cluster size, it should not be necessary to have a cache. In addition, using a cache is in direct contradiction to acting on the current state (even though API calls also always return stale data, but at least it's less stale than your average cache), which we normally want to do in the Kubernetes world. So let's try this, for a change. --- internal/openstack/cache.go | 164 ----------------------------- internal/openstack/port_manager.go | 16 ++- internal/openstack/ports.go | 102 ++++++++++++++++++ 3 files changed, 108 insertions(+), 174 deletions(-) delete mode 100644 internal/openstack/cache.go create mode 100644 internal/openstack/ports.go diff --git a/internal/openstack/cache.go b/internal/openstack/cache.go deleted file mode 100644 index 39bc9a4..0000000 --- a/internal/openstack/cache.go +++ /dev/null @@ -1,164 +0,0 @@ -/* Copyright 2020 CLOUD&HEAT Technologies GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package openstack - -import ( - "time" - - "github.com/gophercloud/gophercloud" - floatingipsv2 "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/floatingips" - portsv2 "github.com/gophercloud/gophercloud/openstack/networking/v2/ports" - "github.com/gophercloud/gophercloud/pagination" - - "k8s.io/klog" -) - -type CachedPort struct { - Port portsv2.Port - FloatingIP *floatingipsv2.FloatingIP -} - -type SimplePortCache struct { - client *gophercloud.ServiceClient - tag string - useFloatingIPs bool - validUntil time.Time - ttl time.Duration - ports map[string]*CachedPort -} - -type PortCache interface { - GetPorts() ([]*portsv2.Port, error) - GetPortByID(ID string) (*portsv2.Port, *floatingipsv2.FloatingIP, error) - Invalidate() -} - -func (client *OpenStackClient) NewPortCache(ttl time.Duration, tag string, useFloatingIPs bool) (*SimplePortCache, error) { - networkingclient, err := client.NewNetworkV2() - if err != nil { - return nil, err - } - - return NewPortCache(networkingclient, ttl, tag, useFloatingIPs), nil -} - -func NewPortCache(networkingclient *gophercloud.ServiceClient, ttl time.Duration, tag string, useFloatingIPs bool) *SimplePortCache { - return &SimplePortCache{ - client: networkingclient, - ttl: ttl, - tag: tag, - useFloatingIPs: useFloatingIPs, - } -} - -func (pc *SimplePortCache) GetPorts() ([]*portsv2.Port, error) { - err := pc.refreshIfInvalid() - if err != nil { - return nil, err - } - - result := make([]*portsv2.Port, len(pc.ports)) - i := 0 - for _, v := range pc.ports { - result[i] = &v.Port - i += 1 - } - return result, nil -} - -func (pc *SimplePortCache) GetPortByID(ID string) (*portsv2.Port, *floatingipsv2.FloatingIP, error) { - err := pc.refreshIfInvalid() - if err != nil { - return nil, nil, err - } - - port, ok := pc.ports[ID] - if !ok { - return nil, nil, nil - } - return &port.Port, port.FloatingIP, nil -} - -func (pc *SimplePortCache) refreshIfInvalid() error { - now := time.Now() - if now.Before(pc.validUntil) { - klog.V(5).Infof("not refreshing port cache, it is still valid until %s", pc.validUntil) - return nil - } - return pc.forceRefresh() -} - -func (pc *SimplePortCache) forceRefresh() error { - // port ID -> FloatingIP, *NOT* floating IP ID -> FloatingIP - var fipEntries map[string]*floatingipsv2.FloatingIP = nil - if pc.useFloatingIPs { - fipEntries = make(map[string]*floatingipsv2.FloatingIP) - err := floatingipsv2.List( - pc.client, - floatingipsv2.ListOpts{Tags: pc.tag}, - ).EachPage(func(page pagination.Page) (bool, error) { - fips, err := floatingipsv2.ExtractFloatingIPs(page) - if err != nil { - return false, err - } - for _, fip := range fips { - if fip.PortID == "" { - continue - } - fipCopy := &floatingipsv2.FloatingIP{} - *fipCopy = fip - fipEntries[fip.PortID] = fipCopy - } - return true, nil - }) - // if floating IPs are enabled, not being able to fetch them is actually - // fatal for the refresh operation. - if err != nil { - return err - } - } - - entries := make(map[string]*CachedPort) - err := portsv2.List( - pc.client, - portsv2.ListOpts{Tags: pc.tag}, - ).EachPage(func(page pagination.Page) (bool, error) { - ports, err := portsv2.ExtractPorts(page) - if err != nil { - return false, err - } - for _, port := range ports { - portEntry := &CachedPort{ - Port: port, - } - if fipEntries != nil { - portEntry.FloatingIP = fipEntries[port.ID] - } - entries[port.ID] = portEntry - } - return true, nil - }) - if err == nil { - klog.V(5).Infof("successfully refreshed port cache. found %d ports", len(entries)) - pc.ports = entries - pc.validUntil = time.Now().Add(pc.ttl) - } - return err -} - -func (pc *SimplePortCache) Invalidate() { - // set to zero time -> expired immediately - pc.validUntil = time.Time{} -} diff --git a/internal/openstack/port_manager.go b/internal/openstack/port_manager.go index 22ef13a..eb9a445 100644 --- a/internal/openstack/port_manager.go +++ b/internal/openstack/port_manager.go @@ -16,7 +16,6 @@ package openstack import ( "errors" - "time" "github.com/gophercloud/gophercloud" tags "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/attributestags" @@ -75,7 +74,7 @@ type OpenStackL3PortManager struct { client *gophercloud.ServiceClient networkID string cfg *NetworkingOpts - cache PortCache + ports PortClient } func (client *OpenStackClient) NewOpenStackL3PortManager(networkConfig *NetworkingOpts) (*OpenStackL3PortManager, error) { @@ -95,9 +94,8 @@ func (client *OpenStackClient) NewOpenStackL3PortManager(networkConfig *Networki client: networkingclient, cfg: networkConfig, networkID: networkID, - cache: NewPortCache( + ports: NewPortClient( networkingclient, - 30*time.Second, TagLBManagedPort, networkConfig.UseFloatingIPs, ), @@ -194,7 +192,6 @@ func (pm *OpenStackL3PortManager) ProvisionPort() (string, error) { } } - pm.cache.Invalidate() return port.ID, nil } @@ -238,7 +235,7 @@ func (pm *OpenStackL3PortManager) deleteUnusedFloatingIPs() error { } func (pm *OpenStackL3PortManager) CleanUnusedPorts(usedPorts []string) error { - ports, err := pm.cache.GetPorts() + ports, err := pm.ports.GetPorts() klog.Infof("Used ports=%q", usedPorts) if err != nil { return err @@ -265,14 +262,13 @@ func (pm *OpenStackL3PortManager) CleanUnusedPorts(usedPorts []string) error { } if anyDeleted { - pm.cache.Invalidate() return pm.deleteUnusedFloatingIPs() } return nil } func (pm *OpenStackL3PortManager) GetAvailablePorts() ([]string, error) { - ports, err := pm.cache.GetPorts() + ports, err := pm.ports.GetPorts() if err != nil { return nil, err } @@ -287,7 +283,7 @@ func (pm *OpenStackL3PortManager) GetAvailablePorts() ([]string, error) { } func (pm *OpenStackL3PortManager) GetExternalAddress(portID string) (string, string, error) { - port, fip, err := pm.cache.GetPortByID(portID) + port, fip, err := pm.ports.GetPortByID(portID) if err != nil { return "", "", err } @@ -313,7 +309,7 @@ func (pm *OpenStackL3PortManager) GetExternalAddress(portID string) (string, str } func (pm *OpenStackL3PortManager) GetInternalAddress(portID string) (string, error) { - port, _, err := pm.cache.GetPortByID(portID) + port, _, err := pm.ports.GetPortByID(portID) if err != nil { return "", err } diff --git a/internal/openstack/ports.go b/internal/openstack/ports.go new file mode 100644 index 0000000..9007ef6 --- /dev/null +++ b/internal/openstack/ports.go @@ -0,0 +1,102 @@ +/* Copyright 2020 CLOUD&HEAT Technologies GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package openstack + +import ( + "github.com/gophercloud/gophercloud" + floatingipsv2 "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/floatingips" + portsv2 "github.com/gophercloud/gophercloud/openstack/networking/v2/ports" + "github.com/gophercloud/gophercloud/pagination" + + "k8s.io/klog" +) + +type CachedPort struct { + Port portsv2.Port + FloatingIP *floatingipsv2.FloatingIP +} + +type UncachedClient struct { + client *gophercloud.ServiceClient + tag string + useFloatingIPs bool +} + +type PortClient interface { + GetPorts() ([]*portsv2.Port, error) + GetPortByID(ID string) (*portsv2.Port, *floatingipsv2.FloatingIP, error) +} + +func NewPortClient(networkingclient *gophercloud.ServiceClient, tag string, useFloatingIPs bool) *UncachedClient { + return &UncachedClient{ + client: networkingclient, + tag: tag, + useFloatingIPs: useFloatingIPs, + } +} + +func (pc *UncachedClient) GetPorts() (ports []*portsv2.Port, err error) { + err = portsv2.List( + pc.client, + portsv2.ListOpts{Tags: pc.tag}, + ).EachPage(func(page pagination.Page) (bool, error) { + fetched_ports, err := portsv2.ExtractPorts(page) + if err != nil { + return false, err + } + for _, found_port := range fetched_ports { + ports = append(ports, &found_port) + } + return true, nil + }) + if err != nil { + return nil, err + } + return ports, err +} + +func (pc *UncachedClient) GetPortByID(ID string) (port *portsv2.Port, fip *floatingipsv2.FloatingIP, err error) { + port, err = portsv2.Get( + pc.client, + ID, + ).Extract() + if err != nil { + return nil, nil, err + } + + if pc.useFloatingIPs { + err = floatingipsv2.List( + pc.client, + floatingipsv2.ListOpts{Tags: pc.tag, PortID: ID}, + ).EachPage(func(page pagination.Page) (bool, error) { + fips, err := floatingipsv2.ExtractFloatingIPs(page) + if err != nil { + return false, err + } + for _, found_fip := range fips { + if fip != nil { + // TODO: warn here?! + klog.Warningf("Found multiple floating IPs for port %s (%s and %s at least)", ID, fip.ID, found_fip.ID) + } + fip = &found_fip + } + return true, nil + }) + if err != nil { + return nil, nil, err + } + } + return port, fip, nil +}