Skip to content

Commit

Permalink
workquque: use typed interface
Browse files Browse the repository at this point in the history
Signed-off-by: zhangzujian <[email protected]>
  • Loading branch information
zhangzujian committed Aug 15, 2024
1 parent 28daf4b commit ac9bb9f
Show file tree
Hide file tree
Showing 30 changed files with 426 additions and 2,942 deletions.
131 changes: 10 additions & 121 deletions pkg/controller/admin_network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"reflect"
"strings"
"time"
"unicode"

"github.com/scylladb/go-set/strset"
Expand Down Expand Up @@ -38,7 +37,7 @@ type ChangedName struct {
curRuleName string
}

type ChangedDelta struct {
type AdminNetworkPolicyChangedDelta struct {
key string
ruleNames [util.AnpMaxRules]ChangedName
field ChangedField
Expand All @@ -56,14 +55,9 @@ func (c *Controller) enqueueAddAnp(obj interface{}) {
}

func (c *Controller) enqueueDeleteAnp(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
klog.V(3).Infof("enqueue delete anp %s", key)
c.deleteAnpQueue.Add(obj)
anp := obj.(*v1alpha1.AdminNetworkPolicy)
klog.V(3).Infof("enqueue delete anp %s", anp.Name)
c.deleteAnpQueue.Add(anp)
}

func (c *Controller) enqueueUpdateAnp(oldObj, newObj interface{}) {
Expand Down Expand Up @@ -104,7 +98,7 @@ func (c *Controller) enqueueUpdateAnp(oldObj, newObj interface{}) {
// The remaining changes do not affect the acls. The port-group or address-set should be updated.
// The port-group for anp should be updated
if !reflect.DeepEqual(oldAnpObj.Spec.Subject, newAnpObj.Spec.Subject) {
c.updateAnpQueue.Add(ChangedDelta{key: newAnpObj.Name, field: ChangedSubject})
c.updateAnpQueue.Add(&AdminNetworkPolicyChangedDelta{key: newAnpObj.Name, field: ChangedSubject})
}

// Rule name or peer selector in ingress/egress rule has changed, the corresponding address-set need be updated
Expand All @@ -122,7 +116,7 @@ func (c *Controller) enqueueUpdateAnp(oldObj, newObj interface{}) {
}
}
if ruleChanged {
c.updateAnpQueue.Add(ChangedDelta{key: newAnpObj.Name, ruleNames: changedIngressRuleNames, field: ChangedIngressRule})
c.updateAnpQueue.Add(&AdminNetworkPolicyChangedDelta{key: newAnpObj.Name, ruleNames: changedIngressRuleNames, field: ChangedIngressRule})
}

ruleChanged = false
Expand All @@ -138,113 +132,8 @@ func (c *Controller) enqueueUpdateAnp(oldObj, newObj interface{}) {
}
}
if ruleChanged {
c.updateAnpQueue.Add(ChangedDelta{key: newAnpObj.Name, ruleNames: changedEgressRuleNames, field: ChangedEgressRule})
}
}

func (c *Controller) runAddAnpWorker() {
for c.processNextAddAnpWorkItem() {
}
}

func (c *Controller) runUpdateAnpWorker() {
for c.processNextUpdateAnpWorkItem() {
}
}

func (c *Controller) runDeleteAnpWorker() {
for c.processNextDeleteAnpWorkItem() {
}
}

func (c *Controller) processNextAddAnpWorkItem() bool {
obj, shutdown := c.addAnpQueue.Get()
if shutdown {
return false
}
now := time.Now()

err := func(obj interface{}) error {
defer c.addAnpQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.addAnpQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.handleAddAnp(key); err != nil {
c.addAnpQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
last := time.Since(now)
klog.Infof("take %d ms to handle add anp %s", last.Milliseconds(), key)
c.addAnpQueue.Forget(obj)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}

func (c *Controller) processNextUpdateAnpWorkItem() bool {
obj, shutdown := c.updateAnpQueue.Get()
if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.updateAnpQueue.Done(obj)
var key ChangedDelta
var ok bool
if key, ok = obj.(ChangedDelta); !ok {
c.updateAnpQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected ChangedDelta in workqueue but got %#v", obj))
return nil
}
if err := c.handleUpdateAnp(key); err != nil {
c.updateAnpQueue.AddRateLimited(key)
return fmt.Errorf("error syncing admin network policy %s: %w, requeuing", key.key, err)
}
c.updateAnpQueue.Forget(obj)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}

func (c *Controller) processNextDeleteAnpWorkItem() bool {
obj, shutdown := c.deleteAnpQueue.Get()
if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.deleteAnpQueue.Done(obj)
var anp *v1alpha1.AdminNetworkPolicy
var ok bool
if anp, ok = obj.(*v1alpha1.AdminNetworkPolicy); !ok {
c.deleteAnpQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected anp object in workqueue but got %#v", obj))
return nil
}
if err := c.handleDeleteAnp(anp); err != nil {
c.deleteAnpQueue.AddRateLimited(obj)
return fmt.Errorf("error syncing anp '%s': %s, requeuing", anp.Name, err.Error())
}
c.deleteAnpQueue.Forget(obj)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
c.updateAnpQueue.Add(&AdminNetworkPolicyChangedDelta{key: newAnpObj.Name, ruleNames: changedEgressRuleNames, field: ChangedEgressRule})
}
return true
}

func (c *Controller) handleAddAnp(key string) (err error) {
Expand Down Expand Up @@ -479,7 +368,7 @@ func (c *Controller) handleDeleteAnp(anp *v1alpha1.AdminNetworkPolicy) error {
return nil
}

func (c *Controller) handleUpdateAnp(changed ChangedDelta) error {
func (c *Controller) handleUpdateAnp(changed *AdminNetworkPolicyChangedDelta) error {
// Only handle updates that do not affect acls.
c.anpKeyMutex.LockKey(changed.key)
defer func() { _ = c.anpKeyMutex.UnlockKey(changed.key) }()
Expand Down Expand Up @@ -899,7 +788,7 @@ func (c *Controller) setAddrSetForAnpRule(anpName, pgName, ruleName string, inde
func (c *Controller) updateAnpsByLabelsMatch(nsLabels, podLabels map[string]string) {
anps, _ := c.anpsLister.List(labels.Everything())
for _, anp := range anps {
changed := ChangedDelta{
changed := &AdminNetworkPolicyChangedDelta{
key: anp.Name,
}

Expand Down Expand Up @@ -927,7 +816,7 @@ func (c *Controller) updateAnpsByLabelsMatch(nsLabels, podLabels map[string]stri

banps, _ := c.banpsLister.List(labels.Everything())
for _, banp := range banps {
changed := ChangedDelta{
changed := &AdminNetworkPolicyChangedDelta{
key: banp.Name,
}

Expand Down
125 changes: 7 additions & 118 deletions pkg/controller/baseline_admin_network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"reflect"
"strings"
"time"

"github.com/scylladb/go-set/strset"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -29,14 +28,9 @@ func (c *Controller) enqueueAddBanp(obj interface{}) {
}

func (c *Controller) enqueueDeleteBanp(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
klog.V(3).Infof("enqueue delete banp %s", key)
c.deleteBanpQueue.Add(obj)
banp := obj.(*v1alpha1.BaselineAdminNetworkPolicy)
klog.V(3).Infof("enqueue delete banp %s", banp.Name)
c.deleteBanpQueue.Add(banp)
}

func (c *Controller) enqueueUpdateBanp(oldObj, newObj interface{}) {
Expand Down Expand Up @@ -75,7 +69,7 @@ func (c *Controller) enqueueUpdateBanp(oldObj, newObj interface{}) {
// The remaining changes do not affect the acls. The port-group or address-set should be updated.
// The port-group for anp should be updated
if !reflect.DeepEqual(oldBanp.Spec.Subject, newBanp.Spec.Subject) {
c.updateBanpQueue.Add(ChangedDelta{key: newBanp.Name, field: ChangedSubject})
c.updateBanpQueue.Add(&AdminNetworkPolicyChangedDelta{key: newBanp.Name, field: ChangedSubject})
}

// Rule name or peer selector in ingress/egress rule has changed, the corresponding address-set need be updated
Expand All @@ -94,7 +88,7 @@ func (c *Controller) enqueueUpdateBanp(oldObj, newObj interface{}) {
}
}
if ruleChanged {
c.updateBanpQueue.Add(ChangedDelta{key: newBanp.Name, ruleNames: changedIngressRuleNames, field: ChangedIngressRule})
c.updateBanpQueue.Add(&AdminNetworkPolicyChangedDelta{key: newBanp.Name, ruleNames: changedIngressRuleNames, field: ChangedIngressRule})
}

ruleChanged = false
Expand All @@ -110,113 +104,8 @@ func (c *Controller) enqueueUpdateBanp(oldObj, newObj interface{}) {
}
}
if ruleChanged {
c.updateBanpQueue.Add(ChangedDelta{key: newBanp.Name, ruleNames: changedEgressRuleNames, field: ChangedEgressRule})
}
}

func (c *Controller) runAddBanpWorker() {
for c.processNextAddBanpWorkItem() {
}
}

func (c *Controller) runUpdateBanpWorker() {
for c.processNextUpdateBanpWorkItem() {
}
}

func (c *Controller) runDeleteBanpWorker() {
for c.processNextDeleteBanpWorkItem() {
}
}

func (c *Controller) processNextAddBanpWorkItem() bool {
obj, shutdown := c.addBanpQueue.Get()
if shutdown {
return false
}
now := time.Now()

err := func(obj interface{}) error {
defer c.addBanpQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.addBanpQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.handleAddBanp(key); err != nil {
c.addBanpQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
last := time.Since(now)
klog.Infof("take %d ms to handle add banp %s", last.Milliseconds(), key)
c.addBanpQueue.Forget(obj)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}

func (c *Controller) processNextUpdateBanpWorkItem() bool {
obj, shutdown := c.updateBanpQueue.Get()
if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.updateBanpQueue.Done(obj)
var key ChangedDelta
var ok bool
if key, ok = obj.(ChangedDelta); !ok {
c.updateBanpQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected ChangedDelta in workqueue but got %#v", obj))
return nil
}
if err := c.handleUpdateBanp(key); err != nil {
c.updateBanpQueue.AddRateLimited(key)
return fmt.Errorf("error syncing banp %s: %w, requeuing", key.key, err)
}
c.updateBanpQueue.Forget(obj)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}

func (c *Controller) processNextDeleteBanpWorkItem() bool {
obj, shutdown := c.deleteBanpQueue.Get()
if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.deleteBanpQueue.Done(obj)
var banp *v1alpha1.BaselineAdminNetworkPolicy
var ok bool
if banp, ok = obj.(*v1alpha1.BaselineAdminNetworkPolicy); !ok {
c.deleteBanpQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected banp object in workqueue but got %#v", obj))
return nil
}
if err := c.handleDeleteBanp(banp); err != nil {
c.deleteBanpQueue.AddRateLimited(obj)
return fmt.Errorf("error syncing banp '%s': %s, requeuing", banp.Name, err.Error())
}
c.deleteBanpQueue.Forget(obj)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
c.updateBanpQueue.Add(&AdminNetworkPolicyChangedDelta{key: newBanp.Name, ruleNames: changedEgressRuleNames, field: ChangedEgressRule})
}
return true
}

func (c *Controller) handleAddBanp(key string) (err error) {
Expand Down Expand Up @@ -436,7 +325,7 @@ func (c *Controller) handleDeleteBanp(banp *v1alpha1.BaselineAdminNetworkPolicy)
return nil
}

func (c *Controller) handleUpdateBanp(changed ChangedDelta) error {
func (c *Controller) handleUpdateBanp(changed *AdminNetworkPolicyChangedDelta) error {
// Only handle updates that do not affect acls.
c.banpKeyMutex.LockKey(changed.key)
defer func() { _ = c.banpKeyMutex.UnlockKey(changed.key) }()
Expand Down
Loading

0 comments on commit ac9bb9f

Please sign in to comment.