Skip to content

Commit

Permalink
add support for parameterized query in bolt driver (#398)
Browse files Browse the repository at this point in the history
add support for parameterized query
  • Loading branch information
ljcui authored Feb 3, 2024
1 parent 7581761 commit 1238a20
Showing 1 changed file with 31 additions and 2 deletions.
33 changes: 31 additions & 2 deletions src/server/bolt_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,33 @@
#include "db/galaxy.h"
namespace bolt {
extern boost::asio::io_service workers;
std::unordered_map<std::string, cypher::FieldData> ConvertParameters(
const std::unordered_map<std::string, std::any>& parameters) {
std::unordered_map<std::string, cypher::FieldData> ret;
for (auto& pair : parameters) {
if (pair.second.type() == typeid(std::string)) {
ret.emplace("$" + pair.first, lgraph_api::FieldData::String(
std::any_cast<const std::string&>(pair.second)));
} else if (pair.second.type() == typeid(int64_t)) {
ret.emplace("$" + pair.first, lgraph_api::FieldData::Int64(
std::any_cast<int64_t>(pair.second)));
} else if (pair.second.type() == typeid(double)) {
ret.emplace("$" + pair.first, lgraph_api::FieldData::Double(
std::any_cast<double>(pair.second)));
} else if (pair.second.type() == typeid(bool)) {
ret.emplace("$" + pair.first, lgraph_api::FieldData::Bool(
std::any_cast<bool>(pair.second)));
} else if (pair.second.type() == typeid(void)) {
ret.emplace("$" + pair.first, lgraph_api::FieldData());
} else {
throw lgraph_api::InputError(FMA_FMT(
"Unexpected cypher parameter type, parameter: {}", pair.first));
}
}
return ret;
}

void run_bolt_fsm(std::shared_ptr<BoltConnection> conn) {
void BoltFSM(std::shared_ptr<BoltConnection> conn) {
pthread_setname_np(pthread_self(), "bolt_fsm");
auto conn_id = conn->conn_id();
LOG_DEBUG() << FMA_FMT("bolt fsm thread[conn_id:{}] start.", conn_id);
Expand Down Expand Up @@ -109,9 +134,13 @@ void run_bolt_fsm(std::shared_ptr<BoltConnection> conn) {
"Missing 'db' item in the 'extra' info of 'Run' msg");
}
auto& graph = std::any_cast<const std::string&>(db_iter->second);
auto& field1 = std::any_cast<
const std::unordered_map<std::string, std::any>&>(fields[1]);
auto parameter = ConvertParameters(field1);
auto sm = BoltServer::Instance().StateMachine();
cypher::RTContext ctx(sm, sm->GetGalaxy(), session->user, graph);
ctx.SetBoltConnection(conn.get());
ctx.param_tab_ = std::move(parameter);
session->streaming_msg.reset();
cypher::ElapsedTime elapsed;
LOG_DEBUG() << "Bolt run " << cypher;
Expand Down Expand Up @@ -165,7 +194,7 @@ std::function<void(bolt::BoltConnection &conn, bolt::BoltMsg msg,
auto session = std::make_shared<BoltSession>();
session->state = SessionState::READY;
session->user = principal;
session->fsm_thread = std::thread(run_bolt_fsm, conn.shared_from_this());
session->fsm_thread = std::thread(BoltFSM, conn.shared_from_this());
session->fsm_thread.detach();
conn.SetContext(std::move(session));
ps.AppendSuccess(meta);
Expand Down

0 comments on commit 1238a20

Please sign in to comment.