diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 5b4b18f5bc50..7c52db79d5c1 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -44,6 +44,17 @@ const ( sgsKey = "security_groups" ) +func newNamedRateLimitingQueue(rl workqueue.RateLimiter, name string) workqueue.RateLimitingInterface { + if rl == nil { + rl = workqueue.DefaultControllerRateLimiter() + } + config := workqueue.RateLimitingQueueConfig{ + Name: name, + MetricsProvider: workqueueMetricsProvider{}, + } + return workqueue.NewRateLimitingQueueWithConfig(rl, config) +} + // Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn type Controller struct { config *Configuration @@ -316,36 +327,36 @@ func Run(ctx context.Context, config *Configuration) { vpcsLister: vpcInformer.Lister(), vpcSynced: vpcInformer.Informer().HasSynced, - addOrUpdateVpcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddOrUpdateVpc"), - delVpcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteVpc"), - updateVpcStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVpcStatus"), + addOrUpdateVpcQueue: newNamedRateLimitingQueue(nil, "AddOrUpdateVpc"), + delVpcQueue: newNamedRateLimitingQueue(nil, "DeleteVpc"), + updateVpcStatusQueue: newNamedRateLimitingQueue(nil, "UpdateVpcStatus"), vpcKeyMutex: keymutex.NewHashed(numKeyLocks), vpcNatGatewayLister: vpcNatGatewayInformer.Lister(), vpcNatGatewaySynced: vpcNatGatewayInformer.Informer().HasSynced, - addOrUpdateVpcNatGatewayQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddOrUpdateVpcNatGw"), - initVpcNatGatewayQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "InitVpcNatGw"), - delVpcNatGatewayQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteVpcNatGw"), - updateVpcEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcEip"), - updateVpcFloatingIpQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcFloatingIp"), - updateVpcDnatQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcDnat"), - updateVpcSnatQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcSnat"), - updateVpcSubnetQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcSubnet"), + addOrUpdateVpcNatGatewayQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "AddOrUpdateVpcNatGw"), + initVpcNatGatewayQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "InitVpcNatGw"), + delVpcNatGatewayQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "DeleteVpcNatGw"), + updateVpcEipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcEip"), + updateVpcFloatingIpQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcFloatingIp"), + updateVpcDnatQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcDnat"), + updateVpcSnatQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcSnat"), + updateVpcSubnetQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcSubnet"), vpcNatGwKeyMutex: keymutex.NewHashed(numKeyLocks), subnetsLister: subnetInformer.Lister(), subnetSynced: subnetInformer.Informer().HasSynced, - addOrUpdateSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddSubnet"), - deleteSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteSubnet"), - updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"), - syncVirtualPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SyncVirtualPort"), + addOrUpdateSubnetQueue: newNamedRateLimitingQueue(nil, "AddSubnet"), + deleteSubnetQueue: newNamedRateLimitingQueue(nil, "DeleteSubnet"), + updateSubnetStatusQueue: newNamedRateLimitingQueue(nil, "UpdateSubnetStatus"), + syncVirtualPortsQueue: newNamedRateLimitingQueue(nil, "SyncVirtualPort"), subnetKeyMutex: keymutex.NewHashed(numKeyLocks), ippoolLister: ippoolInformer.Lister(), ippoolSynced: ippoolInformer.Informer().HasSynced, - addOrUpdateIPPoolQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddIPPool"), - updateIPPoolStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateIPPoolStatus"), - deleteIPPoolQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteIPPool"), + addOrUpdateIPPoolQueue: newNamedRateLimitingQueue(nil, "AddIPPool"), + updateIPPoolStatusQueue: newNamedRateLimitingQueue(nil, "UpdateIPPoolStatus"), + deleteIPPoolQueue: newNamedRateLimitingQueue(nil, "DeleteIPPool"), ippoolKeyMutex: keymutex.NewHashed(numKeyLocks), ipsLister: ipInformer.Lister(), @@ -353,52 +364,52 @@ func Run(ctx context.Context, config *Configuration) { virtualIpsLister: virtualIpInformer.Lister(), virtualIpsSynced: virtualIpInformer.Informer().HasSynced, - addVirtualIpQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddVirtualIp"), - updateVirtualIpQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVirtualIp"), - delVirtualIpQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteVirtualIp"), + addVirtualIpQueue: newNamedRateLimitingQueue(nil, "AddVirtualIp"), + updateVirtualIpQueue: newNamedRateLimitingQueue(nil, "UpdateVirtualIp"), + delVirtualIpQueue: newNamedRateLimitingQueue(nil, "DeleteVirtualIp"), iptablesEipsLister: iptablesEipInformer.Lister(), iptablesEipSynced: iptablesEipInformer.Informer().HasSynced, - addIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddIptablesEip"), - updateIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateIptablesEip"), - resetIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "ResetIptablesEip"), - delIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteIptablesEip"), + addIptablesEipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "AddIptablesEip"), + updateIptablesEipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateIptablesEip"), + resetIptablesEipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "ResetIptablesEip"), + delIptablesEipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "DeleteIptablesEip"), podAnnotatedIptablesEipLister: podAnnotatedIptablesEipInformer.Lister(), podAnnotatedIptablesEipSynced: podAnnotatedIptablesEipInformer.Informer().HasSynced, - addPodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddPodAnnotatedIptablesEip"), - updatePodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdatePodAnnotatedIptablesEip"), - delPodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeletePodAnnotatedIptablesEip"), + addPodAnnotatedIptablesEipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "AddPodAnnotatedIptablesEip"), + updatePodAnnotatedIptablesEipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdatePodAnnotatedIptablesEip"), + delPodAnnotatedIptablesEipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "DeletePodAnnotatedIptablesEip"), iptablesFipsLister: iptablesFipInformer.Lister(), iptablesFipSynced: iptablesFipInformer.Informer().HasSynced, - addIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddIptablesFip"), - updateIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateIptablesFip"), - delIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteIptablesFip"), + addIptablesFipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "AddIptablesFip"), + updateIptablesFipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateIptablesFip"), + delIptablesFipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "DeleteIptablesFip"), podAnnotatedIptablesFipLister: podAnnotatedIptablesFipInformer.Lister(), podAnnotatedIptablesFipSynced: podAnnotatedIptablesFipInformer.Informer().HasSynced, - addPodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddPodAnnotatedIptablesFip"), - updatePodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdatePodAnnotatedIptablesFip"), - delPodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeletePodAnnotatedIptablesFip"), + addPodAnnotatedIptablesFipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "AddPodAnnotatedIptablesFip"), + updatePodAnnotatedIptablesFipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdatePodAnnotatedIptablesFip"), + delPodAnnotatedIptablesFipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "DeletePodAnnotatedIptablesFip"), iptablesDnatRulesLister: iptablesDnatRuleInformer.Lister(), iptablesDnatRuleSynced: iptablesDnatRuleInformer.Informer().HasSynced, - addIptablesDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddIptablesDnatRule"), - updateIptablesDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateIptablesDnatRule"), - delIptablesDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteIptablesDnatRule"), + addIptablesDnatRuleQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "AddIptablesDnatRule"), + updateIptablesDnatRuleQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateIptablesDnatRule"), + delIptablesDnatRuleQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "DeleteIptablesDnatRule"), iptablesSnatRulesLister: iptablesSnatRuleInformer.Lister(), iptablesSnatRuleSynced: iptablesSnatRuleInformer.Informer().HasSynced, - addIptablesSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddIptablesSnatRule"), - updateIptablesSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateIptablesSnatRule"), - delIptablesSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteIptablesSnatRule"), + addIptablesSnatRuleQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "AddIptablesSnatRule"), + updateIptablesSnatRuleQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateIptablesSnatRule"), + delIptablesSnatRuleQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "DeleteIptablesSnatRule"), vlansLister: vlanInformer.Lister(), vlanSynced: vlanInformer.Informer().HasSynced, - addVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddVlan"), - delVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DelVlan"), - updateVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVlan"), + addVlanQueue: newNamedRateLimitingQueue(nil, "AddVlan"), + delVlanQueue: newNamedRateLimitingQueue(nil, "DeleteVlan"), + updateVlanQueue: newNamedRateLimitingQueue(nil, "UpdateVlan"), vlanKeyMutex: keymutex.NewHashed(numKeyLocks), providerNetworksLister: providerNetworkInformer.Lister(), @@ -406,43 +417,43 @@ func Run(ctx context.Context, config *Configuration) { podsLister: podInformer.Lister(), podsSynced: podInformer.Informer().HasSynced, - addOrUpdatePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddOrUpdatePod"), + addOrUpdatePodQueue: newNamedRateLimitingQueue(nil, "AddOrUpdatePod"), deletePodQueue: workqueue.NewRateLimitingQueueWithDelayingInterface( workqueue.NewNamedDelayingQueue("DeletePod"), workqueue.DefaultControllerRateLimiter(), ), - updatePodSecurityQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePodSecurity"), + updatePodSecurityQueue: newNamedRateLimitingQueue(nil, "UpdatePodSecurity"), podKeyMutex: keymutex.NewHashed(numKeyLocks), namespacesLister: namespaceInformer.Lister(), namespacesSynced: namespaceInformer.Informer().HasSynced, - addNamespaceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNamespace"), + addNamespaceQueue: newNamedRateLimitingQueue(nil, "AddNamespace"), nsKeyMutex: keymutex.NewHashed(numKeyLocks), nodesLister: nodeInformer.Lister(), nodesSynced: nodeInformer.Informer().HasSynced, - addNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNode"), - updateNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNode"), - deleteNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNode"), + addNodeQueue: newNamedRateLimitingQueue(nil, "AddNode"), + updateNodeQueue: newNamedRateLimitingQueue(nil, "UpdateNode"), + deleteNodeQueue: newNamedRateLimitingQueue(nil, "DeleteNode"), nodeKeyMutex: keymutex.NewHashed(numKeyLocks), servicesLister: serviceInformer.Lister(), serviceSynced: serviceInformer.Informer().HasSynced, - addServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddService"), - deleteServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteService"), - updateServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateService"), + addServiceQueue: newNamedRateLimitingQueue(nil, "AddService"), + deleteServiceQueue: newNamedRateLimitingQueue(nil, "DeleteService"), + updateServiceQueue: newNamedRateLimitingQueue(nil, "UpdateService"), svcKeyMutex: keymutex.NewHashed(numKeyLocks), endpointsLister: endpointInformer.Lister(), endpointsSynced: endpointInformer.Informer().HasSynced, - updateEndpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateEndpoint"), + updateEndpointQueue: newNamedRateLimitingQueue(nil, "UpdateEndpoint"), epKeyMutex: keymutex.NewHashed(numKeyLocks), qosPoliciesLister: qosPolicyInformer.Lister(), qosPolicySynced: qosPolicyInformer.Informer().HasSynced, - addQoSPolicyQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddQoSPolicy"), - updateQoSPolicyQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateQoSPolicy"), - delQoSPolicyQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteQoSPolicy"), + addQoSPolicyQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "AddQoSPolicy"), + updateQoSPolicyQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateQoSPolicy"), + delQoSPolicyQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "DeleteQoSPolicy"), configMapsLister: configMapInformer.Lister(), configMapsSynced: configMapInformer.Informer().HasSynced, @@ -450,34 +461,34 @@ func Run(ctx context.Context, config *Configuration) { sgKeyMutex: keymutex.NewHashed(numKeyLocks), sgsLister: sgInformer.Lister(), sgSynced: sgInformer.Informer().HasSynced, - addOrUpdateSgQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSg"), - delSgQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteSg"), - syncSgPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SyncSgPorts"), + addOrUpdateSgQueue: newNamedRateLimitingQueue(nil, "UpdateSg"), + delSgQueue: newNamedRateLimitingQueue(nil, "DeleteSg"), + syncSgPortsQueue: newNamedRateLimitingQueue(nil, "SyncSgPorts"), ovnEipsLister: ovnEipInformer.Lister(), ovnEipSynced: ovnEipInformer.Informer().HasSynced, - addOvnEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddOvnEip"), - updateOvnEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateOvnEip"), - resetOvnEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "ResetOvnEip"), - delOvnEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DelOvnEip"), + addOvnEipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "AddOvnEip"), + updateOvnEipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateOvnEip"), + resetOvnEipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "ResetOvnEip"), + delOvnEipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "DelOvnEip"), ovnFipsLister: ovnFipInformer.Lister(), ovnFipSynced: ovnFipInformer.Informer().HasSynced, - addOvnFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddOvnFip"), - updateOvnFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateOvnFip"), - delOvnFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteOvnFip"), + addOvnFipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "AddOvnFip"), + updateOvnFipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateOvnFip"), + delOvnFipQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "DeleteOvnFip"), ovnSnatRulesLister: ovnSnatRuleInformer.Lister(), ovnSnatRuleSynced: ovnSnatRuleInformer.Informer().HasSynced, - addOvnSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddOvnSnatRule"), - updateOvnSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateOvnSnatRule"), - delOvnSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DelOvnSnatRule"), + addOvnSnatRuleQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "AddOvnSnatRule"), + updateOvnSnatRuleQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateOvnSnatRule"), + delOvnSnatRuleQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "DeleteOvnSnatRule"), ovnDnatRulesLister: ovnDnatRuleInformer.Lister(), ovnDnatRuleSynced: ovnDnatRuleInformer.Informer().HasSynced, - addOvnDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddOvnDnatRule"), - updateOvnDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateOvnDnatRule"), - delOvnDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteOvnDnatRule"), + addOvnDnatRuleQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "AddOvnDnatRule"), + updateOvnDnatRuleQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateOvnDnatRule"), + delOvnDnatRuleQueue: newNamedRateLimitingQueue(custCrdRateLimiter, "DeleteOvnDnatRule"), recorder: recorder, informerFactory: informerFactory, @@ -493,21 +504,21 @@ func Run(ctx context.Context, config *Configuration) { if config.EnableLb { controller.switchLBRuleLister = switchLBRuleInformer.Lister() controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced - controller.addSwitchLBRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addSwitchLBRule") - controller.delSwitchLBRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delSwitchLBRule") - controller.UpdateSwitchLBRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateSwitchLBRule") + controller.addSwitchLBRuleQueue = newNamedRateLimitingQueue(custCrdRateLimiter, "AddSwitchLBRule") + controller.delSwitchLBRuleQueue = newNamedRateLimitingQueue(custCrdRateLimiter, "DeleteSwitchLBRule") + controller.UpdateSwitchLBRuleQueue = newNamedRateLimitingQueue(custCrdRateLimiter, "UpdateSwitchLBRule") controller.vpcDnsLister = vpcDnsInformer.Lister() controller.vpcDnsSynced = vpcDnsInformer.Informer().HasSynced - controller.addOrUpdateVpcDnsQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddOrUpdateVpcDns") - controller.delVpcDnsQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteVpcDns") + controller.addOrUpdateVpcDnsQueue = newNamedRateLimitingQueue(custCrdRateLimiter, "AddOrUpdateVpcDns") + controller.delVpcDnsQueue = newNamedRateLimitingQueue(custCrdRateLimiter, "DeleteVpcDns") } if config.EnableNP { controller.npsLister = npInformer.Lister() controller.npsSynced = npInformer.Informer().HasSynced - controller.updateNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNp") - controller.deleteNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNp") + controller.updateNpQueue = newNamedRateLimitingQueue(nil, "UpdateNetworkPolicy") + controller.deleteNpQueue = newNamedRateLimitingQueue(nil, "DeleteNetworkPolicy") controller.npKeyMutex = keymutex.NewHashed(numKeyLocks) }