Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[USM] Introduce tail cals for the Postgres monitoring #30547

Merged
merged 16 commits into from
Nov 4, 2024
Merged
Prev Previous commit
Next Next commit
reduced code duplication
amitslavin committed Oct 28, 2024
commit 62a906ede3ae5ad0a0fbba00770062a18377669d
176 changes: 65 additions & 111 deletions pkg/network/ebpf/c/protocols/postgres/decoding.h
Original file line number Diff line number Diff line change
@@ -180,61 +180,12 @@ static __always_inline void postgres_handle_parse_message(pktbuf_t pkt, conn_tup
return;
}

// Entrypoint to process plaintext Postgres traffic. Pulls the connection tuple and the packet buffer from the map and
// calls the main processing function. If the packet is a TCP termination, it calls the termination function.
// If the message is a parse message, it tail calls to the dedicated function to handle it as it is too large to be
// inlined in the main entrypoint. Otherwise, it calls the main processing function.
SEC("socket/postgres_process")
int socket__postgres_process(struct __sk_buff* skb) {
skb_info_t skb_info = {};
conn_tuple_t conn_tuple = {};

if (!fetch_dispatching_arguments(&conn_tuple, &skb_info)) {
return 0;
}

if (is_tcp_termination(&skb_info)) {
postgres_tcp_termination(&conn_tuple);
return 0;
}

normalize_tuple(&conn_tuple);

pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info);
struct pg_message_header header;
if (!read_message_header(pkt, &header)) {
return 0;
}

// If the message is a parse message, we tail call to the dedicated function to handle it.
if (header.message_tag == POSTGRES_PARSE_MAGIC_BYTE) {
bpf_tail_call_compat(skb, &protocols_progs, PROG_POSTGRES_PROCESS_PARSE_MESSAGE);
return 0;
}

postgres_entrypoint(pkt, &conn_tuple, &header, NO_TAGS);
return 0;
}

SEC("socket/postgres_message_parser")
int socket__postgres_message_parser(struct __sk_buff* skb) {
const __u32 zero = 0;
skb_info_t skb_info = {};
conn_tuple_t conn_tuple = {};

if (!fetch_dispatching_arguments(&conn_tuple, &skb_info)) {
return 0;
}

if (is_tcp_termination(&skb_info)) {
postgres_tcp_termination(&conn_tuple);
return 0;
}

normalize_tuple(&conn_tuple);

pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info);
struct pg_message_header header;
// A function for processing multiple messages per call up to POSTGRES_MAX_MESSAGES_PER_TAIL_CALL
// and handles tail call chaining for processing additional messages beyond this limit.
// (max POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES), serving both for plaintext and TLS Postgres traffic.
static __always_inline bool process_postgres_messages(pktbuf_t pkt, conn_tuple_t conn_tuple) {
const __u32 zero = 0;
struct pg_message_header header;
if (!read_message_header(pkt, &header)) {
return 0;
}
@@ -294,6 +245,64 @@ int socket__postgres_message_parser(struct __sk_buff* skb) {
return 0;
}

// Entrypoint to process plaintext Postgres traffic. Pulls the connection tuple and the packet buffer from the map and
// calls the main processing function. If the packet is a TCP termination, it calls the termination function.
// If the message is a parse message, it tail calls to the dedicated function to handle it as it is too large to be
// inlined in the main entrypoint. Otherwise, it calls the main processing function.
SEC("socket/postgres_process")
int socket__postgres_process(struct __sk_buff* skb) {
skb_info_t skb_info = {};
conn_tuple_t conn_tuple = {};

if (!fetch_dispatching_arguments(&conn_tuple, &skb_info)) {
return 0;
}

if (is_tcp_termination(&skb_info)) {
postgres_tcp_termination(&conn_tuple);
return 0;
}

normalize_tuple(&conn_tuple);

pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info);
struct pg_message_header header;
if (!read_message_header(pkt, &header)) {
return 0;
}

// If the message is a parse message, we tail call to the dedicated function to handle it.
if (header.message_tag == POSTGRES_PARSE_MAGIC_BYTE) {
bpf_tail_call_compat(skb, &protocols_progs, PROG_POSTGRES_PROCESS_PARSE_MESSAGE);
return 0;
}

postgres_entrypoint(pkt, &conn_tuple, &header, NO_TAGS);
return 0;
}

SEC("socket/postgres_message_parser")
int socket__postgres_message_parser(struct __sk_buff* skb) {
skb_info_t skb_info = {};
conn_tuple_t conn_tuple = {};

if (!fetch_dispatching_arguments(&conn_tuple, &skb_info)) {
return 0;
}

if (is_tcp_termination(&skb_info)) {
postgres_tcp_termination(&conn_tuple);
return 0;
}

normalize_tuple(&conn_tuple);

pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info);
process_postgres_messages(pkt, conn_tuple);

return 0;
}

// Handles plaintext Postgres Parse messages. Pulls the connection tuple and the packet buffer from the map and calls the
// dedicated function to handle the message.
SEC("socket/postgres_process_parse_message")
@@ -391,62 +400,7 @@ int uprobe__postgres_tls_message_parser(struct pt_regs *ctx) {
conn_tuple_t tup = args->tup;

pktbuf_t pkt = pktbuf_from_tls(ctx, args);
struct pg_message_header header;
if (!read_message_header(pkt, &header)) {
return 0;
}

// We didn't find a new query, thus we assume we're in the middle of a transaction.
// We look up the transaction in the in-flight map, and if it doesn't exist, we ignore the message.
postgres_transaction_t *transaction = bpf_map_lookup_elem(&postgres_in_flight, &tup);
if (!transaction) {
return 0;
}

postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero);
if (iteration_value == NULL) {
return 0;
}

if (iteration_value->iteration >= POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) {
return 0;
}

if (iteration_value->data_off != 0) {
pktbuf_set_offset(pkt, iteration_value->data_off);
}


#pragma unroll(POSTGRES_MAX_MESSAGES_PER_TAIL_CALL)
for (__u32 iteration = 0; iteration < POSTGRES_MAX_MESSAGES_PER_TAIL_CALL; ++iteration) {
if (!read_message_header(pkt, &header)) {
break;
}
if (header.message_tag == POSTGRES_COMMAND_COMPLETE_MAGIC_BYTE) {
handle_command_complete(&tup, transaction);
break;
}
// We didn't find a command complete message, so we advance the data offset to the end of the message.
// reminder, the message length includes the size of the payload, 4 bytes of the message length itself, but not
// the message tag. So we need to add 1 to the message length to jump over the entire message.
pktbuf_advance(pkt, header.message_len + 1);
}

iteration_value->iteration += 1;
iteration_value->data_off = pktbuf_data_offset(pkt);

pktbuf_tail_call_option_t message_parser_tail_call_array[] = {
[PKTBUF_SKB] = {
.prog_array_map = &protocols_progs,
.index = PROG_POSTGRES_MESSAGE_PARSER,
},
[PKTBUF_TLS] = {
.prog_array_map = &tls_process_progs,
.index = PROG_POSTGRES_MESSAGE_PARSER,
},
};

pktbuf_tail_call_compact(pkt, message_parser_tail_call_array);
process_postgres_messages(pkt, tup);

return 0;
}