diff --git a/.gitignore b/.gitignore index e84e883..82fc9a4 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,5 @@ ci/tf/*.tfstate ci/tf/*.tfstate.backup ci/tf/hosts.yaml ci/tf/values.yaml + +*.DS_Store \ No newline at end of file diff --git a/Makefile b/Makefile index dc12a02..ab301f6 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ VERSION ?= $(shell git log -1 --date='format:%Y%m%d' --format='format:%ad').$(shell git describe --always --contains HEAD) BRANCH ?= $(shell git rev-parse --abbrev-ref HEAD) DATE = $(shell date +"%Y-%m-%d_%H:%M:%S") +COMMIT = $(shell git rev-parse HEAD | head -c 8) # Information of OS and ARCH OS = $(shell uname -s) @@ -21,7 +22,7 @@ IMAGE_NAME_TAG ?= $(IMAGE_NAME):$(IMAGE_TAG) # GO FLAGS GOPROXY ?= GO_FLAGS=-ldflags="-s -w" -CNI_VERSION_LD_FLAG=-ldflags="-X github.com/volcengine/cello/pkg/version.Version=$(VERSION)@$(BRANCH)" +CNI_VERSION_LD_FLAG=-ldflags="-X github.com/volcengine/cello/pkg/version.Version=$(VERSION)@$(BRANCH) -X github.com/volcengine/cello/pkg/version.GitCommit=$(COMMIT)" BUILD_INFO=-ldflags="-X main.BuildInfo=$(VERSION)@$(BRANCH)_$(DATE)" # BUILD FLAGS diff --git a/chart/Chart.yaml b/chart/Chart.yaml index ff1d8c4..196fea6 100644 --- a/chart/Chart.yaml +++ b/chart/Chart.yaml @@ -2,5 +2,5 @@ apiVersion: v2 name: vpc-cni description: cello type: application -version: v1.6.2 -appVersion: v1.6.2 +version: 1.6.6 +appVersion: v1.6.6 diff --git a/chart/templates/daemonset.yaml b/chart/templates/daemonset.yaml index f40bfe1..6aac653 100644 --- a/chart/templates/daemonset.yaml +++ b/chart/templates/daemonset.yaml @@ -141,8 +141,6 @@ spec: securityContext: privileged: true volumeMounts: - - mountPath: /host - name: host-root - mountPath: /lib/modules name: host-lib-modules - mountPath: /etc/cni/net.d @@ -155,10 +153,6 @@ spec: name: host-cilium-run - mountPath: /var/run/netns name: host-run-netns - - mountPath: /host/opt/cni/bin - name: host-cni-bin-dir - - mountPath: /host/etc/cni/net.d - name: host-cni-conf-dir - 
mountPath: /run/xtables.lock name: host-xtables-lock - mountPath: /etc/cilium/cilium-config @@ -210,10 +204,6 @@ spec: - name: host-lib-modules hostPath: path: /lib/modules - - name: host-root - hostPath: - path: / - type: Directory - name: host-cilium-run hostPath: path: /var/run/cilium diff --git a/chart/values.yaml b/chart/values.yaml index ca80bf6..aacaaac 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -1,6 +1,6 @@ image: repository: "cr-cn-beijing.volces.com/cello/cello" - tag: v1.6.2 + tag: v1.6.8 pullPolicy: "Always" cello: diff --git a/cmd/cello-agent/cello-agent.go b/cmd/cello-agent/cello-agent.go index 7b1156a..ad32a65 100644 --- a/cmd/cello-agent/cello-agent.go +++ b/cmd/cello-agent/cello-agent.go @@ -17,6 +17,8 @@ package main import ( "github.com/volcengine/cello/pkg/daemon" + + _ "go.uber.org/automaxprocs" ) func main() { diff --git a/cmd/launcher/cilium/launcher.go b/cmd/launcher/cilium/launcher.go index 94143fa..a7f6b3c 100644 --- a/cmd/launcher/cilium/launcher.go +++ b/cmd/launcher/cilium/launcher.go @@ -18,23 +18,22 @@ package main import ( "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "os" "os/exec" "os/signal" + "path" "path/filepath" - "strings" "sync" "syscall" "time" - "github.com/containernetworking/plugins/pkg/ns" "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/util/wait" "github.com/volcengine/cello/pkg/config" "github.com/volcengine/cello/pkg/utils/datatype" - "github.com/volcengine/cello/pkg/utils/kernel" "github.com/volcengine/cello/pkg/utils/logger" "github.com/volcengine/cello/pkg/utils/sysctl" "github.com/volcengine/cello/types" @@ -43,7 +42,6 @@ import ( // cilium Launcher pre_start_check and start cilium while work with cello const ( - bpfFsPath = "/sys/fs/bpf" ciliumConfigPath = "/etc/cilium/cilium-config" celloConfigPath = "/etc/cilium/cello-config" ) @@ -131,8 +129,8 @@ func main() { log.Warnf("Cello agent not ready, err: %v", err) continue } - bodyText, err := ioutil.ReadAll(response.Body) - 
response.Body.Close() + bodyText, err := io.ReadAll(response.Body) + _ = response.Body.Close() if err != nil { time.Sleep(1 * time.Second) log.Warnf("Cello agent not ready, err: %v", err) @@ -144,31 +142,12 @@ func main() { } log.Infof("Cello ready, launch cilium...") - // kernel version must equal and above 4.19 - if !kernel.CheckKernelVersion(4, 19, 0) { - log.Fatalf("Linux kernel version < 4.19, skipping load cilium") - } - - // ensure bpf mount - err := ensureBpfFsMounted() - if err != nil { - log.Fatalf("BPF filesystem not mount, %v", err) - } - // disable rp_filter - err = sysctl.Disable("net.ipv4.conf.eth0.rp_filter") + err := sysctl.Disable("net.ipv4.conf.eth0.rp_filter") if err != nil { log.Fatalf("Disable rp_filter for eth0 failed, %v", err) } - // modprobe ipvlan - cmd := exec.Command("modprobe", "ipvlan") - _, err = cmd.Output() - if err != nil { - log.Fatalf("Modprobe ipvlan failed, %v", err) - } - log.Infof("Node init success") - // check apiServer info host := os.Getenv("KUBERNETES_SERVICE_HOST") if host == "" { @@ -219,7 +198,7 @@ func main() { } } if !info.IsDir() { - value, err := ioutil.ReadFile(path) + value, err := os.ReadFile(path) if err != nil { return err } @@ -232,15 +211,11 @@ func main() { log.Fatalf("Read custom cilium config failed, %v", err) } + policyState := fmt.Sprintf("%v", ciliumArgs["enable-policy"]) ciliumExitChan := make(chan struct{}) var lock sync.Mutex go func() { - defer func() { - if err := recover(); err != nil { - log.Errorf("Cilium panic, %v", err) - } - }() log.Infof("Run cilium-agent with args: %v", ciliumArgs.ToArgs()) lock.Lock() ciliumCmd = exec.Command("cilium-agent", ciliumArgs.ToArgs()...) 
@@ -261,75 +236,102 @@ func main() { close(ciliumExitChan) }() + policyCfgPath := path.Join(ciliumConfigPath, "enable-policy") + policyEvent := make(chan *ValueEvent, 1) + watchPath(policyCfgPath, &policyState, policyEvent, 10*time.Second) + // press signal sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - select { - case sig := <-sigCh: - log.Infof("%d signal: %s", os.Getpid(), sig.String()) - lock.Lock() - if ciliumCmd != nil { - err = ciliumCmd.Process.Signal(syscall.SIGINT) - if err != nil { - log.Infof("INT cilium failed: %v", err) - } - - t := time.NewTimer(30 * time.Second) - select { - case <-ciliumExitChan: - log.Infof("cilium exited, code: %d", ciliumCmd.ProcessState.ExitCode()) - os.Exit(ciliumCmd.ProcessState.ExitCode()) - case <-t.C: - t.Stop() - log.Infof("wait cilium finish timeout") - os.Exit(1) - } + exitCilium := func() { + log.Infof("cilium exiting") + err = ciliumCmd.Process.Signal(syscall.SIGINT) + if err != nil { + log.Infof("INT cilium failed: %v", err) } - case <-ciliumExitChan: - log.Infof("cilium unexpect exited, code: %d", ciliumCmd.ProcessState.ExitCode()) - os.Exit(ciliumCmd.ProcessState.ExitCode()) - } -} -func ensureBpfFsMounted() error { - initNs, err := ns.GetNS("/proc/1/ns/net") - if err != nil { - return fmt.Errorf("nsenter pid 1 failed, %w", err) + t := time.NewTimer(30 * time.Second) + select { + case <-ciliumExitChan: + log.Infof("cilium exited, code: %d", ciliumCmd.ProcessState.ExitCode()) + os.Exit(ciliumCmd.ProcessState.ExitCode()) + case <-t.C: + t.Stop() + log.Infof("wait cilium finish timeout") + os.Exit(1) + } } - err = initNs.Do(func(netNS ns.NetNS) error { - // not mount - if !isBpfMountExist() { - // mount - log.Infof("Mounting BPF filesystem...") - inErr := syscall.Mount("bpffs", bpfFsPath, "bpf", 0, "") - if inErr != nil { - return fmt.Errorf("mount bpf filesystem failed, %w", err) + for { + select { + case sig := <-sigCh: + log.Infof("%d signal: %s", os.Getpid(), sig.String()) 
+ lock.Lock() + if ciliumCmd != nil { + exitCilium() + } + case pe := <-policyEvent: + lock.Lock() + if ciliumCmd != nil { + if pe.err != nil { + log.Errorf("watch policy state failed, %v", pe.err) + exitCilium() + } + if pe.value != "default" && pe.value != "always" && pe.value != "never" { + log.Errorf("Invalid value '%s' for enable-policy", pe.value) + } else { + if err = setPolicyState(pe.value); err != nil { + log.Errorf("Switch enable-policy to %s failed, %v", pe.value, err) + exitCilium() + } + } } - log.Infof("BPF filesystem mounted") - } else { - log.Infof("BPF filesystem has mounted") + lock.Unlock() + case <-ciliumExitChan: + log.Infof("cilium unexpect exited, code: %d", ciliumCmd.ProcessState.ExitCode()) + os.Exit(ciliumCmd.ProcessState.ExitCode()) } - return nil - }) + } +} +func setPolicyState(value string) error { + log.Infof("Switch enable-policy to %s", value) + cfg := fmt.Sprintf("PolicyEnforcement=%s", value) + policyCmd := exec.Command("cilium", "config", cfg) + output, err := policyCmd.CombinedOutput() if err != nil { - return fmt.Errorf("ensure bpf filesystem mount failed, %w", err) + return fmt.Errorf("cmd execute failed, output: %v, err: %v", output, err) } - + log.Infof("Switch enable-policy to %s success", value) return nil } -func isBpfMountExist() bool { - cmd := exec.Command("mount", "-t", "bpf") - output, err := cmd.Output() - if err != nil { - log.Errorf("exec mount command failed, %v", err) - return false - } - if strings.Contains(string(output), bpfFsPath) { - return true - } - return false +type ValueEvent struct { + value string + err error +} + +func watchPath(path string, oldValue *string, valueEvent chan<- *ValueEvent, period time.Duration) { + go wait.Forever(func() { + value, err := os.ReadFile(path) + if os.IsNotExist(err) { + return + } + if err != nil { + event := &ValueEvent{ + value: "", + err: fmt.Errorf("read %s failed, %v", path, err), + } + valueEvent <- event + return + } + newValue := string(value) + if newValue != 
*oldValue { + *oldValue = newValue + valueEvent <- &ValueEvent{ + value: newValue, + } + } + }, period) } diff --git a/go.mod b/go.mod index 62f8483..72f9e41 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/vishvananda/netlink v1.2.1-beta.2 github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f github.com/volcengine/volcengine-go-sdk v1.0.61 + go.uber.org/automaxprocs v1.5.2 golang.org/x/net v0.9.0 golang.org/x/sys v0.7.0 golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac diff --git a/go.sum b/go.sum index e302956..f4d7814 100644 --- a/go.sum +++ b/go.sum @@ -450,6 +450,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= +github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= @@ -616,6 +617,8 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME= +go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/multierr v1.1.0/go.mod 
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= diff --git a/opstools/tag/main.go b/opstools/tag/main.go new file mode 100644 index 0000000..aa084ae --- /dev/null +++ b/opstools/tag/main.go @@ -0,0 +1,69 @@ +// Copyright 2023 The Cello Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package main + +import ( + "fmt" + "os" + + "github.com/urfave/cli/v2" +) + +var ( + endpoint string + ramRole string +) + +func main() { + app := &cli.App{ + Name: "tag-tool", + Usage: "convert eni description to tags", + Version: "1.0.0", + Commands: buildCommand(), + Flags: []cli.Flag{ + &cli.StringFlag{Name: "endpoint", Value: "", Required: true, Destination: &endpoint}, + &cli.StringFlag{Name: "ramRole", Value: "KubernetesNodeRoleForECS", Destination: &ramRole}, + }, + } + + err := app.Run(os.Args) + if err != nil { + fmt.Printf("%s", err) + os.Exit(1) + } +} + +func buildCommand() []*cli.Command { + return []*cli.Command{ + { + Name: "tagEni", + Usage: "convert eni description to tags", + Flags: []cli.Flag{ + &cli.BoolFlag{Name: "exec", Value: false}, + &cli.BoolFlag{Name: "vpc-all", Aliases: []string{"v"}, Value: false}, + }, + Action: tagEni, + }, + { + Name: "listTaggedEni", + Usage: "list tagged eni", + Flags: []cli.Flag{ + &cli.BoolFlag{Name: "vpc-all", Aliases: []string{"v"}, Value: false}, + }, + Action: listTaggedEni, + }, + } +} diff --git a/opstools/tag/tag.go 
b/opstools/tag/tag.go new file mode 100644 index 0000000..956bbe1 --- /dev/null +++ b/opstools/tag/tag.go @@ -0,0 +1,310 @@ +// Copyright 2023 The Cello Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package main + +import ( + "fmt" + "log" + "strings" + + "github.com/urfave/cli/v2" + "github.com/volcengine/volcengine-go-sdk/service/vpc" + "github.com/volcengine/volcengine-go-sdk/volcengine" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/volcengine/cello/pkg/backoff" + "github.com/volcengine/cello/pkg/metrics" + helper "github.com/volcengine/cello/pkg/provider/volcengine/cellohelper" + apiErr "github.com/volcengine/cello/pkg/provider/volcengine/cellohelper/errors" + "github.com/volcengine/cello/pkg/provider/volcengine/credential" + "github.com/volcengine/cello/pkg/provider/volcengine/ec2" +) + +func createEc2(endpoint, ramRole string, instanceMeta helper.InstanceMetadataGetter) (ec2.EC2, error) { + var credentialProvider credential.Provider + if endpoint == "" { + return nil, fmt.Errorf("no endpoint") + } + if ramRole != "" { + log.Printf("Set credential provider by ramRole %s", ramRole) + credentialProvider = credential.NewTSTProvider(ramRole) + } else { + return nil, fmt.Errorf("no credential provided") + } + + apiClient := metrics.NewMetricEC2Wrapper(ec2.NewClient(instanceMeta.GetRegion(), endpoint, credentialProvider)) + return apiClient, nil +} + +func tagEni(c *cli.Context) error { + // instanceMetaGetter + instanceMeta := 
helper.GetInstanceMetadata() + + apiClient, err := createEc2(endpoint, ramRole, instanceMeta) + if err != nil { + return err + } + volc := &volcApi{ + vpcId: instanceMeta.GetVpcId(), + instanceID: instanceMeta.GetInstanceId(), + ec2Client: apiClient, + } + return volc.convertENIDescriptionToTags(c) +} + +func listTaggedEni(c *cli.Context) error { + // instanceMetaGetter + instanceMeta := helper.GetInstanceMetadata() + + apiClient, err := createEc2(endpoint, ramRole, instanceMeta) + if err != nil { + return err + } + volc := &volcApi{ + vpcId: instanceMeta.GetVpcId(), + instanceID: instanceMeta.GetInstanceId(), + ec2Client: apiClient, + } + + enis, err := volc.getNetworkInterfacesCreatedByCelloWithTag(c) + if err != nil { + return err + } + printEni(enis) + return nil +} + +type volcApi struct { + vpcId string + instanceID string + ec2Client ec2.EC2 +} + +func (v *volcApi) convertENIDescriptionToTags(c *cli.Context) error { + enis, err := v.getNetworkInterfacesCreatedByCelloWithoutTag(c) + if err != nil { + log.Printf("Get enis created by cello(have special description, no tags) on instance %s failed, %v", + v.instanceID, err) + return err + } + log.Printf("%d eni waiting to tag:\n", len(enis)) + printEni(enis) + log.Println() + if c.Bool("exec") { + return v.tagENIs(enis) + } + return nil +} + +func (v *volcApi) tagENIs(enis []*ec2.NetworkInterfaceSetForDescribeNetworkInterfacesOutput) error { + if len(enis) == 0 { + return nil + } + var err error + + var eniTags []*vpc.TagForTagResourcesInput + tagStr := func(eniTags []*vpc.TagForTagResourcesInput) string { + var strs []string + for _, tag := range eniTags { + strs = append(strs, fmt.Sprintf("%s=%s", volcengine.StringValue(tag.Key), volcengine.StringValue(tag.Value))) + } + return fmt.Sprintf("%v", strs) + } + for _, eni := range enis { + // tag + expectDescriptionPrefix := fmt.Sprintf("%s.%s %s.%s %s.", TagComponentKey, ComponentName, TagVpcIdKey, v.vpcId, TagInstanceKey) + eniInstance, found := 
strings.CutPrefix(volcengine.StringValue(eni.Description), expectDescriptionPrefix) + if !found || !strings.HasPrefix(eniInstance, "i-") { + log.Printf("eni %s description not expected: %s", volcengine.StringValue(eni.NetworkInterfaceId), volcengine.StringValue(eni.Description)) + continue + } + log.Printf("%s tagging", volcengine.StringValue(eni.NetworkInterfaceId)) + werr := wait.ExponentialBackoff(backoff.BackOff(backoff.APIWriteOps), func() (bool, error) { + eniTags = buildSDKTagForTagResourcesInput(eniInstance) + _, err = v.ec2Client.TagResources(&vpc.TagResourcesInput{ + ResourceIds: []*string{eni.NetworkInterfaceId}, + ResourceType: volcengine.String(vpc.ResourceTypeForTagResourcesInputEni), + Tags: eniTags, + }) + return err == nil, err + }) + if err = apiErr.BackoffErrWrapper(werr, err); err != nil { + log.Printf("Add Tag for eni %s failed, %v", volcengine.StringValue(eni.NetworkInterfaceId), err) + return err + } + log.Printf("%s tagged: %s", volcengine.StringValue(eni.NetworkInterfaceId), tagStr(eniTags)) + } + + return nil +} + +const maxPageSize = 100 + +type TagFilterForDescribeNetworkInterfacesInput []*vpc.TagFilterForDescribeNetworkInterfacesInput +type FilterForDescribeNetworkInterfacesOutput func([]*ec2.NetworkInterfaceSetForDescribeNetworkInterfacesOutput) []*ec2.NetworkInterfaceSetForDescribeNetworkInterfacesOutput + +func (v *volcApi) describeNetworkInterfacesWithPage(pageNumber int, status string, eniType string, eniIDs []string, inputFilter TagFilterForDescribeNetworkInterfacesInput) (*ec2.DescribeNetworkInterfacesOutput, error) { + var resp *ec2.DescribeNetworkInterfacesOutput + var err error + input := &vpc.DescribeNetworkInterfacesInput{ + Type: volcengine.String(eniType), + VpcId: volcengine.String(v.vpcId), + NetworkInterfaceIds: volcengine.StringSlice(eniIDs), + PageNumber: volcengine.Int64(int64(pageNumber)), + PageSize: volcengine.Int64(maxPageSize), + } + if status != "" { + input.Status = volcengine.String(status) + } + if status == 
helper.ENIStatusInuse { + input.InstanceId = volcengine.String(v.instanceID) + } + + if inputFilter != nil { + input.TagFilters = inputFilter + } + + werr := wait.ExponentialBackoff(backoff.BackOff(backoff.APIFastRetry), func() (bool, error) { + resp, err = v.ec2Client.DescribeNetworkInterfaces(input) + return err == nil, nil + }) + return resp, apiErr.BackoffErrWrapper(werr, err) +} + +func (v *volcApi) getNetworkInterfacesByDescribe(status string, eniType string, eniIDs []string, inputFilter TagFilterForDescribeNetworkInterfacesInput, + outputFilter FilterForDescribeNetworkInterfacesOutput) (int, []*ec2.NetworkInterfaceSetForDescribeNetworkInterfacesOutput, error) { + var preResult []*ec2.NetworkInterfaceSetForDescribeNetworkInterfacesOutput + var result []*ec2.NetworkInterfaceSetForDescribeNetworkInterfacesOutput + pages := 1 + first := true + for i := 1; i <= pages; i++ { + resp, err := v.describeNetworkInterfacesWithPage(i, status, eniType, eniIDs, inputFilter) + if err != nil { + log.Printf("describeNetworkInterfacesWithPage failed: %s", err.Error()) + return 0, result, err + } + + total := int(volcengine.Int64Value(resp.TotalCount)) + if total == 0 { + return 0, result, nil + } + preResult = append(preResult, resp.NetworkInterfaceSets...) 
+ if first { + pages = total / maxPageSize + if total%maxPageSize != 0 { + pages += 1 + } + first = false + } + } + if outputFilter != nil { + return len(preResult), outputFilter(preResult), nil + } + return len(preResult), preResult, nil +} + +func (v *volcApi) getNetworkInterfacesCreatedByCelloWithoutTag(c *cli.Context) ([]*ec2.NetworkInterfaceSetForDescribeNetworkInterfacesOutput, error) { + expectDescriptionPrefix := fmt.Sprintf("%s.%s %s.%s %s.%s", TagComponentKey, ComponentName, TagVpcIdKey, v.vpcId, TagInstanceKey, v.instanceID) + if c.Bool("vpc-all") { + expectDescriptionPrefix = fmt.Sprintf("%s.%s %s.%s %s.", TagComponentKey, ComponentName, TagVpcIdKey, v.vpcId, TagInstanceKey) + } + + filterNetworkInterfacesWithDescriptionWithoutTags := func(ifaces []*ec2.NetworkInterfaceSetForDescribeNetworkInterfacesOutput) []*ec2.NetworkInterfaceSetForDescribeNetworkInterfacesOutput { + var result []*ec2.NetworkInterfaceSetForDescribeNetworkInterfacesOutput + for _, ifa := range ifaces { + // with special description + if strings.HasPrefix(volcengine.StringValue(ifa.Description), expectDescriptionPrefix) && + // without tags + !existSDKTagForDescribeNetworkInterfacesOutput(ifa.Tags) { + result = append(result, ifa) + } + } + return result + } + _, enis, err := v.getNetworkInterfacesByDescribe("", helper.ENITypeSecondary, nil, nil, filterNetworkInterfacesWithDescriptionWithoutTags) + return enis, err +} + +func (v *volcApi) getNetworkInterfacesCreatedByCelloWithTag(c *cli.Context) ([]*ec2.NetworkInterfaceSetForDescribeNetworkInterfacesOutput, error) { + instanceId := v.instanceID + if c.Bool("vpc-all") { + instanceId = "" + } + + _, enis, err := v.getNetworkInterfacesByDescribe("", helper.ENITypeSecondary, nil, buildSDKTagFilterForDescribeNetworkInterfacesInput(instanceId), nil) + return enis, err +} + +const ( + TagVpcIdKey = "vpcId" + TagInstanceKey = "instanceId" + TagComponentKey = "createdBy" + ComponentName = "Cello" +) + +const ( + TagPrefix = "volc:vke:" + 
VkePlatformTagKey = TagPrefix + "createdby-vke-flag" + VkePlatformTagValue = "true" + ComponentTagKey = TagPrefix + "created-by" + InstanceIdTagKey = TagPrefix + "ecs-id" + Component = "cello" + eniDescription = "interface create by cello" +) + +func buildSDKTagForTagResourcesInput(instanceId string) []*vpc.TagForTagResourcesInput { + return []*vpc.TagForTagResourcesInput{ + {Key: volcengine.String(VkePlatformTagKey), Value: volcengine.String(VkePlatformTagValue)}, + {Key: volcengine.String(ComponentTagKey), Value: volcengine.String(Component)}, + {Key: volcengine.String(InstanceIdTagKey), Value: volcengine.String(instanceId)}, + } +} + +func buildSDKTagFilterForDescribeNetworkInterfacesInput(instanceId string) []*vpc.TagFilterForDescribeNetworkInterfacesInput { + tags := []*vpc.TagFilterForDescribeNetworkInterfacesInput{ + {Key: volcengine.String(VkePlatformTagKey), Values: []*string{volcengine.String(VkePlatformTagValue)}}, + {Key: volcengine.String(ComponentTagKey), Values: []*string{volcengine.String(Component)}}, + } + if instanceId != "" { + instanceTag := &vpc.TagFilterForDescribeNetworkInterfacesInput{Key: volcengine.String(InstanceIdTagKey), Values: []*string{volcengine.String(instanceId)}} + tags = append(tags, instanceTag) + } + return tags +} + +func existSDKTagForDescribeNetworkInterfacesOutput(tags []*vpc.TagForDescribeNetworkInterfacesOutput) bool { + hit := 0 + for _, tag := range tags { + if *tag.Key == VkePlatformTagKey && *tag.Value == VkePlatformTagValue { + hit += 1 + } + if *tag.Key == ComponentTagKey && *tag.Value == Component { + hit += 1 + } + } + return hit == 2 +} + +func printEni(enis []*ec2.NetworkInterfaceSetForDescribeNetworkInterfacesOutput) { + for id, eni := range enis { + var tagStr []string + for _, tag := range eni.Tags { + tagStr = append(tagStr, fmt.Sprintf("%s=%s", volcengine.StringValue(tag.Key), volcengine.StringValue(tag.Value))) + } + log.Printf("{%d: id: %s, description: %s, tags: %v}\n", + id, 
volcengine.StringValue(eni.NetworkInterfaceId), volcengine.StringValue(eni.Description), tagStr) + } +} diff --git a/pkg/cni/driver/interfaces_test.go b/pkg/cni/driver/interfaces_test.go index dc66797..2f79fd5 100644 --- a/pkg/cni/driver/interfaces_test.go +++ b/pkg/cni/driver/interfaces_test.go @@ -438,8 +438,7 @@ func TestDataPathUseSharedENI(t *testing.T) { assert.True(t, hasFastPath) // Check for filters. - filters, err := netlink.FilterList(eni, - uint32(netlink.HANDLE_CLSACT&0xffff0000|netlink.HANDLE_MIN_EGRESS&0x0000ffff)) + filters, err := netlink.FilterList(eni, qdiscHandle) assert.NoError(t, err) assert.NotEqual(t, 0, len(filters)) hasV4SrcEgressFilter := false @@ -464,10 +463,38 @@ func TestDataPathUseSharedENI(t *testing.T) { if flower.DestIP.Equal(redirV4CIDR.IP) && flower.DestIPMask.String() == redirV4CIDR.Mask.String() { hasV4DstFilter = true + assert.Equal(t, 3, len(flower.Actions)) + hasMirredAct, hasTunKeyAct, hasSkbEditAct := false, false, false + for _, act := range flower.Actions { + switch act.Type() { + case "tunnel_key": + hasTunKeyAct = true + break + case "mirred": + hasMirredAct = true + case "skbedit": + hasSkbEditAct = true + } + } + assert.True(t, hasSkbEditAct && hasTunKeyAct && hasMirredAct) } if flower.DestIP.Equal(redirV6CIDR.IP) && flower.DestIPMask.String() == redirV6CIDR.Mask.String() { hasV6DstFilter = true + assert.Equal(t, 3, len(flower.Actions)) + hasMirredAct, hasTunKeyAct, hasSkbEditAct := false, false, false + for _, act := range flower.Actions { + switch act.Type() { + case "tunnel_key": + hasTunKeyAct = true + break + case "mirred": + hasMirredAct = true + case "skbedit": + hasSkbEditAct = true + } + } + assert.True(t, hasSkbEditAct && hasTunKeyAct && hasMirredAct) } } assert.True(t, hasV4SrcEgressFilter) diff --git a/pkg/cni/driver/ipvlan.go b/pkg/cni/driver/ipvlan.go index 3238bf3..4dad920 100644 --- a/pkg/cni/driver/ipvlan.go +++ b/pkg/cni/driver/ipvlan.go @@ -32,6 +32,8 @@ import ( 
"github.com/volcengine/cello/pkg/cni/utils" ) +const qdiscHandle = uint32(netlink.HANDLE_CLSACT&0xffff0000 | netlink.HANDLE_MIN_EGRESS&0x0000ffff) + // IPVlanDriver is used in shared ENI mode. type IPVlanDriver struct{} @@ -365,13 +367,12 @@ func (d *IPVlanDriver) setupFilters(link netlink.Link, srcEgressRedirectCIDRs [] ruleInFilter[rule] = false } - parent := uint32(netlink.HANDLE_CLSACT&0xffff0000 | netlink.HANDLE_MIN_EGRESS&0x0000ffff) if err != nil { return fmt.Errorf("list egress filter for %s error, %w", link.Attrs().Name, err) } filtersToDeleted := make([]netlink.Filter, 0) - filters, err := netlink.FilterList(link, parent) + filters, err := netlink.FilterList(link, qdiscHandle) if err != nil { log.Log.Errorf("failed to get filter list, %v", err) } @@ -401,7 +402,7 @@ func (d *IPVlanDriver) setupFilters(link netlink.Link, srcEgressRedirectCIDRs [] for rule, in := range ruleInFilter { if !in { filter := rule.toFlower() - filter.Parent = parent + filter.Parent = qdiscHandle if err := netlink.FilterReplace(filter); err != nil && !os.IsExist(err) { return fmt.Errorf("add filter for %s error, %w", link.Attrs().Name, err) } @@ -477,14 +478,17 @@ func gernerateDstRedirRule(index int, ip *net.IPNet, dstIfIndex int, redir netli var actions []netlink.Action - skbedit := netlink.NewSkbEditAction() + tunnelKeyAct := netlink.NewTunnelKeyAction() + tunnelKeyAct.Action = netlink.TCA_TUNNEL_KEY_UNSET + + skbeditAct := netlink.NewSkbEditAction() ptype := uint16(unix.PACKET_HOST) - skbedit.PType = &ptype + skbeditAct.PType = &ptype mirredAct := netlink.NewMirredAction(dstIfIndex) mirredAct.MirredAction = redir - actions = append(actions, skbedit, mirredAct) + actions = append(actions, tunnelKeyAct, skbeditAct, mirredAct) return &redirectRule{ linkIndex: index, proto: proto, diff --git a/pkg/daemon/ctl.go b/pkg/daemon/ctl.go index 88f35eb..7aba555 100644 --- a/pkg/daemon/ctl.go +++ b/pkg/daemon/ctl.go @@ -23,6 +23,7 @@ import ( "github.com/gin-gonic/gin" helper 
"github.com/volcengine/cello/pkg/provider/volcengine/cellohelper" + "github.com/volcengine/cello/pkg/utils/runtime" ) const ( @@ -114,11 +115,7 @@ func (c *celloCtlAPI) start() (*http.Server, error) { } go func() { - defer func() { - if err := recover(); err != nil { - log.Errorf("CliServer panic, %v", err) - } - }() + defer runtime.HandleCrash(log) log.Infof("Start ctl http server") err := server.Serve(l) if err != nil { diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index d4f0208..baeef46 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -58,6 +58,8 @@ import ( "github.com/volcengine/cello/pkg/utils/logger" "github.com/volcengine/cello/pkg/utils/math" "github.com/volcengine/cello/pkg/utils/netns" + "github.com/volcengine/cello/pkg/utils/runtime" + "github.com/volcengine/cello/pkg/version" "github.com/volcengine/cello/types" ) @@ -83,7 +85,7 @@ type daemon struct { eniManager *eniResourceManager eniIPManager *eniIPResourceManager managers map[string]ResourceManager // NetResourceType - ResourceManager - devicePlugin deviceplugin.Manager + devicePluginManager deviceplugin.Manager cfg *config.Config lastGC time.Time pbrpc.UnimplementedCelloServer @@ -149,11 +151,11 @@ func NewDaemon() (*daemon, error) { return nil, err } - return newDaemon(k8sService, cfg, apiClient, podPersist, instanceMeta, nil, deviceplugin.NewPluginManagerOption()) + return newDaemon(k8sService, cfg, apiClient, podPersist, instanceMeta, nil) } func newDaemon(k8sService k8s.Service, cfg *config.Config, apiClient ec2.EC2, podPersist PodPersistenceManager, - instanceMeta helper.InstanceMetadataGetter, volcApi helper.VolcAPI, pluginManagerOption *deviceplugin.PluginManagerOption) (*daemon, error) { + instanceMeta helper.InstanceMetadataGetter, volcApi helper.VolcAPI) (*daemon, error) { // register metrics metrics.PrometheusRegister() @@ -219,14 +221,20 @@ func newDaemon(k8sService k8s.Service, cfg *config.Config, apiClient ec2.EC2, po return nil, fmt.Errorf("create eni 
resource manager failed, %v", err) } d.managers[types.NetResourceTypeEni] = d.eniManager - pluginManagerOption.UseExclusiveENI().WithENILister(func() int { - return math.Max(0, d.eniManager.GetResourceLimit()-d.GetStockPodCount()) - }) + d.devicePluginManager = deviceplugin.NewResourcePluginManager(context.TODO(), + deviceplugin.NewENIDevicePlugin(deviceplugin.ENIResourceName, + math.Max(0, d.eniManager.GetResourceLimit()-d.GetStockPodCount()))) if d.eniManager.SupportTrunk() { - pluginManagerOption.UseBranchENI().WithBranchENILister(func() int { - return d.eniManager.GetTrunkBranchLimit() - }) + d.devicePluginManager.AddPlugin(deviceplugin.NewENIDevicePlugin( + deviceplugin.BranchENIResourceName, + d.eniManager.GetTrunkBranchLimit())) } + sigChannel := make(chan struct{}, 1) + d.instanceLimit.WatchUpdate("device-plugin-eni", sigChannel) + go watchResourceNum(context.TODO(), d.devicePluginManager, deviceplugin.ENIResourceName, func() int { + return math.Max(0, d.eniManager.GetResourceLimit()-d.GetStockPodCount()) + }, sigChannel) + case config.NetworkModeENIShare: d.eniIPManager, err = newEniIPResourceManager(cfg, subnetManager, secGrpManager, volcApi, allocatedResMap[types.NetResourceTypeEniIp], k8sService) @@ -234,20 +242,23 @@ func newDaemon(k8sService k8s.Service, cfg *config.Config, apiClient ec2.EC2, po return nil, fmt.Errorf("create eniIP resource manager failed, %v", err) } d.managers[types.NetResourceTypeEniIp] = d.eniIPManager - pluginManagerOption.UseSharedENI().WithIPLister(func() int { - return math.Max(0, d.eniIPManager.GetResourceLimit()-d.GetStockPodCount()) - }) + d.devicePluginManager = deviceplugin.NewResourcePluginManager(context.TODO(), + deviceplugin.NewENIDevicePlugin(deviceplugin.ENIIPResourceName, + math.Max(0, d.eniIPManager.GetResourceLimit()-d.GetStockPodCount()))) if d.eniIPManager.SupportTrunk() { - pluginManagerOption.UseBranchENI().WithBranchENILister(func() int { - return d.eniIPManager.GetTrunkBranchLimit() - }) + 
d.devicePluginManager.AddPlugin(deviceplugin.NewENIDevicePlugin( + deviceplugin.BranchENIResourceName, + d.eniIPManager.GetTrunkBranchLimit())) } + sigChannel := make(chan struct{}, 1) + d.instanceLimit.WatchUpdate("device-plugin-eniip", sigChannel) + go watchResourceNum(context.TODO(), d.devicePluginManager, deviceplugin.ENIIPResourceName, func() int { + return math.Max(0, d.eniIPManager.GetResourceLimit()-d.GetStockPodCount()) + }, sigChannel) default: return nil, fmt.Errorf("no support network mode %s", d.networkMode) } - d.devicePlugin = pluginManagerOption.BuildManager() - if datatype.StringValue(cfg.Source) == config.SourceClusterConfigMap { d.k8s.AddConfigMapEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(_, newObj interface{}) { @@ -303,13 +314,13 @@ func (d *daemon) gc() { signal.MuteChannel(signal.WakeGC) defer signal.UnmuteChannel(signal.WakeGC) var err error - log.Debugf("Daemon gc start") + log.Infof("Daemon GC start") defer func() { if err != nil { log.Errorf("Daemon gc failed, %v", err) } else { d.lastGC = time.Now() - log.Debugf("Daemon gc finished") + log.Infof("Daemon GC finished") } }() @@ -453,6 +464,7 @@ func (d *daemon) CreateEndpoint(ctx context.Context, req *pbrpc.CreateEndpointRe }) lg.Infof("Handle CreateEndpoint") + defer runtime.HandleCrash(lg) defer func() { if err != nil { lg.Warnf("Fail to handle CreateEndpoint: %v", err) @@ -608,6 +620,7 @@ func (d *daemon) DeleteEndpoint(ctx context.Context, req *pbrpc.DeleteEndpointRe "SandboxContainerId": req.InfraContainerId, }) lg.Infof("Handle DeleteEndpoint") + defer runtime.HandleCrash(lg) defer func() { if err != nil { lg.Warnf("Fail to handle DeleteEndpoint: %s", err.Error()) @@ -678,6 +691,7 @@ func (d *daemon) DeleteEndpoint(ctx context.Context, req *pbrpc.DeleteEndpointRe // GetPodMetaInfo returns Pod metadata. 
func (d *daemon) GetPodMetaInfo(ctx context.Context, request *pbrpc.GetPodMetaRequest) (*pbrpc.GetPodMetaResponse, error) { + defer runtime.HandleCrash(log) pod, err := d.k8s.GetCachedPod(request.GetNamespace(), request.GetName()) if err != nil { return nil, err @@ -693,6 +707,7 @@ func (d *daemon) GetPodMetaInfo(ctx context.Context, request *pbrpc.GetPodMetaRe // PatchPodAnnotation patches pod annotation. func (d *daemon) PatchPodAnnotation(ctx context.Context, request *pbrpc.PatchPodAnnotationRequest) (*pbrpc.PatchPodAnnotationResponse, error) { + defer runtime.HandleCrash(log) return &pbrpc.PatchPodAnnotationResponse{}, d.k8s.PatchPodAnnotation(ctx, request.GetNamespace(), request.GetName(), request.GetAnnotations()) } @@ -707,6 +722,8 @@ func NewKubernetesClient() (*kubernetes.Clientset, error) { if err != nil { return nil, fmt.Errorf("create incluster config failed: %v", err) } + c.UserAgent = version.UserAgent() + return kubernetes.NewForConfig(c) } @@ -752,11 +769,11 @@ func (d *daemon) syncPodPersistence() error { func (d *daemon) startServers(stopCh chan struct{}) error { log.Infof("Cello daemon ready, start service") - err := d.devicePlugin.Serve(stopCh) + err := d.devicePluginManager.Serve(stopCh) if err != nil { return fmt.Errorf("device plugin start failed: %v", err) } - defer d.devicePlugin.Stop() + defer d.devicePluginManager.Stop() grpcServer, err := d.startEndpointGrpcServer() if err != nil { @@ -851,11 +868,7 @@ func (d *daemon) startDebugServer() (*http.Server, error) { } go func() { - defer func() { - if err := recover(); err != nil { - log.Errorf("DebugServer panic, %v", err) - } - }() + defer runtime.HandleCrash(log) log.Infof("Start debug server") err := server.ListenAndServe() if err != nil { @@ -1015,15 +1028,15 @@ func isHostNetwork(pod *v1.Pod) bool { func hasRequestAndLimitsFields(pod *v1.Pod) bool { for _, c := range pod.Spec.Containers { if c.Resources.Requests != nil { - _, ok := c.Resources.Requests[deviceplugin.ENIResourceName] + _, 
ok := c.Resources.Requests[deviceplugin.VolcNameSpace+deviceplugin.ENIResourceName] if ok { return true } - _, ok = c.Resources.Requests[deviceplugin.ENIIPResourceName] + _, ok = c.Resources.Requests[deviceplugin.VolcNameSpace+deviceplugin.ENIIPResourceName] if ok { return true } - _, ok = c.Resources.Requests[deviceplugin.BranchENIResourceName] + _, ok = c.Resources.Requests[deviceplugin.VolcNameSpace+deviceplugin.BranchENIResourceName] if ok { return true } @@ -1031,3 +1044,23 @@ func hasRequestAndLimitsFields(pod *v1.Pod) bool { } return false } + +func watchResourceNum(ctx context.Context, pluginManger deviceplugin.Manager, resName string, lister func() int, updateSignal <-chan struct{}) { + // A ticker for resync resourceNum + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + var err error + for { + select { + case <-ticker.C: + err = pluginManger.Update(resName, lister()) + case <-updateSignal: + err = pluginManger.Update(resName, lister()) + case <-ctx.Done(): + return + } + if err != nil { + log.Errorf("update resource for %s failed, %v", resName, err) + } + } +} diff --git a/pkg/daemon/daemon_main.go b/pkg/daemon/daemon_main.go index 2bd1733..97a26e3 100644 --- a/pkg/daemon/daemon_main.go +++ b/pkg/daemon/daemon_main.go @@ -27,8 +27,8 @@ import ( ) func Execute() { - if err := UpdateLocalPodDB(); err != nil { - log.Fatalf("Convert pod format in persistence db before build daemon failed, %v", err) + if err := PreHookAction(); err != nil { + log.Fatalf("PreHook action execute failed, %v", err) } d, err := NewDaemon() diff --git a/pkg/daemon/daemon_test.go b/pkg/daemon/daemon_test.go index 4412547..85f727c 100644 --- a/pkg/daemon/daemon_test.go +++ b/pkg/daemon/daemon_test.go @@ -45,7 +45,7 @@ import ( "github.com/volcengine/volcengine-go-sdk/volcengine/response" "github.com/volcengine/cello/pkg/config" - "github.com/volcengine/cello/pkg/deviceplugin" + mockDeiveplugin "github.com/volcengine/cello/pkg/deviceplugin/mock" 
"github.com/volcengine/cello/pkg/k8s" "github.com/volcengine/cello/pkg/pbrpc" helper "github.com/volcengine/cello/pkg/provider/volcengine/cellohelper" @@ -995,7 +995,11 @@ func newMockDaemon() (*daemon, error) { return nil, fmt.Errorf("create persistence db failed: %w", err) } - return newDaemon(k8sService, cfg, ec2MockClient, podPersist, instanceMetaGetter, volcApi, deviceplugin.NewPluginManagerOption().WithDryRun()) + d, err := newDaemon(k8sService, cfg, ec2MockClient, podPersist, instanceMetaGetter, volcApi) + if err == nil { + d.devicePluginManager = mockDeiveplugin.PluginManager{} + } + return d, err } func TestDaemon(t *testing.T) { diff --git a/pkg/daemon/eni.go b/pkg/daemon/eni.go index 85709ce..c5b5660 100644 --- a/pkg/daemon/eni.go +++ b/pkg/daemon/eni.go @@ -20,6 +20,8 @@ import ( "strings" "time" + "k8s.io/apimachinery/pkg/util/wait" + "github.com/volcengine/cello/pkg/config" "github.com/volcengine/cello/pkg/k8s" "github.com/volcengine/cello/pkg/metrics" @@ -27,6 +29,7 @@ import ( helper "github.com/volcengine/cello/pkg/provider/volcengine/cellohelper" apiErr "github.com/volcengine/cello/pkg/provider/volcengine/cellohelper/errors" "github.com/volcengine/cello/pkg/utils/math" + "github.com/volcengine/cello/pkg/utils/runtime" "github.com/volcengine/cello/types" ) @@ -156,6 +159,7 @@ func newEniResourceManager(cfg *config.Config, subnet helper.SubnetManager, secM } return nil } + factory.monitor(time.Duration(*cfg.SubnetStatUpdateIntervalSec)*time.Second, time.Duration(*cfg.ReconcileIntervalSec)*time.Second) p, err := pool.NewResourcePool(poolConfig) if err != nil { @@ -194,7 +198,7 @@ func (f *eniFactory) Name() string { return types.NetResourceTypeEni } -// Create while create count eni without ip count, +// Create while create count eni with one ip, // it will ignore some errors while partially request successful. 
func (f *eniFactory) Create(count int) (res []types.NetResource, err error) { for i := 0; i < count; i++ { @@ -220,8 +224,10 @@ func (f *eniFactory) CreateWithIPCount(ipCnt int, trunk bool) (types.NetResource }() subnet := f.subnets.SelectSubnet(f.ipFamily, helper.WithAging(subnetAging)) if subnet == nil { + f.limit.CordonCreate("eniFactory create eni") return nil, fmt.Errorf("no available subnet, please check subnets and available ip of subnets") } + f.limit.UnCordonCreate("eniFactory create eni") eni, err := f.volcApi.AllocENI(subnet.SubnetId, f.secManager.GetSecurityGroups(), trunk, ipCnt) if err != nil { @@ -289,6 +295,21 @@ func (f *eniFactory) GetResourceLimit() int { return limit.ENIAvailable() } +func (f *eniFactory) monitor(subnetPeriod, limitPeriod time.Duration) { + go wait.Forever(func() { + log.Debugf("Monitor check subnet") + defer runtime.HandleCrash(log) + if subnet := f.subnets.SelectSubnet(f.ipFamily, helper.WithAging(subnetAging)); subnet != nil { + f.limit.UnCordonCreate("eniFactory subnet monitor") + } + }, subnetPeriod) + go wait.Forever(func() { + log.Debugf("Monitor check limit") + defer runtime.HandleCrash(log) + f.limit.Update() + }, limitPeriod) +} + func newEniFactory(secManager helper.SecurityGroupManager, subnetManager helper.SubnetManager, api helper.VolcAPI, limit helper.InstanceLimitManager, ipFamily types.IPFamily) (*eniFactory, error) { if subnetManager == nil { return nil, fmt.Errorf("subnet manager is nil") diff --git a/pkg/daemon/eni_multi_ip.go b/pkg/daemon/eni_multi_ip.go index e502de0..43b86d6 100644 --- a/pkg/daemon/eni_multi_ip.go +++ b/pkg/daemon/eni_multi_ip.go @@ -157,6 +157,8 @@ func newEniIPResourceManager(cfg *config.Config, subnet helper.SubnetManager, se limit.UpdateTrunk(m.trunkEni) } + eniFact.monitor(time.Duration(*cfg.SubnetStatUpdateIntervalSec)*time.Second, time.Duration(*cfg.ReconcileIntervalSec)*time.Second) + factory := &eniIPFactory{ eniFactory: eniFact, RWMutex: sync.RWMutex{}, @@ -280,7 +282,7 @@ func 
newEniIPResourceManager(cfg *config.Config, subnet helper.SubnetManager, se } return nil } - go factory.subnetMonitor(time.Duration(*cfg.SubnetStatUpdateIntervalSec)*time.Second, + factory.subnetMonitor(time.Duration(*cfg.SubnetStatUpdateIntervalSec)*time.Second, time.Duration(*cfg.SubnetStatAgingSec)*time.Second) p, err := pool.NewResourcePool(poolConfig) if err != nil { @@ -424,10 +426,15 @@ func (f *eniIPFactory) Name() string { func (f *eniIPFactory) receiveRes() (ip *types.ENIIP, err error) { eniIP := <-f.eniIpReceiver resErr := eniIP.err - if resErr != nil && (strings.Contains(resErr.Error(), apiErr.LimitExceededPrivateIpsPerEni) || - strings.Contains(resErr.Error(), apiErr.LimitExceededIpv6AddressesPerEni)) { - f.eniFactory.limit.Update() - signal.NotifySignal(signal.WakeGC, signal.SigWakeGC) + if resErr != nil { + if strings.Contains(resErr.Error(), apiErr.LimitExceededPrivateIpsPerEni) || + strings.Contains(resErr.Error(), apiErr.LimitExceededIpv6AddressesPerEni) { + f.eniFactory.limit.Update() + signal.NotifySignal(signal.WakeGC, signal.SigWakeGC) + } + if strings.Contains(resErr.Error(), apiErr.InsufficientIpInSubnet) { + f.eniFactory.limit.NotifyWatcher() + } } if eniIP.ENIIP == nil || resErr != nil { @@ -526,47 +533,50 @@ func (f *eniIPFactory) deleteEniLocked(eni *ENI) { // initENI initializes ENI. 
func (f *eniIPFactory) initENI(eni *ENI) { - var err error var ipv4s, ipv6s []net.IP + var vpcEni types.NetResource + var err error // if err is not nil, init eni failed, clean it defer func() { <-f.eniPending }() - vpcEni, err := f.eniFactory.CreateWithIPCount(eni.pending, false) + vpcEni, err = f.eniFactory.CreateWithIPCount(eni.pending, false) if err == nil { var ok bool eni.ENI, ok = vpcEni.(*types.ENI) if !ok { - log.Errorf("Net resource created by factory is not expect type eni, try release it") - err = f.eniFactory.Release(vpcEni) - if err != nil { - log.Errorf("Release unexpect resource %+v failed, %v", vpcEni, err) + err = fmt.Errorf("net resource created by factory is not expect type, get %+v, try release it", vpcEni) + log.Error(err) + releaseErr := f.eniFactory.Release(vpcEni) + if releaseErr != nil { + log.Errorf("Release unexpect resource %+v failed, %v", vpcEni, releaseErr) } } else { ipv4s, ipv6s, err = f.volcApi.GetENIIPList(eni.Mac.String()) if err != nil { log.Errorf("Get ip list on eni failed, %v, try release it", err) - err = f.eniFactory.Release(vpcEni) - if err != nil { - log.Errorf("Release eni %+v failed, %v", vpcEni, err) + releaseErr := f.eniFactory.Release(vpcEni) + if releaseErr != nil { + log.Errorf("Release eni %+v failed, %v", vpcEni, releaseErr) } } if f.ipFamily.EnableIPv4() && f.ipFamily.EnableIPv6() { // check ip pairs if len(ipv4s) != len(ipv6s) { - log.Errorf("The number of ipv4 and ipv6 not equal on eni %+v, try release it", vpcEni) - err = f.eniFactory.Release(vpcEni) - if err != nil { - log.Errorf("Release eni %+v failed, %v", vpcEni, err) + err = fmt.Errorf("the number of ipv4 and ipv6 not equal on eni %+v, try release it", vpcEni) + log.Error(err) + releaseErr := f.eniFactory.Release(vpcEni) + if releaseErr != nil { + log.Errorf("Release eni %+v failed, %v", vpcEni, releaseErr) } } } } } + // if err is not nil, init eni failed, clean it if err != nil { - // clean eni.Lock() for i := 0; i < eni.pending; i++ { 
f.eniIpReceiver <- &ENIIPRes{ diff --git a/pkg/daemon/prestart.go b/pkg/daemon/prestart.go index 475b058..6a53fd1 100644 --- a/pkg/daemon/prestart.go +++ b/pkg/daemon/prestart.go @@ -26,8 +26,16 @@ import ( "github.com/volcengine/cello/types" ) -// UpdateLocalPodDB migrate local Pod persistence DB to the current version. -func UpdateLocalPodDB() error { +func PreHookAction() error { + err := updateLocalPodDB() + if err != nil { + return fmt.Errorf("update local pod db failed, %v", err) + } + return nil +} + +// updateLocalPodDB migrate local Pod persistence DB to the current version. +func updateLocalPodDB() error { log.Infof("Start convert pod format in persistence db") ipNsMapping := map[string]string{} diff --git a/pkg/deviceplugin/eniPlugin.go b/pkg/deviceplugin/eniPlugin.go new file mode 100644 index 0000000..be870ba --- /dev/null +++ b/pkg/deviceplugin/eniPlugin.go @@ -0,0 +1,169 @@ +// Copyright 2023 The Cello Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package deviceplugin + +import ( + "context" + "fmt" + "path" + "time" + + "google.golang.org/grpc" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +// ENIDevicePlugin implements the Kubernetes devices device deviceplugin API. +type ENIDevicePlugin struct { + resourceName string + apiEndPoint string + count int + updateSignal chan int + server *grpc.Server + ctx context.Context +} + +// NewENIDevicePlugin creates a new ENIDevicePlugin. 
+func NewENIDevicePlugin(resName string, initCount int) *ENIDevicePlugin { + return &ENIDevicePlugin{ + resourceName: resName, + apiEndPoint: path.Join(DevicePluginPath, resName+".sock"), + count: initCount, + updateSignal: make(chan int, 1), + server: grpc.NewServer(), + ctx: context.TODO(), + } +} + +// GetDevicePluginOptions returns options that ENI devices support. +func (eniPlugin *ENIDevicePlugin) GetDevicePluginOptions(_ context.Context, _ *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { + return &pluginapi.DevicePluginOptions{}, nil +} + +// ListAndWatch returns ENI devices list. +func (eniPlugin *ENIDevicePlugin) ListAndWatch(_ *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error { + count := eniPlugin.count + + sendResponse := func(count int, s pluginapi.DevicePlugin_ListAndWatchServer) error { + res := make([]*pluginapi.Device, count) + for i := 0; i < count; i++ { + res[i] = &pluginapi.Device{ + ID: fmt.Sprintf("%v-%d", eniPlugin.resourceName, i), + Health: pluginapi.Healthy, + } + } + + resp := &pluginapi.ListAndWatchResponse{ + Devices: res, + } + err := stream.Send(resp) + log.Infof("Report resources: %v of %v", eniPlugin.resourceName, count) + if err != nil { + log.Errorf("Send devices error: %v", err) + return err + } + return nil + } + + if err := sendResponse(count, stream); err != nil { + return err + } + ticker := time.NewTicker(reportPeriod) + defer ticker.Stop() + for { + select { + case <-ticker.C: + count = eniPlugin.count + err := sendResponse(count, stream) + if err != nil { + return err + } + // Send new list when resource count changed + case eniPlugin.count = <-eniPlugin.updateSignal: + count = eniPlugin.count + err := sendResponse(count, stream) + if err != nil { + return err + } + case <-eniPlugin.ctx.Done(): + return nil + } + } +} + +// Allocate does nothing, here we only return a void response. 
+func (eniPlugin *ENIDevicePlugin) Allocate(_ context.Context, request *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { + resp := pluginapi.AllocateResponse{ + ContainerResponses: []*pluginapi.ContainerAllocateResponse{}, + } + + for range request.GetContainerRequests() { + resp.ContainerResponses = append(resp.ContainerResponses, + &pluginapi.ContainerAllocateResponse{}, + ) + } + + return &resp, nil +} + +// PreStartContainer is not supported by this plugin. +func (eniPlugin *ENIDevicePlugin) PreStartContainer(_ context.Context, _ *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { + return &pluginapi.PreStartContainerResponse{}, nil +} + +// GetPreferredAllocation is not supported by this plugin. +func (eniPlugin *ENIDevicePlugin) GetPreferredAllocation(_ context.Context, _ *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) { + return &pluginapi.PreferredAllocationResponse{}, nil +} + +// Endpoint returns the path of grpc UDS endpoint +func (eniPlugin *ENIDevicePlugin) Endpoint() string { + return eniPlugin.apiEndPoint +} + +func (eniPlugin *ENIDevicePlugin) ResourceName() string { + return eniPlugin.resourceName +} + +func (eniPlugin *ENIDevicePlugin) Server() *grpc.Server { + return eniPlugin.server +} + +func (eniPlugin *ENIDevicePlugin) ResetServer() { + eniPlugin.server = grpc.NewServer() +} + +func (eniPlugin *ENIDevicePlugin) SetContext(ctx context.Context) { + eniPlugin.ctx = ctx +} + +func (eniPlugin *ENIDevicePlugin) Update(count int) { + if count == eniPlugin.count { + return + } + t := time.NewTimer(5 * time.Second) + defer t.Stop() + select { + case eniPlugin.updateSignal <- count: + return + case <-t.C: + eniPlugin.count = count + case <-eniPlugin.updateSignal: + eniPlugin.updateSignal <- count + eniPlugin.count = count + log.Errorf("Failed to update resource count: %v ", count) + return + } +} diff --git a/pkg/deviceplugin/manager.go 
b/pkg/deviceplugin/manager.go index 1f1f5a7..2406e14 100644 --- a/pkg/deviceplugin/manager.go +++ b/pkg/deviceplugin/manager.go @@ -17,6 +17,7 @@ package deviceplugin import ( "context" + "fmt" "net" "os" "path" @@ -27,72 +28,32 @@ import ( "google.golang.org/grpc/credentials/insecure" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" - "github.com/volcengine/cello/pkg/deviceplugin/mock" "github.com/volcengine/cello/pkg/utils/logger" ) var log = logger.GetLogger().WithFields(logger.Fields{"subsys": "deviceplugin"}) -// Manager is the interface of device plugin manager. -type Manager interface { - Serve(stopCh chan struct{}) error - Stop() - Update(count int) -} - // PluginManager manages all device plugins. type PluginManager struct { - plugins []*ENIDevicePlugin - res *resource + plugins map[string]Plugin cancel context.CancelFunc ctx context.Context } -type PluginManagerOption struct { - useExclusiveENI bool - useSharedENI bool - useBranchENI bool - ctx context.Context - eniLister func() int - eniIPLister func() int - branchENILister func() int - dryRun bool +func (manager *PluginManager) Plugin(resourceName string) Plugin { + plugin, _ := manager.plugins[resourceName] + return plugin } -// NewPluginManagerWithOptions creates a new PluginManager with given options. 
-func NewPluginManagerWithOptions(option *PluginManagerOption) *PluginManager { - var ctx context.Context - var cancel context.CancelFunc - if option.ctx == nil { - ctx, cancel = context.WithCancel(context.Background()) - } else { - ctx, cancel = context.WithCancel(option.ctx) - } - manager := PluginManager{ - plugins: []*ENIDevicePlugin{}, - cancel: cancel, - res: &resource{}, +func NewResourcePluginManager(ctx context.Context, plugins ...Plugin) *PluginManager { + mgr := PluginManager{} + mgr.ctx, mgr.cancel = context.WithCancel(ctx) + mgr.plugins = make(map[string]Plugin) + for _, plugin := range plugins { + plugin.SetContext(mgr.ctx) + mgr.plugins[plugin.ResourceName()] = plugin } - if option.useExclusiveENI { - manager.res.updateSignal = make(chan struct{}) - manager.plugins = append(manager.plugins, NewENIDevicePlugin(Exclusive, manager.res, option.eniLister)) - manager.ctx = ctx - } - if option.useSharedENI { - manager.res.updateSignal = make(chan struct{}) - manager.plugins = append(manager.plugins, NewENIDevicePlugin(Shared, manager.res, option.eniIPLister)) - manager.ctx = ctx - } - - if option.useBranchENI { - resNumCh := resource{ - updateSignal: make(chan struct{}), - } - manager.plugins = append(manager.plugins, NewENIDevicePlugin(Trunk, &resNumCh, option.branchENILister)) - manager.ctx = ctx - } - - return &manager + return &mgr } // register registers device plugins grpc endpoints to kubelet @@ -107,8 +68,8 @@ func (manager *PluginManager) register() error { for _, plugin := range manager.plugins { _, err = client.Register(manager.ctx, &pluginapi.RegisterRequest{ Version: pluginapi.Version, - Endpoint: path.Base(plugin.endPoint), - ResourceName: plugin.name, + Endpoint: path.Base(plugin.Endpoint()), + ResourceName: path.Join(VolcNameSpace, plugin.ResourceName()), }) if err != nil { @@ -146,10 +107,11 @@ func (manager *PluginManager) Serve(stopCh chan struct{}) error { select { case event, ok := <-watcher.Events: if !ok { + log.Error("Watch kubelet 
failed.") return } if event.Name == KubeletSocket && event.Has(fsnotify.Create) { - log.Infof(" %s created, restarting.", pluginapi.KubeletSocket) + log.Infof(" %s created, restarting.", KubeletSocket) manager.Stop() manager.ctx, manager.cancel = context.WithCancel(context.Background()) _ = manager.start() @@ -157,9 +119,6 @@ func (manager *PluginManager) Serve(stopCh chan struct{}) error { if err != nil { log.Errorf("Register failed after kubelet restart", err) } - if err != nil { - log.Errorf("register failed after kubelet restart") - } } else if event.Name == "kubelet.sock" && event.Op&fsnotify.Remove == fsnotify.Remove { log.Infof("Kubelet stopped") } @@ -186,18 +145,18 @@ func (manager *PluginManager) Stop() { } // Update will emit count to res channel asynchronously. -func (manager *PluginManager) Update(count int) { - go func() { - t := time.NewTimer(5 * time.Second) - manager.res.count = count - select { - case manager.res.updateSignal <- struct{}{}: - return - case <-t.C: - log.Errorf("Failed to update resource count: %v ", count) - return - } - }() +func (manager *PluginManager) Update(resName string, count int) error { + plugin, ok := manager.plugins[resName] + if !ok { + return fmt.Errorf("plugin not found") + } + plugin.Update(count) + return nil +} + +func (manager *PluginManager) AddPlugin(plugin Plugin) { + plugin.SetContext(manager.ctx) + manager.plugins[plugin.ResourceName()] = plugin } // start will boot grpc service and listen on /var/lib/kubelet/device-plugin/.sock. 
@@ -205,23 +164,23 @@ func (manager *PluginManager) start() error { if err := manager.cleanUp(); err != nil { return err } - for _, eniPlugin := range manager.plugins { - sock, err := net.Listen("unix", eniPlugin.endPoint) + for _, plugin := range manager.plugins { + sock, err := net.Listen("unix", plugin.Endpoint()) if err != nil { return err } - eniPlugin.server = grpc.NewServer() - eniPlugin.ctx = manager.ctx - pluginapi.RegisterDevicePluginServer(eniPlugin.server, eniPlugin) + plugin.ResetServer() + plugin.SetContext(manager.ctx) + pluginapi.RegisterDevicePluginServer(plugin.Server(), plugin) go func() { - err := eniPlugin.server.Serve(sock) + err := plugin.Server().Serve(sock) if err != nil { log.Errorf("Failed to serve deviceplugin grpc server.") } }() - conn, err := dailUnix(manager.ctx, eniPlugin.endPoint) + conn, err := dailUnix(manager.ctx, plugin.Endpoint()) if err != nil { return err } @@ -229,7 +188,7 @@ func (manager *PluginManager) start() error { if err != nil { return err } - log.Infof("Start device plugin for %v", eniPlugin.name) + log.Infof("Start device plugin for %v", VolcNameSpace+plugin.ResourceName()) } return nil } @@ -238,74 +197,23 @@ func (manager *PluginManager) start() error { func (manager *PluginManager) stop() { manager.cancel() for _, eniPlugin := range manager.plugins { - if eniPlugin.server == nil { + if eniPlugin.Server() == nil { return } - eniPlugin.server.Stop() - eniPlugin.server = nil + eniPlugin.Server().Stop() } } // cleanUp delete all resource. 
func (manager *PluginManager) cleanUp() error { for _, res := range manager.plugins { - if err := os.Remove(res.endPoint); err != nil && !os.IsNotExist(err) { + if err := os.Remove(res.Endpoint()); err != nil && !os.IsNotExist(err) { return err } } return nil } -func NewPluginManagerOption() *PluginManagerOption { - return &PluginManagerOption{} -} - -func (option *PluginManagerOption) WithDryRun() *PluginManagerOption { - option.dryRun = true - return option -} - -func (option *PluginManagerOption) UseExclusiveENI() *PluginManagerOption { - option.useExclusiveENI = true - return option -} - -func (option *PluginManagerOption) WithENILister(lister func() int) *PluginManagerOption { - option.eniLister = lister - return option -} - -func (option *PluginManagerOption) UseSharedENI() *PluginManagerOption { - option.useSharedENI = true - return option -} - -func (option *PluginManagerOption) WithIPLister(lister func() int) *PluginManagerOption { - option.eniIPLister = lister - return option -} - -func (option *PluginManagerOption) UseBranchENI() *PluginManagerOption { - option.useBranchENI = true - return option -} - -func (option *PluginManagerOption) WithBranchENILister(lister func() int) *PluginManagerOption { - option.branchENILister = lister - return option -} -func (option *PluginManagerOption) WithContext(ctx context.Context) *PluginManagerOption { - option.ctx = ctx - return option -} - -func (option *PluginManagerOption) BuildManager() Manager { - if option.dryRun { - return mock.New() - } - return NewPluginManagerWithOptions(option) -} - func dailUnix(ctx context.Context, path string) (*grpc.ClientConn, error) { conn, err := grpc.DialContext(ctx, path, grpc.WithTransportCredentials(insecure.NewCredentials()), diff --git a/pkg/deviceplugin/manager_test.go b/pkg/deviceplugin/manager_test.go index 59065b3..f91c885 100644 --- a/pkg/deviceplugin/manager_test.go +++ b/pkg/deviceplugin/manager_test.go @@ -13,7 +13,7 @@ // limitations under the License. 
// -package deviceplugin +package deviceplugin_test import ( "context" @@ -23,6 +23,9 @@ import ( "github.com/stretchr/testify/assert" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" + + "github.com/volcengine/cello/pkg/deviceplugin" + "github.com/volcengine/cello/pkg/deviceplugin/mock" ) const ( @@ -31,15 +34,14 @@ const ( func TestPluginManager_UseSharedENI(t *testing.T) { setupEnv() - client := NewMockClient() + client := mock.NewMockKubelet(deviceplugin.DevicePluginPath) err := client.StartServer() assert.NoError(t, err) - option := NewPluginManagerOption().UseSharedENI().WithIPLister(func() int { - return 5 - }).WithContext(context.Background()) - manager := NewPluginManagerWithOptions(option) - defer manager.cleanUp() + manager := deviceplugin.NewResourcePluginManager(context.Background(), + deviceplugin.NewENIDevicePlugin(deviceplugin.ENIIPResourceName, 5)) + assert.NotNil(t, manager.Plugin(deviceplugin.ENIIPResourceName)) + defer manager.Stop() stopCh := make(chan struct{}) err = manager.Serve(stopCh) @@ -48,8 +50,10 @@ func TestPluginManager_UseSharedENI(t *testing.T) { // Test get options. 
var pluginOption *pluginapi.DevicePluginOptions pluginOption, err = client.Res[0].Client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{}) + assert.NotNil(t, pluginOption) assert.NoError(t, err) - wantOptions, err := manager.plugins[0].GetDevicePluginOptions(context.Background(), &pluginapi.Empty{}) + wantOptions, err := manager.Plugin(deviceplugin.ENIIPResourceName).GetDevicePluginOptions( + context.Background(), &pluginapi.Empty{}) assert.NoError(t, err) assert.Equal(t, wantOptions.String(), pluginOption.String()) @@ -58,27 +62,35 @@ func TestPluginManager_UseSharedENI(t *testing.T) { recv, err := watch.Recv() assert.NoError(t, err) assert.Equal(t, 5, len(recv.Devices)) - manager.Update(3) + err = manager.Update(deviceplugin.ENIIPResourceName, 3) + assert.NoError(t, err) recv, err = watch.Recv() assert.Equal(t, 3, len(recv.Devices)) - // Test restart + // Test restart. _ = client.Stop() assert.NoError(t, err) time.Sleep(10 * time.Second) + _ = manager.Update(deviceplugin.ENIIPResourceName, 4) + _ = manager.Update(deviceplugin.ENIIPResourceName, 6) err = client.StartServer() assert.NoError(t, err) - time.Sleep(30 * time.Second) + time.Sleep(10 * time.Second) assert.True(t, client.Registered()) + watch = client.Res[0].Watcher + recv, err = watch.Recv() + assert.NoError(t, err) + assert.Equal(t, 4, len(recv.Devices)) + recv, err = watch.Recv() + assert.NoError(t, err) + assert.Equal(t, 6, len(recv.Devices)) manager.Stop() - for _, res := range manager.plugins { - assert.NoFileExists(t, res.endPoint) - } + assert.NoFileExists(t, manager.Plugin(deviceplugin.ENIIPResourceName).Endpoint()) } func setupEnv() { _ = os.MkdirAll(tmpPath, os.ModePerm) - DevicePluginPath = tmpPath - KubeletSocket = DevicePluginPath + "kubelet.sock" + deviceplugin.DevicePluginPath = tmpPath + deviceplugin.KubeletSocket = deviceplugin.DevicePluginPath + "kubelet.sock" } diff --git a/pkg/deviceplugin/kubelet_mock.go b/pkg/deviceplugin/mock/kubelet_mock.go similarity index 70% 
rename from pkg/deviceplugin/kubelet_mock.go rename to pkg/deviceplugin/mock/kubelet_mock.go index a608adb..d0373ac 100644 --- a/pkg/deviceplugin/kubelet_mock.go +++ b/pkg/deviceplugin/mock/kubelet_mock.go @@ -13,7 +13,7 @@ // limitations under the License. // -package deviceplugin +package mock import ( "context" @@ -29,10 +29,13 @@ import ( // Kubelet MockClient is a mock RPC client of kubelet for testing purpose. type Kubelet struct { - srv *grpc.Server - sock net.Listener - Res []deviceResource - registered bool + srv *grpc.Server + sock net.Listener + Res []deviceResource + devicepluginPath string + registered bool + ctx context.Context + cancel context.CancelFunc sync.Mutex } @@ -43,11 +46,15 @@ type deviceResource struct { Watcher pluginapi.DevicePlugin_ListAndWatchClient } -func NewMockClient() *Kubelet { +func NewMockKubelet(devicepluginPath string) *Kubelet { + ctx, cancel := context.WithCancel(context.Background()) return &Kubelet{ - srv: nil, - sock: nil, - Res: make([]deviceResource, 0), + devicepluginPath: devicepluginPath, + srv: nil, + sock: nil, + Res: make([]deviceResource, 0), + ctx: ctx, + cancel: cancel, } } @@ -60,7 +67,7 @@ func (m *Kubelet) Register(_ context.Context, request *pluginapi.RegisterRequest m.registered = true m.Mutex.Unlock() - conn, err := grpc.DialContext(context.Background(), path.Join(DevicePluginPath, request.Endpoint), + conn, err := grpc.DialContext(m.ctx, path.Join(m.devicepluginPath, request.Endpoint), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { socketAddr, err := net.ResolveUnixAddr("unix", addr) @@ -73,16 +80,17 @@ func (m *Kubelet) Register(_ context.Context, request *pluginapi.RegisterRequest return &pluginapi.Empty{}, err } m.Res[len(m.Res)-1].Client = pluginapi.NewDevicePluginClient(conn) - m.Res[len(m.Res)-1].Watcher, _ = m.Res[0].Client.ListAndWatch(context.Background(), &pluginapi.Empty{}) + 
m.Res[len(m.Res)-1].Watcher, _ = m.Res[0].Client.ListAndWatch(m.ctx, &pluginapi.Empty{}) return &pluginapi.Empty{}, nil } func (m *Kubelet) StartServer() error { - socket, err := net.Listen("unix", KubeletSocket) + kubeletSock := path.Join(m.devicepluginPath, "kubelet.sock") + socket, err := net.Listen("unix", kubeletSock) if err != nil { - _ = os.Remove(KubeletSocket) - socket, err = net.Listen("unix", KubeletSocket) + _ = os.Remove(kubeletSock) + socket, err = net.Listen("unix", kubeletSock) } if err != nil { return err @@ -96,7 +104,7 @@ func (m *Kubelet) StartServer() error { } m.Res = []deviceResource{} - _, err = grpc.DialContext(context.Background(), path.Join(DevicePluginPath, KubeletSocket), + _, err = grpc.DialContext(m.ctx, kubeletSock, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { socketAddr, err := net.ResolveUnixAddr("unix", addr) @@ -111,9 +119,11 @@ func (m *Kubelet) StartServer() error { func (m *Kubelet) Stop() error { m.registered = false + m.cancel() m.srv.Stop() m.srv = nil - err := os.Remove(KubeletSocket) + m.ctx, m.cancel = context.WithCancel(context.Background()) + err := os.Remove(path.Join(m.devicepluginPath, "kubelet.sock")) if err != nil { return err } diff --git a/pkg/deviceplugin/mock/mock_manager.go b/pkg/deviceplugin/mock/mock_manager.go index 2a0e79b..12daf99 100644 --- a/pkg/deviceplugin/mock/mock_manager.go +++ b/pkg/deviceplugin/mock/mock_manager.go @@ -15,18 +15,25 @@ package mock +import "github.com/volcengine/cello/pkg/deviceplugin" + type PluginManager struct{} -func (p PluginManager) Serve(_ chan struct{}) error { +func (p PluginManager) Plugin(resourceName string) deviceplugin.Plugin { return nil } -func (p PluginManager) Stop() { +func (p PluginManager) AddPlugin(plugin deviceplugin.Plugin) { + return } -func (p PluginManager) Update(_ int) { +func (p PluginManager) Update(resourceName string, count int) error { + return nil } 
-func New() *PluginManager { - return &PluginManager{} +func (p PluginManager) Serve(_ chan struct{}) error { + return nil +} + +func (p PluginManager) Stop() { } diff --git a/pkg/deviceplugin/plugin.go b/pkg/deviceplugin/plugin.go deleted file mode 100644 index 266bb58..0000000 --- a/pkg/deviceplugin/plugin.go +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright 2023 The Cello Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package deviceplugin - -import ( - "context" - "fmt" - "path" - "time" - - "google.golang.org/grpc" - pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" -) - -const period = time.Minute * 3 - -// ResourceName. -const ( - // VolcNameSpace is a VKE prifix. - VolcNameSpace = "vke.volcengine.com/" - // ENIResourceName indicates exclusive ENI resource name. - ENIResourceName = VolcNameSpace + "eni" - // ENIIPResourceName indicates shared ENI ip addresses resource name. - ENIIPResourceName = VolcNameSpace + "eni-ip" - // BranchENIResourceName indicates truck ENI. - BranchENIResourceName = VolcNameSpace + "branch-eni" -) - -type ENIType int - -const ( - Exclusive = iota - Shared - Trunk -) - -var DevicePluginPath = pluginapi.DevicePluginPath -var KubeletSocket = DevicePluginPath + "kubelet.sock" - -type resource struct { - updateSignal chan struct{} - count int -} - -// ENIDevicePlugin implements the Kubernetes devices device deviceplugin API. 
-type ENIDevicePlugin struct { - ENIType - name string - endPoint string - server *grpc.Server - res *resource - ctx context.Context - listFunc func() int -} - -// NewENIDevicePlugin creates a new ENIDevicePlugin. -func NewENIDevicePlugin(t ENIType, res *resource, list func() int) *ENIDevicePlugin { - switch t { - case Exclusive: - return &ENIDevicePlugin{ - ENIType: Exclusive, - name: ENIResourceName, - endPoint: path.Join(DevicePluginPath, "eni.sock"), - res: res, - listFunc: list, - } - case Shared: - return &ENIDevicePlugin{ - ENIType: Shared, - name: ENIIPResourceName, - endPoint: path.Join(DevicePluginPath, "eni-ip.sock"), - res: res, - listFunc: list, - } - case Trunk: - return &ENIDevicePlugin{ - ENIType: Trunk, - name: BranchENIResourceName, - endPoint: path.Join(DevicePluginPath, "branch-eni.sock"), - res: res, - listFunc: list, - } - } - return nil -} - -// GetDevicePluginOptions returns options that ENI devices support. -func (eniPlugin ENIDevicePlugin) GetDevicePluginOptions(_ context.Context, _ *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { - return &pluginapi.DevicePluginOptions{}, nil -} - -// ListAndWatch returns ENI devices list. 
-func (eniPlugin ENIDevicePlugin) ListAndWatch(_ *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error { - count := eniPlugin.listFunc() - - sendResponse := func(count int, s pluginapi.DevicePlugin_ListAndWatchServer) error { - enis := make([]*pluginapi.Device, count) - for i := 0; i < count; i++ { - enis[i] = &pluginapi.Device{ - ID: fmt.Sprintf("%v-%d", eniPlugin.name, i), - Health: pluginapi.Healthy, - } - } - - resp := &pluginapi.ListAndWatchResponse{ - Devices: enis, - } - err := stream.Send(resp) - log.Infof("Report resources: %v of %v", eniPlugin.name, count) - if err != nil { - log.Errorf("Send devices error: %v", err) - return err - } - return nil - } - - if err := sendResponse(count, stream); err != nil { - return err - } - ticker := time.NewTicker(period) - defer ticker.Stop() - for { - select { - case <-ticker.C: - count = eniPlugin.listFunc() - err := sendResponse(count, stream) - if err != nil { - return err - } - // Send new list when resource count changed - case <-eniPlugin.res.updateSignal: - count = eniPlugin.res.count - err := sendResponse(count, stream) - if err != nil { - return err - } - case <-eniPlugin.ctx.Done(): - return nil - } - } -} - -// Allocate does nothing, here we only return a void response. -func (eniPlugin ENIDevicePlugin) Allocate(_ context.Context, request *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { - resp := pluginapi.AllocateResponse{ - ContainerResponses: []*pluginapi.ContainerAllocateResponse{}, - } - - for range request.GetContainerRequests() { - resp.ContainerResponses = append(resp.ContainerResponses, - &pluginapi.ContainerAllocateResponse{}, - ) - } - - return &resp, nil -} - -// PreStartContainer is not supported by this plugin. 
-func (eniPlugin ENIDevicePlugin) PreStartContainer(_ context.Context, _ *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { - return &pluginapi.PreStartContainerResponse{}, nil -} - -// GetPreferredAllocation is not supported by this plugin. -func (eniPlugin ENIDevicePlugin) GetPreferredAllocation(_ context.Context, _ *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) { - return &pluginapi.PreferredAllocationResponse{}, nil -} diff --git a/pkg/deviceplugin/types.go b/pkg/deviceplugin/types.go new file mode 100644 index 0000000..3dc633b --- /dev/null +++ b/pkg/deviceplugin/types.go @@ -0,0 +1,63 @@ +// Copyright 2023 The Cello Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package deviceplugin + +import ( + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +// ResourcesName. +const ( + // VolcNameSpace is the resource namespace for VKE volcengine. + VolcNameSpace = "vke.volcengine.com/" + // ENIResourceName indicates exclusive ENI resource name. + ENIResourceName = "eni" + // ENIIPResourceName indicates shared ENI ip addresses resource name. + ENIIPResourceName = "eni-ip" + // BranchENIResourceName indicates trunk ENI. 
+ BranchENIResourceName = "branch-eni" +) + +const reportPeriod = time.Minute * 3 + +var ( + DevicePluginPath = pluginapi.DevicePluginPath + KubeletSocket = DevicePluginPath + "kubelet.sock" +) + +// Manager is the interface of device plugin manager. +type Manager interface { + AddPlugin(plugin Plugin) + Plugin(resourceName string) Plugin + Serve(stopCh chan struct{}) error + Stop() + Update(resourceName string, count int) error +} + +// Plugin is the interface for generic device plugin which can be managed by Manager. +type Plugin interface { + pluginapi.DevicePluginServer + Endpoint() string + ResourceName() string + Server() *grpc.Server + ResetServer() + SetContext(ctx context.Context) + Update(count int) +} diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index a5b74fa..db80826 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -43,6 +43,7 @@ import ( "k8s.io/client-go/util/retry" "github.com/volcengine/cello/pkg/utils/logger" + utilruntime "github.com/volcengine/cello/pkg/utils/runtime" "github.com/volcengine/cello/types" ) @@ -189,11 +190,7 @@ func (k *k8sManager) initPodInformer() { c := make(chan os.Signal, 2) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) go func() { - defer func() { - if err := recover(); err != nil { - log.Errorf("SignalHandler panic, %v", err) - } - }() + defer utilruntime.HandleCrash(log) <-c cancel() <-c @@ -236,11 +233,7 @@ func (k *k8sManager) initConfigMapInformer() { c := make(chan os.Signal, 2) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) go func() { - defer func() { - if err := recover(); err != nil { - log.Errorf("SignalHandler panic, %v", err) - } - }() + defer utilruntime.HandleCrash(log) <-c cancel() <-c diff --git a/pkg/metrics/ec2.go b/pkg/metrics/ec2.go index 776a4d6..6ac3c8b 100644 --- a/pkg/metrics/ec2.go +++ b/pkg/metrics/ec2.go @@ -293,3 +293,17 @@ func (m *MetricEC2Wrapper) UnassignIpv6Addresses(req *ec2.UnassignIpv6AddressesI } return resp, err } + +func (m *MetricEC2Wrapper) TagResources(req 
*vpc.TagResourcesInput) (*vpc.TagResourcesOutput, error) { + start := time.Now() + resp, err := m.parent.TagResources(req) + duration := MsSince(start) + OpenAPILatency.WithLabelValues(fmt.Sprintf("TagResources[%s]", volcengine.StringValue(req.ResourceType)), + fmt.Sprint(err != nil), CelloReqErrCode(err), CelloReqId(err)).Observe(duration) + if err != nil { + OpenAPIErrInc(fmt.Sprintf("TagResources[%s]", volcengine.StringValue(req.ResourceType)), err) + apiErr.RecordOpenAPIErrEvent(err, + apiErr.EventInfoField{Key: "API", Value: fmt.Sprintf("TagResources[%s]", volcengine.StringValue(req.ResourceType))}) + } + return resp, err +} diff --git a/pkg/provider/volcengine/cellohelper/instance.go b/pkg/provider/volcengine/cellohelper/instance.go index b6c8513..c54218b 100644 --- a/pkg/provider/volcengine/cellohelper/instance.go +++ b/pkg/provider/volcengine/cellohelper/instance.go @@ -43,11 +43,14 @@ type InstanceLimits struct { ENICustomer int // currently support only one TrunkENI *types.ENI + + Created int + Cordon bool } func (l *InstanceLimits) String() string { - return fmt.Sprintf("{ENITotal: %d, ENIQuota: %d, IPv4MaxPerENI: %d, IPv6MaxPerENI: %d, TrunkSupported: %t, ENICustomer: %d}", - l.ENITotal, l.ENIQuota, l.IPv4MaxPerENI, l.IPv6MaxPerENI, l.TrunkSupported, l.ENICustomer) + return fmt.Sprintf("{ENITotal: %d, ENIQuota: %d, IPv4MaxPerENI: %d, IPv6MaxPerENI: %d, TrunkSupported: %t, ENICustomer: %d, Created: %d, Cordon: %t}", + l.ENITotal, l.ENIQuota, l.IPv4MaxPerENI, l.IPv6MaxPerENI, l.TrunkSupported, l.ENICustomer, l.Created, l.Cordon) } // SupportTrunk support trunk or not. 
@@ -67,11 +70,22 @@ type InstanceLimitManager interface { Update() // UpdateTrunk update trunk eni to InstanceLimits UpdateTrunk(trunk *types.ENI) + // WatchUpdate watch update event + WatchUpdate(name string, watcher chan<- struct{}) + // CordonCreate cordon create eni + CordonCreate(name string) + // UnCordonCreate unCordon create eni + UnCordonCreate(name string) + // NotifyWatcher send a signal to all instance limit watcher + NotifyWatcher() } // ENIAvailable get quota minus the custom eni and primary eni. func (l *InstanceLimits) ENIAvailable() int { cnt := l.ENIQuota - 1 - l.ENICustomer + if l.Cordon { + cnt = l.Created + } if l.TrunkENI != nil { cnt -= 1 } @@ -83,10 +97,11 @@ func (l *InstanceLimits) BranchENI() int { } type defaultInstanceLimit struct { - lock sync.RWMutex - api VolcAPI - limit InstanceLimits - lastUpdate time.Time + lock sync.RWMutex + api VolcAPI + limit InstanceLimits + lastUpdate time.Time + eventWatchers []chan<- struct{} } var instanceLimitManager *defaultInstanceLimit @@ -114,56 +129,118 @@ func (m *defaultInstanceLimit) UpdateTrunk(trunk *types.ENI) { m.limit.TrunkENI = trunk } -func (m *defaultInstanceLimit) update() error { +func (m *defaultInstanceLimit) WatchUpdate(name string, watcher chan<- struct{}) { m.lock.Lock() defer m.lock.Unlock() + log.Infof("Component %s watch update", name) + m.eventWatchers = append(m.eventWatchers, watcher) +} - if time.Since(m.lastUpdate) < time.Minute { - return nil +func (m *defaultInstanceLimit) NotifyWatcher() { + m.lock.Lock() + defer m.lock.Unlock() + m.notifyWatcherLocked() +} + +func (m *defaultInstanceLimit) notifyWatcherLocked() { + log.Infof("Notify watcher due to limit update") + for _, watcher := range m.eventWatchers { + select { + case watcher <- struct{}{}: + default: + } } +} + +func (m *defaultInstanceLimit) updateLocked() error { log.Infof("InstanceLimit Updating") - limit, err := m.api.GetInstanceLimit() + newLimit, err := m.api.GetInstanceLimit() if err != nil { return err } 
oldLimit := m.limit.InstanceLimitsAttr emptyLimit := InstanceLimitsAttr{} - if oldLimit != emptyLimit && oldLimit != limit.InstanceLimitsAttr { + if oldLimit != emptyLimit && oldLimit != newLimit.InstanceLimitsAttr { _ = tracing.RecordNodeEvent(v1.EventTypeWarning, tracing.EventInstanceQuotaUpdated, - fmt.Sprintf("ECS instance quota updated from %v to %v", oldLimit, *limit)) + fmt.Sprintf("ECS instance quota updated from %v to %v", oldLimit, *newLimit)) } created, err := m.api.GetAttachedENIs(true) if err != nil { return err } + + m.limit.Created = len(created) + total, err := m.api.GetTotalAttachedEniCnt() if err != nil { return err } - limit.ENICustomer = total - len(created) - 1 // contains primary eni + m.limit.ENICustomer = total - len(created) - 1 // contains primary eni for _, e := range created { if e.Trunk { - limit.TrunkENI = e + m.limit.TrunkENI = e break } } - log.Infof("InstanceLimit Updated %v", limit) - m.limit = *limit + m.limit.InstanceLimitsAttr = newLimit.InstanceLimitsAttr + log.Infof("InstanceLimit Updated %s", m.limit.String()) m.lastUpdate = time.Now() + m.notifyWatcherLocked() return nil } +func (m *defaultInstanceLimit) update() error { + m.lock.Lock() + defer m.lock.Unlock() + + if time.Since(m.lastUpdate) < time.Minute { + return nil + } + if err := m.updateLocked(); err != nil { + return err + } + return nil +} + +func (m *defaultInstanceLimit) CordonCreate(name string) { + m.lock.Lock() + defer m.lock.Unlock() + + if m.limit.Cordon { + return + } + log.Infof("Cordon eni create by %s", name) + if err := m.updateLocked(); err != nil { + log.Errorf("Update InstanceLimit failed, %v", err) + return + } + + m.limit.Cordon = true +} + +func (m *defaultInstanceLimit) UnCordonCreate(name string) { + m.lock.Lock() + defer m.lock.Unlock() + if !m.limit.Cordon { + return + } + log.Infof("UnCordon eni create by %s", name) + m.limit.Cordon = false + m.notifyWatcherLocked() +} + func NewInstanceLimitManager(api VolcAPI) (InstanceLimitManager, error) { 
if instanceLimitManager != nil { return instanceLimitManager, nil } instanceLimitManager = &defaultInstanceLimit{ - lock: sync.RWMutex{}, - api: api, + lock: sync.RWMutex{}, + api: api, + eventWatchers: []chan<- struct{}{}, } if err := instanceLimitManager.update(); err != nil { return nil, err diff --git a/pkg/provider/volcengine/cellohelper/volc_api.go b/pkg/provider/volcengine/cellohelper/volc_api.go index 53f1f4d..1d15d3f 100644 --- a/pkg/provider/volcengine/cellohelper/volc_api.go +++ b/pkg/provider/volcengine/cellohelper/volc_api.go @@ -25,13 +25,12 @@ import ( "time" "github.com/containernetworking/plugins/pkg/ip" - v1 "k8s.io/api/core/v1" - k8sErr "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/wait" - "github.com/volcengine/volcengine-go-sdk/service/ecs" "github.com/volcengine/volcengine-go-sdk/service/vpc" "github.com/volcengine/volcengine-go-sdk/volcengine" + v1 "k8s.io/api/core/v1" + k8sErr "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" "github.com/volcengine/cello/pkg/backoff" "github.com/volcengine/cello/pkg/config" @@ -126,7 +125,7 @@ func (e *VolcApiImpl) deleteENI(eniID string) error { return true, nil } errCodes = &apiErr.OpenApiErrCodeChain{} - if errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidEniInvalidStatus, apiErr.InvalidVpcInvalidStatus).ErrChainEqual(err) { + if errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidVpcInvalidStatus).ErrChainEqual(err) { return false, err } return false, nil @@ -166,8 +165,7 @@ func (e *VolcApiImpl) freeENI(eniID string, sleepDelayAfterDetach time.Duration) return true, nil } errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidVpcInvalidStatus, - apiErr.InvalidEniIdNotFound, apiErr.InvalidEniInstanceMismatch, - apiErr.InvalidEniInvalidStatus) + apiErr.InvalidEniIdNotFound, apiErr.InvalidEniInstanceMismatch) if err == nil || errCodes.ErrChainEqual(err) { return false, err @@ -262,7 +260,7 @@ func (e *VolcApiImpl) attachENI(eniID string) 
(*ec2.DescribeNetworkInterfaceAttr NetworkInterfaceId: volcengine.String(eniID), }) errCodes := &apiErr.OpenApiErrCodeChain{} - if errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidVpcInvalidStatus, apiErr.InvalidEniInvalidStatus, + if errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidVpcInvalidStatus, apiErr.LimitExceededEnisPerInstance).ErrChainEqual(err) { return false, err } @@ -511,7 +509,7 @@ func (e *VolcApiImpl) AllocIPAddresses(eniID, eniMac string, v4Cnt, v6Cnt int) ( return true, nil } errCodes := &apiErr.OpenApiErrCodeChain{} - if errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidVpcInvalidStatus, apiErr.InvalidEniInvalidStatus, apiErr.InsufficientIpInSubnet, + if errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidVpcInvalidStatus, apiErr.InsufficientIpInSubnet, apiErr.LimitExceededPrivateIpsPerEni, apiErr.QuotaExceededSecurityGroupIp).ErrChainEqual(err) { return false, err } @@ -562,7 +560,7 @@ func (e *VolcApiImpl) AllocIPAddresses(eniID, eniMac string, v4Cnt, v6Cnt int) ( return true, nil } errCodes := &apiErr.OpenApiErrCodeChain{} - if errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidVpcInvalidStatus, apiErr.InvalidEniInvalidStatus, apiErr.InsufficientIpInSubnet, + if errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidVpcInvalidStatus, apiErr.InsufficientIpInSubnet, apiErr.LimitExceededIpv6AddressesPerEni, apiErr.InvalidSubnetDisableIpv6, apiErr.QuotaExceededSecurityGroupIp).ErrChainEqual(err) { return false, err } @@ -656,7 +654,7 @@ func (e *VolcApiImpl) deallocIPAddressesWithLocked(eniID, eniMac string, ipv4s, return true, nil } errCodes := &apiErr.OpenApiErrCodeChain{} - if errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidVpcInvalidStatus, apiErr.InvalidEniInvalidStatus).ErrChainEqual(err) { + if errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidVpcInvalidStatus).ErrChainEqual(err) { return false, err } errCodes = &apiErr.OpenApiErrCodeChain{} @@ -683,7 +681,7 @@ func (e *VolcApiImpl) 
deallocIPAddressesWithLocked(eniID, eniMac string, ipv4s, return true, nil } errCodes := &apiErr.OpenApiErrCodeChain{} - if errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidVpcInvalidStatus, apiErr.InvalidEniInvalidStatus).ErrChainEqual(err) { + if errCodes.WithPublicErrCodes().WithErrCodes(apiErr.InvalidVpcInvalidStatus).ErrChainEqual(err) { return false, err } errCodes = &apiErr.OpenApiErrCodeChain{} diff --git a/pkg/provider/volcengine/ec2/client.go b/pkg/provider/volcengine/ec2/client.go index c40c571..0600213 100644 --- a/pkg/provider/volcengine/ec2/client.go +++ b/pkg/provider/volcengine/ec2/client.go @@ -68,8 +68,13 @@ type APIGroupECS interface { DescribeInstanceTypes(input *ecs.DescribeInstanceTypesInput) (*DescribeInstanceTypesOutput, error) } +type APIGroupTag interface { + TagResources(input *vpc.TagResourcesInput) (*vpc.TagResourcesOutput, error) +} + type EC2 interface { APIGroupENI APIGroupSubnet APIGroupECS + APIGroupTag } diff --git a/pkg/provider/volcengine/ec2/clientwrapper.go b/pkg/provider/volcengine/ec2/clientwrapper.go index d7d7e90..3cd8b7f 100644 --- a/pkg/provider/volcengine/ec2/clientwrapper.go +++ b/pkg/provider/volcengine/ec2/clientwrapper.go @@ -31,6 +31,7 @@ import ( apiErr "github.com/volcengine/cello/pkg/provider/volcengine/cellohelper/errors" "github.com/volcengine/cello/pkg/provider/volcengine/credential" "github.com/volcengine/cello/pkg/utils/logger" + "github.com/volcengine/cello/pkg/version" ) type ClientSet struct { @@ -207,6 +208,14 @@ func (c *ClientSet) DescribeSubnetAttributes(input *vpc.DescribeSubnetAttributes return output, nil } +func (c *ClientSet) TagResources(input *vpc.TagResourcesInput) (*vpc.TagResourcesOutput, error) { + output, err := c.VpcSvc.TagResourcesWithContext(context.TODO(), input) + if err != nil || output.Metadata.Error != nil { + return output, apiErr.NewAPIRequestErr(output.Metadata, err) + } + return output, nil +} + func NewClient(region, endpoint string, credentialProvider 
credential.Provider) *ClientSet { config := volcengine.NewConfig(). WithRegion(region). @@ -218,7 +227,8 @@ func NewClient(region, endpoint string, credentialProvider credential.Provider) cred := credentialProvider.Get() return credentials.NewStaticCredentials(cred.AccessKeyId, cred.SecretAccessKey, cred.SessionToken), volcengine.String(region) }). - WithEndpoint(volcengineutil.NewEndpoint().WithCustomerEndpoint(endpoint).GetEndpoint()) + WithEndpoint(volcengineutil.NewEndpoint().WithCustomerEndpoint(endpoint).GetEndpoint()). + WithExtraUserAgent(volcengine.String(version.UserAgent())) if logger.GetLogLevel() == "trace" { config = config.WithLogger(volcengine.NewDefaultLogger()). diff --git a/pkg/provider/volcengine/ec2/mock/ec2_mocks.go b/pkg/provider/volcengine/ec2/mock/ec2_mocks.go index 64a4b8a..1fb6b69 100644 --- a/pkg/provider/volcengine/ec2/mock/ec2_mocks.go +++ b/pkg/provider/volcengine/ec2/mock/ec2_mocks.go @@ -229,6 +229,21 @@ func (mr *MockEC2MockRecorder) DetachNetworkInterface(arg0 interface{}) *gomock. 
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DetachNetworkInterface", reflect.TypeOf((*MockEC2)(nil).DetachNetworkInterface), arg0) } +// TagResources mocks base method +func (m *MockEC2) TagResources(arg0 *vpc.TagResourcesInput) (*vpc.TagResourcesOutput, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TagResources", arg0) + ret0, _ := ret[0].(*vpc.TagResourcesOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TagResources indicates an expected call of TagResources +func (mr *MockEC2MockRecorder) TagResources(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TagResources", reflect.TypeOf((*MockEC2)(nil).TagResources), arg0) +} + // UnAssignPrivateIpAddress mocks base method func (m *MockEC2) UnAssignPrivateIpAddress(arg0 *vpc.UnassignPrivateIpAddressesInput) (*vpc.UnassignPrivateIpAddressesOutput, error) { m.ctrl.T.Helper() diff --git a/pkg/utils/runtime/runtime.go b/pkg/utils/runtime/runtime.go new file mode 100644 index 0000000..e33f7a5 --- /dev/null +++ b/pkg/utils/runtime/runtime.go @@ -0,0 +1,25 @@ +package runtime + +import ( + "github.com/volcengine/cello/pkg/utils/logger" + "runtime" +) + +func logPanic(r interface{}, log logger.Logger) { + // Same as stdlib http server code. 
Manually allocate stack trace buffer size + // to prevent excessively large logs + const size = 64 << 10 + stacktrace := make([]byte, size) + stacktrace = stacktrace[:runtime.Stack(stacktrace, false)] + if _, ok := r.(string); ok { + log.Errorf("Observed a panic: %s\n%s\n", r, stacktrace) + } else { + log.Errorf("Observed a panic: %#v (%v)\n%s", r, r, stacktrace) + } +} + +func HandleCrash(log logger.Logger) { + if r := recover(); r != nil { + logPanic(r, log) + } +} diff --git a/pkg/version/version.go b/pkg/version/version.go index 5f9c52f..45525bc 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -15,4 +15,15 @@ package version -var Version = "v1.6.2" +import ( + "fmt" + "runtime" +) + +var Version = "v1.6.8" +var ComponentName = "cello-agent" +var GitCommit = "unset" + +func UserAgent() string { + return fmt.Sprintf("%s/%s (%s/%s)/%s", ComponentName, Version, runtime.GOOS, runtime.GOARCH, GitCommit) +} diff --git a/script/bootstrap/entrypoint-cello.sh b/script/bootstrap/entrypoint-cello.sh deleted file mode 100644 index 541cd66..0000000 --- a/script/bootstrap/entrypoint-cello.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env bash - -CURDIR=$( - cd $(dirname $0) || exit - pwd -) - -cd "${CURDIR}" || exit -cd /cello || exit -./cello-agent diff --git a/script/bootstrap/entrypoint-cilium.sh b/script/bootstrap/entrypoint-cilium.sh deleted file mode 100644 index 12771f9..0000000 --- a/script/bootstrap/entrypoint-cilium.sh +++ /dev/null @@ -1,73 +0,0 @@ -#!/usr/bin/env bash - -# 确保脚本是可重入的 - -init_node_bpf() { - nsenter -t 1 -m -- bash -c ' - mount | grep "/sys/fs/bpf type bpf" || { - # Mount the filesystem until next reboot - echo "Mounting BPF filesystem..." 
- mount bpffs /sys/fs/bpf -t bpf - - echo "Link information:" - ip link - - echo "Routing table:" - ip route - - echo "Addressing:" - ip -4 a - ip -6 a -# date > /tmp/cilium-bootstrap-time - echo "Node initialization complete" -}' -} - -set -o errexit -set -o nounset - -# check kernel version & enable cilium -read KERNEL_MAJOR_VERSION KERNEL_MINOR_VERSION < <(uname -r | awk -F . '{print $1,$2}') -# kernel version equal and above 4.19 -if { [ "$KERNEL_MAJOR_VERSION" -eq 4 ] && [ "$KERNEL_MINOR_VERSION" -ge 19 ]; } || - [ "$KERNEL_MAJOR_VERSION" -gt 4 ]; then - echo "Init node BPF" - init_node_bpf -else - echo "Linux kernel version <= 4.19, skipping cilium config" - exit 1 -fi - -#echo "install cni" -#mkdir -p /opt/cni/bin/ /etc/cni/net.d/ - -#install /usr/bin/cilium-cni /opt/cni/bin/ -#chmod +x /opt/cni/bin/cilium-cni - -echo "disable rp_filter" -sysctl -w net.ipv4.conf.eth0.rp_filter=0 - -#限速 -#echo "modprobe sch_htb" -#modprobe sch_htb || echo "modprobe sch_htb failed" -echo "modprobe ipvlan" -modprobe ipvlan || echo "modprobe ipvlan failed" - -echo "[`date`] init.sh execute finish" > /tmp/init - -export DATASTORE_TYPE=kubernetes -if [ "$DATASTORE_TYPE" = "kubernetes" ]; then - if [ -z "$KUBERNETES_SERVICE_HOST" ]; then - echo "cilium need k8s datastore, but can not found [KUBERNETES_SERVICE_HOST] env, exiting" - exit 1 - fi -fi - -# 注册crd,更新ciliumnode ipam -mkdir -p /cilium && cd /cilium - -## run cello-cilium-hook -#/usr/bin/cello-cilium-hook 2>&1 1>ciliumnode.log & - -echo "run cilium-agent use config-dir: /etc/cilium/cilium-config" -exec cilium-agent --config-dir=/etc/cilium/cilium-config \ No newline at end of file diff --git a/script/bootstrap/install_env.sh b/script/bootstrap/install_env.sh index d292aeb..f44f83e 100644 --- a/script/bootstrap/install_env.sh +++ b/script/bootstrap/install_env.sh @@ -2,6 +2,7 @@ set -e set -x +set -o nounset CURDIR=$( cd $(dirname $0) || exit @@ -10,6 +11,45 @@ CURDIR=$( cd "${CURDIR}" || exit +init_node_bpf() { + nsenter -t 
1 -m -- bash -c ' + mount | grep "/sys/fs/bpf type bpf" || { + # Mount the filesystem until next reboot + echo "Mounting BPF filesystem..." + mount bpffs /sys/fs/bpf -t bpf + + echo "Link information:" + ip link + + echo "Routing table:" + ip -4 route + ip -6 route + + echo "Addressing:" + ip -4 addr + ip -6 addr +# date > /tmp/cilium-bootstrap-time + echo "Node initialization complete" +}' +} + +# check kernel version & enable cilium +read KERNEL_MAJOR_VERSION KERNEL_MINOR_VERSION < <(uname -r | awk -F . '{print $1,$2}') +# kernel version equal and above 4.19 +if { [ "$KERNEL_MAJOR_VERSION" -eq 4 ] && [ "$KERNEL_MINOR_VERSION" -ge 19 ]; } || + [ "$KERNEL_MAJOR_VERSION" -gt 4 ]; then + echo "Init node BPF" + init_node_bpf +else + echo "Linux kernel version <= 4.19, cant install cilium" + exit 1 +fi + +echo "modprobe ipvlan" +modprobe ipvlan || echo "modprobe ipvlan failed" +echo "modprobe sch_htb" +modprobe sch_htb || echo "modprobe sch_htb failed" + # install CNIs /bin/cp -f /etc/cello/net.d/* /etc/cni/net.d /bin/cp -f /cello/cello-cni /opt/cni/bin