Skip to content

Commit

Permalink
UDP: add some statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Oct 12, 2024
1 parent 0f7ad00 commit 782d817
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 10 deletions.
2 changes: 2 additions & 0 deletions include/nng/nng.h
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,8 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe);
// low order 16 bits will be set. This is provided in native byte order,
// which makes it more convenient than using the NNG_OPT_LOCADDR option.
#define NNG_OPT_TCP_BOUND_PORT "tcp-bound-port"
// UDP alias for convenience uses the same value
#define NNG_OPT_UDP_BOUND_PORT NNG_OPT_TCP_BOUND_PORT

// IPC options. These will largely vary depending on the platform,
// as POSIX systems have very different options than Windows.
Expand Down
111 changes: 101 additions & 10 deletions src/sp/transport/udp/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "core/message.h"
#include "core/nng_impl.h"
#include "core/options.h"
#include "core/pipe.h"
#include "core/platform.h"
#include "nng/nng.h"

Expand Down Expand Up @@ -219,6 +220,15 @@ struct udp_ep {

#ifdef NNG_ENABLE_STATS
nni_stat_item st_rcv_max;
nni_stat_item st_rcv_reorder;
nni_stat_item st_rcv_toobig;
nni_stat_item st_rcv_nomatch;
nni_stat_item st_rcv_copy;
nni_stat_item st_rcv_nocopy;
nni_stat_item st_rcv_nobuf;
nni_stat_item st_snd_toobig;
nni_stat_item st_snd_nobuf;
nni_stat_item st_peer_inactive;
#endif
};

Expand Down Expand Up @@ -363,9 +373,12 @@ udp_check_pipe_sequence(udp_pipe *p, uint32_t seq)
delta = (int32_t) (seq - p->peer_seq);
if (delta < 0) {
// out of order delivery
nni_stat_inc(&p->ep->st_rcv_reorder, 1);

Check warning on line 376 in src/sp/transport/udp/udp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/udp/udp.c#L376

Added line #L376 was not covered by tests
return (false);
}
// TODO: bump a stat for misses if delta > 0.
if (delta > 0) {
nni_stat_inc(&p->ep->st_rcv_reorder, 1);
}
p->peer_seq = seq + 1; // expected next sequence number
return (true);
}
Expand Down Expand Up @@ -452,7 +465,7 @@ udp_queue_tx(udp_ep *ep, nng_sockaddr *sa, udp_sp_msg *msg, nni_msg *payload)

