From 66087b2e2bd5283235f76007dc96d03070360844 Mon Sep 17 00:00:00 2001 From: Daniel Seemaier Date: Fri, 24 May 2024 17:18:01 +0200 Subject: [PATCH] refactor: remove debug code, rebrand streaming mode as external memory mode --- app/KaGen.cpp | 12 +- kagen/context.cpp | 8 +- kagen/context.h | 22 ++-- ..._facade.cpp => external_memory_facade.cpp} | 107 +++++------------- ...ming_facade.h => external_memory_facade.h} | 2 +- kagen/generators/barabassi/barabassi.cpp | 2 +- kagen/generators/hyperbolic/hyperbolic.cpp | 4 +- kagen/generators/kronecker/kronecker.cpp | 2 +- kagen/generators/rmat/rmat.cpp | 2 +- 9 files changed, 56 insertions(+), 105 deletions(-) rename kagen/{streaming_facade.cpp => external_memory_facade.cpp} (75%) rename kagen/{streaming_facade.h => external_memory_facade.h} (58%) diff --git a/app/KaGen.cpp b/app/KaGen.cpp index 2b4b777..84fb2e1 100644 --- a/app/KaGen.cpp +++ b/app/KaGen.cpp @@ -8,8 +8,8 @@ ******************************************************************************/ #include "kagen/context.h" #include "kagen/definitions.h" +#include "kagen/external_memory_facade.h" #include "kagen/in_memory_facade.h" -#include "kagen/streaming_facade.h" #include @@ -118,9 +118,9 @@ void SetupCommandLineArguments(CLI::App& app, PGeneratorConfig& config) { // General parameters app.add_option( - "--experimental-K", config.streaming.num_chunks, - "Number of chunks for generating the graph in a buffered streaming setting."); - app.add_option("--experimental-T", config.streaming.tmp_directory, "Directory for temporary buffer files."); + "--experimental-K", config.external.num_chunks, + "Number of chunks for generating the graph in the buffered external memory mode."); + app.add_option("--experimental-T", config.external.tmp_directory, "Directory for temporary buffer files."); app.add_flag("-q,--quiet", config.quiet, "Quiet mode"); app.add_flag("-v,--version", [&](auto) { PrintVersion(); }, "Print KaGen version")->trigger_on_parse(); @@ -489,8 +489,8 @@ int main(int argc, char* argv[]) { config.output_graph.extension = true; } - if (config.streaming.num_chunks > 1) { - GenerateStreamedToDisk(config, MPI_COMM_WORLD); + if (config.external.num_chunks > 1) { + GenerateExternalMemoryToDisk(config, MPI_COMM_WORLD); } else { GenerateInMemoryToDisk(config, MPI_COMM_WORLD); } diff --git a/kagen/context.cpp b/kagen/context.cpp index c8a7f94..2fb722c 100644 --- a/kagen/context.cpp +++ b/kagen/context.cpp @@ -37,11 +37,11 @@ std::ostream& operator<<(std::ostream& out, const PGeneratorConfig& config) { out << "General Parameters:\n"; out << " Seed: " << config.seed << "\n"; out << " Generate coordinates: " << (config.coordinates ? "yes" : "no") << "\n"; - out << " Execution mode: " << (config.streaming.num_chunks > 1 ? "streamed" : "in-memory") + out << " Execution mode: " << (config.external.num_chunks > 1 ? "streamed" : "in-memory") << "\n"; - if (config.streaming.num_chunks > 1) { - out << " Temporary directory: " << config.streaming.tmp_directory << "\n"; - out << " Sort edges: " << (config.streaming.sort_edges ? "yes" : "no") << "\n"; + if (config.external.num_chunks > 1) { + out << " Temporary directory: " << config.external.tmp_directory << "\n"; + out << " Sort edges: " << (config.external.sort_edges ? "yes" : "no") << "\n"; } else { out << " Validate generated graph: " << (config.validate_simple_graph ? "yes" : "no") << "\n"; out << " Statistics level: " << config.statistics_level << "\n"; diff --git a/kagen/context.h b/kagen/context.h index 909c596..e3f799d 100644 --- a/kagen/context.h +++ b/kagen/context.h @@ -68,15 +68,15 @@ struct OutputGraphConfig { int adjwgt_width = 64; }; -struct StreamingConfig { - int num_chunks = 1; - std::string tmp_directory = "/tmp"; - //bool remove_self_loops = false; - bool fix_reverse_edges = false; - bool fix_nonlocal_reverse_edges = false; - bool sort_edges = false; - - bool refuse_streaming_mode = false; +struct ExternalMemoryConfig { + int num_chunks = 1; + std::string tmp_directory = "/tmp"; + // bool remove_self_loops = false; + bool fix_reverse_edges = false; + bool fix_nonlocal_reverse_edges = false; + bool sort_edges = false; + + bool refuse_external_mode = false; }; // Configuration for the generator. @@ -88,8 +88,8 @@ struct PGeneratorConfig { bool skip_postprocessing = false; bool print_header = true; - // Streaming settings - StreamingConfig streaming{}; + // External memory mode settings + ExternalMemoryConfig external{}; // Generator settings GeneratorType generator; // Generator type diff --git a/kagen/streaming_facade.cpp b/kagen/external_memory_facade.cpp similarity index 75% rename from kagen/streaming_facade.cpp rename to kagen/external_memory_facade.cpp index 8e8eee0..5cd7eef 100644 --- a/kagen/streaming_facade.cpp +++ b/kagen/external_memory_facade.cpp @@ -1,4 +1,4 @@ -#include "kagen/streaming_facade.h" +#include "kagen/external_memory_facade.h" #include "kagen/definitions.h" #include "kagen/factories.h" @@ -22,7 +22,7 @@ void AddReverseEdges(Edgelist& edges) { std::swap(from, to); } } -// + // std::sort(edges.begin(), edges.end()); auto it = std::unique(edges.begin(), edges.end()); edges.erase(it, edges.end()); @@ -51,16 +51,16 @@ void AddNonlocalReverseEdges(Edgelist& edges, const std::pair& local } std::string BufferFilename(const PEID chunk, const PGeneratorConfig& config) { - return config.streaming.tmp_directory + "/sKaGen_" + std::to_string(chunk) + ".buf"; + return config.external.tmp_directory + "/sKaGen_" + std::to_string(chunk) + ".buf"; } void SwapoutGraphChunk( const Edgelist& edges, const PEID chunk, const std::vector& distribution, const PGeneratorConfig& config) { - std::vector index(config.streaming.num_chunks + 1); + std::vector index(config.external.num_chunks + 1); PEID cur = 0; for (const auto& [from, to]: edges) { - while (cur + 1 < config.streaming.num_chunks && from >= distribution[cur + 1]) { + while (cur + 1 < config.external.num_chunks && from >= distribution[cur + 1]) { ++cur; } ++index[cur + 1]; @@ -69,25 +69,7 @@ void SwapoutGraphChunk( std::partial_sum(index.begin(), index.end(), index.begin()); constexpr std::size_t edge_size = sizeof(typename Edgelist::value_type); - - //std::cout << "Swapout edges: #edges=" << edges.size() << ", index=["; - //for (const SInt i: index) { - // std::cout << i << ", "; - //} - //std::cout << "\b\b]\n" << std::flush; - - const std::string filename = BufferFilename(chunk, config); - - //std::cout << "Edges for buffer " << filename << ":" << std::endl; - // for (const auto& [u, v]: edges) { - // std::cout << u << " -> " << v << std::endl; - // } - - //std::cout << "WRITE INDEX TO " << filename << ": ["; - //for (const SInt i: index) { - // std::cout << i << ", "; - //} - //std::cout << "\b\b]\n" << std::flush; + const std::string filename = BufferFilename(chunk, config); std::ofstream out(filename, std::ios::binary | std::ios::trunc); out.write(reinterpret_cast(index.data()), sizeof(SInt) * index.size()); @@ -95,7 +77,7 @@ void SwapoutGraphChunk( } SInt CountEdges(const std::string& filename, const PEID chunk, const PGeneratorConfig& config) { - std::vector index(config.streaming.num_chunks + 1); + std::vector index(config.external.num_chunks + 1); SInt num_edges = 0; @@ -110,7 +92,7 @@ SInt CountEdges(const std::string& filename, const PEID chunk, const PGeneratorC } void SwapinEdges(const std::string& filename, const PEID chunk, const PGeneratorConfig& config, Edgelist& append) { - std::vector index(config.streaming.num_chunks + 1); + std::vector index(config.external.num_chunks + 1); std::ifstream in(filename, std::ios::binary); in.read(reinterpret_cast(index.data()), sizeof(SInt) * index.size()); @@ -119,16 +101,6 @@ void SwapinEdges(const std::string& filename, const PEID chunk, const PGenerator const SInt first_invalid_edge = index[chunk + 1]; const SInt num_edges = first_invalid_edge - first_edge; - //if (chunk == 2) { - // std::cout << "Edges from chunk " << filename << " , begin=" << first_edge << ", end=" << first_invalid_edge - // << ":" << std::endl; - // std::cout << "READ Index from " << filename << ": ["; - // for (const SInt i: index) { - // std::cout << i << ", "; - // } - // std::cout << "\b\b]" << std::endl; - //} - if (num_edges > 0) { constexpr std::size_t edge_size = sizeof(typename Edgelist::value_type); @@ -137,24 +109,12 @@ void SwapinEdges(const std::string& filename, const PEID chunk, const PGenerator in.seekg(first_edge * edge_size, std::ios_base::cur); in.read(reinterpret_cast(append.data() + old_size), edge_size * num_edges); - - //if (chunk == 2) { - // std::cout << "Edges from chunk " << filename << ", begin=" << first_edge << ", end=" << first_invalid_edge - // << ":" << std::endl; - // // for (const auto& [u, v]: append) { - // // std::cout << u << " -> " << v << std::endl; - // // } - //} - } else { - //std::cout << "No edges from chunk " << filename << std::endl; } } Graph SwapinGraphChunk(const PEID chunk, const std::vector& distribution, const PGeneratorConfig& config) { - // @todo count edges, then allocate just one buffer, sort when writing - SInt num_edges = 0; - for (int cur = 0; cur < config.streaming.num_chunks; ++cur) { + for (int cur = 0; cur < config.external.num_chunks; ++cur) { const std::string filename = BufferFilename(cur, config); num_edges += CountEdges(filename, chunk, config); } @@ -162,7 +122,7 @@ Graph SwapinGraphChunk(const PEID chunk, const std::vector& distribution, Edgelist edges; edges.reserve(num_edges); - for (int cur = 0; cur < config.streaming.num_chunks; ++cur) { + for (int cur = 0; cur < config.external.num_chunks; ++cur) { const std::string filename = BufferFilename(cur, config); SwapinEdges(filename, chunk, config, edges); } @@ -195,7 +155,7 @@ std::vector CreateVertexDistribution(const SInt n, const PEID K) { } } // namespace -void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) { +void GenerateExternalMemoryToDisk(PGeneratorConfig config, MPI_Comm comm) { PEID rank, size; MPI_Comm_rank(comm, &rank); MPI_Comm_size(comm, &size); @@ -209,26 +169,20 @@ void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) { if (config.n == 0) { if (output_error) { - std::cerr << "Error: streaming mode requires the number of nodes to be given in advance\n"; + std::cerr << "Error: external mode requires the number of nodes to be given in advance\n"; } MPI_Abort(comm, 1); } GraphInfo local_info; - auto vertex_distribution = CreateVertexDistribution(config.n, config.streaming.num_chunks); + auto vertex_distribution = CreateVertexDistribution(config.n, config.external.num_chunks); auto generator_factory = CreateGeneratorFactory(config.generator); - //std::cout << "Vertex distribution: ["; - //for (const SInt i: vertex_distribution) { - // std::cout << i << ", "; - //} - //std::cout << "\b\b]\n" << std::flush; - - for (PEID chunk = rank; chunk < config.streaming.num_chunks; chunk += size) { + for (PEID chunk = rank; chunk < config.external.num_chunks; chunk += size) { try { - config = generator_factory->NormalizeParameters(config, chunk, config.streaming.num_chunks, output_info); - if (config.streaming.refuse_streaming_mode) { - throw ConfigurationError("generator is not available in streaming mode"); + config = generator_factory->NormalizeParameters(config, chunk, config.external.num_chunks, output_info); + if (config.external.refuse_external_mode) { + throw ConfigurationError("generator is not available in external mode"); } } catch (ConfigurationError& ex) { if (output_error) { @@ -237,34 +191,31 @@ void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) { MPI_Abort(comm, 1); } - auto generator = generator_factory->Create(config, chunk, config.streaming.num_chunks); + auto generator = generator_factory->Create(config, chunk, config.external.num_chunks); if (output_info) { - std::cout << "Generating edges (" << (chunk + 1) << "... / " << config.streaming.num_chunks << ") ... " + std::cout << "Generating edges (" << (chunk + 1) << "... / " << config.external.num_chunks << ") ... " << std::flush; } Graph graph = generator->Generate(GraphRepresentation::EDGE_LIST)->Take(); if (local_info.has_edge_weights) { if (output_error) { - std::cerr << "Error: edge weights are not supported in streaming mode\n"; + std::cerr << "Error: edge weights are not supported in external mode\n"; } MPI_Abort(comm, 1); } - //std::cout << "Vertex range: [" << graph.vertex_range.first << ", " << graph.vertex_range.second << ")" - // << std::endl; - Edgelist edges = std::move(graph.edges); - if (config.streaming.fix_reverse_edges) { + if (config.external.fix_reverse_edges) { if (output_info) { std::cout << "fixing reverse edges ... " << std::flush; } AddReverseEdges(edges); } - if (config.streaming.fix_nonlocal_reverse_edges) { + if (config.external.fix_nonlocal_reverse_edges) { if (output_info) { std::cout << "fixing nonlocal reverse edges ... " << std::flush; } @@ -300,12 +251,12 @@ void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) { std::cout << "OK" << std::endl; } - if (config.streaming.fix_reverse_edges || config.streaming.fix_nonlocal_reverse_edges) { + if (config.external.fix_reverse_edges || config.external.fix_nonlocal_reverse_edges) { local_info.global_m = 0; - for (int chunk = rank; chunk < config.streaming.num_chunks; chunk += size) { + for (int chunk = rank; chunk < config.external.num_chunks; chunk += size) { if (output_info) { - std::cout << "Counting edges (chunk " << chunk + 1 << "... / " << config.streaming.num_chunks + std::cout << "Counting edges (chunk " << chunk + 1 << "... / " << config.external.num_chunks << ") ... reading ... " << std::flush; } Graph graph = SwapinGraphChunk(chunk, vertex_distribution, config); @@ -357,10 +308,10 @@ void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) { SInt offset_n = 0; SInt offset_m = 0; - for (PEID chunk = rank; chunk < config.streaming.num_chunks; chunk += size) { + for (PEID chunk = rank; chunk < config.external.num_chunks; chunk += size) { if (output_info) { std::cout << "Writing " << out_config.filename << " (pass " << pass + 1 << ", chunk " << chunk + 1 - << "... of " << config.streaming.num_chunks << ") ... reading ... " << std::flush; + << "... of " << config.external.num_chunks << ") ... reading ... " << std::flush; } Graph graph = SwapinGraphChunk(chunk, vertex_distribution, config); @@ -384,7 +335,7 @@ void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) { } continue_with_next_pass = - factory->CreateWriter(out_config, graph, pass_info, chunk, config.streaming.num_chunks) + factory->CreateWriter(out_config, graph, pass_info, chunk, config.external.num_chunks) ->Write(pass, out_config.filename); } @@ -407,7 +358,7 @@ void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) { if (output_info) { std::cout << "Cleaning up ... " << std::flush; } - for (PEID chunk = 0; chunk < config.streaming.num_chunks; ++chunk) { + for (PEID chunk = 0; chunk < config.external.num_chunks; ++chunk) { const std::string filename = BufferFilename(chunk, config); std::remove(filename.c_str()); } diff --git a/kagen/streaming_facade.h b/kagen/external_memory_facade.h similarity index 58% rename from kagen/streaming_facade.h rename to kagen/external_memory_facade.h index b2c8263..69e4497 100644 --- a/kagen/streaming_facade.h +++ b/kagen/external_memory_facade.h @@ -6,5 +6,5 @@ #include namespace kagen { -void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm); +void GenerateExternalMemoryToDisk(PGeneratorConfig config, MPI_Comm comm); } diff --git a/kagen/generators/barabassi/barabassi.cpp b/kagen/generators/barabassi/barabassi.cpp index 1373340..2ae8283 100644 --- a/kagen/generators/barabassi/barabassi.cpp +++ b/kagen/generators/barabassi/barabassi.cpp @@ -35,7 +35,7 @@ BarabassiFactory::NormalizeParameters(PGeneratorConfig config, PEID, const PEID } // Streaming mode does not call Finalize() - config.streaming.fix_reverse_edges = !config.directed; + config.external.fix_reverse_edges = !config.directed; return config; } diff --git a/kagen/generators/hyperbolic/hyperbolic.cpp b/kagen/generators/hyperbolic/hyperbolic.cpp index 2fc57c3..5b62075 100644 --- a/kagen/generators/hyperbolic/hyperbolic.cpp +++ b/kagen/generators/hyperbolic/hyperbolic.cpp @@ -52,8 +52,8 @@ HyperbolicFactory::NormalizeParameters(PGeneratorConfig config, PEID, const PEID config.hp_floats = 1; } - // Streaming mode does not call Finalize() - config.streaming.fix_nonlocal_reverse_edges = true; + // External memory mode does not call Finalize() -- instead, we have to enable postprocessing via flags + config.external.fix_nonlocal_reverse_edges = true; return config; } diff --git a/kagen/generators/kronecker/kronecker.cpp b/kagen/generators/kronecker/kronecker.cpp index 0ed13e2..73b4be2 100644 --- a/kagen/generators/kronecker/kronecker.cpp +++ b/kagen/generators/kronecker/kronecker.cpp @@ -37,7 +37,7 @@ namespace kagen { PGeneratorConfig KroneckerFactory::NormalizeParameters(PGeneratorConfig config, PEID, PEID, bool) const { - config.streaming.refuse_streaming_mode = true; + config.external.refuse_external_mode = true; return config; } diff --git a/kagen/generators/rmat/rmat.cpp b/kagen/generators/rmat/rmat.cpp index 6c3bac7..5025000 100644 --- a/kagen/generators/rmat/rmat.cpp +++ b/kagen/generators/rmat/rmat.cpp @@ -31,7 +31,7 @@ RMATFactory::NormalizeParameters(PGeneratorConfig config, PEID, const PEID size, config.k = static_cast(size); } - config.streaming.refuse_streaming_mode = true; + config.external.refuse_external_mode = true; return config; }