diff --git a/callbacks.go b/callbacks.go index 0c66bc50f7..664ba16617 100644 --- a/callbacks.go +++ b/callbacks.go @@ -34,6 +34,10 @@ type Callbacks struct { // handshake has not yet occurred. This is a good time to alter the supported extension // protocols. PeerConnAdded []func(*PeerConn) + + // Sends status event updates. Useful to inform the user of specific events as they happen, + // for logging or to action on. + StatusUpdated []func(StatusUpdatedEvent) } type ReceivedUsefulDataEvent = PeerMessageEvent @@ -54,3 +58,23 @@ type PeerConnReadExtensionMessageEvent struct { ExtensionNumber pp.ExtensionNumber Payload []byte } + +type StatusUpdatedEvent struct { + Event StatusEvent `json:"event"` + Error error `json:"error"` + // The following fields may or may not be populated depending on the event. + PeerId PeerID `json:"peer_id"` + Url string `json:"url"` + InfoHash string `json:"info_hash"` +} + +type StatusEvent string + +const ( + PeerConnected StatusEvent = "peer_connected" + PeerDisconnected StatusEvent = "peer_disconnected" + TrackerConnected StatusEvent = "tracker_connected" + TrackerDisconnected StatusEvent = "tracker_disconnected" + TrackerAnnounceSuccessful StatusEvent = "tracker_announce_successful" + TrackerAnnounceError StatusEvent = "tracker_announce_error" +) diff --git a/client-peerconn_test.go b/client-peerconn_test.go new file mode 100644 index 0000000000..9e114b4edc --- /dev/null +++ b/client-peerconn_test.go @@ -0,0 +1,231 @@ +package torrent + +import ( + "io" + "os" + "testing" + "testing/iotest" + "time" + + "github.com/anacrolix/missinggo/v2" + "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/torrent/internal/testutil" + "github.com/anacrolix/torrent/types" + "github.com/frankban/quicktest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" +) + +func TestPeerConnObserverReadStatusOk(t *testing.T) { + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.EstablishedConnsPerTorrent = 1 + cfg.Observers = &Observers{ + Peers: PeerObserver{ + PeerStatus: make(chan PeerStatus), + }, + } + + c, _ := NewClient(cfg) + defer c.Close() + + go func() { + cfg.Observers.Peers.PeerStatus <- PeerStatus{ + Ok: true, + } + }() + + status := readChannelTimeout(t, cfg.Observers.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus) + require.True(t, status.Ok) + require.Equal(t, "", status.Err) +} + +func TestPeerConnObserverReadStatusErr(t *testing.T) { + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.EstablishedConnsPerTorrent = 1 + cfg.Observers = &Observers{ + Peers: PeerObserver{ + PeerStatus: make(chan PeerStatus), + }, + } + + c, _ := NewClient(cfg) + defer c.Close() + + go func() { + cfg.Observers.Peers.PeerStatus <- PeerStatus{ + Err: "test error", + } + }() + + status := readChannelTimeout(t, cfg.Observers.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus) + require.False(t, status.Ok) + require.Equal(t, status.Err, "test error") +} + +func TestPeerConnEstablished(t *testing.T) { + obs := NewClientObservers() + ps := testClientTransferParams{ + ConfigureSeeder: ConfigureClient{ + Config: func(cfg *ClientConfig) { + cfg.PeerID = "12345123451234512345" + }, + }, + ConfigureLeecher: ConfigureClient{ + Config: func(cfg *ClientConfig) { + // TODO one of UTP or TCP is needed for the transfer + // Does this mean we're not doing webtorrent? TBC + // cfg.DisableUTP = true + cfg.DisableTCP = true + cfg.Debug = false + cfg.DisableTrackers = true + cfg.EstablishedConnsPerTorrent = 1 + cfg.Observers = obs + }, + }, + } + + go testClientTransfer(t, ps) + + status := readChannelTimeout(t, obs.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus) + var expectedPeerId types.PeerID + missinggo.CopyExact(&expectedPeerId, "12345123451234512345") + require.Equal(t, expectedPeerId, status.Id) + require.True(t, status.Ok) + require.Equal(t, "", status.Err) + + // Peer conn is dropped after transfer is finished. This is the next update we receive. + status = readChannelTimeout(t, obs.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus) + require.Equal(t, expectedPeerId, status.Id) + require.False(t, status.Ok) + require.Equal(t, "", status.Err) +} + +type ConfigureClient struct { + Config func(cfg *ClientConfig) + Client func(cl *Client) +} + +type testClientTransferParams struct { + SeederUploadRateLimiter *rate.Limiter + LeecherDownloadRateLimiter *rate.Limiter + ConfigureSeeder ConfigureClient + ConfigureLeecher ConfigureClient + + LeecherStartsWithoutMetadata bool +} + +// Simplified version of testClientTransfer found in test/leecher-storage.go. +// Could not import and reuse that function due to circular dependencies between modules. +func testClientTransfer(t *testing.T, ps testClientTransferParams) { + greetingTempDir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(greetingTempDir) + // Create seeder and a Torrent. + cfg := TestingConfig(t) + cfg.Seed = true + // Some test instances don't like this being on, even when there's no cache involved. + cfg.DropMutuallyCompletePeers = false + if ps.SeederUploadRateLimiter != nil { + cfg.UploadRateLimiter = ps.SeederUploadRateLimiter + } + cfg.DataDir = greetingTempDir + if ps.ConfigureSeeder.Config != nil { + ps.ConfigureSeeder.Config(cfg) + } + seeder, err := NewClient(cfg) + require.NoError(t, err) + if ps.ConfigureSeeder.Client != nil { + ps.ConfigureSeeder.Client(seeder) + } + seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) + defer seeder.Close() + <-seederTorrent.Complete().On() + + // Create leecher and a Torrent. + leecherDataDir := t.TempDir() + cfg = TestingConfig(t) + // See the seeder client config comment. + cfg.DropMutuallyCompletePeers = false + cfg.DataDir = leecherDataDir + if ps.LeecherDownloadRateLimiter != nil { + cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter + } + cfg.Seed = false + if ps.ConfigureLeecher.Config != nil { + ps.ConfigureLeecher.Config(cfg) + } + leecher, err := NewClient(cfg) + require.NoError(t, err) + defer leecher.Close() + if ps.ConfigureLeecher.Client != nil { + ps.ConfigureLeecher.Client(leecher) + } + leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { + ret = TorrentSpecFromMetaInfo(mi) + ret.ChunkSize = 2 + if ps.LeecherStartsWithoutMetadata { + ret.InfoBytes = nil + } + return + }()) + require.NoError(t, err) + assert.False(t, leecherTorrent.Complete().Bool()) + assert.True(t, new) + + added := leecherTorrent.AddClientPeer(seeder) + assert.False(t, leecherTorrent.Seeding()) + // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they + // should be sitting idle until we demand data. + if !ps.LeecherStartsWithoutMetadata { + assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers) + } + if ps.LeecherStartsWithoutMetadata { + <-leecherTorrent.GotInfo() + } + r := leecherTorrent.NewReader() + defer r.Close() + go leecherTorrent.SetInfoBytes(mi.InfoBytes) + + assertReadAllGreeting(t, r) + <-leecherTorrent.Complete().On() + assert.NotEmpty(t, seederTorrent.PeerConns()) + leecherPeerConns := leecherTorrent.PeerConns() + if cfg.DropMutuallyCompletePeers { + // I don't think we can assume it will be empty already, due to timing. + // assert.Empty(t, leecherPeerConns) + } else { + assert.NotEmpty(t, leecherPeerConns) + } + foundSeeder := false + for _, pc := range leecherPeerConns { + completed := pc.PeerPieces().GetCardinality() + t.Logf("peer conn %v has %v completed pieces", pc, completed) + if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) { + foundSeeder = true + } + } + if !foundSeeder { + t.Errorf("didn't find seeder amongst leecher peer conns") + } + + seederStats := seederTorrent.Stats() + assert.True(t, 13 <= seederStats.BytesWrittenData.Int64()) + assert.True(t, 8 <= seederStats.ChunksWritten.Int64()) + + leecherStats := leecherTorrent.Stats() + assert.True(t, 13 <= leecherStats.BytesReadData.Int64()) + assert.True(t, 8 <= leecherStats.ChunksRead.Int64()) + + // Try reading through again for the cases where the torrent data size + // exceeds the size of the cache. + assertReadAllGreeting(t, r) +} + +func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) { + pos, err := r.Seek(0, io.SeekStart) + assert.NoError(t, err) + assert.EqualValues(t, 0, pos) + quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil) +} diff --git a/client-tracker_test.go b/client-tracker_test.go new file mode 100644 index 0000000000..a75e861a25 --- /dev/null +++ b/client-tracker_test.go @@ -0,0 +1,166 @@ +package torrent + +import ( + "errors" + "github.com/anacrolix/torrent/internal/testutil" + "github.com/anacrolix/torrent/tracker" + "github.com/anacrolix/torrent/webtorrent" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" + "net" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" +) + +func TestClientInvalidTracker(t *testing.T) { + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Observers = NewClientObservers() + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {"ws://test.invalid:4242"}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + status := readChannelTimeout(t, cfg.Observers.Trackers.ConnStatus, 500*time.Millisecond).(webtorrent.TrackerStatus) + require.Equal(t, "ws://test.invalid:4242", status.Url) + var expected *net.OpError + require.ErrorAs(t, expected, &status.Err) + + to.Drop() +} + +var upgrader = websocket.Upgrader{} + +func testtracker(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer c.Close() + for { + _, _, err := c.ReadMessage() + if err != nil { + break + } + //err = c.WriteMessage(mt, message) + //if err != nil { + // break + //} + } +} + +func TestClientValidTrackerConn(t *testing.T) { + s, trackerUrl := startTestTracker() + defer s.Close() + + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Observers = NewClientObservers() + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {trackerUrl}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + status := readChannelTimeout(t, cfg.Observers.Trackers.ConnStatus, 500*time.Millisecond).(webtorrent.TrackerStatus) + require.Equal(t, trackerUrl, status.Url) + require.True(t, status.Ok) + require.Nil(t, status.Err) + + to.Drop() +} + +func TestClientAnnounceFailure(t *testing.T) { + s, trackerUrl := startTestTracker() + defer s.Close() + + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Observers = NewClientObservers() + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + cl.websocketTrackers.GetAnnounceRequest = func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) { + return tracker.AnnounceRequest{}, errors.New("test error") + } + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {trackerUrl}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + status := readChannelTimeout(t, cfg.Observers.Trackers.AnnounceStatus, 500*time.Millisecond).(webtorrent.AnnounceStatus) + require.Equal(t, trackerUrl, status.Url) + require.False(t, status.Ok) + require.EqualError(t, status.Err, "test error") + require.Empty(t, status.Event) + + to.Drop() +} + +func TestClientAnnounceSuccess(t *testing.T) { + s, trackerUrl := startTestTracker() + defer s.Close() + + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Observers = NewClientObservers() + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {trackerUrl}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + status := readChannelTimeout(t, cfg.Observers.Trackers.AnnounceStatus, 500*time.Millisecond).(webtorrent.AnnounceStatus) + require.Equal(t, trackerUrl, status.Url) + require.True(t, status.Ok) + require.Nil(t, status.Err) + require.Equal(t, "started", status.Event) + + to.Drop() +} + +func startTestTracker() (*httptest.Server, string) { + s := httptest.NewServer(http.HandlerFunc(testtracker)) + trackerUrl := "ws" + strings.TrimPrefix(s.URL, "http") + return s, trackerUrl +} diff --git a/client.go b/client.go index fe3af23997..339e46a320 100644 --- a/client.go +++ b/client.go @@ -18,6 +18,8 @@ import ( "strconv" "time" + "github.com/anacrolix/torrent/webtorrent" + "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" @@ -47,7 +49,6 @@ import ( "github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/types/infohash" infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" - "github.com/anacrolix/torrent/webtorrent" ) // Clients contain zero or more Torrents. A Client manages a blocklist, the @@ -312,7 +313,12 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { } } + var obs *webtorrent.TrackerObserver + if cl.config.Observers != nil { + obs = &cl.config.Observers.Trackers + } cl.websocketTrackers = websocketTrackers{ + obs: obs, PeerId: cl.peerID, Logger: cl.logger.WithNames("websocketTrackers"), GetAnnounceRequest: func( @@ -741,6 +747,11 @@ func doProtocolHandshakeOnDialResult( cl := t.cl nc := dr.Conn addrIpPort, _ := tryIpPortFromNetAddr(addr) + + var obs *PeerObserver + if t.cl.config.Observers != nil { + obs = &t.cl.config.Observers.Peers + } c, err = cl.initiateProtocolHandshakes( context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{ @@ -750,6 +761,7 @@ func doProtocolHandshakeOnDialResult( localPublicAddr: cl.publicAddr(addrIpPort.IP), network: dr.Dialer.DialerNetwork(), connString: regularNetConnPeerConnConnString(nc), + obs: obs, }) if err != nil { nc.Close() @@ -1128,8 +1140,23 @@ func (t *Torrent) runHandshookConn(pc *PeerConn) error { pc.startMessageWriter() pc.sendInitialMessages() pc.initUpdateRequestsTimer() + + for _, cb := range pc.callbacks.StatusUpdated { + cb(StatusUpdatedEvent{ + Event: PeerConnected, + PeerId: pc.PeerID, + }) + } + err := pc.mainReadLoop() if err != nil { + for _, cb := range pc.callbacks.StatusUpdated { + cb(StatusUpdatedEvent{ + Event: PeerDisconnected, + Error: err, + PeerId: pc.PeerID, + }) + } return fmt.Errorf("main read loop: %w", err) } return nil @@ -1638,6 +1665,7 @@ type newConnectionOpts struct { localPublicAddr peerLocalPublicAddr network string connString string + obs *PeerObserver } func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) { @@ -1658,6 +1686,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon }, connString: opts.connString, conn: nc, + Observers: opts.obs, } c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn c.initRequestState() diff --git a/config.go b/config.go index 76979463cc..7fdd89f450 100644 --- a/config.go +++ b/config.go @@ -7,6 +7,8 @@ import ( "net/url" "time" + "github.com/anacrolix/torrent/webtorrent" + "github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/log" @@ -20,6 +22,23 @@ import ( "github.com/anacrolix/torrent/version" ) +type Observers struct { + Trackers webtorrent.TrackerObserver + Peers PeerObserver +} + +func NewClientObservers() *Observers { + return &Observers{ + Trackers: webtorrent.TrackerObserver{ + ConnStatus: make(chan webtorrent.TrackerStatus), + AnnounceStatus: make(chan webtorrent.AnnounceStatus), + }, + Peers: PeerObserver{ + PeerStatus: make(chan PeerStatus), + }, + } +} + // Contains config elements that are exclusive to tracker handling. There may be other fields in // ClientConfig that are also relevant. type ClientTrackerConfig struct { @@ -32,6 +51,7 @@ type ClientTrackerConfig struct { // Takes a tracker's hostname and requests DNS A and AAAA records. // Used in case DNS lookups require a special setup (i.e., dns-over-https) LookupTrackerIp func(*url.URL) ([]net.IP, error) + Observers *Observers } type ClientDhtConfig struct { diff --git a/peerconn.go b/peerconn.go index 65800dd695..96664d36c0 100644 --- a/peerconn.go +++ b/peerconn.go @@ -32,6 +32,16 @@ import ( utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" ) +type PeerStatus struct { + Id PeerID + Ok bool + Err string // see https://github.com/golang/go/issues/5161 +} + +type PeerObserver struct { + PeerStatus chan PeerStatus +} + // Maintains the state of a BitTorrent-protocol based connection with a peer. type PeerConn struct { Peer @@ -90,6 +100,7 @@ type PeerConn struct { // we can verify all the pieces for a file when they're all arrived before submitting them to // the torrent. receivedHashPieces map[[32]byte][][32]byte + Observers *PeerObserver } func (cn *PeerConn) pexStatus() string { diff --git a/testing.go b/testing.go index a490927308..1ab74c7a83 100644 --- a/testing.go +++ b/testing.go @@ -1,6 +1,7 @@ package torrent import ( + "github.com/stretchr/testify/require" "testing" "time" @@ -35,3 +36,13 @@ func TestingConfig(t testing.TB) *ClientConfig { //}) return cfg } + +func readChannelTimeout[T any](t *testing.T, channel chan T, duration time.Duration) interface{} { + select { + case s := <-channel: + return s + case <-time.After(duration): + require.Fail(t, "Timeout reading observer channel.") + } + return nil +} diff --git a/torrent.go b/torrent.go index cbd2c05006..58dc2db802 100644 --- a/torrent.go +++ b/torrent.go @@ -1804,6 +1804,15 @@ func (t *Torrent) assertPendingRequests() { func (t *Torrent) dropConnection(c *PeerConn) { t.cl.event.Broadcast() c.close() + + for _, cb := range c.callbacks.StatusUpdated { + cb(StatusUpdatedEvent{ + Event: PeerDisconnected, + PeerId: c.PeerID, + }) + } + t.logger.WithDefaultLevel(log.Debug).Printf("dropping connection to %+q, sent peerconn update", c.PeerID) + if t.deletePeerConn(c) { t.openNewConns() } @@ -1864,6 +1873,12 @@ func (t *Torrent) onWebRtcConn( return } localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr()) + + var obs *PeerObserver + if t.cl.config.Observers != nil { + obs = &t.cl.config.Observers.Peers + } + pc, err := t.cl.initiateProtocolHandshakes( context.Background(), netConn, @@ -1875,6 +1890,7 @@ func (t *Torrent) onWebRtcConn( localPublicAddr: localAddrIpPort, network: webrtcNetwork, connString: fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)), + obs: obs, }, ) if err != nil { diff --git a/webtorrent/tracker-client.go b/webtorrent/tracker-client.go index ec2232d132..9aa2490758 100644 --- a/webtorrent/tracker-client.go +++ b/webtorrent/tracker-client.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/anacrolix/torrent/types/infohash" + g "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/gorilla/websocket" @@ -18,6 +20,23 @@ import ( "github.com/anacrolix/torrent/tracker" ) +type TrackerStatus struct { + Url string `json:"url"` + Ok bool `json:"ok"` + Err error `json:"err"` +} + +type AnnounceStatus struct { + TrackerStatus + Event string `json:"event"` + InfoHash string `json:"info_hash"` +} + +type TrackerObserver struct { + ConnStatus chan TrackerStatus + AnnounceStatus chan AnnounceStatus +} + type TrackerClientStats struct { Dials int64 ConvertedInboundConns int64 @@ -32,6 +51,7 @@ type TrackerClient struct { OnConn onDataChannelOpen Logger log.Logger Dialer *websocket.Dialer + Observers *TrackerObserver mu sync.Mutex cond sync.Cond @@ -99,6 +119,7 @@ func (tc *TrackerClient) doWebsocket() error { c, _, err := tc.Dialer.Dial(tc.Url, header) if err != nil { + tc.updateTrackerConnStatus(TrackerStatus{tc.Url, false, err}) return fmt.Errorf("dialing tracker: %w", err) } defer c.Close() @@ -125,6 +146,7 @@ func (tc *TrackerClient) doWebsocket() error { } } }() + tc.updateTrackerConnStatus(TrackerStatus{tc.Url, true, nil}) err = tc.trackerReadLoop(tc.wsConn) close(closeChan) tc.mu.Lock() @@ -133,6 +155,24 @@ func (tc *TrackerClient) doWebsocket() error { return err } +func (tc *TrackerClient) updateTrackerConnStatus(status TrackerStatus) { + if tc.Observers != nil { + select { + case tc.Observers.ConnStatus <- status: + default: + } + } +} + +func (tc *TrackerClient) updateTrackerAnnounceStatus(status AnnounceStatus) { + if tc.Observers != nil { + select { + case tc.Observers.AnnounceStatus <- status: + default: + } + } +} + // Finishes initialization and spawns the run routine, calling onStop when it completes with the // result. We don't let the caller just spawn the runner directly, since then we can race against // .Close to finish initialization. @@ -261,6 +301,17 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte, offers []outboundOffer) error { request, err := tc.GetAnnounceRequest(event, infoHash) + if err != nil { + tc.updateTrackerAnnounceStatus(AnnounceStatus{ + TrackerStatus: TrackerStatus{ + Url: tc.Url, + Ok: false, + Err: err, + }, + Event: "", + InfoHash: infohash.T(infoHash).HexString(), + }) + } if err != nil { return fmt.Errorf("getting announce parameters: %w", err) } @@ -282,6 +333,16 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte }) } + announceStatus := AnnounceStatus{ + TrackerStatus: TrackerStatus{ + Url: tc.Url, + Ok: true, + Err: nil, + }, + Event: req.Event, + InfoHash: infohash.T(infoHash).HexString(), + } + data, err := json.Marshal(req) if err != nil { return fmt.Errorf("marshalling request: %w", err) @@ -291,10 +352,15 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte defer tc.mu.Unlock() err = tc.writeMessage(data) if err != nil { + announceStatus.Ok = false + announceStatus.Err = err + tc.updateTrackerAnnounceStatus(announceStatus) return fmt.Errorf("write AnnounceRequest: %w", err) } + tc.updateTrackerAnnounceStatus(announceStatus) + g.MakeMapIfNil(&tc.outboundOffers) for _, offer := range offers { - g.MakeMapIfNilAndSet(&tc.outboundOffers, offer.offerId, offer.outboundOfferValue) + g.MapInsert(tc.outboundOffers, offer.offerId, offer.outboundOfferValue) } return nil } diff --git a/webtorrent/transport_test.go b/webtorrent/transport_test.go index dcb170a061..753339d303 100644 --- a/webtorrent/transport_test.go +++ b/webtorrent/transport_test.go @@ -1,3 +1,6 @@ +//go:build !js +// +build !js + package webtorrent import ( diff --git a/wstracker.go b/wstracker.go index 0e71e4e4a0..795e9616fc 100644 --- a/wstracker.go +++ b/wstracker.go @@ -3,6 +3,7 @@ package torrent import ( "context" "fmt" + "github.com/anacrolix/torrent/webtorrent" "net" netHttp "net/http" "net/url" @@ -14,7 +15,6 @@ import ( "github.com/anacrolix/torrent/tracker" httpTracker "github.com/anacrolix/torrent/tracker/http" - "github.com/anacrolix/torrent/webtorrent" ) type websocketTrackerStatus struct { @@ -45,6 +45,7 @@ type websocketTrackers struct { OnConn func(webtorrent.DataChannelConn, webtorrent.DataChannelContext) mu sync.Mutex clients map[string]*refCountedWebtorrentTrackerClient + obs *webtorrent.TrackerObserver Proxy httpTracker.ProxyFunc DialContext func(ctx context.Context, network, addr string) (net.Conn, error) WebsocketTrackerHttpHeader func() netHttp.Header @@ -63,6 +64,7 @@ func (me *websocketTrackers) Get(url string, infoHash [20]byte) (*webtorrent.Tra } value = &refCountedWebtorrentTrackerClient{ TrackerClient: webtorrent.TrackerClient{ + Observers: me.obs, Dialer: dialer, Url: url, GetAnnounceRequest: me.GetAnnounceRequest,