Skip to content

Commit

Permalink
Improve bolt state machine (#405)
Browse files Browse the repository at this point in the history
* Improve bolt state machine

* remove parameter `bolt_thread_num`

* revert lgraph_standalone.json

* remove log
  • Loading branch information
ljcui authored Jan 31, 2024
1 parent 6aa89f0 commit a82bf84
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 117 deletions.
4 changes: 0 additions & 4 deletions src/core/global_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ std::map<std::string, std::string> lgraph::GlobalConfig::FormatAsOptions() const
AddOption(options, "Bootstrap Role", ha_bootstrap_role);
}
AddOption(options, "bolt_port", bolt_port);
AddOption(options, "bolt_thread_num", bolt_thread_num);
return options;
}

Expand Down Expand Up @@ -211,7 +210,6 @@ fma_common::Configuration lgraph::GlobalConfig::InitConfig

// bolt
bolt_port = 0;
bolt_thread_num = 10;

// parse options
fma_common::Configuration argparser;
Expand Down Expand Up @@ -319,7 +317,5 @@ fma_common::Configuration lgraph::GlobalConfig::InitConfig
.Comment("Node is witness (donot have data & can not apply request) or not.");
argparser.Add(bolt_port, "bolt_port", true)
.Comment("Bolt protocol port.");
argparser.Add(bolt_thread_num, "bolt_thread_num", true)
.Comment("bolt thread pool size.");
return argparser;
}
1 change: 0 additions & 1 deletion src/core/global_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ struct BasicConfigs {
bool enable_realtime_count{};
// bolt
int bolt_port = 0;
int bolt_thread_num = 10;
};

template <typename T>
Expand Down
3 changes: 3 additions & 0 deletions src/cypher/execution_plan/execution_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "procedure/procedure.h"
#include "cypher/execution_plan/validation/graph_name_checker.h"
#include "cypher/execution_plan/execution_plan.h"
#include "server/bolt_session.h"

