
Commit 19beb40
fix: parhip format when using external memory mode
DanielSeemaier committed May 27, 2024
1 parent 642a8c2 commit 19beb40
Showing 1 changed file with 36 additions and 29 deletions.
65 changes: 36 additions & 29 deletions kagen/external_memory_facade.cpp
@@ -11,12 +11,6 @@

namespace kagen {
namespace {
void FilterMultiEdges(Edgelist& edges) {
std::sort(edges.begin(), edges.end());
auto it = std::unique(edges.begin(), edges.end());
edges.erase(it, edges.end());
}

void AddReverseEdges(Edgelist& edges) {
for (auto& [from, to]: edges) {
if (from > to) {
@@ -73,8 +67,8 @@ void SwapoutGraphChunk(

std::partial_sum(index.begin(), index.end(), index.begin());

constexpr std::size_t edge_size = sizeof(typename Edgelist::value_type);
const std::string filename = BufferFilename(chunk, config);
const std::size_t edge_size = sizeof(typename Edgelist::value_type);
const std::string filename = BufferFilename(chunk, config);

std::ofstream out(filename, std::ios::binary | std::ios::trunc);
if (!out) {
@@ -185,7 +179,12 @@ Graph SwapinGraphChunk(
std::cout << "filtering ... " << std::flush;
}

FilterMultiEdges(edges);
// Code calling this function expects the edges to be sorted ...
std::sort(edges.begin(), edges.end());

// ... and to be a "valid" graph chunk, i.e., without duplicate edges
auto it = std::unique(edges.begin(), edges.end());
edges.erase(it, edges.end());
}

Graph graph;
@@ -399,38 +398,41 @@ void GenerateExternalMemoryToDisk(PGeneratorConfig config, MPI_Comm comm) {

bool continue_with_next_pass = true;
for (int pass = 0; continue_with_next_pass; ++pass) {
SInt offset_n = 0;
SInt offset_m = 0;
SInt last_round_offset_n = 0;
SInt last_round_offset_m = 0;

const PEID rounded_chunk_count = std::ceil(1.0 * config.external.num_chunks / size) * size;
for (PEID chunk = rank; chunk < rounded_chunk_count; chunk += size) {
if (chunk >= config.external.num_chunks) {
for (PEID pe = 0; pe < size; ++pe) {
MPI_Barrier(comm);
}
continue;
}

for (PEID chunk = rank; chunk < rounded_chunk_count; chunk += size) {
if (output_info) {
std::cout << "Writing " << out_config.filename << " (pass " << pass + 1 << ", chunk " << chunk + 1
<< "... of " << config.external.num_chunks << ") ... " << std::flush;
}

// @todo to determine whether we want to cache the aggregated buffers, we would have to know whether we
// need multiple IO passes or not -- extend IO interface to give this information?
Graph graph = SwapinGraphChunk(chunk, vertex_distribution, config, false, output_info);

if (output_info) {
std::cout << "writing(" << graph.edges.size() << "...) ... " << std::flush;
}
Graph graph = chunk < config.external.num_chunks
? SwapinGraphChunk(chunk, vertex_distribution, config, false, output_info)
: Graph{};

GraphInfo pass_info = global_info;
pass_info.local_n = graph.NumberOfLocalVertices();
pass_info.local_m = graph.NumberOfLocalEdges();
pass_info.offset_n = offset_n;
pass_info.offset_m = offset_m;
offset_n += pass_info.local_n;
offset_m += pass_info.local_m;
MPI_Exscan(&pass_info.local_n, &pass_info.offset_n, 1, KAGEN_MPI_SINT, MPI_SUM, comm);
MPI_Exscan(&pass_info.local_m, &pass_info.offset_m, 1, KAGEN_MPI_SINT, MPI_SUM, comm);
pass_info.offset_n += last_round_offset_n;
pass_info.offset_m += last_round_offset_m;

SInt this_round_n = 0;
SInt this_round_m = 0;
MPI_Allreduce(&pass_info.local_n, &this_round_n, 1, KAGEN_MPI_SINT, MPI_SUM, comm);
MPI_Allreduce(&pass_info.local_m, &this_round_m, 1, KAGEN_MPI_SINT, MPI_SUM, comm);
last_round_offset_n += this_round_n;
last_round_offset_m += this_round_m;

if (output_info) {
std::cout << "writing(" << this_round_m << "...) ... " << std::flush;
}

for (PEID pe = 0; pe < size; ++pe) {
MPI_Barrier(comm);
@@ -439,14 +441,19 @@ void GenerateExternalMemoryToDisk(PGeneratorConfig config, MPI_Comm comm) {
}

continue_with_next_pass =
factory->CreateWriter(out_config, graph, pass_info, chunk, config.external.num_chunks)
->Write(pass, out_config.filename);
chunk < config.external.num_chunks
&& factory->CreateWriter(out_config, graph, pass_info, chunk, config.external.num_chunks)
->Write(pass, out_config.filename);
}

if (output_info) {
std::cout << "OK" << std::endl;
}
}

// If not all PEs participate in the last round, we need to make sure that they all know whether we are done
// or not
MPI_Allreduce(MPI_IN_PLACE, &continue_with_next_pass, 1, MPI_C_BOOL, MPI_LOR, comm);
}

if (output_info) {
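
The heart of the fix is how per-pass vertex and edge offsets are computed when the chunks are written out over several rounds: instead of accumulating purely local counters (offset_n/offset_m), each round now combines an exclusive prefix sum across the PEs (MPI_Exscan) with the round total (MPI_Allreduce), carried over in last_round_offset_n/last_round_offset_m. Below is a minimal, self-contained sketch of that pattern; it is not KaGen code. It substitutes unsigned long long / MPI_UNSIGNED_LONG_LONG for KaGen's SInt / KAGEN_MPI_SINT, only handles the edge counter, and the per-round local_m values are made up for illustration.

// Sketch of the offset computation used in the patch (assumption: two rounds,
// invented local edge counts).
#include <mpi.h>

#include <cstdio>

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);

    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    unsigned long long last_round_offset_m = 0; // edges written in all earlier rounds

    const int num_rounds = 2;
    for (int round = 0; round < num_rounds; ++round) {
        // Invented local edge count; in the patch this is graph.NumberOfLocalEdges().
        const unsigned long long local_m = 10ull * (rank + 1) + round;

        // Exclusive prefix sum over the ranks: where this rank starts writing
        // within the current round. The MPI standard leaves rank 0's result
        // undefined, so reset it explicitly.
        unsigned long long offset_m = 0;
        MPI_Exscan(&local_m, &offset_m, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, MPI_COMM_WORLD);
        if (rank == 0) {
            offset_m = 0;
        }
        offset_m += last_round_offset_m; // shift by everything written in earlier rounds

        // Every rank learns the round total and advances the carried-over offset.
        unsigned long long round_m = 0;
        MPI_Allreduce(&local_m, &round_m, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, MPI_COMM_WORLD);
        last_round_offset_m += round_m;

        std::printf("rank %d, round %d: %llu edges at global offset %llu\n", rank, round, local_m, offset_m);
    }

    MPI_Finalize();
    return 0;
}

The patch applies this pattern twice, once for vertices (local_n/offset_n) and once for edges (local_m/offset_m), so that the pass_info handed to the writer carries globally consistent offsets across all passes and chunks.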

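The other half of the fix concerns PEs that run out of chunks before the loop over rounded_chunk_count ends: they now construct an empty Graph and still take part in the collective offset computation, while the writer is only invoked for real chunks, and a final logical-OR reduction makes every PE agree on whether another pass is needed. A minimal sketch of that agreement step follows; it is not KaGen code, and the situation in which only rank 0 requests another pass is invented for illustration.

// Sketch of the final agreement step (assumption: only rank 0 still had a
// chunk and its writer requested another pass).
#include <mpi.h>

#include <cstdio>

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);

    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Ranks without a chunk never update the flag and default to "done".
    bool continue_with_next_pass = (rank == 0);

    // Logical OR over all ranks: a single "true" is enough to start another
    // pass everywhere. MPI_C_BOOL pairs with a one-byte bool on common ABIs,
    // matching the MPI_IN_PLACE / MPI_C_BOOL / MPI_LOR combination in the patch.
    MPI_Allreduce(MPI_IN_PLACE, &continue_with_next_pass, 1, MPI_C_BOOL, MPI_LOR, MPI_COMM_WORLD);

    std::printf("rank %d: continue_with_next_pass = %d\n", rank, static_cast<int>(continue_with_next_pass));

    MPI_Finalize();
    return 0;
}

This guards against PEs leaving the pass loop at different times, which would desynchronize the barriers and collective calls inside it.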