Skip to content

Commit

Permalink
refactor: remove debug code, rebrand streaming mode as external memor…
Browse files Browse the repository at this point in the history
…y mode
  • Loading branch information
DanielSeemaier committed May 24, 2024
1 parent a60abca commit 66087b2
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 105 deletions.
12 changes: 6 additions & 6 deletions app/KaGen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
******************************************************************************/
#include "kagen/context.h"
#include "kagen/definitions.h"
#include "kagen/external_memory_facade.h"
#include "kagen/in_memory_facade.h"
#include "kagen/streaming_facade.h"

#include <mpi.h>

Expand Down Expand Up @@ -118,9 +118,9 @@ void SetupCommandLineArguments(CLI::App& app, PGeneratorConfig& config) {

// General parameters
app.add_option(
"--experimental-K", config.streaming.num_chunks,
"Number of chunks for generating the graph in a buffered streaming setting.");
app.add_option("--experimental-T", config.streaming.tmp_directory, "Directory for temporary buffer files.");
"--experimental-K", config.external.num_chunks,
"Number of chunks for generating the graph in the buffered external memory mode.");
app.add_option("--experimental-T", config.external.tmp_directory, "Directory for temporary buffer files.");

app.add_flag("-q,--quiet", config.quiet, "Quiet mode");
app.add_flag("-v,--version", [&](auto) { PrintVersion(); }, "Print KaGen version")->trigger_on_parse();
Expand Down Expand Up @@ -489,8 +489,8 @@ int main(int argc, char* argv[]) {
config.output_graph.extension = true;
}