namespace cypher {
using namespace parser;
Expand Down Expand Up @@ -1365,6 +1366,8 @@ int ExecutionPlan::Execute(RTContext *ctx) {
bolt::PackStream ps;
ps.AppendSuccess(meta);
ctx->bolt_conn_->PostResponse(std::move(ps.MutableBuffer()));
auto session = (bolt::BoltSession*)ctx->bolt_conn_->GetContext();
session->state = bolt::SessionState::STREAMING;
}

try {
Expand Down
72 changes: 64 additions & 8 deletions src/cypher/execution_plan/ops/op_produce_results.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,44 +211,100 @@ class ProduceResults : public OpBase {
return OP_ERR;
}
auto session = (bolt::BoltSession *)ctx->bolt_conn_->GetContext();
while (!session->current_msg) {
session->current_msg = session->msgs.Pop(std::chrono::milliseconds(100));
while (session->state == bolt::SessionState::STREAMING && !session->streaming_msg) {
session->streaming_msg = session->msgs.Pop(std::chrono::milliseconds(100));
if (ctx->bolt_conn_->has_closed()) {
LOG_INFO() << "The bolt connection is closed, cancel the op execution.";
return OP_ERR;
}
if (!session->streaming_msg) {
continue;
}
if (session->streaming_msg.value().type == bolt::BoltMsg::PullN ||
session->streaming_msg.value().type == bolt::BoltMsg::DiscardN) {
const auto &fields = session->streaming_msg.value().fields;
if (fields.size() != 1) {
std::string err =
FMA_FMT("{} msg fields size error, size: {}",
bolt::ToString(session->streaming_msg.value().type).c_str(),
fields.size());
LOG_ERROR() << err;
bolt::PackStream ps;
ps.AppendFailure({{"code", "error"}, {"message", err}});
ctx->bolt_conn_->PostResponse(std::move(ps.MutableBuffer()));
session->state = bolt::SessionState::FAILED;
return OP_ERR;
}
auto &val =
std::any_cast<const std::unordered_map<std::string, std::any> &>(fields[0]);
auto n = std::any_cast<int64_t>(val.at("n"));
session->streaming_msg.value().n = n;
} else if (session->streaming_msg.value().type == bolt::BoltMsg::Reset) {
LOG_INFO() << "Receive RESET, cancel the op execution.";
bolt::PackStream ps;
ps.AppendSuccess();
ctx->bolt_conn_->PostResponse(std::move(ps.MutableBuffer()));
session->state = bolt::SessionState::READY;
return OP_ERR;
} else {
LOG_ERROR() << FMA_FMT(
"Unexpected msg:{} in STREAMING state, cancel the op execution, "
"close the connection.",
bolt::ToString(session->streaming_msg.value().type));
ctx->bolt_conn_->Close();
return OP_ERR;
}
break;
}
if (session->state == bolt::SessionState::INTERRUPTED) {
LOG_WARN() << "The session state is INTERRUPTED, cancel the op execution.";
return OP_ERR;
} else if (session->state != bolt::SessionState::STREAMING) {
LOG_ERROR() << "Unexpected state: {} in op execution, close the connection.";
ctx->bolt_conn_->Close();
return OP_ERR;
} else if (session->streaming_msg.value().type != bolt::BoltMsg::PullN &&
session->streaming_msg.value().type != bolt::BoltMsg::DiscardN) {
LOG_ERROR() << FMA_FMT("Unexpected msg: {} in op execution, "
"cancel the op execution, close the connection.",
bolt::ToString(session->streaming_msg.value().type));
ctx->bolt_conn_->Close();
return OP_ERR;
}
auto child = children[0];
auto res = child->Consume(ctx);
if (res != OP_OK) {
session->ps.AppendSuccess();
session->state = bolt::SessionState::READY;
ctx->bolt_conn_->PostResponse(std::move(session->ps.MutableBuffer()));
session->ps.Reset();
return res;
}
if (session->current_msg.value().type == bolt::BoltMsg::PullN) {
if (session->streaming_msg.value().type == bolt::BoltMsg::PullN) {
auto record = ctx->result_->MutableRecord();
RRecordToURecord(ctx->txn_.get(), ctx->result_->Header(), child->record, *record);
session->ps.AppendRecords(ctx->result_->BoltRecords());
ctx->result_->ClearRecords();
bool sync = false;
if (--session->current_msg.value().n == 0) {
if (--session->streaming_msg.value().n == 0) {
std::unordered_map<std::string, std::any> meta;
meta["has_more"] = true;
session->ps.AppendSuccess(meta);
session->current_msg.reset();
session->state = bolt::SessionState::STREAMING;
session->streaming_msg.reset();
sync = true;
}
if (sync || session->ps.ConstBuffer().size() > 1024) {
ctx->bolt_conn_->PostResponse(std::move(session->ps.MutableBuffer()));
session->ps.Reset();
}
} else if (session->current_msg.value().type == bolt::BoltMsg::DiscardN) {
if (--session->current_msg.value().n == 0) {
} else if (session->streaming_msg.value().type == bolt::BoltMsg::DiscardN) {
if (--session->streaming_msg.value().n == 0) {
std::unordered_map<std::string, std::any> meta;
meta["has_more"] = true;
session->ps.AppendSuccess(meta);
session->current_msg.reset();
session->state = bolt::SessionState::STREAMING;
session->streaming_msg.reset();
ctx->bolt_conn_->PostResponse(std::move(session->ps.MutableBuffer()));
session->ps.Reset();
}
Expand Down
8 changes: 4 additions & 4 deletions src/cypher/execution_plan/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ void Scheduler::EvalCypher(RTContext *ctx, const std::string &script, ElapsedTim
r->Insert(header, lgraph::FieldData(data));
if (ctx->bolt_conn_) {
auto session = (bolt::BoltSession *)ctx->bolt_conn_->GetContext();
while (!session->current_msg) {
session->current_msg = session->msgs.Pop(std::chrono::milliseconds(100));
while (!session->streaming_msg) {
session->streaming_msg = session->msgs.Pop(std::chrono::milliseconds(100));
if (ctx->bolt_conn_->has_closed()) {
LOG_INFO() << "The bolt connection is closed, cancel the op execution.";
return;
Expand All @@ -109,9 +109,9 @@ void Scheduler::EvalCypher(RTContext *ctx, const std::string &script, ElapsedTim
meta["fields"] = ctx->result_->BoltHeader();
bolt::PackStream ps;
ps.AppendSuccess(meta);
if (session->current_msg.value().type == bolt::BoltMsg::PullN) {
if (session->streaming_msg.value().type == bolt::BoltMsg::PullN) {
ps.AppendRecords(ctx->result_->BoltRecords());
} else if (session->current_msg.value().type == bolt::BoltMsg::DiscardN) {
} else if (session->streaming_msg.value().type == bolt::BoltMsg::DiscardN) {
// ...
}
ps.AppendSuccess();
Expand Down
Loading

0 comments on commit a82bf84

Please sign in to comment.