Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEP32] Implement IPv6 DHT support #636

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ This is a list of all the BEPs which have been implemented in MonoTorrent. A ful
## Draft BEPs

* BEP 16 - [Superseeding](http://www.bittorrent.org/beps/bep_0016.html)
* BEP 32 - [BitTorrent DHT Extensions for IPv6](http://www.bittorrent.org/beps/bep_0032.html)
* BEP 48 - [Tracker Protocol Extension: Scrape](http://www.bittorrent.org/beps/bep_0048.html)
* BEP 47 - [Padding files and extended file attributes](https://www.bittorrent.org/beps/bep_0047.html)
* BEP 52 - [The BitTorrent Protocol Specification v2](https://www.bittorrent.org/beps/bep_0052.html)
Expand Down
8 changes: 4 additions & 4 deletions src/MonoTorrent.Client/MonoTorrent.Client.Modes/Mode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public void HandleMessage (PeerId id, PeerMessage message, PeerMessage.Releaser
HandleHaveMessage (id, have);
else if (message is RequestMessage request)
HandleRequestMessage (id, request);
else if (message is PortMessage port)
HandlePortMessage (id, port);
else if (message is DhtPortMessage port)
HandleDhtPortMessage (id, port);
else if (message is PieceMessage piece)
HandlePieceMessage (id, piece, releaser);
else if (message is NotInterestedMessage notinterested)
Expand Down Expand Up @@ -466,9 +466,9 @@ async void WritePieceAsync (PieceMessage message, PeerMessage.Releaser releaser,
Manager.finishedPieces.Enqueue (block.PieceIndex);
}

protected virtual void HandlePortMessage (PeerId id, PortMessage message)
protected virtual void HandleDhtPortMessage (PeerId id, DhtPortMessage message)
{
id.Port = (ushort) message.Port;
id.DhtPort = (ushort) message.Port;
}

protected virtual void HandleRequestMessage (PeerId id, RequestMessage message)
Expand Down
146 changes: 77 additions & 69 deletions src/MonoTorrent.Client/MonoTorrent.Client/ClientEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,11 @@ public async Task SaveStateAsync (string pathToStateFile)

public ConnectionManager ConnectionManager { get; }

public IDht Dht { get; private set; }
public IDualStackDhtEngine Dht => DhtEngine;

internal IDhtEngine DhtEngine { get; private set; }
internal DualStackDhtEngine DhtEngine { get; private set; }

IDhtListener DhtListener { get; set; }
IList<IDhtListener> DhtListeners { get; set; }

public DiskManager DiskManager { get; }

Expand Down Expand Up @@ -324,13 +324,18 @@ public ClientEngine (EngineSettings settings, Factories factories)
PeerListeners = Array.AsReadOnly (settings.ListenEndPoints.Values.Select (t => Factories.CreatePeerConnectionListener (t)).ToArray ());
listenManager.SetListeners (PeerListeners);

DhtListener = (settings.DhtEndPoint == null ? null : Factories.CreateDhtListener (settings.DhtEndPoint)) ?? new NullDhtListener ();
DhtEngine = (settings.DhtEndPoint == null ? null : Factories.CreateDht ()) ?? new NullDhtEngine ();
Dht = new DhtEngineWrapper (DhtEngine);
DhtEngine.SetListenerAsync (DhtListener).GetAwaiter ().GetResult ();
DhtListeners = settings.DhtEndPoints.Select (t => Factories.CreateDhtListener (t)).Where (t => t != null).ToList ().AsReadOnly ();
DhtEngine = new DualStackDhtEngine (Factories);
DhtEngine.SetListenersAsync (DhtListeners).GetAwaiter ().GetResult ();

// Listen on the IPv4 DHT
DhtEngine.IPv4Dht.StateChanged += DhtEngineStateChanged;
DhtEngine.IPv4Dht.PeersFound += DhtEnginePeersFound;

// ... And the IPv6 DHT.
DhtEngine.IPv6Dht.StateChanged += DhtEngineStateChanged;
DhtEngine.IPv6Dht.PeersFound += DhtEnginePeersFound;

DhtEngine.StateChanged += DhtEngineStateChanged;
DhtEngine.PeersFound += DhtEnginePeersFound;
LocalPeerDiscovery = new NullLocalPeerDiscovery ();

RegisterLocalPeerDiscovery (settings.AllowLocalPeerDiscovery ? Factories.CreateLocalPeerDiscovery () : null);
Expand Down Expand Up @@ -558,7 +563,8 @@ public void Dispose ()
listener.Stop ();
listenManager.SetListeners (Array.Empty<IPeerConnectionListener> ());

DhtListener.Stop ();
foreach (var listener in DhtListeners)
listener.Stop ();
DhtEngine.Dispose ();

DiskManager.Dispose ();
Expand Down Expand Up @@ -645,7 +651,9 @@ async Task Register (TorrentManager manager, bool isPublic)

manager.DownloadLimiters.Add (downloadLimiters);
manager.UploadLimiters.Add (uploadLimiters);
if (DhtEngine != null && manager.Torrent?.Nodes != null && DhtEngine.State != DhtState.Ready) {

// FIXME: Are these still ipv4 nodes, or has the spec been updated?
if (DhtEngine != null && manager.Torrent?.Nodes != null && DhtEngine.IPv4Dht.State != DhtState.Ready) {
try {
DhtEngine.Add (manager.Torrent.Nodes.OfType<BEncodedString> ().Select (t => t.AsMemory ()));
} catch {
Expand All @@ -654,23 +662,6 @@ async Task Register (TorrentManager manager, bool isPublic)
}
}

async Task RegisterDht (IDhtEngine engine)
{
if (DhtEngine != null) {
DhtEngine.StateChanged -= DhtEngineStateChanged;
DhtEngine.PeersFound -= DhtEnginePeersFound;
await DhtEngine.StopAsync ();
DhtEngine.Dispose ();
}
DhtEngine = engine ?? new NullDhtEngine ();
Dht = new DhtEngineWrapper (DhtEngine);

DhtEngine.StateChanged += DhtEngineStateChanged;
DhtEngine.PeersFound += DhtEnginePeersFound;
if (IsRunning)
await DhtEngine.StartAsync (await MaybeLoadDhtNodes ());
}

void RegisterLocalPeerDiscovery (ILocalPeerDiscovery? localPeerDiscovery)
{
if (LocalPeerDiscovery != null) {
Expand Down Expand Up @@ -708,23 +699,28 @@ async void DhtEnginePeersFound (object? o, PeersFoundEventArgs e)
}
}

async void DhtEngineStateChanged (object? o, EventArgs e)
void DhtEngineStateChanged (object? o, EventArgs e)
{
if (DhtEngine.State != DhtState.Ready)
return;
if (o == DhtEngine.IPv4Dht && Dht.IPv4Dht.State == DhtState.Ready)
AnnounceToDht (DhtEngine.IPv4Dht, GetOverrideOrActualListenPort ("ipv4") ?? -1);

if (o == Dht.IPv6Dht && Dht.IPv6Dht.State == DhtState.Ready)
AnnounceToDht (DhtEngine.IPv6Dht, GetOverrideOrActualListenPort ("ipv6") ?? -1);
}

async void AnnounceToDht (IDhtEngine engine, int port)
{
await MainLoop;
foreach (TorrentManager manager in allTorrents) {
if (!manager.CanUseDht)
continue;

// IPV6: Also report to an ipv6 DHT node
if (manager.InfoHashes.V1 != null) {
DhtEngine.Announce (manager.InfoHashes.V1, GetOverrideOrActualListenPort ("ipv4") ?? -1);
engine.Announce (manager.InfoHashes.V1, port);
DhtEngine.GetPeers (manager.InfoHashes.V1);
}
if (manager.InfoHashes.V2 != null) {
DhtEngine.Announce (manager.InfoHashes.V2.Truncate (), GetOverrideOrActualListenPort ("ipv4") ?? -1);
engine.Announce (manager.InfoHashes.V2.Truncate (), port);
DhtEngine.GetPeers (manager.InfoHashes.V2.Truncate ());
}
}
Expand Down Expand Up @@ -812,12 +808,10 @@ internal async Task StartAsync ()
await PortForwarder.StartAsync (CancellationToken.None);

LocalPeerDiscovery.Start ();
await DhtEngine.StartAsync (await MaybeLoadDhtNodes ());

await StartAndPortMapPeerListeners ();

if (DhtListener.LocalEndPoint != null)
await PortForwarder.RegisterMappingAsync (new Mapping (Protocol.Udp, DhtListener.LocalEndPoint.Port));
await StartAndPortMapDhtListeners ();
await DhtEngine.StartAsync (await MaybeLoadDhtNodes (AddressFamily.InterNetwork), await MaybeLoadDhtNodes (AddressFamily.InterNetworkV6));
}
}

Expand All @@ -828,9 +822,7 @@ internal async Task StopAsync ()
IsRunning = allTorrents.Exists (m => m.State != TorrentState.Stopped);
if (!IsRunning) {
await UnmapAndStopPeerListeners ();

if (DhtListener.LocalEndPoint != null)
await PortForwarder.UnregisterMappingAsync (new Mapping (Protocol.Udp, DhtListener.LocalEndPoint.Port), CancellationToken.None);
await UnmapAndStopDhtListeners ();

LocalPeerDiscovery.Stop ();

Expand All @@ -842,6 +834,20 @@ internal async Task StopAsync ()
}
}

async ReusableTask StartAndPortMapDhtListeners ()
{
foreach (var v in DhtListeners)
v.Start ();

// The settings could say to listen at port 0, which means 'choose one dynamically'
var maps = DhtListeners
.Select (t => t.LocalEndPoint!)
.Where (t => t != null)
.Select (endpoint => PortForwarder.RegisterMappingAsync (new Mapping (Protocol.Tcp, endpoint.Port)))
.ToArray ();
await Task.WhenAll (maps);
}

async ReusableTask StartAndPortMapPeerListeners ()
{
foreach (var v in PeerListeners)
Expand All @@ -856,6 +862,19 @@ async ReusableTask StartAndPortMapPeerListeners ()
await Task.WhenAll (maps);
}

async ReusableTask UnmapAndStopDhtListeners ()
{
var unmaps = DhtListeners
.Select (t => t.LocalEndPoint!)
.Where (t => t != null)
.Select (endpoint => PortForwarder.UnregisterMappingAsync (new Mapping (Protocol.Udp, endpoint.Port), CancellationToken.None))
.ToArray ();
await Task.WhenAll (unmaps);

foreach (var listener in DhtListeners)
listener.Stop ();
}

async ReusableTask UnmapAndStopPeerListeners()
{
var unmaps = PeerListeners
Expand All @@ -869,12 +888,12 @@ async ReusableTask UnmapAndStopPeerListeners()
listener.Stop ();
}

async ReusableTask<ReadOnlyMemory<byte>> MaybeLoadDhtNodes ()
async ReusableTask<ReadOnlyMemory<byte>> MaybeLoadDhtNodes (AddressFamily family)
{
if (!Settings.AutoSaveLoadDhtCache)
return ReadOnlyMemory<byte>.Empty;

var savePath = Settings.GetDhtNodeCacheFilePath ();
var savePath = Settings.GetDhtNodeCacheFilePath (family);
return await Task.Run (() => File.Exists (savePath) ? File.ReadAllBytes (savePath) : ReadOnlyMemory<byte>.Empty);
}

Expand All @@ -883,7 +902,13 @@ async ReusableTask MaybeSaveDhtNodes ()
if (!Settings.AutoSaveLoadDhtCache)
return;

var nodes = await DhtEngine.SaveNodesAsync ();
await MaybeSaveDhtNodes (DhtEngine.IPv4Dht);
await MaybeSaveDhtNodes (DhtEngine.IPv6Dht);
}

async ReusableTask MaybeSaveDhtNodes (IDhtEngine engine)
{
var nodes = await DhtEngine.IPv4Dht.SaveNodesAsync ();
if (nodes.Length == 0)
return;

Expand All @@ -895,7 +920,7 @@ async ReusableTask MaybeSaveDhtNodes ()
// TorrentManagers and the file write could happen
// concurrently.
using (await dhtNodeLocker.EnterAsync ().ConfigureAwait (false)) {
var savePath = Settings.GetDhtNodeCacheFilePath ();
var savePath = Settings.GetDhtNodeCacheFilePath (engine.AddressFamily);
var parentDir = Path.GetDirectoryName (savePath)!;
Directory.CreateDirectory (parentDir);
File.WriteAllBytes (savePath, nodes.ToArray ());
Expand Down Expand Up @@ -950,37 +975,20 @@ async Task UpdateSettingsAsync (EngineSettings oldSettings, EngineSettings newSe
await PortForwarder.StopAsync (removeExistingMappings: true, CancellationToken.None);
}

if (oldSettings.DhtEndPoint != newSettings.DhtEndPoint) {
if (DhtListener.LocalEndPoint != null)
await PortForwarder.UnregisterMappingAsync (new Mapping (Protocol.Udp, DhtListener.LocalEndPoint.Port), CancellationToken.None);
DhtListener.Stop ();
if (oldSettings.DhtEndPoints.SequenceEqual (newSettings.DhtEndPoints)) {
await UnmapAndStopDhtListeners ();

if (newSettings.DhtEndPoint == null) {
DhtListener = new NullDhtListener ();
await RegisterDht (new NullDhtEngine ());
} else {
DhtListener = (Settings.DhtEndPoint is null ? null : Factories.CreateDhtListener (Settings.DhtEndPoint)) ?? new NullDhtListener ();
if (IsRunning)
DhtListener.Start ();
DhtListeners = Array.AsReadOnly (newSettings.DhtEndPoints.Select (Factories.CreateDhtListener).Where (t => t != null).ToArray ());
await DhtEngine.SetListenersAsync (DhtListeners);

if (oldSettings.DhtEndPoint == null) {
var dht = Factories.CreateDht ();
await dht.SetListenerAsync (DhtListener);
await RegisterDht (dht);

} else {
await DhtEngine.SetListenerAsync (DhtListener);
}
}

if (DhtListener.LocalEndPoint != null)
await PortForwarder.RegisterMappingAsync (new Mapping (Protocol.Udp, DhtListener.LocalEndPoint.Port));
if (IsRunning)
await StartAndPortMapDhtListeners ();
}

if (!oldSettings.ListenEndPoints.SequenceEqual (newSettings.ListenEndPoints)) {
await UnmapAndStopPeerListeners ();

PeerListeners = Array.AsReadOnly (newSettings.ListenEndPoints.Values.Select (t => Factories.CreatePeerConnectionListener (t)).ToArray ());
PeerListeners = Array.AsReadOnly (newSettings.ListenEndPoints.Values.Select (Factories.CreatePeerConnectionListener).Where (t => t != null).ToArray ());
listenManager.SetListeners (PeerListeners);

if (IsRunning)
Expand Down
2 changes: 1 addition & 1 deletion src/MonoTorrent.Client/MonoTorrent.Client/PeerId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public EncryptionType EncryptionType {
internal MessageQueue MessageQueue { get; set; }
internal Peer Peer { get; }
internal PeerExchangeManager? PeerExchangeManager { get; set; }
internal ushort Port { get; set; }
internal ushort DhtPort { get; set; }
internal List<int> SuggestedPieces { get; }

#endregion Properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;

using MonoTorrent.Connections;
using MonoTorrent.Dht;
Expand Down Expand Up @@ -126,10 +127,15 @@ public sealed class EngineSettings : IEquatable<EngineSettings>
public CachePolicy DiskCachePolicy { get; } = CachePolicy.WritesOnly;

/// <summary>
/// The UDP port used for DHT communications. Set the port to 0 to choose a random available port.
/// Set to null to disable DHT. Defaults to IPAddress.Any with port 0.
/// The list of endpoints which will be used for DHT communications. An empty collection disables DHT.
/// By default the collection contains two IPEndPoints, binding to IPAddress.Any and IPAddress.IPv6Any using
/// port 0.
/// </summary>
public IPEndPoint? DhtEndPoint { get; } = new IPEndPoint (IPAddress.Any, 0);
public IList<IPEndPoint> DhtEndPoints { get; } = Array.AsReadOnly (new[] {
new IPEndPoint (IPAddress.Any, 0),
new IPEndPoint (IPAddress.IPv6Any, 0)
});


/// <summary>
/// This is the full path to a sub-directory of <see cref="CacheDirectory"/>. If <see cref="AutoSaveLoadFastResume"/>
Expand Down Expand Up @@ -265,7 +271,7 @@ public EngineSettings ()
internal EngineSettings (
IList<EncryptionType> allowedEncryption, bool allowHaveSuppression, bool allowLocalPeerDiscovery, bool allowPortForwarding,
bool autoSaveLoadDhtCache, bool autoSaveLoadFastResume, bool autoSaveLoadMagnetLinkMetadata, string cacheDirectory,
TimeSpan connectionTimeout, IPEndPoint? dhtEndPoint, int diskCacheBytes, CachePolicy diskCachePolicy, FastResumeMode fastResumeMode, Dictionary<string, IPEndPoint> listenEndPoints,
TimeSpan connectionTimeout, IList<IPEndPoint>? dhtEndPoints, int diskCacheBytes, CachePolicy diskCachePolicy, FastResumeMode fastResumeMode, Dictionary<string, IPEndPoint> listenEndPoints,
int maximumConnections, int maximumDiskReadRate, int maximumDiskWriteRate, int maximumDownloadRate, int maximumHalfOpenConnections,
int maximumOpenFiles, int maximumUploadRate, IDictionary<string, IPEndPoint> reportedListenEndPoints, bool usePartialFiles,
TimeSpan webSeedConnectionTimeout, TimeSpan webSeedDelay, int webSeedSpeedTrigger, TimeSpan staleRequestTimeout,
Expand All @@ -279,7 +285,7 @@ internal EngineSettings (
AutoSaveLoadDhtCache = autoSaveLoadDhtCache;
AutoSaveLoadFastResume = autoSaveLoadFastResume;
AutoSaveLoadMagnetLinkMetadata = autoSaveLoadMagnetLinkMetadata;
DhtEndPoint = dhtEndPoint;
DhtEndPoints = Array.AsReadOnly (dhtEndPoints?.ToArray () ?? Array.Empty<IPEndPoint> ());
DiskCacheBytes = diskCacheBytes;
DiskCachePolicy = diskCachePolicy;
CacheDirectory = cacheDirectory;
Expand All @@ -302,8 +308,12 @@ internal EngineSettings (
WebSeedSpeedTrigger = webSeedSpeedTrigger;
}

internal string GetDhtNodeCacheFilePath ()
=> Path.Combine (CacheDirectory, "dht_nodes.cache");
internal string GetDhtNodeCacheFilePath (AddressFamily family)
=> family switch {
AddressFamily.InterNetwork => Path.Combine (CacheDirectory, "dht_nodes_ipv4.cache"),
AddressFamily.InterNetworkV6 => Path.Combine (CacheDirectory, "dht_nodes_ipv6.cache"),
_ => throw new NotSupportedException ($"DHT does not support AddressFamily.{family}. Only InterNetwork and InterNetworkv6 are supported.")
};

/// <summary>
/// Returns the full path to the <see cref="FastResume"/> file for the specified torrent. This is
Expand Down Expand Up @@ -334,7 +344,7 @@ public bool Equals (EngineSettings? other)
&& AutoSaveLoadFastResume == other.AutoSaveLoadFastResume
&& AutoSaveLoadMagnetLinkMetadata == other.AutoSaveLoadMagnetLinkMetadata
&& CacheDirectory == other.CacheDirectory
&& Equals (DhtEndPoint, other.DhtEndPoint)
&& DhtEndPoints.SequenceEqual (other.DhtEndPoints)
&& DiskCacheBytes == other.DiskCacheBytes
&& DiskCachePolicy == other.DiskCachePolicy
&& FastResumeMode == other.FastResumeMode
Expand Down
Loading