Skip to content

Commit

Permalink
fix: terminal stuck in connecting state (#4989)
Browse files Browse the repository at this point in the history
* closing channel after write operation

* removing close

* using buffered channel

* wip: making done channel bufferred

* terminal racecondition and deadlock fix

* wire run

* removing done send call

* updating bound channel send function
  • Loading branch information
iamayushm authored and kishan789dev committed May 13, 2024
1 parent c4b3d16 commit d5984b7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 17 deletions.
53 changes: 37 additions & 16 deletions pkg/terminal/terminalSesion.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (

const END_OF_TRANSMISSION = "\u0004"
const ProcessExitedMsg = "Process exited"
const ProcessTimedOut = "Process timedOut"

// PtyHandler is what remotecommand expects from a pty
type PtyHandler interface {
Expand Down Expand Up @@ -185,23 +186,53 @@ func (sm *SessionMap) SetTerminalSessionStartTime(sessionId string) {
}
}

func (sm *SessionMap) setAndSendSignal(sessionId string, session sockjs.Session) {
sm.Lock.Lock()
defer sm.Lock.Unlock()
terminalSession, ok := sm.Sessions[sessionId]
if ok && terminalSession.id == "" {
log.Printf("handleTerminalSession: can't find session '%s'", sessionId)
session.Close(http.StatusGone, fmt.Sprintf("handleTerminalSession: can't find session '%s'", sessionId))
return
} else if ok {
terminalSession.sockJSSession = session
sm.Sessions[sessionId] = terminalSession

select {
case terminalSession.bound <- nil:
log.Printf("message sent on bound channel for sessionId : %s", sessionId)
default:
// if a request from the front end is not received within a particular time frame, and no one is reading from the bound channel, we will ignore sending on the bound channel.
log.Printf("skipping send on bound, channel receiver possibly timed out. sessionId: %s", sessionId)
}

}
}

// Close shuts down the SockJS connection and sends the status code and reason to the client
// Can happen if the process exits or if there is an error starting up the process
// For now the status code is unused and reason is shown to the user (unless "")
func (sm *SessionMap) Close(sessionId string, status uint32, reason string) {

sm.Lock.Lock()
defer sm.Lock.Unlock()

terminalSession := sm.Sessions[sessionId]

if terminalSession.sockJSSession != nil {

err := terminalSession.sockJSSession.Close(status, reason)
if err != nil {
log.Println(err)
}

close(terminalSession.doneChan)

isErroredConnectionTermination := isConnectionClosedByError(status)
middleware.IncTerminalSessionRequestCounter(SessionTerminated, strconv.FormatBool(isErroredConnectionTermination))
middleware.RecordTerminalSessionDurationMetrics(terminalSession.podName, terminalSession.namespace, terminalSession.clusterId, time.Since(terminalSession.startedOn).Seconds())
close(terminalSession.doneChan)
terminalSession.contextCancelFunc()
close(terminalSession.bound)
delete(sm.Sessions, sessionId)
}

Expand All @@ -219,10 +250,9 @@ var terminalSessions = SessionMap{Sessions: make(map[string]TerminalSession)}
// handleTerminalSession is Called by net/http for any new /api/sockjs connections
func handleTerminalSession(session sockjs.Session) {
var (
buf string
err error
msg TerminalMessage
terminalSession TerminalSession
buf string
err error
msg TerminalMessage
)

if buf, err = session.Recv(); err != nil {
Expand All @@ -241,15 +271,8 @@ func handleTerminalSession(session sockjs.Session) {
return
}

if terminalSession = terminalSessions.Get(msg.SessionID); terminalSession.id == "" {
log.Printf("handleTerminalSession: can't find session '%s'", msg.SessionID)
session.Close(http.StatusGone, fmt.Sprintf("handleTerminalSession: can't find session '%s'", msg.SessionID))
return
}
terminalSessions.setAndSendSignal(msg.SessionID, session)

terminalSession.sockJSSession = session
terminalSessions.Set(msg.SessionID, terminalSession)
terminalSession.bound <- nil
}

type SocketConfig struct {
Expand Down Expand Up @@ -381,7 +404,6 @@ func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request *
timedCtx, _ := context.WithTimeout(sessionCtx, 60*time.Second)
select {
case <-session.bound:
close(session.bound)

var err error
if isValidShell(validShells, request.Shell) {
Expand All @@ -407,8 +429,7 @@ func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request *
terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg)
case <-timedCtx.Done():
// handle case when connection has not been initiated from FE side within particular time
close(session.bound)
terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg)
terminalSessions.Close(request.SessionId, 1, ProcessTimedOut)
}
}

Expand Down
2 changes: 1 addition & 1 deletion wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d5984b7

Please sign in to comment.