diff --git a/.github/workflows/push-images.yml b/.github/workflows/push-images.yml
new file mode 100644
index 000000000000..1c4e0fd2c006
--- /dev/null
+++ b/.github/workflows/push-images.yml
@@ -0,0 +1,34 @@
+# This workflow builds the CoreDNS image and pushes it to ghcr.io
+# whenever a release tag (vX.Y.Z) is pushed.
+
+name: Push images
+
+on:
+  push:
+    tags:
+      - "v*.*.*"
+
+jobs:
+  push-images:
+    runs-on: ubuntu-latest
+    # Least privilege: read the repository, write packages (ghcr.io).
+    permissions:
+      contents: read
+      packages: write
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+      - name: Set up qemu
+        uses: docker/setup-qemu-action@v2
+      - name: Login registry
+        run: |
+          echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u ${{ github.actor }} --password-stdin
+      - name: Push images
+        env:
+          ON_PLUGINS: "true"
+          # Tag the image with the pushed git tag instead of the Makefile default "latest".
+          VERSION: ${{ github.ref_name }}
+        run: |
+          make upload-images
diff --git a/.gitignore b/.gitignore
index ca36a9de4b3b..da21e6672209 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,4 +6,4 @@ build/
 release/
 vendor/
-.idea
\ No newline at end of file
+.idea
diff --git a/Corefile b/Corefile
new file mode 100644
index 000000000000..6c7d456f460b
--- /dev/null
+++ b/Corefile
@@ -0,0 +1,19 @@
+.:53 {
+    errors
+    health {
+       lameduck 5s
+    }
+    ready
+    kubernetes kosmos.local cluster.local in-addr.arpa ip6.arpa {
+       pods insecure
+       ttl 30
+       # kubeconfig <path-to-kubeconfig> <context>  # set only when running outside a cluster
+    }
+    hosts /etc/add-hosts/customer-hosts . {
+       fallthrough kosmos.local cluster.local in-addr.arpa ip6.arpa
+    }
+    prometheus :9153
+    cache 30
+    reload
+    loadbalance
+}
diff --git a/Makefile b/Makefile
index 43d03e207baf..f1c0b9745150 100644
--- a/Makefile
+++ b/Makefile
@@ -7,13 +7,28 @@ BUILDOPTS:=-v
 GOPATH?=$(HOME)/go
 MAKEPWD:=$(dir $(realpath $(firstword $(MAKEFILE_LIST))))
 CGO_ENABLED?=0
+GOOS?=linux
+GOARCH?=amd64
+VERSION?=latest
+REGISTRY?=ghcr.io/kosmos-io
 
 .PHONY: all
 all: coredns
 
 .PHONY: coredns
 coredns: $(CHECKS)
-	CGO_ENABLED=$(CGO_ENABLED) $(SYSTEM) go build $(BUILDOPTS) -ldflags="-s -w -X github.com/coredns/coredns/coremain.GitCommit=$(GITCOMMIT)" -o $(BINARY)
+# SYSTEM stays last so callers that pass SYSTEM="GOOS=... GOARCH=..." still override the defaults.
+	CGO_ENABLED=$(CGO_ENABLED) GOOS=${GOOS} GOARCH=${GOARCH} $(SYSTEM) go build $(BUILDOPTS) -ldflags="-s -w -X github.com/coredns/coredns/coremain.GitCommit=$(GITCOMMIT)" -o $(BINARY)
+
+.PHONY: images
+images: coredns
+	set -e;\
+	docker buildx build --output=type=docker --platform ${GOOS}/${GOARCH} --tag ${REGISTRY}/coredns:${VERSION} .
+
+.PHONY: upload-images
+upload-images: images
+	@echo "push images to $(REGISTRY)"
+	docker push ${REGISTRY}/coredns:${VERSION}
 
 .PHONY: check
 check: core/plugin/zplugin.go core/dnsserver/zdirectives.go
