Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v3: Add PeerConnection.GracefulClose #2854

Merged
merged 1 commit into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions datachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
readyState atomic.Value // DataChannelState
bufferedAmountLowThreshold uint64
detachCalled bool
readLoopActive chan struct{}
isGracefulClosed bool

// The binaryType represents attribute MUST, on getting, return the value to
// which it was last set. On setting, if the new value is either the string
Expand Down Expand Up @@ -225,6 +227,10 @@
func (d *DataChannel) onOpen() {
d.mu.RLock()
handler := d.onOpenHandler
if d.isGracefulClosed {
d.mu.RUnlock()
return

Check warning on line 232 in datachannel.go

View check run for this annotation

Codecov / codecov/patch

datachannel.go#L231-L232

Added lines #L231 - L232 were not covered by tests
}
d.mu.RUnlock()

if handler != nil {
Expand Down Expand Up @@ -252,6 +258,10 @@
func (d *DataChannel) onDial() {
d.mu.RLock()
handler := d.onDialHandler
if d.isGracefulClosed {
d.mu.RUnlock()
return

Check warning on line 263 in datachannel.go

View check run for this annotation

Codecov / codecov/patch

datachannel.go#L262-L263

Added lines #L262 - L263 were not covered by tests
}
d.mu.RUnlock()

if handler != nil {
Expand All @@ -261,6 +271,10 @@

// OnClose sets an event handler which is invoked when
// the underlying data transport has been closed.
// Note: Due to backwards compatibility, there is a chance that
// OnClose can be called, even if the GracefulClose is used.
// If this is the case for you, you can deregister OnClose
// prior to GracefulClose.
func (d *DataChannel) OnClose(f func()) {
d.mu.Lock()
defer d.mu.Unlock()
Expand Down Expand Up @@ -292,6 +306,10 @@
func (d *DataChannel) onMessage(msg DataChannelMessage) {
d.mu.RLock()
handler := d.onMessageHandler
if d.isGracefulClosed {
d.mu.RUnlock()
return
}
d.mu.RUnlock()

if handler == nil {
Expand All @@ -302,6 +320,10 @@

func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlreadyNegotiated bool) {
d.mu.Lock()
if d.isGracefulClosed {
d.mu.Unlock()
return

Check warning on line 325 in datachannel.go

View check run for this annotation

Codecov / codecov/patch

datachannel.go#L324-L325

Added lines #L324 - L325 were not covered by tests
}
d.dataChannel = dc
bufferedAmountLowThreshold := d.bufferedAmountLowThreshold
onBufferedAmountLow := d.onBufferedAmountLow
Expand All @@ -326,7 +348,12 @@
d.mu.Lock()
defer d.mu.Unlock()

if d.isGracefulClosed {
return

Check warning on line 352 in datachannel.go

View check run for this annotation

Codecov / codecov/patch

datachannel.go#L352

Added line #L352 was not covered by tests
}

if !d.api.settingEngine.detach.DataChannels {
d.readLoopActive = make(chan struct{})
go d.readLoop()
}
}
Expand All @@ -342,6 +369,10 @@
func (d *DataChannel) onError(err error) {
d.mu.RLock()
handler := d.onErrorHandler
if d.isGracefulClosed {
d.mu.RUnlock()
return
}
d.mu.RUnlock()

if handler != nil {
Expand All @@ -356,6 +387,12 @@
}}

func (d *DataChannel) readLoop() {
defer func() {
d.mu.Lock()
readLoopActive := d.readLoopActive
d.mu.Unlock()
defer close(readLoopActive)
}()
for {
buffer := rlBufPool.Get().([]byte) //nolint:forcetypeassert
n, isString, err := d.dataChannel.ReadDataChannel(buffer)
Expand Down Expand Up @@ -438,7 +475,32 @@
// Close Closes the DataChannel. It may be called regardless of whether
// the DataChannel object was created by this peer or the remote peer.
func (d *DataChannel) Close() error {
return d.close(false)
}

// GracefulClose Closes the DataChannel. It may be called regardless of whether
// the DataChannel object was created by this peer or the remote peer. It also waits
// for any goroutines it started to complete. This is only safe to call outside of
// DataChannel callbacks or if in a callback, in its own goroutine.
func (d *DataChannel) GracefulClose() error {
return d.close(true)
}

// Normally, close only stops writes from happening, so graceful=true
// will wait for reads to be finished based on underlying SCTP association
// closure or a SCTP reset stream from the other side. This is safe to call
// with graceful=true after tearing down a PeerConnection but not
// necessarily before. For example, if you used a vnet and dropped all packets
// right before closing the DataChannel, you'd need never see a reset stream.
func (d *DataChannel) close(shouldGracefullyClose bool) error {
d.mu.Lock()
d.isGracefulClosed = true
readLoopActive := d.readLoopActive
if shouldGracefullyClose && readLoopActive != nil {
defer func() {
<-readLoopActive
}()
}
haveSctpTransport := d.dataChannel != nil
d.mu.Unlock()

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.17
require (
github.com/pion/datachannel v1.5.8
github.com/pion/dtls/v2 v2.2.12
github.com/pion/ice/v2 v2.3.31
github.com/pion/ice/v2 v2.3.34
github.com/pion/interceptor v0.1.29
github.com/pion/logging v0.2.2
github.com/pion/randutil v0.1.0
Expand All @@ -15,7 +15,7 @@ require (
github.com/pion/sdp/v3 v3.0.9
github.com/pion/srtp/v2 v2.0.20
github.com/pion/stun v0.6.1
github.com/pion/transport/v2 v2.2.8
github.com/pion/transport/v2 v2.2.10
github.com/sclevine/agouti v3.0.0+incompatible
github.com/stretchr/testify v1.9.0
golang.org/x/net v0.22.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ github.com/pion/datachannel v1.5.8/go.mod h1:PgmdpoaNBLX9HNzNClmdki4DYW5JtI7Yibu
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk=
github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v2 v2.3.31 h1:qag/YqiOn5qPi0kgeVdsytxjx8szuriWSIeXKu8dDQc=
github.com/pion/ice/v2 v2.3.31/go.mod h1:8fac0+qftclGy1tYd/nfwfHC729BLaxtVqMdMVCAVPU=
github.com/pion/ice/v2 v2.3.34 h1:Ic1ppYCj4tUOcPAp76U6F3fVrlSw8A9JtRXLqw6BbUM=
github.com/pion/ice/v2 v2.3.34/go.mod h1:mBF7lnigdqgtB+YHkaY/Y6s6tsyRyo4u4rPGRuOjUBQ=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
Expand Down Expand Up @@ -74,8 +74,8 @@ github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
github.com/pion/transport/v2 v2.2.3/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
github.com/pion/transport/v2 v2.2.8 h1:HzsqGBChgtF4Cj47gu51l5hONuK/NwgbZL17CMSuwS0=
github.com/pion/transport/v2 v2.2.8/go.mod h1:sq1kSLWs+cHW9E+2fJP95QudkzbK7wscs8yYgQToO5E=
github.com/pion/transport/v2 v2.2.10 h1:ucLBLE8nuxiHfvkFKnkDQRYWYfp8ejf4YBOPfaQpw6Q=
github.com/pion/transport/v2 v2.2.10/go.mod h1:sq1kSLWs+cHW9E+2fJP95QudkzbK7wscs8yYgQToO5E=
github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0=
github.com/pion/transport/v3 v3.0.2 h1:r+40RJR25S9w3jbA6/5uEPTzcdn7ncyU44RWCbHkLg4=
github.com/pion/transport/v3 v3.0.2/go.mod h1:nIToODoOlb5If2jF9y2Igfx3PFYWfuXi37m0IlWa/D0=
Expand Down
22 changes: 20 additions & 2 deletions icegatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,31 @@

// Close prunes all local candidates, and closes the ports.
func (g *ICEGatherer) Close() error {
return g.close(false /* shouldGracefullyClose */)
}

// GracefulClose prunes all local candidates, and closes the ports. It also waits
// for any goroutines it started to complete. This is only safe to call outside of
// ICEGatherer callbacks or if in a callback, in its own goroutine.
func (g *ICEGatherer) GracefulClose() error {
return g.close(true /* shouldGracefullyClose */)
}

func (g *ICEGatherer) close(shouldGracefullyClose bool) error {
g.lock.Lock()
defer g.lock.Unlock()

if g.agent == nil {
return nil
} else if err := g.agent.Close(); err != nil {
return err
}
if shouldGracefullyClose {
if err := g.agent.GracefulClose(); err != nil {
return err

Check warning on line 210 in icegatherer.go

View check run for this annotation

Codecov / codecov/patch

icegatherer.go#L210

Added line #L210 was not covered by tests
}
} else {
if err := g.agent.Close(); err != nil {
return err

Check warning on line 214 in icegatherer.go

View check run for this annotation

Codecov / codecov/patch

icegatherer.go#L214

Added line #L214 was not covered by tests
}
}

g.agent = nil
Expand Down
24 changes: 23 additions & 1 deletion icetransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"github.com/pion/ice/v2"
"github.com/pion/logging"
"github.com/pion/webrtc/v3/internal/mux"
"github.com/pion/webrtc/v3/internal/util"
)

// ICETransport allows an application access to information about the ICE
Expand Down Expand Up @@ -187,6 +188,17 @@

// Stop irreversibly stops the ICETransport.
func (t *ICETransport) Stop() error {
return t.stop(false /* shouldGracefullyClose */)
}

// GracefulStop irreversibly stops the ICETransport. It also waits
// for any goroutines it started to complete. This is only safe to call outside of
// ICETransport callbacks or if in a callback, in its own goroutine.
func (t *ICETransport) GracefulStop() error {
return t.stop(true /* shouldGracefullyClose */)
}

func (t *ICETransport) stop(shouldGracefullyClose bool) error {
t.lock.Lock()
defer t.lock.Unlock()

Expand All @@ -197,8 +209,18 @@
}

if t.mux != nil {
return t.mux.Close()
var closeErrs []error
if shouldGracefullyClose && t.gatherer != nil {
// we can't access icegatherer/icetransport.Close via
// mux's net.Conn Close so we call it earlier here.
closeErrs = append(closeErrs, t.gatherer.GracefulClose())
}
closeErrs = append(closeErrs, t.mux.Close())
return util.FlattenErrs(closeErrs)
} else if t.gatherer != nil {
if shouldGracefullyClose {
return t.gatherer.GracefulClose()

Check warning on line 222 in icetransport.go

View check run for this annotation

Codecov / codecov/patch

icetransport.go#L222

Added line #L222 was not covered by tests
}
return t.gatherer.Close()
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions internal/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@
}

if err = m.dispatch(buf[:n]); err != nil {
if errors.Is(err, io.ErrClosedPipe) {

Check warning on line 123 in internal/mux/mux.go

View check run for this annotation

Codecov / codecov/patch

internal/mux/mux.go#L123

Added line #L123 was not covered by tests
// if the buffer was closed, that's not an error we care to report
return

Check warning on line 125 in internal/mux/mux.go

View check run for this annotation

Codecov / codecov/patch

internal/mux/mux.go#L125

Added line #L125 was not covered by tests
}
m.log.Errorf("mux: ending readLoop dispatch error %s", err.Error())
return
}
Expand Down
71 changes: 58 additions & 13 deletions operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@

// Operations is a task executor.
type operations struct {
mu sync.Mutex
busy bool
ops *list.List
mu sync.Mutex
busyCh chan struct{}
ops *list.List

updateNegotiationNeededFlagOnEmptyChain *atomicBool
onNegotiationNeeded func()
isClosed bool
}

func newOperations(
Expand All @@ -33,21 +34,34 @@
}

// Enqueue adds a new action to be executed. If there are no actions scheduled,
// the execution will start immediately in a new goroutine.
// the execution will start immediately in a new goroutine. If the queue has been
// closed, the operation will be dropped. The queue is only deliberately closed
// by a user.
func (o *operations) Enqueue(op operation) {
o.mu.Lock()
defer o.mu.Unlock()
_ = o.tryEnqueue(op)
}

// tryEnqueue attempts to enqueue the given operation. It returns false
// if the op is invalid or the queue is closed. mu must be locked by
// tryEnqueue's caller.
func (o *operations) tryEnqueue(op operation) bool {
if op == nil {
return
return false

Check warning on line 51 in operations.go

View check run for this annotation

Codecov / codecov/patch

operations.go#L51

Added line #L51 was not covered by tests
}

o.mu.Lock()
running := o.busy
if o.isClosed {
return false
}
o.ops.PushBack(op)
o.busy = true
o.mu.Unlock()

if !running {
if o.busyCh == nil {
o.busyCh = make(chan struct{})
go o.start()
}

return true
}

// IsEmpty checks if there are tasks in the queue
Expand All @@ -62,12 +76,38 @@
func (o *operations) Done() {
var wg sync.WaitGroup
wg.Add(1)
o.Enqueue(func() {
o.mu.Lock()
enqueued := o.tryEnqueue(func() {
wg.Done()
})
o.mu.Unlock()
if !enqueued {
return
}
wg.Wait()
}

// GracefulClose waits for the operations queue to be cleared and forbids
// new operations from being enqueued.
func (o *operations) GracefulClose() {
o.mu.Lock()
if o.isClosed {
o.mu.Unlock()
return

Check warning on line 96 in operations.go

View check run for this annotation

Codecov / codecov/patch

operations.go#L95-L96

Added lines #L95 - L96 were not covered by tests
}
// do not enqueue anymore ops from here on
// o.isClosed=true will also not allow a new busyCh
// to be created.
o.isClosed = true

busyCh := o.busyCh
o.mu.Unlock()
if busyCh == nil {
return
}
<-busyCh

Check warning on line 108 in operations.go

View check run for this annotation

Codecov / codecov/patch

operations.go#L108

Added line #L108 was not covered by tests
}

func (o *operations) pop() func() {
o.mu.Lock()
defer o.mu.Unlock()
Expand All @@ -87,12 +127,17 @@
defer func() {
o.mu.Lock()
defer o.mu.Unlock()
if o.ops.Len() == 0 {
o.busy = false
// this wil lbe the most recent busy chan
close(o.busyCh)

if o.ops.Len() == 0 || o.isClosed {
o.busyCh = nil
return
}

// either a new operation was enqueued while we
// were busy, or an operation panicked
o.busyCh = make(chan struct{})
go o.start()
}()

Expand Down
Loading
Loading