Skip to content

Commit

Permalink
storage: Add save() command
Browse files Browse the repository at this point in the history
  • Loading branch information
XuShaohua committed Jun 17, 2024
1 parent 892cca1 commit e9f429b
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 35 deletions.
14 changes: 11 additions & 3 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,24 @@ impl TryFrom<Frame> for Command {
if command.is_none() {
command = GenericCommand::parse(&cmd_name, &mut parser)?;
}

// Parse stack commands.
if command.is_none() {
command = BloomFilterCommand::parse(&cmd_name, &mut parser)?;
}

// Parse management commands.
if command.is_none() {
command = ClusterManagementCommand::parse(&cmd_name, &mut parser)?;
}
if command.is_none() {
command = ConnectManagementCommand::parse(&cmd_name, &mut parser)?;
}
if command.is_none() {
command = ServerManagementCommand::parse(&cmd_name, &mut parser)?;
}

// Parse stack commands.
if command.is_none() {
command = BloomFilterCommand::parse(&cmd_name, &mut parser)?;
command = StorageManagementCommand::parse(&cmd_name, &mut parser)?;
}

if command.is_none() {
Expand Down
29 changes: 29 additions & 0 deletions src/dispatcher/cluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2024 Xu Shaohua <[email protected]>. All rights reserved.
// Use of this source is governed by GNU Affero General Public License
// that can be found in the LICENSE file.

use stdext::function_name;

use crate::commands::{ClusterToDispatcherCmd, DispatcherToListenerCmd};
use crate::dispatcher::Dispatcher;
use crate::error::{Error, ErrorKind};

impl Dispatcher {
pub(super) fn handle_cluster_cmd(&mut self, cmd: ClusterToDispatcherCmd) -> Result<(), Error> {
// Send command to listener.
log::debug!(
"{}, proxy cmd from cluster to listener, cmd: {cmd:?}",
function_name!()
);
let listener_id = cmd.session_group.listener_id();
if let Some(listener_sender) = self.listener_senders.get(&listener_id) {
let cmd = DispatcherToListenerCmd::Reply(cmd.session_group, cmd.reply_frame);
Ok(listener_sender.send(cmd)?)
} else {
Err(Error::from_string(
ErrorKind::ChannelError,
format!("Failed to find listener with id: {listener_id}"),
))
}
}
}
22 changes: 19 additions & 3 deletions src/dispatcher/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use stdext::function_name;

use crate::cmd::Command;
use crate::commands::{
DispatcherToClusterCmd, DispatcherToMemCmd, DispatcherToServerCmd, DispatcherToStorageCmd,
ListenerToDispatcherCmd,
DispatcherToClusterCmd, DispatcherToListenerCmd, DispatcherToMemCmd, DispatcherToServerCmd,
DispatcherToStorageCmd, ListenerToDispatcherCmd,
};
use crate::dispatcher::Dispatcher;
use crate::error::Error;
use crate::error::{Error, ErrorKind};
use crate::listener::types::ListenerId;

impl Dispatcher {
pub(super) async fn handle_listener_cmd(
Expand Down Expand Up @@ -72,4 +73,19 @@ impl Dispatcher {
},
}
}

pub(super) fn send_cmd_to_listener(
&mut self,
listener_id: ListenerId,
cmd: DispatcherToListenerCmd,
) -> Result<(), Error> {
if let Some(listener_sender) = self.listener_senders.get(&listener_id) {
Ok(listener_sender.send(cmd)?)
} else {
Err(Error::from_string(
ErrorKind::ChannelError,
format!("Failed to find listener with id: {listener_id}"),
))
}
}
}
13 changes: 3 additions & 10 deletions src/dispatcher/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use stdext::function_name;

use crate::commands::{DispatcherToListenerCmd, MemToDispatcherCmd};
use crate::dispatcher::Dispatcher;
use crate::error::{Error, ErrorKind};
use crate::error::Error;

impl Dispatcher {
pub(super) fn handle_mem_cmd(&mut self, cmd: MemToDispatcherCmd) -> Result<(), Error> {
Expand All @@ -16,14 +16,7 @@ impl Dispatcher {
function_name!()
);
let listener_id = cmd.session_group.listener_id();
if let Some(listener_sender) = self.listener_senders.get(&listener_id) {
let cmd = DispatcherToListenerCmd::Reply(cmd.session_group, cmd.reply_frame);
Ok(listener_sender.send(cmd)?)
} else {
Err(Error::from_string(
ErrorKind::ChannelError,
format!("Failed to find listener with id: {listener_id}"),
))
}
let cmd = DispatcherToListenerCmd::Reply(cmd.session_group, cmd.reply_frame);
self.send_cmd_to_listener(listener_id, cmd)
}
}
1 change: 1 addition & 0 deletions src/dispatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::commands::{
};
use crate::listener::types::ListenerId;

mod cluster;
mod listener;
mod mem;
mod run;
Expand Down
9 changes: 8 additions & 1 deletion src/dispatcher/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@ impl Dispatcher {
log::warn!("[dispatcher] Failed to handle mem cmd, got err: {err:?}");
}
}
Some(cmd) = self.cluster_receiver.recv() => {
if let Err(err) = self.handle_cluster_cmd(cmd) {
log::warn!("[dispatcher] Failed to handle cluster cmd, got err: {err:?}");
}
}
Some(cmd) = self.server_receiver.recv() => {
if let Err(err) = self.handle_server_cmd(cmd) {
log::warn!("[dispatcher] Failed to handle server cmd, got err: {err:?}");
}
}
Some(cmd) = self.storage_receiver.recv() => {
self.handle_storage_cmd(cmd).await;
if let Err(err) = self.handle_storage_cmd(cmd) {
log::warn!("[dispatcher] Failed to handle storage cmd, got err: {err:?}");
}
}
}
}
Expand Down
13 changes: 3 additions & 10 deletions src/dispatcher/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use stdext::function_name;

