diff --git a/cmd/controller/main.go b/cmd/controller/main.go
index 8138f10a..d6c423e1 100644
--- a/cmd/controller/main.go
+++ b/cmd/controller/main.go
@@ -82,10 +82,10 @@ func main() {
 	k8sInformerFactory := k8sinformers.NewSharedInformerFactory(k8sClient, time.Second*30)
 	informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)
 
-	c1 := controller.NewVxNetPoolController(clusterConfig, k8sClient, client, informerFactory)
+	c1 := controller.NewVxNetPoolController(clusterConfig, k8sClient, client, informerFactory, k8sInformerFactory)
 	c2 := controller.NewIPPoolController(k8sClient, client,
-		k8sInformerFactory, informerFactory, ippool.NewProvider(client, networkv1alpha1.IPPoolTypeLocal, informerFactory))
+		k8sInformerFactory, informerFactory, ippool.NewProvider(client, networkv1alpha1.IPPoolTypeLocal, informerFactory, k8sInformerFactory))
 
 	// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
 	// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
diff --git a/cmd/ipam/main.go b/cmd/ipam/main.go
index ef7f777a..4ddd6fcb 100644
--- a/cmd/ipam/main.go
+++ b/cmd/ipam/main.go
@@ -96,7 +96,7 @@ func main() {
 	informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)
 	clusterConfig := config.NewClusterConfig(k8sInformerFactory.Core().V1().ConfigMaps())
 
-	ipamClient := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory)
+	ipamClient := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory, k8sInformerFactory)
 
 	k8sInformerFactory.Start(stopCh)
 	informerFactory.Start(stopCh)
diff --git a/cmd/tools/ipam-client/client.go b/cmd/tools/ipam-client/client.go
index 8ae97c72..ddf02992 100644
--- a/cmd/tools/ipam-client/client.go
+++ b/cmd/tools/ipam-client/client.go
@@ -7,12 +7,13 @@ import (
 	"fmt"
 	"time"
 
-	networkv1alpha1 "github.com/yunify/hostnic-cni/pkg/apis/network/v1alpha1"
 	"github.com/yunify/hostnic-cni/pkg/signals"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8sinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/clientcmd"
 
+	networkv1alpha1 "github.com/yunify/hostnic-cni/pkg/apis/network/v1alpha1"
 	clientset "github.com/yunify/hostnic-cni/pkg/client/clientset/versioned"
 	informers "github.com/yunify/hostnic-cni/pkg/client/informers/externalversions"
 	"github.com/yunify/hostnic-cni/pkg/constants"
@@ -49,9 +50,12 @@ func main() {
 		return
 	}
 
+	k8sInformerFactory := k8sinformers.NewSharedInformerFactory(k8sClient, time.Second*10)
 	informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)
 
-	ipamClient := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory)
+	ipamClient := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory, k8sInformerFactory)
+
+	k8sInformerFactory.Start(stopCh)
 	informerFactory.Start(stopCh)
 
 	if err = ipamClient.Sync(stopCh); err != nil {
@@ -59,10 +63,10 @@
 		return
 	}
 
-	c := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory)
+	// c := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory)
 
 	if handleID != "" {
-		if err := c.ReleaseByHandle(handleID); err != nil {
+		if err := ipamClient.ReleaseByHandle(handleID); err != nil {
 			fmt.Printf("Release %s failed: %v\n", handleID, err)
 		} else {
 			fmt.Printf("Release %s OK\n", handleID)
@@ -73,7 +77,7 @@
 	if pool != "" {
 		args.Pools = []string{pool}
 	}
-	utils, err := c.GetPoolBlocksUtilization(args)
+	utils, err := ipamClient.GetPoolBlocksUtilization(args)
 	if err != nil {
 		fmt.Printf("GetUtilization failed: %v\n", err)
 		return
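With this change, every caller of ipam.NewIPAMClient must also construct and start a Kubernetes SharedInformerFactory, because the IPAM client now watches ConfigMaps and Pods in addition to the network CRDs, and Sync waits on all five caches. A minimal sketch of the new calling convention (buildIPAMClient is a hypothetical helper, not part of this patch):

    package main

    import (
    	"time"

    	k8sinformers "k8s.io/client-go/informers"
    	"k8s.io/client-go/kubernetes"

    	networkv1alpha1 "github.com/yunify/hostnic-cni/pkg/apis/network/v1alpha1"
    	clientset "github.com/yunify/hostnic-cni/pkg/client/clientset/versioned"
    	informers "github.com/yunify/hostnic-cni/pkg/client/informers/externalversions"
    	"github.com/yunify/hostnic-cni/pkg/simple/client/network/ippool/ipam"
    )

    func buildIPAMClient(client clientset.Interface, k8sClient kubernetes.Interface, stopCh <-chan struct{}) (ipam.IPAMClient, error) {
    	// both factories are required now: the CRD factory feeds the ippool/
    	// ipamblock/ipamhandle listers, the k8s factory feeds configmaps and pods
    	k8sInformerFactory := k8sinformers.NewSharedInformerFactory(k8sClient, time.Second*30)
    	informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)

    	// the k8s factory is the new fourth argument
    	ipamClient := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory, k8sInformerFactory)

    	// Start is non-blocking; Sync blocks until all watched caches are warm
    	k8sInformerFactory.Start(stopCh)
    	informerFactory.Start(stopCh)
    	if err := ipamClient.Sync(stopCh); err != nil {
    		return ipam.IPAMClient{}, err
    	}
    	return ipamClient, nil
    }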
diff --git a/cmd/tools/node-patch/patch.go b/cmd/tools/node-patch/patch.go
index 8716da71..77ebf5c7 100644
--- a/cmd/tools/node-patch/patch.go
+++ b/cmd/tools/node-patch/patch.go
@@ -3,6 +3,7 @@ package main
 import (
 	"context"
 	"flag"
+	"strings"
 
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -10,6 +11,8 @@ import (
 	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/klog/v2"
 
+	"github.com/yunify/hostnic-cni/pkg/conf"
+	"github.com/yunify/hostnic-cni/pkg/constants"
 	"github.com/yunify/hostnic-cni/pkg/qcclient"
 	"github.com/yunify/hostnic-cni/pkg/rpc"
 )
@@ -60,9 +63,11 @@ func handleNode(clusterID string, k8sClient *kubernetes.Clientset) error {
 		if host := getHostForNode(node.Status.Addresses, qcNodes); host != "" {
 			if err := patchNode(k8sClient, host, node); err != nil {
 				klog.Errorf("patch %s failed: %v", topoKey, err)
+			} else {
+				klog.Infof("patched node %s with hostmachine %s successfully", node.Name, host)
 			}
 		} else {
-			klog.Errorf("get host for %s failed", node.Name)
+			klog.Errorf("get host for node %s failed", node.Name)
 		}
 	}
 	return nil
@@ -76,9 +81,17 @@ func main() {
 	flag.StringVar(&clusterID, "clusterID", "", "clusterID")
 	flag.Parse()
 
-	if clusterID == "" {
-		klog.Info("Plesse input clusterID: ./patch-node --clusterID cl-xxxxxxxx")
-		return
+	if !strings.HasPrefix(clusterID, "cl-") {
+		// fall back to the clusterID read from /etc/kubernetes/qingcloud.yaml
+		klog.Infof("invalid clusterID %s, trying to get clusterID from the mounted clusterConfig %s", clusterID, constants.DefaultClusterConfigPath)
+
+		clusterConfig, err := conf.TryLoadClusterConfFromDisk(constants.DefaultClusterConfigPath)
+		if err != nil || clusterConfig == nil {
+			klog.Fatalf("Error building clusterConfig: %v", err)
+		}
+		clusterID = clusterConfig.ClusterID
+		klog.Infof("get clusterID success: %s", clusterID)
 	}
 
 	// setup qcclient, k8s
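The fallback above is what lets the cronjob below drop its templated CLUSTERID argument: anything not matching the cl- prefix triggers a read of the mounted cluster config. A hypothetical refactor of that logic into a helper, with the nil-config case separated out so nothing ever calls Error() on a nil error:

    package main

    import (
    	"fmt"
    	"strings"

    	"github.com/yunify/hostnic-cni/pkg/conf"
    	"github.com/yunify/hostnic-cni/pkg/constants"
    )

    // resolveClusterID is illustrative, not part of this patch: a well-formed
    // --clusterID flag wins, otherwise the mounted cluster config is consulted.
    func resolveClusterID(flagValue string) (string, error) {
    	if strings.HasPrefix(flagValue, "cl-") {
    		return flagValue, nil
    	}
    	clusterConfig, err := conf.TryLoadClusterConfFromDisk(constants.DefaultClusterConfigPath)
    	if err != nil {
    		return "", err
    	}
    	if clusterConfig == nil {
    		// distinguish an empty config from a read error
    		return "", fmt.Errorf("empty cluster config at %s", constants.DefaultClusterConfigPath)
    	}
    	return clusterConfig.ClusterID, nil
    }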
["sh", "-c", "/app/tools/patch-node --clusterID CLUSTERID"] + image: qingcloud/hostnic-plus:v1.0.7 + command: ["sh", "-c", "/app/tools/patch-node"] volumeMounts: - mountPath: /root/.qingcloud/ name: apiaccesskey @@ -22,6 +22,9 @@ spec: - mountPath: /etc/qingcloud/ name: qingcloud-cfg readOnly: true + - mountPath: /etc/kubernetes + name: clusterconfig + readOnly: true restartPolicy: OnFailure serviceAccount: hostnic-node serviceAccountName: hostnic-node @@ -35,3 +38,6 @@ spec: - hostPath: path: /etc/qingcloud name: qingcloud-cfg + - configMap: + name: clusterconfig + name: clusterconfig diff --git a/deploy/hostnic.yaml b/deploy/hostnic.yaml index 32c7213a..27c77e91 100644 --- a/deploy/hostnic.yaml +++ b/deploy/hostnic.yaml @@ -474,7 +474,7 @@ spec: fieldRef: apiVersion: v1 fieldPath: metadata.namespace - image: qingcloud/hostnic-plus:v1.0.3 + image: qingcloud/hostnic-plus:v1.0.7 imagePullPolicy: IfNotPresent name: hostnic-node ports: @@ -513,7 +513,7 @@ spec: - /app/install_hostnic.sh command: - /bin/sh - image: qingcloud/hostnic-plus:v1.0.3 + image: qingcloud/hostnic-plus:v1.0.7 imagePullPolicy: IfNotPresent name: hostnic-init resources: {} @@ -588,7 +588,7 @@ spec: spec: containers: - name: hostnic-controller - image: qingcloud/hostnic-plus:v1.0.3 + image: qingcloud/hostnic-plus:v1.0.7 command: - /app/hostnic-controller - --v=5 diff --git a/pkg/conf/conf.go b/pkg/conf/conf.go index 97aca963..d6954e6e 100644 --- a/pkg/conf/conf.go +++ b/pkg/conf/conf.go @@ -2,7 +2,7 @@ package conf import ( "fmt" - "io/ioutil" + "os" "github.com/spf13/viper" "k8s.io/apimachinery/pkg/util/yaml" @@ -96,7 +96,7 @@ type ClusterConfig struct { } func TryLoadClusterConfFromDisk(file string) (*ClusterConfig, error) { - content, err := ioutil.ReadFile(file) + content, err := os.ReadFile(file) if err != nil { return nil, err } diff --git a/pkg/controller/vxnet.go b/pkg/controller/vxnet.go index 36e17f6a..6c68db46 100644 --- a/pkg/controller/vxnet.go +++ b/pkg/controller/vxnet.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/selection" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + k8sinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" @@ -96,8 +97,7 @@ func NewVxNetPoolController( kubeclientset kubernetes.Interface, clientset clientset.Interface, informers informers.SharedInformerFactory, - // ippoolInformer networkinformers.IPPoolInformer, - // poolInformer networkinformers.VxNetPoolInformer, + k8sInformers k8sinformers.SharedInformerFactory, ) *VxNetPoolController { utilruntime.Must(poolscheme.AddToScheme(scheme.Scheme)) @@ -109,7 +109,7 @@ func NewVxNetPoolController( controller := &VxNetPoolController{ kubeclientset: kubeclientset, clientset: clientset, - ipamClient: ipam.NewIPAMClient(clientset, networkv1alpha1.IPPoolTypeLocal, informers), + ipamClient: ipam.NewIPAMClient(clientset, networkv1alpha1.IPPoolTypeLocal, informers, k8sInformers), ippoolsLister: ippoolInformer.Lister(), ippoolsSynced: ippoolInformer.Informer().HasSynced, poolsLister: vxnetpoolInformer.Lister(), diff --git a/pkg/simple/client/network/ippool/ipam/ipam.go b/pkg/simple/client/network/ippool/ipam/ipam.go index c282b2a8..6d203a0e 100644 --- a/pkg/simple/client/network/ippool/ipam/ipam.go +++ b/pkg/simple/client/network/ippool/ipam/ipam.go @@ -18,11 +18,13 @@ package ipam import ( "context" + "encoding/json" "errors" "fmt" "math/big" "net" "reflect" + "strings" cnitypes 
"github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/cni/pkg/types/current" @@ -31,9 +33,10 @@ import ( "github.com/projectcalico/libcalico-go/lib/set" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" + k8sinformers "k8s.io/client-go/informers" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -44,6 +47,7 @@ import ( "github.com/yunify/hostnic-cni/pkg/client/clientset/versioned/scheme" informers "github.com/yunify/hostnic-cni/pkg/client/informers/externalversions" networklisters "github.com/yunify/hostnic-cni/pkg/client/listers/network/v1alpha1" + "github.com/yunify/hostnic-cni/pkg/constants" "github.com/yunify/hostnic-cni/pkg/simple/client/network/utils" ) @@ -81,10 +85,12 @@ func (c IPAMClient) getAllPools() ([]v1alpha1.IPPool, error) { } // NewIPAMClient returns a new IPAMClient, which implements Interface. -func NewIPAMClient(client versioned.Interface, typeStr string, informers informers.SharedInformerFactory) IPAMClient { +func NewIPAMClient(client versioned.Interface, typeStr string, informers informers.SharedInformerFactory, k8sInformers k8sinformers.SharedInformerFactory) IPAMClient { ippoolInformer := informers.Network().V1alpha1().IPPools() ipamblocksInformer := informers.Network().V1alpha1().IPAMBlocks() ipamhandleInformer := informers.Network().V1alpha1().IPAMHandles() + configMapInformer := k8sInformers.Core().V1().ConfigMaps() + podInformer := k8sInformers.Core().V1().Pods() return IPAMClient{ typeStr: typeStr, client: client, @@ -94,6 +100,10 @@ func NewIPAMClient(client versioned.Interface, typeStr string, informers informe ipamblocksSynced: ipamblocksInformer.Informer().HasSynced, ipamhandleLister: ipamhandleInformer.Lister(), ipamhandleSynced: ipamhandleInformer.Informer().HasSynced, + configMapLister: configMapInformer.Lister(), + configMapSynced: configMapInformer.Informer().HasSynced, + podLister: podInformer.Lister(), + podSynced: podInformer.Informer().HasSynced, } } @@ -110,11 +120,17 @@ type IPAMClient struct { ipamhandleLister networklisters.IPAMHandleLister ipamhandleSynced cache.InformerSynced + + configMapLister corelisters.ConfigMapLister + configMapSynced cache.InformerSynced + + podLister corelisters.PodLister + podSynced cache.InformerSynced } func (c IPAMClient) Sync(stopCh <-chan struct{}) error { klog.Info("Waiting for ippools and ipamblocks caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.ippoolsSynced, c.ipamblocksSynced, c.ipamhandleSynced); !ok { + if ok := cache.WaitForCacheSync(stopCh, c.ippoolsSynced, c.ipamblocksSynced, c.ipamhandleSynced, c.configMapSynced, c.podSynced); !ok { return fmt.Errorf("failed to wait for ippools and ipamblocks caches to sync") } return nil @@ -407,7 +423,7 @@ func (c IPAMClient) incrementHandle(handleID string, block *v1alpha1.IPAMBlock, if k8serrors.IsNotFound(err) { // Handle doesn't exist - create it. 
@@ -771,7 +787,7 @@ func (c IPAMClient) AutoAssignFromPools(args AutoAssignArgs) (*current.Result, e
 func (c IPAMClient) AutoAssignFromBlocks(args AutoAssignArgs) (*current.Result, error) {
 	var blocks []*v1alpha1.IPAMBlock
 	for _, block := range args.Blocks {
-		if b, err := c.client.NetworkV1alpha1().IPAMBlocks().Get(context.Background(), block, v1.GetOptions{}); err == nil {
+		if b, err := c.client.NetworkV1alpha1().IPAMBlocks().Get(context.Background(), block, metav1.GetOptions{}); err == nil {
 			blocks = append(blocks, b)
 		} else {
 			klog.Warningf("Get block %s failed: %v", block, err)
@@ -832,6 +848,10 @@ func (c IPAMClient) AutoGenerateBlocksFromPool(poolName string) error {
 		block := v1alpha1.NewBlock(pool, *subnet, reservedAttr)
 		blockName := block.BlockName()
 		controllerutil.SetControllerReference(pool, block, scheme.Scheme)
+		err = c.setBlockAttributes(blockName, block)
+		if err != nil {
+			return err
+		}
 		block, err = c.client.NetworkV1alpha1().IPAMBlocks().Create(context.Background(), block, metav1.CreateOptions{})
 		if err != nil {
 			if k8serrors.IsAlreadyExists(err) {
@@ -1122,3 +1142,93 @@ func updateAttribute(b *networkv1alpha1.IPAMBlock, handleID string, attrs map[st
 	b.Spec.Attributes = append(b.Spec.Attributes, attr)
 	return attrIndex
 }
+
+// setBlockAttributes sets attributes for IPs already in use in the block:
+// 1. get the ns for this ipamblock from the configmap
+// 2. get the IPs in use in that ns, and the handle ID (the pause container ID) for each IP
+// 3. update the block attributes
+func (c IPAMClient) setBlockAttributes(blockName string, block *networkv1alpha1.IPAMBlock) error {
+	// 1. get the configmap that maps namespaces to blocks
+	cm, err := c.configMapLister.ConfigMaps(constants.IPAMConfigNamespace).Get(constants.IPAMConfigName)
+	if err != nil {
+		klog.Errorf("Get configmap %s/%s failed: %v, skip setting block %s attributes", constants.IPAMConfigNamespace, constants.IPAMConfigName, err, blockName)
+		return err
+	}
+	var apps map[string][]string
+	err = json.Unmarshal([]byte(cm.Data[constants.IPAMConfigDate]), &apps)
+	if err != nil {
+		klog.Errorf("unmarshal configmap %s/%s data failed: %v, skip setting block %s attributes", constants.IPAMConfigNamespace, constants.IPAMConfigName, err, blockName)
+		return nil
+	}
+	// 2. find the ns that owns this block
+	var blockNs string
+	for ns, blocks := range apps {
+		if utils.SliceContains(blocks, blockName) {
+			blockNs = ns
+			break
+		}
+	}
+	if blockNs == "" {
+		return nil
+	}
+
+	// 3. list pods and ipam handles in that ns
+	pods, err := c.podLister.Pods(blockNs).List(labels.Everything())
+	if err != nil {
+		klog.Errorf("get pods for ns %s err: %v, skip setting block %s attributes", blockNs, err, blockName)
+		return nil
+	}
+
+	ipamHandles, err := c.ipamhandleLister.List(labels.Everything())
+	if err != nil {
+		klog.Errorf("list ipam handles error: %v, skip setting block %s attributes", err, blockName)
+		return nil
+	}
+
+	// 4. check which pod IPs belong to the block
+	_, cidrNet, err := cnet.ParseCIDR(block.Spec.CIDR)
+	if err != nil {
+		klog.Errorf("parse block %s cidr err: %v, skip setting block %s attributes", blockName, err, blockName)
+		return nil
+	}
+
+	for _, pod := range pods {
+		if pod != nil && pod.Status.PodIP != "" {
+			podIP := cnet.ParseIP(pod.Status.PodIP)
+			if cidrNet.Contains(podIP.IP) {
+				// find the handle ID for this pod via its ipamhandle
+				var handleID string
+				for _, ipamHandle := range ipamHandles {
+					if strings.Contains(ipamHandle.Name, pod.Namespace+"-"+pod.Name) {
+						handleID = ipamHandle.Name
+					}
+				}
+				if handleID == "" {
+					klog.Infof("ip %s of pod %s/%s belongs to block %s, but no handle id was found, skip!", pod.Status.PodIP, pod.Namespace, pod.Name, blockName)
+					continue
+				}
+
+				// get the allocation index for this IP
+				ordinal, err := block.IPToOrdinal(*podIP)
+				if err != nil {
+					klog.Errorf("get pod ip %s index err: %v, skip!", pod.Status.PodIP, err)
+					continue
+				}
+
+				// delete the ordinal from the unallocated slice and record the attribute
+				for index, unUsedOrdinal := range block.Spec.Unallocated {
+					if ordinal == unUsedOrdinal {
+						block.Spec.Unallocated = append(block.Spec.Unallocated[:index], block.Spec.Unallocated[index+1:]...)
+						attribute := updateAttribute(block, handleID, nil)
+						block.Spec.Allocations[ordinal] = &attribute
+						break
+					}
+				}
+			}
+		}
+	}
+
+	return nil
+}
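The configmap consulted at the top of setBlockAttributes is expected to decode into map[string][]string, i.e. a JSON object mapping a namespace to the blocks it owns, e.g. {"demo-ns": ["pool-a-0"]} (names here are illustrative). The per-IP bookkeeping at the bottom of the function then reduces to a single edit: drop the IP's ordinal from Spec.Unallocated and point Spec.Allocations[ordinal] at the attribute index returned by updateAttribute. A standalone sketch with simplified types (markAllocated is illustrative; unallocated and allocations mirror the IPAMBlock spec fields, attrIndex stands in for updateAttribute's return value):

    // markAllocated moves one ordinal from the free list into allocations.
    func markAllocated(unallocated []int, allocations []*int, ordinal, attrIndex int) []int {
    	for i, o := range unallocated {
    		if o == ordinal {
    			// remove the ordinal from the free list...
    			unallocated = append(unallocated[:i], unallocated[i+1:]...)
    			// ...and record which attribute owns it
    			idx := attrIndex
    			allocations[ordinal] = &idx
    			// each ordinal appears at most once; stop scanning the mutated slice
    			break
    		}
    	}
    	return unallocated
    }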
diff --git a/pkg/simple/client/network/ippool/provider.go b/pkg/simple/client/network/ippool/provider.go
index c449c572..61da57e9 100644
--- a/pkg/simple/client/network/ippool/provider.go
+++ b/pkg/simple/client/network/ippool/provider.go
@@ -19,6 +19,7 @@ package ippool
 import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	k8sinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/util/workqueue"
 
 	networkv1alpha1 "github.com/yunify/hostnic-cni/pkg/apis/network/v1alpha1"
@@ -107,14 +108,14 @@ func (p provider) GetIPPoolStats(pool *networkv1alpha1.IPPool) (*networkv1alpha1
 	return clone, nil
 }
 
-func NewProvider(clientset versioned.Interface, pt string, informers informers.SharedInformerFactory) Provider {
+func NewProvider(clientset versioned.Interface, pt string, informers informers.SharedInformerFactory, k8sInformers k8sinformers.SharedInformerFactory) Provider {
 	var p Provider
 
 	switch pt {
 	case networkv1alpha1.IPPoolTypeLocal:
 		p = provider{
 			client:     clientset,
-			ipamclient: ipam.NewIPAMClient(clientset, networkv1alpha1.IPPoolTypeLocal, informers),
+			ipamclient: ipam.NewIPAMClient(clientset, networkv1alpha1.IPPoolTypeLocal, informers, k8sInformers),
 		}
 	}
diff --git a/pkg/simple/client/network/utils/utils.go b/pkg/simple/client/network/utils/utils.go
index 70778654..91e61aea 100644
--- a/pkg/simple/client/network/utils/utils.go
+++ b/pkg/simple/client/network/utils/utils.go
@@ -35,3 +35,12 @@ func EthRandomAddr(ip net.IP) string {
 	buf = append(buf, []byte(ip)...)
 	return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x", buf[0], buf[1], buf[2], buf[3], buf[4], buf[5])
 }
+
+func SliceContains(ss []string, s string) bool {
+	for _, v := range ss {
+		if v == s {
+			return true
+		}
+	}
+	return false
+}
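SliceContains is a plain linear scan; on Go 1.21+ the standard library's slices.Contains covers the same need, so the helper could be retired once the module's Go version allows. Until then, usage looks like (block names illustrative):

    package main

    import (
    	"fmt"

    	"github.com/yunify/hostnic-cni/pkg/simple/client/network/utils"
    )

    func main() {
    	blocks := []string{"pool-a-0", "pool-a-1"}
    	fmt.Println(utils.SliceContains(blocks, "pool-a-0")) // true
    	fmt.Println(utils.SliceContains(blocks, "pool-b-0")) // false
    }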