Skip to content

Commit

Permalink
Rename edge partitioning functionality to batch partitioning.
Browse files Browse the repository at this point in the history
  • Loading branch information
c-bebop committed Oct 18, 2024
1 parent a99ba44 commit b13650c
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 77 deletions.
17 changes: 17 additions & 0 deletions include/gdsb/batcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,23 @@ template <class EdgeContainer> class Batcher
EIt m_end;
};


/// Number of elements that belong to partition `partition_id` when
/// `batch_count` elements are split into `partition_size` partitions.
///
/// Every partition receives `batch_count / partition_size` elements. The last
/// partition (`partition_id == partition_size - 1`) additionally receives the
/// remainder `batch_count % partition_size`, so that all elements are covered.
///
/// @param batch_count    Total number of elements to distribute.
/// @param partition_id   Zero-based index of the requested partition.
/// @param partition_size Total number of partitions.
/// @return Element count of the requested partition, or 0 if `partition_size`
///         is 0 (there is no partition to assign elements to).
inline uint64_t partition_batch_count(uint64_t const batch_count, uint32_t const partition_id, uint32_t const partition_size)
{
    // Dividing by zero partitions would be undefined behaviour.
    if (partition_size == 0)
    {
        return 0;
    }

    uint64_t const base_count = batch_count / partition_size;
    bool const is_last_partition = (partition_id == partition_size - 1);

    // Only the last partition picks up the elements left over by the integer division.
    return is_last_partition ? base_count + batch_count % partition_size : base_count;
}

/// Index of the first element that belongs to partition `partition_id` when
/// `batch_count` elements are split into `partition_size` partitions.
///
/// Each partition before the requested one holds `batch_count / partition_size`
/// elements, so the offset is that base size multiplied by `partition_id`.
///
/// @param batch_count    Total number of elements to distribute.
/// @param partition_id   Zero-based index of the requested partition.
/// @param partition_size Total number of partitions.
/// @return Offset (in elements) of the partition's first element, or 0 if
///         `partition_size` is 0.
inline uint64_t batch_offset(uint64_t const batch_count, uint32_t const partition_id, uint32_t const partition_size)
{
    // Dividing by zero partitions would be undefined behaviour.
    if (partition_size == 0)
    {
        return 0;
    }

    uint64_t const base_count = batch_count / partition_size;
    return base_count * partition_id;
}

template <typename EdgeIt, typename EdgeContainer, typename V>
Batch<EdgeContainer>
thread_batch(EdgeIt batch_begin, EdgeIt batch_end, unsigned int thread_count, unsigned int thread_id, V invalid_vertex)
Expand Down
13 changes: 5 additions & 8 deletions include/gdsb/graph_input.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <gdsb/batcher.h>
#include <gdsb/graph.h>
#include <gdsb/graph_io_parameters.h>

Expand Down Expand Up @@ -267,10 +268,6 @@ std::tuple<Vertex64, uint64_t> read_binary_graph(std::ifstream& input, Header co
return std::make_tuple(header.vertex_count, header.edge_count);
}

uint64_t partition_edge_count(uint64_t total_edge_count, uint32_t partition_id, uint32_t partition_size);

uint64_t edge_offset(uint64_t total_edge_count, uint32_t const partition_id, uint32_t const partition_size);

