Skip to content

Commit

Permalink
support multi region cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
sergelogvinov committed May 15, 2024
1 parent 2f186d6 commit 46aebfb
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ The options in `Global` section are used for openstack-cloud-controller-manager
Keystone user password. If you are using [Keystone application credential](https://docs.openstack.org/keystone/latest/user/application_credentials.html), this option is not required.
* `region`
Required. Keystone region name.
* `regions`
Optional. Keystone region names. Lists the regions in which the cloud provider looks for the instance a node is running on. The value of `region` is used as the default region. Can be specified multiple times.
* `domain-id`
Keystone user domain ID. If you are using [Keystone application credential](https://docs.openstack.org/keystone/latest/user/application_credentials.html), this option is not required.
* `domain-name`
Expand Down Expand Up @@ -207,7 +209,7 @@ Although the openstack-cloud-controller-manager was initially implemented with N
* `ROUND_ROBIN` (default)
* `LEAST_CONNECTIONS`
* `SOURCE_IP`
If `lb-provider` is set to "ovn", the value must be set to `SOURCE_IP_PORT`.
* `lb-provider`
Expand Down Expand Up @@ -300,7 +302,7 @@ Although the openstack-cloud-controller-manager was initially implemented with N
call](https://docs.openstack.org/api-ref/load-balancer/v2/?expanded=create-a-load-balancer-detail#creating-a-fully-populated-load-balancer).
Setting this option to true will create loadbalancers using serial API calls which first create an unpopulated
loadbalancer, then populate its listeners, pools and members. This is a compatibility option at the expense of
increased load on the OpenStack API. Default: false
increased load on the OpenStack API. Default: false
NOTE:
Expand Down
16 changes: 16 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type AuthOpts struct {
UserDomainID string `gcfg:"user-domain-id" mapstructure:"user-domain-id" name:"os-userDomainID" value:"optional"`
UserDomainName string `gcfg:"user-domain-name" mapstructure:"user-domain-name" name:"os-userDomainName" value:"optional"`
Region string `name:"os-region"`
Regions []string `name:"os-regions" value:"optional"`
EndpointType gophercloud.Availability `gcfg:"os-endpoint-type" mapstructure:"os-endpoint-type" name:"os-endpointType" value:"optional"`
CAFile string `gcfg:"ca-file" mapstructure:"ca-file" name:"os-certAuthorityPath" value:"optional"`
TLSInsecure string `gcfg:"tls-insecure" mapstructure:"tls-insecure" name:"os-TLSInsecure" value:"optional" matches:"^true|false$"`
Expand Down Expand Up @@ -88,6 +89,7 @@ func LogCfg(authOpts AuthOpts) {
klog.V(5).Infof("UserDomainID: %s", authOpts.UserDomainID)
klog.V(5).Infof("UserDomainName: %s", authOpts.UserDomainName)
klog.V(5).Infof("Region: %s", authOpts.Region)
klog.V(5).Infof("Regions: %s", authOpts.Regions)
klog.V(5).Infof("EndpointType: %s", authOpts.EndpointType)
klog.V(5).Infof("CAFile: %s", authOpts.CAFile)
klog.V(5).Infof("CertFile: %s", authOpts.CertFile)
Expand Down Expand Up @@ -233,6 +235,20 @@ func ReadClouds(authOpts *AuthOpts) error {
authOpts.ApplicationCredentialName = replaceEmpty(authOpts.ApplicationCredentialName, cloud.AuthInfo.ApplicationCredentialName)
authOpts.ApplicationCredentialSecret = replaceEmpty(authOpts.ApplicationCredentialSecret, cloud.AuthInfo.ApplicationCredentialSecret)

regions := strings.Split(authOpts.Region, ",")
if len(regions) > 1 {
authOpts.Region = regions[0]
}

for _, r := range cloud.Regions {
// Support only single auth section in clouds.yaml
if r.Values.AuthInfo == nil && r.Name != authOpts.Region {
regions = append(regions, r.Name)
}
}

authOpts.Regions = regions

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/csi/cinder/openstack/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func GetConfigFromFiles(configFilePaths []string) (Config, error) {
klog.V(5).Infof("Credentials are loaded from %s:", cfg.Global.CloudsFile)
}

if len(cfg.Global.Regions) == 0 {
cfg.Global.Regions = []string{cfg.Global.Region}
}

return cfg, nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/csi/cinder/openstack/openstack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ rescan-on-resize=true`
expectedOpts.Global.CAFile = fakeCAfile
expectedOpts.Global.TenantID = fakeTenantID
expectedOpts.Global.Region = fakeRegion
expectedOpts.Global.Regions = []string{fakeRegion}
expectedOpts.BlockStorage.RescanOnResize = true

// Invoke GetConfigFromFiles
Expand Down Expand Up @@ -155,6 +156,7 @@ rescan-on-resize=true`
expectedOpts.Global.CAFile = fakeCAfile
expectedOpts.Global.TenantID = fakeTenantID
expectedOpts.Global.Region = fakeRegion
expectedOpts.Global.Regions = []string{fakeRegion}
expectedOpts.Global.EndpointType = gophercloud.AvailabilityPublic
expectedOpts.Global.UseClouds = true
expectedOpts.Global.CloudsFile = wd + "/fixtures/clouds.yaml"
Expand Down
174 changes: 120 additions & 54 deletions pkg/openstack/instancesv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"fmt"
sysos "os"
"slices"
"strings"

"github.com/gophercloud/gophercloud"
"github.com/gophercloud/gophercloud/openstack/compute/v2/servers"
Expand All @@ -33,9 +35,9 @@ import (

// InstancesV2 encapsulates an implementation of InstancesV2 for OpenStack.
type InstancesV2 struct {
compute *gophercloud.ServiceClient
network *gophercloud.ServiceClient
region string
compute map[string]*gophercloud.ServiceClient
network map[string]*gophercloud.ServiceClient
regions []string
regionProviderID bool
networkingOpts NetworkingOpts
}
Expand All @@ -51,16 +53,25 @@ func (os *OpenStack) InstancesV2() (cloudprovider.InstancesV2, bool) {
func (os *OpenStack) instancesv2() (*InstancesV2, bool) {
klog.V(4).Info("openstack.Instancesv2() called")

compute, err := client.NewComputeV2(os.provider, os.epOpts)
if err != nil {
klog.Errorf("unable to access compute v2 API : %v", err)
return nil, false
}
var err error
compute := make(map[string]*gophercloud.ServiceClient, len(os.regions))
network := make(map[string]*gophercloud.ServiceClient, len(os.regions))

network, err := client.NewNetworkV2(os.provider, os.epOpts)
if err != nil {
klog.Errorf("unable to access network v2 API : %v", err)
return nil, false
for _, region := range os.regions {
opt := os.epOpts
opt.Region = region

compute[region], err = client.NewComputeV2(os.provider, opt)
if err != nil {
klog.Errorf("unable to access compute v2 API : %v", err)
return nil, false
}

network[region], err = client.NewNetworkV2(os.provider, opt)
if err != nil {
klog.Errorf("unable to access network v2 API : %v", err)
return nil, false
}
}

regionalProviderID := false
Expand All @@ -71,17 +82,23 @@ func (os *OpenStack) instancesv2() (*InstancesV2, bool) {
return &InstancesV2{
compute: compute,
network: network,
region: os.epOpts.Region,
regions: os.regions,
regionProviderID: regionalProviderID,
networkingOpts: os.networkingOpts,
}, true
}

// InstanceExists indicates whether a given node exists according to the cloud provider
func (i *InstancesV2) InstanceExists(ctx context.Context, node *v1.Node) (bool, error) {
_, err := i.getInstance(ctx, node)
klog.V(4).InfoS("openstack.InstanceExists() called", "node", klog.KObj(node),
"providerID", node.Spec.ProviderID,
"region", node.Labels[v1.LabelTopologyRegion])

_, _, err := i.getInstance(ctx, node)
if err == cloudprovider.InstanceNotFound {
klog.V(6).Infof("instance not found for node: %s", node.Name)
klog.V(6).InfoS("Node is not found in cloud provider", "node", klog.KObj(node),
"providerID", node.Spec.ProviderID,
"region", node.Labels[v1.LabelTopologyRegion])
return false, nil
}

Expand All @@ -94,7 +111,11 @@ func (i *InstancesV2) InstanceExists(ctx context.Context, node *v1.Node) (bool,

// InstanceShutdown returns true if the instance is shutdown according to the cloud provider.
func (i *InstancesV2) InstanceShutdown(ctx context.Context, node *v1.Node) (bool, error) {
server, err := i.getInstance(ctx, node)
klog.V(4).InfoS("openstack.InstanceShutdown() called", "node", klog.KObj(node),
"providerID", node.Spec.ProviderID,
"region", node.Labels[v1.LabelTopologyRegion])

server, _, err := i.getInstance(ctx, node)
if err != nil {
return false, err
}
Expand All @@ -109,7 +130,7 @@ func (i *InstancesV2) InstanceShutdown(ctx context.Context, node *v1.Node) (bool

// InstanceMetadata returns the instance's metadata.
func (i *InstancesV2) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloudprovider.InstanceMetadata, error) {
srv, err := i.getInstance(ctx, node)
srv, region, err := i.getInstance(ctx, node)
if err != nil {
return nil, err
}
Expand All @@ -118,79 +139,124 @@ func (i *InstancesV2) InstanceMetadata(ctx context.Context, node *v1.Node) (*clo
server = *srv
}

instanceType, err := srvInstanceType(i.compute, &server.Server)
instanceType, err := srvInstanceType(i.compute[region], &server.Server)
if err != nil {
return nil, err
}

ports, err := getAttachedPorts(i.network, server.ID)
ports, err := getAttachedPorts(i.network[region], server.ID)
if err != nil {
return nil, err
}

addresses, err := nodeAddresses(&server.Server, ports, i.network, i.networkingOpts)
addresses, err := nodeAddresses(&server.Server, ports, i.network[region], i.networkingOpts)
if err != nil {
return nil, err
}

return &cloudprovider.InstanceMetadata{
ProviderID: i.makeInstanceID(&server.Server),
ProviderID: i.makeInstanceID(&server.Server, region),
InstanceType: instanceType,
NodeAddresses: addresses,
Zone: server.AvailabilityZone,
Region: i.region,
Region: region,
}, nil
}

func (i *InstancesV2) makeInstanceID(srv *servers.Server) string {
func (i *InstancesV2) makeInstanceID(srv *servers.Server, region string) string {
if i.regionProviderID {
return fmt.Sprintf("%s://%s/%s", ProviderName, i.region, srv.ID)
return fmt.Sprintf("%s://%s/%s", ProviderName, region, srv.ID)
}
return fmt.Sprintf("%s:///%s", ProviderName, srv.ID)
}

func (i *InstancesV2) getInstance(ctx context.Context, node *v1.Node) (*ServerAttributesExt, error) {
if node.Spec.ProviderID == "" {
opt := servers.ListOpts{
Name: fmt.Sprintf("^%s$", node.Name),
func (i *InstancesV2) getInstance(ctx context.Context, node *v1.Node) (*ServerAttributesExt, string, error) {
klog.V(4).InfoS("openstack.getInstance() called", "node", klog.KObj(node),
"providerID", node.Spec.ProviderID,
"region", node.Labels[v1.LabelTopologyRegion])

instanceID, instanceRegion, err := instanceIDFromProviderID(node.Spec.ProviderID)
if err == nil && instanceRegion != "" {
if slices.Contains(i.regions, instanceRegion) {
return i.getInstanceByID(instanceID, []string{instanceRegion})
}

return nil, "", fmt.Errorf("getInstance: ProviderID \"%s\" didn't match supported regions \"%s\"", node.Spec.ProviderID, strings.Join(i.regions, ","))
}

// At this point we know that ProviderID is not properly set or it doesn't contain region information
// We need to search for the instance in all regions
var searchRegions []string

// We cannot trust the region label, so we need to check the region
instanceRegion = node.Labels[v1.LabelTopologyRegion]
if slices.Contains(i.regions, instanceRegion) {
searchRegions = []string{instanceRegion}
}

for _, r := range i.regions {
if r != instanceRegion {
searchRegions = append(searchRegions, r)
}
mc := metrics.NewMetricContext("server", "list")
allPages, err := servers.List(i.compute, opt).AllPages()
}

klog.V(6).InfoS("openstack.getInstance() trying to find the instance in regions", "node", klog.KObj(node),
"instanceID", instanceID,
"regions", strings.Join(searchRegions, ","))

if instanceID == "" {
return i.getInstanceByName(node, searchRegions)
}

return i.getInstanceByID(instanceID, searchRegions)
}

func (i *InstancesV2) getInstanceByID(instanceID string, searchRegions []string) (*ServerAttributesExt, string, error) {
server := ServerAttributesExt{}

mc := metrics.NewMetricContext("server", "get")
for _, r := range searchRegions {
err := servers.Get(i.compute[r], instanceID).ExtractInto(&server)
if mc.ObserveRequest(err) != nil {
return nil, fmt.Errorf("error listing servers %v: %v", opt, err)
if errors.IsNotFound(err) {
continue
}

return nil, "", err
}

return &server, r, nil
}

return nil, "", cloudprovider.InstanceNotFound
}

func (i *InstancesV2) getInstanceByName(node *v1.Node, searchRegions []string) (*ServerAttributesExt, string, error) {
opt := servers.ListOpts{
Name: fmt.Sprintf("^%s$", node.Name),
}
mc := metrics.NewMetricContext("server", "list")
serverList := []ServerAttributesExt{}

for _, r := range searchRegions {
allPages, err := servers.List(i.compute[r], opt).AllPages()
if mc.ObserveRequest(err) != nil {
return nil, "", fmt.Errorf("error listing servers %v: %v", opt, err)
}

serverList := []ServerAttributesExt{}
err = servers.ExtractServersInto(allPages, &serverList)
if err != nil {
return nil, fmt.Errorf("error extracting servers from pages: %v", err)
return nil, "", fmt.Errorf("error extracting servers from pages: %v", err)
}
if len(serverList) == 0 {
return nil, cloudprovider.InstanceNotFound
continue
}
if len(serverList) > 1 {
return nil, fmt.Errorf("getInstance: multiple instances found")
return nil, "", fmt.Errorf("getInstanceByName: multiple instances found")
}
return &serverList[0], nil
}

instanceID, instanceRegion, err := instanceIDFromProviderID(node.Spec.ProviderID)
if err != nil {
return nil, err
}

if instanceRegion != "" && instanceRegion != i.region {
return nil, fmt.Errorf("ProviderID \"%s\" didn't match supported region \"%s\"", node.Spec.ProviderID, i.region)
return &serverList[0], r, nil
}

server := ServerAttributesExt{}
mc := metrics.NewMetricContext("server", "get")
err = servers.Get(i.compute, instanceID).ExtractInto(&server)
if mc.ObserveRequest(err) != nil {
if errors.IsNotFound(err) {
return nil, cloudprovider.InstanceNotFound
}
return nil, err
}
return &server, nil
return nil, "", cloudprovider.InstanceNotFound
}
9 changes: 8 additions & 1 deletion pkg/openstack/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -159,6 +159,7 @@ type ServerAttributesExt struct {

// OpenStack is an implementation of cloud provider Interface for OpenStack.
type OpenStack struct {
regions []string
provider *gophercloud.ProviderClient
epOpts *gophercloud.EndpointOpts
lbOpts LoadBalancerOpts
Expand Down Expand Up @@ -257,6 +258,11 @@ func ReadConfig(config io.Reader) (Config, error) {
klog.V(5).Infof("Config, loaded from the %s:", cfg.Global.CloudsFile)
client.LogCfg(cfg.Global)
}

if len(cfg.Global.Regions) == 0 {
cfg.Global.Regions = []string{cfg.Global.Region}
}

// Set the default values for search order if not set
if cfg.Metadata.SearchOrder == "" {
cfg.Metadata.SearchOrder = fmt.Sprintf("%s,%s", metadata.ConfigDriveID, metadata.MetadataID)
Expand Down Expand Up @@ -309,6 +315,7 @@ func NewOpenStack(cfg Config) (*OpenStack, error) {
}

os := OpenStack{
regions: cfg.Global.Regions,
provider: provider,
epOpts: &gophercloud.EndpointOpts{
Region: cfg.Global.Region,
Expand Down

0 comments on commit 46aebfb

Please sign in to comment.