use crate::commands::{DispatcherToListenerCmd, ServerToDispatcherCmd};
use crate::dispatcher::Dispatcher;
use crate::error::{Error, ErrorKind};
use crate::error::Error;

impl Dispatcher {
pub(super) fn handle_server_cmd(&mut self, cmd: ServerToDispatcherCmd) -> Result<(), Error> {
Expand All @@ -16,14 +16,7 @@ impl Dispatcher {
function_name!()
);
let listener_id = cmd.session_group.listener_id();
if let Some(listener_sender) = self.listener_senders.get(&listener_id) {
let cmd = DispatcherToListenerCmd::Reply(cmd.session_group, cmd.reply_frame);
Ok(listener_sender.send(cmd)?)
} else {
Err(Error::from_string(
ErrorKind::ChannelError,
format!("Failed to find listener with id: {listener_id}"),
))
}
let cmd = DispatcherToListenerCmd::Reply(cmd.session_group, cmd.reply_frame);
self.send_cmd_to_listener(listener_id, cmd)
}
}
15 changes: 11 additions & 4 deletions src/dispatcher/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@

use stdext::function_name;

use crate::commands::StorageToDispatcherCmd;
use crate::commands::{DispatcherToListenerCmd, StorageToDispatcherCmd};
use crate::dispatcher::Dispatcher;
use crate::error::Error;

impl Dispatcher {
#[allow(clippy::unused_async)]
pub(super) async fn handle_storage_cmd(&mut self, cmd: StorageToDispatcherCmd) {
log::debug!("{} cmd: {cmd:?}", function_name!());
pub(super) fn handle_storage_cmd(&mut self, cmd: StorageToDispatcherCmd) -> Result<(), Error> {
// Send command to listener.
log::debug!(
"{}, proxy cmd from storage to listener, cmd: {cmd:?}",
function_name!()
);
let listener_id = cmd.session_group.listener_id();
let cmd = DispatcherToListenerCmd::Reply(cmd.session_group, cmd.reply_frame);
self.send_cmd_to_listener(listener_id, cmd)
}
}
5 changes: 5 additions & 0 deletions src/storage/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright (c) 2024 Xu Shaohua <[email protected]>. All rights reserved.
// Use of this source is governed by GNU Affero General Public License
// that can be found in the LICENSE file.

pub mod save;
22 changes: 22 additions & 0 deletions src/storage/commands/save.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2024 Xu Shaohua <[email protected]>. All rights reserved.
// Use of this source is governed by GNU Affero General Public License
// that can be found in the LICENSE file.

use crate::cmd::reply_frame::ReplyFrame;

/// The `SAVE` commands performs a synchronous save of the dataset producing a point in time snapshot
/// of all the data inside the server instance, in the form of an RDB file.
///
/// You almost never want to call `SAVE` in production environments where it will
/// block all the other clients. Instead, the `BGSAVE` is usually used.
/// However, in case of issues preventing Redis to create the background saving child
/// (for instance errors in the fork(2) system call), the `SAVE` command can be a good last resort
/// to perform the dump of the latest dataset.
///
/// Reply:
/// - Simple string reply: OK.
pub fn save() -> ReplyFrame {
// TODO(Shaohua): Save to RDB file.
log::info!("[storage] TODO: save to RDB file.");
ReplyFrame::ok()
}
18 changes: 14 additions & 4 deletions src/storage/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
// Use of this source is governed by GNU Affero General Public License
// that can be found in the LICENSE file.

use stdext::function_name;

use crate::commands::DispatcherToStorageCmd;
use crate::cmd::storage_mgmt::StorageManagementCommand;
use crate::commands::{DispatcherToStorageCmd, StorageToDispatcherCmd};
use crate::error::Error;
use crate::storage::commands::save;
use crate::storage::Storage;

impl Storage {
Expand All @@ -14,7 +14,17 @@ impl Storage {
&mut self,
cmd: DispatcherToStorageCmd,
) -> Result<(), Error> {
log::debug!("{} cmd: {cmd:?}", function_name!());
let session_group = cmd.session_group;

let reply_frame = match cmd.command {
StorageManagementCommand::Save => save::save(),
};

let msg = StorageToDispatcherCmd {
session_group,
reply_frame,
};
self.dispatcher_sender.send(msg).await?;
Ok(())
}
}
1 change: 1 addition & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::sync::mpsc::{Receiver, Sender};

use crate::commands::{DispatcherToStorageCmd, StorageToDispatcherCmd};

mod commands;
mod dispatcher;
mod run;

Expand Down

0 comments on commit e9f429b

Please sign in to comment.