From d44eea10290dac719cc11e34b050ab61f68d10f3 Mon Sep 17 00:00:00 2001 From: Christian Weichel Date: Mon, 14 Sep 2020 08:40:47 +0000 Subject: [PATCH] [supervisor squash] Time out terminal listener --- components/supervisor/cmd/terminal-attach.go | 5 +- .../supervisor/pkg/terminal/terminal.go | 90 ++++++++++++++----- 2 files changed, 71 insertions(+), 24 deletions(-) diff --git a/components/supervisor/cmd/terminal-attach.go b/components/supervisor/cmd/terminal-attach.go index e4a76d421e6e13..b4cabeb768244f 100644 --- a/components/supervisor/cmd/terminal-attach.go +++ b/components/supervisor/cmd/terminal-attach.go @@ -118,14 +118,17 @@ func attachToTerminal(ctx context.Context, client api.TerminalServiceClient, ali HeightPx: uint32(size.Y), } + var expectResize bool if opts.ForceResize { req.Priority = &api.SetTerminalSizeRequest_Force{Force: true} + expectResize = true } else if opts.Token != "" { req.Priority = &api.SetTerminalSizeRequest_Token{Token: opts.Token} + expectResize = true } _, err = client.SetSize(ctx, req) - if err != nil { + if err != nil && expectResize { log.WithError(err).Error("cannot set terminal size") continue } diff --git a/components/supervisor/pkg/terminal/terminal.go b/components/supervisor/pkg/terminal/terminal.go index 2e64afe0ef1f2a..bce92dda8d7f9a 100644 --- a/components/supervisor/pkg/terminal/terminal.go +++ b/components/supervisor/pkg/terminal/terminal.go @@ -6,6 +6,7 @@ import ( "os" "os/exec" "sync" + "time" "github.com/creack/pty" "github.com/gitpod-io/gitpod/common-go/log" @@ -84,6 +85,7 @@ func (m *Mux) Close(alias string) error { if err != nil { log.WithError(err).Warn("cannot close pseudo-terminal") } + delete(m.terms, alias) return nil } @@ -97,7 +99,7 @@ func newTerm(pty *os.File, cmd *exec.Cmd) (*term, error) { res := &term{ PTY: pty, Command: cmd, - Stdout: &multiWriter{listener: make(map[io.WriteCloser]struct{})}, + Stdout: &multiWriter{listener: make(map[*multiWriterListener]struct{})}, StarterToken: token.String(), } @@ -115,47 +117,89 @@ type term struct { } // multiWriter is like io.MultiWriter, except that we can listener at runtime. -// -// TODO(cw): if one listener blocks, all others are blocked, too. Instead we should -// ignore or even drop the listener. type multiWriter struct { closed bool mu sync.Mutex - listener map[io.WriteCloser]struct{} + listener map[*multiWriterListener]struct{} +} + +type multiWriterListener struct { + io.Reader + + closed bool + once sync.Once + closeChan chan struct{} + cchan chan []byte +} + +func (l *multiWriterListener) Close() error { + l.once.Do(func() { + close(l.closeChan) + l.closed = true + + // actual cleanup happens in a go routine started by Listen() + }) + return nil +} + +func (l *multiWriterListener) Done() <-chan struct{} { + return l.closeChan } // Listen listens in on the multi-writer stream -func (mw *multiWriter) Listen() io.ReadCloser { +func (mw *multiWriter) Listen() *multiWriterListener { mw.mu.Lock() defer mw.mu.Unlock() r, w := io.Pipe() - res := opCloser{ - Reader: r, - Op: func() error { - mw.mu.Lock() - defer mw.mu.Unlock() - delete(mw.listener, w) - return nil - }, + cchan, closeChan := make(chan []byte), make(chan struct{}, 1) + res := &multiWriterListener{ + Reader: r, + cchan: cchan, + closeChan: closeChan, } - mw.listener[w] = struct{}{} - return &res + go func() { + // copy bytes from channel to writer. + // Note: we close the writer independently of the write operation s.t. we don't + // block the closing because the write's blocking. + for b := range cchan { + _, err := w.Write(b) + if err != nil { + log.WithError(err).Error("terminal listener droped out") + res.Close() + } + } + }() + go func() { + // listener cleanup on close + <-closeChan + w.Close() + close(cchan) + + mw.mu.Lock() + delete(mw.listener, res) + mw.mu.Unlock() + }() + + mw.listener[res] = struct{}{} + + return res } func (mw *multiWriter) Write(p []byte) (n int, err error) { mw.mu.Lock() defer mw.mu.Unlock() - for w := range mw.listener { - n, err = w.Write(p) - if err != nil { - return + for lstr := range mw.listener { + if lstr.closed { + continue } - if n != len(p) { - err = io.ErrShortWrite - return + + select { + case lstr.cchan <- p: + case <-time.After(5 * time.Second): + lstr.Close() } } return len(p), nil