Skip to content

Commit

Permalink
add code for 26
Browse files Browse the repository at this point in the history
  • Loading branch information
tyrchen committed Oct 28, 2021
1 parent 0489cc2 commit 6113ce2
Show file tree
Hide file tree
Showing 18 changed files with 1,527 additions and 0 deletions.
18 changes: 18 additions & 0 deletions 11_memory/misc/test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

#include <stdio.h>

struct S1 {
u_int8_t a;
u_int16_t b;
u_int8_t c;
};

struct S2 {
u_int8_t a;
u_int8_t c;
u_int16_t b;
};

void main() {
printf("size of S1: %d, S2: %d", sizeof(struct S1), sizeof(struct S2));
}
24 changes: 24 additions & 0 deletions 26_kv/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "kv1"
version = "0.1.0"
edition = "2018"

[dependencies]
bytes = "1" # 高效处理网络 buffer 的库
dashmap = "4" # 并发 HashMap
http = "0.2" # 我们使用 HTTP status code 所以引入这个类型库
prost = "0.8" # 处理 protobuf 的代码
sled = "0.34" # sled db
thiserror = "1" # 错误定义和处理
tracing = "0.1" # 日志处理

[dev-dependencies]
anyhow = "1" # 错误处理
async-prost = "0.2.1" # 支持把 protobuf 封装成 TCP frame
futures = "0.3" # 提供 Stream trait
tempfile = "3" # 处理临时目录和临时文件
tokio = { version = "1", features = ["rt", "rt-multi-thread", "io-util", "macros", "net" ] } # 异步网络库
tracing-subscriber = "0.2" # 日志处理

[build-dependencies]
prost-build = "0.8" # 编译 protobuf
100 changes: 100 additions & 0 deletions 26_kv/abi.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
syntax = "proto3";

package abi;

// 来自客户端的命令请求
message CommandRequest {
oneof request_data {
Hget hget = 1;
Hgetall hgetall = 2;
Hmget hmget = 3;
Hset hset = 4;
Hmset hmset = 5;
Hdel hdel = 6;
Hmdel hmdel = 7;
Hexist hexist = 8;
Hmexist hmexist = 9;
}
}

// 服务器的响应
message CommandResponse {
// 状态码;复用 HTTP 2xx/4xx/5xx 状态码
uint32 status = 1;
// 如果不是 2xx,message 里包含详细的信息
string message = 2;
// 成功返回的 values
repeated Value values = 3;
// 成功返回的 kv pairs
repeated Kvpair pairs = 4;
}

// 从 table 中获取一个 key,返回 value
message Hget {
string table = 1;
string key = 2;
}

// 从 table 中获取所有的 Kvpair
message Hgetall { string table = 1; }

// 从 table 中获取一组 key,返回它们的 value
message Hmget {
string table = 1;
repeated string keys = 2;
}

// 返回的值
message Value {
oneof value {
string string = 1;
bytes binary = 2;
int64 integer = 3;
double float = 4;
bool bool = 5;
}
}

// 返回的 kvpair
message Kvpair {
string key = 1;
Value value = 2;
}

// 往 table 里存一个 kvpair,
// 如果 table 不存在就创建这个 table
message Hset {
string table = 1;
Kvpair pair = 2;
}

// 往 table 中存一组 kvpair,
// 如果 table 不存在就创建这个 table
message Hmset {
string table = 1;
repeated Kvpair pairs = 2;
}

// 从 table 中删除一个 key,返回它之前的值
message Hdel {
string table = 1;
string key = 2;
}

// 从 table 中删除一组 key,返回它们之前的值
message Hmdel {
string table = 1;
repeated string keys = 2;
}

// 查看 key 是否存在
message Hexist {
string table = 1;
string key = 2;
}

// 查看一组 key 是否存在
message Hmexist {
string table = 1;
repeated string keys = 2;
}
24 changes: 24 additions & 0 deletions 26_kv/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::process::Command;

fn main() {
let build_enabled = option_env!("BUILD_PROTO")
.map(|v| v == "1")
.unwrap_or(false);

if !build_enabled {
println!("=== Skipped compiling protos ===");
return;
}

let mut config = prost_build::Config::new();
config.bytes(&["."]);
config.type_attribute(".", "#[derive(PartialOrd)]");
config
.out_dir("src/pb")
.compile_protos(&["abi.proto"], &["."])
.unwrap();
Command::new("cargo")
.args(&["fmt", "--", "src/*.rs"])
.status()
.expect("cargo fmt failed");
}
30 changes: 30 additions & 0 deletions 26_kv/examples/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use anyhow::Result;
use async_prost::AsyncProstStream;
use futures::prelude::*;
use kv1::{CommandRequest, CommandResponse};
use tokio::net::TcpStream;
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();

let addr = "127.0.0.1:9527";
// 连接服务器
let stream = TcpStream::connect(addr).await?;

