Skip to content

Commit

Permalink
Raft snapshot rework (#575)
Browse files Browse the repository at this point in the history
* reworking raft snapshot

* writing rocksdb indexes now when building from snapshot
  • Loading branch information
redixhumayun authored May 23, 2024
1 parent c55b2f4 commit 4b7ed70
Show file tree
Hide file tree
Showing 13 changed files with 466 additions and 141 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ indexify-extractor/**/*
rag/**/*
*.mp4
*.mp3
sqlite*
sqlite*
*.trace
4 changes: 4 additions & 0 deletions crates/indexify_internal_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,10 @@ impl StateChangeId {
/// Encodes the wrapped integer as an 8-byte big-endian array suitable for
/// use as a storage key.
pub fn to_key(&self) -> [u8; 8] {
    u64::to_be_bytes(self.0)
}

/// Rebuilds an id from an 8-byte big-endian key, i.e. the inverse of
/// `to_key`.
pub fn from_key(key: [u8; 8]) -> Self {
    let raw = u64::from_be_bytes(key);
    Self(raw)
}
}

impl From<StateChangeId> for u64 {
Expand Down
2 changes: 1 addition & 1 deletion src/blob_storage/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl DiskStorage {
#[tracing::instrument]
pub fn new(config: DiskStorageConfig) -> Result<Self, anyhow::Error> {
let tmp_path = format!("{}/tmp", config.path);
std::fs::create_dir_all(&tmp_path)?;
std::fs::create_dir_all(tmp_path)?;
Ok(Self { config })
}
}
Expand Down
25 changes: 12 additions & 13 deletions src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ impl Coordinator {
.get_coordinator_addr(leader_node_id)
.await?
.ok_or_else(|| anyhow::anyhow!("could not get leader node coordinator address"))?;
let leader_coord_addr = format!("http://{}", leader_coord_addr);
self.forwardable_coordinator
.register_ingestion_server(&leader_coord_addr, ingestion_server_id)
.await?;
Expand Down Expand Up @@ -493,7 +494,7 @@ impl Coordinator {
.tombstone_content_batch_with_version(
&[content_metadata.id.clone()],
vec![StateChangeProcessed {
state_change_id: change.id.clone(),
state_change_id: change.id,
processed_at: utils::timestamp_secs(),
}],
)
Expand Down Expand Up @@ -669,7 +670,7 @@ mod tests {

// Read the extraction graph back
let ret_graph =
shared_state.get_extraction_graphs_by_name(DEFAULT_TEST_NAMESPACE, &vec![eg.name])?;
shared_state.get_extraction_graphs_by_name(DEFAULT_TEST_NAMESPACE, &[eg.name])?;
assert!(ret_graph.first().unwrap().is_some());
assert_eq!(ret_graph.len(), 1);
Ok(())
Expand Down Expand Up @@ -945,7 +946,7 @@ mod tests {
let executor_id_1 = "test_executor_id_1";
let extractor_1 = mock_extractor();
coordinator
.register_executor("localhost:8956", &executor_id_1, vec![extractor_1.clone()])
.register_executor("localhost:8956", executor_id_1, vec![extractor_1.clone()])
.await?;
let eg = create_test_extraction_graph("eg_name_1", vec!["ep_policy_name_1"]);
coordinator.create_extraction_graph(eg.clone()).await?;
Expand All @@ -954,7 +955,7 @@ mod tests {
let mut extractor_2 = mock_extractor();
extractor_2.name = "MockExtractor2".to_string();
coordinator
.register_executor("localhost:8957", &executor_id_2, vec![extractor_2.clone()])
.register_executor("localhost:8957", executor_id_2, vec![extractor_2.clone()])
.await?;

// Create an extraction graph with two levels of policies
Expand Down Expand Up @@ -995,9 +996,7 @@ mod tests {

//// check that tasks have been created for the second level for the second
//// extractor
let tasks = shared_state
.tasks_for_executor(&executor_id_2, None)
.await?;
let tasks = shared_state.tasks_for_executor(executor_id_2, None).await?;
assert_eq!(tasks.len(), 4);

//// Create the final child
Expand Down Expand Up @@ -1278,7 +1277,7 @@ mod tests {
) -> Result<internal_api::ContentMetadata, anyhow::Error> {
let mut content = test_mock_content_metadata(
id,
&task.content_metadata.get_root_id(),
task.content_metadata.get_root_id(),
&task.content_metadata.extraction_graph_names[0],
);
content.parent_id = Some(task.content_metadata.id.clone());
Expand Down Expand Up @@ -1436,7 +1435,7 @@ mod tests {
"test_extraction_policy_5",
"test_extraction_policy_6",
],
&vec![Root, Child(0), Child(0), Child(1), Child(3), Child(3)],
&[Root, Child(0), Child(0), Child(1), Child(3), Child(3)],
);
coordinator.create_extraction_graph(eg.clone()).await?;
coordinator.run_scheduler().await?;
Expand Down Expand Up @@ -1479,7 +1478,7 @@ mod tests {
1,
))?;
assert_eq!(prev_tree.len(), 7); // root + 6 children
assert_eq!(prev_tree[0].latest, false);
assert!(!prev_tree[0].latest);

// replace all elements in the tree, should have two trees with 7 elements each
perform_all_tasks(&coordinator, "test_executor_id_1", &mut child_id).await?;
Expand All @@ -1499,7 +1498,7 @@ mod tests {
1,
))?;
assert_eq!(prev_tree.len(), 7);
assert_eq!(prev_tree[0].latest, false);
assert!(!prev_tree[0].latest);

// the previous tree should be deleted after all tasks for new root are complete
assert!(prev_tree.iter().all(|c| c.tombstoned));
Expand Down Expand Up @@ -1582,7 +1581,7 @@ mod tests {
))?;
// all elements should be transferred to the new root
assert_eq!(prev_tree.len(), 1);
assert_eq!(prev_tree[0].latest, false);
assert!(!prev_tree[0].latest);

coordinator.run_scheduler().await?;
// the previous tree should be tombstoned after all tasks for new root are
Expand Down Expand Up @@ -1693,7 +1692,7 @@ mod tests {
// elements after and including the identical child should be transferred to the
// new root
assert_eq!(prev_tree.len(), 4);
assert_eq!(prev_tree[0].latest, false);
assert!(!prev_tree[0].latest);

coordinator.run_scheduler().await?;
// the previous tree should be tombstoned after all tasks for new root are
Expand Down
5 changes: 2 additions & 3 deletions src/data_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,8 @@ impl DataManager {
}

// Create metadata table for the namespace if it doesn't exist
let _ = self
.metadata_index_manager
.create_metadata_table(&namespace)
self.metadata_index_manager
.create_metadata_table(namespace)
.await?;
}
let req = indexify_coordinator::UpdateIndexesStateRequest {
Expand Down
3 changes: 1 addition & 2 deletions src/ingest_extracted_content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,7 @@ mod tests {
.shared_state
.get_state_change_watcher()
.borrow_and_update()
.id
.clone();
.id;

self.coordinator
.shared_state
Expand Down
6 changes: 4 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use opentelemetry_sdk::{
Resource,
};
use rustls::crypto::CryptoProvider;
use tracing_core::{Level, LevelFilter};
use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, Layer};

pub mod coordinator_filters;
Expand Down Expand Up @@ -153,10 +152,13 @@ fn setup_tracing(trace_type: &str) -> Result<()> {
}

fn setup_fmt_tracing() {
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
println!("Running with tracing filter {}", env_filter);
let subscriber = tracing_subscriber::Registry::default().with(
tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_filter(LevelFilter::from_level(Level::INFO)),
.with_filter(env_filter),
);
if let Err(e) = tracing::subscriber::set_global_default(subscriber) {
eprintln!("failed to set global default subscriber: {}", e);
Expand Down
10 changes: 10 additions & 0 deletions src/state/grpc_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/// Holder for the gRPC message-size limits used by both the client and the
/// server.
pub struct GrpcConfig {}

impl GrpcConfig {
    /// Largest message, in bytes, that the client or server will
    /// **receive** (16 MiB).
    pub const MAX_DECODING_SIZE: usize = 16 << 20;
    /// Largest message, in bytes, that the client or server will
    /// **send** (16 MiB).
    pub const MAX_ENCODING_SIZE: usize = 16 << 20;
}
18 changes: 13 additions & 5 deletions src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ use crate::{
raft_metrics::{self, network::MetricsSnapshot},
},
server_config::ServerConfig,
state::{raft_client::RaftClient, store::new_storage},
state::{grpc_config::GrpcConfig, raft_client::RaftClient, store::new_storage},
utils::timestamp_secs,
};

pub mod forwardable_raft;
pub mod grpc_config;
pub mod grpc_server;
pub mod network;
pub mod raft_client;
Expand Down Expand Up @@ -144,6 +145,7 @@ pub struct App {
#[derive(Clone)]
pub struct RaftConfigOverrides {
snapshot_policy: Option<openraft::SnapshotPolicy>,
max_in_snapshot_log_to_keep: Option<u64>,
}

fn add_update_entry(
Expand Down Expand Up @@ -179,6 +181,8 @@ impl App {
election_timeout_min: 1500,
election_timeout_max: 3000,
enable_heartbeat: true,
install_snapshot_timeout: 2000,
snapshot_max_chunk_size: 4194304, // 4MB
..Default::default()
};

Expand All @@ -187,6 +191,9 @@ impl App {
if let Some(snapshot_policy) = overrides.snapshot_policy {
raft_config.snapshot_policy = snapshot_policy;
}
if let Some(max_in_snapshot_log_to_keep) = overrides.max_in_snapshot_log_to_keep {
raft_config.max_in_snapshot_log_to_keep = max_in_snapshot_log_to_keep;
}
}

let config = Arc::new(
Expand Down Expand Up @@ -244,7 +251,9 @@ impl App {
Arc::clone(&raft_client),
addr.to_string(),
server_config.coordinator_addr.clone(),
));
))
.max_encoding_message_size(GrpcConfig::MAX_ENCODING_SIZE)
.max_decoding_message_size(GrpcConfig::MAX_DECODING_SIZE);
let (leader_change_tx, leader_change_rx) = watch::channel::<bool>(false);

let metrics = Metrics::new(state_machine.clone());
Expand Down Expand Up @@ -815,10 +824,9 @@ impl App {
let ns = &content_metadata.first().unwrap().namespace.clone();
let extraction_graph_names = &content_metadata
.iter()
.map(|c| c.extraction_graph_names.clone())
.flatten()
.flat_map(|c| c.extraction_graph_names.clone())
.collect_vec();
let extraction_graphs = self.get_extraction_graphs_by_name(&ns, &extraction_graph_names)?;
let extraction_graphs = self.get_extraction_graphs_by_name(ns, extraction_graph_names)?;
for (eg, extraction_graph_names) in
extraction_graphs.into_iter().zip(extraction_graph_names)
{
Expand Down
6 changes: 4 additions & 2 deletions src/state/raft_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::sync::Mutex;
use tonic::transport::Channel;
use tracing::info;

use crate::metrics::raft_metrics;
use crate::{metrics::raft_metrics, state::grpc_config::GrpcConfig};

pub struct RaftClient {
clients: Arc<Mutex<HashMap<String, RaftApiClient<Channel>>>>,
Expand Down Expand Up @@ -38,7 +38,9 @@ impl RaftClient {
.map_err(|e| {
raft_metrics::network::incr_fail_connect_to_peer(addr);
anyhow!("unable to connect to raft: {} at addr {}", e, addr)
})?;
})?
.max_encoding_message_size(GrpcConfig::MAX_ENCODING_SIZE)
.max_decoding_message_size(GrpcConfig::MAX_DECODING_SIZE);
clients.insert(addr.to_string(), client.clone());
Ok(client)
}
Expand Down
Loading

0 comments on commit 4b7ed70

Please sign in to comment.