Skip to content

Commit f400075

Browse files
committed Sep 16, 2020
UCP/TAG: Add debug info for eager messages
1 parent f1239d7 commit f400075

File tree

10 files changed

+90
-32
lines changed

10 files changed

+90
-32
lines changed
 

‎src/ucp/core/ucp_request.inl

+2
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ ucp_request_complete_tag_recv(ucp_worker_h worker, ucp_request_t *req,
113113
entry->send_tag = req->recv.tag.info.sender_tag;
114114
entry->status = state;
115115
entry->recvd_size = req->recv.tag.info.length;
116+
memcpy(entry->udata, req->recv.buffer,
117+
ucs_min(UCP_TAG_MAX_DATA, req->recv.tag.info.length));
116118
}
117119

118120
UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status);

‎src/ucp/proto/proto_am.c

+1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ void ucp_proto_am_zcopy_req_complete(ucp_request_t *req, ucs_status_t status)
8888
{
8989
ucs_assert(req->send.state.uct_comp.count == 0);
9090
ucp_request_send_buffer_dereg(req); /* TODO register+lane change */
91+
ucp_send_request_update_data(req, "sent_zcopy");
9192
ucp_request_complete_send(req, status);
9293
}
9394

‎src/ucp/proto/proto_am.inl

+17
Original file line numberDiff line numberDiff line change
@@ -490,4 +490,21 @@ ucp_proto_ssend_ack_request_alloc(ucp_worker_h worker, uintptr_t ep_ptr)
490490
return req;
491491
}
492492

493+
static inline void
494+
ucp_send_request_update_data(ucp_request_t *req, const char *status)
495+
{
496+
ucp_worker_h worker = req->send.ep->worker;
497+
ucp_tag_rndv_debug_entry_t *entry;
498+
499+
if (ucs_unlikely(worker->tm.rndv_debug.queue_length == 0)) {
500+
return;
501+
}
502+
503+
entry = ucp_worker_rndv_debug_entry(worker, req->send.rndv_req_id);
504+
entry->status = status;
505+
memcpy(entry->ndata, req->send.buffer,
506+
ucs_min(UCP_TAG_MAX_DATA, req->send.length));
507+
}
508+
509+
493510
#endif

‎src/ucp/tag/eager_rcv.c

+18
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,21 @@ ucp_eager_offload_handler(void *arg, void *data, size_t length,
7171
return status;
7272
}
7373

74+
/**
 * Record the latest state of a receive request in the worker's debug queue.
 *
 * Looks up the debug entry for req->recv.req_id, stores @a status in it and
 * snapshots up to UCP_TAG_MAX_DATA leading bytes of @a data into
 * entry->ndata. Does nothing when the debug queue length is 0.
 *
 * @param req     Receive request; req->recv.worker and req->recv.req_id are
 *                read.
 * @param status  Label stored by pointer (it is not copied), so it must stay
 *                valid for as long as the entry may be printed.
 * @param data    Start of the received payload to sample.
 * @param length  Number of valid bytes at @a data.
 */
