Skip to content

Commit

Permalink
update create ipamblock, recover already used ip
Browse files Browse the repository at this point in the history
  • Loading branch information
qiangzii committed Sep 26, 2023
1 parent 99dff69 commit b9f52a1
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 18 deletions.
4 changes: 2 additions & 2 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ func main() {
k8sInformerFactory := k8sinformers.NewSharedInformerFactory(k8sClient, time.Second*30)
informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)

c1 := controller.NewVxNetPoolController(clusterConfig, k8sClient, client, informerFactory)
c1 := controller.NewVxNetPoolController(clusterConfig, k8sClient, client, informerFactory, k8sInformerFactory)

c2 := controller.NewIPPoolController(k8sClient, client,
k8sInformerFactory, informerFactory, ippool.NewProvider(client, networkv1alpha1.IPPoolTypeLocal, informerFactory))
k8sInformerFactory, informerFactory, ippool.NewProvider(client, networkv1alpha1.IPPoolTypeLocal, informerFactory, k8sInformerFactory))

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
Expand Down
2 changes: 1 addition & 1 deletion cmd/ipam/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func main() {
informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)

clusterConfig := config.NewClusterConfig(k8sInformerFactory.Core().V1().ConfigMaps())
ipamClient := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory)
ipamClient := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory, k8sInformerFactory)

k8sInformerFactory.Start(stopCh)
informerFactory.Start(stopCh)
Expand Down
14 changes: 9 additions & 5 deletions cmd/tools/ipam-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"fmt"
"time"

networkv1alpha1 "github.com/yunify/hostnic-cni/pkg/apis/network/v1alpha1"
"github.com/yunify/hostnic-cni/pkg/signals"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

networkv1alpha1 "github.com/yunify/hostnic-cni/pkg/apis/network/v1alpha1"
clientset "github.com/yunify/hostnic-cni/pkg/client/clientset/versioned"
informers "github.com/yunify/hostnic-cni/pkg/client/informers/externalversions"
"github.com/yunify/hostnic-cni/pkg/constants"
Expand Down Expand Up @@ -49,20 +50,23 @@ func main() {
return
}

k8sInformerFactory := k8sinformers.NewSharedInformerFactory(k8sClient, time.Second*10)
informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)
ipamClient := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory)

ipamClient := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory, k8sInformerFactory)

k8sInformerFactory.Start(stopCh)
informerFactory.Start(stopCh)

if err = ipamClient.Sync(stopCh); err != nil {
fmt.Printf("ipamclient sync error: %v", err)
return
}

c := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory)
// c := ipam.NewIPAMClient(client, networkv1alpha1.IPPoolTypeLocal, informerFactory)

