Skip to content

Commit

Permalink
fix: reduce IO accesses by keeping relevant parts of the chunk index …
Browse files Browse the repository at this point in the history
…in memory
  • Loading branch information
DanielSeemaier committed May 27, 2024
1 parent de32b03 commit 4a53f66
Showing 1 changed file with 39 additions and 18 deletions.
57 changes: 39 additions & 18 deletions kagen/external_memory_facade.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,39 +79,50 @@ void SwapoutGraphChunk(
out.write(reinterpret_cast<const char*>(edges.data()), edge_size * edges.size());
}

SInt CountEdges(const std::string& filename, const PEID chunk, const PGeneratorConfig& config) {
void ReadIndex(const SInt chunk, const PGeneratorConfig& config, std::vector<SInt>& index) {
const std::string filename = BufferFilename(chunk, config);

std::ifstream in(filename, std::ios::binary);
if (!in) {
throw std::ios_base::failure("cannot read from " + filename);
}

std::vector<SInt> index(config.external.num_chunks + 1);
in.read(reinterpret_cast<char*>(index.data()), sizeof(SInt) * index.size());

return index[chunk + 1] - index[chunk];
}

void SwapinEdges(const std::string& filename, const PEID chunk, const PGeneratorConfig& config, Edgelist& append) {
std::vector<SInt> index(config.external.num_chunks + 1);
void SwapinEdges(
const std::string& filename, const PEID chunk, const PGeneratorConfig& config, Edgelist& append,
const std::pair<SInt, SInt>& range = {1, 0}) {
auto [first_edge, first_invalid_edge] = range;

std::ifstream in(filename, std::ios::binary);
if (!in) {
throw std::ios_base::failure("cannot read from " + filename);
}
// Only read the index structure if we do not already know the edge range
if (first_invalid_edge < first_edge) {
std::vector<SInt> index(config.external.num_chunks + 1);

in.read(reinterpret_cast<char*>(index.data()), sizeof(SInt) * index.size());
std::ifstream in(filename, std::ios::binary);
if (!in) {
throw std::ios_base::failure("cannot read from " + filename);
}

const SInt first_edge = index[chunk];
const SInt first_invalid_edge = index[chunk + 1];
const SInt num_edges = first_invalid_edge - first_edge;
in.read(reinterpret_cast<char*>(index.data()), sizeof(SInt) * index.size());

first_edge = index[chunk];
first_invalid_edge = index[chunk + 1];
}

const SInt num_edges = first_invalid_edge - first_edge;

if (num_edges > 0) {
constexpr std::size_t edge_size = sizeof(typename Edgelist::value_type);

const std::size_t old_size = append.size();
append.resize(old_size + num_edges);

in.seekg(first_edge * edge_size, std::ios_base::cur);
const SInt pos_after_index = (config.external.num_chunks + 1) * sizeof(SInt);
const SInt pos_after_prev_edges = first_edge * edge_size + pos_after_index;

std::ifstream in(filename, std::ios::binary);
in.seekg(pos_after_prev_edges);
in.read(reinterpret_cast<char*>(append.data() + old_size), edge_size * num_edges);
}
}
Expand Down Expand Up @@ -154,10 +165,20 @@ Graph SwapinGraphChunk(
std::cout << "counting unfiltered edges ... " << std::flush;
}

// @todo reading the index of every file can become a bottleneck if file access time is high
// We should keep the index in memory
std::vector<std::pair<SInt, SInt>> edge_ranges;
edge_ranges.reserve(config.external.num_chunks);

SInt num_edges = 0;

std::vector<SInt> index(config.external.num_chunks + 1);

for (int cur = 0; cur < config.external.num_chunks; ++cur) {
const std::string filename = BufferFilename(cur, config);
num_edges += CountEdges(filename, chunk, config);
ReadIndex(cur, config, index);

edge_ranges.emplace_back(index[chunk], index[chunk + 1]);
num_edges += index[chunk + 1] - index[chunk];
}

if (output_info) {
Expand All @@ -172,7 +193,7 @@ Graph SwapinGraphChunk(

for (int cur = 0; cur < config.external.num_chunks; ++cur) {
const std::string filename = BufferFilename(cur, config);
SwapinEdges(filename, chunk, config, edges);
SwapinEdges(filename, chunk, config, edges, edge_ranges[cur]);
}

if (output_info) {
Expand Down

0 comments on commit 4a53f66

Please sign in to comment.