Skip to content

Commit

Permalink
根据IO接口文档,更新部分算法
Browse files Browse the repository at this point in the history
  • Loading branch information
Clarkclk authored and tugraph committed Sep 5, 2024
1 parent e3b2dd2 commit c7cb577
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 210 deletions.
29 changes: 11 additions & 18 deletions include/lgraph/olap_on_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,6 @@ class OlapOnDB : public OlapBase<EdgeData> {
auto task_ctx = GetThreadContext();
auto worker = Worker::SharedWorker();


// Read from TuGraph
if ((flags_ & SNAPSHOT_PARALLEL) && txn_.IsReadOnly()) {
this->out_index_.Resize(this->num_vertices_ + 1, (size_t)0);
Expand Down Expand Up @@ -962,7 +961,7 @@ class OlapOnDB : public OlapBase<EdgeData> {
/**
* @brief Generate a graph with LightningGraph. For V1/V2 Procedures
*/
OlapOnDB(GraphDB* db, Transaction &txn, size_t flags = 0,
OlapOnDB(GraphDB *db, Transaction &txn, size_t flags = 0,
std::function<bool(VertexIterator &)> vertex_filter = nullptr,
std::function<bool(OutEdgeIterator &, EdgeData &)> out_edge_filter = nullptr)
: db_(db),
Expand Down Expand Up @@ -1490,23 +1489,17 @@ class OlapOnDB : public OlapBase<EdgeData> {
*
*/
template <typename VertexData>
void WriteToFile(ParallelVector<VertexData> &vertex_data, const std::string &output_file) {
FILE* fout = fopen(output_file.c_str(), "w");
if (fout == nullptr) {
THROW_CODE(InputError, "Unable to open file for writting!");
}
void WriteToFile(ParallelVector<VertexData> &vertex_data, const std::string &output_file,
std::function<bool(size_t vid, VertexData &vdata)> output_filter = nullptr) {
fma_common::OutputFmaStream fout;
fout.Open(output_file, 64 << 20);
for (size_t i = 0; i < this->num_vertices_; ++i) {
auto vit = txn_.GetVertexIterator(OriginalVid(i));
auto vit_label = vit.GetLabel();
auto primary_field = txn_.GetVertexPrimaryField(vit_label);
auto field_data = vit.GetField(primary_field);
json curJson;
curJson["vid"] = OriginalVid(i);
curJson["label"] = vit_label;
curJson["primary_field"] = primary_field;
curJson["field_data"] = field_data.ToString();
curJson["result"] = vertex_data[i];
fprintf(fout, "%s\n", curJson.dump().c_str());
if (output_filter != nullptr && !output_filter(i, vertex_data[i])) {
continue;
}
std::string line =
fma_common::StringFormatter::Format("{} {}\n", OriginalVid(i), vertex_data[i]);
fout.Write(line.c_str(), line.size());
}
}

Expand Down
9 changes: 6 additions & 3 deletions procedures/algo_cpp/algo.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
using namespace lgraph_api;
using namespace lgraph_api::olap;

const double SSSP_INIT_VALUE = 2e10;

/**
* @brief Compute the shortest path length between any two vertices in the graph.
*
Expand Down Expand Up @@ -156,8 +158,8 @@ size_t LocateCycleCore(OlapBase<Empty>& graph, size_t k);
*
* \return The value of modularity.
*/
double LPACore(OlapBase<Empty>& graph, ParallelVector<size_t>& label,
int num_iterations, bool sync_flag);
double LPACore(OlapBase<Empty>& graph, ParallelVector<size_t>& label, int num_iterations,
bool sync_flag);

/**
* @brief Compute the Maximal Independent Set Algorithm.
Expand Down Expand Up @@ -272,7 +274,8 @@ size_t KTrussCore(OlapBase<Empty>& graph, size_t value_k,
* @param[in] graph The graph to compute on.
* @param[in,out] label The ParallelVector to store label information.
* @param[in] active_threshold Threshold of active_vertices in original graph.
* @param[in] is_sync if to run louvain in sync mode. 0 means async mode, non-zero value means sync mode.
* @param[in] is_sync if to run louvain in sync mode. 0 means async mode, non-zero
* value means sync mode.
*
* @return return the modularity of graph.
*/
Expand Down
22 changes: 21 additions & 1 deletion procedures/algo_cpp/apsp_procedure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re

// prepare
start_time = get_time();
std::string output_file = "";
try {
json input = json::parse(request);
parse_from_json(output_file, "output_file", input);
} catch (std::exception& e) {
response = "json parse error: " + std::string(e.what());
std::cout << response << std::endl;
return false;
}

auto txn = db.CreateReadTxn();
OlapOnDB<double> olapondb(db, txn, SNAPSHOT_PARALLEL, nullptr, edge_convert_default<double>);
Expand All @@ -21,13 +30,23 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re
// core
start_time = get_time();
std::tuple<size_t, size_t, double> max_tuple;
std::vector< std::tuple<size_t, size_t, double> > result;
std::vector<std::tuple<size_t, size_t, double> > result;
APSPCore(olapondb, result, max_tuple);
auto core_cost = get_time() - start_time;

// output
start_time = get_time();
// TODO(any): write distance back to graph
if (output_file != "") {
FILE* fout = fopen(output_file.c_str(), "w");
if (fout != NULL) {
for (auto ele : result) {
fprintf(fout, "%ld, %ld, %lf\n", std::get<0>(ele), std::get<1>(ele),
std::get<2>(ele));
}
}
fclose(fout);
}
auto output_cost = get_time() - start_time;

// return
Expand All @@ -38,6 +57,7 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re
output["max_distance_val"] = std::get<2>(max_tuple);
output["num_vertices"] = olapondb.NumVertices();
output["num_edges"] = olapondb.NumEdges();
output["num_results"] = result.size();
output["prepare_cost"] = prepare_cost;
output["core_cost"] = core_cost;
output["output_cost"] = output_cost;
Expand Down
4 changes: 2 additions & 2 deletions procedures/algo_cpp/bc_procedure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re
start_time = get_time();
std::string output_file = "";
size_t samples = 10;
std::string output_file = "";
try {
json input = json::parse(request);
parse_from_json(samples, "samples", input);
Expand Down Expand Up @@ -54,9 +55,8 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re
// output
start_time = get_time();
if (output_file != "") {
olapondb.WriteToFile<double>(score, output_file);
olapondb.WriteToFile(score, output_file);
}
// TODO(any): write score back to graph
auto output_cost = get_time() - start_time;

// return
Expand Down
84 changes: 10 additions & 74 deletions procedures/algo_cpp/bfs_procedure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,52 +20,21 @@ using namespace lgraph_api;
using namespace lgraph_api::olap;
using json = nlohmann::json;

/**
* Processes a request on a GraphDB.
*
* @param db The reference to the GraphDB object on which the request will be processed.
* @param request The input request in JSON format.
* The request should contain the following parameters:
* - "root_value": The value of the root vertex.
* - "root_label": The label of the root vertex.
* - "root_field": The field of the root vertex.
* - "vertex_label_filter": Filter vertex sets based on vertex labels
* - "edge_label_filter": Filter edge sets based on edge labels
* - "parent_id": Vertex field name to be written back into the database.
* - "output_file": Parent id to be written to the file.
* @param response The output response in JSON format.
* The response will contain the following parameters:
* - "found_vertices": The number of vertices found.
* - "num_vertices": The number of vertices in the graph.
* - "num_edges": The number of edges in the graph.
* - "prepare_cost": The time cost of preparing the graph data.
* - "core_cost": The time cost of the core algorithm.
* - "output_cost": The time cost of writing the result to a file.
* - "total_cost": The total time cost.
* @return True if the request is processed successfully, false otherwise.
*/

extern "C" bool Process(GraphDB& db, const std::string& request, std::string& response) {
auto start_time = get_time();

// prepare
start_time = get_time();
std::string root_value = "0";
std::string root_label = "";
std::string root_field = "";
std::string vertex_label_filter = "";
std::string edge_label_filter = "";
std::string parent_id = "";
std::string root_label = "node";
std::string root_field = "id";
std::string output_file = "";
std::cout << "Input: " << request << std::endl;
try {
json input = json::parse(request);
parse_from_json(root_value, "root_value", input);
parse_from_json(root_label, "root_label", input);
parse_from_json(root_field, "root_field", input);
parse_from_json(vertex_label_filter, "vertex_label_filter", input);
parse_from_json(edge_label_filter, "edge_label_filter", input);
parse_from_json(parent_id, "parent_id", input);
parse_from_json(output_file, "output_file", input);
} catch (std::exception& e) {
response = "json parse error: " + std::string(e.what());
Expand All @@ -74,60 +43,27 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re
}

auto txn = db.CreateReadTxn();

std::function<bool(VertexIterator &)> vertex_filter = nullptr;
std::function<bool(OutEdgeIterator &, Empty &)> edge_filter = nullptr;

if (!vertex_label_filter.empty()) {
vertex_filter = [&vertex_label_filter](VertexIterator& vit) {
return vit.GetLabel() == vertex_label_filter;
};
}

if (!edge_label_filter.empty()) {
edge_filter = [&edge_label_filter](OutEdgeIterator& eit, Empty& edata) {
return eit.GetLabel() == edge_label_filter;
};
}

if (root_label.empty() || root_field.empty()) {
if (root_label.empty() && root_field.empty()) {
root_label = "node";
root_field = "id";
} else {
THROW_CODE(InputError, "root_label or root_field is empty");
}
}

OlapOnDB<Empty> olapondb(db, txn, SNAPSHOT_PARALLEL, vertex_filter, edge_filter);
int64_t root_vid = txn.GetVertexIndexIterator(root_label,
root_field, root_value, root_value).GetVid();
int64_t root_vid =
txn.GetVertexIndexIterator(root_label, root_field, root_value, root_value).GetVid();
OlapOnDB<Empty> olapondb(db, txn, SNAPSHOT_PARALLEL);
auto prepare_cost = get_time() - start_time;

// core
start_time = get_time();
ParallelVector<size_t> parent = olapondb.AllocVertexArray<size_t>();
size_t count = BFSCore(olapondb, olapondb.MappedVid(root_vid), parent);
printf("found_vertices = %ld\n", count);
auto core_cost = get_time() - start_time;

// output
start_time = get_time();
#pragma omp parallel for
for (size_t i = 0; i < parent.Size(); i++) {
if (parent[i] != (size_t)-1) {
parent[i] = olapondb.OriginalVid(parent[i]);
}
}
// TODO(any): write parent back to graph
if (output_file != "") {
olapondb.WriteToFile<size_t>(parent, output_file);
}
txn.Commit();

if (parent_id != "") {
olapondb.WriteToGraphDB<size_t>(parent, parent_id);
olapondb.WriteToFile<size_t>(parent, output_file, [&](size_t vid, size_t vdata) -> bool {
return vdata != (size_t)-1;
});
}

printf("found_vertices = %ld\n", count);
auto output_cost = get_time() - start_time;

// return
Expand Down
53 changes: 19 additions & 34 deletions procedures/algo_cpp/ji_procedure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re
std::string src_field = "id";
std::string dst_label = "node";
std::string dst_field = "id";
std::vector<std::pair<size_t, size_t>> search_list = {{0, 1}, {1, 972}, {101, 202}};
std::vector<std::pair<std::string, std::string>> search_list = {
{"0", "1"}, {"1", "972"}, {"101", "202"}};
auto txn = db.CreateReadTxn();
try {
json input = json::parse(request);
Expand All @@ -40,19 +41,13 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re
parse_from_json(search_list, "search_pairs", input);
if (input["search_pairs"].is_array()) {
search_list.clear();
for (auto &e : input["search_pairs"]) {
int64_t src_id = e[0];
int64_t dst_id = e[1];
lgraph_api::FieldData src_field_data(src_id);
lgraph_api::FieldData dst_field_data(dst_id);
size_t src = txn.GetVertexIndexIterator(src_label,
src_field, src_field_data, src_field_data).GetVid();
size_t dst = txn.GetVertexIndexIterator(dst_label,
dst_field, dst_field_data, dst_field_data).GetVid();
search_list.push_back(std::make_pair(src, dst));
for (auto& e : input["search_pairs"]) {
std::string src_string = static_cast<std::string>(e[0]);
std::string dst_string = static_cast<std::string>(e[1]);
search_list.push_back(std::make_pair(src_string, dst_string));
}
}
} catch (std::exception &e) {
} catch (std::exception& e) {
response = "json parse error: " + std::string(e.what());
std::cout << response << std::endl;
return false;
Expand All @@ -63,30 +58,20 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re

// core
start_time = get_time();
std::vector< std::tuple<size_t, std::string, std::string, std::string,
size_t, std::string, std::string, std::string, double> > result_list;
for (auto search_pair : search_list) {
double score = JiCore(olapondb, search_pair);
std::cout << score <<std::endl;
auto vit_first = txn.GetVertexIterator(search_pair.first, false);
auto vit_first_label = vit_first.GetLabel();
auto vit_first_primary_field = txn.GetVertexPrimaryField(vit_first_label);
auto vit_first_field_data = vit_first.GetField(vit_first_primary_field);
std::vector<std::tuple<std::string, std::string, double>> result_list;

auto vit_second = txn.GetVertexIterator(search_pair.second, false);
auto vit_second_label = vit_second.GetLabel();
auto vit_second_primary_field = txn.GetVertexPrimaryField(vit_second_label);
auto vit_second_field_data = vit_second.GetField(vit_second_primary_field);
result_list.push_back(std::make_tuple(search_pair.first,
vit_first_label,
vit_first_primary_field,
vit_first_field_data.ToString(),
search_pair.second,
vit_second_label,
vit_second_primary_field,
vit_second_field_data.ToString(),
score));
for (auto search_pair : search_list) {
auto src_string = search_pair.first;
auto dst_string = search_pair.second;
auto src_vid =
txn.GetVertexIndexIterator(src_label, src_field, src_string, src_string).GetVid();
auto dst_vid =
txn.GetVertexIndexIterator(dst_label, dst_field, dst_string, dst_string).GetVid();
auto id_pair = std::make_pair(src_vid, dst_vid);
double score = JiCore(olapondb, id_pair);
result_list.push_back(std::make_tuple(src_string, dst_string, score));
}

auto core_cost = get_time() - start_time;

// return
Expand Down
7 changes: 6 additions & 1 deletion procedures/algo_cpp/leiden_procedure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re
unsigned random_seed = 0;
size_t threshold = 0;
std::string weight = "";
std::string output_file = "";
std::cout << "Input: " << request << std::endl;
try {
json input = json::parse(request);
Expand All @@ -40,6 +41,7 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re
parse_from_json(random_seed, "random_seed", input);
parse_from_json(weight, "weight", input);
parse_from_json(threshold, "threshold", input);
parse_from_json(output_file, "output_file", input);
} catch (std::exception& e) {
response = "json parse error: " + std::string(e.what());
std::cout << response << std::endl;
Expand All @@ -65,11 +67,14 @@ extern "C" bool Process(GraphDB& db, const std::string& request, std::string& re
start_time = get_time();
ParallelVector<size_t> label = olapondb.AllocVertexArray<size_t>();
LeidenCore(olapondb, label, random_seed, theta, gamma, threshold);
printf("label.size=%lu\n", label.Size());
auto core_cost = get_time() - start_time;

// output
start_time = get_time();
if (output_file != "") {
olapondb.WriteToFile<size_t>(label, output_file);
}

double output_cost = get_time() - start_time;

// return
Expand Down
4 changes: 2 additions & 2 deletions procedures/algo_cpp/sssp_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ using namespace lgraph_api::olap;
void SSSPCore(OlapBase<double>& graph, size_t root, ParallelVector<double>& distance) {
auto active_in = graph.AllocVertexSubset();
active_in.Add(root);
std::cout<< "root:" << root << std::endl;
std::cout << "root:" << root << std::endl;
auto active_out = graph.AllocVertexSubset();
distance.Fill((double)2e10);
distance.Fill(SSSP_INIT_VALUE);

distance[root] = 0.0;
size_t num_activations = 1;
Expand Down
Loading

0 comments on commit c7cb577

Please sign in to comment.