static void ucp_recv_request_update_data(ucp_request_t *req, const char *status,
                                         void *data, size_t length)
{
    ucp_worker_h worker = req->recv.worker;
    ucp_tag_rndv_debug_entry_t *entry;
    size_t copy_len;

    /* Same hint direction as the guard around ucp_tag_send_add_debug_entry():
     * an enabled debug queue is the unlikely case */
    if (!ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) {
        return;
    }

    entry         = ucp_worker_rndv_debug_entry(worker, req->recv.req_id);
    entry->status = status;

    /* The printer reads all UCP_TAG_MAX_DATA bytes, so clear the snapshot
     * first; otherwise a short message leaves stale tail bytes behind */
    memset(entry->ndata, 0, sizeof(entry->ndata));

    /* Nothing to sample for an empty payload; also keeps memcpy() away from
     * a possibly invalid pointer when length is 0 */
    copy_len = ucs_min(UCP_TAG_MAX_DATA, length);
    if (copy_len > 0) {
        memcpy(entry->ndata, data, copy_len);
    }
}
88+
7489
static UCS_F_ALWAYS_INLINE ucs_status_t
7590
ucp_eager_tagged_handler(void *arg, void *data, size_t length, unsigned am_flags,
7691
uint16_t flags, uint16_t hdr_len, uint16_t priv_length)
@@ -98,6 +113,9 @@ ucp_eager_tagged_handler(void *arg, void *data, size_t length, unsigned am_flags
98113
if (req != NULL) {
99114
ucp_eager_expected_handler(worker, req, data, recv_len, recv_tag, flags);
100115

116+
ucp_recv_request_update_data(req, "eager_recv",
117+
UCS_PTR_BYTE_OFFSET(data, hdr_len), recv_len);
118+
101119
if (flags & UCP_RECV_DESC_FLAG_EAGER_SYNC) {
102120
ucp_tag_eager_sync_send_ack(worker, data, flags);
103121
}

‎src/ucp/tag/eager_snd.c

+3
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ static ucs_status_t ucp_tag_eager_contig_short(uct_pending_req_t *self)
133133
return status;
134134
}
135135

136+
ucp_send_request_update_data(req, "sent_short");
136137
ucp_request_complete_send(req, UCS_OK);
137138
return UCS_OK;
138139
}
@@ -144,6 +145,7 @@ static ucs_status_t ucp_tag_eager_bcopy_single(uct_pending_req_t *self)
144145
if (status == UCS_OK) {
145146
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
146147
ucp_request_send_generic_dt_finish(req);
148+
ucp_send_request_update_data(req, "sent_bcopy_s");
147149
ucp_request_complete_send(req, UCS_OK);
148150
}
149151
return status;
@@ -159,6 +161,7 @@ static ucs_status_t ucp_tag_eager_bcopy_multi(uct_pending_req_t *self)
159161
if (status == UCS_OK) {
160162
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
161163
ucp_request_send_generic_dt_finish(req);
164+
ucp_send_request_update_data(req, "sent_bcopy_m");
162165
ucp_request_complete_send(req, UCS_OK);
163166
} else if (status == UCP_STATUS_PENDING_SWITCH) {
164167
status = UCS_OK;

‎src/ucp/tag/rndv.c

+13-29
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ static void ucp_rndv_complete_send(ucp_request_t *sreq, ucs_status_t status)
6262
sreq, sreq->send.rndv_req_id, worker);
6363
}
6464

65+
ucp_send_request_update_data(sreq, "rndv_done");
6566
ucp_request_complete_send(sreq, status);
6667
}
6768

@@ -117,30 +118,13 @@ ucp_rndv_get_req_add_debug_entry(ucp_request_t *rndv_req,
117118
entry->send_req = NULL;
118119
}
119120

