Skip to content

Commit

Permalink
Implement web-friendly versions of sleep and sleep_until
Browse files Browse the repository at this point in the history
  • Loading branch information
er-azh authored and Lonami committed Dec 22, 2024
1 parent 067042d commit 922f810
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 7 deletions.
6 changes: 4 additions & 2 deletions lib/grammers-client/src/client/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use super::{Client, ClientInner, Config};
use crate::utils;
use grammers_mtproto::mtp;
use grammers_mtproto::transport;
use grammers_mtsender::{self as sender, AuthorizationError, InvocationError, RpcError, Sender};
use grammers_mtsender::{
self as sender, utils::sleep, AuthorizationError, InvocationError, RpcError, Sender,
};
use grammers_session::{ChatHashCache, MessageBox};
use grammers_tl_types::{self as tl, Deserializable};
use log::{debug, info};
Expand Down Expand Up @@ -397,7 +399,7 @@ impl Connection {
delay,
std::any::type_name::<R>()
);
tokio::time::sleep(delay).await;
sleep(delay).await;
slept_flood = true;
rx = self.request_tx.read().unwrap().enqueue(request);
continue;
Expand Down
2 changes: 1 addition & 1 deletion lib/grammers-client/src/client/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
use super::Client;
use crate::types::{ChatMap, Update};
use futures::future::{select, Either};
use grammers_mtsender::utils::sleep_until;
pub use grammers_mtsender::{AuthorizationError, InvocationError};
use grammers_session::channel_id;
pub use grammers_session::{PrematureEndReason, UpdateState};
use grammers_tl_types as tl;
use std::pin::pin;
use std::sync::Arc;
use tokio::time::sleep_until;
use std::time::Duration;
use web_time::Instant;

Expand Down
3 changes: 2 additions & 1 deletion lib/grammers-client/src/types/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use futures::future::Either;
use grammers_mtsender::utils;
use grammers_mtsender::InvocationError;
use grammers_session::PackedChat;
use grammers_tl_types as tl;
Expand Down Expand Up @@ -118,7 +119,7 @@ impl ActionSender {

let action = async {
request_result = self.oneshot(action().into()).await;
tokio::time::sleep(self.repeat_delay).await;
utils::sleep(self.repeat_delay).await;
};

tokio::pin!(action);
Expand Down
4 changes: 4 additions & 0 deletions lib/grammers-mtsender/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ hickory-resolver = { version = "0.24.1", optional = true }
url = { version = "2.5.2", optional = true }
web-time = "1.1.0"

[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies]
wasm-bindgen-futures = "0.4.49"
web-sys = {version = "0.3.76", features = ["Window"]}

[dev-dependencies]
simple_logger = { version = "5.0.0", default-features = false, features = ["colors"] }
tokio = { version = "1.40.0", features = ["rt"] }
Expand Down
10 changes: 10 additions & 0 deletions lib/grammers-mtsender/DEPS.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,13 @@ SOCKS5 proxy support.

Used for its web-friendly clock and timer as a replacement for `std::time` in the library.
Automatically falls back to `std::time` when we're not targeting web.

## web-sys

Only used when targeting `wasm32-unknown-unknown`. Used by the `Timeout` implementation to
call `setTimeout` and `clearTimeout` in the browser.

## wasm-bindgen-futures

Only used when targeting `wasm32-unknown-unknown`. Used by the `Timeout` implementation to
convert a `Promise` into a `Future`.
7 changes: 4 additions & 3 deletions lib/grammers-mtsender/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

mod errors;
mod reconnection;
pub mod utils;

pub use crate::reconnection::*;
pub use errors::{AuthorizationError, InvocationError, ReadError, RpcError};
Expand All @@ -35,7 +36,7 @@ use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::sleep_until;
use utils::{sleep, sleep_until};
use web_time::{Instant, SystemTime};

#[cfg(feature = "proxy")]
Expand Down Expand Up @@ -387,7 +388,7 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
Err(e) => {
attempts += 1;
log::warn!("auto-reconnect failed {} time(s): {}", attempts, e);
tokio::time::sleep(Duration::from_secs(1)).await;
sleep(Duration::from_secs(1)).await;

match self.reconnection_policy.should_retry(attempts) {
ControlFlow::Break(_) => {
Expand All @@ -398,7 +399,7 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
return Err(e);
}
ControlFlow::Continue(duration) => {
tokio::time::sleep(duration).await;
sleep(duration).await;
}
}
}
Expand Down
93 changes: 93 additions & 0 deletions lib/grammers-mtsender/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2020 - developers of the `grammers` project.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::time::Duration;
use web_time::Instant;

/// A cancellable timeout for web platforms.
/// It is simply a wrapper around `window.setTimeout` but also makes
/// sure to clear the timeout when dropped to avoid leaking timers.
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
pub struct Timeout {
handle: std::cell::OnceCell<i32>,
inner: wasm_bindgen_futures::JsFuture,
}

#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
impl std::future::Future for Timeout {
type Output = ();

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
std::pin::Pin::new(&mut self.get_mut().inner)
.poll(cx)
.map(|_| ())
}
}

#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
impl Drop for Timeout {
fn drop(&mut self) {
if let Some(handle) = self.handle.get() {
web_sys::window()
.unwrap()
.clear_timeout_with_handle(*handle);
}
}
}

#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
impl Timeout {
pub fn new(duration: Duration) -> Self {
use wasm_bindgen_futures::js_sys;

let handle = std::cell::OnceCell::new();
let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| {
handle
.set(
web_sys::window()
.unwrap()
.set_timeout_with_callback_and_timeout_and_arguments_0(
&resolve,
duration.as_millis() as i32,
)
.unwrap(),
)
.expect("timeout already set");
};

let inner = wasm_bindgen_futures::JsFuture::from(js_sys::Promise::new(&mut cb));
Self { handle, inner }
}
}

/// a web-friendly version of `tokio::time::sleep`
pub async fn sleep(duration: Duration) {
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
{
return tokio::time::sleep(duration).await;
}
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
{
Timeout::new(duration).await;
}
}

/// a web-friendly version of `tokio::time::sleep_until`
pub async fn sleep_until(deadline: Instant) {
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
{
return tokio::time::sleep_until(deadline.into()).await;
}
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
{
Timeout::new(deadline - Instant::now()).await;
}
}

0 comments on commit 922f810

Please sign in to comment.