// 使用 AsyncProstStream 来处理 TCP Frame
let mut client =
AsyncProstStream::<_, CommandResponse, CommandRequest, _>::from(stream).for_async();

// 生成一个 HSET 命令
let cmd = CommandRequest::new_hset("table1", "hello", "world".to_string().into());

// 发送 HSET 命令
client.send(cmd).await?;
if let Some(Ok(data)) = client.next().await {
info!("Got response {:?}", data);
}

Ok(())
}
32 changes: 32 additions & 0 deletions 26_kv/examples/dummy_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use anyhow::Result;
use async_prost::AsyncProstStream;
use futures::prelude::*;
use kv1::{CommandRequest, CommandResponse};
use tokio::net::TcpListener;
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let addr = "127.0.0.1:9527";
let listener = TcpListener::bind(addr).await?;
info!("Start listening on {}", addr);
loop {
let (stream, addr) = listener.accept().await?;
info!("Client {:?} connected", addr);
tokio::spawn(async move {
let mut stream =
AsyncProstStream::<_, CommandRequest, CommandResponse, _>::from(stream).for_async();
while let Some(Ok(msg)) = stream.next().await {
info!("Got a new command: {:?}", msg);
let resp = CommandResponse {
status: 404,
message: "Not found".to_string(),
..Default::default()
};
stream.send(resp).await.unwrap();
}
info!("Client {:?} disconnected", addr);
});
}
}
30 changes: 30 additions & 0 deletions 26_kv/examples/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use anyhow::Result;
use async_prost::AsyncProstStream;
use futures::prelude::*;
use kv1::{CommandRequest, CommandResponse, MemTable, Service, ServiceInner};
use tokio::net::TcpListener;
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let service: Service = ServiceInner::new(MemTable::new()).into();
let addr = "127.0.0.1:9527";
let listener = TcpListener::bind(addr).await?;
info!("Start listening on {}", addr);
loop {
let (stream, addr) = listener.accept().await?;
info!("Client {:?} connected", addr);
let svc = service.clone();
tokio::spawn(async move {
let mut stream =
AsyncProstStream::<_, CommandRequest, CommandResponse, _>::from(stream).for_async();
while let Some(Ok(cmd)) = stream.next().await {
info!("Got a new command: {:?}", cmd);
let res = svc.execute(cmd);
stream.send(res).await.unwrap();
}
info!("Client {:?} disconnected", addr);
});
}
}
35 changes: 35 additions & 0 deletions 26_kv/examples/server_with_sled.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use anyhow::Result;
use async_prost::AsyncProstStream;
use futures::prelude::*;
use kv1::{CommandRequest, CommandResponse, Service, ServiceInner, SledDb};
use tokio::net::TcpListener;
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let service: Service<SledDb> = ServiceInner::new(SledDb::new("/tmp/kvserver"))
.fn_before_send(|res| match res.message.as_ref() {
"" => res.message = "altered. Original message is empty.".into(),
s => res.message = format!("altered: {}", s),
})
.into();
let addr = "127.0.0.1:9527";
let listener = TcpListener::bind(addr).await?;
info!("Start listening on {}", addr);
loop {
let (stream, addr) = listener.accept().await?;
info!("Client {:?} connected", addr);
let svc = service.clone();
tokio::spawn(async move {
let mut stream =
AsyncProstStream::<_, CommandRequest, CommandResponse, _>::from(stream).for_async();
while let Some(Ok(cmd)) = stream.next().await {
info!("Got a new command: {:?}", cmd);
let res = svc.execute(cmd);
stream.send(res).await.unwrap();
}
info!("Client {:?} disconnected", addr);
});
}
}
25 changes: 25 additions & 0 deletions 26_kv/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use crate::Value;
use thiserror::Error;

#[derive(Error, Debug, PartialEq)]
pub enum KvError {
#[error("Not found for table: {0}, key: {1}")]
NotFound(String, String),

#[error("Command is invalid: `{0}`")]
InvalidCommand(String),
#[error("Cannot convert value {:0} to {1}")]
ConvertError(Value, &'static str),
#[error("Cannot process command {0} with table: {1}, key: {2}. Error: {}")]
StorageError(&'static str, String, String, String),

#[error("Failed to encode protobuf message")]
EncodeError(#[from] prost::EncodeError),
#[error("Failed to decode protobuf message")]
DecodeError(#[from] prost::DecodeError),
#[error("Failed to access sled db")]
SledError(#[from] sled::Error),

#[error("Internal error: {0}")]
Internal(String),
}
9 changes: 9 additions & 0 deletions 26_kv/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
mod error;
mod pb;
mod service;
mod storage;

pub use error::KvError;
pub use pb::abi::*;
pub use service::*;
pub use storage::*;
Loading

0 comments on commit 6113ce2

Please sign in to comment.