Skip to content

Commit

Permalink
fix remote terminal leak
Browse files Browse the repository at this point in the history
Signed-off-by: Xuhui zhang <[email protected]>
  • Loading branch information
zxh326 committed Jul 29, 2024
1 parent e835614 commit 32adf84
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 104 deletions.
115 changes: 11 additions & 104 deletions pkg/dashboard/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package dashboard

import (
"context"
"encoding/json"
"io"
"strconv"
"strings"
Expand All @@ -32,13 +31,12 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/juicedata/juicefs-csi-driver/pkg/config"
"github.com/juicedata/juicefs-csi-driver/pkg/util"
"github.com/juicedata/juicefs-csi-driver/pkg/util/resource"
)

type PodExtra struct {
Expand Down Expand Up @@ -775,87 +773,18 @@ func (api *API) watchPodLogs() gin.HandlerFunc {
}
}

type terminalSession struct {
conn *websocket.Conn
sizeCh chan *remotecommand.TerminalSize
}

func (t *terminalSession) Write(p []byte) (int, error) {
err := websocket.Message.Send(t.conn, string(p))
return len(p), err
}

func (t *terminalSession) Read(p []byte) (int, error) {
var msgStr []byte
var msg struct {
Rows uint16 `json:"rows"`
Cols uint16 `json:"cols"`
Data string `json:"data"`
Type string `json:"type"`
}
err := websocket.Message.Receive(t.conn, &msgStr)
if err != nil {
return 0, err
}
if err := json.Unmarshal(msgStr, &msg); err != nil {
return copy(p, msgStr), nil
}
switch msg.Type {
case "stdin":
return copy(p, []byte(msg.Data)), nil
case "resize":
select {
case t.sizeCh <- &remotecommand.TerminalSize{
Width: msg.Cols,
Height: msg.Rows,
}:
default:
}
}
return 0, nil
}

func (t *terminalSession) Next() *remotecommand.TerminalSize {
return <-t.sizeCh
}