120-
/* add debug entry for rndv send flow */
121-
static void
122-
ucp_rndv_send_add_debug_entry(ucp_request_t *req)
123-
{
124-
ucp_tag_rndv_debug_entry_t *entry;
125-
126-
entry = ucp_rndv_add_debug_entry_common(req);
127-
entry->type = "rndv_send";
128-
entry->rts_seq = 0;
129-
entry->send_tag = req->send.msg_proto.tag.tag;
130-
entry->recv_tag = 0;
131-
entry->remote_address = 0;
132-
entry->remote_reqptr = 0;
133-
entry->rndv_get_req = NULL;
134-
entry->recv_req = NULL;
135-
entry->send_req = req;
136-
}
137-
138121
/* to be used from debugger */
139122
void ucp_rndv_print_debug_data(ucp_worker_h worker, const char *filename,
140123
ucp_tag_t send_tag)
141124
{
142125
ucp_tag_rndv_debug_entry_t *entry;
143126
size_t i, count;
127+
uint64_t ndata, udata;
144128
FILE *file;
145129

146130
if (filename == NULL) {
@@ -159,13 +143,18 @@ void ucp_rndv_print_debug_data(ucp_worker_h worker, const char *filename,
159143
if ((send_tag != 0) && (send_tag != entry->send_tag)) {
160144
continue;
161145
}
146+
memcpy(&udata, entry->udata, sizeof(udata));
147+
memcpy(&ndata, entry->ndata, sizeof(ndata));
162148
fprintf(file,
163-
"%s id %lu rts_seq %lu stag 0x%lx rtag 0x%lx rva 0x%lx rmreq 0x%lx "
164-
"lva %p sz %zu greq %p rreq %p sreq %p\n",
165-
entry->type, entry->id, entry->rts_seq, entry->send_tag,
166-
entry->recv_tag, entry->remote_address, entry->remote_reqptr,
167-
entry->local_address, entry->size, entry->rndv_get_req,
168-
entry->recv_req, entry->send_req);
149+
"id %lu %s st '%s' rts_seq %lu pend %u stag 0x%lx rtag 0x%lx rva 0x%lx "
150+
"rmreq %lu lva %p sz %zu rxsz %zu greq %p sreq %p rreq %p "
151+
"udata 0x%"PRIx64" ndata 0x%"PRIx64"\n",
152+
entry->id, entry->type, entry->status, entry->rts_seq,
153+
entry->pending_count, entry->send_tag, entry->recv_tag,
154+
entry->remote_address, entry->remote_reqptr,
155+
entry->local_address, entry->size, entry->recvd_size,
156+
entry->rndv_get_req, entry->send_req, entry->recv_req,
157+
udata, ndata);
169158
}
170159

171160
if (filename != NULL) {
@@ -378,11 +367,6 @@ ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *sreq)
378367

379368
sreq->flags |= UCP_REQUEST_FLAG_SEND_RNDV;
380369

381-
sreq->send.rndv_req_id = worker->rndv_req_id++;
382-
if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) {
383-
ucp_rndv_send_add_debug_entry(sreq);
384-
}
385-
386370
status = ucp_ep_resolve_dest_ep_ptr(ep, sreq->send.lane);
387371
if (status != UCS_OK) {
388372
return status;

‎src/ucp/tag/tag_match.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,13 @@ typedef union {
5353

5454
KHASH_INIT(ucp_tag_frag_hash, uint64_t, ucp_tag_frag_match_t, 1,
5555
kh_int64_hash_func, kh_int64_hash_equal);
56+
#define UCP_TAG_MAX_DATA 8
5657

5758

5859
typedef struct ucp_tag_rndv_debug_entry {
60+
uint64_t id;
5961
const char *type;
6062
const char *status;
61-
uint64_t id;
6263
uint64_t rts_seq;
6364
unsigned pending_count;
6465
ucp_ep_h ep;
@@ -72,6 +73,8 @@ typedef struct ucp_tag_rndv_debug_entry {
7273
ucp_request_t *rndv_get_req;
7374
ucp_request_t *send_req;
7475
ucp_request_t *recv_req;
76+
uint8_t udata[UCP_TAG_MAX_DATA];
77+
uint8_t ndata[UCP_TAG_MAX_DATA]; /* data for the network */
7578
} ucp_tag_rndv_debug_entry_t;
7679

7780

‎src/ucp/tag/tag_recv.c

+3-1
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ ucp_tag_recv_request_completed(ucp_worker_h worker, ucp_request_t *req,
3232
ucp_tag_rndv_debug_entry_t *entry =
3333
ucp_worker_rndv_debug_entry(worker, req->recv.req_id);
3434
entry->send_tag = info->sender_tag;
35-
entry->status = "recv_completed1";
35+
entry->status = "recv_completed_unexp";
3636
entry->recvd_size = info->length;
37+
memcpy(entry->udata, req->recv.buffer,
38+
ucs_min(UCP_TAG_MAX_DATA, info->length));
3739
}
3840

3941
req->status = status;

‎src/ucp/tag/tag_send.c

+28-1
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,34 @@ ucp_tag_send_req(ucp_request_t *req, size_t dt_count,
120120
return req + 1;
121121
}
122122

123+
/**
 * Create the debug-queue entry for a newly initialized tag send request.
 *
 * Fills the entry looked up by req->send.rndv_req_id with the request's id,
 * endpoint, buffer address, length and send tag, clears the fields that do
 * not apply to a send, and snapshots up to UCP_TAG_MAX_DATA leading bytes of
 * the user buffer into entry->udata.
 *
 * The caller is expected to check that the debug queue is enabled; this
 * function does not check it.
 *
 * @param req  Send request; req->send.ep, req->send.rndv_req_id,
 *             req->send.buffer, req->send.length and
 *             req->send.msg_proto.tag.tag are read.
 *
 * NOTE(review): req->send.buffer is read as plain host memory. Presumably
 * that only holds for contiguous datatypes in CPU-accessible memory - confirm
 * against the callers.
 */
static void ucp_tag_send_add_debug_entry(ucp_request_t *req)
{
    ucp_tag_rndv_debug_entry_t *entry;
    size_t copy_len;

    entry = ucp_worker_rndv_debug_entry(req->send.ep->worker,
                                        req->send.rndv_req_id);

    entry->id             = req->send.rndv_req_id;
    entry->type           = "tag_send";
    /* Give the entry a defined status right away: the printer formats it
     * with %s, and nothing else here would set it before the first update.
     * NOTE(review): presumably entries are recycled as the id grows past the
     * queue length, in which case an old label would otherwise remain. */
    entry->status         = "created";
    entry->ep             = req->send.ep;
    entry->local_address  = req->send.buffer;
    entry->size           = req->send.length;
    entry->rts_seq        = 0;
    entry->send_tag       = req->send.msg_proto.tag.tag;
    entry->recv_tag       = 0;
    entry->remote_address = 0;
    entry->remote_reqptr  = 0;
    entry->rndv_get_req   = NULL;
    entry->recv_req       = NULL;
    entry->send_req       = req;

    /* The printer reads all UCP_TAG_MAX_DATA bytes of both snapshots, so
     * start them from zero instead of whatever the entry held before */
    memset(entry->udata, 0, sizeof(entry->udata));
    memset(entry->ndata, 0, sizeof(entry->ndata));

    /* Skip the copy for empty messages: the buffer may be NULL, and passing
     * a null pointer to memcpy() is undefined even with a zero count */
    copy_len = ucs_min(UCP_TAG_MAX_DATA, req->send.length);
    if (copy_len > 0) {
        memcpy(entry->udata, req->send.buffer, copy_len);
    }
}
143+
123144
static UCS_F_ALWAYS_INLINE void
124145
ucp_tag_send_req_init(ucp_request_t* req, ucp_ep_h ep, const void* buffer,
125146
uintptr_t datatype, size_t count, ucp_tag_t tag,
126147
uint32_t flags)
127148
{
149+
ucp_worker_h worker = ep->worker;
150+
128151
req->flags = flags | UCP_REQUEST_FLAG_SEND_TAG;
129152
req->send.ep = ep;
130153
req->send.buffer = (void*)buffer;
@@ -139,7 +162,11 @@ ucp_tag_send_req_init(ucp_request_t* req, ucp_ep_h ep, const void* buffer,
139162
req->send.length);
140163
req->send.lane = ucp_ep_config(ep)->tag.lane;
141164
req->send.pending_lane = UCP_NULL_LANE;
142-
req->send.rndv_req_id = 0;
165+
req->send.rndv_req_id = worker->rndv_req_id++;
166+
167+
if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) {
168+
ucp_tag_send_add_debug_entry(req);
169+
}
143170
}
144171

145172
//static UCS_F_ALWAYS_INLINE int

‎test/apps/iodemo/io_demo.cc

+1
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ class P2pDemoCommon : public UcxContext {
189189
for (size_t i = 0; i < _data_buffers.size(); ++i) {
190190
std::string &data_buffer = _data_buffers[i];
191191
data_buffer.resize(opts().max_data_size + ALIGNMENT);
192+
std::fill(data_buffer.begin(), data_buffer.end(), 'A');
192193
uintptr_t ptr = (uintptr_t)&data_buffer[0];
193194
_padding = ((ptr + ALIGNMENT - 1) & ~(ALIGNMENT - 1)) - ptr;
194195
}

0 commit comments

Comments
 (0)
Please sign in to comment.