if (config.streaming.num_chunks > 1) {
GenerateStreamedToDisk(config, MPI_COMM_WORLD);
if (config.external.num_chunks > 1) {
GenerateExternalMemoryToDisk(config, MPI_COMM_WORLD);
} else {
GenerateInMemoryToDisk(config, MPI_COMM_WORLD);
}
Expand Down
8 changes: 4 additions & 4 deletions kagen/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ std::ostream& operator<<(std::ostream& out, const PGeneratorConfig& config) {
out << "General Parameters:\n";
out << " Seed: " << config.seed << "\n";
out << " Generate coordinates: " << (config.coordinates ? "yes" : "no") << "\n";
out << " Execution mode: " << (config.streaming.num_chunks > 1 ? "streamed" : "in-memory")
out << " Execution mode: " << (config.external.num_chunks > 1 ? "streamed" : "in-memory")
<< "\n";
if (config.streaming.num_chunks > 1) {
out << " Temporary directory: " << config.streaming.tmp_directory << "\n";
out << " Sort edges: " << (config.streaming.sort_edges ? "yes" : "no") << "\n";
if (config.external.num_chunks > 1) {
out << " Temporary directory: " << config.external.tmp_directory << "\n";
out << " Sort edges: " << (config.external.sort_edges ? "yes" : "no") << "\n";
} else {
out << " Validate generated graph: " << (config.validate_simple_graph ? "yes" : "no") << "\n";
out << " Statistics level: " << config.statistics_level << "\n";
Expand Down
22 changes: 11 additions & 11 deletions kagen/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ struct OutputGraphConfig {
int adjwgt_width = 64;
};

struct StreamingConfig {
int num_chunks = 1;
std::string tmp_directory = "/tmp";
//bool remove_self_loops = false;
bool fix_reverse_edges = false;
bool fix_nonlocal_reverse_edges = false;
bool sort_edges = false;

bool refuse_streaming_mode = false;
struct ExternalMemoryConfig {
int num_chunks = 1;
std::string tmp_directory = "/tmp";
// bool remove_self_loops = false;
bool fix_reverse_edges = false;
bool fix_nonlocal_reverse_edges = false;
bool sort_edges = false;

bool refuse_external_mode = false;
};

// Configuration for the generator.
Expand All @@ -88,8 +88,8 @@ struct PGeneratorConfig {
bool skip_postprocessing = false;
bool print_header = true;

// Streaming settings
StreamingConfig streaming{};
// External memory mode settings
ExternalMemoryConfig external{};

// Generator settings
GeneratorType generator; // Generator type
Expand Down
107 changes: 29 additions & 78 deletions kagen/streaming_facade.cpp → kagen/external_memory_facade.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "kagen/streaming_facade.h"
#include "kagen/external_memory_facade.h"

#include "kagen/definitions.h"
#include "kagen/factories.h"
Expand All @@ -22,7 +22,7 @@ void AddReverseEdges(Edgelist& edges) {
std::swap(from, to);
}
}
//
//
std::sort(edges.begin(), edges.end());
auto it = std::unique(edges.begin(), edges.end());
edges.erase(it, edges.end());
Expand Down Expand Up @@ -51,16 +51,16 @@ void AddNonlocalReverseEdges(Edgelist& edges, const std::pair<SInt, SInt>& local
}

std::string BufferFilename(const PEID chunk, const PGeneratorConfig& config) {
return config.streaming.tmp_directory + "/sKaGen_" + std::to_string(chunk) + ".buf";
return config.external.tmp_directory + "/sKaGen_" + std::to_string(chunk) + ".buf";
}

void SwapoutGraphChunk(
const Edgelist& edges, const PEID chunk, const std::vector<SInt>& distribution, const PGeneratorConfig& config) {
std::vector<SInt> index(config.streaming.num_chunks + 1);
std::vector<SInt> index(config.external.num_chunks + 1);

PEID cur = 0;
for (const auto& [from, to]: edges) {
while (cur + 1 < config.streaming.num_chunks && from >= distribution[cur + 1]) {
while (cur + 1 < config.external.num_chunks && from >= distribution[cur + 1]) {
++cur;
}
++index[cur + 1];
Expand All @@ -69,33 +69,15 @@ void SwapoutGraphChunk(
std::partial_sum(index.begin(), index.end(), index.begin());

constexpr std::size_t edge_size = sizeof(typename Edgelist::value_type);

//std::cout << "Swapout edges: #edges=" << edges.size() << ", index=[";
//for (const SInt i: index) {
// std::cout << i << ", ";
//}
//std::cout << "\b\b]\n" << std::flush;

const std::string filename = BufferFilename(chunk, config);

//std::cout << "Edges for buffer " << filename << ":" << std::endl;
// for (const auto& [u, v]: edges) {
// std::cout << u << " -> " << v << std::endl;
// }

//std::cout << "WRITE INDEX TO " << filename << ": [";
//for (const SInt i: index) {
// std::cout << i << ", ";
//}
//std::cout << "\b\b]\n" << std::flush;
const std::string filename = BufferFilename(chunk, config);

std::ofstream out(filename, std::ios::binary | std::ios::trunc);
out.write(reinterpret_cast<const char*>(index.data()), sizeof(SInt) * index.size());
out.write(reinterpret_cast<const char*>(edges.data()), edge_size * edges.size());
}

SInt CountEdges(const std::string& filename, const PEID chunk, const PGeneratorConfig& config) {
std::vector<SInt> index(config.streaming.num_chunks + 1);
std::vector<SInt> index(config.external.num_chunks + 1);

SInt num_edges = 0;

Expand All @@ -110,7 +92,7 @@ SInt CountEdges(const std::string& filename, const PEID chunk, const PGeneratorC
}

void SwapinEdges(const std::string& filename, const PEID chunk, const PGeneratorConfig& config, Edgelist& append) {
std::vector<SInt> index(config.streaming.num_chunks + 1);
std::vector<SInt> index(config.external.num_chunks + 1);

std::ifstream in(filename, std::ios::binary);
in.read(reinterpret_cast<char*>(index.data()), sizeof(SInt) * index.size());
Expand All @@ -119,16 +101,6 @@ void SwapinEdges(const std::string& filename, const PEID chunk, const PGenerator
const SInt first_invalid_edge = index[chunk + 1];
const SInt num_edges = first_invalid_edge - first_edge;

//if (chunk == 2) {
// std::cout << "Edges from chunk " << filename << " , begin=" << first_edge << ", end=" << first_invalid_edge
// << ":" << std::endl;
// std::cout << "READ Index from " << filename << ": [";
// for (const SInt i: index) {
// std::cout << i << ", ";
// }
// std::cout << "\b\b]" << std::endl;
//}

if (num_edges > 0) {
constexpr std::size_t edge_size = sizeof(typename Edgelist::value_type);

Expand All @@ -137,32 +109,20 @@ void SwapinEdges(const std::string& filename, const PEID chunk, const PGenerator

in.seekg(first_edge * edge_size, std::ios_base::cur);
in.read(reinterpret_cast<char*>(append.data() + old_size), edge_size * num_edges);

//if (chunk == 2) {
// std::cout << "Edges from chunk " << filename << ", begin=" << first_edge << ", end=" << first_invalid_edge
// << ":" << std::endl;
// // for (const auto& [u, v]: append) {
// // std::cout << u << " -> " << v << std::endl;
// // }
//}
} else {
//std::cout << "No edges from chunk " << filename << std::endl;
}
}

Graph SwapinGraphChunk(const PEID chunk, const std::vector<SInt>& distribution, const PGeneratorConfig& config) {
// @todo count edges, then allocate just one buffer, sort when writing

SInt num_edges = 0;
for (int cur = 0; cur < config.streaming.num_chunks; ++cur) {
for (int cur = 0; cur < config.external.num_chunks; ++cur) {
const std::string filename = BufferFilename(cur, config);
num_edges += CountEdges(filename, chunk, config);
}

Edgelist edges;
edges.reserve(num_edges);

for (int cur = 0; cur < config.streaming.num_chunks; ++cur) {
for (int cur = 0; cur < config.external.num_chunks; ++cur) {
const std::string filename = BufferFilename(cur, config);
SwapinEdges(filename, chunk, config, edges);
}
Expand Down Expand Up @@ -195,7 +155,7 @@ std::vector<SInt> CreateVertexDistribution(const SInt n, const PEID K) {
}
} // namespace

void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) {
void GenerateExternalMemoryToDisk(PGeneratorConfig config, MPI_Comm comm) {
PEID rank, size;
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &size);
Expand All @@ -209,26 +169,20 @@ void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) {

if (config.n == 0) {
if (output_error) {
std::cerr << "Error: streaming mode requires the number of nodes to be given in advance\n";
std::cerr << "Error: external mode requires the number of nodes to be given in advance\n";
}
MPI_Abort(comm, 1);
}

GraphInfo local_info;
auto vertex_distribution = CreateVertexDistribution(config.n, config.streaming.num_chunks);
auto vertex_distribution = CreateVertexDistribution(config.n, config.external.num_chunks);
auto generator_factory = CreateGeneratorFactory(config.generator);

//std::cout << "Vertex distribution: [";
//for (const SInt i: vertex_distribution) {
// std::cout << i << ", ";
//}
//std::cout << "\b\b]\n" << std::flush;

for (PEID chunk = rank; chunk < config.streaming.num_chunks; chunk += size) {
for (PEID chunk = rank; chunk < config.external.num_chunks; chunk += size) {
try {
config = generator_factory->NormalizeParameters(config, chunk, config.streaming.num_chunks, output_info);
if (config.streaming.refuse_streaming_mode) {
throw ConfigurationError("generator is not available in streaming mode");
config = generator_factory->NormalizeParameters(config, chunk, config.external.num_chunks, output_info);
if (config.external.refuse_external_mode) {
throw ConfigurationError("generator is not available in external mode");
}
} catch (ConfigurationError& ex) {
if (output_error) {
Expand All @@ -237,34 +191,31 @@ void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) {
MPI_Abort(comm, 1);
}

auto generator = generator_factory->Create(config, chunk, config.streaming.num_chunks);
auto generator = generator_factory->Create(config, chunk, config.external.num_chunks);

if (output_info) {
std::cout << "Generating edges (" << (chunk + 1) << "... / " << config.streaming.num_chunks << ") ... "
std::cout << "Generating edges (" << (chunk + 1) << "... / " << config.external.num_chunks << ") ... "
<< std::flush;
}

Graph graph = generator->Generate(GraphRepresentation::EDGE_LIST)->Take();
if (local_info.has_edge_weights) {
if (output_error) {
std::cerr << "Error: edge weights are not supported in streaming mode\n";
std::cerr << "Error: edge weights are not supported in external mode\n";
}
MPI_Abort(comm, 1);
}

//std::cout << "Vertex range: [" << graph.vertex_range.first << ", " << graph.vertex_range.second << ")"
// << std::endl;

Edgelist edges = std::move(graph.edges);

if (config.streaming.fix_reverse_edges) {
if (config.external.fix_reverse_edges) {
if (output_info) {
std::cout << "fixing reverse edges ... " << std::flush;
}
AddReverseEdges(edges);
}

if (config.streaming.fix_nonlocal_reverse_edges) {
if (config.external.fix_nonlocal_reverse_edges) {
if (output_info) {
std::cout << "fixing nonlocal reverse edges ... " << std::flush;
}
Expand Down Expand Up @@ -300,12 +251,12 @@ void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) {
std::cout << "OK" << std::endl;
}

if (config.streaming.fix_reverse_edges || config.streaming.fix_nonlocal_reverse_edges) {
if (config.external.fix_reverse_edges || config.external.fix_nonlocal_reverse_edges) {
local_info.global_m = 0;

for (int chunk = rank; chunk < config.streaming.num_chunks; chunk += size) {
for (int chunk = rank; chunk < config.external.num_chunks; chunk += size) {
if (output_info) {
std::cout << "Counting edges (chunk " << chunk + 1 << "... / " << config.streaming.num_chunks
std::cout << "Counting edges (chunk " << chunk + 1 << "... / " << config.external.num_chunks
<< ") ... reading ... " << std::flush;
}
Graph graph = SwapinGraphChunk(chunk, vertex_distribution, config);
Expand Down Expand Up @@ -357,10 +308,10 @@ void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) {
SInt offset_n = 0;
SInt offset_m = 0;

for (PEID chunk = rank; chunk < config.streaming.num_chunks; chunk += size) {
for (PEID chunk = rank; chunk < config.external.num_chunks; chunk += size) {
if (output_info) {
std::cout << "Writing " << out_config.filename << " (pass " << pass + 1 << ", chunk " << chunk + 1
<< "... of " << config.streaming.num_chunks << ") ... reading ... " << std::flush;
<< "... of " << config.external.num_chunks << ") ... reading ... " << std::flush;
}

Graph graph = SwapinGraphChunk(chunk, vertex_distribution, config);
Expand All @@ -384,7 +335,7 @@ void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) {
}

continue_with_next_pass =
factory->CreateWriter(out_config, graph, pass_info, chunk, config.streaming.num_chunks)
factory->CreateWriter(out_config, graph, pass_info, chunk, config.external.num_chunks)
->Write(pass, out_config.filename);
}

Expand All @@ -407,7 +358,7 @@ void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm) {
if (output_info) {
std::cout << "Cleaning up ... " << std::flush;
}
for (PEID chunk = 0; chunk < config.streaming.num_chunks; ++chunk) {
for (PEID chunk = 0; chunk < config.external.num_chunks; ++chunk) {
const std::string filename = BufferFilename(chunk, config);
std::remove(filename.c_str());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
#include <mpi.h>

namespace kagen {
void GenerateStreamedToDisk(PGeneratorConfig config, MPI_Comm comm);
void GenerateExternalMemoryToDisk(PGeneratorConfig config, MPI_Comm comm);
}
2 changes: 1 addition & 1 deletion kagen/generators/barabassi/barabassi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ BarabassiFactory::NormalizeParameters(PGeneratorConfig config, PEID, const PEID
}

// Streaming mode does not call Finalize()
config.streaming.fix_reverse_edges = !config.directed;
config.external.fix_reverse_edges = !config.directed;

return config;
}
Expand Down
4 changes: 2 additions & 2 deletions kagen/generators/hyperbolic/hyperbolic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ HyperbolicFactory::NormalizeParameters(PGeneratorConfig config, PEID, const PEID
config.hp_floats = 1;
}

// Streaming mode does not call Finalize()
config.streaming.fix_nonlocal_reverse_edges = true;
// External memory mode does not call Finalize() -- instead, we have to enable postprocessing via flags
config.external.fix_nonlocal_reverse_edges = true;

return config;
}
Expand Down
2 changes: 1 addition & 1 deletion kagen/generators/kronecker/kronecker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

namespace kagen {
PGeneratorConfig KroneckerFactory::NormalizeParameters(PGeneratorConfig config, PEID, PEID, bool) const {
config.streaming.refuse_streaming_mode = true;
config.external.refuse_external_mode = true;
return config;
}

Expand Down
2 changes: 1 addition & 1 deletion kagen/generators/rmat/rmat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ RMATFactory::NormalizeParameters(PGeneratorConfig config, PEID, const PEID size,
config.k = static_cast<SInt>(size);
}

config.streaming.refuse_streaming_mode = true;
config.external.refuse_external_mode = true;

return config;
}
Expand Down

0 comments on commit 66087b2

Please sign in to comment.