From bfffe763aa180f3848ae6c12abcb7e9b9eb655ff Mon Sep 17 00:00:00 2001 From: Sam Yuan Date: Thu, 12 Dec 2024 13:58:38 +0800 Subject: [PATCH] [fix]: enhance kubelet package impl Signed-off-by: Sam Yuan --- pkg/kubelet/kubelet_pod_lister.go | 42 ++++++++---- pkg/kubelet/kubelet_pod_lister_test.go | 89 ++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 11 deletions(-) create mode 100644 pkg/kubelet/kubelet_pod_lister_test.go diff --git a/pkg/kubelet/kubelet_pod_lister.go b/pkg/kubelet/kubelet_pod_lister.go index 28a1161dc2..3364bedc6a 100644 --- a/pkg/kubelet/kubelet_pod_lister.go +++ b/pkg/kubelet/kubelet_pod_lister.go @@ -17,7 +17,6 @@ limitations under the License. package kubelet import ( - "context" "crypto/tls" "encoding/json" "fmt" @@ -37,8 +36,9 @@ const ( ) var ( - podURL string - client http.Client + podURL string + client http.Client + bearerToken string ) func init() { @@ -55,19 +55,20 @@ func init() { client = http.Client{} } -func httpGet(url string) (*http.Response, error) { - objToken, err := os.ReadFile(saPath) +func loadToken(path string) (string, error) { + objToken, err := os.ReadFile(path) if err != nil { - return nil, fmt.Errorf("failed to read from %q: %v", saPath, err) + return "", fmt.Errorf("failed to read from %q: %v", path, err) } - token := string(objToken) + return "Bearer " + string(objToken), nil +} - var bearer = "Bearer " + token - req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, http.NoBody) +func doFetchPod(url string) (*http.Response, error) { + req, err := http.NewRequest(http.MethodGet, url, http.NoBody) if err != nil { return nil, err } - req.Header.Add("Authorization", bearer) + req.Header.Add("Authorization", bearerToken) resp, err := client.Do(req) if err != nil { return nil, fmt.Errorf("failed to get response from %q: %v", url, err) @@ -75,9 +76,28 @@ func httpGet(url string) (*http.Response, error) { return resp, err } +func httpGet(path, url string) (*http.Response, error) { + var err error + if bearerToken == "" || len(bearerToken) == 0 { + bearerToken, err = loadToken(path) + if err != nil { + return nil, fmt.Errorf("failed to read from %q: %v", path, err) + } + } + resp, err := doFetchPod(url) + if resp != nil && resp.StatusCode > 399 && resp.StatusCode < 500 { // if response in 4xx retry once + bearerToken, err = loadToken(path) + if err != nil { + return nil, fmt.Errorf("failed to read from %q: %v", path, err) + } + resp, err = doFetchPod(url) + } + return resp, err +} + // ListPods obtains PodList func (k *KubeletPodLister) ListPods() (*[]corev1.Pod, error) { - resp, err := httpGet(podURL) + resp, err := httpGet(saPath, podURL) if err != nil { return nil, fmt.Errorf("failed to get response: %v", err) } diff --git a/pkg/kubelet/kubelet_pod_lister_test.go b/pkg/kubelet/kubelet_pod_lister_test.go new file mode 100644 index 0000000000..dd689a022a --- /dev/null +++ b/pkg/kubelet/kubelet_pod_lister_test.go @@ -0,0 +1,89 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + "net/http" + "net/http/httptest" + "os" + "sync" + "testing" + + . "github.com/onsi/gomega" +) + +func TestDoFetchPod(t *testing.T) { + g := NewWithT(t) + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "dummy") + })) + defer svr.Close() + res, err := doFetchPod(svr.URL) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(res.StatusCode).To(Equal(http.StatusOK)) + bearerToken = "dummy" + res, err = httpGet("", svr.URL) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(res.StatusCode).To(Equal(http.StatusOK)) +} + +func TestDoFetchPodWithError(t *testing.T) { + g := NewWithT(t) + svr := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "dummy") + })) + res, err := doFetchPod(svr.URL) + g.Expect(err).To(HaveOccurred()) + g.Expect(res).To(BeNil()) + bearerToken = "dummy" + res, err = httpGet("", svr.URL) + g.Expect(err).To(HaveOccurred()) + g.Expect(res).To(BeNil()) +} + +func TestLoadToken(t *testing.T) { + g := NewWithT(t) + bearerToken = "" + tmpDir, err := os.MkdirTemp("", "kepler-tmp-") + g.Expect(err).NotTo(HaveOccurred()) + defer os.RemoveAll(tmpDir) + + TokenFile, err := os.CreateTemp(tmpDir, "kubeletToken") + g.Expect(err).NotTo(HaveOccurred()) + _, err = TokenFile.Write([]byte("token")) + g.Expect(err).NotTo(HaveOccurred()) + TokenFile.Close() + + bearerToken, err := loadToken(TokenFile.Name()) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(bearerToken).To(Equal("Bearer token")) + + bearerToken = "" + var once sync.Once + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + once.Do(func() { + w.WriteHeader(http.StatusUnauthorized) + }) + fmt.Fprintf(w, "dummy") + })) + defer svr.Close() + + res, err := httpGet(TokenFile.Name(), svr.URL) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(res.StatusCode).To(Equal(http.StatusOK)) +}