Skip to content

Commit

Permalink
租约实现
Browse files Browse the repository at this point in the history
  • Loading branch information
wubenqi committed May 18, 2017
1 parent 9cbed6b commit fd751f7
Show file tree
Hide file tree
Showing 21 changed files with 1,013 additions and 451 deletions.
13 changes: 13 additions & 0 deletions client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,16 @@ void Client::DoCommandLineLoop() {
int main(int argc, char* argv[]) {
return nebula::DoMain<Client>(argc, argv);
}

void DebugTest() {
zproto::GetRouteTableReq get_route_table_req;
std::string o;
get_route_table_req.SerializeToString(&o);
std::cout << o.length() << ", " << get_route_table_req.Utf8DebugString() << std::endl;

get_route_table_req.Clear();
bool rv = get_route_table_req.ParseFromArray(o.c_str(), o.length());
if (!rv) std::cout << "error!!!" << std::endl;
std::cout << o.length() << ", " << get_route_table_req.Utf8DebugString() << std::endl;
}

6 changes: 4 additions & 2 deletions client/client_command_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ int DoFetchNextSeq(const std::vector<folly::StringPiece>& command_lines) {

zproto::FetchNextSequenceReq fetch_next_sequence_req;
fetch_next_sequence_req.set_id(id);

fetch_next_sequence_req.set_version(0);

ZRpcClientCall<zproto::SequenceRsp>("alloc_client",
MakeRpcRequest(fetch_next_sequence_req),
[] (std::shared_ptr<ApiRpcOk<zproto::SequenceRsp>> seq_rsp,
Expand All @@ -72,7 +73,8 @@ int DoGetCurrentSeq(const std::vector<folly::StringPiece>& command_lines) {

zproto::GetCurrentSequenceReq get_current_sequence_req;
get_current_sequence_req.set_id(id);

get_current_sequence_req.set_version(0);

ZRpcClientCall<zproto::SequenceRsp>("alloc_client",
MakeRpcRequest(get_current_sequence_req),
[] (std::shared_ptr<ApiRpcOk<zproto::SequenceRsp>> seq_rsp,
Expand Down
5 changes: 2 additions & 3 deletions seqsvr/allocsvr/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ set (SRC_LIST
../base/set.h
../base/router_table.cc
../base/router_table.h
lease_clerk.cc
lease_clerk.h
allocsvr_manager.cc
allocsvr_manager.h
alloc_server.cc
Expand All @@ -36,9 +38,6 @@ set (SRC_LIST
../proto/cc/seqsvr.pb.h
../proto/proto/seqsvr.proto
../base/message_handler_util.h

../storesvr/storesvr_manager.cc
../storesvr/storesvr_manager.h
)

add_executable (allocsvr ${SRC_LIST})
Expand Down
19 changes: 14 additions & 5 deletions seqsvr/allocsvr/alloc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,27 @@ bool AllocServer::Run() {
return -1;
}

// GPerftoolsProfiler profiler;
// profiler.ProfilerStart();
AllocSvrManager::GetInstance()->Initialize(1, 1);
auto alloc_manager = AllocSvrManager::GetInstance();

// auto store_instance = StoreSvrManager::GetInstance();
// store_instance->Initialize(1, "/tmp/seq.dat");
main_eb_.runAfterDelay([&] {
auto alloc_service = net_engine_manager->Lookup("alloc_server");
auto alloc_name = std::string("alloc_server:10000"); // + folly::to<std::string>(alloc_service->GetServiceConfig().port);

// GPerftoolsProfiler profiler;
// profiler.ProfilerStart();
alloc_manager->Initialize(timer_manager_.get(), "set_cluster_1", alloc_name);

// auto store_instance = StoreSvrManager::GetInstance();
// store_instance->Initialize(1, "/tmp/seq.dat");
}, 1000);

BaseDaemon::Run();

// profiler.ProfilerStop();

net_engine_manager->Stop();
alloc_manager->Destroy();

return true;
}

Expand Down
42 changes: 24 additions & 18 deletions seqsvr/allocsvr/alloc_service_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,41 @@
#include "allocsvr/allocsvr_manager.h"

int AllocServiceImpl::FetchNextSequence(const zproto::FetchNextSequenceReq& request, zproto::SequenceRsp* response) {
auto seq = AllocSvrManager::GetInstance()->FetchNextSequence(request.id());
response->set_sequence(seq);
SequenceWithRouterTable seq_with_router;
AllocSvrManager::GetInstance()->FetchNextSequence(request.id(), request.version(), seq_with_router);
response->set_sequence(seq_with_router.seq);
if (seq_with_router.router) response->set_allocated_router(seq_with_router.Release());

return 0;
}

int AllocServiceImpl::GetCurrentSequence(const zproto::GetCurrentSequenceReq& request, zproto::SequenceRsp* response) {
auto seq = AllocSvrManager::GetInstance()->GetCurrentSequence(request.id());
response->set_sequence(seq);
SequenceWithRouterTable seq_with_router;
AllocSvrManager::GetInstance()->GetCurrentSequence(request.id(), request.version(), seq_with_router);
response->set_sequence(seq_with_router.seq);
if (seq_with_router.router) response->set_allocated_router(seq_with_router.Release());

return 0;
}

int AllocServiceImpl::FetchNextSequenceList(const zproto::FetchNextSequenceListReq& request, zproto::SequenceListRsp* response) {
auto alloc_mgr = AllocSvrManager::GetInstance();
for (int i=0; i<request.id_list_size(); ++i) {
auto seq = alloc_mgr->FetchNextSequence(request.id_list(i));
auto id_seq = response->add_sequence_list();
id_seq->set_id(request.id_list(i));
id_seq->set_sequence(seq);
}
// auto alloc_mgr = AllocSvrManager::GetInstance();
// for (int i=0; i<request.id_list_size(); ++i) {
// auto seq = alloc_mgr->FetchNextSequence(request.id_list(i));
// auto id_seq = response->add_sequence_list();
// id_seq->set_id(request.id_list(i));
// id_seq->set_sequence(seq);
// }
return 0;
}

int AllocServiceImpl::GetCurrentSequenceList(const zproto::GetCurrentSequenceListReq& request, zproto::SequenceListRsp* response) {
auto alloc_mgr = AllocSvrManager::GetInstance();
for (int i=0; i<request.id_list_size(); ++i) {
auto seq = alloc_mgr->GetCurrentSequence(request.id_list(i));
auto id_seq = response->add_sequence_list();
id_seq->set_id(request.id_list(i));
id_seq->set_sequence(seq);
}
// auto alloc_mgr = AllocSvrManager::GetInstance();
// for (int i=0; i<request.id_list_size(); ++i) {
// auto seq = alloc_mgr->GetCurrentSequence(request.id_list(i));
// auto id_seq = response->add_sequence_list();
// id_seq->set_id(request.id_list(i));
// id_seq->set_sequence(seq);
// }
return 0;
}
191 changes: 136 additions & 55 deletions seqsvr/allocsvr/allocsvr_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,59 +37,139 @@ std::shared_ptr<AllocSvrManager> AllocSvrManager::GetInstance() {
return g_allocsvr_manager.try_get();
}

void AllocSvrManager::Initialize(uint32_t set_id, uint32_t alloc_id) {
void AllocSvrManager::Initialize(nebula::TimerManager* timer_manager, const std::string& set_name, const std::string& alloc_name) {
// 首先加载路由表
// 加载成功后再加载max_seq
set_name_ = set_name;
alloc_name_ = alloc_name;

lease_ = std::make_unique<LeaseClerk>(timer_manager, this);
lease_->Start();

/*
// 1. 初始化set_id_和alloc_id_
set_id_ = set_id;
alloc_id_ = alloc_id;
Load(set_id, alloc_id);
*/
}

uint64_t AllocSvrManager::GetCurrentSequence(uint32_t id) {
#ifdef DEBUG_TEST
DCHECK(id<kMaxIDSize);
#endif
std::lock_guard<std::mutex> g(mutex_);
return cur_seqs_[id];
void AllocSvrManager::Destroy() {
if (lease_) {
lease_->Stop();
}
}

uint64_t AllocSvrManager::FetchNextSequence(uint32_t id) {
#ifdef DEBUG_TEST
DCHECK(id<kMaxIDSize);
#endif
//uint64_t AllocSvrManager::GetCurrentSequence(uint32_t id) {
//#ifdef DEBUG_TEST
// DCHECK(id<kMaxIDSize);
//#endif
// std::lock_guard<std::mutex> g(mutex_);
// return cur_seqs_[id];
//}
//
//uint64_t AllocSvrManager::FetchNextSequence(uint32_t id) {
//#ifdef DEBUG_TEST
// DCHECK(id<kMaxIDSize);
//#endif
//
// auto idx = id/kSectionSize;
//
// std::lock_guard<std::mutex> g(mutex_);
// auto seq = ++cur_seqs_[id];
// if (seq > section_max_seqs_[idx]) {
// ++section_max_seqs_[idx];
// SaveMaxSeq(idx, section_max_seqs_[idx]);
// }
// return seq;
//}

bool AllocSvrManager::GetCurrentSequence(uint32_t id, uint32_t client_version, SequenceWithRouterTable& o) {
if (!cache_alloc_entry_) return false;
if (state_ != kAllocInited) return false;

auto idx = id/kSectionSize;
std::lock_guard<std::mutex> g(mutex_);
auto seq = ++cur_seqs_[id];
if (seq > section_max_seqs_[idx]) {
++section_max_seqs_[idx];
Save(set_id_, alloc_id_, idx, section_max_seqs_[idx]);
for (auto & v : cache_alloc_entry_->ranges ) {
if (id>=v.id && id<v.id+v.size) {
o.seq = cur_seqs_[id];
if (client_version < table_.version()) {
o.router = new zproto::Router;
table_.SerializeToRouter(o.router);
}
return true;
}
}
return seq;

return false;
}

bool AllocSvrManager::FetchNextSequence(uint32_t id, uint32_t client_version, SequenceWithRouterTable& o) {
if (!cache_alloc_entry_) return false;
if (state_ != kAllocInited) return false;

for (auto & v : cache_alloc_entry_->ranges ) {
if (id>=v.id && id<v.id+v.size) {
auto idx = id/kSectionSize;

o.seq = ++cur_seqs_[id];
if (o.seq > section_max_seqs_[idx]) {
++section_max_seqs_[idx];
SaveMaxSeq(idx, section_max_seqs_[idx]);
}

if (client_version < table_.version()) {
o.router = new zproto::Router;
table_.SerializeToRouter(o.router);
}
return true;
}
}

return false;
}

//////////////////////////////////////////////////////////////////////////////////////////////
// 租约生效
void AllocSvrManager::OnLeaseValid(RouteTable& table) {
table_.Swap(table);
// route_search_table_.Initialize(table_);
cache_alloc_entry_ = table_.LookupAlloc(set_name_, alloc_name_);
state_ = kAllocWaitLoadSeq;
LoadMaxSeq();
}

// 路由表更新
void AllocSvrManager::OnLeaseUpdated(RouteTable& table) {
table_.Swap(table);
}

// 租约失效
void AllocSvrManager::OnLeaseInvalid() {
}

//////////////////////////////////////////////////////////////////////////////////////////////
// bytes
void AllocSvrManager::Load(uint32_t set_id, uint32_t alloc_id) {
void AllocSvrManager::LoadMaxSeq() {
// TODO(@beqni): NWR读
// 2. 去storesvr加载max_seqs
state_ = kAllocWaitLoad;
state_ = kAllocWaitRouteTable;

zproto::LoadMaxSeqsDataReq load_max_seqs_data_req;
load_max_seqs_data_req.set_set_id(set_id);
load_max_seqs_data_req.set_alloc_id(alloc_id);

ZRpcClientCall<zproto::LoadMaxSeqsDataRsp>("store_client",
MakeRpcRequest(load_max_seqs_data_req),
[&] (std::shared_ptr<ApiRpcOk<zproto::LoadMaxSeqsDataRsp>> load_max_seqs_data_rsp,
ProtoRpcResponsePtr rpc_error) -> int {
if (rpc_error) {
LOG(ERROR) << "LoadMaxSeqsDataReq - rpc_error: " << rpc_error->ToString();
OnLoad("");
} else {
LOG(INFO) << "LoadMaxSeqsDataReq - load_max_seqs_data_rsp: " << load_max_seqs_data_rsp->ToString();
OnLoad((*load_max_seqs_data_rsp)->max_seqs());
}
return 0;
});
ZRpcClientCall<zproto::LoadMaxSeqsDataRsp>(
"store_client",
MakeRpcRequest(load_max_seqs_data_req),
[&] (std::shared_ptr<ApiRpcOk<zproto::LoadMaxSeqsDataRsp>> load_max_seqs_data_rsp,
ProtoRpcResponsePtr rpc_error) -> int {
if (rpc_error) {
LOG(ERROR) << "LoadMaxSeqsDataReq - rpc_error: " << rpc_error->ToString();
OnMaxSeqLoaded("");
} else {
LOG(INFO) << "LoadMaxSeqsDataReq - load_max_seqs_data_rsp: "
<< load_max_seqs_data_rsp->ToString();
OnMaxSeqLoaded((*load_max_seqs_data_rsp)->max_seqs());
}
return 0;
});

// 先使用StoreSvrManager加载,跑通流程
// auto store = StoreSvrManager::GetInstance();
Expand All @@ -98,32 +178,32 @@ void AllocSvrManager::Load(uint32_t set_id, uint32_t alloc_id) {
// OnLoad(max_seqs_data);
}

void AllocSvrManager::Save(uint32_t set_id, uint32_t alloc_id, uint32_t section_id, uint64_t section_max_seq) {
void AllocSvrManager::SaveMaxSeq(uint32_t section_id, uint64_t section_max_seq) {
// TODO(@beqni): NWR写
// auto store = StoreSvrManager::GetInstance();
// bool rv = store->SetSectionsData(set_id, alloc_id, section_id, section_max_seq);

zproto::SaveMaxSeqReq save_max_seq_req;
save_max_seq_req.set_set_id(set_id);
save_max_seq_req.set_alloc_id(alloc_id);
save_max_seq_req.set_section_id(section_id);
save_max_seq_req.set_max_seq(section_max_seq);

ZRpcClientCall<zproto::SaveMaxSeqRsp>("store_client",
MakeRpcRequest(save_max_seq_req),
[section_max_seq, this] (std::shared_ptr<ApiRpcOk<zproto::SaveMaxSeqRsp>> save_max_seq_rsp,
ProtoRpcResponsePtr rpc_error) -> int {
if (rpc_error) {
LOG(ERROR) << "SaveMaxSeqReq - rpc_error: " << rpc_error->ToString();
this->OnSave(false);
} else {
LOG(INFO) << "SaveMaxSeqReq - load_max_seqs_data_rsp: " << save_max_seq_rsp->ToString();
this->OnSave((*save_max_seq_rsp)->last_max_seq() == section_max_seq-1);
}
return 0;
});
ZRpcClientCall<zproto::SaveMaxSeqRsp>(
"store_client",
MakeRpcRequest(save_max_seq_req),
[section_max_seq, this] (std::shared_ptr<ApiRpcOk<zproto::SaveMaxSeqRsp>> save_max_seq_rsp,
ProtoRpcResponsePtr rpc_error) -> int {
if (rpc_error) {
LOG(ERROR) << "SaveMaxSeqReq - rpc_error: " << rpc_error->ToString();
this->OnMaxSeqSaved(false);
} else {
LOG(INFO) << "SaveMaxSeqReq - load_max_seqs_data_rsp: " << save_max_seq_rsp->ToString();
this->OnMaxSeqSaved((*save_max_seq_rsp)->last_max_seq() == section_max_seq-1);
}
return 0;
});
}

void AllocSvrManager::OnLoad(const std::string& data) {
void AllocSvrManager::OnMaxSeqLoaded(const std::string& data) {
if (!data.empty()) {
// TODO(@benqi): 检查数据是否合法
// 复制数据
Expand All @@ -135,12 +215,13 @@ void AllocSvrManager::OnLoad(const std::string& data) {

std::fill(cur_seqs_.begin()+(kSectionSlotSize-1)*kSectionSize, cur_seqs_.end(), section_max_seqs_[kSectionSlotSize-1]);

state_ = kAllocLoaded;
state_ = kAllocInited;
} else {
state_ = kAllocError;
}
}

void AllocSvrManager::OnSave(bool result) {
void AllocSvrManager::OnMaxSeqSaved(bool result) {
}


Loading

0 comments on commit fd751f7

Please sign in to comment.