Don't hold mutex across await points
kalabukdima committed Aug 16, 2024 · 1 parent 31c361c · commit 3e30849
Showing 2 changed files with 26 additions and 24 deletions.
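For context on the commit title: a `parking_lot` lock is synchronous, so a guard held across an `.await` stays locked while the task is suspended. Any other task that then takes the same lock blocks its executor thread outright, which can stall the runtime (or deadlock a single-threaded one), and the guard is not `Send`, so such a future cannot be moved across threads by `tokio::spawn`. Below is a minimal sketch of the hazard and the two usual fixes; it is not code from this repository, and `Storage` and `fetch_chunk` are made-up stand-ins for the real types and S3 calls (assumes the `tokio` and `parking_lot` crates).

```rust
use std::sync::Arc;
use std::time::Duration;

#[derive(Default)]
struct Storage {
    chunks: Vec<u64>,
}

// HAZARD: the synchronous guard lives across the `.await`. While this task
// is suspended, any other task touching the lock blocks its executor thread,
// and the non-Send guard makes the whole future non-Send.
#[allow(dead_code)]
async fn update_bad(storage: Arc<parking_lot::Mutex<Storage>>) {
    let mut guard = storage.lock();
    tokio::time::sleep(Duration::from_millis(10)).await; // guard still held!
    guard.chunks.push(42);
}

// Fix 1: keep the synchronous lock but scope the guard so it is dropped
// before any await point (the approach this commit takes for `chunks`).
async fn update_scoped(storage: Arc<parking_lot::Mutex<Storage>>) {
    let new_chunk = fetch_chunk().await; // no lock held while awaiting
    storage.lock().chunks.push(new_chunk); // guard lives for one statement
}

// Fix 2: use tokio's async Mutex, whose guard is Send and may be held
// across `.await` (the approach taken for `DatasetStorage`).
async fn update_async_lock(storage: Arc<tokio::sync::Mutex<Storage>>) {
    let mut guard = storage.lock().await;
    let new_chunk = fetch_chunk().await; // holding this guard here is fine
    guard.chunks.push(new_chunk);
}

// Stand-in for the real network/S3 work.
async fn fetch_chunk() -> u64 {
    tokio::time::sleep(Duration::from_millis(10)).await;
    42
}

#[tokio::main]
async fn main() {
    update_scoped(Arc::new(parking_lot::Mutex::new(Storage::default()))).await;
    update_async_lock(Arc::new(tokio::sync::Mutex::new(Storage::default()))).await;
}
```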
src/network/client.rs (12 changes: 6 additions & 6 deletions)
--- a/src/network/client.rs
+++ b/src/network/client.rs
@@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
 
 use contract_client::PeerId;
 use futures::{Stream, StreamExt};
-use parking_lot::{Mutex, RwLock};
+use parking_lot::Mutex;
 use subsquid_messages::{data_chunk::DataChunk, query_result, Ping, Query, QueryResult};
 use subsquid_network_transport::{
     GatewayConfig, GatewayEvent, GatewayTransportHandle, P2PTransportBuilder, QueueFull,
@@ -31,7 +31,7 @@ pub struct NetworkClient {
     network_state: Mutex<NetworkState>,
     contract_client: Box<dyn contract_client::Client>,
     tasks: Mutex<HashMap<QueryId, QueryTask>>,
-    dataset_storage: RwLock<StorageClient>,
+    dataset_storage: StorageClient,
     dataset_update_interval: Duration,
     chain_update_interval: Duration,
 }
@@ -62,7 +62,7 @@ impl NetworkClient {
             network_state: Mutex::new(NetworkState::new(config)),
             contract_client,
             tasks: Mutex::new(HashMap::new()),
-            dataset_storage: RwLock::new(dataset_storage),
+            dataset_storage,
         })
     }
 
@@ -84,7 +84,7 @@ impl NetworkClient {
                     break;
                 }
             }
-            self.dataset_storage.write().update().await;
+            self.dataset_storage.update().await;
         }
     }
 
@@ -144,11 +144,11 @@ impl NetworkClient {
     }
 
     pub fn find_chunk(&self, dataset: &DatasetId, block: u64) -> Option<DataChunk> {
-        self.dataset_storage.read().find_chunk(dataset, block)
+        self.dataset_storage.find_chunk(dataset, block)
    }
 
     pub fn next_chunk(&self, dataset: &DatasetId, chunk: &DataChunk) -> Option<DataChunk> {
-        self.dataset_storage.read().next_chunk(dataset, chunk)
+        self.dataset_storage.next_chunk(dataset, chunk)
     }
 
     pub fn find_worker(&self, dataset: &DatasetId, block: u32) -> Option<PeerId> {
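The effect on `NetworkClient` above: since `StorageClient` now synchronizes itself internally, `update` takes `&self` rather than `&mut self`, and the outer `RwLock` wrapper (plus the `.read()`/`.write()` calls at every use site) can simply be deleted. A rough sketch of this interior-mutability pattern, with simplified stand-in types rather than the repo's real definitions:

```rust
use parking_lot::RwLock;

// The locks move *inside* the type, so all public methods take `&self`
// and callers can store the client as a plain field.
struct StorageClient {
    chunks: RwLock<Vec<u64>>, // stand-in for the per-dataset state
}

impl StorageClient {
    fn find_chunk(&self, block: u64) -> Option<u64> {
        // Guard is scoped to this expression; it never crosses an await.
        self.chunks.read().iter().copied().find(|&c| c == block)
    }

    fn add_chunk(&self, chunk: u64) {
        self.chunks.write().push(chunk);
    }
}
```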
src/network/storage.rs (38 changes: 20 additions & 18 deletions)
--- a/src/network/storage.rs
+++ b/src/network/storage.rs
@@ -1,8 +1,10 @@
 use std::{collections::HashMap, str::FromStr};
 
 use aws_sdk_s3 as s3;
+use parking_lot::RwLock;
 use subsquid_datasets::DatasetStorage;
 use subsquid_messages::data_chunk::DataChunk;
+use tokio::sync::Mutex;
 
 use crate::types::DatasetId;
 
@@ -35,9 +37,9 @@ impl StorageClient {
         Ok(Self { datasets })
     }
 
-    pub async fn update(&mut self) {
+    pub async fn update(&self) {
         tracing::info!("Updating known chunks");
-        futures::future::join_all(self.datasets.iter_mut().map(|(id, dataset)| async move {
+        futures::future::join_all(self.datasets.iter().map(|(id, dataset)| async move {
             let result = dataset.update().await;
             if let Err(e) = result {
                 tracing::warn!("Failed to update dataset {}: {:?}", id, e);
@@ -47,55 +49,55 @@ impl StorageClient {
     }
 
     pub fn find_chunk(&self, dataset: &DatasetId, block: u64) -> Option<DataChunk> {
-        self.datasets.get(dataset)?.find(block).cloned()
+        self.datasets.get(dataset)?.find(block)
     }
 
     pub fn next_chunk(&self, dataset: &DatasetId, chunk: &DataChunk) -> Option<DataChunk> {
         self.datasets
             .get(dataset)?
             .find(chunk.last_block() as u64 + 1)
-            .cloned()
     }
 }
 
 struct Dataset {
-    storage: DatasetStorage,
-    chunks: Vec<DataChunk>,
+    storage: Mutex<DatasetStorage>,
+    chunks: RwLock<Vec<DataChunk>>,
 }
 
 impl Dataset {
     fn new(bucket: &str, s3_client: aws_sdk_s3::Client) -> anyhow::Result<Self> {
         let storage = DatasetStorage::new(bucket, s3_client);
         Ok(Self {
-            chunks: Vec::new(),
-            storage,
+            chunks: RwLock::new(Vec::new()),
+            storage: Mutex::new(storage),
         })
     }
 
-    async fn update(&mut self) -> anyhow::Result<()> {
-        let new_chunks = self.storage.list_all_new_chunks().await?;
+    async fn update(&self) -> anyhow::Result<()> {
+        // TODO: move synchronization inside the list_all_new_chunks method
+        let new_chunks = self.storage.lock().await.list_all_new_chunks().await?;
         if !new_chunks.is_empty() {
             tracing::info!(
                 "Found {} new chunks for dataset {}",
                 new_chunks.len(),
-                self.storage.bucket
+                self.storage.lock().await.bucket
             );
         }
+        let mut chunks = self.chunks.write();
         for chunk in new_chunks {
             let chunk = DataChunk::from_str(&chunk.chunk_str)
                 .unwrap_or_else(|_| panic!("Failed to parse chunk: {}", chunk.chunk_str));
-            self.chunks.push(chunk);
+            chunks.push(chunk);
         }
         Ok(())
     }
 
-    fn find(&self, block: u64) -> Option<&DataChunk> {
-        if block < self.chunks.first()?.first_block() as u64 {
+    fn find(&self, block: u64) -> Option<DataChunk> {
+        let chunks = self.chunks.read();
+        if block < chunks.first()?.first_block() as u64 {
             return None;
         }
-        let first_suspect = self
-            .chunks
-            .partition_point(|chunk| (chunk.last_block() as u64) < block);
-        (first_suspect < self.chunks.len()).then(|| &self.chunks[first_suspect])
+        let first_suspect = chunks.partition_point(|chunk| (chunk.last_block() as u64) < block);
+        (first_suspect < chunks.len()).then(|| chunks[first_suspect].clone())
     }
 }
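Design note on the `Dataset` type above: its state is split across two locks with different disciplines. `DatasetStorage` sits behind a `tokio::sync::Mutex` because listing chunks requires holding it across `.await`s, while the `chunks` vector sits behind a `parking_lot::RwLock` that is only ever locked for short synchronous sections; `find` accordingly returns a clone instead of `&DataChunk`, so no guard can escape the function. The payoff is that futures touching `Dataset` stay `Send`. A small self-contained check of that property, again with stand-in types rather than the real ones:

```rust
use std::sync::Arc;

// A future that holds a tokio MutexGuard across an await point is still
// Send, so tasks using this pattern can run on a multi-threaded runtime.
async fn update_task(storage: Arc<tokio::sync::Mutex<Vec<u64>>>) {
    let mut guard = storage.lock().await;
    tokio::task::yield_now().await; // suspension point with the guard held
    guard.push(1);
}

#[tokio::main]
async fn main() {
    let storage = Arc::new(tokio::sync::Mutex::new(Vec::new()));
    // Compiles because the future is Send. Swapping in parking_lot::Mutex
    // (with its guard held across the yield) would make this spawn a
    // compile error, which is exactly what the commit avoids.
    tokio::spawn(update_task(storage)).await.unwrap();
}
```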
