From 32adf843c35da16a1a746f85ff6cc8acaa25e713 Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Fri, 26 Jul 2024 17:21:58 +0800 Subject: [PATCH] fix remote terminal leak Signed-off-by: Xuhui zhang --- pkg/dashboard/pod.go | 115 +++--------------------------- pkg/util/resource/terminal.go | 129 ++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+), 104 deletions(-) create mode 100644 pkg/util/resource/terminal.go diff --git a/pkg/dashboard/pod.go b/pkg/dashboard/pod.go index ef51994ccd..539978d56b 100644 --- a/pkg/dashboard/pod.go +++ b/pkg/dashboard/pod.go @@ -18,7 +18,6 @@ package dashboard import ( "context" - "encoding/json" "io" "strconv" "strings" @@ -32,13 +31,12 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/remotecommand" "k8s.io/klog" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/juicedata/juicefs-csi-driver/pkg/config" "github.com/juicedata/juicefs-csi-driver/pkg/util" + "github.com/juicedata/juicefs-csi-driver/pkg/util/resource" ) type PodExtra struct { @@ -775,50 +773,6 @@ func (api *API) watchPodLogs() gin.HandlerFunc { } } -type terminalSession struct { - conn *websocket.Conn - sizeCh chan *remotecommand.TerminalSize -} - -func (t *terminalSession) Write(p []byte) (int, error) { - err := websocket.Message.Send(t.conn, string(p)) - return len(p), err -} - -func (t *terminalSession) Read(p []byte) (int, error) { - var msgStr []byte - var msg struct { - Rows uint16 `json:"rows"` - Cols uint16 `json:"cols"` - Data string `json:"data"` - Type string `json:"type"` - } - err := websocket.Message.Receive(t.conn, &msgStr) - if err != nil { - return 0, err - } - if err := json.Unmarshal(msgStr, &msg); err != nil { - return copy(p, msgStr), nil - } - switch msg.Type { - case "stdin": - return copy(p, []byte(msg.Data)), nil - case "resize": - select { - case t.sizeCh <- &remotecommand.TerminalSize{ - Width: msg.Cols, - Height: msg.Rows, - }: - default: - } - } - return 0, nil -} - -func (t *terminalSession) Next() *remotecommand.TerminalSize { - return <-t.sizeCh -} - func (api *API) execPod() gin.HandlerFunc { return func(c *gin.Context) { namespace := c.Param("namespace") @@ -826,36 +780,11 @@ func (api *API) execPod() gin.HandlerFunc { container := c.Param("container") websocket.Handler(func(ws *websocket.Conn) { defer ws.Close() - terminal := &terminalSession{ - conn: ws, - sizeCh: make(chan *remotecommand.TerminalSize), - } - req := api.client.CoreV1().RESTClient().Post(). - Resource("pods"). - Name(name). - Namespace(namespace).SubResource("exec") - req.VersionedParams(&corev1.PodExecOptions{ - Command: []string{"sh", "-c", "bash || sh"}, - Container: container, - Stdin: true, - Stdout: true, - Stderr: true, - TTY: true, - }, scheme.ParameterCodec) - - executor, err := remotecommand.NewSPDYExecutor(api.kubeconfig, "POST", req.URL()) - if err != nil { - klog.Error("Failed to create SPDY executor: ", err) - return - } - if err := executor.Stream(remotecommand.StreamOptions{ - Stdin: terminal, - Stdout: terminal, - Stderr: terminal, - Tty: true, - TerminalSizeQueue: terminal, - }); err != nil { - klog.Error("Failed to stream: ", err) + terminal := resource.NewTerminalSession(ws, resource.EndOfTransmissionCTRLD) + if err := resource.ExecInPod( + api.client, api.kubeconfig, terminal, namespace, name, container, + []string{"sh", "-c", "bash || sh"}); err != nil { + klog.Error("Failed to exec in pod: ", err) return } }).ServeHTTP(c.Writer, c.Request) @@ -869,10 +798,7 @@ func (api *API) watchMountPodAccessLog() gin.HandlerFunc { container := c.Param("container") websocket.Handler(func(ws *websocket.Conn) { defer ws.Close() - terminal := &terminalSession{ - conn: ws, - sizeCh: make(chan *remotecommand.TerminalSize), - } + terminal := resource.NewTerminalSession(ws, resource.EndOfTransmissionCTRLC) mountpod, err := api.client.CoreV1().Pods(namespace).Get(c, name, metav1.GetOptions{}) if err != nil { klog.Error("Failed to get mount pod: ", err) @@ -883,29 +809,10 @@ func (api *API) watchMountPodAccessLog() gin.HandlerFunc { klog.Error("Failed to get mount path: ", err) return } - req := api.client.CoreV1().RESTClient().Post(). - Resource("pods"). - Name(name). - Namespace(namespace).SubResource("exec") - req.VersionedParams(&corev1.PodExecOptions{ - Command: []string{"sh", "-c", "cat " + mntPath + "/.accesslog"}, - Container: container, - Stdin: true, - Stdout: true, - Stderr: true, - }, scheme.ParameterCodec) - - executor, err := remotecommand.NewSPDYExecutor(api.kubeconfig, "POST", req.URL()) - if err != nil { - klog.Error("Failed to create SPDY executor: ", err) - return - } - if err := executor.Stream(remotecommand.StreamOptions{ - Stdin: terminal, - Stdout: terminal, - Stderr: terminal, - }); err != nil { - klog.Error("Failed to stream: ", err) + if err := resource.ExecInPod( + api.client, api.kubeconfig, terminal, namespace, name, container, + []string{"sh", "-c", "cat " + mntPath + "/.accesslog"}); err != nil { + klog.Error("Failed to exec in pod: ", err) return } }).ServeHTTP(c.Writer, c.Request) diff --git a/pkg/util/resource/terminal.go b/pkg/util/resource/terminal.go new file mode 100644 index 0000000000..fdbbd6a94f --- /dev/null +++ b/pkg/util/resource/terminal.go @@ -0,0 +1,129 @@ +/* + Copyright 2024 Juicedata Inc + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package resource + +import ( + "encoding/json" + "io" + + "golang.org/x/net/websocket" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" + "k8s.io/klog" +) + +const ( + EndOfTransmissionCTRLD = "\u0004" + EndOfTransmissionCTRLC = "\u0003" +) + +type terminalSession struct { + conn *websocket.Conn + sizeCh chan *remotecommand.TerminalSize + endOfTransmission string +} + +func NewTerminalSession(conn *websocket.Conn, endOfTransmission string) *terminalSession { + return &terminalSession{ + conn: conn, + sizeCh: make(chan *remotecommand.TerminalSize), + endOfTransmission: endOfTransmission, + } +} + +func (t *terminalSession) Write(p []byte) (int, error) { + err := websocket.Message.Send(t.conn, string(p)) + return len(p), err +} + +func (t *terminalSession) Read(p []byte) (int, error) { + var msgStr []byte + var msg struct { + Rows uint16 `json:"rows"` + Cols uint16 `json:"cols"` + Data string `json:"data"` + Type string `json:"type"` + } + err := websocket.Message.Receive(t.conn, &msgStr) + if err != nil { + return copy(p, t.endOfTransmission), err + } + if err := json.Unmarshal(msgStr, &msg); err != nil { + return copy(p, t.endOfTransmission), nil + } + switch msg.Type { + case "stdin": + return copy(p, []byte(msg.Data)), nil + case "resize": + select { + case t.sizeCh <- &remotecommand.TerminalSize{ + Width: msg.Cols, + Height: msg.Rows, + }: + default: + } + default: + return copy(p, t.endOfTransmission), nil + } + return 0, nil +} + +func (t *terminalSession) Next() *remotecommand.TerminalSize { + return <-t.sizeCh +} + +type Handler interface { + io.Reader + io.Writer + remotecommand.TerminalSizeQueue +} + +func ExecInPod(client kubernetes.Interface, cfg *rest.Config, h Handler, namespace, name, container string, cmd []string) error { + req := client.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(name). + Namespace(namespace).SubResource("exec") + req.VersionedParams(&corev1.PodExecOptions{ + Command: cmd, + Container: container, + Stdin: true, + Stdout: true, + Stderr: true, + TTY: true, + }, scheme.ParameterCodec) + + executor, err := remotecommand.NewSPDYExecutor(cfg, "POST", req.URL()) + if err != nil { + klog.Error("Failed to create SPDY executor: ", err) + return err + } + if err := executor.Stream(remotecommand.StreamOptions{ + Stdin: h, + Stdout: h, + Stderr: h, + TerminalSizeQueue: h, + Tty: true, + }); err != nil { + klog.Error("Failed to stream: ", err) + return err + } + + return nil +}