Writing edges fails with "No vertex found with specified index value" #404

Closed
yzjzgr opened this issue Jan 29, 2024 · 2 comments

yzjzgr commented Jan 29, 2024

Environment:

  • OS: CentOS 7
  • CPU: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz, 8 cores
  • MEM: 32 GB
  • TuGraph-DB Version: 4.1.0
  • Deployment: standalone

Describe the bug

On a graph with 150 million vertices and 300 million edges, adding vertices and edges through a stored procedure fails with "No vertex found with specified index value".

How to reproduce and expected behavior
Steps:

  1. Use the offline import tool to import 150 million vertices and 300 million edges
  2. Start a standalone (single-node) TuGraph instance
  3. Load a stored procedure that adds vertices and edges to the graph
  4. Call the stored procedure to add new vertices and edges

Result:
While the procedure is writing new vertices and edges to the graph, the call may fail with: No vertex found with specified index value

Graph schema:
{
  "schema": [
    {
      "label": "entity",
      "primary": "primary_key",
      "type": "VERTEX",
      "properties": [
        { "name": "primary_key", "type": "STRING" },
        { "name": "entity_type", "type": "STRING" },
        { "name": "entity_properties", "type": "STRING", "optional": true }
      ]
    },
    {
      "label": "relation",
      "type": "EDGE",
      "properties": [
        { "name": "relation_type", "type": "STRING", "optional": false },
        { "name": "relation_unique_id", "type": "STRING", "optional": false, "index": true, "unique": true },
        { "name": "relation_properties", "type": "STRING", "optional": true }
      ],
      "constraints": [["entity", "entity"]]
    }
  ]
}

Python script that mocks 150 million vertices and 300 million edges:
import json

test_dir = '/data/data/server/tugraph/tmp/'
total_count = 150000000

def write_entity():
    entity_dir = test_dir + "entity/"
    entity_file = open(entity_dir + "entity_0.txt", 'w')
    for seq in range(0, total_count):
        if seq % 10000000 == 0 and seq != 0:
            entity_file.close()
            # integer division so the file name is entity_1.txt rather than entity_1.0.txt
            file_name = entity_dir + "entity_" + str(seq // 10000000) + ".txt"
            entity_file = open(file_name, 'w')
        entity_file.write(str(seq))
        entity_file.write(chr(1))  # chr(1) is the field separator
        entity_file.write('mac')
        entity_file.write(chr(1))
        the_dict = {'name': 'name' + str(seq)}
        entity_file.write(json.dumps(the_dict))
        entity_file.write('\n')
    if not entity_file.closed:
        entity_file.close()

def write_one_relation(relation_file, seq_start: int, seq_end: int, relation_type: str):
    relation_file.write(str(seq_start))
    relation_file.write(chr(1))
    relation_file.write(str(seq_end))
    relation_file.write(chr(1))
    relation_file.write(relation_type)
    relation_file.write(chr(1))
    relation_file.write(relation_type + str(seq_start))
    relation_file.write(chr(1))
    the_dict = {'amount': 'amount' + str(seq_start)}
    relation_file.write(json.dumps(the_dict))
    relation_file.write('\n')

def write_relation():
    relation_dir = test_dir + "relation/"
    relation_file = open(relation_dir + "relation_0.txt", 'w')
    for seq in range(2, total_count):
        if seq % 10000000 == 0 and seq != 0:
            relation_file.close()
            file_name = relation_dir + "relation_" + str(seq // 10000000) + ".txt"
            relation_file = open(file_name, 'w')
        write_one_relation(relation_file, seq - 2, seq, 'transfer')
        write_one_relation(relation_file, seq - 1, seq, 'pay')
    if not relation_file.closed:
        relation_file.close()

if __name__ == "__main__":
    write_entity()
    write_relation()

Stored procedure that adds the data:
//
// Created by admin on 2024/1/25.
//

#include <string>
#include <vector>
#include "lgraph/lgraph.h"
#include "json.hpp"
using namespace lgraph_api;
using namespace nlohmann;
using namespace std;

extern "C" LGAPI void upsertEntities(Transaction &txn, json &entities) {
for (const json& entity:entities) {
string primary_key = entity["primary_key"].get();
string entity_type = entity["entity_type"].get();
string entity_properties;
if(entity.contains("entity_properties")&& !entity["entity_properties"].is_null()) {
entity_properties = entity["entity_properties"].get();
}
vector field_names;
vector field_value_strings;
field_names.emplace_back("entity_type");
field_value_strings.push_back(entity_type);
if (!entity_properties.empty()) {
field_names.emplace_back("entity_properties");
field_value_strings.push_back(entity_properties);
}
try {
VertexIterator vertexIterator = txn.GetVertexByUniqueIndex("entity", "primary_key", primary_key);
//顶点存在则更新,不存在则异常
vertexIterator.SetFields(field_names, field_value_strings);
}catch (exception & e){
field_names.emplace_back("primary_key");
field_value_strings.push_back(primary_key);
txn.AddVertex("entity", field_names, field_value_strings);
}
}
}

extern "C" LGAPI void upsertRelations(Transaction &txn, json &relations) {
for (const json& relation: relations) {
string start = relation["start"].get();
string end = relation["end"].get();
string relation_type = relation["relation_type"].get();
string relation_unique_id = relation["relation_unique_id"].get();
string relation_properties;
if(relation.contains("relation_properties")&&!relation["relation_properties"].is_null()) {
relation_properties = relation["relation_properties"].get();
}
vector field_names;
vector field_value_strings;
field_names.emplace_back("relation_type");
field_value_strings.push_back(relation_type);
if (!relation_properties.empty()) {
field_names.emplace_back("relation_properties");
field_value_strings.push_back(relation_properties);
}
try {
OutEdgeIterator outEdgeIterator = txn.GetEdgeByUniqueIndex("relation", "relation_unique_id", relation_unique_id);
//关系边存在则更新,不存在则异常
outEdgeIterator.SetFields(field_names, field_value_strings);
}catch (exception &e){
//关系边不存在
try {
VertexIterator startVertexIterator = txn.GetVertexByUniqueIndex("entity", "primary_key", start);
VertexIterator endVertexIterator = txn.GetVertexByUniqueIndex("entity","primary_key",end);
field_names.emplace_back("relation_unique_id");
field_value_strings.push_back(relation_unique_id);
txn.AddEdge(startVertexIterator.GetId(), endVertexIterator.GetId(), "relation",field_names, field_value_strings);
}catch (exception &startE) {
string msg="no start or end entity, start primary_key:";
msg.append(start);
msg.append(", end primary_key: ");
msg.append(end);
msg.append(", e:");
msg.append(startE.what());
throw runtime_error(msg);
}

        //找起始顶点
        //找结束顶点
    }
}

}

extern "C" LGAPI bool Process(GraphDB& db, const std::string& request, std::string& response) {
Transaction txn = db.CreateWriteTxn();
json input;
try {
input = json::parse(request);
} catch (std::exception & e) {
throw std::runtime_error("json parse error");
}
json entities = input["entities"];
json relations = input["relations"];
if (!entities.is_null() && entities.size()>0) {
upsertEntities(txn, entities);
}
if (!relations.is_null() && relations.size()>0) {
upsertRelations(txn, relations);
}
txn.Commit();
return true;
}
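
For reference, the request string that Process parses is a single JSON object with optional "entities" and "relations" arrays. A minimal example payload, built with fastjson just as the client code below does, could look like the following sketch; the class name and the concrete values are invented for illustration and mirror the mock data above.

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class PayloadExample {
    public static void main(String[] args) {
        // One entity object; field names match the "entity" vertex schema above.
        JSONObject entity = new JSONObject();
        entity.put("primary_key", "42");
        entity.put("entity_type", "mac");
        entity.put("entity_properties", "{\"name\": \"name42\"}");

        // One relation object; "start"/"end" must be primary keys of entity vertices
        // that already exist (or are created earlier in the same request).
        JSONObject relation = new JSONObject();
        relation.put("start", "41");
        relation.put("end", "42");
        relation.put("relation_type", "transfer");
        relation.put("relation_unique_id", "transfer41");
        relation.put("relation_properties", "{\"amount\": \"amount41\"}");

        JSONArray entities = new JSONArray();
        entities.add(entity);
        JSONArray relations = new JSONArray();
        relations.add(relation);

        JSONObject req = new JSONObject();
        req.put("entities", entities);
        req.put("relations", relations);
        // This string is what gets passed as the procedure's request argument.
        System.out.println(req.toJSONString());
    }
}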

Java code that calls the stored procedure to add data:
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.antgroup.tugraph.TuGraphDbRpcClient;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
private static String UPSERT_GRAPH_PROCEDURE = "upsert_graph";
private static String GRAPH_NAME = "yzjtest10mExt";
private static String CPP = "CPP";
private static double PROCEDURE_TIME_OUT = 30000;

private static String PRIMARY_KEY = "primary_key";
private static String ENTITY_TYPE = "entity_type";
private static String ENTITY_PROPERTIES = "entity_properties";

private static String START = "start";
private static String END = "end";
private static String RELATION_TYPE = "relation_type";
private static String RELATION_UNIQUE_ID = "relation_unique_id";
private static String RELATION_PROPERTIES = "relation_properties";

public static void main(String[] args) throws Exception{
    ExecutorService executorService = new ThreadPoolExecutor(5,5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
    List<String> urls = new ArrayList<>();
    urls.add("10.101.1.105:9081");
    //urls.add("10.101.1.105:9082");
    //urls.add("10.101.1.105:9083");
    //TuGraphDbRpcClient client = new TuGraphDbRpcClient(urls, "admin", "73@TuGraph");
    TuGraphDbRpcClient client = new TuGraphDbRpcClient("10.101.1.105:9082", "admin", "73@TuGraph");
    long totalStart = System.currentTimeMillis();
    for (int i=0;i<400;i++) {
        int finalI = i;
        Random random = new Random();
        Thread.sleep(random.nextInt(1500));
        executorService.submit(()->{
            String param = constructTestData(finalI, 5000);
            long start = System.currentTimeMillis();
            String result = null;
            try {
                result = client.callProcedureToLeader(CPP, UPSERT_GRAPH_PROCEDURE, param, PROCEDURE_TIME_OUT, false, GRAPH_NAME, true);
            } catch (Exception e) {
                System.err.println("batch:"+finalI+",e.msg:"+e.getMessage());
                System.err.println("batch:"+finalI+"e.stack:"+e.getStackTrace());
                throw new RuntimeException(e);
            }
            long cost = (System.currentTimeMillis() - start)/1000;
            System.out.println("batch:" + finalI +", cost: " + cost + ",result: " + result);
        });

    }
    System.out.println("start shutdown");
    executorService.shutdown();
    executorService.awaitTermination(10000, TimeUnit.SECONDS);
    System.out.println("finish shutdown, cost:" + (System.currentTimeMillis()-totalStart)/1000);

}

private static String constructTestData( int seq, int batchSize) {
    JSONObject jsonObject = new JSONObject();
    JSONArray entities = new JSONArray();
    JSONArray relations = new JSONArray();
    for (int i=seq*batchSize;i<(seq+1)*batchSize;i++) {
        entities.add(constructEntity(i));
        if (i>1) {
            relations.add(constructRelation(i));
        }
    }
    jsonObject.put("entities", entities);
    jsonObject.put("relations", relations);
    return jsonObject.toJSONString();
}

private static JSONObject constructEntity(int i) {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put(PRIMARY_KEY, String.format("primary_key_%d", i));
    jsonObject.put(ENTITY_TYPE, String.format("entity_type_%d", i));
    jsonObject.put(ENTITY_PROPERTIES, String.format("entity_properties_x_%d", i));
    return jsonObject;
}

private static JSONObject constructRelation(int i) {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put(START, String.format("primary_key_%d", i/10000));
    jsonObject.put(END, String.format("primary_key_%d", i-1));

    jsonObject.put(RELATION_TYPE, String.format("relation_type_%d", i));
    jsonObject.put(RELATION_PROPERTIES, String.format("relation_properties_yzj_%d", i));
    jsonObject.put(RELATION_UNIQUE_ID, String.format("relation_unique_id_xxs_%d", i));
    return jsonObject;
}

}

yzjzgr commented Feb 1, 2024

First, 150 million vertices and 300 million edges were imported offline. I verified that if all vertices are written via the stored procedure first, and the edges are then written via the stored procedure, the edges are written successfully.
The problem only occurs when vertices and edges are written within the same stored-procedure transaction.
Procedure code:
//
// Created by admin on 2024/1/25.
//

#include <string>
#include <vector>
#include "lgraph/lgraph.h"
#include "json.hpp"
using namespace lgraph_api;
using namespace nlohmann;
using namespace std;

extern "C" LGAPI void upsertEntities(Transaction &txn, json &entities) {
for (const json& entity:entities) {
string primary_key = entity["primary_key"].get();
string entity_type = entity["entity_type"].get();
string entity_properties;
if(entity.contains("entity_properties")&& !entity["entity_properties"].is_null()) {
entity_properties = entity["entity_properties"].get();
}
vector field_names;
vector field_value_strings;
field_names.emplace_back("entity_type");
field_value_strings.push_back(entity_type);
if (!entity_properties.empty()) {
field_names.emplace_back("entity_properties");
field_value_strings.push_back(entity_properties);
}
try {
VertexIterator vertexIterator = txn.GetVertexByUniqueIndex("entity", "primary_key", primary_key);
//顶点存在则更新,不存在则异常
vertexIterator.SetFields(field_names, field_value_strings);
}catch (exception & e){
field_names.emplace_back("primary_key");
field_value_strings.push_back(primary_key);
txn.AddVertex("entity", field_names, field_value_strings);
}
}
}

extern "C" LGAPI void upsertRelations(Transaction &txn, json &relations) {
for (const json& relation: relations) {
string start = relation["start"].get();
string end = relation["end"].get();
string relation_type = relation["relation_type"].get();
string relation_unique_id = relation["relation_unique_id"].get();
string relation_properties;
if(relation.contains("relation_properties")&&!relation["relation_properties"].is_null()) {
relation_properties = relation["relation_properties"].get();
}
vector field_names;
vector field_value_strings;
field_names.emplace_back("relation_type");
field_value_strings.push_back(relation_type);
if (!relation_properties.empty()) {
field_names.emplace_back("relation_properties");
field_value_strings.push_back(relation_properties);
}

// EdgeIndexIterator edgeIndexIterator = txn.GetEdgeIndexIterator("relation", "relation_unique_id", relation_unique_id,relation_unique_id);
// if (edgeIndexIterator.IsValid()) {
// EdgeUid edgeUid = edgeIndexIterator.GetUid();
// OutEdgeIterator outEdgeIterator = txn.GetOutEdgeIterator(edgeUid);
// outEdgeIterator.SetFields(field_names, field_value_strings);
//} else {
try {
VertexIterator startVertexIterator = txn.GetVertexByUniqueIndex("entity", "primary_key", start);
VertexIterator endVertexIterator = txn.GetVertexByUniqueIndex("entity","primary_key",end);
field_names.emplace_back("relation_unique_id");
field_value_strings.push_back(relation_unique_id);
txn.AddEdge(startVertexIterator.GetId(), endVertexIterator.GetId(), "relation",field_names, field_value_strings);
}catch (exception &startE) {
string msg="no start or end entity, start primary_key:";
msg.append(start);
msg.append(", end primary_key: ");
msg.append(end);
msg.append(", e:");
msg.append(startE.what());
throw runtime_error(msg);
}
//}
}
}

extern "C" LGAPI bool Process(GraphDB& db, const std::string& request, std::string& response) {
Transaction txn = db.CreateWriteTxn();
json input;
try {
input = json::parse(request);
} catch (std::exception & e) {
throw std::runtime_error("json parse error");
}
json entities = input["entities"];
json relations = input["relations"];
if (!entities.is_null() && entities.size()>0) {
upsertEntities(txn, entities);
}
if (!relations.is_null() && relations.size()>0) {
upsertRelations(txn, relations);
}
txn.Commit();
return true;
}

Java code that calls the stored procedure to write data:
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.antgroup.tugraph.TuGraphDbRpcClient;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
private static String UPSERT_GRAPH_PROCEDURE = "upsert_graph";
private static String GRAPH_NAME = "yzjtest150m";
private static String CPP = "CPP";
private static double PROCEDURE_TIME_OUT = 30000;

private static String PRIMARY_KEY = "primary_key";
private static String ENTITY_TYPE = "entity_type";
private static String ENTITY_PROPERTIES = "entity_properties";

private static String START = "start";
private static String END = "end";
private static String RELATION_TYPE = "relation_type";
private static String RELATION_UNIQUE_ID = "relation_unique_id";
private static String RELATION_PROPERTIES = "relation_properties";

private static String STANDALONE_URL = "10.101.1.105:9081";

public static void main(String[] args) throws Exception{
    upsertEntities();
    upsertRelations();
}


private static void upsertEntities() throws Exception{
    ExecutorService executorService = new ThreadPoolExecutor(5,5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
    List<String> urls = new ArrayList<>();
    urls.add("10.101.1.105:9081");
    //urls.add("10.101.1.105:9082");
    //urls.add("10.101.1.105:9083");
    //TuGraphDbRpcClient client = new TuGraphDbRpcClient(urls, "admin", "73@TuGraph");
    TuGraphDbRpcClient client = new TuGraphDbRpcClient(STANDALONE_URL, "admin", "73@TuGraph");
    long totalStart = System.currentTimeMillis();
    for (int i=0;i<400;i++) {
        int finalI = i;
        Random random = new Random();
        Thread.sleep(random.nextInt(1500));
        executorService.submit(()->{
            String param = constructEntityData(finalI, 5000);
            long start = System.currentTimeMillis();
            String result = null;
            try {
                result = client.callProcedureToLeader(CPP, UPSERT_GRAPH_PROCEDURE, param, PROCEDURE_TIME_OUT, false, GRAPH_NAME, true);
            } catch (Exception e) {
                System.err.println("batch:"+finalI+",e.msg:"+e.getMessage());
                System.err.println("batch:"+finalI+"e.stack:"+e.getStackTrace());
                throw new RuntimeException(e);
            }
            long cost = (System.currentTimeMillis() - start)/1000;
            System.out.println("batch:" + finalI +", cost: " + cost + ",result: " + result);
        });

    }
    System.out.println("start entity shutdown");
    executorService.shutdown();
    executorService.awaitTermination(10000, TimeUnit.SECONDS);
    System.out.println("finish entity shutdown, cost:" + (System.currentTimeMillis()-totalStart)/1000);
}


private static void upsertRelations() throws Exception{
    ExecutorService executorService = new ThreadPoolExecutor(5,5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
    List<String> urls = new ArrayList<>();
    urls.add("10.101.1.105:9081");
    //urls.add("10.101.1.105:9082");
    //urls.add("10.101.1.105:9083");
    //TuGraphDbRpcClient client = new TuGraphDbRpcClient(urls, "admin", "73@TuGraph");
    TuGraphDbRpcClient client = new TuGraphDbRpcClient(STANDALONE_URL, "admin", "73@TuGraph");
    long totalStart = System.currentTimeMillis();
    for (int i=0;i<400;i++) {
        int finalI = i;
        Random random = new Random();
        Thread.sleep(random.nextInt(1500));
        executorService.submit(()->{
            String param = constructRelationData(finalI, 5000);
            long start = System.currentTimeMillis();
            String result = null;
            try {
                result = client.callProcedureToLeader(CPP, UPSERT_GRAPH_PROCEDURE, param, PROCEDURE_TIME_OUT, false, GRAPH_NAME, true);
            } catch (Exception e) {
                System.err.println("batch:"+finalI+",e.msg:"+e.getMessage());
                System.err.println("batch:"+finalI+"e.stack:"+e.getStackTrace());
                throw new RuntimeException(e);
            }
            long cost = (System.currentTimeMillis() - start)/1000;
            System.out.println("batch:" + finalI +", cost: " + cost + ",result: " + result);
        });

    }
    System.out.println("start relation shutdown");
    executorService.shutdown();
    executorService.awaitTermination(10000, TimeUnit.SECONDS);
    System.out.println("finish relation shutdown, cost:" + (System.currentTimeMillis()-totalStart)/1000);
}


private static String constructEntityData(int seq, int batchSize) {
    JSONObject jsonObject = new JSONObject();
    JSONArray entities = new JSONArray();
    JSONArray relations = new JSONArray();
    for (int i=seq*batchSize;i<(seq+1)*batchSize;i++) {
        entities.add(constructEntity(i));
        if (i>1) {
            //relations.add(constructRelation(i));
        }
    }
    jsonObject.put("entities", entities);
    jsonObject.put("relations", relations);
    return jsonObject.toJSONString();
}

private static String constructRelationData(int seq, int batchSize) {
    JSONObject jsonObject = new JSONObject();
    JSONArray entities = new JSONArray();
    JSONArray relations = new JSONArray();
    for (int i=seq*batchSize;i<(seq+1)*batchSize;i++) {
        //entities.add(constructEntity(i));
        if (i>1) {
            relations.add(constructRelation(i));
        }
    }
    jsonObject.put("entities", entities);
    jsonObject.put("relations", relations);
    return jsonObject.toJSONString();
}

private static JSONObject constructEntity(int i) {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put(PRIMARY_KEY, String.format("primary_key_%d", i));
    jsonObject.put(ENTITY_TYPE, String.format("entity_type_%d", i));
    jsonObject.put(ENTITY_PROPERTIES, String.format("entity_properties_x_%d", i));
    return jsonObject;
}

private static JSONObject constructRelation(int i) {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put(START, String.format("primary_key_%d", i/10000));
    jsonObject.put(END, String.format("primary_key_%d", i-1));

    jsonObject.put(RELATION_TYPE, String.format("relation_type_%d", i));
    jsonObject.put(RELATION_PROPERTIES, String.format("relation_properties_yzj_%d", i));
    jsonObject.put(RELATION_UNIQUE_ID, String.format("relation_unique_id_xxs_%d", i));
    return jsonObject;
}

}

lipanpan03 commented Mar 14, 2024

Reproduced. The problem is in the Java client code that inserts vertices and edges concurrently: with concurrent inserts there is no guarantee that the vertex an edge depends on has already been inserted. It is not a problem with TuGraph transactions. See line 149 of the Java code that calls the stored procedure to write data:
jsonObject.put(START, String.format("primary_key_%d", i/10000))
The start vertex of such an edge is inserted by a different task, and the thread pool executes tasks in an arbitrary order. Even though the task that inserts vertex i/10000 is submitted first, and there is a sleep between submissions, there is no guarantee that it has already completed when the current task runs. (Concretely, with batchSize = 5000 and 400 batches, i/10000 is always less than 200, so every edge's start vertex belongs to batch 0; any batch that runs before batch 0 commits hits "No vertex found with specified index value".) Therefore, these lines need to be changed to:

private static String constructTestData( int seq, int batchSize) {
        JSONObject jsonObject = new JSONObject();
        JSONArray entities = new JSONArray();
        JSONArray relations = new JSONArray();
        for (int i=seq*batchSize;i<(seq+1)*batchSize;i++) {
            entities.add(constructEntity(i));
            if (i > seq*batchSize) {
                relations.add(constructRelation(i));
            }
        }
        jsonObject.put("entities", entities);
        jsonObject.put("relations", relations);
        return jsonObject.toJSONString();
    }

    private static JSONObject constructEntity(int i) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put(PRIMARY_KEY, String.format("primary_key_%d", i));
        jsonObject.put(ENTITY_TYPE, String.format("entity_type_%d", i));
        jsonObject.put(ENTITY_PROPERTIES, String.format("entity_properties_x_%d", i));
        return jsonObject;
    }

    private static JSONObject constructRelation(int i) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put(START, String.format("primary_key_%d", i-1));
        jsonObject.put(END, String.format("primary_key_%d", i));

        jsonObject.put(RELATION_TYPE, String.format("relation_type_%d", i));
        jsonObject.put(RELATION_PROPERTIES, String.format("relation_properties_yzj_%d", i));
        jsonObject.put(RELATION_UNIQUE_ID, String.format("relation_unique_id_xxs_%d", i));
        return jsonObject;
    }

This guarantees that the vertices the edges of the current task depend on have definitely been inserted already, because they are inserted within the same task, and the procedure inserts vertices before edges.
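
To make the dependency explicit, here is a small sketch (hypothetical helper methods, not part of the original report) that computes which batch an edge's start vertex is created in under the original and the fixed constructRelation, assuming batchSize = 5000 and 400 batches as in the test code above:

public class BatchDependency {
    // Hypothetical helpers, for illustration only.

    // Original mapping: START = primary_key_{i/10000}. The vertex with primary key k
    // is created by batch k / batchSize, so with batchSize = 5000 and i < 400 * 5000
    // we get i / 10000 < 200, and the start vertex always belongs to batch 0.
    static int startVertexBatchOriginal(int i, int batchSize) {
        return (i / 10000) / batchSize;
    }

    // Fixed mapping: START = primary_key_{i-1}, and relations are only built for
    // i > seq * batchSize, so (i - 1) / batchSize == seq: the start vertex is created
    // by the same batch, before the edges, inside the same procedure transaction.
    static int startVertexBatchFixed(int i, int batchSize) {
        return (i - 1) / batchSize;
    }

    public static void main(String[] args) {
        int batchSize = 5000;
        int i = 1_234_567;        // some edge index from a late batch
        int seq = i / batchSize;  // the batch that builds this edge
        System.out.println(seq);                                     // 246
        System.out.println(startVertexBatchOriginal(i, batchSize));  // 0   -> cross-batch dependency
        System.out.println(startVertexBatchFixed(i, batchSize));     // 246 -> same batch
    }
}

Under the original mapping every edge depends on batch 0, so the only safe orderings are the fixed same-batch mapping above or the two-phase approach the author verified on Feb 1 (write all vertices first, then all edges).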
