From 7bdf2090880309959e816f5d7619e33d25988afa Mon Sep 17 00:00:00 2001 From: valesteban Date: Tue, 17 Oct 2023 10:04:55 -0300 Subject: [PATCH] resolver async awake lookup --- src/resolver/async_resolver.rs | 17 ++++- src/resolver/lookup.rs | 116 +++++++++++++++++++++------------ 2 files changed, 90 insertions(+), 43 deletions(-) diff --git a/src/resolver/async_resolver.rs b/src/resolver/async_resolver.rs index 10853fd0..f8d5dbaa 100644 --- a/src/resolver/async_resolver.rs +++ b/src/resolver/async_resolver.rs @@ -36,7 +36,7 @@ impl AsyncResolver { //Async query let response = LookupIpFutureStub::lookup(domain_name_struct, self.cache.clone(),conn).await; - + println!("[LOOKUP IP RESPONSE => {:?}]",response); let ip_addr = match response { Ok(val) => { @@ -66,6 +66,7 @@ mod async_resolver_test { // use tokio::runtime::Runtime; use crate::resolver::config::ResolverConfig; use super::AsyncResolver; + // #[test] @@ -87,12 +88,24 @@ mod async_resolver_test { #[tokio::test] #[ignore] async fn lookupip_example() { + println!("[TEST INIT]"); let resolver = AsyncResolver::new(ResolverConfig::default()); let response = resolver.lookup_ip("example.com", "UDP").await.unwrap(); - println!("[TEST => {}]",response); + println!("[TEST FINISH=> {}]",response); + + + } + // #[test] + // fn test_spawn() { + + // let resolver = AsyncResolver::new(ResolverConfig::default()); + + // let handle = tokio::spawn(resolver.lookup_ip("example.com", "UDP")); + // unimplemented!(); + // } } \ No newline at end of file diff --git a/src/resolver/lookup.rs b/src/resolver/lookup.rs index a77eb7d4..7fb3c0be 100644 --- a/src/resolver/lookup.rs +++ b/src/resolver/lookup.rs @@ -10,7 +10,9 @@ use crate::message::type_qtype::Qtype; use crate::message::question::Question; use crate::client::client_error::ClientError; -use futures_util::FutureExt; +use std::sync::Arc; +use futures_util::{FutureExt,task::Waker}; +use tokio::io::AsyncWriteExt; use std::pin::Pin; use std::task::{Poll,Context}; //TODO: Eliminar librerias @@ -18,14 +20,16 @@ use std::net::{IpAddr,Ipv4Addr}; use futures_util::{future::Future,future}; use super::resolver_error::ResolverError; use std::time::Duration; +use std::sync:: Mutex; use crate::client::client_connection::ClientConnectionType; //Future returned fron AsyncResolver when performing a lookup with rtype A pub struct LookupIpFutureStub { name: DomainName, // cache: DnsCache, - query: Pin< Box< dyn Future > > >, + query:Pin< Box< dyn Future> >>, cache: DnsCache, conn: ClientConnectionType, + waker: Option, } impl Future for LookupIpFutureStub{ @@ -34,63 +38,61 @@ impl Future for LookupIpFutureStub{ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { println!("[POLL FUTURE]"); - loop { - let query = self.query.as_mut().poll(cx); - println!("[POLL] query "); - - match query { - Poll::Pending => { - println!(" [Pending]"); - return Poll::Pending; - }, - Poll::Ready(Err(_)) => { - println!(" [ready err]"); - self.query = Box::pin(lookup_stub(self.name.clone(), self.cache.clone(),self.conn.clone())); - }, - Poll::Ready(Ok(ip_addr)) => { - println!(" [Ready]"); - return Poll::Ready(Ok(ip_addr)); - } + let query = self.query.as_mut().poll(cx); + println!("[POLL query {:?}",query); + + match query { + Poll::Pending => { + println!(" [return pending]"); + return Poll::Pending; + }, + Poll::Ready(Err(_)) => { + println!(" [ready empty]"); + self.waker = Some(cx.waker().clone()); + + tokio::spawn( + lookup_stub(self.name.clone(),self.cache.clone(),self.conn.clone(),self.waker.clone())); + println!(" [return pending]"); + return Poll::Pending; + }, + Poll::Ready(Ok(ip_addr)) => { + println!(" [return ready]"); + return Poll::Ready(Ok(ip_addr)); } } } } - - impl LookupIpFutureStub{ pub fn lookup( name: DomainName, cache:DnsCache, conn: ClientConnectionType, ) -> Self { - println!("[LOOKUP FUTURE]"); + println!("[LOOKUP CREATE FUTURE]");; Self { name: name, query: future::err(ResolverError::Message("Empty")).boxed(), //FIXME: cambiar a otro tipo el error/inicio cache: cache, conn: conn, + waker: None, } } -} +} pub async fn lookup_stub( //FIXME: podemos ponerle de nombre lookup_strategy y que se le pase ahi un parametro strategy que diga si son los pasos o si funciona como stub name: DomainName, mut cache: DnsCache, conn: ClientConnectionType, + waker: Option, + // query:Pin< Box< dyn Future > > >, ) -> Result { println!("[LOOKUP STUB]"); - // //FIXME: change values - // let server:IpAddr = IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)); - // let timeout:Duration = Duration::new(2, 0); - - // //Connection type - // let conn = ClientUDPConnection::new(server, timeout); - //Create query - let mut query = DnsMessage::new_query_message( + + let mut new_query = DnsMessage::new_query_message( DomainName::new_from_string("example.com".to_string()), Qtype::A, Qclass::IN, @@ -101,7 +103,7 @@ pub async fn lookup_stub( //FIXME: podemos ponerle de nombre lookup_strategy y let mut question = Question::new(); question.set_qclass(Qclass::IN); - query.set_question(question); + new_query.set_question(question); @@ -114,25 +116,33 @@ pub async fn lookup_stub( //FIXME: podemos ponerle de nombre lookup_strategy y .iter() .map(|rr_cache_value| rr_cache_value.get_resource_record()) .collect::>(); - query.set_answer(answer); - return Ok(query); + new_query.set_answer(answer); + return Ok(new_query); } //FIXME: let responseResult: Result = match conn { ClientConnectionType::TCP(client_conn) => { - match client_conn.send(query) { + match client_conn.send(new_query) { Err(_) => Err(ResolverError::Message("Error: Receiving DNS message")), - Ok(val) => Ok(val), + Ok(val) => { + Ok(val) + }, } } ClientConnectionType::UDP(client_conn) => { - match client_conn.send(query) { + match client_conn.send(new_query) { Err(_) => Err(ResolverError::Message("Error: Receiving DNS message")), - Ok(val) => Ok(val), + Ok(val) => { + Ok(val)}, } } - }; + }; + //para que en siguient eciclo de tokio despierte esta task + if let Some(waker) = waker { + println!("[LOOKUP STUB] wake"); + waker.wake(); + } println!("[LOOKUP STUB] return"); responseResult @@ -141,8 +151,32 @@ pub async fn lookup_stub( //FIXME: podemos ponerle de nombre lookup_strategy y +#[cfg(test)] +mod async_resolver_test { + // use tokio::runtime::Runtime; + use crate::{ domain_name::DomainName, dns_cache::DnsCache}; + use super::lookup_stub; + use tokio::time::Duration; + use std::net::{IpAddr, Ipv4Addr}; + use super::*; + + #[tokio::test] + async fn lookup_stub_test() { + let name = DomainName::new_from_string("example.com".to_string()); + let cache = DnsCache::new(); + let waker = None; + + let google_server: IpAddr = IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)); + let timeout: Duration = Duration::from_secs(20); + + let client_udp = ClientUDPConnection::new(google_server, timeout); + let conn = ClientConnectionType::UDP(client_udp); + + let result = lookup_stub(name, cache, conn, waker).await; + println!("[Test Result ] {:?}", result); + } - - + +} \ No newline at end of file