func (api *API) execPod() gin.HandlerFunc {
return func(c *gin.Context) {
namespace := c.Param("namespace")
name := c.Param("name")
container := c.Param("container")
websocket.Handler(func(ws *websocket.Conn) {
defer ws.Close()
terminal := &terminalSession{
conn: ws,
sizeCh: make(chan *remotecommand.TerminalSize),
}
req := api.client.CoreV1().RESTClient().Post().
Resource("pods").
Name(name).
Namespace(namespace).SubResource("exec")
req.VersionedParams(&corev1.PodExecOptions{
Command: []string{"sh", "-c", "bash || sh"},
Container: container,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: true,
}, scheme.ParameterCodec)

executor, err := remotecommand.NewSPDYExecutor(api.kubeconfig, "POST", req.URL())
if err != nil {
klog.Error("Failed to create SPDY executor: ", err)
return
}
if err := executor.Stream(remotecommand.StreamOptions{
Stdin: terminal,
Stdout: terminal,
Stderr: terminal,
Tty: true,
TerminalSizeQueue: terminal,
}); err != nil {
klog.Error("Failed to stream: ", err)
terminal := resource.NewTerminalSession(ws, resource.EndOfTransmissionCTRLD)
if err := resource.ExecInPod(
api.client, api.kubeconfig, terminal, namespace, name, container,
[]string{"sh", "-c", "bash || sh"}); err != nil {
klog.Error("Failed to exec in pod: ", err)
return
}
}).ServeHTTP(c.Writer, c.Request)
Expand All @@ -869,10 +798,7 @@ func (api *API) watchMountPodAccessLog() gin.HandlerFunc {
container := c.Param("container")
websocket.Handler(func(ws *websocket.Conn) {
defer ws.Close()
terminal := &terminalSession{
conn: ws,
sizeCh: make(chan *remotecommand.TerminalSize),
}
terminal := resource.NewTerminalSession(ws, resource.EndOfTransmissionCTRLC)
mountpod, err := api.client.CoreV1().Pods(namespace).Get(c, name, metav1.GetOptions{})
if err != nil {
klog.Error("Failed to get mount pod: ", err)
Expand All @@ -883,29 +809,10 @@ func (api *API) watchMountPodAccessLog() gin.HandlerFunc {
klog.Error("Failed to get mount path: ", err)
return
}
req := api.client.CoreV1().RESTClient().Post().
Resource("pods").
Name(name).
Namespace(namespace).SubResource("exec")
req.VersionedParams(&corev1.PodExecOptions{
Command: []string{"sh", "-c", "cat " + mntPath + "/.accesslog"},
Container: container,
Stdin: true,
Stdout: true,
Stderr: true,
}, scheme.ParameterCodec)

executor, err := remotecommand.NewSPDYExecutor(api.kubeconfig, "POST", req.URL())
if err != nil {
klog.Error("Failed to create SPDY executor: ", err)
return
}
if err := executor.Stream(remotecommand.StreamOptions{
Stdin: terminal,
Stdout: terminal,
Stderr: terminal,
}); err != nil {
klog.Error("Failed to stream: ", err)
if err := resource.ExecInPod(
api.client, api.kubeconfig, terminal, namespace, name, container,
[]string{"sh", "-c", "cat " + mntPath + "/.accesslog"}); err != nil {
klog.Error("Failed to exec in pod: ", err)
return
}
}).ServeHTTP(c.Writer, c.Request)
Expand Down
129 changes: 129 additions & 0 deletions pkg/util/resource/terminal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
Copyright 2024 Juicedata Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resource

import (
"encoding/json"
"io"

"golang.org/x/net/websocket"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/klog"
)

const (
EndOfTransmissionCTRLD = "\u0004"
EndOfTransmissionCTRLC = "\u0003"
)

type terminalSession struct {
conn *websocket.Conn
sizeCh chan *remotecommand.TerminalSize
endOfTransmission string
}

func NewTerminalSession(conn *websocket.Conn, endOfTransmission string) *terminalSession {
return &terminalSession{
conn: conn,
sizeCh: make(chan *remotecommand.TerminalSize),
endOfTransmission: endOfTransmission,
}
}

func (t *terminalSession) Write(p []byte) (int, error) {
err := websocket.Message.Send(t.conn, string(p))
return len(p), err
}

func (t *terminalSession) Read(p []byte) (int, error) {
var msgStr []byte
var msg struct {
Rows uint16 `json:"rows"`
Cols uint16 `json:"cols"`
Data string `json:"data"`
Type string `json:"type"`
}
err := websocket.Message.Receive(t.conn, &msgStr)
if err != nil {
return copy(p, t.endOfTransmission), err
}
if err := json.Unmarshal(msgStr, &msg); err != nil {
return copy(p, t.endOfTransmission), nil
}
switch msg.Type {
case "stdin":
return copy(p, []byte(msg.Data)), nil
case "resize":
select {
case t.sizeCh <- &remotecommand.TerminalSize{
Width: msg.Cols,
Height: msg.Rows,
}:
default:
}
default:
return copy(p, t.endOfTransmission), nil
}
return 0, nil
}

func (t *terminalSession) Next() *remotecommand.TerminalSize {
return <-t.sizeCh
}

type Handler interface {
io.Reader
io.Writer
remotecommand.TerminalSizeQueue
}

func ExecInPod(client kubernetes.Interface, cfg *rest.Config, h Handler, namespace, name, container string, cmd []string) error {
req := client.CoreV1().RESTClient().Post().
Resource("pods").
Name(name).
Namespace(namespace).SubResource("exec")
req.VersionedParams(&corev1.PodExecOptions{
Command: cmd,
Container: container,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: true,
}, scheme.ParameterCodec)

executor, err := remotecommand.NewSPDYExecutor(cfg, "POST", req.URL())
if err != nil {
klog.Error("Failed to create SPDY executor: ", err)
return err
}
if err := executor.Stream(remotecommand.StreamOptions{
Stdin: h,
Stdout: h,
Stderr: h,
TerminalSizeQueue: h,
Tty: true,
}); err != nil {
klog.Error("Failed to stream: ", err)
return err
}

return nil
}

0 comments on commit 32adf84

Please sign in to comment.