Skip to content

Commit

Permalink
snapshot (working pinger for 3 peers)
Browse files Browse the repository at this point in the history
  • Loading branch information
ferrol aderholdt committed Nov 19, 2024
1 parent 8f97cc6 commit f99fa2a
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 29 deletions.
12 changes: 9 additions & 3 deletions src/components/tl/ucp/tl_ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ ucc_status_t ucc_tl_ucp_connect_team_ep(ucc_tl_ucp_team_t *team,
: TL_UCP_EP_ADDR_WORKER(addr);

/* FERROL: setup pinger here */
if (ctx->pinger && ctx->pinger_peer[core_rank] == NULL) {
if (ctx->pinger && grank != core_rank && ctx->pinger_peer[core_rank] == NULL) {
int ret = 0;
paddr = (TL_UCP_EP_ADDR_ONESIDED_INFO(paddr, ctx));
paddr -= sizeof(struct pinger_attr);
pattr = (struct pinger_attr *)paddr;
Expand All @@ -74,9 +75,14 @@ ucc_status_t ucc_tl_ucp_connect_team_ep(ucc_tl_ucp_team_t *team,
memcpy(&peer_attr.sin, &pattr->sin, sizeof(struct sockaddr_in));
peer_attr.port = pattr->port;
printf("[%d] connecting to %d\n", grank, core_rank);
pinger_connect(ctx->pinger, &peer_attr, &ctx->pinger_peer[core_rank]);
ret = pinger_connect(ctx->pinger, &peer_attr, &ctx->pinger_peer[core_rank]);
while (ret != 0) {
//back off and try again
sleep(1);
tl_debug(UCC_TL_UCP_TEAM_LIB(team), "[%d] backing off to reconnect to %d\n", grank, core_rank);
ret = pinger_connect(ctx->pinger, &peer_attr, &ctx->pinger_peer[core_rank]);
}
printf("[%d] connected to %d\n", grank, core_rank);
sleep(1);
}
return ucc_tl_ucp_connect_ep(ctx, use_service_worker, ep, addr);
}
Expand Down
64 changes: 38 additions & 26 deletions src/components/tl/ucp/tl_ucp_pinger.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
#include "my_ibv_helper.h"
#include "tl_ucp_pinger.h"

static int debug = 1;
static int debug = 0;
static int debug_fast_path = 0;
#define DEBUG_LOG if (debug) printf
#define DEBUG_LOG_FAST_PATH if (debug_fast_path) printf
Expand Down Expand Up @@ -139,7 +139,7 @@ struct dcping_cb {
};

struct pinger {
struct dcping_cb client;
struct dcping_cb *client;
struct dcping_cb server;

int npeers;
Expand Down Expand Up @@ -643,7 +643,7 @@ static int dcping_bind_server(struct dcping_cb *cb)
DEBUG_LOG("rdma_bind_addr successful on address: <%s:%d>\n", str, be16toh(cb->port));

DEBUG_LOG("rdma_listen\n");
ret = rdma_listen(cb->cm_id, 3);
ret = rdma_listen(cb->cm_id, 10);
if (ret) {
perror("rdma_listen");
return ret;
Expand All @@ -659,10 +659,10 @@ static int dcping_bind_server(struct dcping_cb *cb)
return 0;
}

static void free_cb(struct dcping_cb *cb)
/*static void free_cb(struct dcping_cb *cb)
{
free(cb);
}
}*/

static int dcping_client_dc_send_wr(struct dcping_cb *cb, uint64_t wr_id, struct dcping_rdma_info *info);
static int dcping_client_get_cqe_tiemstmp(struct dcping_cb *cb, uint64_t wr_id, uint64_t *ts_hw_start, uint64_t *ts_hw_end);
Expand All @@ -672,27 +672,28 @@ int do_pings(struct pinger *p)
int i, ret;
int npeers = p->next_peer;

//printf("pinging %d peers\n", npeers);
// printf("pinging %d peers\n", npeers);

for (i = 0; i < npeers; i++) {
/* post ping */
ret = dcping_client_dc_send_wr(&p->client, i, &p->remote_buf_info[i]);
// printf("posting ping for %d\n", i);
ret = dcping_client_dc_send_wr(&p->client[i], i, &p->remote_buf_info[i]);
assert(!ret);
}

uint64_t ts_hw_start, ts_hw_end, rtt_nsec, rtt_hw;
for (i = 0; i < npeers; i++) {
/* complete ping */
ret = dcping_client_get_cqe_tiemstmp(&p->client, i, &ts_hw_start, &ts_hw_end);
ret = dcping_client_get_cqe_tiemstmp(&p->client[i], i, &ts_hw_start, &ts_hw_end);
if (ret) {
DEBUG_LOG("cqe processing failed (peer %d)\n", i);
continue;
}

rtt_hw = ts_hw_end - ts_hw_start;
rtt_nsec = rtt_hw * USEC_PER_SEC / p->client.hw_clocks_kHz;
rtt_nsec = rtt_hw * USEC_PER_SEC / p->client[i].hw_clocks_kHz;
p->rtts[i] = rtt_nsec;
//printf("peer %d rtt: %ld.%3.3ld\n", i, rtt_nsec/1000, rtt_nsec%1000);
// printf("peer %d rtt: %ld.%3.3ld\n", i, rtt_nsec/1000, rtt_nsec%1000);
}

usleep(p->ping_interval_us);
Expand Down Expand Up @@ -756,6 +757,7 @@ static int dcping_run_server(struct pinger *p)
ret = dcping_setup_qp(cb, req_cm_id);
if (ret) {
fprintf(stderr, "setup_qp failed: %d\n", ret);
abort();
return ret;
}

Expand All @@ -771,6 +773,7 @@ static int dcping_run_server(struct pinger *p)

struct rdma_conn_param conn_param;
dcping_init_conn_param(cb, req_cm_id, &conn_param);
DEBUG_LOG("accepting rdma\n");
ret = rdma_accept(req_cm_id, &conn_param);
if (ret) {
perror("rdma_accept");
Expand Down Expand Up @@ -805,7 +808,7 @@ static int dcping_run_server(struct pinger *p)
dcping_free_buffers(cb);
err1:
dcping_free_qp(cb);

abort();
return ret;
}

Expand Down Expand Up @@ -1157,7 +1160,8 @@ int pinger_create(struct pinger_attr *attr, pinger_t *pinger)
p->rtts = calloc(attr->npeers, sizeof(pinger_rtt_t));
assert(p->rtts);

p->remote_buf_info = calloc(attr->npeers, sizeof(*(p->remote_buf_info)));
printf("allocating for %d peers\n", attr->npeers);
p->remote_buf_info = calloc(attr->npeers + 1, sizeof(*(p->remote_buf_info)));
assert(p->remote_buf_info);

p->server.cm_channel = create_first_event_channel();
Expand All @@ -1171,6 +1175,8 @@ int pinger_create(struct pinger_attr *attr, pinger_t *pinger)
return -1;
}

p->client = calloc(attr->npeers, sizeof(struct dcping_cb));

ret = pthread_create(&p->server_thread,
NULL, pinger_server_func, (void *)p);

Expand All @@ -1194,12 +1200,12 @@ int pinger_destroy(pinger_t *pinger)

rdma_destroy_id(p->server.cm_id);
rdma_destroy_event_channel(p->server.cm_channel);
rdma_destroy_id(p->client.cm_id);
rdma_destroy_event_channel(p->client.cm_channel);
rdma_destroy_id(p->client[0].cm_id);
rdma_destroy_event_channel(p->client[0].cm_channel);
free(p->rtts);
free(p->remote_buf_info);
free_cb(&p->server);
free_cb(&p->client);
//free_cb(&p->server);
//free_cb(&p->client);

return 0;
}
Expand All @@ -1209,45 +1215,51 @@ int pinger_connect(pinger_t pinger, struct pinger_peer_attr *attr, pinger_pid_t
struct pinger *p = (struct pinger *)pinger;
int ret;

memcpy(&p->client.sin, &attr->sin, sizeof(struct sockaddr_storage));
p->client.port = htobe16(attr->port);
memcpy(&p->client[p->next_peer].sin, &attr->sin, sizeof(struct sockaddr_storage));
p->client[p->next_peer].port = htobe16(attr->port);

p->client.cm_channel = create_first_event_channel();
if (!p->client.cm_channel) {
p->client[p->next_peer].cm_channel = create_first_event_channel();
if (!p->client[p->next_peer].cm_channel) {
return -1;
}

ret = rdma_create_id(p->client.cm_channel, &p->client.cm_id, &p->client, RDMA_PS_TCP);
printf("create id\n");
ret = rdma_create_id(p->client[p->next_peer].cm_channel, &p->client[p->next_peer].cm_id, &p->client[p->next_peer], RDMA_PS_TCP);
if (ret) {
perror("rdma_create_id");
return -1;
}

ret = dcping_bind_client(p, &p->client);
printf("bind client\n");
ret = dcping_bind_client(p, &p->client[p->next_peer]);
if (ret)
return ret;

ret = dcping_setup_qp(&p->client, NULL);
printf("setup qp\n");
ret = dcping_setup_qp(&p->client[p->next_peer], NULL);
if (ret) {
fprintf(stderr, "setup_qp failed: %d\n", ret);
return ret;
}

ret = dcping_setup_buffers(&p->client);
printf("setup buffers\n");
ret = dcping_setup_buffers(&p->client[p->next_peer]);
if (ret) {
fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
return ret;
}

ret = dcping_connect_client(p, &p->client);
printf("connect client\n");
ret = dcping_connect_client(p, &p->client[p->next_peer]);
if (ret) {
fprintf(stderr, "connect error %d\n", ret);
return ret;
}

printf("connected\n");
*peer = (void *)p->next_peer++;

printf("DONE CONNECTING\n");
printf("DONE CONNECTING, next peer %ld\n", p->next_peer);
return 0;
}

Expand Down

0 comments on commit f99fa2a

Please sign in to comment.