diff --git a/kagen/external_memory_facade.cpp b/kagen/external_memory_facade.cpp index 15aaaf2..aa41b53 100644 --- a/kagen/external_memory_facade.cpp +++ b/kagen/external_memory_facade.cpp @@ -11,12 +11,6 @@ namespace kagen { namespace { -void FilterMultiEdges(Edgelist& edges) { - std::sort(edges.begin(), edges.end()); - auto it = std::unique(edges.begin(), edges.end()); - edges.erase(it, edges.end()); -} - void AddReverseEdges(Edgelist& edges) { for (auto& [from, to]: edges) { if (from > to) { @@ -73,8 +67,8 @@ void SwapoutGraphChunk( std::partial_sum(index.begin(), index.end(), index.begin()); - constexpr std::size_t edge_size = sizeof(typename Edgelist::value_type); - const std::string filename = BufferFilename(chunk, config); + const std::size_t edge_size = sizeof(typename Edgelist::value_type); + const std::string filename = BufferFilename(chunk, config); std::ofstream out(filename, std::ios::binary | std::ios::trunc); if (!out) { @@ -185,7 +179,12 @@ Graph SwapinGraphChunk( std::cout << "filtering ... " << std::flush; } - FilterMultiEdges(edges); + // Code calling this function expects the edges to be sorted ... + std::sort(edges.begin(), edges.end()); + + // ... and to be a "valid" graph chunk, i.e., without duplicate edges + auto it = std::unique(edges.begin(), edges.end()); + edges.erase(it, edges.end()); } Graph graph; @@ -399,18 +398,12 @@ void GenerateExternalMemoryToDisk(PGeneratorConfig config, MPI_Comm comm) { bool continue_with_next_pass = true; for (int pass = 0; continue_with_next_pass; ++pass) { - SInt offset_n = 0; - SInt offset_m = 0; + SInt last_round_offset_n = 0; + SInt last_round_offset_m = 0; const PEID rounded_chunk_count = std::ceil(1.0 * config.external.num_chunks / size) * size; - for (PEID chunk = rank; chunk < rounded_chunk_count; chunk += size) { - if (chunk >= config.external.num_chunks) { - for (PEID pe = 0; pe < size; ++pe) { - MPI_Barrier(comm); - } - continue; - } + for (PEID chunk = rank; chunk < rounded_chunk_count; chunk += size) { if (output_info) { std::cout << "Writing " << out_config.filename << " (pass " << pass + 1 << ", chunk " << chunk + 1 << "... of " << config.external.num_chunks << ") ... " << std::flush; @@ -418,19 +411,28 @@ void GenerateExternalMemoryToDisk(PGeneratorConfig config, MPI_Comm comm) { // @todo to determine whether we want to cache the aggregated bufers, we would have to know whether we // need multiple IO passes or not -- extend IO interface to give this information? - Graph graph = SwapinGraphChunk(chunk, vertex_distribution, config, false, output_info); - - if (output_info) { - std::cout << "writing(" << graph.edges.size() << "...) ... " << std::flush; - } + Graph graph = chunk < config.external.num_chunks + ? SwapinGraphChunk(chunk, vertex_distribution, config, false, output_info) + : Graph{}; GraphInfo pass_info = global_info; pass_info.local_n = graph.NumberOfLocalVertices(); pass_info.local_m = graph.NumberOfLocalEdges(); - pass_info.offset_n = offset_n; - pass_info.offset_m = offset_m; - offset_n += pass_info.local_n; - offset_m += pass_info.local_m; + MPI_Exscan(&pass_info.local_n, &pass_info.offset_n, 1, KAGEN_MPI_SINT, MPI_SUM, comm); + MPI_Exscan(&pass_info.local_m, &pass_info.offset_m, 1, KAGEN_MPI_SINT, MPI_SUM, comm); + pass_info.offset_n += last_round_offset_n; + pass_info.offset_m += last_round_offset_m; + + SInt this_round_n = 0; + SInt this_round_m = 0; + MPI_Allreduce(&pass_info.local_n, &this_round_n, 1, KAGEN_MPI_SINT, MPI_SUM, comm); + MPI_Allreduce(&pass_info.local_m, &this_round_m, 1, KAGEN_MPI_SINT, MPI_SUM, comm); + last_round_offset_n += this_round_n; + last_round_offset_m += this_round_m; + + if (output_info) { + std::cout << "writing(" << this_round_m << "...) ... " << std::flush; + } for (PEID pe = 0; pe < size; ++pe) { MPI_Barrier(comm); @@ -439,14 +441,19 @@ void GenerateExternalMemoryToDisk(PGeneratorConfig config, MPI_Comm comm) { } continue_with_next_pass = - factory->CreateWriter(out_config, graph, pass_info, chunk, config.external.num_chunks) - ->Write(pass, out_config.filename); + chunk < config.external.num_chunks + && factory->CreateWriter(out_config, graph, pass_info, chunk, config.external.num_chunks) + ->Write(pass, out_config.filename); } if (output_info) { std::cout << "OK" << std::endl; } } + + // If not all PEs participate in the last round, we need to make sure that they all know whether we are done + // or not + MPI_Allreduce(MPI_IN_PLACE, &continue_with_next_pass, 1, MPI_C_BOOL, MPI_LOR, comm); } if (output_info) {