template <typename ReadF>
std::tuple<Vertex64, uint64_t> read_binary_graph_partition(std::ifstream& input,
BinaryGraphHeaderMetaDataV2 const& data,
Expand All @@ -281,10 +278,10 @@ std::tuple<Vertex64, uint64_t> read_binary_graph_partition(std::ifstream& input,
{
assert(partition_size > 0);

size_t const offset = edge_offset(data.edge_count, partition_id, partition_size);
size_t const offset = batch_offset(data.edge_count, partition_id, partition_size);
input.seekg(offset * edge_size_in_bytes, std::ios_base::cur);

uint64_t const edge_count = partition_edge_count(data.edge_count, partition_id, partition_size);
uint64_t const edge_count = partition_batch_count(data.edge_count, partition_id, partition_size);
bool continue_reading = true;
for (uint64_t e = 0; e < edge_count && input.is_open() && continue_reading; ++e)
{
Expand Down Expand Up @@ -393,9 +390,9 @@ template <typename CopyF, typename Edges> void insert_return_edges(CopyF&& copy_
// Each thread will copy a dedicated range of the read in edges.
#pragma omp parallel
{
size_t const edge_begin_offset = edge_offset(original_size, omp_get_thread_num(), omp_get_num_threads());
size_t const edge_begin_offset = batch_offset(original_size, omp_get_thread_num(), omp_get_num_threads());
size_t const edge_end_offset =
edge_begin_offset + partition_edge_count(original_size, omp_get_thread_num(), omp_get_num_threads());
edge_begin_offset + partition_batch_count(original_size, omp_get_thread_num(), omp_get_num_threads());

auto original_it = std::begin(edges);
std::advance(original_it, edge_begin_offset);
Expand Down
9 changes: 5 additions & 4 deletions include/gdsb/mpi_graph_io.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <gdsb/batcher.h>
#include <gdsb/graph_input.h>
#include <gdsb/mpi_error_handler.h>

Expand Down Expand Up @@ -82,10 +83,10 @@ std::tuple<Vertex64, uint64_t> read_binary_graph_partition(MPI_File const input,
uint32_t const partition_id,
uint32_t const partition_size)
{
uint64_t const edge_count = partition_edge_count(data.edge_count, partition_id, partition_size);
uint64_t const edge_count = partition_batch_count(data.edge_count, partition_id, partition_size);

// Header offset should be implicit since input is already read until begin of edges
size_t const offset = edge_offset(data.edge_count, partition_id, partition_size);
size_t const offset = batch_offset(data.edge_count, partition_id, partition_size);
size_t const offset_in_bytes = offset * edge_size_in_bytes;

int const error = MPI_File_seek(input, offset_in_bytes, MPI_SEEK_CUR);
Expand Down Expand Up @@ -115,15 +116,15 @@ std::tuple<Vertex64, uint64_t> all_read_binary_graph_partition(MPI_File const in
uint32_t const partition_size)
{
// Header offset should be implicit since input is already read until begin of edges
size_t const offset = edge_offset(data.edge_count, partition_id, partition_size);
size_t const offset = batch_offset(data.edge_count, partition_id, partition_size);
size_t const offset_in_bytes = offset * edge_size_in_bytes;
int const seek_error = MPI_File_seek(input, offset_in_bytes, MPI_SEEK_CUR);
if (seek_error != MPI_SUCCESS)
{
throw std::runtime_error("Could not seek to specified offset [" + std::to_string(offset) + "] within MPI file.");
}

uint64_t const edge_count = partition_edge_count(data.edge_count, partition_id, partition_size);
uint64_t const edge_count = partition_batch_count(data.edge_count, partition_id, partition_size);
MPI_Status status;
int const read_all_error = MPI_File_read_all(input, edges, edge_count, mpi_datatype, &status);
if (read_all_error != MPI_SUCCESS)
Expand Down
16 changes: 0 additions & 16 deletions src/graph_input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,4 @@ float read_float(char const* source, char** end)
return value;
}

// Number of edges that belong to partition `partition_id` when
// `total_edge_count` edges are split into `partition_size` partitions.
//
// Every partition receives `total_edge_count / partition_size` edges; the last
// partition (`partition_id == partition_size - 1`) additionally receives the
// remainder `total_edge_count % partition_size`.
//
// Returns 0 if `partition_size` is 0, since there is no partition to assign
// edges to.
uint64_t partition_edge_count(uint64_t const total_edge_count, uint32_t const partition_id, uint32_t const partition_size)
{
    // Dividing by zero partitions would be undefined behaviour.
    if (partition_size == 0)
    {
        return 0;
    }

    uint64_t const base_count = total_edge_count / partition_size;
    bool const is_last_partition = (partition_id == partition_size - 1);

    // Only the last partition picks up the edges left over by the integer division.
    return is_last_partition ? base_count + total_edge_count % partition_size : base_count;
}

// Index of the first edge that belongs to partition `partition_id` when
// `total_edge_count` edges are split into `partition_size` partitions.
//
// Each partition before the requested one holds
// `total_edge_count / partition_size` edges, so the offset is that base size
// multiplied by `partition_id`. Returns 0 if `partition_size` is 0.
uint64_t edge_offset(uint64_t total_edge_count, uint32_t const partition_id, uint32_t const partition_size)
{
    // Dividing by zero partitions would be undefined behaviour.
    if (partition_size == 0)
    {
        return 0;
    }

    uint64_t const base_count = total_edge_count / partition_size;
    return base_count * partition_id;
}

} // namespace gdsb
47 changes: 47 additions & 0 deletions test/batcher.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#include <catch2/catch_test_macros.hpp>

#include "test_graph.h"

#include <gdsb/batcher.h>
#include <gdsb/graph.h>
#include <gdsb/graph_input.h>

using namespace gdsb;

Expand All @@ -21,3 +24,47 @@ TEST_CASE("Batcher")
CHECK(batch.begin->target.vertex == edges[0].target.vertex);
}
}

// Checks partition_batch_count() against the edge count stored in the header of
// the enzymes binary graph: with 2 partitions both get 84 edges; with 5
// partitions the first four get 33 edges and the last one gets 36.
TEST_CASE("partition_batch_count, on enzymes graph")
{
    std::ifstream binary_graph(graph_path + unweighted_directed_graph_enzymes_bin);
    BinaryGraphHeaderMetaDataV2 header = read_binary_graph_header(binary_graph);

    SECTION("partition size 2")
    {
        uint32_t const partition_size = 2;

        // Both partitions are expected to hold the same number of edges.
        for (uint32_t partition_id = 0; partition_id < partition_size; ++partition_id)
        {
            uint64_t const edge_count = partition_batch_count(header.edge_count, partition_id, partition_size);
            CHECK(edge_count == 84);
        }
    }

    SECTION("partition size 5")
    {
        uint32_t const partition_size = 5;
        uint32_t const last_partition_id = partition_size - 1;
        uint64_t edge_count = 0;

        // All partitions but the last one hold the base share.
        for (uint32_t partition_id = 0; partition_id < last_partition_id; ++partition_id)
        {
            edge_count = partition_batch_count(header.edge_count, partition_id, partition_size);
            CHECK(edge_count == 33);
        }

        // The last partition additionally holds the remainder.
        edge_count = partition_batch_count(header.edge_count, last_partition_id, partition_size);
        CHECK(edge_count == 36);
    }
}
44 changes: 0 additions & 44 deletions test/graph_input_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -595,50 +595,6 @@ TEST_CASE("read_binary_graph, undirected, unweighted, static")
CHECK(edge_source_0_does_not_exist);
}

// Checks partition_edge_count() against the edge count stored in the header of
// the enzymes binary graph: with 2 partitions both get 84 edges; with 5
// partitions the first four get 33 edges and the last one gets 36.
TEST_CASE("partition_edge_count, on enzymes graph")
{
    std::ifstream binary_graph(graph_path + unweighted_directed_graph_enzymes_bin);
    BinaryGraphHeaderMetaDataV2 header = read_binary_graph_header(binary_graph);

    SECTION("partition size 2")
    {
        uint32_t const partition_size = 2;

        // Both partitions are expected to hold the same number of edges.
        for (uint32_t partition_id = 0; partition_id < partition_size; ++partition_id)
        {
            uint64_t const edge_count = partition_edge_count(header.edge_count, partition_id, partition_size);
            CHECK(edge_count == 84);
        }
    }

    SECTION("partition size 5")
    {
        uint32_t const partition_size = 5;
        uint32_t const last_partition_id = partition_size - 1;
        uint64_t edge_count = 0;

        // All partitions but the last one hold the base share.
        for (uint32_t partition_id = 0; partition_id < last_partition_id; ++partition_id)
        {
            edge_count = partition_edge_count(header.edge_count, partition_id, partition_size);
            CHECK(edge_count == 33);
        }

        // The last partition additionally holds the remainder.
        edge_count = partition_edge_count(header.edge_count, last_partition_id, partition_size);
        CHECK(edge_count == 36);
    }
}

TEST_CASE("read_binary_graph_partition, small weighted temporal, partition id 0, partition size 2")
{
WeightedTimestampedEdges32 timestamped_edges;
Expand Down
11 changes: 6 additions & 5 deletions test/mpi_graph_io_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "test_graph.h"

#include <gdsb/batcher.h>
#include <gdsb/mpi_error_handler.h>
#include <gdsb/mpi_graph_io.h>

Expand Down Expand Up @@ -335,7 +336,7 @@ TEST_CASE("MPI, all_read_binary_graph_partition, throws exception if file seek n

uint32_t partition_id = 0;
uint32_t partition_size = 2;
WeightedTimestampedEdges32 timestamped_edges(partition_edge_count(header.edge_count, partition_id, partition_size));
WeightedTimestampedEdges32 timestamped_edges(partition_batch_count(header.edge_count, partition_id, partition_size));
CHECK_THROWS_AS(mpi::all_read_binary_graph_partition(binary_graph.get(), header, &(timestamped_edges[0]),
sizeof(WeightedTimestampedEdge32),
mpi_timestamped_edge_t.get(), partition_id, partition_size),
Expand All @@ -355,7 +356,7 @@ TEST_CASE("MPI, all_read_binary_graph_partition, small weighted temporal, partit

uint32_t partition_id = 0;
uint32_t partition_size = 2;
WeightedTimestampedEdges32 timestamped_edges(partition_edge_count(header.edge_count, partition_id, partition_size));
WeightedTimestampedEdges32 timestamped_edges(partition_batch_count(header.edge_count, partition_id, partition_size));
auto const [vertex_count, edge_count] =
mpi::all_read_binary_graph_partition(binary_graph.get(), header, &(timestamped_edges[0]), sizeof(WeightedTimestampedEdge32),
mpi_timestamped_edge_t.get(), partition_id, partition_size);
Expand Down Expand Up @@ -409,7 +410,7 @@ TEST_CASE("MPI, all_read_binary_graph_partition, small weighted temporal, partit

uint32_t partition_id = 1;
uint32_t partition_size = 2;
WeightedTimestampedEdges32 timestamped_edges(partition_edge_count(header.edge_count, partition_id, partition_size));
WeightedTimestampedEdges32 timestamped_edges(partition_batch_count(header.edge_count, partition_id, partition_size));
auto const [vertex_count, edge_count] =
mpi::all_read_binary_graph_partition(binary_graph.get(), header, &(timestamped_edges[0]), sizeof(WeightedTimestampedEdge32),
mpi_timestamped_edge_t.get(), partition_id, partition_size);
Expand Down Expand Up @@ -471,7 +472,7 @@ TEST_CASE("MPI, all_read_binary_graph_partition, undirected, unweighted, static,

uint32_t partition_id = 0;
uint32_t partition_size = 4;
Edges32 edges(partition_edge_count(header.edge_count, partition_id, partition_size));
Edges32 edges(partition_batch_count(header.edge_count, partition_id, partition_size));
auto const [vertex_count, edge_count] =
mpi::all_read_binary_graph_partition(binary_graph.get(), header, &(edges[0]), sizeof(Edge32), mpi_edge_t.get(),
partition_id, partition_size);
Expand Down Expand Up @@ -542,7 +543,7 @@ TEST_CASE("MPI, all_read_binary_graph_partition, undirected, unweighted, static,

uint32_t partition_id = 0;
uint32_t partition_size = 1;
Edges32 edges(partition_edge_count(header.edge_count, partition_id, partition_size));
Edges32 edges(partition_batch_count(header.edge_count, partition_id, partition_size));
auto const [vertex_count, edge_count] =
mpi::all_read_binary_graph_partition(binary_graph.get(), header, &(edges[0]), sizeof(Edge32), mpi_edge_t.get(),
partition_id, partition_size);
Expand Down

0 comments on commit b13650c

Please sign in to comment.