From 831bcb7e9587047bb457cb3abfdd711900c1d9a1 Mon Sep 17 00:00:00 2001 From: GongChangYan <1084015508@qq.com> Date: Tue, 16 Jan 2024 14:00:45 +0800 Subject: [PATCH 1/8] fix mis (#375) Co-authored-by: Shipeng Qi --- procedures/algo_cpp/mis_core.cpp | 3 +++ procedures/algo_cpp/mis_procedure.cpp | 2 +- procedures/algo_cpp/mis_standalone.cpp | 5 ++++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/procedures/algo_cpp/mis_core.cpp b/procedures/algo_cpp/mis_core.cpp index fd008c36a9..22248badb1 100644 --- a/procedures/algo_cpp/mis_core.cpp +++ b/procedures/algo_cpp/mis_core.cpp @@ -33,6 +33,9 @@ void MISCore(OlapBase &graph, ParallelVector &mis, size_t &mis_size while (active_num != 0) { active_num = graph.ProcessVertexInRange( [&](size_t dst) { + if (mis[dst]) { + return (size_t)0; + } auto edges = graph.OutEdges(dst); for (auto &edge : edges) { size_t src = edge.neighbour; diff --git a/procedures/algo_cpp/mis_procedure.cpp b/procedures/algo_cpp/mis_procedure.cpp index 5286bd00b7..62815e7517 100644 --- a/procedures/algo_cpp/mis_procedure.cpp +++ b/procedures/algo_cpp/mis_procedure.cpp @@ -34,7 +34,7 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re return false; } - size_t construct_param = SNAPSHOT_PARALLEL; + size_t construct_param = SNAPSHOT_PARALLEL | SNAPSHOT_IDMAPPING; if (make_symmetric != 0) { construct_param = SNAPSHOT_PARALLEL | SNAPSHOT_UNDIRECTED; } diff --git a/procedures/algo_cpp/mis_standalone.cpp b/procedures/algo_cpp/mis_standalone.cpp index 0c993275dc..5d1f7d55d4 100644 --- a/procedures/algo_cpp/mis_standalone.cpp +++ b/procedures/algo_cpp/mis_standalone.cpp @@ -24,7 +24,7 @@ using json = nlohmann::json; class MyConfig : public ConfigBase { public: std::string name = std::string("mis"); - int make_symmetric = 0; + int make_symmetric = 1; void AddParameter(fma_common::Configuration& config) { ConfigBase::AddParameter(config); config.Add(make_symmetric, "make_symmetric", true) @@ -53,6 +53,9 @@ int main(int argc, char** argv) { start_time = get_time(); OlapOnDisk graph; MyConfig config(argc, argv); + if (!config.id_mapping) { + printf("id_mapping is false, the results may contain vertices that do not exist\n"); + } if (config.make_symmetric == 0) { graph.Load(config, INPUT_SYMMETRIC); From 98b31f28c1df94b4ede1cd497d4ec290eb303b3b Mon Sep 17 00:00:00 2001 From: lipanpan03 <41904587+lipanpan03@users.noreply.github.com> Date: Wed, 17 Jan 2024 19:15:55 +0800 Subject: [PATCH 2/8] Improve high availability documentation and usage examples (#380) * modify ha bootstrap * modify ha bootstrap doc * modify ha doc conf parameter * modify ha doc conf parameter --- .../2.running/3.high-availability-mode.md | 32 +++++++++------- .../2.running/3.high-availability-mode.md | 37 ++++++++++--------- 2 files changed, 37 insertions(+), 32 deletions(-) diff --git a/docs/en-US/source/5.developer-manual/2.running/3.high-availability-mode.md b/docs/en-US/source/5.developer-manual/2.running/3.high-availability-mode.md index 0ec0f707ec..5d22c81b36 100644 --- a/docs/en-US/source/5.developer-manual/2.running/3.high-availability-mode.md +++ b/docs/en-US/source/5.developer-manual/2.running/3.high-availability-mode.md @@ -31,7 +31,7 @@ After installing TuGraph, you can use the `lgraph_server` command to start a hig ### 3.1.The initial data is consistent When the data in all servers is the same or there is no data at startup, the user can -specify `--conf host1:port1,host2:port2` to start the server. 
+specify `--ha_conf host1:port1,host2:port2` to start the server. In this way, all prepared TuGraph instances can be added to the initial backup group at one time, All servers in the backup group elect `leader` according to the RAFT protocol, and other servers join the backup group with the role of `follower`. @@ -39,7 +39,7 @@ servers join the backup group with the role of `follower`. An example command to start an initial backup group is as follows: ```bash -$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 +$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 ``` After the first server is started, it will elect itself as the 'leader' and organize a backup group with only itself. @@ -47,22 +47,26 @@ After the first server is started, it will elect itself as the 'leader' and orga ### 3.2.Inconsistent initial data If there is already data in the first server (imported by the `lgraph_import` tool or transferred from a server in non-high availability mode), And it has not been used in high-availability mode before, the user should use the boostrap method to start. Start the server with data in bootstrap -mode with the `ha_bootstrap_role` parameter as 1, and specify the machine as the `leader` through the `conf` +mode with the `ha_bootstrap_role` parameter as 1, and specify the machine as the `leader` through the `ha_conf` parameter. In bootstrap mode, the server will copy its own data to the new server before adding the newly joined server to the backup group, so that the data in each server is consistent. An example command to start a data server is as follows: ```bash -$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090 --ha_bootstrap_role 1 +$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 --ha_bootstrap_role 1 ``` -Other servers without data need to specify the `ha_bootstrap_role` parameter as 2, and specify the `leader` through the `conf` parameter. The command example is as follows +Other servers without data need to specify the `ha_bootstrap_role` parameter as 2, and specify the `leader` through the `ha_conf` parameter. The command example is as follows ```bash -**$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090 --ha_bootstrap_role 2 +**$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 --ha_bootstrap_role 2 ``` +**You need to pay attention to two points when using bootstrap to start an HA cluster:** +1. You need to wait for the `leader` node to generate a snapshot and start successfully before joining the `follower` node, otherwise the `follower` node may fail to join. When starting the `follower` node, you can configure the `ha_node_join_group_s` parameter to be slightly larger to allow multiple waits and timeout retries when joining the HA cluster. +2. The HA cluster can only use the bootstrap mode when it is started for the first time. It can only be started in the normal mode (see Section 3.1) when it is started later. In particular, multiple nodes of the same cluster cannot be started in the bootstrap mode, otherwise it may cause Data inconsistency + ## 4.Start witness node ### 4.1. 
Witness nodes are not allowed to become leader @@ -72,7 +76,7 @@ The startup method of `witness` node is the same as that of ordinary nodes. You An example command to start the witness node server is as follows: ```bash -$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 --ha_is_witness 1 +$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 --ha_is_witness 1 ``` Note: By default, the `witness` node is not allowed to become the `leader` node, which can improve the performance of the cluster, but will reduce the availability of the cluster when the `leader` node crashes. @@ -84,7 +88,7 @@ You can specify the `ha_enable_witness_to_leader` parameter as `true`, so that t An example of the command to start the `witness` node server that is allowed to become the `leader` node is as follows: ```bash -$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 --ha_is_witness 1 --ha_enable_witness_to_leader 1 +$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 --ha_is_witness 1 --ha_enable_witness_to_leader 1 ``` Note: Although allowing `witness` nodes to become `leader` nodes can improve the availability of the cluster, it may affect data consistency in extreme cases. Therefore, it should generally be ensured that the number of `witness` nodes + 1 is less than half of the total number of cluster nodes. @@ -92,11 +96,11 @@ Note: Although allowing `witness` nodes to become `leader` nodes can improve the ## 5.Scale out other servers After starting the initial backup group, if you want to scale out the backup group, add new servers to the backup group, -The `--conf HOST:PORT` option should be used, where `HOST` can be the IP address of any server already in this backup group, +The `--ha_conf HOST:PORT` option should be used, where `HOST` can be the IP address of any server already in this backup group, And `PORT` is its RPC port. E.g: ```bash -./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090 +./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090 ``` This command will start a TuGraph server in high availability mode and try to add it to the backup group containing the server `172.22.224.15:9090`. @@ -108,17 +112,17 @@ When a server goes offline via 'CTRL-C', it will notify the current 'leader' ser If a server is terminated or disconnected from other servers in the backup group, the server is considered a failed node and the leader server will remove it from the backup group after a specified time limit. -If any server leaves the backup group and wishes to rejoin, it must start with the '--conf {HOST:PORT}' option, where 'HOST' is the IP address of a server in the current backup group. +If any server leaves the backup group and wishes to rejoin, it must start with the '--ha_conf {HOST:PORT}' option, where 'HOST' is the IP address of a server in the current backup group. ## 7.Restarting the Server Restarting the entire backup group is not recommended as it disrupts service. All servers can be shut down if desired. But on reboot, It must be ensured that at least N/2+1 servers in the backup group at shutdown can start normally, otherwise the startup will fail. 
and, Regardless of whether `enable_bootstrap` is specified as true when initially starting the replication group, restarting the server only needs to pass -Specify the `--conf host1:port1,host2:port2` parameter to restart all servers at once. The command example is as follows: +Specify the `--ha_conf host1:port1,host2:port2` parameter to restart all servers at once. The command example is as follows: ```bash -$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 +$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 ``` ## 8.docker deploys a highly available cluster @@ -144,7 +148,7 @@ docker run --net=host -itd -p -v {src_dir}:{dst_dir} --name tugraph_ha tugraph/t ### 8.3.Start service Use the following command to start the service on each server, because docker and the host share IP, so you can directly specify to start the service on the host IP ```shell -$ lgraph_server -c lgraph.json --host 172.22.224.15 --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 +$ lgraph_server -c lgraph.json --host 172.22.224.15 --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 ``` ## 9.Server Status diff --git a/docs/zh-CN/source/5.developer-manual/2.running/3.high-availability-mode.md b/docs/zh-CN/source/5.developer-manual/2.running/3.high-availability-mode.md index 7140252dd7..f9abf18093 100644 --- a/docs/zh-CN/source/5.developer-manual/2.running/3.high-availability-mode.md +++ b/docs/zh-CN/source/5.developer-manual/2.running/3.high-availability-mode.md @@ -43,38 +43,39 @@ v3.6及以上版本支持此功能。 ### 3.1.初始数据一致 -当启动时所有服务器中的数据相同或没有数据时,用户可以通过 -指定`--conf host1:port1,host2:port2`启动服务器。 -这种方式可以将准备好的所有TuGraph实例一次性加入初始备份组, -由备份组中的所有服务器根据raft协议选举出`leader`,并将其他 -服务器以`follower`的角色加入备份组。 +当启动时所有服务器中的数据相同或没有数据时,用户可以通过指定`--ha_conf host1:port1,host2:port2`启动服务器。 +这种方式可以将准备好的所有TuGraph实例一次性加入初始备份组,由备份组中的所有服务器根据raft协议选举出`leader`,并将其他服务器以`follower`的角色加入备份组。 启动初始备份组的命令示例如下所示: ```bash -$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 +$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 ``` ### 3.2.初始数据不一致 如果第一台服务器中已有数据(以`lgraph_import`工具导入或从非高可用模式的服务器传输得到), 并且之前并未在高可用模式下使用,则用户应使用boostrap方式启动。 -以`ha_bootstrap_role`参数为1在bootstrap模式下启动有数据的服务器,并通过`conf`参数指定本机为`leader`。 +以`ha_bootstrap_role`参数为1在bootstrap模式下启动有数据的服务器,并通过`ha_conf`参数指定本机为`leader`。 在bootstrap模式下,服务器在将新加入的服务器添加到备份组之前会将自己的 数据复制到新服务器中,以使每个服务器中的数据保持一致。 启动有数据服务器的命令示例如下所示: ```bash -$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090 --ha_bootstrap_role 1 +$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 --ha_bootstrap_role 1 ``` -其他无数据的服务器需要指定`ha_bootstrap_role`参数为2,并通过`conf`参数指定`leader`即可,命令示例如下所示 +其他无数据的服务器需要指定`ha_bootstrap_role`参数为2,并通过`ha_conf`参数指定`leader`即可,命令示例如下所示 ```bash -$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090 --ha_bootstrap_role 2 +$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 --ha_bootstrap_role 2 ``` +**使用bootstrap启动HA集群时需要注意两点:** +1. 
需要等待`leader`节点生成snapshot并且成功启动之后再加入`follower`节点,否则`follower`节点可能加入失败。在启动`follower`节点时可以将`ha_node_join_group_s`参数配置的稍大,以在加入HA集群时多次等待和超时重试。 +2. HA集群只有在第一次启动时可以使用bootstrap模式,后续再启动时只能使用普通模式(见3.1节)启动,尤其不能让同一个集群的多个节点以bootstrap模式启动,否则可能产生数据不一致的情况 + ## 4.启动witness节点 ### 4.1.不允许witness节点成为leader @@ -84,7 +85,7 @@ $ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22. 启动`witness`节点服务器的命令示例如下所示: ```bash -$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 --ha_is_witness 1 +$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 --ha_is_witness 1 ``` 注:默认不允许`witness`节点成为`leader`节点,这可以提高集群的性能,但是在`leader`节点崩溃时会降低集群的可用性。 @@ -96,7 +97,7 @@ $ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22. 启动允许成为`leader`节点的`witness`节点服务器的命令示例如下所示: ```bash -$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 --ha_is_witness 1 --ha_enable_witness_to_leader 1 +$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 --ha_is_witness 1 --ha_enable_witness_to_leader 1 ``` 注:尽管允许`witness`节点成为`leader`节点可以提高集群的可用性,但是在极端情况下可能会影响数据的一致性。因此一般应保证`witness`节点数量+1少于集群节点总数量的一半。 @@ -104,11 +105,11 @@ $ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22. ## 5.横向扩展其他服务器 启动初始备份组后,如果想对备份组进行横向扩展,要将新服务器添加到备份组, -应使用`--conf HOST:PORT`选项,其中`HOST`可以是该备份组中已有的任何服务器的 IP 地址, +应使用`--ha_conf HOST:PORT`选项,其中`HOST`可以是该备份组中已有的任何服务器的 IP 地址, 而`PORT`是其 RPC 端口。例如: ```bash -./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090 +./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090 ``` 此命令将启动一台高可用模式的 TuGraph 服务器,并尝试将其添加到包含服务器`172.22.224.15:9090`的备份组中。 @@ -121,17 +122,17 @@ $ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22. 
如果服务器被终止或者与备份组中的其他服务器失去连接,则该服务器将被视为失败节点,`leader`服务器将在特定时限后将其从备份组中删除。 -如果任何服务器离开备份组并希望重新加入,则必须从`--conf HOST:PORT`选项开始,其中`HOST`是当前备份组中的某台服务器的 IP 地址。 +如果任何服务器离开备份组并希望重新加入,则必须从`--ha_conf HOST:PORT`选项开始,其中`HOST`是当前备份组中的某台服务器的 IP 地址。 ## 7.重启服务器 不建议重新启动整个备份组,因为它会中断服务。如果需要,可以关闭所有服务器。但在重新启动时, 必须保证关闭时的备份组中至少有N/2+1的服务器能正常启动,否则启动失败。 并且, 无论初始启动复制组时是否指定`enable_bootstrap`为true,重启服务器时都只需通过 -指定`--conf host1:port1,host2:port2`参数一次性重启所有服务器即可,命令示例如下所示: +指定`--ha_conf host1:port1,host2:port2`参数一次性重启所有服务器即可,命令示例如下所示: ```bash -$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 +$ ./lgraph_server -c lgraph.json --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 ``` ## 8.docker部署高可用集群 @@ -158,7 +159,7 @@ docker run --net=host -itd -p -v {src_dir}:{dst_dir} --name tugraph_ha tugraph/t ### 8.3.启动服务 在每台服务器上使用如下命令启动服务,因为docker和宿主机共享IP,所以可以直接指定在宿主机IP上启动服务 ```shell -$ lgraph_server -c lgraph.json --host 172.22.224.15 --rpc_port 9090 --enable_ha true --conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 +$ lgraph_server -c lgraph.json --host 172.22.224.15 --rpc_port 9090 --enable_ha true --ha_conf 172.22.224.15:9090,172.22.224.16:9090,172.22.224.17:9090 ``` ## 9.查看服务器状态 From 17ccb73e8c35eacd8d3d1aa008566553572b4bb7 Mon Sep 17 00:00:00 2001 From: Ke Huang <569078986@qq.com> Date: Wed, 17 Jan 2024 19:26:54 +0800 Subject: [PATCH 3/8] Fix errors when making the package (#382) 1.make: unit_test is created by default. 2.make package: unit_test is not included. 3.Set -DWITH_TESTS=OFF to compile without unit_test. Co-authored-by: Shipeng Qi --- CMakeLists.txt | 9 +++------ Options.cmake | 6 +++++- ci/build_export.sh | 4 ++-- ci/build_release.sh | 3 +-- ci/github_ci.sh | 4 +--- test/CMakeLists.txt | 6 ++---- 6 files changed, 14 insertions(+), 18 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a8d804fb0d..10c7b6f22b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -56,9 +56,6 @@ if (BUILD_PROCEDURE) endif (BUILD_PROCEDURE) # unit_test -add_subdirectory(test) -set(LGRAPH_TOOLKITS lgraph_import lgraph_backup lgraph_warmup lgraph_peek lgraph_export lgraph_binlog lgraph_peer) -add_dependencies(unit_test ${LGRAPH_TOOLKITS} lgraph_server) - -set_target_properties(unit_test PROPERTIES EXCLUDE_FROM_ALL TRUE) -set_target_properties(fma_unit_test PROPERTIES EXCLUDE_FROM_ALL TRUE) +if (WITH_TESTS) + add_subdirectory(test) +endif (WITH_TESTS) diff --git a/Options.cmake b/Options.cmake index b96a9d0e11..54e13c2faa 100644 --- a/Options.cmake +++ b/Options.cmake @@ -91,6 +91,11 @@ if (BUILD_PROCEDURE) message("Build procedures.") endif (BUILD_PROCEDURE) +option(WITH_TESTS "build with tests" ON) +if (WITH_TESTS) + message("Build with tests.") +endif (WITH_TESTS) + # disable krb5 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DOPENSSL_NO_KRB5=1") @@ -189,4 +194,3 @@ if (ENABLE_PREDOWNLOAD_DEPENDS_PACKAGE) execute_process(COMMAND /bin/sh install.sh WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/src/python/FMA_shell/pkg) endif () - diff --git a/ci/build_export.sh b/ci/build_export.sh index 2450d875d4..4d8ce39cd6 100644 --- a/ci/build_export.sh +++ b/ci/build_export.sh @@ -18,9 +18,9 @@ mkdir build && cd build # build cpp if [[ "$ASAN" == "asan" ]]; then echo 'build with asan ...' -cmake .. -DCMAKE_BUILD_TYPE=Debug -DENABLE_ASAN=ON -DBUILD_PROCEDURE=$WITH_PROCEDURE +cmake .. -DCMAKE_BUILD_TYPE=Debug -DENABLE_ASAN=ON -DBUILD_PROCEDURE=$WITH_PROCEDURE -DWITH_TEST=OFF else -cmake .. 
-DCMAKE_BUILD_TYPE=Coverage -DBUILD_PROCEDURE=$WITH_PROCEDURE +cmake .. -DCMAKE_BUILD_TYPE=Coverage -DBUILD_PROCEDURE=$WITH_PROCEDURE -DWITH_TEST=OFF fi make -j6 diff --git a/ci/build_release.sh b/ci/build_release.sh index 151b360a2a..ded8fd1548 100644 --- a/ci/build_release.sh +++ b/ci/build_release.sh @@ -23,5 +23,4 @@ else cmake .. -DCMAKE_BUILD_TYPE=Release fi -make -j6 - +make -j6 package diff --git a/ci/github_ci.sh b/ci/github_ci.sh index 9724d62e07..0e7671f8de 100644 --- a/ci/github_ci.sh +++ b/ci/github_ci.sh @@ -23,11 +23,9 @@ cmake .. -DCMAKE_BUILD_TYPE=Debug -DENABLE_ASAN=ON -DBUILD_PROCEDURE=$WITH_PROCE else cmake .. -DCMAKE_BUILD_TYPE=Coverage -DBUILD_PROCEDURE=$WITH_PROCEDURE fi -make -j2 +make -j2 package if [[ "$TEST" == "ut" ]]; then - make unit_test fma_unit_test -j2 - # build tugraph db management cd $WORKSPACE/deps/tugraph-db-management/ sh local_build.sh diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 49c87073bd..2a67977579 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -143,7 +143,5 @@ target_link_libraries(unit_test target_compile_definitions(unit_test PRIVATE FMA_IN_UNIT_TEST=1) -# install -install(TARGETS unit_test - RUNTIME DESTINATION bin - LIBRARY DESTINATION lib64) + +add_dependencies(unit_test ${LGRAPH_TOOLKITS} lgraph_server) From 2b6fc4b69defd920c75e7c199f0ed3920e95952e Mon Sep 17 00:00:00 2001 From: GongChangYan <1084015508@qq.com> Date: Thu, 18 Jan 2024 10:39:15 +0800 Subject: [PATCH 4/8] fix github issue #338 --- .../tugraph-compile-arm64v8-centos7-Dockerfile | 15 ++++++++++++++- ci/images/tugraph-compile-centos7-Dockerfile | 15 ++++++++++++++- ci/images/tugraph-compile-centos8-Dockerfile | 2 +- ci/images/tugraph-compile-ubuntu18.04-Dockerfile | 2 +- .../tugraph-runtime-arm64v8-centos7-Dockerfile | 2 +- ci/images/tugraph-runtime-centos7-Dockerfile | 3 ++- ci/images/tugraph-runtime-centos8-Dockerfile | 3 ++- ci/images/tugraph-runtime-ubuntu18.04-Dockerfile | 3 ++- src/client/python/TuGraphClient/TuGraphClient.py | 3 ++- 9 files changed, 39 insertions(+), 9 deletions(-) mode change 100755 => 100644 src/client/python/TuGraphClient/TuGraphClient.py diff --git a/ci/images/tugraph-compile-arm64v8-centos7-Dockerfile b/ci/images/tugraph-compile-arm64v8-centos7-Dockerfile index 551ef16eca..f8561520b3 100644 --- a/ci/images/tugraph-compile-arm64v8-centos7-Dockerfile +++ b/ci/images/tugraph-compile-arm64v8-centos7-Dockerfile @@ -50,7 +50,7 @@ RUN wget https://tugraph-web.oss-cn-beijing.aliyuncs.com/tugraph/deps/Python-3.6 && tar xf Python-3.6.9.tgz && cd Python-3.6.9 && ./configure --prefix=/usr/local \ && make -j4 && make install \ && python3 -m pip install --upgrade pip -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ - && python3 -m pip install pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 ogb pandas==0.24.2 -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ + && python3 -m pip install nest_asyncio pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 ogb pandas==0.24.2 -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ && rm -rf /Python* # dgl==1.0.0 :Could not find a version that satisfies the requirement dgl @@ -228,6 +228,19 @@ RUN wget https://tugraph-web.oss-cn-beijing.aliyuncs.com/tugraph/deps/rocksdb-v7 && tar zxf rocksdb-v7.8.3.tar.gz && cd rocksdb-7.8.3 && cmake -DCMAKE_BUILD_TYPE=Release -DPORTABLE=ON 
-DFORCE_SSE42=OFF -DWITH_JEMALLOC=ON \ && make -j6 && make install && rm -rf /rocksdb* +# set maven mirror +RUN mkdir -p ~/.m2 \ + && echo '' > ~/.m2/settings.xml \ + && echo ' ' >> ~/.m2/settings.xml \ + && echo ' ' >> ~/.m2/settings.xml \ + && echo ' alimaven' >> ~/.m2/settings.xml \ + && echo ' central' >> ~/.m2/settings.xml \ + && echo ' https://maven.aliyun.com/nexus/content/groups/public/' >> ~/.m2/settings.xml \ + && echo ' ' >> ~/.m2/settings.xml \ + && echo ' ' >> ~/.m2/settings.xml \ + && echo '' >> ~/.m2/settings.xml \ + && cat ~/.m2/settings.xml + # install antlr4-4.13.0 # if it is not accessible, replace it with the link below # https://github.com/antlr/antlr4/archive/refs/tags/4.13.0.tar.gz diff --git a/ci/images/tugraph-compile-centos7-Dockerfile b/ci/images/tugraph-compile-centos7-Dockerfile index 825a1fbff2..2da71dcb06 100644 --- a/ci/images/tugraph-compile-centos7-Dockerfile +++ b/ci/images/tugraph-compile-centos7-Dockerfile @@ -55,7 +55,7 @@ RUN wget https://tugraph-web.oss-cn-beijing.aliyuncs.com/tugraph/deps/Python-3.6 && tar xf Python-3.6.9.tgz && cd Python-3.6.9 && ./configure --prefix=/usr/local \ && make ${JFLAG} && make install \ && python3 -m pip install --upgrade pip ${PYPI} \ - && python3 -m pip install pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 dgl==1.0.0 ogb pandas==0.24.2 ${PYPI} \ + && python3 -m pip install nest_asyncio pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 dgl==1.0.0 ogb pandas==0.24.2 ${PYPI} \ && rm -rf /Python* # install cmake @@ -234,6 +234,19 @@ RUN wget https://tugraph-web.oss-cn-beijing.aliyuncs.com/tugraph/deps/rocksdb-v7 && tar zxf rocksdb-v7.8.3.tar.gz && cd rocksdb-7.8.3 && cmake -DCMAKE_BUILD_TYPE=Release -DPORTABLE=ON -DFORCE_SSE42=ON -DWITH_JEMALLOC=ON \ && make ${JFLAG} && make install && rm -rf /rocksdb* +# set maven mirror +RUN mkdir -p ~/.m2 \ + && echo '' > ~/.m2/settings.xml \ + && echo ' ' >> ~/.m2/settings.xml \ + && echo ' ' >> ~/.m2/settings.xml \ + && echo ' alimaven' >> ~/.m2/settings.xml \ + && echo ' central' >> ~/.m2/settings.xml \ + && echo ' https://maven.aliyun.com/nexus/content/groups/public/' >> ~/.m2/settings.xml \ + && echo ' ' >> ~/.m2/settings.xml \ + && echo ' ' >> ~/.m2/settings.xml \ + && echo '' >> ~/.m2/settings.xml \ + && cat ~/.m2/settings.xml + # install antlr4-4.13.0 # if it is not accessible, replace it with the link below # https://github.com/antlr/antlr4/archive/refs/tags/4.13.0.tar.gz diff --git a/ci/images/tugraph-compile-centos8-Dockerfile b/ci/images/tugraph-compile-centos8-Dockerfile index 1af2883601..47c9314154 100644 --- a/ci/images/tugraph-compile-centos8-Dockerfile +++ b/ci/images/tugraph-compile-centos8-Dockerfile @@ -58,7 +58,7 @@ RUN wget https://tugraph-web.oss-cn-beijing.aliyuncs.com/tugraph/deps/Python-3.6 && tar xf Python-3.6.9.tgz && cd Python-3.6.9 && ./configure --prefix=/usr/local \ && make ${JFLAG} && make install \ && python3 -m pip install --upgrade pip ${PYPI} \ - && python3 -m pip install pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 dgl==1.0.0 ogb pandas==0.24.2 ${PYPI} \ + && python3 -m pip install nest_asyncio pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 dgl==1.0.0 ogb pandas==0.24.2 ${PYPI} \ && rm -rf /Python* # install cmake diff --git 
a/ci/images/tugraph-compile-ubuntu18.04-Dockerfile b/ci/images/tugraph-compile-ubuntu18.04-Dockerfile index 3b32cb75e5..e2b19387b8 100644 --- a/ci/images/tugraph-compile-ubuntu18.04-Dockerfile +++ b/ci/images/tugraph-compile-ubuntu18.04-Dockerfile @@ -60,7 +60,7 @@ RUN wget https://tugraph-web.oss-cn-beijing.aliyuncs.com/tugraph/deps/Python-3.6 && tar xf Python-3.6.9.tgz && cd Python-3.6.9 && ./configure --prefix=/usr/local \ && make ${JFLAG} && make install \ && python3 -m pip install --upgrade pip ${PYPI} \ - && python3 -m pip install pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 dgl==1.0.0 ogb pandas==0.24.2 ${PYPI} \ + && python3 -m pip install nest_asyncio pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 dgl==1.0.0 ogb pandas==0.24.2 ${PYPI} \ && rm -rf /Python* # install cmake diff --git a/ci/images/tugraph-runtime-arm64v8-centos7-Dockerfile b/ci/images/tugraph-runtime-arm64v8-centos7-Dockerfile index a016299bb2..6ab167b49e 100644 --- a/ci/images/tugraph-runtime-arm64v8-centos7-Dockerfile +++ b/ci/images/tugraph-runtime-arm64v8-centos7-Dockerfile @@ -33,7 +33,7 @@ RUN wget https://tugraph-web.oss-cn-beijing.aliyuncs.com/tugraph/deps/Python-3.6 && tar xf Python-3.6.9.tgz && cd Python-3.6.9 && ./configure --prefix=/usr/local \ && make -j4 && make install \ && python3 -m pip install --upgrade pip -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ - && python3 -m pip install pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 ogb pandas==0.24.2 -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ + && python3 -m pip install nest_asyncio pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 ogb pandas==0.24.2 -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ && rm -rf /Python* # dgl==1.0.0 diff --git a/ci/images/tugraph-runtime-centos7-Dockerfile b/ci/images/tugraph-runtime-centos7-Dockerfile index ef81574949..3c29b544c9 100644 --- a/ci/images/tugraph-runtime-centos7-Dockerfile +++ b/ci/images/tugraph-runtime-centos7-Dockerfile @@ -10,6 +10,7 @@ RUN yum -x filesystem update -y && yum install -y \ make \ wget \ bzip2 \ + unzip \ openssl-static \ java-1.8.0-openjdk* \ libcurl-devel.x86_64 \ @@ -34,7 +35,7 @@ RUN wget https://tugraph-web.oss-cn-beijing.aliyuncs.com/tugraph/deps/Python-3.6 && tar xf Python-3.6.9.tgz && cd Python-3.6.9 && ./configure --prefix=/usr/local \ && make -j4 && make install \ && python3 -m pip install --upgrade pip -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ - && python3 -m pip install pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 dgl==1.0.0 ogb pandas==0.24.2 -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ + && python3 -m pip install nest_asyncio pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 dgl==1.0.0 ogb pandas==0.24.2 -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ && rm -rf /Python* # install cmake diff --git a/ci/images/tugraph-runtime-centos8-Dockerfile b/ci/images/tugraph-runtime-centos8-Dockerfile index 09eb7969e5..77fcf300ef 100644 --- 
a/ci/images/tugraph-runtime-centos8-Dockerfile +++ b/ci/images/tugraph-runtime-centos8-Dockerfile @@ -14,6 +14,7 @@ RUN yum update -y && yum install -y \ make \ wget \ bzip2 \ + unzip \ openssl-devel.x86_64 \ java-1.8.0-openjdk* \ libcurl-devel.x86_64 \ @@ -38,7 +39,7 @@ RUN wget https://tugraph-web.oss-cn-beijing.aliyuncs.com/tugraph/deps/Python-3.6 && tar xf Python-3.6.9.tgz && cd Python-3.6.9 && ./configure --prefix=/usr/local \ && make -j4 && make install \ && python3 -m pip install --upgrade pip -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ - && python3 -m pip install pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 dgl==1.0.0 ogb pandas==0.24.2 -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ + && python3 -m pip install nest_asyncio pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 dgl==1.0.0 ogb pandas==0.24.2 -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ && rm -rf /Python* # install cmake diff --git a/ci/images/tugraph-runtime-ubuntu18.04-Dockerfile b/ci/images/tugraph-runtime-ubuntu18.04-Dockerfile index 3f1fe20cd3..cbe0ed7da7 100644 --- a/ci/images/tugraph-runtime-ubuntu18.04-Dockerfile +++ b/ci/images/tugraph-runtime-ubuntu18.04-Dockerfile @@ -17,6 +17,7 @@ RUN apt-get update && apt-get install -y \ make \ dpkg \ bzip2 \ + unzip \ zlib1g-dev \ libssl-dev \ openjdk-8-jdk \ @@ -41,7 +42,7 @@ RUN wget https://tugraph-web.oss-cn-beijing.aliyuncs.com/tugraph/deps/Python-3.6 && tar xf Python-3.6.9.tgz && cd Python-3.6.9 && ./configure --prefix=/usr/local \ && make -j4 && make install \ && python3 -m pip install --upgrade pip -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ - && python3 -m pip install pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 dgl==1.0.0 ogb pandas==0.24.2 -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ + && python3 -m pip install nest_asyncio pexpect requests pytest httpx cython==3.0.0a11 sphinx myst_parser sphinx_panels sphinx_rtd_theme numpy==1.19.5 torch==1.10.2 dgl==1.0.0 ogb pandas==0.24.2 -i https://pypi.antfin-inc.com/simple/ --trusted-host pypi.antfin-inc.com \ && rm -rf /Python* # install cmake diff --git a/src/client/python/TuGraphClient/TuGraphClient.py b/src/client/python/TuGraphClient/TuGraphClient.py old mode 100755 new mode 100644 index 87f0b00a8e..45e0ee8297 --- a/src/client/python/TuGraphClient/TuGraphClient.py +++ b/src/client/python/TuGraphClient/TuGraphClient.py @@ -7,8 +7,9 @@ import warnings from functools import partial import httpx +import nest_asyncio - +nest_asyncio.apply() requests = httpx.AsyncClient() warnings.simplefilter("ignore", UserWarning) From 01993ac04c0baf5fca638a652f812a3d0dfffb6d Mon Sep 17 00:00:00 2001 From: ljcui <1003130223@qq.com> Date: Thu, 18 Jan 2024 10:39:26 +0800 Subject: [PATCH 5/8] add bolt streaming --- .../4.client-tools/6.bolt-client.md | 2 +- .../4.client-tools/7.bolt-console-client.md | 20 ++ .../4.client-tools/6.bolt-client.md | 2 +- .../4.client-tools/7.bolt-console-client.md | 20 ++ include/lgraph/lgraph_result.h | 6 +- src/bolt/blocking_queue.h | 9 + src/bolt/connection.cpp | 36 +++- src/bolt/connection.h | 18 +- src/bolt/io_service.h | 4 +- src/bolt/pack_stream.h | 6 + src/bolt/to_string.h | 8 +- 
src/cypher/execution_plan/execution_plan.cpp | 19 +- src/cypher/execution_plan/ops/op_create.h | 5 +- src/cypher/execution_plan/ops/op_delete.h | 4 +- src/cypher/execution_plan/ops/op_merge.h | 5 +- .../execution_plan/ops/op_produce_results.h | 65 +++++- src/cypher/execution_plan/ops/op_set.h | 5 +- src/cypher/execution_plan/runtime_context.h | 6 + src/cypher/execution_plan/scheduler.cpp | 65 +++--- src/import/import_v3.cpp | 5 +- src/lgraph_api/lgraph_result.cpp | 5 + src/server/bolt_handler.cpp | 56 ++--- src/server/bolt_session.h | 9 +- test/test_lgraph_cli.cpp | 202 +++++++++++++++--- toolkits/lgraph_cli.cpp | 105 ++++++--- 25 files changed, 521 insertions(+), 166 deletions(-) diff --git a/docs/en-US/source/5.developer-manual/4.client-tools/6.bolt-client.md b/docs/en-US/source/5.developer-manual/4.client-tools/6.bolt-client.md index 438c3c9313..e45c65a194 100644 --- a/docs/en-US/source/5.developer-manual/4.client-tools/6.bolt-client.md +++ b/docs/en-US/source/5.developer-manual/4.client-tools/6.bolt-client.md @@ -12,6 +12,6 @@ Add `bolt_port=7687` (modify the port) to TuGraph's config file to enable the bo TuGraph does not implement all the features of neo4j bolt protocol. -Streaming read and explicit transactions are currently not supported and need to avoid using these features. +Explicit transactions are currently not supported and need to avoid using these features. There are some examples for reference under `demos/Bolt` in the code repository. \ No newline at end of file diff --git a/docs/en-US/source/5.developer-manual/4.client-tools/7.bolt-console-client.md b/docs/en-US/source/5.developer-manual/4.client-tools/7.bolt-console-client.md index 9514609e1b..3ba53fb6f5 100644 --- a/docs/en-US/source/5.developer-manual/4.client-tools/7.bolt-console-client.md +++ b/docs/en-US/source/5.developer-manual/4.client-tools/7.bolt-console-client.md @@ -84,4 +84,24 @@ lgraph_cli --ip 127.0.0.1 --port 7687 --graph default --user admin --password 73 | (:person {id:3,born:1967,poster_image:"https://image.tmdb.org/t/p/w185/8iATAc5z5XOKFFARLsvaawa8MTY.jpg",name:"Carrie-Anne Moss"}) | +-------------------------------------------------------------------------------------------------------------------------------------+ 2 rows +``` + +## online export + +lgraph_cli supports streaming read, so just redirect the output to a file. 
The output format supports csv and json + +### csv + +```powershell + +echo "match(n) return n.id, n.name;" | lgraph_cli --ip 127.0.0.1 --port 7687 --graph default --user admin --password 73@TuGraph --format csv > output.txt + +``` + +### json + +```powershell + +echo "match(n) return n.id, n.name;" | lgraph_cli --ip 127.0.0.1 --port 7687 --graph default --user admin --password 73@TuGraph --format json > output.txt + ``` \ No newline at end of file diff --git a/docs/zh-CN/source/5.developer-manual/4.client-tools/6.bolt-client.md b/docs/zh-CN/source/5.developer-manual/4.client-tools/6.bolt-client.md index a8a7bc4424..b1f34412ce 100644 --- a/docs/zh-CN/source/5.developer-manual/4.client-tools/6.bolt-client.md +++ b/docs/zh-CN/source/5.developer-manual/4.client-tools/6.bolt-client.md @@ -10,6 +10,6 @@ TuGraph目前兼容了Neo4j的bolt协议,可以直接使用Neo4j的客户端 ## Neo4j客户端使用限制 -目前还没有兼容Neo4j bolt协议的全部功能,Neo4j客户端的的流式读取、显示事务功能目前还不支持,使用客户端的时候注意避免使用这两个功能。 +目前还没有兼容Neo4j bolt协议的全部功能,Neo4j客户端的显示事务功能目前还不支持,使用客户端的时候注意避免使用这两个功能。 在代码目录中的demo/Bolt下面有golang、java、js、python、rust 这几个语言的的例子,可供参考。 \ No newline at end of file diff --git a/docs/zh-CN/source/5.developer-manual/4.client-tools/7.bolt-console-client.md b/docs/zh-CN/source/5.developer-manual/4.client-tools/7.bolt-console-client.md index af71d05264..f9ef4f4aed 100644 --- a/docs/zh-CN/source/5.developer-manual/4.client-tools/7.bolt-console-client.md +++ b/docs/zh-CN/source/5.developer-manual/4.client-tools/7.bolt-console-client.md @@ -83,4 +83,24 @@ lgraph_cli --ip 127.0.0.1 --port 7687 --graph default --user admin --password 73 | (:person {id:3,born:1967,poster_image:"https://image.tmdb.org/t/p/w185/8iATAc5z5XOKFFARLsvaawa8MTY.jpg",name:"Carrie-Anne Moss"}) | +-------------------------------------------------------------------------------------------------------------------------------------+ 2 rows +``` + +## 在线数据导出 + +lgraph_cli 支持流式读取,导出数据只需要把lgraph_cli的输出重定向到文件中即可,导出格式支持csv和json。 + +### csv格式 + +```powershell + +echo "match(n) return n.id, n.name;" | lgraph_cli --ip 127.0.0.1 --port 7687 --graph default --user admin --password 73@TuGraph --format csv > output.txt + +``` + +### json格式 + +```powershell + +echo "match(n) return n.id, n.name;" | lgraph_cli --ip 127.0.0.1 --port 7687 --graph default --user admin --password 73@TuGraph --format json > output.txt + ``` \ No newline at end of file diff --git a/include/lgraph/lgraph_result.h b/include/lgraph/lgraph_result.h index 6e02be2dfd..236950a11a 100644 --- a/include/lgraph/lgraph_result.h +++ b/include/lgraph/lgraph_result.h @@ -49,7 +49,6 @@ class Record { public: friend class Result; - Record(const Record &); Record(Record &&); Record &operator=(const Record &); @@ -269,6 +268,11 @@ class Result { */ void Load(const std::string &json); + /** + * @brief Clear all the records, Size() will be 0. 
+ */ + void ClearRecords(); + std::vector BoltHeader(); std::vector> BoltRecords(); diff --git a/src/bolt/blocking_queue.h b/src/bolt/blocking_queue.h index 1f91fbe2d4..a3ca59c482 100644 --- a/src/bolt/blocking_queue.h +++ b/src/bolt/blocking_queue.h @@ -39,6 +39,15 @@ class BlockingQueue { queue_.pop_back(); return ret; } + std::optional Pop(const std::chrono::milliseconds& timeout) { + std::unique_lock lock(mutex_); + if (!condition_.wait_for(lock, timeout, [this] {return !queue_.empty();})) { + return {}; + } + T ret = std::move(queue_.back()); + queue_.pop_back(); + return ret; + } private: std::mutex mutex_; diff --git a/src/bolt/connection.cpp b/src/bolt/connection.cpp index 870bf3df90..a667ff4838 100644 --- a/src/bolt/connection.cpp +++ b/src/bolt/connection.cpp @@ -17,6 +17,7 @@ */ #include #include "fma-common/string_formatter.h" +#include "fma-common/utils.h" #include "tools/json.hpp" #include "bolt/connection.h" #include "bolt/messages.h" @@ -36,7 +37,7 @@ void BoltConnection::Start() { // read handshake async_read(socket(), buffer(handshake_buffer_), std::bind(&BoltConnection::ReadHandshakeDone, - this, std::placeholders::_1, std::placeholders::_2)); + shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } void BoltConnection::Close() { @@ -56,11 +57,13 @@ void BoltConnection::DoSend() { LOG_WARN() << FMA_FMT("async write error: {}, clear {} pending message", ec.message(), msg_queue_.size()); msg_queue_.clear(); + msg_queue_size_ = 0; Close(); return; } assert(msg_queue_.size() >= send_buffers_.size()); msg_queue_.erase(msg_queue_.begin(), msg_queue_.begin() + send_buffers_.size()); + msg_queue_size_ = msg_queue_.size(); send_buffers_.clear(); if (!msg_queue_.empty()) { DoSend(); @@ -76,6 +79,7 @@ void BoltConnection::Respond(std::string str) { } bool need_invoke = msg_queue_.empty(); msg_queue_.push_back(std::move(str)); + msg_queue_size_ = msg_queue_.size(); if (need_invoke) { DoSend(); } @@ -84,8 +88,15 @@ void BoltConnection::Respond(std::string str) { // async respond // used in non-io thread, thread safe void BoltConnection::PostResponse(std::string str) { - io_service().post([this, msg = std::move(str)]() mutable { - Respond(std::move(msg)); + while (!has_closed() && msg_queue_size_ > 1024) { + fma_common::SleepUs(1000); // sleep 1 ms + } + if (has_closed()) { + LOG_WARN() << "connection is closed, drop this message"; + return; + } + io_service().post([self = shared_from_this(), msg = std::move(str)]() mutable { + self->Respond(std::move(msg)); }); } @@ -120,7 +131,7 @@ void BoltConnection::ReadHandshakeDone(const boost::system::error_code& ec, cons } // write accepted version async_write(socket(),buffer(version_buffer_), // NOLINT - std::bind(&BoltConnection::WriteResponseDone, this, + std::bind(&BoltConnection::WriteResponseDone, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } @@ -132,7 +143,7 @@ void BoltConnection::WriteResponseDone(const boost::system::error_code& ec, cons } // read chunk size async_read(socket(),buffer(&chunk_size_, sizeof(chunk_size_)), //NOLINT - std::bind(&BoltConnection::ReadChunkSizeDone, this, + std::bind(&BoltConnection::ReadChunkSizeDone, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } @@ -155,13 +166,22 @@ void BoltConnection::ReadChunkSizeDone(const boost::system::error_code& ec, cons } LOG_DEBUG() << FMA_FMT("msg: {}, fields: {}", ToString(tag), Print(fields)); - handle_(*this, tag, std::move(fields)); + try { + handle_(*this, tag, std::move(fields)); + } catch (const 
std::exception& e) { + LOG_ERROR() << "Exception in bolt connection: " << e.what(); + Close(); + return; + } + if (has_closed()) { + return; + } chunk_.resize(0); } auto old_size = chunk_.size(); chunk_.resize(old_size + chunk_size_); async_read(socket(),buffer(chunk_.data() + old_size, chunk_size_), // NOLINT - std::bind(&BoltConnection::ReadChunkDone, this, + std::bind(&BoltConnection::ReadChunkDone, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } @@ -172,7 +192,7 @@ void BoltConnection::ReadChunkDone(const boost::system::error_code& ec, const si return; } async_read(socket(), buffer(&chunk_size_, sizeof(chunk_size_)), - std::bind(&BoltConnection::ReadChunkSizeDone, this, + std::bind(&BoltConnection::ReadChunkSizeDone, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } diff --git a/src/bolt/connection.h b/src/bolt/connection.h index b1a7bf5048..25f9af5a82 100644 --- a/src/bolt/connection.h +++ b/src/bolt/connection.h @@ -41,17 +41,23 @@ class Connection : private boost::asio::noncopyable { explicit Connection(boost::asio::io_service& io_service) : io_service_(io_service), socket_(io_service_), has_closed_(false) { } + virtual ~Connection() { + LOG_DEBUG() << FMA_FMT("destroy connection[id:{}]", conn_id_); + } tcp::socket& socket() { return socket_; } virtual void Close() { - // std::cout << "close conn[id:" << conn_id_ << "]" << std::endl; - socket_.close(); + boost::system::error_code ec; + socket_.close(ec); + if (ec) { + LOG_WARN() << "Close error: " << ec.message(); + } has_closed_ = true; } - virtual bool has_closed() {return has_closed_;} + bool has_closed() {return has_closed_;} int64_t& conn_id() { return conn_id_;} boost::asio::io_service& io_service() {return io_service_;} virtual void Start() = 0; - virtual void PostResponse(std::string res) {} + private: boost::asio::io_service& io_service_; tcp::socket socket_; @@ -70,7 +76,7 @@ class BoltConnection handle_(std::move(handle)) {} void Start() override; void Close() override; - void PostResponse(std::string res) override; + void PostResponse(std::string res); void Respond(std::string str); void SetContext(std::shared_ptr ctx) { context_ = std::move(ctx); @@ -78,7 +84,6 @@ class BoltConnection void* GetContext() { return context_.get(); } - private: void ReadHandshakeDone(const boost::system::error_code &ec, size_t size); void ReadChunkSizeDone(const boost::system::error_code &ec, size_t size); @@ -95,6 +100,7 @@ class BoltConnection std::vector chunk_; Unpacker unpacker_; std::deque msg_queue_; + std::atomic msg_queue_size_ = 0; std::vector send_buffers_; // only shared_ptr can store void pointer std::shared_ptr context_; diff --git a/src/bolt/io_service.h b/src/bolt/io_service.h index 1e1d434617..9d00e0af73 100644 --- a/src/bolt/io_service.h +++ b/src/bolt/io_service.h @@ -122,8 +122,8 @@ class IOService : private boost::asio::noncopyable { void clean_closed_conn() { for (auto it = connections_.cbegin(); it != connections_.cend();) { if (it->second->has_closed()) { - // std::cout << "connections pool erase connection[id: " - // << it->second->conn_id() << "]" << std::endl; + LOG_DEBUG() << FMA_FMT("erase connection[id:{},use_count:{}] from pool", + it->second->conn_id(), it->second.use_count()); it = connections_.erase(it); } else { ++it; diff --git a/src/bolt/pack_stream.h b/src/bolt/pack_stream.h index 72fe2794fb..1269519326 100644 --- a/src/bolt/pack_stream.h +++ b/src/bolt/pack_stream.h @@ -244,6 +244,12 @@ class PackStream { End(); } + void AppendRecords(const std::vector>& 
records) { + for (auto& r : records) { + AppendRecord(r); + } + } + const std::string& ConstBuffer() const { return chunker_.buf; } diff --git a/src/bolt/to_string.h b/src/bolt/to_string.h index a68f8b19d1..f4b05516c2 100644 --- a/src/bolt/to_string.h +++ b/src/bolt/to_string.h @@ -137,13 +137,15 @@ nlohmann::json ToJsonObj(const bolt::Path& path) { std::string Print(const std::any& boltType) { auto j = detail::ToJsonObj(boltType); - if (boltType.type() == typeid(bolt::Node) || - boltType.type() == typeid(bolt::Relationship) || - boltType.type() == typeid(bolt::Path)) { + if (j.is_string()) { return j.get(); } else { return j.dump(); } } +nlohmann::json ToJson(const std::any& boltType) { + return detail::ToJsonObj(boltType); +} + } // namespace bolt diff --git a/src/cypher/execution_plan/execution_plan.cpp b/src/cypher/execution_plan/execution_plan.cpp index a0826b9725..0e7f6faee8 100644 --- a/src/cypher/execution_plan/execution_plan.cpp +++ b/src/cypher/execution_plan/execution_plan.cpp @@ -241,6 +241,11 @@ static void BuildResultSetInfo(const QueryPart &stmt, ResultInfo &result_info) { } } } + } else if (!stmt.create_clause.empty() || stmt.delete_clause || + !stmt.merge_clause.empty() || !stmt.set_clause.empty()) { + CYPHER_THROW_ASSERT(result_info.header.colums.empty()); + result_info.header.colums.emplace_back( + "", "", false, lgraph_api::LGraphType::STRING); } } @@ -1354,6 +1359,14 @@ int ExecutionPlan::Execute(RTContext *ctx) { } ctx->result_ = std::make_unique(lgraph_api::Result(header)); + if (ctx->bolt_conn_) { + std::unordered_map meta; + meta["fields"] = ctx->result_->BoltHeader(); + bolt::PackStream ps; + ps.AppendSuccess(meta); + ctx->bolt_conn_->PostResponse(std::move(ps.MutableBuffer())); + } + try { OpBase::OpResult res; do { @@ -1400,18 +1413,12 @@ const ResultInfo &ExecutionPlan::GetResultInfo() const { return _result_info; } std::string ExecutionPlan::DumpPlan(int indent, bool statistics) const { std::string s = statistics ? 
"Profile statistics:\n" : "Execution Plan:\n"; OpBase::DumpStream(_root, indent, statistics, s); - if (LoggerManager::GetInstance().GetLevel() >= severity_level::DEBUG) { - LOG_DEBUG() << s; - } return s; } std::string ExecutionPlan::DumpGraph() const { std::string s; for (auto &g : _pattern_graphs) s.append(g.DumpGraph()); - if (LoggerManager::GetInstance().GetLevel() >= severity_level::DEBUG) { - LOG_DEBUG() << s; - } return s; } } // namespace cypher diff --git a/src/cypher/execution_plan/ops/op_create.h b/src/cypher/execution_plan/ops/op_create.h index 09041c82cd..8389afed16 100644 --- a/src/cypher/execution_plan/ops/op_create.h +++ b/src/cypher/execution_plan/ops/op_create.h @@ -174,10 +174,7 @@ class OpCreate : public OpBase { .append(" vertices, created ") .append(std::to_string(ctx->result_info_->statistics.edges_created)) .append(" edges."); - auto header = ctx->result_->Header(); - header.emplace_back(std::make_pair("", lgraph_api::LGraphType::STRING)); - ctx->result_->ResetHeader(header); - // ctx->result_info_->header.colums.emplace_back(""); + CYPHER_THROW_ASSERT(ctx->result_->Header().size() == 1); CYPHER_THROW_ASSERT(record); record->values.clear(); record->AddConstant(lgraph::FieldData(summary)); diff --git a/src/cypher/execution_plan/ops/op_delete.h b/src/cypher/execution_plan/ops/op_delete.h index 17c65b3413..9c709db377 100644 --- a/src/cypher/execution_plan/ops/op_delete.h +++ b/src/cypher/execution_plan/ops/op_delete.h @@ -94,9 +94,7 @@ class OpDelete : public OpBase { .append(" vertices, deleted ") .append(std::to_string(ctx->result_info_->statistics.edges_deleted)) .append(" edges."); - auto header = ctx->result_->Header(); - header.emplace_back(std::make_pair("", lgraph_api::LGraphType::STRING)); - ctx->result_->ResetHeader(header); + CYPHER_THROW_ASSERT(ctx->result_->Header().size() == 1); CYPHER_THROW_ASSERT(record); record->values.clear(); record->AddConstant(lgraph::FieldData(summary)); diff --git a/src/cypher/execution_plan/ops/op_merge.h b/src/cypher/execution_plan/ops/op_merge.h index 635409ccad..506d787dca 100644 --- a/src/cypher/execution_plan/ops/op_merge.h +++ b/src/cypher/execution_plan/ops/op_merge.h @@ -465,10 +465,7 @@ class OpMerge : public OpBase { .append(" vertices, merged ") .append(std::to_string(ctx->result_info_->statistics.edges_created)) .append(" edges."); - // ctx->result_info_->header.colums.emplace_back(""); - auto header = ctx->result_->Header(); - header.emplace_back(std::make_pair("", lgraph_api::LGraphType::STRING)); - ctx->result_->ResetHeader(header); + CYPHER_THROW_ASSERT(ctx->result_->Header().size() == 1); CYPHER_THROW_ASSERT(record); record->values.clear(); record->AddConstant(lgraph::FieldData(summary)); diff --git a/src/cypher/execution_plan/ops/op_produce_results.h b/src/cypher/execution_plan/ops/op_produce_results.h index a7ea732f52..546736dfc1 100644 --- a/src/cypher/execution_plan/ops/op_produce_results.h +++ b/src/cypher/execution_plan/ops/op_produce_results.h @@ -22,6 +22,7 @@ #include "lgraph/lgraph_types.h" #include "resultset/record.h" #include "server/json_convert.h" +#include "server/bolt_session.h" /* Runtime Record to User Record */ static void RRecordToURecord( @@ -203,14 +204,64 @@ class ProduceResults : public OpBase { Initialize(ctx); state_ = Consuming; } - if (children.empty()) return OP_DEPLETED; - auto child = children[0]; - auto res = child->Consume(ctx); - if (res != OP_OK) return res; - auto record = ctx->result_->MutableRecord(); - RRecordToURecord(ctx->txn_.get(), ctx->result_->Header(), 
child->record, *record); - return OP_OK; + if (ctx->bolt_conn_) { + if (ctx->bolt_conn_->has_closed()) { + LOG_INFO() << "The bolt connection is closed, cancel the op execution."; + return OP_ERR; + } + auto session = (bolt::BoltSession *)ctx->bolt_conn_->GetContext(); + while (!session->current_msg) { + session->current_msg = session->msgs.Pop(std::chrono::milliseconds(100)); + if (ctx->bolt_conn_->has_closed()) { + LOG_INFO() << "The bolt connection is closed, cancel the op execution."; + return OP_ERR; + } + } + auto child = children[0]; + auto res = child->Consume(ctx); + if (res != OP_OK) { + session->ps.AppendSuccess(); + ctx->bolt_conn_->PostResponse(std::move(session->ps.MutableBuffer())); + session->ps.Reset(); + return res; + } + if (session->current_msg.value().type == bolt::BoltMsg::PullN) { + auto record = ctx->result_->MutableRecord(); + RRecordToURecord(ctx->txn_.get(), ctx->result_->Header(), child->record, *record); + session->ps.AppendRecords(ctx->result_->BoltRecords()); + ctx->result_->ClearRecords(); + bool sync = false; + if (--session->current_msg.value().n == 0) { + std::unordered_map meta; + meta["has_more"] = true; + session->ps.AppendSuccess(meta); + session->current_msg.reset(); + sync = true; + } + if (sync || session->ps.ConstBuffer().size() > 1024) { + ctx->bolt_conn_->PostResponse(std::move(session->ps.MutableBuffer())); + session->ps.Reset(); + } + } else if (session->current_msg.value().type == bolt::BoltMsg::DiscardN) { + if (--session->current_msg.value().n == 0) { + std::unordered_map meta; + meta["has_more"] = true; + session->ps.AppendSuccess(meta); + session->current_msg.reset(); + ctx->bolt_conn_->PostResponse(std::move(session->ps.MutableBuffer())); + session->ps.Reset(); + } + } + return OP_OK; + } else { + auto child = children[0]; + auto res = child->Consume(ctx); + if (res != OP_OK) return res; + auto record = ctx->result_->MutableRecord(); + RRecordToURecord(ctx->txn_.get(), ctx->result_->Header(), child->record, *record); + return OP_OK; + } } /* Restart */ diff --git a/src/cypher/execution_plan/ops/op_set.h b/src/cypher/execution_plan/ops/op_set.h index 9f3397df50..e39700ad9f 100644 --- a/src/cypher/execution_plan/ops/op_set.h +++ b/src/cypher/execution_plan/ops/op_set.h @@ -227,10 +227,7 @@ class OpSet : public OpBase { summary.append("set ") .append(std::to_string(ctx->result_info_->statistics.properties_set)) .append(" properties."); - // ctx->result_info_->header.colums.emplace_back(""); - auto header = ctx->result_->Header(); - header.emplace_back(std::make_pair("", lgraph_api::LGraphType::STRING)); - ctx->result_->ResetHeader(header); + CYPHER_THROW_ASSERT(ctx->result_->Header().size() == 1); CYPHER_THROW_ASSERT(record); record->values.clear(); record->AddConstant(lgraph::FieldData(summary)); diff --git a/src/cypher/execution_plan/runtime_context.h b/src/cypher/execution_plan/runtime_context.h index b947adc4ce..505c8d5544 100644 --- a/src/cypher/execution_plan/runtime_context.h +++ b/src/cypher/execution_plan/runtime_context.h @@ -20,6 +20,7 @@ #include "cypher/parser/data_typedef.h" #include "cypher/resultset/result_info.h" #include "lgraph/lgraph_result.h" +#include "bolt/connection.h" namespace cypher { @@ -58,6 +59,7 @@ class RTContext : public SubmitQueryContext { std::unique_ptr txn_; std::unique_ptr result_info_; std::unique_ptr result_; + bolt::BoltConnection* bolt_conn_ = nullptr; RTContext() = default; @@ -75,5 +77,9 @@ class RTContext : public SubmitQueryContext { } return true; } + + void 
SetBoltConnection(bolt::BoltConnection* c) { + bolt_conn_ = c; + } }; } // namespace cypher diff --git a/src/cypher/execution_plan/scheduler.cpp b/src/cypher/execution_plan/scheduler.cpp index fb4998ea14..8c3140370b 100644 --- a/src/cypher/execution_plan/scheduler.cpp +++ b/src/cypher/execution_plan/scheduler.cpp @@ -35,6 +35,8 @@ #include "cypher/execution_plan/execution_plan_v2.h" #include "cypher/rewriter/GenAnonymousAliasRewriter.h" +#include "server/bolt_session.h" + namespace cypher { void Scheduler::Eval(RTContext *ctx, const lgraph_api::GraphQueryType &type, @@ -80,25 +82,48 @@ void Scheduler::EvalCypher(RTContext *ctx, const std::string &script, ElapsedTim plan = std::make_shared(); plan->Build(visitor.GetQuery(), visitor.CommandType(), ctx); plan->Validate(ctx); - if (visitor.CommandType() == parser::CmdType::EXPLAIN) { + if (plan->CommandType() != parser::CmdType::QUERY) { ctx->result_info_ = std::make_unique(); ctx->result_ = std::make_unique(); - - ctx->result_->ResetHeader({{"@plan", lgraph_api::LGraphType::STRING}}); + std::string header, data; + if (plan->CommandType() == parser::CmdType::EXPLAIN) { + header = "@plan"; + data = plan->DumpPlan(0, false); + } else { + header = "@profile"; + data = plan->DumpGraph(); + } + ctx->result_->ResetHeader({{header, lgraph_api::LGraphType::STRING}}); auto r = ctx->result_->MutableRecord(); - r->Insert("@plan", lgraph::FieldData(plan->DumpPlan(0, false))); + r->Insert(header, lgraph::FieldData(data)); + if (ctx->bolt_conn_) { + auto session = (bolt::BoltSession *)ctx->bolt_conn_->GetContext(); + while (!session->current_msg) { + session->current_msg = session->msgs.Pop(std::chrono::milliseconds(100)); + if (ctx->bolt_conn_->has_closed()) { + LOG_INFO() << "The bolt connection is closed, cancel the op execution."; + return; + } + } + std::unordered_map meta; + meta["fields"] = ctx->result_->BoltHeader(); + bolt::PackStream ps; + ps.AppendSuccess(meta); + if (session->current_msg.value().type == bolt::BoltMsg::PullN) { + ps.AppendRecords(ctx->result_->BoltRecords()); + } else if (session->current_msg.value().type == bolt::BoltMsg::DiscardN) { + // ... + } + ps.AppendSuccess(); + ctx->bolt_conn_->PostResponse(std::move(ps.MutableBuffer())); + } return; } LOG_DEBUG() << "Plan cache disabled."; - // FMA_DBG_STREAM(Logger()) - // << "Miss execution plan cache, build plan for this query."; - } else { - // FMA_DBG_STREAM(Logger()) - // << "Hit execution plan cache."; } + LOG_DEBUG() << plan->DumpPlan(0, false); + LOG_DEBUG() << plan->DumpGraph(); elapsed.t_compile = fma_common::GetTime() - t0; - plan->DumpGraph(); - plan->DumpPlan(0, false); if (!plan->ReadOnly() && ctx->optimistic_) { while (1) { try { @@ -113,24 +138,6 @@ void Scheduler::EvalCypher(RTContext *ctx, const std::string &script, ElapsedTim } elapsed.t_total = fma_common::GetTime() - t0; elapsed.t_exec = elapsed.t_total - elapsed.t_compile; - if (plan->CommandType() == CmdType::PROFILE) { - ctx->result_info_ = std::make_unique(); - ctx->result_ = std::make_unique(); - ctx->result_->ResetHeader({{"@profile", lgraph_api::LGraphType::STRING}}); - - auto r = ctx->result_->MutableRecord(); - r->Insert("@profile", lgraph::FieldData(plan->DumpGraph())); - return; - } else { - /* promote priority of the recent plan - * OR add the plan to the plan cache. 
*/ - // tls_plan_cache.Put(script, plan); - // FMA_DBG_STREAM(Logger()) - // << "Current Plan Cache (tid" << std::this_thread::get_id() << "):"; - // for (auto &p : tls_plan_cache.List()) - // FMA_DBG_STREAM(Logger()) << p.first << "\n"; - // return; - } } void Scheduler::EvalGql(RTContext *ctx, const std::string &script, ElapsedTime &elapsed) { diff --git a/src/import/import_v3.cpp b/src/import/import_v3.cpp index 2e88869350..f541928df7 100644 --- a/src/import/import_v3.cpp +++ b/src/import/import_v3.cpp @@ -544,13 +544,15 @@ void Importer::EdgeDataToSST() { BufferedBlobWriter blob_writer(db_->GetLightningGraph()); std::atomic pending_tasks(0); std::atomic sst_file_id(0); + cuckoohash_map unique_index_keys; for (auto& file : data_files_) { // skip vertex files if (file.is_vertex_file) { continue; } boost::asio::post(*parse_file_threads_, - [this, &file, &blob_writer, &pending_tasks, &sst_file_id](){ + [this, &file, &blob_writer, &pending_tasks, + &sst_file_id, &unique_index_keys](){ try { std::vector fts; { @@ -574,7 +576,6 @@ void Importer::EdgeDataToSST() { LabelId src_label_id, dst_label_id; std::vector field_ids; std::vector unique_index_info; - cuckoohash_map unique_index_keys; Schema* schema; { auto txn = db_->CreateReadTxn(); diff --git a/src/lgraph_api/lgraph_result.cpp b/src/lgraph_api/lgraph_result.cpp index 9a52bbc8c0..123cbbdd52 100644 --- a/src/lgraph_api/lgraph_result.cpp +++ b/src/lgraph_api/lgraph_result.cpp @@ -368,6 +368,11 @@ std::string Result::Dump(bool is_standard) { } } +void Result::ClearRecords() { + result.clear(); + row_count_ = -1; +} + std::vector Result::BoltHeader() { std::vector ret; for (auto& h : header) { diff --git a/src/server/bolt_handler.cpp b/src/server/bolt_handler.cpp index 2b31824b46..32ca34ea4d 100644 --- a/src/server/bolt_handler.cpp +++ b/src/server/bolt_handler.cpp @@ -62,17 +62,18 @@ std::functionstate = SessionState::STREAMING; // Now only implicit transactions are supported - workers.post([&conn, fields = std::move(fields)](){ + workers.post([conn = conn.shared_from_this(), + fields = std::move(fields)](){ if (fields.size() < 3) { LOG_ERROR() << "Run msg fields size error, size: " << fields.size(); bolt::PackStream ps; ps.AppendFailure({{"code", "error"}, {"message", "Run msg fields size error"}}); ps.AppendIgnored(); - conn.PostResponse(std::move(ps.MutableBuffer())); + conn->PostResponse(std::move(ps.MutableBuffer())); return; } - auto session = (BoltSession*)conn.GetContext(); + auto session = (BoltSession*)conn->GetContext(); auto& cypher = std::any_cast(fields[0]); auto& extra = std::any_cast< const std::unordered_map&>(fields[2]); @@ -84,12 +85,13 @@ std::functionPostResponse(std::move(ps.MutableBuffer())); return; } auto& graph = std::any_cast(db_iter->second); auto sm = BoltServer::Instance().StateMachine(); cypher::RTContext ctx(sm, sm->GetGalaxy(), session->user, graph); + ctx.SetBoltConnection(conn.get()); cypher::ElapsedTime elapsed; LOG_DEBUG() << "Bolt run " << cypher; try { @@ -101,38 +103,38 @@ std::functionPostResponse(std::move(ps.MutableBuffer())); return; } LOG_DEBUG() << "Cypher execution completed"; - std::unordered_map meta; - meta["fields"] = ctx.result_->BoltHeader(); - bolt::PackStream ps; - ps.AppendSuccess(meta); - conn.PostResponse(std::move(ps.MutableBuffer())); - auto msg = session->msgs.Pop(); - ps.Reset(); - if (msg == BoltMsg::PullN) { - for (const auto& r : ctx.result_->BoltRecords()) { - ps.AppendRecord(r); - } - ps.AppendSuccess(); - } else if (msg == BoltMsg::DiscardN) { - ps.AppendSuccess(); - } else 
{ - LOG_WARN() << "Unexpected bolt msg: " << ToString(msg) << " after RUN"; - ps.AppendSuccess(); - } - conn.PostResponse(std::move(ps.MutableBuffer())); - LOG_DEBUG() << "Posting response completed"; session->state = SessionState::READY; }); } else if (msg == BoltMsg::PullN) { + if (fields.size() != 1) { + LOG_ERROR() << "PullN msg fields size error, size: " << fields.size(); + ps.AppendFailure({{"code", "error"}, + {"message", "PullN msg fields size error"}}); + conn.Respond(std::move(ps.MutableBuffer())); + conn.Close(); + return; + } + auto& val = std::any_cast&>(fields[0]); + auto n = std::any_cast(val.at("n")); auto session = (BoltSession*)conn.GetContext(); - session->msgs.Push(BoltMsg::PullN); + session->msgs.Push({BoltMsg::PullN, n}); } else if (msg == BoltMsg::DiscardN) { + if (fields.size() != 1) { + LOG_ERROR() << "DiscardN msg fields size error, size: " << fields.size(); + ps.AppendFailure({{"code", "error"}, + {"message", "DiscardN msg fields size error"}}); + conn.Respond(std::move(ps.MutableBuffer())); + conn.Close(); + return; + } + auto& val = std::any_cast&>(fields[0]); + auto n = std::any_cast(val.at("n")); auto session = (BoltSession*)conn.GetContext(); - session->msgs.Push(BoltMsg::DiscardN); + session->msgs.Push({BoltMsg::DiscardN, n}); } else if (msg == BoltMsg::Begin || msg == BoltMsg::Commit || msg == BoltMsg::Rollback) { diff --git a/src/server/bolt_session.h b/src/server/bolt_session.h index 0a13c1d21c..9015e1c133 100644 --- a/src/server/bolt_session.h +++ b/src/server/bolt_session.h @@ -31,10 +31,17 @@ enum class SessionState { FAILED }; +struct BoltMsgDetail { + BoltMsg type; + int64_t n = 0; +}; + struct BoltSession { + std::optional current_msg; + PackStream ps; std::string user; SessionState state; - BlockingQueue msgs; + BlockingQueue msgs; }; } // namespace bolt diff --git a/test/test_lgraph_cli.cpp b/test/test_lgraph_cli.cpp index d4b13fd8e7..546265a209 100644 --- a/test/test_lgraph_cli.cpp +++ b/test/test_lgraph_cli.cpp @@ -22,18 +22,6 @@ class TestLGraphCLI : public TuGraphTest {}; TEST_F(TestLGraphCLI, LGraphCLI) { using namespace lgraph; - lgraph::GlobalConfig conf; - conf.db_dir = "./testdb"; - conf.http_port = 7774; - conf.enable_rpc = false; - conf.bolt_port = 7687; - conf.bind_host = "127.0.0.1"; - AutoCleanDir cleaner(conf.db_dir); - auto server = StartLGraphServer(conf); - auto WriteFile = [](const std::string& name, const std::string& content) { - fma_common::OutputFmaStream out(name); - out.Write(content.data(), content.size()); - }; std::string file = "statements.txt"; std::string statements = R"( CALL db.createVertexLabel('person', 'int8', @@ -83,27 +71,149 @@ create (n1)-[r:is_friend {message:'hi..'}]->(n2); match (n)-[r]->(m) return n; +explain match (n)-[r]->(m) return n; + match (n)-[r]->(m) return r; +profile match (n)-[r]->(m) return r; + match p = (n)-[r]->(m) return p; )"; - std::string expected = R"(0 rows + + { + lgraph::GlobalConfig conf; + conf.db_dir = "./testdb"; + conf.http_port = 7774; + conf.enable_rpc = false; + conf.bolt_port = 7687; + conf.bind_host = "127.0.0.1"; + AutoCleanDir cleaner(conf.db_dir); + auto server = StartLGraphServer(conf); + auto WriteFile = [](const std::string& name, const std::string& content) { + fma_common::OutputFmaStream out(name); + out.Write(content.data(), content.size()); + }; + std::string expected = R"( + + +created 2 vertices, created 0 edges. + +created 0 vertices, created 1 edges. 
+n +(:person {int16:16,float:1.1100000143051147,double:100.98,int8:8,string:"foo bar",int32:32,int64:64,bool:true,datetime:1493632800000000000,date:17289}) +@plan +Execution Plan: +Produce Results + Project [n] + Expand(All) [n --> m ] + Node By Label Scan [n:person] + +r +[:is_friend {message:"hi.."}] +@profile +Current Pattern Graph: +N[0] n:person (MATCHED) +N[1] m:person (MATCHED) +R[0 --> 1] r:{<1>: is_friend} (MATCHED) +Symbol: [n] type(NODE), scope(LOCAL), symbol_id(0) +Symbol: [m] type(NODE), scope(LOCAL), symbol_id(2) +Symbol: [r] type(RELATIONSHIP), scope(LOCAL), symbol_id(1) + +p +(:person {int16:16,float:1.1100000143051147,double:100.98,int8:8,string:"foo bar",int32:32,int64:64,bool:true,datetime:1493632800000000000,date:17289})-[:is_friend {message:"hi.."}]->(:person {int16:116,float:11.109999656677246,double:1100.98,int8:18,string:"bar foo",int32:132,int64:164,bool:true,datetime:1525168800000000000,date:17654}) +)"; + + WriteFile(file, statements); + std::string lgraph_cli = + "./lgraph_cli --ip 127.0.0.1 --port 7687 --graph default " + "--user admin --password 73@TuGraph --format csv"; + lgraph::SubProcess cli(FMA_FMT("{} < {}", lgraph_cli, file)); + cli.Wait(); + UT_EXPECT_EQ(cli.Stdout(), expected); + server->Kill(); + server->Wait(); + } + + { + lgraph::GlobalConfig conf; + conf.db_dir = "./testdb"; + conf.http_port = 7774; + conf.enable_rpc = false; + conf.bolt_port = 7687; + conf.bind_host = "127.0.0.1"; + AutoCleanDir cleaner(conf.db_dir); + auto server = StartLGraphServer(conf); + auto WriteFile = [](const std::string& name, const std::string& content) { + fma_common::OutputFmaStream out(name); + out.Write(content.data(), content.size()); + }; + std::string expected = R"xx([""] +[""] +[""] +["created 2 vertices, created 0 edges."] +[""] +["created 0 vertices, created 1 edges."] +["n"] +["(:person {int16:16,float:1.1100000143051147,double:100.98,int8:8,string:\"foo bar\",int32:32,int64:64,bool:true,datetime:1493632800000000000,date:17289})"] +["@plan"] +["Execution Plan:\nProduce Results\n Project [n]\n Expand(All) [n --> m ]\n Node By Label Scan [n:person]\n"] +["r"] +["[:is_friend {message:\"hi..\"}]"] +["@profile"] +["Current Pattern Graph:\nN[0] n:person (MATCHED)\nN[1] m:person (MATCHED)\nR[0 --> 1] r:{<1>: is_friend} (MATCHED)\nSymbol: [n] type(NODE), scope(LOCAL), symbol_id(0)\nSymbol: [m] type(NODE), scope(LOCAL), symbol_id(2)\nSymbol: [r] type(RELATIONSHIP), scope(LOCAL), symbol_id(1)\n"] +["p"] +["(:person {int16:16,float:1.1100000143051147,double:100.98,int8:8,string:\"foo bar\",int32:32,int64:64,bool:true,datetime:1493632800000000000,date:17289})-[:is_friend {message:\"hi..\"}]->(:person {int16:116,float:11.109999656677246,double:1100.98,int8:18,string:\"bar foo\",int32:132,int64:164,bool:true,datetime:1525168800000000000,date:17654})"] +)xx"; + + WriteFile(file, statements); + std::string lgraph_cli = + "./lgraph_cli --ip 127.0.0.1 --port 7687 --graph default " + "--user admin --password 73@TuGraph --format json"; + lgraph::SubProcess cli(FMA_FMT("{} < {}", lgraph_cli, file)); + cli.Wait(); + UT_EXPECT_EQ(cli.Stdout(), expected); + server->Kill(); + server->Wait(); + } + + { + lgraph::GlobalConfig conf; + conf.db_dir = "./testdb"; + conf.http_port = 7774; + conf.enable_rpc = false; + conf.bolt_port = 7687; + conf.bind_host = "127.0.0.1"; + AutoCleanDir cleaner(conf.db_dir); + auto server = StartLGraphServer(conf); + auto WriteFile = [](const std::string& name, const std::string& content) { + fma_common::OutputFmaStream out(name); + 
out.Write(content.data(), content.size()); + }; + std::string expected = R"xx(+--+ + ++--+ 0 rows -+----------------------------------------+ -| | -+----------------------------------------+ -| "created 2 vertices, created 0 edges." | -+----------------------------------------+ ++--+ + ++--+ + +0 rows + ++--------------------------------------+ +| | ++--------------------------------------+ +| created 2 vertices, created 0 edges. | ++--------------------------------------+ 1 rows -+----------------------------------------+ -| | -+----------------------------------------+ -| "created 0 vertices, created 1 edges." | -+----------------------------------------+ ++--------------------------------------+ +| | ++--------------------------------------+ +| created 0 vertices, created 1 edges. | ++--------------------------------------+ 1 rows @@ -115,6 +225,18 @@ match p = (n)-[r]->(m) return p; 1 rows ++-------------------------------------------+ +| @plan | ++-------------------------------------------+ +| Execution Plan: | +| Produce Results | +| Project [n] | +| Expand(All) [n --> m ] | +| Node By Label Scan [n:person] | ++-------------------------------------------+ + +1 rows + +-------------------------------+ | r | +-------------------------------+ @@ -123,6 +245,20 @@ match p = (n)-[r]->(m) return p; 1 rows ++------------------------------------------------------------+ +| @profile | ++------------------------------------------------------------+ +| Current Pattern Graph: | +| N[0] n:person (MATCHED) | +| N[1] m:person (MATCHED) | +| R[0 --> 1] r:{<1>: is_friend} (MATCHED) | +| Symbol: [n] type(NODE), scope(LOCAL), symbol_id(0) | +| Symbol: [m] type(NODE), scope(LOCAL), symbol_id(2) | +| Symbol: [r] type(RELATIONSHIP), scope(LOCAL), symbol_id(1) | ++------------------------------------------------------------+ + +1 rows + +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | p | +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -131,14 +267,16 @@ match p = (n)-[r]->(m) return p; 1 rows -)"; +)xx"; - WriteFile(file, statements); - std::string lgraph_cli = "./lgraph_cli --ip 127.0.0.1 --port 7687 --graph default " - "--user admin --password 73@TuGraph"; - lgraph::SubProcess cli(FMA_FMT("{} < {}", lgraph_cli, file)); - cli.Wait(); - UT_EXPECT_EQ(cli.Stdout(), expected); - server->Kill(); - server->Wait(); + WriteFile(file, statements); + std::string lgraph_cli = + "./lgraph_cli --ip 127.0.0.1 --port 7687 --graph default " + "--user admin --password 73@TuGraph"; + lgraph::SubProcess cli(FMA_FMT("{} < {}", lgraph_cli, file)); + cli.Wait(); + UT_EXPECT_EQ(cli.Stdout(), expected); + server->Kill(); + server->Wait(); + } } diff --git a/toolkits/lgraph_cli.cpp b/toolkits/lgraph_cli.cpp index 4790044d29..f1df265468 100644 --- a/toolkits/lgraph_cli.cpp +++ b/toolkits/lgraph_cli.cpp @@ -17,6 +17,7 @@ */ #include +#include #include #include #include "fma-common/configuration.h" @@ -28,6 +29,12 @@ #include "tabulate/table.hpp" 
using namespace boost; +enum class OutputFormat { + TABLE = 0, + CSV, + JSON +}; + std::any ReadMessage(asio::ip::tcp::socket& socket, bolt::Hydrator& hydrator) { hydrator.ClearErr(); uint16_t size = 0; @@ -53,23 +60,51 @@ std::any ReadMessage(asio::ip::tcp::socket& socket, bolt::Hydrator& hydrator) { return ret.first; } -std::pair FetchRecords( - asio::ip::tcp::socket& socket, bolt::Hydrator& hydrator) { +bool FetchRecords(asio::ip::tcp::socket& socket, bolt::Hydrator& hydrator, OutputFormat of) { std::string error; + std::optional> header; tabulate::Table table; while (true) { auto msg = ReadMessage(socket, hydrator); if (msg.type() == typeid(std::optional)) { const auto& val = std::any_cast&>(msg); - std::vector values; + nlohmann::json values = nlohmann::json::array(); for (auto& item : val.value().values) { - values.push_back(bolt::Print(item)); + values.push_back(bolt::ToJson(item)); + } + if (values.size() != header.value().size()) { + LOG_FATAL() << FMA_FMT("mismatched data, header column size: {}, " + "record column size: {}", header.value().size(), values.size()); + } + if (of != OutputFormat::JSON) { + std::vector strs; + for (const auto& v : values) { + if (v.is_string()) { + strs.push_back(v.get()); + } else { + strs.push_back(v.dump()); + } + } + if (of == OutputFormat::TABLE) { + table.add_row({strs.begin(), strs.end()}); + } else { + LOG_INFO() << boost::algorithm::join(strs, ","); + } + } else { + LOG_INFO() << values.dump(); } - table.add_row({values.begin(), values.end()}); } else if (msg.type() == typeid(bolt::Success*)) { auto success = std::any_cast(msg); - if (table.size() == 0) { - table.add_row({success->fields.begin(), success->fields.end()}); + if (!header) { + header = success->fields; + if (of == OutputFormat::TABLE) { + table.add_row({header.value().begin(), header.value().end()}); + } else if (of == OutputFormat::CSV) { + LOG_INFO() << boost::algorithm::join(header.value(), ","); + } else { + nlohmann::json j = header.value(); + LOG_INFO() << j.dump(); + } } else { break; } @@ -84,7 +119,21 @@ std::pair FetchRecords( break; } } - return {table, error}; + + if (error.empty()) { + if (of == OutputFormat::TABLE) { + if (table.size() > 0) { + LOG_INFO() << table << "\n"; + LOG_INFO() << FMA_FMT("{} rows", table.size() - 1) << "\n"; + } else { + LOG_INFO() << FMA_FMT("{} rows", table.size()) << "\n"; + } + } + return true; + } else { + LOG_ERROR() << error << "\n"; + return false; + } } std::string ReadStatement() { @@ -128,24 +177,40 @@ char* hints(const char* buf, int* color, int* bold) { int main(int argc, char** argv) { fma_common::Configuration config; + std::string format = "table"; std::string ip = "127.0.0.1"; int port = 7687; std::string graph = "default"; std::string username = "admin"; std::string password = "73@TuGraph"; + config.Add(format, "format", true). + Comment("output format (table, csv, json)"). 
+ SetPossibleValues({"table", "csv", "json"}); config.Add(ip, "ip", true).Comment("TuGraph bolt protocol ip"); config.Add(port, "port", true).Comment("TuGraph bolt protocol port"); config.Add(graph, "graph", true).Comment("Graph to use"); config.Add(username, "user", true).Comment("User to login"); config.Add(password, "password", true).Comment("Password to use when connecting to server"); - config.ExitAfterHelp(true); - config.ParseAndFinalize(argc, argv); + try { + config.ExitAfterHelp(true); + config.ParseAndFinalize(argc, argv); + } catch (std::exception& e) { + LOG_ERROR() << e.what(); + return -1; + } bool is_terminal = false; if (isatty(STDIN_FILENO)) { is_terminal = true; } + OutputFormat of = OutputFormat::TABLE; + if (format == "csv") { + of = OutputFormat::CSV; + } else if (format == "json") { + of = OutputFormat::JSON; + } + const char* history_file = ".lgraphcli_history"; linenoiseSetCompletionCallback(completion); linenoiseSetHintsCallback(hints); @@ -213,23 +278,13 @@ int main(int argc, char** argv) { meta.clear(); meta = {{"db", graph}}; ps.AppendRun(statement, {}, meta); - ps.AppendPullN(1024); + ps.AppendPullN(-1); asio::write(socket, asio::const_buffer(ps.ConstBuffer().data(), ps.ConstBuffer().size())); - std::string error; - tabulate::Table table; - std::tie(table, error) = FetchRecords(socket, hydrator); - if (error.empty()) { - if (table.size() > 1) { - LOG_INFO() << table << "\n"; - } - LOG_INFO() << FMA_FMT("{} rows", table.size()-1) << "\n"; - if (is_terminal) { - linenoiseHistoryAdd(statement.c_str()); - linenoiseHistorySave(history_file); - } - } else { - LOG_ERROR() << error << "\n"; + bool ret = FetchRecords(socket, hydrator, of); + if (is_terminal && ret) { + linenoiseHistoryAdd(statement.c_str()); + linenoiseHistorySave(history_file); } } catch (std::exception& e) { LOG_ERROR() << e.what(); From 477b6b0b1a60dc9e33405a9ca3a7b667505a2a00 Mon Sep 17 00:00:00 2001 From: lipanpan03 <656461146@qq.com> Date: Thu, 18 Jan 2024 10:39:36 +0800 Subject: [PATCH 6/8] improve ha start in bootstrapping --- src/server/ha_state_machine.cpp | 56 +++++++++++++++------------------ 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/src/server/ha_state_machine.cpp b/src/server/ha_state_machine.cpp index 5859a2af80..3084e268d1 100644 --- a/src/server/ha_state_machine.cpp +++ b/src/server/ha_state_machine.cpp @@ -16,15 +16,21 @@ namespace braft { } void lgraph::HaStateMachine::Start() { - static const int64_t BOOTSTRAP_LOG_INDEX = 1024; - + // check ha node can be started + butil::EndPoint addr; + butil::str2endpoint(config_.host.c_str(), config_.rpc_port, &addr); + if (butil::IP_ANY == addr.ip) { + throw std::runtime_error("TuGraph can't be started from IP_ANY (0.0.0.0) in HA mode."); + } if (node_) { LOG_WARN() << "HaStateMachine already started."; return; } - ::lgraph::StateMachine::Start(); + + // bootstrap if (config_.ha_bootstrap_role == 1) { + const int64_t BOOTSTRAP_LOG_INDEX = 1024; if (config_.ha_is_witness) { ::lgraph::StateMachine::Stop(); throw std::runtime_error("Can not bootstrap on witness node"); @@ -42,8 +48,6 @@ void lgraph::HaStateMachine::Start() { galaxy_->BootstrapRaftLogIndex(new_log_index); // now bootstrap braft::BootstrapOptions options; - butil::EndPoint addr; - butil::str2endpoint(config_.host.c_str(), config_.rpc_port, &addr); options.group_conf.add_peer(braft::PeerId(addr)); options.fsm = this; options.node_owns_fsm = false; @@ -66,11 +70,7 @@ void lgraph::HaStateMachine::Start() { #endif } - butil::EndPoint addr; - 
butil::str2endpoint(config_.host.c_str(), config_.rpc_port, &addr); - if (butil::IP_ANY == addr.ip) { - throw std::runtime_error("TuGraph can't be started from IP_ANY (0.0.0.0) in HA mode."); - } + // start braft::Node braft::Node* node = new braft::Node("lgraph", braft::PeerId(addr, 0, config_.ha_is_witness)); braft::NodeOptions node_options; @@ -100,42 +100,38 @@ void lgraph::HaStateMachine::Start() { fma_common::StringFormatter::Format("Failed to init raft node: ec={}", r)); } node_ = node; + + // start HA by initial_conf if (config_.ha_bootstrap_role != 2) { LOG_INFO() << "Start HA by initial_conf"; - int t = 0; - while (!joined_group_.load(std::memory_order_acquire)) { + for (int t = 0; t < config_.ha_node_join_group_s; t++) { LOG_INFO() << "Waiting to join replication group..."; - if (t++ > config_.ha_node_join_group_s) { - break; - } fma_common::SleepS(1); - } - if (t <= config_.ha_node_join_group_s) { - return; + if (joined_group_.load(std::memory_order_acquire)) return; } } + + // start HA by add_peer if (config_.ha_bootstrap_role != 1) { LOG_INFO() << "Start HA by add_peer"; braft::Configuration init_conf; - int t = 0; if (init_conf.parse_from(config_.ha_conf) != 0) { ::lgraph::StateMachine::Stop(); throw std::runtime_error("Fail to parse configuration " + config_.ha_conf); } - while (!joined_group_.load(std::memory_order_acquire)) { + for (int t = 0; t < config_.ha_node_join_group_s; t++) { + fma_common::SleepS(1); + LOG_INFO() << "Waiting to join replication group..."; braft::PeerId my_id(addr); butil::Status status = braft::cli::add_peer(braft::GroupId("lgraph"), init_conf, my_id, braft::cli::CliOptions()); - if (!status.ok()) { + if (status.ok()) + return; + else LOG_WARN() << "Failed to join group: " << status.error_str(); - } - LOG_INFO() << "Waiting to join replication group..."; - if (t++ > config_.ha_node_join_group_s) { - throw std::runtime_error("Failed to join replication group"); - } - fma_common::SleepS(1); } } + throw std::runtime_error("Failed to start node && join group!"); } void lgraph::HaStateMachine::Stop() { @@ -182,12 +178,12 @@ bool lgraph::HaStateMachine::DoRequest(bool is_write, const LGraphRequest* req, void lgraph::HaStateMachine::on_leader_start(int64_t term) { leader_term_.store(term, std::memory_order_release); - LOG_INFO() << "Node becomes leader"; - joined_group_ = true; + joined_group_.store(true, std::memory_order_release); #if LGRAPH_SHARE_DIR _HoldWriteLock(galaxy_->GetRWLock()); galaxy_->ReloadFromDisk(); #endif + LOG_INFO() << "Node becomes leader"; } void lgraph::HaStateMachine::on_leader_stop(const butil::Status& status) { @@ -215,8 +211,8 @@ void lgraph::HaStateMachine::on_stop_following(const ::braft::LeaderChangeContex } void lgraph::HaStateMachine::on_start_following(const ::braft::LeaderChangeContext& ctx) { + joined_group_.store(true, std::memory_order_release); LOG_INFO() << "Node joined the group as follower."; - joined_group_ = true; } bool lgraph::HaStateMachine::ReplicateAndApplyRequest(const LGraphRequest* req, From 1b2f540db53d42038a4cb3c07a63667f7fe87328 Mon Sep 17 00:00:00 2001 From: lipanpan03 <41904587+lipanpan03@users.noreply.github.com> Date: Thu, 18 Jan 2024 20:17:50 +0800 Subject: [PATCH 7/8] fix ha witness ut (#383) * fix ha witness ut * fix ha witness ut --- test/test_ha_witness.cpp | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/test/test_ha_witness.cpp b/test/test_ha_witness.cpp index b22b579ba6..71a7a15aae 100644 --- a/test/test_ha_witness.cpp +++ b/test/test_ha_witness.cpp @@ 
-259,7 +259,7 @@ TEST_F(TestHAWitness, HAWitnessDisableLeader) { std::string cmd = FMA_FMT(cmd_f.c_str(), "ha" + std::to_string(stoi(output[1]) - 29091)); ret = system(cmd.c_str()); UT_EXPECT_EQ(ret, 0); - fma_common::SleepS(10); + fma_common::SleepS(15); ret = client->ImportSchemaFromContent(result, schema) && client->ImportDataFromContent(result, data_desc, data_person, ","); UT_EXPECT_TRUE(ret); @@ -270,9 +270,18 @@ TEST_F(TestHAWitness, HAWitnessDisableLeader) { cmd = FMA_FMT(cmd_f.c_str(), "ha" + std::to_string(stoi(output[1]) - 29091)); ret = system(cmd.c_str()); UT_EXPECT_EQ(ret, 0); - fma_common::SleepS(10); + fma_common::SleepS(15); // start follower, witness which has newer log will be leader temporary + try { + client = std::make_unique( + this->host + ":29094", "admin", "73@TuGraph"); + } catch (std::exception &e) { + cmd = FMA_FMT(witness_cmd_f, "ha3", host, "27074", "29094", + host + ":29092," + host + ":29093," + host + ":29094", true); + UT_EXPECT_EQ(system(cmd.c_str()), 0); + fma_common::SleepS(5); + } boost::split(output, follower_rpc, boost::is_any_of(":")); std::string ha_dir = "ha" + std::to_string(stoi(output[1]) - 29091); cmd = FMA_FMT(server_cmd_f, ha_dir, host, stoi(output[1]) - 2020, @@ -281,6 +290,7 @@ TEST_F(TestHAWitness, HAWitnessDisableLeader) { UT_EXPECT_EQ(ret, 0); fma_common::SleepS(15); master_rpc.clear(); + int times = 0; do { try { client = std::make_unique( @@ -306,7 +316,7 @@ TEST_F(TestHAWitness, HAWitnessDisableLeader) { fma_common::SleepS(1); continue; } - } while (true); + } while (++times < 20); fma_common::SleepS(5); ret = client->CallCypherToLeader(result, "MATCH (n) RETURN COUNT(n)"); UT_EXPECT_TRUE(ret); From 1679aed1945d6618c033c41ee9ca63d240cc7356 Mon Sep 17 00:00:00 2001 From: lishaoheng <133954629+gtahoo@users.noreply.github.com> Date: Fri, 19 Jan 2024 13:08:38 +0800 Subject: [PATCH 8/8] fix #100 : add upload file size limit. (#384) fix browser legacy-upload file size limit. Co-authored-by: Shipeng Qi --- .../source/4.user-guide/2.tugraph-browser-legacy.md | 9 +++++---- .../source/4.user-guide/2.tugraph-browser-legacy.md | 1 + 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/en-US/source/4.user-guide/2.tugraph-browser-legacy.md b/docs/en-US/source/4.user-guide/2.tugraph-browser-legacy.md index b7e5e9f2fa..c0a3a1674b 100644 --- a/docs/en-US/source/4.user-guide/2.tugraph-browser-legacy.md +++ b/docs/en-US/source/4.user-guide/2.tugraph-browser-legacy.md @@ -18,7 +18,7 @@ When the user completes the installation of the graph database, you can access i ### 2. Login -![alt 登录](https://tugraph-web-static.oss-cn-beijing.aliyuncs.com/%E6%96%87%E6%A1%A3/2.Operating/1.tugraph-browser-lpgin.png) +![alt Login](https://tugraph-web-static.oss-cn-beijing.aliyuncs.com/%E6%96%87%E6%A1%A3/2.Operating/1.tugraph-browser-lpgin.png) - When the page is opened successfully, the first thing you see is the login page, and the user needs to fill in the account number and password to log in. 
- Default account: admin @@ -31,11 +31,11 @@ When the user completes the installation of the graph database, you can access i - When you log in for the first time, the system will create an empty graph by default - ![alt 快速上手](https://tugraph-web-static.oss-cn-beijing.aliyuncs.com/%E6%96%87%E6%A1%A3/2.Operating/2.tugraph-browser-quickstart-01.png) + ![alt quick start](https://tugraph-web-static.oss-cn-beijing.aliyuncs.com/%E6%96%87%E6%A1%A3/2.Operating/2.tugraph-browser-quickstart-01.png) - The user clicks on the help option and selects Get Started quickly - ![alt 帮助](https://tugraph-web-static.oss-cn-beijing.aliyuncs.com/%E6%96%87%E6%A1%A3/2.Operating/3.tugraph-browser-quickstart-02.png) + ![alt help](https://tugraph-web-static.oss-cn-beijing.aliyuncs.com/%E6%96%87%E6%A1%A3/2.Operating/3.tugraph-browser-quickstart-02.png) - Then click "One-click Create Model" -- >" One-click Create Data "to complete the construction of the built-in Movie data graph @@ -123,7 +123,8 @@ When the user completes the installation of the graph database, you can access i - Select a local CSV file - Select the model for the corresponding node or edge - Do the data mapping - -Complete the data import +- Complete the data import +- The maximum supported size for a single file is 2GB. ##### 3.3.5 plug-in (Stored Procedure) diff --git a/docs/zh-CN/source/4.user-guide/2.tugraph-browser-legacy.md b/docs/zh-CN/source/4.user-guide/2.tugraph-browser-legacy.md index 2a422f569e..0815c0865d 100644 --- a/docs/zh-CN/source/4.user-guide/2.tugraph-browser-legacy.md +++ b/docs/zh-CN/source/4.user-guide/2.tugraph-browser-legacy.md @@ -124,6 +124,7 @@ TuGraph Browser 的主要功能是为使用图数据库的开发人员,提供 - 选择对应的节点或边的模型 - 进行数据映射 - 完成数据导入 + - 单个文件最大支持2GB ##### 3.3.5 插件
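
For reference, the `--format` option introduced in the `lgraph_cli` changes above accepts `table` (the default), `csv`, or `json`. Below is a minimal usage sketch that mirrors the invocations used in `test_lgraph_cli.cpp`; the statements file name and its contents are placeholders, not part of the patches:

```bash
# Run Cypher statements through the Bolt port and print results as JSON.
# Host, port, graph, and credentials are the defaults used in the test harness;
# statements.txt is a placeholder file holding the Cypher statements to execute.
./lgraph_cli --ip 127.0.0.1 --port 7687 --graph default \
    --user admin --password 73@TuGraph --format json < statements.txt
```

With `--format csv` each record is written to stdout as a comma-joined line, with `--format json` as a JSON array per record, and the default `table` format renders results with the tabulate library as before.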