Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory pool add #15

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ third-party

# Build
/*build*

EthanLavi marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ target_include_directories(rome PUBLIC
$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
target_compile_definitions(rome PUBLIC ROME_LOG_LEVEL=${LOG_LEVEL})
# We are defining an external fmt package, we don't need spdlog's internal one
target_compile_definitions(rome PUBLIC SPDLOG_FMT_EXTERNAL=ON)
target_link_libraries(rome PUBLIC rome::protos rdma::ibverbs rdma::cm fmt::fmt std::coroutines)
target_link_libraries(rome PUBLIC absl::status absl::statusor absl::synchronization)

Expand Down
6 changes: 6 additions & 0 deletions DevDockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM ubuntu:22.04
RUN apt-get update
RUN apt-get install libprotobuf-dev protobuf-compiler -y
RUN apt-get install cmake -y
RUN apt-get install clang-15 libabsl-dev librdmacm-dev libibverbs-dev libgtest-dev libbenchmark-dev libfmt-dev libspdlog-dev libgmock-dev -y
RUN apt-get install libc6-dev-i386 -y
29 changes: 19 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ librome uses:

For Ubuntu 22.04 the following packages can be installed through apt:

* libabsl-dev
* librdmacm-dev
* libibverbs-dev
* libgtest-dev
* libbenchmark-dev
* libfmt-dev
* libspdlog-dev
* protobuf-compiler
* libabsl-dev
* librdmacm-dev
* libibverbs-dev
* libgtest-dev
* libbenchmark-dev
* libfmt-dev
* libspdlog-dev
* protobuf-compiler
* libgmock-dev

`cicd/install_dependencies_ubuntu.sh` is a script for installing these on Ubuntu 22.04.
Expand All @@ -111,6 +111,17 @@ Make sure to clear your build directory if recompiling with a different compiler

`make install` will install librome in your default installation location or the directory passed through defining `CMAKE_INSTALL_PREFIX`.

## Docker

Create and run a docker container to emulate the build enviornment with the minimum dependencies installed

```{bash}
docker build --tag sss-dev --file DevDockerfile .
docker run --privileged --rm -v {MOUNT_DIR}:/home --name sss -it sss-dev
```

You can then develop from that container using the Dev Container extension so you can take full advantage of syntax highlighting and be able to build locally.

# Old Setup instructions (unsure if this still works)
The Dockerfile contains all the dependencies required by this project and handles automatically setting up the correct development environment.
There are enough comments in the Dockerfile itself to understand what is going on, but at a high level its main purpose is to install the tooling necessary to build the project.
Expand Down Expand Up @@ -150,5 +161,3 @@ One peculiarity for UTM's `davfs` setup is that it requires a username and passw
When prompted, just hit enter.
To avoid the prompt altogether, you can update `/etc/davfs2/secrets` to include a line for the hosted files.
In my configuration, I simply put the following line: `http://localhost:9843 user passwd`.

## Docker
10 changes: 5 additions & 5 deletions gladiators/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
add_executable(coroutines coroutines/main.cc)
target_link_libraries(coroutines PRIVATE rome::rome)
add_executable(coroutines_out coroutines/main.cc)
target_link_libraries(coroutines_out PRIVATE rome::rome)

add_executable(hello_world hello_world/main.cc)
target_link_libraries(hello_world PRIVATE rome::rome)
target_link_libraries(hello_world PRIVATE absl::flags absl::flags_parse)
add_executable(hello_world_out hello_world/main.cc)
target_link_libraries(hello_world_out PRIVATE rome::rome)
target_link_libraries(hello_world_out PRIVATE absl::flags absl::flags_parse)
61 changes: 61 additions & 0 deletions include/rome/rdma/connection_manager/connection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#pragma once

#include <rdma/rdma_cma.h>

#include <cstdint>
#include <memory>

#include "rome/rdma/channel/rdma_accessor.h"
#include "rome/rdma/channel/rdma_channel.h"
#include "rome/rdma/channel/twosided_messenger.h"

namespace rome::rdma {

// Contains the necessary information for communicating between nodes. This
// class wraps a unique pointer to the `rdma_cm_id` that holds the QP used for
// communication, along with the `RdmaChannel` that represents the memory used
// for 2-sided message-passing.
template <typename Channel = RdmaChannel<EmptyRdmaMessenger, EmptyRdmaAccessor>>
class Connection {
public:
typedef Channel channel_type;

Connection()
: terminated_(false),
src_id_(std::numeric_limits<uint32_t>::max()),
dst_id_(std::numeric_limits<uint32_t>::max()),
channel_(nullptr) {}
Connection(uint32_t src_id, uint32_t dst_id,
std::unique_ptr<channel_type> channel)
: terminated_(false),
src_id_(src_id),
dst_id_(dst_id),
channel_(std::move(channel)) {}

Connection(const Connection&) = delete;
Connection(Connection&& c)
: terminated_(c.terminated_),
src_id_(c.src_id_),
dst_id_(c.dst_id_),
channel_(std::move(c.channel_)) {}

// Getters.
inline bool terminated() const { return terminated_; }
uint32_t src_id() const { return src_id_; }
uint32_t dst_id() const { return dst_id_; }
rdma_cm_id* id() const { return channel_->id(); }
channel_type* channel() const { return channel_.get(); }

void Terminate() { terminated_ = true; }

private:
volatile bool terminated_;

uint32_t src_id_;
uint32_t dst_id_;

// Remotely accessible memory that is used for 2-sided message-passing.
std::unique_ptr<channel_type> channel_;
};

} // namespace rome::rdma
169 changes: 169 additions & 0 deletions include/rome/rdma/connection_manager/connection_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
#pragma once

#include <infiniband/verbs.h>
#include <rdma/rdma_cma.h>

#include <chrono>
#include <limits>
#include <memory>
#include <optional>
#include <string_view>
#include <thread>
#include <unordered_set>

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "connection.h"
#include "rome/rdma/channel/rdma_accessor.h"
#include "rome/rdma/channel/rdma_channel.h"
#include "rome/rdma/channel/twosided_messenger.h"
#include "rome/rdma/rdma_broker.h"
#include "rome/rdma/rdma_device.h"
#include "rome/rdma/rdma_memory.h"
#include "rome/rdma/rdma_receiver.h"
#include "rome/util/coroutine.h"

namespace rome::rdma {

template <typename ChannelType>
class ConnectionManager : public RdmaReceiverInterface {
public:
typedef Connection<ChannelType> conn_type;

~ConnectionManager();
explicit ConnectionManager(uint32_t my_id);

absl::Status Start(std::string_view addr, std::optional<uint16_t> port);

// Getters.
std::string address() const { return broker_->address(); }
uint16_t port() const { return broker_->port(); }
ibv_pd* pd() const { return broker_->pd(); }

int GetNumConnections() {
Acquire(my_id_);
int size = established_.size();
Release();
return size;
}

// `RdmaReceiverInterface` implementaiton
void OnConnectRequest(rdma_cm_id* id, rdma_cm_event* event) override;
void OnEstablished(rdma_cm_id* id, rdma_cm_event* event) override;
void OnDisconnect(rdma_cm_id* id) override;

// `RdmaClientInterface` implementation
absl::StatusOr<conn_type*> Connect(uint32_t node_id, std::string_view server,
uint16_t port);

absl::StatusOr<conn_type*> GetConnection(uint32_t node_id);

void Shutdown();

private:
// The size of each memory region dedicated to a single connection.
static constexpr int kCapacity = 1 << 12; // 4 KiB
static constexpr int kMaxRecvBytes = 64;

static constexpr int kMaxWr = kCapacity / kMaxRecvBytes;
static constexpr int kMaxSge = 1;
static constexpr int kMaxInlineData = 0;

static constexpr char kPdId[] = "ConnectionManager";

static constexpr int kUnlocked = -1;

static constexpr uint32_t kMinBackoffUs = 100;
static constexpr uint32_t kMaxBackoffUs = 5000000;

// Each `rdma_cm_id` can be associated with some context, which is represented
// by `IdContext`. `node_id` is the numerical identifier for the peer node of
// the connection and `conn_param` is used to provide private data during the
// connection set up to send the local node identifier upon connection setup.
struct IdContext {
uint32_t node_id;
rdma_conn_param conn_param;
ChannelType* channel;

static inline uint32_t GetNodeId(void* ctx) {
return reinterpret_cast<IdContext*>(ctx)->node_id;
}

static inline ChannelType* GetRdmaChannel(void* ctx) {
return reinterpret_cast<IdContext*>(ctx)->channel;
}
};

// Lock acquisition will spin until either the lock is acquired successfully
// or the locker is an outgoing connection request from this node.
inline bool Acquire(int peer_id) {
for (int expected = kUnlocked;
!mu_.compare_exchange_weak(expected, peer_id); expected = kUnlocked) {
if (expected == my_id_) {
ROME_DEBUG(
"[Acquire] (Node {}) Giving up lock acquisition: actual={}, "
"swap={}",
my_id_, expected, peer_id);
return false;
}
}
return true;
}

inline void Release() { mu_ = kUnlocked; }

constexpr ibv_qp_init_attr DefaultQpInitAttr() {
ibv_qp_init_attr init_attr;
std::memset(&init_attr, 0, sizeof(init_attr));
init_attr.cap.max_send_wr = init_attr.cap.max_recv_wr = kMaxWr;
init_attr.cap.max_send_sge = init_attr.cap.max_recv_sge = kMaxSge;
init_attr.cap.max_inline_data = kMaxInlineData;
init_attr.sq_sig_all = 0; // Must request completions.
init_attr.qp_type = IBV_QPT_RC;
return init_attr;
}

constexpr ibv_qp_attr DefaultQpAttr() {
ibv_qp_attr attr;
std::memset(&attr, 0, sizeof(attr));
attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ |
IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC;
attr.max_dest_rd_atomic = 8;
attr.path_mtu = IBV_MTU_4096;
attr.min_rnr_timer = 12;
attr.rq_psn = 0;
attr.sq_psn = 0;
attr.timeout = 12;
attr.retry_cnt = 7;
attr.rnr_retry = 1;
attr.max_rd_atomic = 8;
return attr;
}

absl::StatusOr<conn_type*> ConnectLoopback(rdma_cm_id* id);

// Whether or not to stop handling requests.
volatile bool accepting_;

// Current status
absl::Status status_;

uint32_t my_id_;
std::unique_ptr<RdmaBroker> broker_;
ibv_pd* pd_; // Convenience ptr to protection domain of `broker_`

// Maintains connection information for a given Internet address. A connection
// manager only maintains a single connection per node. Nodes are identified
// by a string representing their IP address.
std::atomic<int> mu_;
std::unordered_map<uint32_t, std::unique_ptr<conn_type>> requested_;
std::unordered_map<uint32_t, std::unique_ptr<conn_type>> established_;

uint32_t backoff_us_{0};

rdma_cm_id* loopback_id_ = nullptr;
};

} // namespace rome::rdma

#include "connection_manager_impl.h"
Loading