Skip to content

Commit

Permalink
[supervisor squash] Time out terminal listener
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian Weichel committed Sep 14, 2020
1 parent fd29caa commit d44eea1
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 24 deletions.
5 changes: 4 additions & 1 deletion components/supervisor/cmd/terminal-attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,17 @@ func attachToTerminal(ctx context.Context, client api.TerminalServiceClient, ali
HeightPx: uint32(size.Y),
}

var expectResize bool
if opts.ForceResize {
req.Priority = &api.SetTerminalSizeRequest_Force{Force: true}
expectResize = true
} else if opts.Token != "" {
req.Priority = &api.SetTerminalSizeRequest_Token{Token: opts.Token}
expectResize = true
}

_, err = client.SetSize(ctx, req)
if err != nil {
if err != nil && expectResize {
log.WithError(err).Error("cannot set terminal size")
continue
}
Expand Down
90 changes: 67 additions & 23 deletions components/supervisor/pkg/terminal/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/exec"
"sync"
"time"

"github.com/creack/pty"
"github.com/gitpod-io/gitpod/common-go/log"
Expand Down Expand Up @@ -84,6 +85,7 @@ func (m *Mux) Close(alias string) error {
if err != nil {
log.WithError(err).Warn("cannot close pseudo-terminal")
}
delete(m.terms, alias)

return nil
}
Expand All @@ -97,7 +99,7 @@ func newTerm(pty *os.File, cmd *exec.Cmd) (*term, error) {
res := &term{
PTY: pty,
Command: cmd,
Stdout: &multiWriter{listener: make(map[io.WriteCloser]struct{})},
Stdout: &multiWriter{listener: make(map[*multiWriterListener]struct{})},

StarterToken: token.String(),
}
Expand All @@ -115,47 +117,89 @@ type term struct {
}

// multiWriter is like io.MultiWriter, except that we can listener at runtime.
//
// TODO(cw): if one listener blocks, all others are blocked, too. Instead we should
// ignore or even drop the listener.
type multiWriter struct {
closed bool
mu sync.Mutex
listener map[io.WriteCloser]struct{}
listener map[*multiWriterListener]struct{}
}

type multiWriterListener struct {
io.Reader

closed bool
once sync.Once
closeChan chan struct{}
cchan chan []byte
}

func (l *multiWriterListener) Close() error {
l.once.Do(func() {
close(l.closeChan)
l.closed = true

// actual cleanup happens in a go routine started by Listen()
})
return nil
}

func (l *multiWriterListener) Done() <-chan struct{} {
return l.closeChan
}

// Listen listens in on the multi-writer stream
func (mw *multiWriter) Listen() io.ReadCloser {
func (mw *multiWriter) Listen() *multiWriterListener {
mw.mu.Lock()
defer mw.mu.Unlock()

r, w := io.Pipe()
res := opCloser{
Reader: r,
Op: func() error {
mw.mu.Lock()
defer mw.mu.Unlock()
delete(mw.listener, w)
return nil
},
cchan, closeChan := make(chan []byte), make(chan struct{}, 1)
res := &multiWriterListener{
Reader: r,
cchan: cchan,
closeChan: closeChan,
}
mw.listener[w] = struct{}{}

return &res
go func() {
// copy bytes from channel to writer.
// Note: we close the writer independently of the write operation s.t. we don't
// block the closing because the write's blocking.
for b := range cchan {
_, err := w.Write(b)
if err != nil {
log.WithError(err).Error("terminal listener droped out")
res.Close()
}
}
}()
go func() {
// listener cleanup on close
<-closeChan
w.Close()
close(cchan)

mw.mu.Lock()
delete(mw.listener, res)
mw.mu.Unlock()
}()

mw.listener[res] = struct{}{}

return res
}

func (mw *multiWriter) Write(p []byte) (n int, err error) {
mw.mu.Lock()
defer mw.mu.Unlock()

for w := range mw.listener {
n, err = w.Write(p)
if err != nil {
return
for lstr := range mw.listener {
if lstr.closed {
continue
}
if n != len(p) {
err = io.ErrShortWrite
return

select {
case lstr.cchan <- p:
case <-time.After(5 * time.Second):
lstr.Close()
}
}
return len(p), nil
Expand Down

0 comments on commit d44eea1

Please sign in to comment.