Skip to content

Commit

Permalink
use informers and context cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
aojea committed Jul 19, 2024
1 parent d33e545 commit b26553c
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 18 deletions.
2 changes: 1 addition & 1 deletion images/kindnetd/cmd/kindnetd/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type CNIConfigInputs struct {
}

// ComputeCNIConfigInputs computes the template inputs for CNIConfigWriter
func ComputeCNIConfigInputs(node corev1.Node) CNIConfigInputs {
func ComputeCNIConfigInputs(node *corev1.Node) CNIConfigInputs {

defaultRoutes := []string{"0.0.0.0/0", "::/0"}
// check if is a dualstack cluster
Expand Down
69 changes: 54 additions & 15 deletions images/kindnetd/cmd/kindnetd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ import (
"fmt"
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"golang.org/x/sys/unix"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -75,6 +78,9 @@ func main() {
if err != nil {
panic(err.Error())
}
// use protobuf to improve performance
config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
config.ContentType = "application/vnd.kubernetes.protobuf"

// override the internal apiserver endpoint to avoid
// waiting for kube-proxy to install the services rules.
Expand All @@ -101,6 +107,31 @@ func main() {
}
klog.Infof("connected to apiserver: %s", config.Host)

// trap Ctrl+C and call cancel on the context
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

// Enable signal handler
signalCh := make(chan os.Signal, 2)
defer func() {
close(signalCh)
cancel()
}()
signal.Notify(signalCh, os.Interrupt, unix.SIGINT)

go func() {
select {
case <-signalCh:
klog.Infof("Exiting: received signal")
cancel()
case <-ctx.Done():
}
}()

informersFactory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := informersFactory.Core().V1().Nodes()
nodeLister := nodeInformer.Lister()

// obtain the host and pod ip addresses
// if both ips are different we are not using the host network
hostIP, podIP := os.Getenv("HOST_IP"), os.Getenv("POD_IP")
Expand Down Expand Up @@ -153,7 +184,7 @@ func main() {
panic(err.Error())
}
go func() {
if err := masqAgentIPv4.SyncRulesForever(time.Second * 60); err != nil {
if err := masqAgentIPv4.SyncRulesForever(ctx, time.Second*60); err != nil {
panic(err)
}
}()
Expand All @@ -168,7 +199,7 @@ func main() {
}

go func() {
if err := masqAgentIPv6.SyncRulesForever(time.Second * 60); err != nil {
if err := masqAgentIPv6.SyncRulesForever(ctx, time.Second*60); err != nil {
panic(err)
}
}()
Expand All @@ -178,13 +209,15 @@ func main() {
reconcileNodes := makeNodesReconciler(cniConfigWriter, hostIP, ipFamily)

// main control loop
informersFactory.Start(ctx.Done())
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
// Gets the Nodes information from the API
// TODO: use a proper controller instead
var nodes *corev1.NodeList
var nodes []*corev1.Node
var err error
for i := 0; i < 5; i++ {
nodes, err = clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
nodes, err = nodeLister.List(labels.Everything())
if err == nil {
break
}
Expand All @@ -209,14 +242,20 @@ func main() {
}

// rate limit
time.Sleep(10 * time.Second)
select {
case <-ctx.Done():
// grace period to cleanup resources
time.Sleep(1 * time.Second)
return
case <-ticker.C:
}
}
}

// nodeNodesReconciler returns a reconciliation func for nodes
func makeNodesReconciler(cniConfig *CNIConfigWriter, hostIP string, ipFamily IPFamily) func(*corev1.NodeList) error {
func makeNodesReconciler(cniConfig *CNIConfigWriter, hostIP string, ipFamily IPFamily) func([]*corev1.Node) error {
// reconciles a node
reconcileNode := func(node corev1.Node) error {
reconcileNode := func(node *corev1.Node) error {
// first get this node's IPs
// we don't support more than one IP address per IP family for simplification
nodeIPs := internalIPs(node)
Expand Down Expand Up @@ -252,7 +291,7 @@ func makeNodesReconciler(cniConfig *CNIConfigWriter, hostIP string, ipFamily IPF

// obtain the PodCIDR gateway
var nodeIPv4, nodeIPv6 string
for _, ip := range sets.List(nodeIPs) {
for _, ip := range nodeIPs.UnsortedList() {
if isIPv6String(ip) {
nodeIPv6 = ip
} else {
Expand All @@ -274,8 +313,8 @@ func makeNodesReconciler(cniConfig *CNIConfigWriter, hostIP string, ipFamily IPF
}

// return a reconciler for all the nodes
return func(nodes *corev1.NodeList) error {
for _, node := range nodes.Items {
return func(nodes []*corev1.Node) error {
for _, node := range nodes {
if err := reconcileNode(node); err != nil {
return err
}
Expand All @@ -285,7 +324,7 @@ func makeNodesReconciler(cniConfig *CNIConfigWriter, hostIP string, ipFamily IPF
}

// internalIPs returns the internal IP addresses for node
func internalIPs(node corev1.Node) sets.Set[string] {
func internalIPs(node *corev1.Node) sets.Set[string] {
ips := sets.New[string]()
// check the node.Status.Addresses
for _, address := range node.Status.Addresses {
Expand Down
11 changes: 9 additions & 2 deletions images/kindnetd/cmd/kindnetd/masq.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -55,8 +56,11 @@ type IPMasqAgent struct {
// these rules only needs to be installed once, but we run it periodically to check that are
// not deleted by an external program. It fails if can't sync the rules during 3 iterations
// TODO: aggregate errors
func (ma *IPMasqAgent) SyncRulesForever(interval time.Duration) error {
func (ma *IPMasqAgent) SyncRulesForever(ctx context.Context, interval time.Duration) error {
errs := 0
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
if err := ma.SyncRules(); err != nil {
errs++
Expand All @@ -66,7 +70,10 @@ func (ma *IPMasqAgent) SyncRulesForever(interval time.Duration) error {
} else {
errs = 0
}
time.Sleep(interval)
select {
case <-ctx.Done():
case <-ticker.C:
}
}
}

Expand Down

0 comments on commit b26553c

Please sign in to comment.