if (ring->count == ring->size || !ep->started) {
// ring is full
// TODO: bump a stat
nni_stat_inc(&ep->st_snd_nobuf, 1);

Check warning on line 468 in src/sp/transport/udp/udp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/udp/udp.c#L468

Added line #L468 was not covered by tests
if (payload != NULL) {
nni_msg_free(payload);
}
Expand Down Expand Up @@ -621,7 +634,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
// NB: Peer ID endianness does not matter, as long we use it
// consistently.
if ((p = udp_find_pipe(ep, peer_id, send_id)) == NULL) {
// TODO: Bump a stat...
nni_stat_inc(&ep->st_rcv_nomatch, 1);

Check warning on line 637 in src/sp/transport/udp/udp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/udp/udp.c#L637

Added line #L637 was not covered by tests
udp_send_disc_full(ep, sa, send_id, peer_id, 0, DISC_NOTCONN);
// Question: how do we store the sockaddr for that?
return;
Expand All @@ -637,6 +650,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
// Make sure the message wasn't truncated, and that it fits within
// our maximum agreed upon payload.
if ((dreq->us_length > len) || (dreq->us_length > p->rcvmax)) {
nni_stat_inc(&ep->st_rcv_toobig, 1);

Check warning on line 653 in src/sp/transport/udp/udp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/udp/udp.c#L653

Added line #L653 was not covered by tests
udp_send_disc(ep, p, DISC_MSGSIZE);
return;
}
Expand All @@ -652,32 +666,37 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)

if (!udp_check_pipe_sequence(p, dreq->us_sequence)) {
// out of order delivery, drop it
// TODO: bump a stat
return;
}

if (nni_lmq_full(&p->rx_mq)) {
// bump a NOBUF stat
nni_stat_inc(&ep->st_rcv_nobuf, 1);

Check warning on line 673 in src/sp/transport/udp/udp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/udp/udp.c#L673

Added line #L673 was not covered by tests
return;
}

// Short message, just alloc and copy
if (len <= ep->short_msg) {
nni_stat_inc(&ep->st_rcv_copy, 1);

Check warning on line 679 in src/sp/transport/udp/udp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/udp/udp.c#L679

Added line #L679 was not covered by tests
if (nng_msg_alloc(&msg, len) != 0) {
// TODO: bump a stat
if (p->npipe != NULL) {
nni_pipe_bump_error(p->npipe, NNG_ENOMEM);

Check warning on line 682 in src/sp/transport/udp/udp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/udp/udp.c#L682

Added line #L682 was not covered by tests
}
return;
}
nni_msg_set_address(msg, sa);
nni_msg_clear(msg);
nni_msg_append(msg, nni_msg_body(ep->rx_payload), len);
nni_lmq_put(&p->rx_mq, msg);
} else {
nni_stat_inc(&ep->st_rcv_nocopy, 1);
// Message size larger than copy break, do zero copy
msg = ep->rx_payload;
if (nng_msg_alloc(&ep->rx_payload,
ep->rcvmax + sizeof(ep->rx_msg)) != 0) {
// TODO: bump a stat
ep->rx_payload = msg; // make sure we put it back
if (p->npipe != NULL) {
nni_pipe_bump_error(p->npipe, NNG_ENOMEM);

Check warning on line 698 in src/sp/transport/udp/udp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/udp/udp.c#L698

Added line #L698 was not covered by tests
}
return;
}

Expand Down Expand Up @@ -900,8 +919,6 @@ udp_rx_cb(void *arg)
hdr->data.us_length = NNI_GET16LE(&hdr->data.us_length);
#endif

// TODO: verify that incoming type matches us!

switch (hdr->data.us_op_code) {
case OPCODE_DATA:
udp_recv_data(ep, &hdr->data, n, sa);
Expand Down Expand Up @@ -964,7 +981,7 @@ udp_pipe_send(void *arg, nni_aio *aio)
// floor. this is on the sender, so there isn't a compelling
// need to disconnect the pipe, since it we're not being
// "ill-behaved" to our peer.
// TODO: bump a stat
nni_stat_inc(&ep->st_snd_toobig, 1);
nni_msg_free(msg);
return;
}
Expand Down Expand Up @@ -1241,6 +1258,7 @@ udp_timer_cb(void *arg)
// This will probably not be received by the peer,
// since we aren't getting anything from them. But
// having it on the wire may help debugging later.
nni_stat_inc(&ep->st_peer_inactive, 1);

Check warning on line 1261 in src/sp/transport/udp/udp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/udp/udp.c#L1261

Added line #L1261 was not covered by tests
udp_send_disc(ep, p, DISC_INACTIVE);
continue;
}
Expand Down Expand Up @@ -1321,7 +1339,80 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock)
.si_unit = NNG_UNIT_BYTES,
.si_atomic = true,
};
static const nni_stat_info rcv_reorder_info = {
.si_name = "rcv_reorder",
.si_desc = "messages received out of order",
.si_type = NNG_STAT_COUNTER,
.si_unit = NNG_UNIT_MESSAGES,
.si_atomic = true,
};
static const nni_stat_info rcv_toobig_info = {
.si_name = "rcv_toobig",
.si_desc = "received messages rejected because too big",
.si_type = NNG_STAT_COUNTER,
.si_unit = NNG_UNIT_MESSAGES,
.si_atomic = true,
};
static const nni_stat_info rcv_nomatch_info = {
.si_name = "rcv_nomatch",
.si_desc = "received messages without a matching connection",
.si_type = NNG_STAT_COUNTER,
.si_unit = NNG_UNIT_MESSAGES,
.si_atomic = true,
};
static const nni_stat_info rcv_copy_info = {
.si_name = "rcv_copy",
.si_desc = "received messages copied (small)",
.si_type = NNG_STAT_COUNTER,
.si_unit = NNG_UNIT_MESSAGES,
.si_atomic = true,
};
static const nni_stat_info rcv_nocopy_info = {
.si_name = "rcv_nocopy",
.si_desc = "received messages zero copy (large)",
.si_type = NNG_STAT_COUNTER,
.si_unit = NNG_UNIT_MESSAGES,
.si_atomic = true,
};
static const nni_stat_info rcv_nobuf_info = {
.si_name = "rcv_nobuf",
.si_desc = "received messages dropped no buffer",
.si_type = NNG_STAT_COUNTER,
.si_unit = NNG_UNIT_MESSAGES,
.si_atomic = true,
};
static const nni_stat_info snd_toobig_info = {
.si_name = "snd_toobig",
.si_desc = "sent messages rejected because too big",
.si_type = NNG_STAT_COUNTER,
.si_unit = NNG_UNIT_MESSAGES,
.si_atomic = true,
};
static const nni_stat_info snd_nobuf_info = {
.si_name = "snd_nobuf",
.si_desc = "sent messages dropped no buffer",
.si_type = NNG_STAT_COUNTER,
.si_unit = NNG_UNIT_MESSAGES,
.si_atomic = true,
};
static const nni_stat_info peer_inactive_info = {
.si_name = "peer_inactive",
.si_desc = "connections closed due to inactive peer",
.si_type = NNG_STAT_COUNTER,
.si_unit = NNG_UNIT_EVENTS,
.si_atomic = true,
};

nni_stat_init(&ep->st_rcv_max, &rcv_max_info);
nni_stat_init(&ep->st_rcv_copy, &rcv_copy_info);
nni_stat_init(&ep->st_rcv_nocopy, &rcv_nocopy_info);
nni_stat_init(&ep->st_rcv_reorder, &rcv_reorder_info);
nni_stat_init(&ep->st_rcv_toobig, &rcv_toobig_info);
nni_stat_init(&ep->st_rcv_nomatch, &rcv_nomatch_info);
nni_stat_init(&ep->st_rcv_nobuf, &rcv_nobuf_info);
nni_stat_init(&ep->st_snd_toobig, &snd_toobig_info);
nni_stat_init(&ep->st_snd_nobuf, &snd_nobuf_info);
nni_stat_init(&ep->st_peer_inactive, &peer_inactive_info);
#endif

// schedule our timer callback - forever for now
Expand Down

0 comments on commit 782d817

Please sign in to comment.