From 61a0f4e9145934f93c1fceb7244442a71b994c36 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 29 Nov 2024 16:24:08 +0000 Subject: [PATCH] Expose start/stop content providing on DHT for libp2p backend --- substrate/client/network/src/behaviour.rs | 12 ++++++++++++ substrate/client/network/src/discovery.rs | 21 +++++++++++++++++++++ substrate/client/network/src/event.rs | 3 +++ substrate/client/network/src/service.rs | 15 +++++++++++++++ 4 files changed, 51 insertions(+) diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index dbb72381b6604..16c081a4dd2ae 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -310,6 +310,16 @@ impl Behaviour { ) { self.discovery.store_record(record_key, record_value, publisher, expires); } + + /// Start providing `key` on the DHT. + pub fn start_providing(&mut self, key: RecordKey) { + self.discovery.start_providing(key) + } + + /// Stop providing `key` on the DHT. + pub fn stop_providing(&mut self, key: &RecordKey) { + self.discovery.stop_providing(key) + } } impl From for BehaviourOut { @@ -387,6 +397,8 @@ impl From for BehaviourOut { ), DiscoveryOut::ValuePutFailed(key, duration) => BehaviourOut::Dht(DhtEvent::ValuePutFailed(key.into()), Some(duration)), + DiscoveryOut::StartProvidingFailed(key) => + BehaviourOut::Dht(DhtEvent::StartProvidingFailed(key.into()), None), DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted, } } diff --git a/substrate/client/network/src/discovery.rs b/substrate/client/network/src/discovery.rs index 8080bda9a5749..720850637347f 100644 --- a/substrate/client/network/src/discovery.rs +++ b/substrate/client/network/src/discovery.rs @@ -466,6 +466,24 @@ impl DiscoveryBehaviour { } } } + + /// Register as a content provider on the DHT for `key`. + pub fn start_providing(&mut self, key: RecordKey) { + if let Some(kad) = self.kademlia.as_mut() { + if let Err(e) = kad.start_providing(key.clone()) { + warn!(target: "sub-libp2p", "Libp2p => Failed to start providing {key:?}: {e}."); + self.pending_events.push_back(DiscoveryOut::StartProvidingFailed(key)); + } + } + } + + /// Deregister as a content provider on the DHT for `key`. + pub fn stop_providing(&mut self, key: &RecordKey) { + if let Some(kad) = self.kademlia.as_mut() { + kad.stop_providing(key); + } + } + /// Store a record in the Kademlia record store. pub fn store_record( &mut self, @@ -581,6 +599,9 @@ pub enum DiscoveryOut { /// Returning the corresponding key as well as the request duration. ValuePutFailed(RecordKey, Duration), + /// Starting providing a key failed. + StartProvidingFailed(RecordKey), + /// Started a random Kademlia query. /// /// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`. diff --git a/substrate/client/network/src/event.rs b/substrate/client/network/src/event.rs index 626cf516a7ec2..6eea38d839f0a 100644 --- a/substrate/client/network/src/event.rs +++ b/substrate/client/network/src/event.rs @@ -45,6 +45,9 @@ pub enum DhtEvent { /// An error has occurred while putting a record into the DHT. ValuePutFailed(Key), + /// An error occured while registering as a content provider on the DHT. + StartProvidingFailed(Key), + /// The DHT received a put record request. PutRecordRequest(Key, Vec, Option, Option), } diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 5e5e4ee285894..e97300f1923af 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -973,6 +973,14 @@ where expires, )); } + + fn start_providing(&self, key: KademliaKey) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StartProviding(key)); + } + + fn stop_providing(&self, key: KademliaKey) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StopProviding(key)); + } } #[async_trait::async_trait] @@ -1333,6 +1341,8 @@ enum ServiceToWorkerMsg { update_local_storage: bool, }, StoreRecord(KademliaKey, Vec, Option, Option), + StartProviding(KademliaKey), + StopProviding(KademliaKey), AddKnownAddress(PeerId, Multiaddr), EventStream(out_events::Sender), Request { @@ -1466,6 +1476,10 @@ where .network_service .behaviour_mut() .store_record(key.into(), value, publisher, expires), + ServiceToWorkerMsg::StartProviding(key) => + self.network_service.behaviour_mut().start_providing(key.into()), + ServiceToWorkerMsg::StopProviding(key) => + self.network_service.behaviour_mut().stop_providing(&key.into()), ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => self.network_service.behaviour_mut().add_known_address(peer_id, addr), ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender), @@ -1678,6 +1692,7 @@ where DhtEvent::ValuePut(_) => "value-put", DhtEvent::ValuePutFailed(_) => "value-put-failed", DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request", + DhtEvent::StartProvidingFailed(_) => "start-providing-failed", }; metrics .kademlia_query_duration