diff --git a/pkg/controller/network_policy.go b/pkg/controller/network_policy.go index 2a7a62cc966..f7b54c423a6 100644 --- a/pkg/controller/network_policy.go +++ b/pkg/controller/network_policy.go @@ -152,7 +152,7 @@ func (c *Controller) handleUpdateNp(key string) error { return err } - if err = c.checkAndUpdateNodePortGroup(false); err != nil { + if err = c.checkAndUpdateNodePortGroup(false, ""); err != nil { klog.Errorf("failed to update node acl: %v", err) return err } @@ -571,6 +571,11 @@ func (c *Controller) handleDeleteNp(key string) error { defer func() { _ = c.npKeyMutex.UnlockKey(key) }() klog.Infof("handle delete network policy %s", key) + if err = c.checkAndUpdateNodePortGroup(false, ""); err != nil { + klog.Errorf("failed to update node acl: %v", err) + return err + } + npName := name nameArray := []rune(name) if !unicode.IsLetter(nameArray[0]) { diff --git a/pkg/controller/node.go b/pkg/controller/node.go index ef0d77fc190..d2d96cdaf6e 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -933,14 +933,18 @@ func (c *Controller) fetchPodsOnNode(nodeName string, pods []*v1.Pod) ([]string, } func (c *Controller) CheckNodePortGroup() { - if err := c.checkAndUpdateNodePortGroup(true); err != nil { + if err := c.checkAndUpdateNodePortGroup(true, ""); err != nil { klog.Errorf("check node port group status: %v", err) } } var nodeAclExists bool -func (c *Controller) checkAndUpdateNodePortGroup(updateIfNotExists bool) error { +func (c *Controller) checkAndUpdateNodePortGroup(alwaysUpdate bool, nodeName string) error { + if !c.config.EnableNP { + return nil + } + c.npKeyMutex.LockKey("node_acl") defer func() { _ = c.npKeyMutex.UnlockKey("node_acl") }() @@ -948,19 +952,29 @@ func (c *Controller) checkAndUpdateNodePortGroup(updateIfNotExists bool) error { np, _ := c.npsLister.List(labels.Everything()) networkPolicyExists := len(np) != 0 - if !updateIfNotExists && networkPolicyExists == nodeAclExists { + if !alwaysUpdate && networkPolicyExists == nodeAclExists { return nil } - nodes, err := c.nodesLister.List(labels.Everything()) + pods, err := c.podsLister.List(labels.Everything()) if err != nil { - klog.Errorf("list nodes: %v", err) + klog.Errorf("failed to list pods: %v", err) return err } - pods, err := c.podsLister.List(labels.Everything()) - if err != nil { - klog.Errorf("list pods, %v", err) + var nodes []*v1.Node + if nodeName != "" { + node, err := c.nodesLister.Get(nodeName) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to get node %q: %v", err) + return err + } + nodes = []*v1.Node{node} + } else if nodes, err = c.nodesLister.List(labels.Everything()); err != nil { + klog.Errorf("failed to list nodes: %v", err) return err } @@ -1003,7 +1017,10 @@ func (c *Controller) checkAndUpdateNodePortGroup(updateIfNotExists bool) error { } } - nodeAclExists = networkPolicyExists + if nodeName == "" { + nodeAclExists = networkPolicyExists + } + return nil } diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 98dcda78b7e..22f53eda4d9 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -179,8 +179,12 @@ func (c *Controller) enqueueAddPod(obj interface{}) { } p := obj.(*v1.Pod) + workqueueKey := key // TODO: we need to find a way to reduce duplicated np added to the queue if c.config.EnableNP { + if p.Spec.NodeName != "" { + workqueueKey = fmt.Sprintf("%s@%s", key, p.Spec.NodeName) + } c.namedPort.AddNamedPortByPod(p) if p.Status.PodIP != "" { for _, np := range c.podMatchNetworkPolicies(p) { @@ -222,7 +226,7 @@ func (c *Controller) enqueueAddPod(obj interface{}) { } if need { klog.Infof("enqueue add pod %s", key) - c.addOrUpdatePodQueue.Add(key) + c.addOrUpdatePodQueue.Add(workqueueKey) } } @@ -270,7 +274,12 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) { return } + workqueueKey := key if c.config.EnableNP { + if newPod.Spec.NodeName != oldPod.Spec.NodeName { + workqueueKey = fmt.Sprintf("%s@%s", key, newPod.Spec.NodeName) + } + c.namedPort.AddNamedPortByPod(newPod) newNp := c.podMatchNetworkPolicies(newPod) if !reflect.DeepEqual(oldPod.Labels, newPod.Labels) { @@ -345,7 +354,7 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) { return } klog.Infof("enqueue update pod %s", key) - c.addOrUpdatePodQueue.Add(key) + c.addOrUpdatePodQueue.Add(workqueueKey) // security policy changed for _, podNet := range podNets { @@ -544,15 +553,20 @@ func (c *Controller) changeVMSubnet(vmName, namespace, providerName, subnetName } func (c *Controller) handleAddOrUpdatePod(key string) (err error) { - namespace, name, err := cache.SplitMetaNamespaceKey(key) + podKey := key + if idx := strings.IndexRune(key, '@'); idx >= 0 { + podKey = key[:idx] + } + + namespace, name, err := cache.SplitMetaNamespaceKey(podKey) if err != nil { - utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", podKey)) return nil } - c.podKeyMutex.LockKey(key) - defer func() { _ = c.podKeyMutex.UnlockKey(key) }() - klog.Infof("handle add/update pod %s", key) + c.podKeyMutex.LockKey(podKey) + defer func() { _ = c.podKeyMutex.UnlockKey(podKey) }() + klog.Infof("handle add/update pod %s", podKey) cachedPod, err := c.podsLister.Pods(namespace).Get(name) if err != nil { @@ -600,6 +614,13 @@ func (c *Controller) handleAddOrUpdatePod(key string) (err error) { } } + if len(podKey) != len(key) && pod.Spec.NodeName != "" { + if err = c.checkAndUpdateNodePortGroup(true, pod.Spec.NodeName); err != nil { + klog.Errorf("failed to sync node port group for pod %s on node %s: %v", podKey, pod.Spec.NodeName, err) + return err + } + } + // check if route subnet is need. pod = cachedPod.DeepCopy() return c.reconcileRouteSubnets(cachedPod, pod, needRouteSubnets(pod, podNets))