if handleID != "" {
if err := c.ReleaseByHandle(handleID); err != nil {
if err := ipamClient.ReleaseByHandle(handleID); err != nil {
fmt.Printf("Release %s failed: %v\n", handleID, err)
} else {
fmt.Printf("Release %s OK\n", handleID)
Expand All @@ -73,7 +77,7 @@ func main() {
if pool != "" {
args.Pools = []string{pool}
}
utils, err := c.GetPoolBlocksUtilization(args)
utils, err := ipamClient.GetPoolBlocksUtilization(args)
if err != nil {
fmt.Printf("GetUtilization failed: %v\n", err)
return
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/vxnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/selection"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
k8sinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -96,8 +97,7 @@ func NewVxNetPoolController(
kubeclientset kubernetes.Interface,
clientset clientset.Interface,
informers informers.SharedInformerFactory,
// ippoolInformer networkinformers.IPPoolInformer,
// poolInformer networkinformers.VxNetPoolInformer,
k8sInformers k8sinformers.SharedInformerFactory,
) *VxNetPoolController {

utilruntime.Must(poolscheme.AddToScheme(scheme.Scheme))
Expand All @@ -109,7 +109,7 @@ func NewVxNetPoolController(
controller := &VxNetPoolController{
kubeclientset: kubeclientset,
clientset: clientset,
ipamClient: ipam.NewIPAMClient(clientset, networkv1alpha1.IPPoolTypeLocal, informers),
ipamClient: ipam.NewIPAMClient(clientset, networkv1alpha1.IPPoolTypeLocal, informers, k8sInformers),
ippoolsLister: ippoolInformer.Lister(),
ippoolsSynced: ippoolInformer.Informer().HasSynced,
poolsLister: vxnetpoolInformer.Lister(),
Expand Down
120 changes: 115 additions & 5 deletions pkg/simple/client/network/ippool/ipam/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ package ipam

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"net"
"reflect"
"strings"

cnitypes "github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current"
Expand All @@ -31,9 +33,10 @@ import (
"github.com/projectcalico/libcalico-go/lib/set"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
k8sinformers "k8s.io/client-go/informers"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand All @@ -44,6 +47,7 @@ import (
"github.com/yunify/hostnic-cni/pkg/client/clientset/versioned/scheme"
informers "github.com/yunify/hostnic-cni/pkg/client/informers/externalversions"
networklisters "github.com/yunify/hostnic-cni/pkg/client/listers/network/v1alpha1"
"github.com/yunify/hostnic-cni/pkg/constants"
"github.com/yunify/hostnic-cni/pkg/simple/client/network/utils"
)

Expand Down Expand Up @@ -81,10 +85,12 @@ func (c IPAMClient) getAllPools() ([]v1alpha1.IPPool, error) {
}

// NewIPAMClient returns a new IPAMClient, which implements Interface.
func NewIPAMClient(client versioned.Interface, typeStr string, informers informers.SharedInformerFactory) IPAMClient {
func NewIPAMClient(client versioned.Interface, typeStr string, informers informers.SharedInformerFactory, k8sInformers k8sinformers.SharedInformerFactory) IPAMClient {
ippoolInformer := informers.Network().V1alpha1().IPPools()
ipamblocksInformer := informers.Network().V1alpha1().IPAMBlocks()
ipamhandleInformer := informers.Network().V1alpha1().IPAMHandles()
configMapInformer := k8sInformers.Core().V1().ConfigMaps()
podInformer := k8sInformers.Core().V1().Pods()
return IPAMClient{
typeStr: typeStr,
client: client,
Expand All @@ -94,6 +100,10 @@ func NewIPAMClient(client versioned.Interface, typeStr string, informers informe
ipamblocksSynced: ipamblocksInformer.Informer().HasSynced,
ipamhandleLister: ipamhandleInformer.Lister(),
ipamhandleSynced: ipamhandleInformer.Informer().HasSynced,
configMapLister: configMapInformer.Lister(),
configMapSynced: configMapInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podSynced: podInformer.Informer().HasSynced,
}
}

Expand All @@ -110,11 +120,17 @@ type IPAMClient struct {

ipamhandleLister networklisters.IPAMHandleLister
ipamhandleSynced cache.InformerSynced

configMapLister corelisters.ConfigMapLister
configMapSynced cache.InformerSynced

podLister corelisters.PodLister
podSynced cache.InformerSynced
}

func (c IPAMClient) Sync(stopCh <-chan struct{}) error {
klog.Info("Waiting for ippools and ipamblocks caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.ippoolsSynced, c.ipamblocksSynced, c.ipamhandleSynced); !ok {
if ok := cache.WaitForCacheSync(stopCh, c.ippoolsSynced, c.ipamblocksSynced, c.ipamhandleSynced, c.configMapSynced, c.podSynced); !ok {
return fmt.Errorf("failed to wait for ippools and ipamblocks caches to sync")
}
return nil
Expand Down Expand Up @@ -407,7 +423,7 @@ func (c IPAMClient) incrementHandle(handleID string, block *v1alpha1.IPAMBlock,
if k8serrors.IsNotFound(err) {
// Handle doesn't exist - create it.
handle = &v1alpha1.IPAMHandle{
ObjectMeta: v1.ObjectMeta{
ObjectMeta: metav1.ObjectMeta{
Name: handleID,
},
Spec: v1alpha1.IPAMHandleSpec{
Expand Down Expand Up @@ -771,7 +787,7 @@ func (c IPAMClient) AutoAssignFromPools(args AutoAssignArgs) (*current.Result, e
func (c IPAMClient) AutoAssignFromBlocks(args AutoAssignArgs) (*current.Result, error) {
var blocks []*v1alpha1.IPAMBlock
for _, block := range args.Blocks {
if b, err := c.client.NetworkV1alpha1().IPAMBlocks().Get(context.Background(), block, v1.GetOptions{}); err == nil {
if b, err := c.client.NetworkV1alpha1().IPAMBlocks().Get(context.Background(), block, metav1.GetOptions{}); err == nil {
blocks = append(blocks, b)
} else {
klog.Warningf("Get block %s failed: %v", block, err)
Expand Down Expand Up @@ -832,6 +848,10 @@ func (c IPAMClient) AutoGenerateBlocksFromPool(poolName string) error {
block := v1alpha1.NewBlock(pool, *subnet, reservedAttr)
blockName := block.BlockName()
controllerutil.SetControllerReference(pool, block, scheme.Scheme)
err = c.setBlockAttributes(blockName, block)
if err != nil {
return err
}
block, err = c.client.NetworkV1alpha1().IPAMBlocks().Create(context.Background(), block, metav1.CreateOptions{})
if err != nil {
if k8serrors.IsAlreadyExists(err) {
Expand Down Expand Up @@ -1122,3 +1142,93 @@ func updateAttribute(b *networkv1alpha1.IPAMBlock, handleID string, attrs map[st
b.Spec.Attributes = append(b.Spec.Attributes, attr)
return attrIndex
}

// setBlockAttributes set attributes for already used ip in the block
func (c IPAMClient) setBlockAttributes(blockName string, block *networkv1alpha1.IPAMBlock) error {
// 1. get ns for this ipamblock from configmap
// 2. get ip being used in the ns, also get handle_id(pause container id for this pod) for this ip
// 3. update block attributes

// 1. get configmap to get ns
cm, err := c.configMapLister.ConfigMaps(constants.IPAMConfigNamespace).Get(constants.IPAMConfigName)
if err != nil {
klog.Errorf("Get configmap %s/%s failed: %v, skip set block %s attribute", constants.IPAMConfigNamespace, constants.IPAMConfigName, err, blockName)
return err
}
var apps map[string][]string
err = json.Unmarshal([]byte(cm.Data[constants.IPAMConfigDate]), &apps)
if err != nil {
klog.Errorf("unmarshal configmap %s/%s data failed: %v, skip set block %s attribute", constants.IPAMConfigNamespace, constants.IPAMConfigName, err, blockName)
return nil
}
// 2. get pod in ns
var blockNs string
for ns, blocks := range apps {
if utils.SliceContains(blocks, blockName) {
blockNs = ns
break
}
}
if blockNs == "" {
return nil
}

// 3. update block
pods, err := c.podLister.Pods(blockNs).List(labels.Everything())
if err != nil {
klog.Errorf("get pod for ns %s err: %v, skip set block %s attribute", blockNs, err, blockName)
return nil
}

ipamHandles, err := c.ipamhandleLister.List(labels.Everything())
if err != nil {
klog.Errorf("list ipam handle error: %v, skip set block %s attribute", err, blockName)
return nil
}

// 4. check if pods ip belong to block
_, cidrNet, err := cnet.ParseCIDR(block.Spec.CIDR)
if err != nil {
klog.Errorf("parse block %s cidr err: %v, skip set block %s attribute", blockName, err, blockName)
return nil
}

for _, pod := range pods {
if pod != nil && pod.Status.PodIP != "" {
podIP := cnet.ParseIP(pod.Status.PodIP)
if cidrNet.Contains(podIP.IP) {
// get handle id,allocations index ,delete from unallocated slice
// handle id for ipamhandle
var handleID string
for _, ipamHandle := range ipamHandles {
if strings.Contains(ipamHandle.Name, pod.Namespace+"-"+pod.Name) {
handleID = ipamHandle.Name
}
}
if handleID == "" {
klog.Infof("ip %s of pod %s/%s belong to block %s, but can not found handle id, skip!", &pod.Status.PodIP, pod.Namespace, pod.Name, blockName)
continue
}

// get index
ordinal, err := block.IPToOrdinal(*podIP)
if err != nil {
klog.Errorf("get pod ip %s index err: %v, skip!", pod.Status.PodIP, err)
continue
}

// attr
for index, unUsedOrdinal := range block.Spec.Unallocated {
if ordinal == unUsedOrdinal {
block.Spec.Unallocated = append(block.Spec.Unallocated[:index], block.Spec.Unallocated[index+1:]...)
attribute := updateAttribute(block, handleID, nil)
block.Spec.Allocations[ordinal] = &attribute
}
}

}
}
}

return nil
}
5 changes: 3 additions & 2 deletions pkg/simple/client/network/ippool/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ippool
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
k8sinformers "k8s.io/client-go/informers"
"k8s.io/client-go/util/workqueue"

networkv1alpha1 "github.com/yunify/hostnic-cni/pkg/apis/network/v1alpha1"
Expand Down Expand Up @@ -107,14 +108,14 @@ func (p provider) GetIPPoolStats(pool *networkv1alpha1.IPPool) (*networkv1alpha1
return clone, nil
}

func NewProvider(clientset versioned.Interface, pt string, informers informers.SharedInformerFactory) Provider {
func NewProvider(clientset versioned.Interface, pt string, informers informers.SharedInformerFactory, k8sInformers k8sinformers.SharedInformerFactory) Provider {
var p Provider

switch pt {
case networkv1alpha1.IPPoolTypeLocal:
p = provider{
client: clientset,
ipamclient: ipam.NewIPAMClient(clientset, networkv1alpha1.IPPoolTypeLocal, informers),
ipamclient: ipam.NewIPAMClient(clientset, networkv1alpha1.IPPoolTypeLocal, informers, k8sInformers),
}
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/simple/client/network/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,12 @@ func EthRandomAddr(ip net.IP) string {
buf = append(buf, []byte(ip)...)
return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x", buf[0], buf[1], buf[2], buf[3], buf[4], buf[5])
}

func SliceContains(ss []string, s string) bool {
for _, v := range ss {
if v == s {
return true
}
}
return false
}

0 comments on commit b9f52a1

Please sign in to comment.