Skip to content

Commit

Permalink
netpol: add lsp to node port group before pod is running
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzujian committed Aug 15, 2023
1 parent 92f3f93 commit 1cd7fed
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 17 deletions.
7 changes: 6 additions & 1 deletion pkg/controller/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}

if err = c.checkAndUpdateNodePortGroup(false); err != nil {
if err = c.checkAndUpdateNodePortGroup(false, ""); err != nil {
klog.Errorf("failed to update node acl: %v", err)
return err
}
Expand Down Expand Up @@ -571,6 +571,11 @@ func (c *Controller) handleDeleteNp(key string) error {
defer func() { _ = c.npKeyMutex.UnlockKey(key) }()
klog.Infof("handle delete network policy %s", key)

if err = c.checkAndUpdateNodePortGroup(false, ""); err != nil {
klog.Errorf("failed to update node acl: %v", err)
return err
}

npName := name
nameArray := []rune(name)
if !unicode.IsLetter(nameArray[0]) {
Expand Down
35 changes: 26 additions & 9 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,34 +933,48 @@ func (c *Controller) fetchPodsOnNode(nodeName string, pods []*v1.Pod) ([]string,
}

func (c *Controller) CheckNodePortGroup() {
if err := c.checkAndUpdateNodePortGroup(true); err != nil {
if err := c.checkAndUpdateNodePortGroup(true, ""); err != nil {
klog.Errorf("check node port group status: %v", err)
}
}

var nodeAclExists bool

func (c *Controller) checkAndUpdateNodePortGroup(updateIfNotExists bool) error {
func (c *Controller) checkAndUpdateNodePortGroup(alwaysUpdate bool, nodeName string) error {
if !c.config.EnableNP {
return nil
}

c.npKeyMutex.LockKey("node_acl")
defer func() { _ = c.npKeyMutex.UnlockKey("node_acl") }()

klog.V(3).Infoln("start to check node port-group status")
np, _ := c.npsLister.List(labels.Everything())
networkPolicyExists := len(np) != 0

if !updateIfNotExists && networkPolicyExists == nodeAclExists {
if !alwaysUpdate && networkPolicyExists == nodeAclExists {
return nil
}

nodes, err := c.nodesLister.List(labels.Everything())
pods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("list nodes: %v", err)
klog.Errorf("failed to list pods: %v", err)
return err
}

pods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("list pods, %v", err)
var nodes []*v1.Node
if nodeName != "" {
node, err := c.nodesLister.Get(nodeName)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to get node %q: %v", err)
return err
}
nodes = []*v1.Node{node}
} else if nodes, err = c.nodesLister.List(labels.Everything()); err != nil {
klog.Errorf("failed to list nodes: %v", err)
return err
}

Expand Down Expand Up @@ -1003,7 +1017,10 @@ func (c *Controller) checkAndUpdateNodePortGroup(updateIfNotExists bool) error {
}
}

nodeAclExists = networkPolicyExists
if nodeName == "" {
nodeAclExists = networkPolicyExists
}

return nil
}

Expand Down
35 changes: 28 additions & 7 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,12 @@ func (c *Controller) enqueueAddPod(obj interface{}) {
}

p := obj.(*v1.Pod)
workqueueKey := key
// TODO: we need to find a way to reduce duplicated np added to the queue
if c.config.EnableNP {
if p.Spec.NodeName != "" {
workqueueKey = fmt.Sprintf("%s@%s", key, p.Spec.NodeName)
}
c.namedPort.AddNamedPortByPod(p)
if p.Status.PodIP != "" {
for _, np := range c.podMatchNetworkPolicies(p) {
Expand Down Expand Up @@ -222,7 +226,7 @@ func (c *Controller) enqueueAddPod(obj interface{}) {
}
if exist {
klog.Infof("enqueue add pod %s", key)
c.addOrUpdatePodQueue.Add(key)
c.addOrUpdatePodQueue.Add(workqueueKey)
}
}

Expand Down Expand Up @@ -270,7 +274,12 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) {
return
}

workqueueKey := key
if c.config.EnableNP {
if newPod.Spec.NodeName != oldPod.Spec.NodeName {
workqueueKey = fmt.Sprintf("%s@%s", key, newPod.Spec.NodeName)
}

c.namedPort.AddNamedPortByPod(newPod)
newNp := c.podMatchNetworkPolicies(newPod)
if !reflect.DeepEqual(oldPod.Labels, newPod.Labels) {
Expand Down Expand Up @@ -345,7 +354,7 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) {
return
}
klog.Infof("enqueue update pod %s", key)
c.addOrUpdatePodQueue.Add(key)
c.addOrUpdatePodQueue.Add(workqueueKey)

// security policy changed
for _, podNet := range podNets {
Expand Down Expand Up @@ -544,15 +553,20 @@ func (c *Controller) changeVMSubnet(vmName, namespace, providerName, subnetName
}

func (c *Controller) handleAddOrUpdatePod(key string) (err error) {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
podKey := key
if idx := strings.IndexRune(key, '@'); idx >= 0 {
podKey = key[:idx]
}

namespace, name, err := cache.SplitMetaNamespaceKey(podKey)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", podKey))
return nil
}

c.podKeyMutex.LockKey(key)
defer func() { _ = c.podKeyMutex.UnlockKey(key) }()
klog.Infof("handle add/update pod %s", key)
c.podKeyMutex.LockKey(podKey)
defer func() { _ = c.podKeyMutex.UnlockKey(podKey) }()
klog.Infof("handle add/update pod %s", podKey)

cachedPod, err := c.podsLister.Pods(namespace).Get(name)
if err != nil {
Expand Down Expand Up @@ -600,6 +614,13 @@ func (c *Controller) handleAddOrUpdatePod(key string) (err error) {
}
}

if len(podKey) != len(key) && pod.Spec.NodeName != "" {
if err = c.checkAndUpdateNodePortGroup(true, pod.Spec.NodeName); err != nil {
klog.Errorf("failed to sync node port group for pod %s on node %s: %v", podKey, pod.Spec.NodeName, err)
return err
}
}

// check if route subnet is need.
pod = cachedPod.DeepCopy()
return c.reconcileRouteSubnets(cachedPod, pod, needRouteSubnets(pod, podNets))
Expand Down

0 comments on commit 1cd7fed

Please sign in to comment.