From d112dd825f20ade0a659c39d491a1da6e7b43a0b Mon Sep 17 00:00:00 2001 From: Marco Vidonis <31407403+marcovidonis@users.noreply.github.com> Date: Thu, 3 Oct 2024 15:46:35 +0100 Subject: [PATCH] Expose WebRTC peerconn stats (#983) * add WebRTC peer connection transport stats * save all peer connections in tracker-client and make stats available when seeding * make offer ID keys into readable strings * handle unsupported Peer Conn stats on WASM --- torrent.go | 13 ++++++++ webtorrent/peer-conn-stats.go | 15 +++++++++ webtorrent/peer-conn-stats_js.go | 13 ++++++++ webtorrent/tracker-client.go | 53 ++++++++++++++++++++++++++------ webtorrent/transport.go | 19 ++++++++++++ 5 files changed, 103 insertions(+), 10 deletions(-) create mode 100644 webtorrent/peer-conn-stats.go create mode 100644 webtorrent/peer-conn-stats_js.go diff --git a/torrent.go b/torrent.go index 1288606117..7435c9b6c2 100644 --- a/torrent.go +++ b/torrent.go @@ -33,6 +33,7 @@ import ( "github.com/anacrolix/multiless" "github.com/anacrolix/sync" "github.com/pion/datachannel" + "github.com/pion/webrtc/v3" "golang.org/x/sync/errgroup" "github.com/anacrolix/torrent/bencode" @@ -3002,6 +3003,18 @@ func (t *Torrent) iterUndirtiedRequestIndexesInPiece( ) } +type webRtcStatsReports map[string]webrtc.StatsReport + +func (t *Torrent) GetWebRtcPeerConnStats() map[string]webRtcStatsReports { + stats := make(map[string]webRtcStatsReports) + trackersMap := t.cl.websocketTrackers.clients + for i, trackerClient := range trackersMap { + ts := trackerClient.RtcPeerConnStats() + stats[i] = ts + } + return stats +} + type requestState struct { peer *Peer when time.Time diff --git a/webtorrent/peer-conn-stats.go b/webtorrent/peer-conn-stats.go new file mode 100644 index 0000000000..907398625f --- /dev/null +++ b/webtorrent/peer-conn-stats.go @@ -0,0 +1,15 @@ +//go:build !js +// +build !js + +package webtorrent + +import ( + "github.com/pion/webrtc/v3" +) + +func GetPeerConnStats(pc *wrappedPeerConnection) (stats webrtc.StatsReport) { + if pc != nil { + stats = pc.GetStats() + } + return +} diff --git a/webtorrent/peer-conn-stats_js.go b/webtorrent/peer-conn-stats_js.go new file mode 100644 index 0000000000..7f770b1d8b --- /dev/null +++ b/webtorrent/peer-conn-stats_js.go @@ -0,0 +1,13 @@ +//go:build js && wasm +// +build js,wasm + +package webtorrent + +import ( + "github.com/pion/webrtc/v3" +) + +// webrtc.PeerConnection.GetStats() is not currently supported for WASM. Return empty stats. +func GetPeerConnStats(pc *wrappedPeerConnection) (stats webrtc.StatsReport) { + return +} diff --git a/webtorrent/tracker-client.go b/webtorrent/tracker-client.go index 8b307c7bea..cc0c65849b 100644 --- a/webtorrent/tracker-client.go +++ b/webtorrent/tracker-client.go @@ -44,6 +44,8 @@ type TrackerClient struct { WebsocketTrackerHttpHeader func() http.Header ICEServers []webrtc.ICEServer + + rtcPeerConns map[string]*wrappedPeerConnection } func (me *TrackerClient) Stats() TrackerClientStats { @@ -234,17 +236,22 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte return fmt.Errorf("creating offer: %w", err) } + // save the leecher peer connections + tc.storePeerConnection(fmt.Sprintf("%x", randOfferId[:]), pc) + + pc.OnClose(func() { + delete(tc.rtcPeerConns, offerIDBinary) + }) + tc.Logger.Levelf(log.Debug, "announcing offer") - err = tc.announce(event, infoHash, []outboundOffer{ - { - offerId: offerIDBinary, - outboundOfferValue: outboundOfferValue{ - originalOffer: offer, - peerConnection: pc, - infoHash: infoHash, - dataChannel: dc, - }, - }, + err = tc.announce(event, infoHash, []outboundOffer{{ + offerId: offerIDBinary, + outboundOfferValue: outboundOfferValue{ + originalOffer: offer, + peerConnection: pc, + infoHash: infoHash, + dataChannel: dc, + }}, }) if err != nil { dc.Close() @@ -293,6 +300,19 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte return nil } +// Calculate the stats for all the peer connections the moment they are requested. +// As the stats will change over the life of a peer connection, this ensures that +// the updated values are returned. +func (tc *TrackerClient) RtcPeerConnStats() map[string]webrtc.StatsReport { + tc.mu.Lock() + defer tc.mu.Unlock() + sr := make(map[string]webrtc.StatsReport) + for id, pc := range tc.rtcPeerConns { + sr[id] = GetPeerConnStats(pc) + } + return sr +} + func (tc *TrackerClient) writeMessage(data []byte) error { for tc.wsConn == nil { if tc.closed { @@ -359,6 +379,10 @@ func (tc *TrackerClient) handleOffer( if err != nil { return fmt.Errorf("creating answering peer connection: %w", err) } + + // save the seeder peer connections + tc.storePeerConnection(fmt.Sprintf("%x", offerContext.Id[:]), peerConnection) + response := AnnounceResponse{ Action: "announce", InfoHash: binaryToJsonString(offerContext.InfoHash[:]), @@ -401,3 +425,12 @@ func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescr delete(tc.outboundOffers, offerId) go tc.Announce(tracker.None, offer.infoHash) } + +func (tc *TrackerClient) storePeerConnection(offerId string, pc *wrappedPeerConnection) { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.rtcPeerConns == nil { + tc.rtcPeerConns = make(map[string]*wrappedPeerConnection) + } + tc.rtcPeerConns[offerId] = pc +} diff --git a/webtorrent/transport.go b/webtorrent/transport.go index 75f1376299..6231fd8d66 100644 --- a/webtorrent/transport.go +++ b/webtorrent/transport.go @@ -38,16 +38,35 @@ type wrappedPeerConnection struct { pproffd.CloseWrapper span trace.Span ctx context.Context + + onCloseHandler func() } func (me *wrappedPeerConnection) Close() error { me.closeMu.Lock() defer me.closeMu.Unlock() + + me.onClose() + err := me.CloseWrapper.Close() me.span.End() return err } +func (me *wrappedPeerConnection) OnClose(f func()) { + me.closeMu.Lock() + defer me.closeMu.Unlock() + me.onCloseHandler = f +} + +func (me *wrappedPeerConnection) onClose() { + handler := me.onCloseHandler + + if handler != nil { + handler() + } +} + func newPeerConnection(logger log.Logger, iceServers []webrtc.ICEServer) (*wrappedPeerConnection, error) { newPeerConnectionMu.Lock() defer newPeerConnectionMu.Unlock()