diff --git a/deploy/yamls/coredns-clusterrole.yaml b/deploy/yamls/coredns-clusterrole.yaml
new file mode 100644
index 000000000000..8e157ac61476
--- /dev/null
+++ b/deploy/yamls/coredns-clusterrole.yaml
@@ -0,0 +1,18 @@
+# NOTE(review): this grants every verb on every resource (cluster-admin
+# equivalent). The cluster controller added in this change only lists,
+# watches and updates clusters.kosmos.io; narrow these rules before shipping.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: kosmos-coredns
+rules:
+- apiGroups:
+  - '*'
+  resources:
+  - '*'
+  verbs:
+  - '*'
+- nonResourceURLs:
+  - '*'
+  verbs:
+  - get
diff --git a/deploy/yamls/coredns-clusterrolebinding.yaml b/deploy/yamls/coredns-clusterrolebinding.yaml
new file mode 100644
index 000000000000..db6ee56b221f
--- /dev/null
+++ b/deploy/yamls/coredns-clusterrolebinding.yaml
@@ -0,0 +1,12 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: kosmos-coredns
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: kosmos-coredns
+subjects:
+- kind: ServiceAccount
+  name: coredns
+  namespace: kosmos-system
diff --git a/deploy/yamls/coredns-cm.yaml b/deploy/yamls/coredns-cm.yaml
new file mode 100644
index 000000000000..b1bf7cbe8d8f
--- /dev/null
+++ b/deploy/yamls/coredns-cm.yaml
@@ -0,0 +1,30 @@
+apiVersion: v1
+data:
+  Corefile: |
+    .:53 {
+        errors
+        health {
+           lameduck 5s
+        }
+        ready
+        rewrite stop {
+          # Dots are escaped so only a literal ".kosmos.local" / ".cluster.local" is rewritten.
+          name regex (.*)\.kosmos\.local {1}.cluster.local
+          answer name (.*)\.cluster\.local {1}.kosmos.local
+        }
+        kubernetes cluster.local in-addr.arpa ip6.arpa {
+           pods insecure
+           ttl 30
+        }
+        hosts /etc/add-hosts/customer-hosts . {
+           fallthrough cluster.local in-addr.arpa ip6.arpa
+        }
+        prometheus :9153
+        cache 30
+        reload
+        loadbalance
+    }
+kind: ConfigMap
+metadata:
+  name: coredns
+  namespace: kosmos-system
diff --git a/deploy/yamls/coredns-customer-hosts-cm.yaml b/deploy/yamls/coredns-customer-hosts-cm.yaml
new file mode 100644
index 000000000000..275478ff0f64
--- /dev/null
+++ b/deploy/yamls/coredns-customer-hosts-cm.yaml
@@ -0,0 +1,9 @@
+apiVersion: v1
+data:
+  customer-hosts: |
+    #customer-hosts
+    #10.10.10.10 myhost
+kind: ConfigMap
+metadata:
+  name: coredns-customer-hosts
+  namespace: kosmos-system
diff --git a/deploy/yamls/coredns-deploy.yaml b/deploy/yamls/coredns-deploy.yaml
new file mode 100644
index 000000000000..a6ff5434a397
--- /dev/null
+++ b/deploy/yamls/coredns-deploy.yaml
@@ -0,0 +1,130 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  labels:
+    kosmos.io/app: coredns
+  name: coredns
+  namespace: kosmos-system
+spec:
+  progressDeadlineSeconds: 600
+  replicas: 2
+  revisionHistoryLimit: 10
+  selector:
+    matchLabels:
+      kosmos.io/app: coredns
+  strategy:
+    rollingUpdate:
+      maxSurge: 25%
+      maxUnavailable: 1
+    type: RollingUpdate
+  template:
+    metadata:
+      creationTimestamp: null
+      labels:
+        kosmos.io/app: coredns
+    spec:
+      containers:
+      - args:
+        - -conf
+        - /etc/coredns/Corefile
+        image: ghcr.io/kosmos-io/coredns:v1.8.0
+        imagePullPolicy: Always
+        livenessProbe:
+          failureThreshold: 5
+          httpGet:
+            path: /health
+            port: 8080
+            scheme: HTTP
+          initialDelaySeconds: 60
+          periodSeconds: 10
+          successThreshold: 1
+          timeoutSeconds: 5
+        name: coredns
+        ports:
+        - containerPort: 53
+          name: dns
+          protocol: UDP
+        - containerPort: 53
+          name: dns-tcp
+          protocol: TCP
+        - containerPort: 9153
+          name: metrics
+          protocol: TCP
+        readinessProbe:
+          failureThreshold: 3
+          httpGet:
+            path: /ready
+            port: 8181
+            scheme: HTTP
+          periodSeconds: 10
+          successThreshold: 1
+          timeoutSeconds: 1
+        resources:
+          limits:
+            cpu: 2000m
+            memory: 2560Mi
+          requests:
+            cpu: 1000m
+            memory: 1280Mi
+        securityContext:
+          allowPrivilegeEscalation: false
+          capabilities:
+            add:
+            - NET_BIND_SERVICE
+            drop:
+            - all
+          readOnlyRootFilesystem: true
+        terminationMessagePath: /dev/termination-log
+        terminationMessagePolicy: File
+        volumeMounts:
+        - mountPath: /etc/coredns
+          name: config-volume
+          readOnly: true
+        - mountPath: /etc/add-hosts
+          name: customer-hosts
+          readOnly: true
+      dnsPolicy: Default
+      priorityClassName: system-cluster-critical
+      restartPolicy: Always
+      schedulerName: default-scheduler
+      securityContext: {}
+      serviceAccount: coredns
+      serviceAccountName: coredns
+      terminationGracePeriodSeconds: 30
+      affinity:
+        podAntiAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+          - labelSelector:
+              matchLabels:
+                kosmos.io/app: coredns
+            topologyKey: kubernetes.io/hostname
+      tolerations:
+      - effect: NoSchedule
+        key: node-role.kubernetes.io/master
+        operator: Exists
+      - effect: NoSchedule
+        key: node.kubernetes.io/unschedulable
+        operator: Exists
+      - effect: NoExecute
+        key: node.kubernetes.io/not-ready
+        operator: Exists
+        tolerationSeconds: 300
+      - effect: NoExecute
+        key: node.kubernetes.io/unreachable
+        operator: Exists
+        tolerationSeconds: 300
+      volumes:
+      - configMap:
+          defaultMode: 420
+          items:
+          - key: Corefile
+            path: Corefile
+          name: coredns
+        name:
config-volume
+      - configMap:
+          defaultMode: 420
+          items:
+          - key: customer-hosts
+            path: customer-hosts
+          name: coredns-customer-hosts
+        name: customer-hosts
diff --git a/deploy/yamls/coredns-sa.yaml b/deploy/yamls/coredns-sa.yaml
new file mode 100644
index 000000000000..d8467b6efb90
--- /dev/null
+++ b/deploy/yamls/coredns-sa.yaml
@@ -0,0 +1,5 @@
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: coredns
+  namespace: kosmos-system
diff --git a/deploy/yamls/coredns-svc.yaml b/deploy/yamls/coredns-svc.yaml
new file mode 100644
index 000000000000..faeb6e167c51
--- /dev/null
+++ b/deploy/yamls/coredns-svc.yaml
@@ -0,0 +1,28 @@
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    kosmos.io/app: coredns
+  name: coredns
+  namespace: kosmos-system
+spec:
+  ipFamilies:
+  - IPv4
+  ipFamilyPolicy: SingleStack
+  ports:
+  - name: dns
+    port: 53
+    protocol: UDP
+    targetPort: 53
+  - name: dns-tcp
+    port: 53
+    protocol: TCP
+    targetPort: 53
+  - name: metrics
+    port: 9153
+    protocol: TCP
+    targetPort: 9153
+  selector:
+    kosmos.io/app: coredns
+  sessionAffinity: None
+  type: ClusterIP
diff --git a/plugin/kubernetes/cluster_controller.go b/plugin/kubernetes/cluster_controller.go
new file mode 100644
index 000000000000..b88aea8528d3
--- /dev/null
+++ b/plugin/kubernetes/cluster_controller.go
@@ -0,0 +1,507 @@
+package kubernetes
+
+import (
+	"context"
+	"encoding/base64"
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/coredns/coredns/plugin/kubernetes/object"
+
+	api "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/equality"
+	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/util/workqueue"
+)
+
+const (
+	ClusterGroup    = "kosmos.io"
+	ClusterVersion  = "v1alpha1"
+	ClusterResource = "clusters"
+	//ClusterKind     = "Cluster"
+
+	ClusterNameIndex = "Name"
+
+	KosmosClusterGlobalMapPath  = "spec/globalCIDRsMap"
+	KosmosClusterKubeconfigPath = "spec/kubeconfig"
+)
+
+const (
+	maxRetries = 10
+)
+
+// clusterController watches Cluster objects through a dynamic client and keeps
+// one dnsControl per cluster. Its read methods fan out over those controllers.
+type clusterController struct {
+	ctx context.Context
+
+	client dynamic.Interface
+	queue  workqueue.RateLimitingInterface
+
+	// stopLock is used to enforce only a single call to Stop is active.
+	// Needed because we allow stopping through an http endpoint and
+	// allowing concurrent stoppers leads to stack traces.
+	stopLock sync.Mutex
+	shutdown bool
+	stopCh   chan struct{}
+
+	// dnsControllersLock guards dnsControllers, which is written by the
+	// queue worker and read by the lookup methods.
+	dnsControllersLock      sync.RWMutex
+	dnsControllers          map[string]*dnsControl
+	dncControllersWaitGroup wait.Group
+	dnsOpts                 dnsControlOpts
+
+	clusterController cache.Controller
+	clusterLister     cache.Indexer
+}
+
+// newClusterController builds the controller and its Cluster informer. Nothing
+// is started until Run is called.
+func newClusterController(ctx context.Context, client dynamic.Interface, opts dnsControlOpts) *clusterController {
+	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+	c := clusterController{
+		ctx:            ctx,
+		stopCh:         make(chan struct{}),
+		client:         client,
+		queue:          queue,
+		dnsOpts:        opts,
+		dnsControllers: make(map[string]*dnsControl),
+	}
+
+	// Cluster objects are stored as-is; no conversion is applied.
+	var identity object.ToFunc
+	identity = func(obj meta.Object) (meta.Object, error) {
+		return obj, nil
+	}
+
+	c.clusterLister, c.clusterController = object.NewIndexerInformer(
+		&cache.ListWatch{
+			ListFunc:  clusterListFunc(ctx, c.client),
+			WatchFunc: clusterWatchFunc(ctx, c.client),
+		},
+		&unstructured.Unstructured{},
+		cache.ResourceEventHandlerFuncs{AddFunc: c.addCluster, UpdateFunc: c.updateCluster, DeleteFunc: c.deleteCluster},
+		cache.Indexers{ClusterNameIndex: clusterNameIndexFunc},
+		object.DefaultProcessor(identity, nil),
+	)
+
+	return &c
+}
+
+// controllers returns a snapshot of the per-cluster controllers taken under
+// the read lock, so callers can iterate (and make slow calls) without racing
+// with the queue worker that adds and removes map entries.
+func (c *clusterController) controllers() []*dnsControl {
+	c.dnsControllersLock.RLock()
+	defer c.dnsControllersLock.RUnlock()
+
+	list := make([]*dnsControl, 0, len(c.dnsControllers))
+	for _, ctrl := range c.dnsControllers {
+		list = append(list, ctrl)
+	}
+	return list
+}
+
+// ServiceList returns the services of all clusters, concatenated.
+func (c *clusterController) ServiceList() []*object.Service {
+	var arr []*object.Service
+	for _, ctrl := range c.controllers() {
+		svcList := ctrl.ServiceList()
+		if len(svcList) > 0 {
+			arr = append(arr, svcList...)
+		}
+	}
+	return arr
+}
+
+// EndpointsList returns the endpoints of all clusters, concatenated.
+func (c *clusterController) EndpointsList() []*object.Endpoints {
+	var arr []*object.Endpoints
+	for _, ctrl := range c.controllers() {
+		epList := ctrl.EndpointsList()
+		if len(epList) > 0 {
+			arr = append(arr, epList...)
+		}
+	}
+	return arr
+}
+
+// SvcIndex returns the first non-empty SvcIndex result among the clusters.
+func (c *clusterController) SvcIndex(s string) []*object.Service {
+	for _, ctrl := range c.controllers() {
+		svcList := ctrl.SvcIndex(s)
+		if len(svcList) > 0 {
+			return svcList
+		}
+	}
+	return nil
+}
+
+// SvcIndexReverse returns the first non-empty SvcIndexReverse result among the clusters.
+func (c *clusterController) SvcIndexReverse(ip string) []*object.Service {
+	for _, ctrl := range c.controllers() {
+		svcList := ctrl.SvcIndexReverse(ip)
+		if len(svcList) > 0 {
+			return svcList
+		}
+	}
+	return nil
+}
+
+// SvcExtIndexReverse returns the first non-empty SvcExtIndexReverse result among the clusters.
+func (c *clusterController) SvcExtIndexReverse(ip string) []*object.Service {
+	for _, ctrl := range c.controllers() {
+		svcList := ctrl.SvcExtIndexReverse(ip)
+		if len(svcList) > 0 {
+			return svcList
+		}
+	}
+	return nil
+}
+
+// PodIndex returns the first non-empty PodIndex result among the clusters.
+func (c *clusterController) PodIndex(s string) []*object.Pod {
+	for _, ctrl := range c.controllers() {
+		podList := ctrl.PodIndex(s)
+		if len(podList) > 0 {
+			return podList
+		}
+	}
+	return nil
+}
+
+// EpIndex returns the first non-empty EpIndex result among the clusters.
+func (c *clusterController) EpIndex(s string) []*object.Endpoints {
+	for _, ctrl := range c.controllers() {
+		epList := ctrl.EpIndex(s)
+		if len(epList) > 0 {
+			return epList
+		}
+	}
+	return nil
+}
+
+// EpIndexReverse returns the first non-empty EpIndexReverse result among the clusters.
+func (c *clusterController) EpIndexReverse(s string) []*object.Endpoints {
+	for _, ctrl := range c.controllers() {
+		epList := ctrl.EpIndexReverse(s)
+		if len(epList) > 0 {
+			return epList
+		}
+	}
+	return nil
+}
+
+// GetNodeByName returns the first node found under that name in any cluster.
+// Per-cluster lookup errors are ignored; a generic error is returned when no
+// cluster has the node.
+func (c *clusterController) GetNodeByName(ctx context.Context, s string) (*api.Node, error) {
+	for _, ctrl := range c.controllers() {
+		node, _ := ctrl.GetNodeByName(ctx, s)
+		if node != nil {
+			return node, nil
+		}
+	}
+	return nil, fmt.Errorf("node not found")
+}
+
+// GetNamespaceByName returns the first namespace found under that name in any
+// cluster. Per-cluster lookup errors are ignored.
+func (c *clusterController) GetNamespaceByName(s string) (*object.Namespace, error) {
+	for _, ctrl := range c.controllers() {
+		ns, _ := ctrl.GetNamespaceByName(s)
+		if ns != nil {
+			return ns, nil
+		}
+	}
+	return nil, fmt.Errorf("namespace not found")
+}
+
+// Run starts the Cluster informer and the queue worker, then blocks until
+// Stop is called.
+func (c *clusterController) Run() {
+	defer utilruntime.HandleCrash()
+	defer c.queue.ShutDown()
+
+	log.Info("starting cluster controller")
+	defer log.Info("shutting down cluster controller")
+
+	go c.clusterController.Run(c.stopCh)
+	if !cache.WaitForCacheSync(c.stopCh, c.clusterController.HasSynced) {
+		log.Errorf("failed to wait for caches to sync")
+		return
+	}
+
+	go wait.Until(c.worker, time.Second, c.stopCh)
+	<-c.stopCh
+
+	// Stop the per-cluster controllers as well; otherwise their informers
+	// keep running after this controller has shut down (e.g. on a reload).
+	for _, ctrl := range c.controllers() {
+		if err := ctrl.Stop(); err != nil {
+			log.Warningf("failed to stop dns controller: %v", err)
+		}
+	}
+}
+
+// HasSynced reports whether the Cluster informer has synced and every listed
+// cluster has a synced dns controller.
+func (c *clusterController) HasSynced() bool {
+	if !c.clusterController.HasSynced() {
+		return false
+	}
+
+	clusters := c.clusterLister.List()
+
+	c.dnsControllersLock.RLock()
+	defer c.dnsControllersLock.RUnlock()
+
+	if len(c.dnsControllers) != len(clusters) {
+		return false
+	}
+
+	for cluster, ctrl := range c.dnsControllers {
+		if !ctrl.HasSynced() {
+			log.Infof("waiting for cluster %s to sync", cluster)
+			return false
+		}
+	}
+
+	return true
+}
+
+// Stop closes stopCh once; later calls return an error.
+func (c *clusterController) Stop() error {
+	c.stopLock.Lock()
+	defer c.stopLock.Unlock()
+
+	// Only try draining the workqueue if we haven't already.
+	if !c.shutdown {
+		close(c.stopCh)
+		c.shutdown = true
+		return nil
+	}
+
+	return fmt.Errorf("shutdown already in progress")
+}
+
+// Modified returns the largest Modified value reported by any cluster.
+func (c *clusterController) Modified(external bool) int64 {
+	// The map is written by the queue worker, so read it under the lock.
+	c.dnsControllersLock.RLock()
+	defer c.dnsControllersLock.RUnlock()
+
+	var m int64
+	for _, ctrl := range c.dnsControllers {
+		modified := ctrl.Modified(external)
+		if modified > m {
+			m = modified
+		}
+	}
+	return m
+}
+
+// addCluster is the informer add handler.
+func (c *clusterController) addCluster(obj interface{}) {
+	c.enqueue(obj)
+}
+
+// deleteCluster is the informer delete handler.
+func (c *clusterController) deleteCluster(obj interface{}) {
+	c.enqueue(obj)
+}
+
+// updateCluster is the informer update handler. A cluster is re-queued only
+// when it is being deleted or when its global CIDR map or kubeconfig differs
+// between the old and the new object; every other update is ignored so the
+// per-cluster controller is not rebuilt needlessly.
+func (c *clusterController) updateCluster(older, newer interface{}) {
+	oldObj, okOld := older.(*unstructured.Unstructured)
+	newObj, okNew := newer.(*unstructured.Unstructured)
+	if !okOld || !okNew {
+		return
+	}
+
+	// A missing or malformed field yields a nil map / empty string, so the
+	// lookup errors can be ignored and the values compared directly.
+	cidrsPath := strings.Split(KosmosClusterGlobalMapPath, "/")
+	oldCIDRsMap, _, _ := unstructured.NestedStringMap(oldObj.Object, cidrsPath...)
+	newCIDRsMap, _, _ := unstructured.NestedStringMap(newObj.Object, cidrsPath...)
+	cidrsChanged := !equality.Semantic.DeepEqual(oldCIDRsMap, newCIDRsMap)
+
+	kubeconfigPath := strings.Split(KosmosClusterKubeconfigPath, "/")
+	oldConfig, _, _ := unstructured.NestedString(oldObj.Object, kubeconfigPath...)
+	newConfig, _, _ := unstructured.NestedString(newObj.Object, kubeconfigPath...)
+	kubeconfigChanged := oldConfig != newConfig
+
+	if newObj.GetDeletionTimestamp().IsZero() &&
+		!cidrsChanged &&
+		!kubeconfigChanged {
+		return
+	}
+
+	c.enqueue(newer)
+}
+
+// enqueue adds the object's key to the work queue; objects without a usable
+// key are dropped.
+func (c *clusterController) enqueue(obj interface{}) {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err != nil {
+		return
+	}
+
+	c.queue.Add(key)
+}
+
+// worker drains the queue until it is shut down or stopCh is closed.
+func (c *clusterController) worker() {
+	for c.processNextCluster() {
+		select {
+		case <-c.stopCh:
+			return
+		default:
+		}
+	}
+}
+
+// processNextCluster syncs one queued cluster; it returns false once the
+// queue has been shut down.
+func (c *clusterController) processNextCluster() bool {
+	key, shutdown := c.queue.Get()
+	if shutdown {
+		return false
+	}
+
+	defer c.queue.Done(key)
+
+	err := c.syncCluster(key.(string))
+	c.handleErr(err, key)
+
+	return true
+}
+
+// clusterGVR identifies the Cluster resource for the dynamic client.
+var clusterGVR = schema.GroupVersionResource{
+	Group:    ClusterGroup,
+	Version:  ClusterVersion,
+	Resource: ClusterResource,
+}
+
+// cleanControllerByClusterName detaches the cluster's dns controller and stops
+// it. The map entry is removed before Stop is called, so a controller that
+// fails to stop (for example because it is already stopped) cannot block
+// every later retry.
+func (c *clusterController) cleanControllerByClusterName(clusterName string) error {
+	c.dnsControllersLock.Lock()
+	ctrl := c.dnsControllers[clusterName]
+	delete(c.dnsControllers, clusterName)
+	c.dnsControllersLock.Unlock()
+
+	if ctrl == nil {
+		return nil
+	}
+	if err := ctrl.Stop(); err != nil {
+		return fmt.Errorf("cannot stop dns controller, cluster: %s, err: %v", clusterName, err)
+	}
+	return nil
+}
+
+// syncCluster reconciles one cluster key: it manages the finalizer, tears the
+// cluster's dns controller down on deletion, and otherwise (re)builds the
+// controller from the kubeconfig stored in the Cluster object.
+func (c *clusterController) syncCluster(key string) error {
+	log.Infof("Start to sync cluster: %v", key)
+
+	obj, exists, err := c.clusterLister.GetByKey(key)
+	if err != nil {
+		log.Errorf("cannot get cluster by key: %v, err: %v", key, err)
+		return nil
+	}
+	if !exists {
+		log.Infof("cluster has been deleted, cluster: %s", key)
+		// The object is gone from the cache; release its controller if one
+		// is still registered under the object's name.
+		_, name, splitErr := cache.SplitMetaNamespaceKey(key)
+		if splitErr != nil {
+			return nil
+		}
+		return c.cleanControllerByClusterName(name)
+	}
+
+	cached, ok := obj.(*unstructured.Unstructured)
+	if !ok {
+		log.Errorf("failed to convert cluster")
+		return nil
+	}
+	// Objects returned by the lister are shared with the informer cache and
+	// must not be mutated; the finalizer edits below work on a copy.
+	cluster := cached.DeepCopy()
+
+	clusterName := cluster.GetName()
+
+	if !cluster.GetDeletionTimestamp().IsZero() {
+		log.Infof("try to stop cluster informer %s", cluster.GetName())
+
+		if !ContainsFinalizer(cluster, corednsFinalizer) {
+			return nil
+		}
+
+		err = c.cleanControllerByClusterName(clusterName)
+		if err != nil {
+			return err
+		}
+
+		RemoveFinalizer(cluster, corednsFinalizer)
+		if _, err := c.client.Resource(clusterGVR).Update(context.TODO(), cluster, meta.UpdateOptions{}); err != nil {
+			return fmt.Errorf("failed to remove finalizer, err: %v", err)
+		}
+
+		log.Infof("successfully removed cluster %s", clusterName)
+
+		return nil
+	}
+
+	if !ContainsFinalizer(cluster, corednsFinalizer) {
+		AddFinalizer(cluster, corednsFinalizer)
+		if _, err := c.client.Resource(clusterGVR).Update(context.TODO(), cluster, meta.UpdateOptions{}); err != nil {
+			return fmt.Errorf("failed to add finalizer, err: %v", err)
+		}
+	}
+
+	// Drop any previous controller before building a fresh one.
+	err = c.cleanControllerByClusterName(clusterName)
+	if err != nil {
+		log.Warningf("clean controllers failed, cluster: %s", clusterName)
+	}
+
+	config, err := buildRESTConfig(cluster)
+	if err != nil {
+		return err
+	}
+
+	client, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return fmt.Errorf("cannot get init clientset, err: %v", err)
+	}
+
+	dnsCtrl := newdnsController(c.ctx, client, c.dnsOpts, cluster)
+
+	c.dncControllersWaitGroup.Start(dnsCtrl.Run)
+
+	if !cache.WaitForCacheSync(c.stopCh, dnsCtrl.HasSynced) {
+		// Do not leak the informers of a controller that never became ready.
+		if stopErr := dnsCtrl.Stop(); stopErr != nil {
+			log.Warningf("failed to stop dns controller, cluster: %s, err: %v", clusterName, stopErr)
+		}
+		return fmt.Errorf("failed to wait for cluster %s to sync", clusterName)
+	}
+
+	c.dnsControllersLock.Lock()
+	c.dnsControllers[clusterName] = dnsCtrl
+	c.dnsControllersLock.Unlock()
+
+	log.Infof("cluster %s has synced", clusterName)
+
+	return nil
+}
+
+// handleErr re-queues a failed key with rate limiting, up to maxRetries
+// attempts, and then drops it.
+func (c *clusterController) handleErr(err error, key interface{}) {
+	if err == nil {
+		c.queue.Forget(key)
+		return
+	}
+
+	if c.queue.NumRequeues(key) < maxRetries {
+		log.Warningf("Error syncing cluster %v, retrying: %v", key, err)
+		c.queue.AddRateLimited(key)
+		return
+	}
+
+	log.Warningf("Dropping cluster %v out of the queue: %v", key, err)
+	c.queue.Forget(key)
+	utilruntime.HandleError(err)
+}
+
+// clusterNameIndexFunc indexes Cluster objects by name.
+func clusterNameIndexFunc(obj interface{}) ([]string, error) {
+	o, ok := obj.(*unstructured.Unstructured)
+	if !ok {
+		return nil, errObj
+	}
+	return []string{o.GetName()}, nil
+}
+
+// clusterListFunc returns the informer list function for Cluster objects.
+func clusterListFunc(ctx context.Context, c dynamic.Interface) func(meta.ListOptions) (runtime.Object, error) {
+	return func(opts meta.ListOptions) (runtime.Object, error) {
+		listV1, err := c.Resource(clusterGVR).List(ctx, opts)
+		return listV1, err
+	}
+}
+
+// clusterWatchFunc returns the informer watch function for Cluster objects.
+func clusterWatchFunc(ctx context.Context, c dynamic.Interface) func(options meta.ListOptions) (watch.Interface, error) {
+	return func(options meta.ListOptions) (watch.Interface, error) {
+		w, err := c.Resource(clusterGVR).Watch(ctx, options)
+		return w, err
+	}
+}
+
+// buildRESTConfig decodes the base64 kubeconfig stored at spec/kubeconfig and
+// turns it into a rest.Config. The kubeconfig itself is never included in an
+// error message because it carries credentials.
+func buildRESTConfig(cluster *unstructured.Unstructured) (*rest.Config, error) {
+	kubeconfig, found, err := unstructured.NestedString(cluster.Object, strings.Split(KosmosClusterKubeconfigPath, "/")...)
+	if !found || err != nil {
+		return nil, fmt.Errorf("cannot find kubeconfig by cluster : %s", cluster.GetName())
+	}
+
+	decodeConfig, err := base64.StdEncoding.DecodeString(kubeconfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to decode kubeconfig, cluster: %s, err: %v", cluster.GetName(), err)
+	}
+
+	clientConfig, err := clientcmd.NewClientConfigFromBytes(decodeConfig)
+	if err != nil {
+		return nil, fmt.Errorf("cannot convert cluster kubeconfig, cluster: %s, err: %v", cluster.GetName(), err)
+	}
+
+	config, err := clientConfig.ClientConfig()
+	if err != nil {
+		return nil, fmt.Errorf("cannot get rest config, err: %v", err)
+	}
+
+	return config, nil
+}
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
index e7db294fcc5a..26ce94f09427 100644
--- a/plugin/kubernetes/controller.go
+++ b/plugin/kubernetes/controller.go
@@
-4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "sync/atomic" "time" @@ -13,6 +14,7 @@ import ( api "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -84,6 +86,8 @@ type dnsControl struct { zones []string endpointNameMode bool + + clusterInfo *unstructured.Unstructured } type dnsControlOpts struct { @@ -102,7 +106,7 @@ type dnsControlOpts struct { } // newdnsController creates a controller for CoreDNS. -func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts dnsControlOpts) *dnsControl { +func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts dnsControlOpts, cluster *unstructured.Unstructured) *dnsControl { dns := dnsControl{ client: kubeClient, selector: opts.selector, @@ -110,8 +114,17 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts stopCh: make(chan struct{}), zones: opts.zones, endpointNameMode: opts.endpointNameMode, + clusterInfo: cluster, + } + + var cidrsMap map[string]string + obj, exists, err := unstructured.NestedStringMap(cluster.Object, strings.Split(KosmosClusterGlobalMapPath, "/")...) 
+ if exists && err == nil { + cidrsMap = obj } + // func ToService(obj meta.Object, cidrsMap map[string]string) (meta.Object, error) + dns.svcLister, dns.svcController = object.NewIndexerInformer( &cache.ListWatch{ ListFunc: serviceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), @@ -120,7 +133,9 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts &api.Service{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc, svcExtIPIndex: svcExtIPIndexFunc}, - object.DefaultProcessor(object.ToService, nil), + object.DefaultProcessor(func(obj meta.Object) (meta.Object, error) { + return object.ToService(obj, cidrsMap) + }, nil), ) podLister, podController := object.NewIndexerInformer( @@ -131,7 +146,9 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts &api.Pod{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{podIPIndex: podIPIndexFunc}, - object.DefaultProcessor(object.ToPod, nil), + object.DefaultProcessor(func(obj meta.Object) (meta.Object, error) { + return object.ToPod(obj, cidrsMap) + }, nil), ) dns.podLister = podLister if opts.initPodCache { @@ -146,7 +163,9 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts &discovery.EndpointSlice{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, - object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()), + object.DefaultProcessor(func(obj meta.Object) (meta.Object, error) { + return object.EndpointSliceToEndpoints(obj, cidrsMap) + }, dns.EndpointSliceLatencyRecorder()), ) dns.epLister = epLister if opts.initEndpointsCache { diff --git 
a/plugin/kubernetes/controller_test.go b/plugin/kubernetes/controller_test.go
index c36ab6605340..855a28372eba 100644
--- a/plugin/kubernetes/controller_test.go
+++ b/plugin/kubernetes/controller_test.go
@@ -15,6 +15,7 @@ import (
 	api "k8s.io/api/core/v1"
 	discovery "k8s.io/api/discovery/v1"
 	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
 )
@@ -34,7 +35,7 @@ func kubernetesWithFakeClient(ctx context.Context, zone, cidr string, initEndpoi
 		zones:              []string{zone},
 		initEndpointsCache: initEndpointsCache,
 	}
-	controller := newdnsController(ctx, client, dco)
+	controller := newdnsController(ctx, client, dco, &unstructured.Unstructured{})
 
 	// Add resources
 	_, err := client.CoreV1().Namespaces().Create(ctx, &api.Namespace{ObjectMeta: meta.ObjectMeta{Name: "testns"}}, meta.CreateOptions{})
diff --git a/plugin/kubernetes/finalizer_helper.go b/plugin/kubernetes/finalizer_helper.go
new file mode 100644
index 000000000000..03e1b8809e30
--- /dev/null
+++ b/plugin/kubernetes/finalizer_helper.go
@@ -0,0 +1,55 @@
+package kubernetes
+
+import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+
+// corednsFinalizer is the finalizer the cluster controller adds to and removes from Cluster objects.
+const corednsFinalizer = "clusterlink.io/coredns"
+
+// FinalizerHelper describes the three finalizer operations that this file
+// also provides as package-level functions.
+type FinalizerHelper interface {
+	ContainsFinalizer(obj *unstructured.Unstructured, finalizer string) bool
+	RemoveFinalizer(obj *unstructured.Unstructured, finalizer string) bool
+	AddFinalizer(obj *unstructured.Unstructured, finalizer string) bool
+}
+
+// ContainsFinalizer reports whether finalizer is present on obj.
+func ContainsFinalizer(obj *unstructured.Unstructured, finalizer string) bool {
+	finalizers := obj.GetFinalizers()
+	for _, str := range finalizers {
+		if str == finalizer {
+			return true
+		}
+	}
+	return false
+}
+
+// RemoveFinalizer drops every occurrence of finalizer from obj and reports
+// whether at least one was removed. The finalizer list is written back even
+// when nothing matched.
+func RemoveFinalizer(obj *unstructured.Unstructured, finalizer string) (finalizersUpdated bool) {
+	finalizers := obj.GetFinalizers()
+	for i := 0; i < len(finalizers); i++ {
+		if finalizers[i] == finalizer {
+			finalizers = append(finalizers[:i], finalizers[i+1:]...)
+			// Re-check slot i: the next element has just shifted into it.
+			i--
+			finalizersUpdated = true
+		}
+	}
+	obj.SetFinalizers(finalizers)
+	return
+}
+
+// AddFinalizer appends finalizer to obj unless it is already present, and
+// reports whether obj was modified.
+func AddFinalizer(obj *unstructured.Unstructured, finalizer string) (finalizersUpdated bool) {
+	finalizers := obj.GetFinalizers()
+	for _, e := range finalizers {
+		if e == finalizer {
+			return false
+		}
+	}
+	obj.SetFinalizers(append(finalizers, finalizer))
+	return true
+}
diff --git a/plugin/kubernetes/informer_test.go b/plugin/kubernetes/informer_test.go
index ee5186a947f2..ac5f64023fa0 100644
--- a/plugin/kubernetes/informer_test.go
+++ b/plugin/kubernetes/informer_test.go
@@ -12,7 +12,9 @@ import (
 )
 
 func TestDefaultProcessor(t *testing.T) {
-	pbuild := object.DefaultProcessor(object.ToService, nil)
+	pbuild := object.DefaultProcessor(func(obj metav1.Object) (metav1.Object, error) {
+		return object.ToService(obj, nil)
+	}, nil)
 	reh := cache.ResourceEventHandlerFuncs{}
 	idx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{})
 	processor := pbuild(idx, reh)
diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go
index cea23d860faa..d3d53d5f8b83 100644
--- a/plugin/kubernetes/kubernetes.go
+++ b/plugin/kubernetes/kubernetes.go
@@ -20,7 +20,7 @@ import (
 	api "k8s.io/api/core/v1"
 	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
 	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
@@ -228,9 +228,14 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o
 		return nil, nil, err
 	}
 
-	kubeClient, err := kubernetes.NewForConfig(config)
+	//kubeClient, err := kubernetes.NewForConfig(config)
+	//if err != nil {
+	//	return fmt.Errorf("failed to create kubernetes notification controller: %q", err)
+	//}
+
+	dynamicClient, err := dynamic.NewForConfig(config)
 	if err != nil {
-		return nil, nil, fmt.Errorf("failed to create kubernetes notification
controller: %q", err) + return nil, nil, fmt.Errorf("failed to create dynamic client: %q", err) } if k.opts.labelSelector != nil { @@ -256,7 +261,7 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o k.opts.zones = k.Zones k.opts.endpointNameMode = k.endpointNameMode - k.APIConn = newdnsController(ctx, kubeClient, k.opts) + k.APIConn = newClusterController(ctx, dynamicClient, k.opts) onStart = func() error { go func() { diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go index 26555e1ac00a..16cb229b7919 100644 --- a/plugin/kubernetes/object/endpoint.go +++ b/plugin/kubernetes/object/endpoint.go @@ -3,6 +3,8 @@ package object import ( "fmt" + "github.com/coredns/coredns/plugin/pkg/netmap" + discovery "k8s.io/api/discovery/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -47,7 +49,7 @@ type EndpointPort struct { func EndpointsKey(name, namespace string) string { return name + "." + namespace } // EndpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints. 
-func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) { +func EndpointSliceToEndpoints(obj meta.Object, cidrsMap map[string]string) (meta.Object, error) { ends, ok := obj.(*discovery.EndpointSlice) if !ok { return nil, fmt.Errorf("unexpected object %v", obj) @@ -87,7 +89,16 @@ func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) { if !endpointsliceReady(end.Conditions.Ready) { continue } - for _, a := range end.Addresses { + for _, ip := range end.Addresses { + a := ip + if cidrsMap != nil { + mappedIP, err := netmap.NetMap(ip, cidrsMap) + if err != nil { + return nil, fmt.Errorf("failed to map ip, err: %v, epIP: %s", err, ip) + } + a = mappedIP + } + ea := EndpointAddress{IP: a} if end.Hostname != nil { ea.Hostname = *end.Hostname diff --git a/plugin/kubernetes/object/pod.go b/plugin/kubernetes/object/pod.go index 9b9d5641c982..e1ce19194b62 100644 --- a/plugin/kubernetes/object/pod.go +++ b/plugin/kubernetes/object/pod.go @@ -3,6 +3,7 @@ package object import ( "errors" "fmt" + "github.com/coredns/coredns/plugin/pkg/netmap" api "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -23,7 +24,7 @@ type Pod struct { var errPodTerminating = errors.New("pod terminating") // ToPod converts an api.Pod to a *Pod. 
-func ToPod(obj meta.Object) (meta.Object, error) { +func ToPod(obj meta.Object, cidrsMap map[string]string) (meta.Object, error) { apiPod, ok := obj.(*api.Pod) if !ok { return nil, fmt.Errorf("unexpected object %v", obj) @@ -34,6 +35,15 @@ func ToPod(obj meta.Object) (meta.Object, error) { Namespace: apiPod.GetNamespace(), Name: apiPod.GetName(), } + + if cidrsMap != nil && pod.PodIP != "" { + mappedIP, err := netmap.NetMap(pod.PodIP, cidrsMap) + if err != nil { + return nil, fmt.Errorf("failed to map ip, err: %v, podIP: %s", err, pod.PodIP) + } + pod.PodIP = mappedIP + } + t := apiPod.ObjectMeta.DeletionTimestamp if t != nil && !(*t).Time.IsZero() { // if the pod is in the process of termination, return an error so it can be ignored diff --git a/plugin/kubernetes/object/service.go b/plugin/kubernetes/object/service.go index bd3e3d335013..903a86682437 100644 --- a/plugin/kubernetes/object/service.go +++ b/plugin/kubernetes/object/service.go @@ -3,6 +3,8 @@ package object import ( "fmt" + "github.com/coredns/coredns/plugin/pkg/netmap" + api "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -30,7 +32,7 @@ type Service struct { func ServiceKey(name, namespace string) string { return name + "." + namespace } // ToService converts an api.Service to a *Service. 
-func ToService(obj meta.Object) (meta.Object, error) { +func ToService(obj meta.Object, cidrsMap map[string]string) (meta.Object, error) { svc, ok := obj.(*api.Service) if !ok { return nil, fmt.Errorf("unexpected object %v", obj) @@ -46,11 +48,36 @@ func ToService(obj meta.Object) (meta.Object, error) { ExternalIPs: make([]string, len(svc.Status.LoadBalancer.Ingress)+len(svc.Spec.ExternalIPs)), } + // global ip map + clusterIPs := svc.Spec.ClusterIPs + clusterIP := svc.Spec.ClusterIP + if cidrsMap != nil { + if len(svc.Spec.ClusterIPs) > 0 { + clusterIPs = []string{} + for _, ip := range svc.Spec.ClusterIPs { + mappedIP, err := netmap.NetMap(ip, cidrsMap) + if err != nil { + return nil, fmt.Errorf("failed to map ip, err: %v, clusterIP: %s", err, ip) + } else { + clusterIPs = append(clusterIPs, mappedIP) + } + } + + } + if svc.Spec.ClusterIP != "" && svc.Spec.ClusterIP != "None" { + mappedIP, err := netmap.NetMap(svc.Spec.ClusterIP, cidrsMap) + if err != nil { + return nil, fmt.Errorf("failed to map ip, err: %v, clusterIP: %s", err, svc.Spec.ClusterIP) + } + clusterIP = mappedIP + } + } + if len(svc.Spec.ClusterIPs) > 0 { - s.ClusterIPs = make([]string, len(svc.Spec.ClusterIPs)) - copy(s.ClusterIPs, svc.Spec.ClusterIPs) + s.ClusterIPs = make([]string, len(clusterIPs)) + copy(s.ClusterIPs, clusterIPs) } else { - s.ClusterIPs = []string{svc.Spec.ClusterIP} + s.ClusterIPs = []string{clusterIP} } if len(svc.Spec.Ports) == 0 { diff --git a/plugin/pkg/netmap/netmap.go b/plugin/pkg/netmap/netmap.go new file mode 100644 index 000000000000..5d49ebe28ff8 --- /dev/null +++ b/plugin/pkg/netmap/netmap.go @@ -0,0 +1,105 @@ +package netmap + +import ( + "encoding/binary" + "fmt" + "net" +) + +type IPType int + +const ( + IPV4 IPType = iota + IPV6 +) + +func GetIPType(s string) IPType { + for i := 0; i < len(s); i++ { + switch s[i] { + case '.': + return IPV4 + case ':': + return IPV6 + } + } + return -1 +} + +func NetMap(ipStr string, cidrsMap map[string]string) (string, error) { 
+ ip := net.ParseIP(ipStr) + if ip == nil { + return ipStr, nil + } + for src, dest := range cidrsMap { + _, srcNet, err := net.ParseCIDR(src) + if err != nil { + return "", err + } + + if GetIPType(ipStr) != GetIPType(src) { + continue + } + + if !srcNet.Contains(ip) { + continue + } + + _, destNet, err := net.ParseCIDR(dest) + if err != nil { + return "", err + } + + srcBits, _ := srcNet.Mask.Size() + destBits, _ := destNet.Mask.Size() + if srcBits != destBits { + return "", fmt.Errorf("the subnet masks of srcCIDR and destCIDR of CIDRsMap need to be the same") + } + + var changeIPNet func(ip net.IP, destNet net.IPNet) (net.IP, error) + + if GetIPType(ipStr) == IPV4 { + changeIPNet = changeIPNetIPV4 + } else { + changeIPNet = changeIPNetIPV6 + } + + newIP, err := changeIPNet(ip, *destNet) + if err != nil { + return "", err + } + return newIP.String(), nil + } + + return ip.String(), nil +} + +func changeIPNetIPV4(ip net.IP, destNet net.IPNet) (net.IP, error) { + ipBytes := ip.To4() + destNetBytes := destNet.IP.To4() + maskSize, _ := destNet.Mask.Size() + + ipBits := binary.BigEndian.Uint32(ipBytes) + destNetBits := binary.BigEndian.Uint32(destNetBytes) + + v := ((destNetBits >> (32 - maskSize)) << (32 - maskSize)) | ((ipBits << maskSize) >> maskSize) + + newIP := make(net.IP, 4) + binary.BigEndian.PutUint32(newIP, v) + + return newIP, nil +} + +func changeIPNetIPV6(ip net.IP, destNet net.IPNet) (net.IP, error) { + ipBytes := []byte(ip) + maskBytes := []byte(destNet.Mask) + destIPBytes := []byte(destNet.IP) + + targetIP := make(net.IP, len(ipBytes)) + + for k, _ := range ipBytes { + invertedMask := maskBytes[k] ^ 0xff + targetIP[k] = (invertedMask & ipBytes[k]) | (destIPBytes[k] & maskBytes[k]) + } + + return targetIP, nil +} diff --git a/test/netmap_test.go b/test/netmap_test.go new file mode 100644 index 000000000000..4b8af2ef4e1d --- /dev/null +++ b/test/netmap_test.go @@ -0,0 +1,119 @@ +package test + +import ( + "strings" + "testing" + + 
"github.com/coredns/coredns/plugin/pkg/netmap" +) + +func TestNetMap(t *testing.T) { + tests := []struct { + name string + ip string + cidrsMap map[string]string + want string + }{ + { + name: "ipv4-1", + ip: "10.234.0.1", + cidrsMap: map[string]string{ + "10.234.0.0/16": "10.238.0.0/16", + }, + want: "10.238.0.1", + }, + { + name: "ipv4-2", + ip: "10.234.12.13", + cidrsMap: map[string]string{ + "10.232.0.0/16": "10.239.0.0/16", + "10.234.0.0/16": "10.238.0.0/16", + }, + want: "10.238.12.13", + }, + { + name: "ipv4-3", + ip: "10.233.12.13", + cidrsMap: map[string]string{ + "10.234.0.0/16": "10.238.0.0/16", + }, + want: "10.233.12.13", + }, + { + name: "ipv4-4", + ip: "10.234.12.13", + cidrsMap: map[string]string{ + "10.234.0.0/16": "10.238.0.0/18", + }, + want: "", + }, + { + name: "ipv4-5", + ip: "10.234.12.13", + cidrsMap: map[string]string{ + "10.234.0.0/16": "10.238.1.2/16", + }, + want: "10.238.12.13", + }, + { + name: "ipv4-6", + ip: "None", + cidrsMap: map[string]string{ + "10.234.0.0/16": "10.238.0.0/18", + }, + want: "None", + }, + { + name: "ipv6-1", + ip: "2001:0:0:CD30::1", + cidrsMap: map[string]string{ + "2001:0:0:CD30::/60": "4292:0:0:CD30::/60", + }, + want: "4292:0:0:CD30::1", + }, + { + name: "ipv6-2", + ip: "2001:0:0:CD30::1", + cidrsMap: map[string]string{ + "2001:0:0:CD30::/60": "4292:0:0:CD30::/30", + }, + want: "", + }, + { + name: "ipv6-3", + ip: "2001:0:0:CD30::1", + cidrsMap: map[string]string{ + "2001:0:0:CD30::/60": "4292:0:0:CD30::3/60", + }, + want: "4292:0:0:CD30::1", + }, + { + name: "ipv4+ipv6-1", + ip: "2001:0:0:CD30::1", + cidrsMap: map[string]string{ + "10.234.0.0/16": "10.238.1.2/16", + "2001:0:0:CD30::/60": "4292:0:0:CD30::3/60", + }, + want: "4292:0:0:CD30::1", + }, + { + name: "ipv4+ipv6-2", + ip: "10.222.222.222", + cidrsMap: map[string]string{ + "10.234.0.0/16": "10.238.1.2/16", + "2001:0:0:CD30::/60": "4292:0:0:CD30::3/60", + "10.222.0.0/16": "10.234.1.2/16", + }, + want: "10.234.222.222", + }, + } + + for _, tt := range 
tests { + t.Run(tt.name, func(t *testing.T) { + if got, _ := netmap.NetMap(tt.ip, tt.cidrsMap); !strings.EqualFold(got, tt.want) { + t.Errorf("kubernetes.NetMap() = %v, want %v", got, tt.want) + } + }) + } + +}