Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go: add deadlock detector, improve locking #618

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ require (
github.com/labstack/echo v3.3.10+incompatible
github.com/linuxkit/virtsock v0.0.0-20180830132707-8e79449dea07
github.com/pkg/errors v0.8.1-0.20181023235946-059132a15dd0
github.com/sirupsen/logrus v1.3.0
github.com/sasha-s/go-deadlock v0.3.1
github.com/sirupsen/logrus v1.9.0
github.com/spf13/viper v1.4.0
github.com/stretchr/testify v1.3.0
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
github.com/stretchr/testify v1.7.0
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
golang.org/x/sync v0.0.0-20190423024810-112230192c58
k8s.io/api v0.0.0-20170922112058-fe29995db376
k8s.io/apimachinery v0.0.0-20170922111930-9d38e20d609d
k8s.io/client-go v0.0.0-20170922112243-82aa063804cf
Expand Down Expand Up @@ -41,7 +42,6 @@ require (
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/json-iterator/go v1.1.5 // indirect
github.com/juju/ratelimit v0.0.0-20170523012141-5b9ff8664717 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
github.com/labstack/gommon v0.2.9 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/mailru/easyjson v0.0.0-20170902151237-2a92e673c9a6 // indirect
Expand All @@ -52,6 +52,7 @@ require (
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
github.com/pelletier/go-toml v1.4.0 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cast v1.3.0 // indirect
Expand All @@ -64,5 +65,6 @@ require (
golang.org/x/text v0.3.3 // indirect
gopkg.in/inf.v0 v0.9.0 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
k8s.io/kube-openapi v0.0.0-20170906091745-abfc5fbe1cf8 // indirect
)
16 changes: 11 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand Down Expand Up @@ -122,6 +121,8 @@ github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfS
github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo=
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1-0.20181023235946-059132a15dd0 h1:TVdhkEP0WKajbywS5TEDWwuzCl9EqOcQ6b1ymfmx/6E=
github.com/pkg/errors v0.8.1-0.20181023235946-059132a15dd0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand All @@ -137,9 +138,11 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
Expand All @@ -158,8 +161,9 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
Expand Down Expand Up @@ -209,7 +213,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -242,6 +246,8 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20170922112058-fe29995db376 h1:xED2Koq20vYhrfwxgBE4YfA/rMY0nm4YxaK7YJVklZ8=
k8s.io/api v0.0.0-20170922112058-fe29995db376/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA=
Expand Down
4 changes: 3 additions & 1 deletion go/pkg/libproxy/loopbackconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"net"
"sync"
"time"

"github.com/sasha-s/go-deadlock"
)

// io.Pipe is synchronous but we need to decouple the Read and Write calls
Expand All @@ -19,7 +21,7 @@ import (
type bufferedPipe struct {
bufs [][]byte
eof bool
m sync.Mutex
m deadlock.Mutex
c *sync.Cond
readDeadline time.Time
}
Expand Down
106 changes: 52 additions & 54 deletions go/pkg/libproxy/multiplexed.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/sasha-s/go-deadlock"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -39,7 +40,7 @@ func (w *windowState) advance() {
}

type channel struct {
m sync.Mutex
m deadlock.Mutex
c *sync.Cond
multiplexer *multiplexer
destination Destination
Expand Down Expand Up @@ -99,13 +100,15 @@ func (c *channel) sendWindowUpdate() error {
c.read.advance()
seq := c.read.allowed
c.m.Unlock()
return c.multiplexer.send(NewWindow(c.ID, seq))
return c.multiplexer.send(NewWindow(c.ID, seq), nil)
}

func (c *channel) recvWindowUpdate(seq uint64) {
c.m.Lock()
c.write.allowed = seq
c.c.Signal()
// net.Conn says: Multiple goroutines may invoke methods on a Conn simultaneously.
// Therefore there can be multiple goroutines blocked in Write, so when the window opens we should wake them all up.
c.c.Broadcast()
c.m.Unlock()
}

Expand Down Expand Up @@ -138,62 +141,49 @@ func (c *channel) Write(p []byte) (int, error) {
return written, io.EOF
}
if c.write.size() > 0 {
// Some window space is available
toWrite := c.write.size()
if toWrite > len(p) {
toWrite = len(p)
}
// Advance the window before dropping the lock in case another Write() appears and sees the same available space
c.write.current = c.write.current + uint64(toWrite)

// Don't block holding the metadata mutex.
// Note this would allow concurrent calls to Write on the same channel
// to conflict, but we regard that as user error.
c.m.Unlock()

// need to write the header and the payload together
c.multiplexer.writeMutex.Lock()
f := NewData(c.ID, uint32(toWrite))
c.multiplexer.appendEvent(&event{eventType: eventSend, frame: f})
err1 := f.Write(c.multiplexer.connW)
_, err2 := c.multiplexer.connW.Write(p[0:toWrite])
err3 := c.multiplexer.connW.Flush()
c.multiplexer.writeMutex.Unlock()

err := c.multiplexer.send(NewData(c.ID, uint32(toWrite)), p[0:toWrite])
c.m.Lock()
if err1 != nil {
return written, err1
}
if err2 != nil {
return written, err2
}
if err3 != nil {
return written, err3

if err != nil {
return written, err
}
c.write.current = c.write.current + uint64(toWrite)
p = p[toWrite:]
written = written + toWrite
continue
}

// Wait for the write window to be increased (or a timeout)
done := make(chan struct{})
timeout := make(chan time.Time)
// If the client has set a deadline then create a timer:
var (
timer *time.Timer
timeOut bool
)
if !c.writeDeadline.IsZero() {
go func() {
time.Sleep(time.Until(c.writeDeadline))
close(timeout)
}()
timer = time.AfterFunc(time.Until(c.writeDeadline), func() {
c.m.Lock()
defer c.m.Unlock()
timeOut = true
c.c.Broadcast()
})
}
go func() {
c.c.Wait()
close(done)
}()
select {
case <-timeout:
// clean up the goroutine
c.c.Broadcast()
<-done

// Wait for the write window to be increased or a timeout
c.c.Wait()

if timer != nil {
timer.Stop()
}
if timeOut {
return written, &errTimeout{}
case <-done:
// The timeout will still fire in the background
continue
}
}
}
Expand All @@ -209,7 +199,7 @@ func (c *channel) Close() error {
if alreadyClosed {
return nil
}
if err := c.multiplexer.send(NewClose(c.ID)); err != nil {
if err := c.multiplexer.send(NewClose(c.ID), nil); err != nil {
return err
}
c.m.Lock()
Expand All @@ -236,7 +226,7 @@ func (c *channel) CloseWrite() error {
if alreadyShutdown {
return nil
}
if err := c.multiplexer.send(NewShutdown(c.ID)); err != nil {
if err := c.multiplexer.send(NewShutdown(c.ID), nil); err != nil {
return err
}
c.m.Lock()
Expand Down Expand Up @@ -357,15 +347,15 @@ type multiplexer struct {
conn io.Closer
connR io.Reader // with buffering
connW *bufio.Writer
writeMutex sync.Mutex // hold when writing on the channel
writeMutex deadlock.Mutex // hold when writing on the channel
channels map[uint32]*channel
nextChannelID uint32
metadataMutex sync.Mutex // hold when reading/modifying this structure
pendingAccept []*channel // incoming connections
metadataMutex deadlock.Mutex // hold when reading/modifying this structure
pendingAccept []*channel // incoming connections
acceptCond *sync.Cond
isRunning bool
events *ring.Ring // log of packetEvents
eventsM sync.Mutex
eventsM deadlock.Mutex
allocateBackwards bool
}

Expand Down Expand Up @@ -424,14 +414,22 @@ func (m *multiplexer) appendEvent(e *event) {
m.events = m.events.Next()
}

func (m *multiplexer) send(f *Frame) error {
// send a frame (header) plus optional payload. If this call fails then the multiplexed connection will be desynchronised.
func (m *multiplexer) send(f *Frame, payload []byte) error {
m.writeMutex.Lock()
defer m.writeMutex.Unlock()
m.appendEvent(&event{eventType: eventSend, frame: f})

if err := f.Write(m.connW); err != nil {
return err
return fmt.Errorf("writing frame %s: %w", f, err)
}
m.appendEvent(&event{eventType: eventSend, frame: f})
return m.connW.Flush()
if n, err := m.connW.Write(payload); err != nil || n != len(payload) {
return fmt.Errorf("writing frame %s payload length %d: %d, %w", f, len(payload), n, err)
}
if err := m.connW.Flush(); err != nil {
return fmt.Errorf("flushing frame %s: %w", f, err)
}
return nil
}

func (m *multiplexer) findFreeChannelID() uint32 {
Expand Down Expand Up @@ -481,7 +479,7 @@ func (m *multiplexer) Dial(d Destination) (MultiplexedConn, error) {
m.channels[id] = channel
m.metadataMutex.Unlock()

if err := m.send(NewOpen(id, d)); err != nil {
if err := m.send(NewOpen(id, d), nil); err != nil {
return nil, err
}
if err := channel.sendWindowUpdate(); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion go/pkg/libproxy/multiplexed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/sasha-s/go-deadlock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -492,7 +493,7 @@ func TestMuxConcurrent(t *testing.T) {
serverReadSha := make(map[uint16]string)
clientWriteSha := make(map[uint16]string)
clientReadSha := make(map[uint16]string)
m := &sync.Mutex{}
m := &deadlock.Mutex{}
wg.Add(numConcurrent)
for i := 0; i < numConcurrent; i++ {
go func(i int) {
Expand Down
4 changes: 4 additions & 0 deletions go/pkg/libproxy/network_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net"
"os"
"runtime"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -118,6 +119,9 @@ func testProxy(t *testing.T, proto string, proxy Proxy) {
}

func TestUnixProxy(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Unix domain sockets don't work on Windows")
}
pathA := "/tmp/network_proxy_test.sock"
pathB := "/tmp/network_proxy_test.sock2"
if err := os.Remove(pathA); err != nil && !(os.IsNotExist(err)) {
Expand Down
9 changes: 5 additions & 4 deletions go/pkg/libproxy/udp_encapsulation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"errors"
"io"
"net"
"sync"
"time"

"github.com/sasha-s/go-deadlock"
)

// UDPListener defines a listener interface to read, write and close a UDP connection
Expand All @@ -28,9 +29,9 @@ type uDPEncapsulator interface {
// udpEncapsulator encapsulates a UDP connection and listener
type udpEncapsulator struct {
conn net.Conn
m sync.Mutex
r sync.Mutex
w sync.Mutex
m deadlock.Mutex
r deadlock.Mutex
w deadlock.Mutex
addr *net.UDPAddr
}

Expand Down
5 changes: 3 additions & 2 deletions go/pkg/libproxy/udp_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"encoding/binary"
"net"
"strings"
"sync"
"syscall"
"time"

"github.com/sasha-s/go-deadlock"
)

const (
Expand Down Expand Up @@ -50,7 +51,7 @@ type UDPProxy struct {
backendAddr *net.UDPAddr
dialer UDPDialer
connTrackTable connTrackMap
connTrackLock sync.Mutex
connTrackLock deadlock.Mutex
}

// UDPDialer creates UDP (pseudo-)connections to an address
Expand Down
Loading