Skip to content

Commit

Permalink
fix: don't block startup on subscribe (#359)
Browse files Browse the repository at this point in the history
* fix: don't block startup on subscribe

* chore: comment
  • Loading branch information
chris13524 authored Feb 12, 2024
1 parent 7661ce1 commit 459725e
Showing 1 changed file with 24 additions and 39 deletions.
63 changes: 24 additions & 39 deletions src/services/relay_renewal_job/refresh_topic_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
},
futures_util::{StreamExt, TryFutureExt, TryStreamExt},
relay_client::http::Client,
relay_rpc::{domain::Topic, rpc::MAX_SUBSCRIPTION_BATCH_SIZE},
relay_rpc::domain::Topic,
sqlx::PgPool,
std::{sync::Arc, time::Instant},
tokio::sync::Mutex,
Expand Down Expand Up @@ -46,36 +46,13 @@ pub async fn run(
let topics_count = topics.len();
info!("topics_count: {topics_count}");

// Collect each batch into its own vec, since `batch_subscribe` would convert
// them anyway.
let topic_batches = topics
.chunks(MAX_SUBSCRIPTION_BATCH_SIZE)
.map(|chunk| chunk.to_vec())
.collect::<Vec<_>>();

// Limit concurrency to avoid overwhelming the relay with requests.
const REQUEST_CONCURRENCY: usize = 200;

futures_util::stream::iter(topic_batches)
.map(|topic_batch| {
// Map result to an unsized type to avoid allocation when collecting,
// as we don't care about subscription IDs.
client.batch_subscribe_blocking(topic_batch).map_ok(|_| ())
})
.buffer_unordered(REQUEST_CONCURRENCY)
.try_collect::<Vec<_>>()
.await?;

let elapsed: u64 = start
.elapsed()
.as_millis()
.try_into()
.expect("No error getting ms of elapsed time");
info!("resubscribe took {elapsed}ms");
const REQUEST_CONCURRENCY: usize = 25;

// If operation already running, don't start another one
let mut operation_running = renew_all_topics_lock.lock().await;
if !*operation_running {
info!("Starting renew operation");
*operation_running = true;
// Renew all subscription TTLs.
// This can take a long time (e.g. 2 hours), so cannot block server startup.
Expand All @@ -86,7 +63,9 @@ pub async fn run(
async move {
let client = &client;
let metrics = metrics.as_ref();
let start = Instant::now();

// Using `batch_subscription` was removed in https://github.com/WalletConnect/notify-server/pull/359
// We can't really use this right now because we are also extending the topic TTL which could take longer than the 5m TTL
let result = futures_util::stream::iter(topics)
.map(|topic| async move {
// Subscribe a second time as the initial subscription above may have expired
Expand All @@ -101,7 +80,7 @@ pub async fn run(
})
// Above we want to resubscribe as quickly as possible so use a high concurrency value
// But here we prefer stability and are OK with a lower value
.buffer_unordered(25)
.buffer_unordered(REQUEST_CONCURRENCY)
.try_collect::<Vec<_>>()
.await;
let elapsed: u64 = start.elapsed().as_millis().try_into().unwrap();
Expand All @@ -114,19 +93,25 @@ pub async fn run(
info!("Success renewing all topic subscriptions in {elapsed}ms");
}
*renew_all_topics_lock.lock().await = false;

if let Some(metrics) = metrics {
let ctx = Context::current();
metrics.subscribed_project_topics.observe(
&ctx,
project_topics_count as u64,
&[],
);
metrics.subscribed_subscriber_topics.observe(
&ctx,
subscriber_topics_count as u64,
&[],
);
metrics.subscribe_latency.record(&ctx, elapsed, &[]);
}
}
});
}

if let Some(metrics) = metrics {
let ctx = Context::current();
metrics
.subscribed_project_topics
.observe(&ctx, project_topics_count as u64, &[]);
metrics
.subscribed_subscriber_topics
.observe(&ctx, subscriber_topics_count as u64, &[]);
metrics.subscribe_latency.record(&ctx, elapsed, &[]);
} else {
info!("Renew operation already running");
}

Ok(())
Expand Down

0 comments on commit 459725e

Please sign in to comment.