diff --git a/src/server/bolt_handler.cpp b/src/server/bolt_handler.cpp index 07be5ba17e..7b6f0e200a 100644 --- a/src/server/bolt_handler.cpp +++ b/src/server/bolt_handler.cpp @@ -22,8 +22,33 @@ #include "db/galaxy.h" namespace bolt { extern boost::asio::io_service workers; +std::unordered_map<std::string, cypher::FieldData> ConvertParameters( + const std::unordered_map<std::string, std::any>& parameters) { + std::unordered_map<std::string, cypher::FieldData> ret; + for (auto& pair : parameters) { + if (pair.second.type() == typeid(std::string)) { + ret.emplace("$" + pair.first, lgraph_api::FieldData::String( + std::any_cast<const std::string&>(pair.second))); + } else if (pair.second.type() == typeid(int64_t)) { + ret.emplace("$" + pair.first, lgraph_api::FieldData::Int64( + std::any_cast<int64_t>(pair.second))); + } else if (pair.second.type() == typeid(double)) { + ret.emplace("$" + pair.first, lgraph_api::FieldData::Double( + std::any_cast<double>(pair.second))); + } else if (pair.second.type() == typeid(bool)) { + ret.emplace("$" + pair.first, lgraph_api::FieldData::Bool( + std::any_cast<bool>(pair.second))); + } else if (pair.second.type() == typeid(void)) { + ret.emplace("$" + pair.first, lgraph_api::FieldData()); + } else { + throw lgraph_api::InputError(FMA_FMT( + "Unexpected cypher parameter type, parameter: {}", pair.first)); + } + } + return ret; +} -void run_bolt_fsm(std::shared_ptr<BoltConnection> conn) { +void BoltFSM(std::shared_ptr<BoltConnection> conn) { pthread_setname_np(pthread_self(), "bolt_fsm"); auto conn_id = conn->conn_id(); LOG_DEBUG() << FMA_FMT("bolt fsm thread[conn_id:{}] start.", conn_id); @@ -109,9 +134,13 @@ void run_bolt_fsm(std::shared_ptr<BoltConnection> conn) { "Missing 'db' item in the 'extra' info of 'Run' msg"); } auto& graph = std::any_cast<const std::string&>(db_iter->second); + auto& field1 = std::any_cast< + const std::unordered_map<std::string, std::any>&>(fields[1]); + auto parameter = ConvertParameters(field1); auto sm = BoltServer::Instance().StateMachine(); cypher::RTContext ctx(sm, sm->GetGalaxy(), session->user, graph); ctx.SetBoltConnection(conn.get()); + ctx.param_tab_ = std::move(parameter); session->streaming_msg.reset(); cypher::ElapsedTime elapsed; LOG_DEBUG() << "Bolt run " << cypher; @@ -165,7 +194,7 @@ std::function<void(bolt::BoltConnection &conn, bolt::BoltMsg msg, auto session = std::make_shared<BoltSession>(); session->state = SessionState::READY; session->user = principal; - session->fsm_thread = std::thread(run_bolt_fsm, conn.shared_from_this()); + session->fsm_thread = std::thread(BoltFSM, conn.shared_from_this()); session->fsm_thread.detach(); conn.SetContext(std::move(session)); ps.AppendSuccess(meta);