Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
ljcui committed Mar 15, 2024
1 parent 873a9b8 commit e96f6e2
Show file tree
Hide file tree
Showing 7 changed files with 667 additions and 5 deletions.
261 changes: 261 additions & 0 deletions osgraph/api.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
#include "tools/lgraph_log.h"
#include "core/data_type.h"
namespace lgraph {

lgraph_api::FieldData JsonToFieldData(const nlohmann::json& j_object) {
if (j_object.is_number_integer()) {
return lgraph_api::FieldData(j_object.get<int64_t>());
} else if (j_object.is_string()) {
return lgraph_api::FieldData(j_object.get<std::string>());
} else if (j_object.is_number_float()) {
return lgraph_api::FieldData(j_object.get<float>());
} else if (j_object.is_boolean()) {
return lgraph_api::FieldData(j_object.get<bool>());
}
throw std::runtime_error("unexpected json type, obj: " + j_object.dump());
};

struct VertexLine {
size_t vertexLabel;
size_t primaryFieldId;
lgraph_api::FieldData primaryFieldValue;
std::vector<size_t> field_ids;
std::vector<lgraph_api::FieldData> field_values;
};

bool update_vertex(lgraph_api::GraphDB &db, const std::string &request, std::string &response) {
using namespace std;
std::string vertex_type;
try {
int add_nodes = 0;
int update_nodes = 0;
int filter_nodes = 0;
nlohmann::json input = nlohmann::json::parse(request);
vertex_type = input["type"].get<string>();
string primary_field = input["primary"].get<string>();
auto vertexes_count = input["vertexes"].size();
if (vertexes_count == 0) {
nlohmann::json output;
output["ok"] = true;
output["response"]= "No vertexes to update.";
response = output.dump();
return true;
}
std::vector<VertexLine> lines;
{
auto txn = db.CreateReadTxn();
auto vertexLabel = txn.GetVertexLabelId(vertex_type);
auto vertexPrimary = txn.GetVertexFieldId(vertexLabel, primary_field);
for (auto& vertex : input["vertexes"]) {
try {
auto id = JsonToFieldData(vertex[primary_field]);
vector<string> field_names;
vector<lgraph_api::FieldData> field_values;
for (auto& el : vertex.items()) {
if (el.key() == primary_field) {
continue;
}
field_names.push_back(el.key());
field_values.push_back(JsonToFieldData(el.value()));
}
auto field_ids = txn.GetVertexFieldIds(vertexLabel, field_names);
VertexLine line;
line.vertexLabel = vertexLabel;
line.primaryFieldId = vertexPrimary;
line.primaryFieldValue = id;
line.field_ids = std::move(field_ids);
line.field_values = std::move(field_values);
lines.emplace_back(std::move(line));
} catch (const exception &e) {
continue;
}
}
txn.Abort();
}

if (!lines.empty()) {
auto txn = db.CreateWriteTxn();
for (auto& line : lines) {
try {
// try to find the vertex and update the property
auto vit = txn.GetVertexByUniqueIndex(line.vertexLabel, line.primaryFieldId, line.primaryFieldValue);
if (!line.field_ids.empty()) {
bool need_update = false;
auto fields = vit.GetFields(line.field_ids);
for (size_t i = 0; i < fields.size(); i++) {
if (fields[i] != line.field_values[i]) {
need_update = true;
break;
}
}
if (need_update) {
vit.SetFields(line.field_ids, line.field_values);
update_nodes++;
} else {
filter_nodes++;
}
} else {
filter_nodes++;
}
} catch (const runtime_error& re) {
// not found, insert
line.field_ids.push_back(line.primaryFieldId);
line.field_values.push_back(line.primaryFieldValue);
try {
txn.AddVertex(line.vertexLabel, line.field_ids, line.field_values);
} catch (const exception& e) {
LOG_WARN() << "AddVertex, type: " << vertex_type << ", e: " << e.what() << ", re: " << re.what();
throw e;
}
add_nodes++;
} catch (const std::exception& e) {
LOG_WARN() << "GetVertexByUniqueIndex, type: " << vertex_type << ", e: " << e.what();
}
}
txn.Commit();
}

nlohmann::json output;
output["ok"] = true;
output["response"]["type"] = vertex_type;
output["response"]["add_nodes"] = add_nodes;
output["response"]["update_nodes"] = update_nodes;
output["response"]["filter_nodes"] = filter_nodes;
response = output.dump();
return true;
} catch (const exception &e) {
auto err = e.what();
LOG_WARN() << vertex_type << " update_nodes, exception: " << err;
nlohmann::json output;
output["ok"] = false;
output["response"] = std::string("Error on vertex ") + vertex_type + (" processing: ") + err;
response = output.dump();
return false;
}
}

struct LineEdge {
int64_t fromVertexId;
int64_t toVertexId;
size_t edgeLabelId;
std::vector<size_t> fieldIds;
std::vector<FieldData> fieldValues;
};

bool update_edge(lgraph_api::GraphDB &db, const std::string &request, std::string &response) {
using namespace std;
std::string label;
try {
int updateEdges = 0;
int insertEdges = 0;
int filterEdges = 0;
nlohmann::json input = nlohmann::json::parse(request);
label = input["type"].get<string>();
bool repeatable = input["repeatable"].get<bool>();
string from_label = input["from_label"].get<string>();
string to_label = input["to_label"].get<string>();

auto edgesCount = input["edges"].size();
if (edgesCount == 0) {
nlohmann::json output;
output["ok"] = true;
output["response"]= "No edges to update.";
response = output.dump();
return true;
}
std::vector<LineEdge> lines;
{
auto txn = db.CreateReadTxn();
auto labelId = txn.GetEdgeLabelId(label);
for (auto& edge : input["edges"]) {
try {
FieldData fromId = JsonToFieldData(edge["from"]);
auto v1iter = txn.GetVertexByUniqueIndex(
from_label, txn.GetVertexPrimaryField(from_label), fromId);
auto fromVertexId = v1iter.GetId();

FieldData toId = JsonToFieldData(edge["to"]);
auto v2iter = txn.GetVertexByUniqueIndex(
to_label, txn.GetVertexPrimaryField(to_label), toId);
auto toVertexId = v2iter.GetId();

vector<string> fieldNames;
vector<FieldData> fieldValues;
for (auto& el : edge["property"].items()) {
fieldNames.push_back(el.key());
fieldValues.push_back(JsonToFieldData(el.value()));
}
auto fieldIds = txn.GetEdgeFieldIds(labelId, fieldNames);
LineEdge line;
line.edgeLabelId = labelId;
line.fromVertexId = fromVertexId;
line.toVertexId = toVertexId;
line.fieldIds = std::move(fieldIds);
line.fieldValues = std::move(fieldValues);
lines.emplace_back(std::move(line));
} catch (const exception &e) {
continue;
}
}
txn.Abort();
}

if (!lines.empty()) {
auto txn = db.CreateWriteTxn();
for (auto& line : lines) {
if (!repeatable) {
auto iter = txn.GetOutEdgeIterator(
{line.fromVertexId, line.toVertexId, (LabelId)line.edgeLabelId, 0, 0});
if (iter.IsValid()) {
if (!line.fieldIds.empty()) {
bool need_update = false;
auto fields = iter.GetFields(line.fieldIds);
for (size_t i = 0; i < fields.size(); i++) {
if (fields[i] != line.fieldValues[i]) {
need_update = true;
break;
}
}
if (need_update) {
iter.SetFields(line.fieldIds, line.fieldValues);
updateEdges++;
} else {
filterEdges++;
}
} else {
filterEdges++;
}
} else {
txn.AddEdge(line.fromVertexId, line.toVertexId, line.edgeLabelId,
line.fieldIds, line.fieldValues);
insertEdges++;
}
} else {
txn.AddEdge(line.fromVertexId, line.toVertexId, line.edgeLabelId,
line.fieldIds, line.fieldValues);
insertEdges++;
}
}
txn.Commit();
}

nlohmann::json output;
output["ok"] = true;
output["response"]["type"] = label;
output["response"]["add_edges"] = insertEdges;
output["response"]["filter_edges"] = filterEdges;
output["response"]["update_edges"] = updateEdges;
response = output.dump();
return true;
} catch (const exception &e) {
auto err = e.what();
LOG_WARN() << label << " update_edges, exception: " << err;
nlohmann::json output;
output["ok"] = false;
output["response"] = std::string("Error on edge ") + label + (" processing: ") + err;
response = output.dump();
return false;
}
}

}
Loading

0 comments on commit e96f6e2

Please sign in to comment.