Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[USM] Introduce tail calls for the Postgres monitoring #30547

Merged
merged 16 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/network/ebpf/c/protocols/classification/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ typedef enum {
PROG_KAFKA_TERMINATION,
PROG_GRPC,
PROG_POSTGRES,
PROG_POSTGRES_HANDLE_RESPONSE,
PROG_POSTGRES_PROCESS_PARSE_MESSAGE,
PROG_POSTGRES_TERMINATION,
PROG_REDIS,
Expand Down
3 changes: 3 additions & 0 deletions pkg/network/ebpf/c/protocols/postgres/decoding-maps.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ BPF_HASH_MAP(postgres_in_flight, conn_tuple_t, postgres_transaction_t, 0)
// Acts as a scratch buffer for Postgres events, for preparing events before they are sent to userspace.
BPF_PERCPU_ARRAY_MAP(postgres_scratch_buffer, postgres_event_t, 1)

// Maintains the current state of tail calls for each Postgres message.
BPF_PERCPU_ARRAY_MAP(postgres_iterations, postgres_tail_call_state_t, 1)

#endif
198 changes: 156 additions & 42 deletions pkg/network/ebpf/c/protocols/postgres/decoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ static __always_inline void handle_command_complete(conn_tuple_t *conn_tuple, po
bpf_map_delete_elem(&postgres_in_flight, conn_tuple);
}

// Handles a TCP termination event by deleting the connection tuple from the in-flight map.
static void __always_inline postgres_tcp_termination(conn_tuple_t *tup) {
bpf_map_delete_elem(&postgres_in_flight, tup);
flip_tuple(tup);
Expand Down Expand Up @@ -107,12 +108,28 @@ static int __always_inline skip_string(pktbuf_t pkt, int message_len) {
return SKIP_STRING_FAILED;
}

// Main processing logic for the Postgres protocol. It reads the first message header and decides what to do based on the
// message tag. If the message is a new query, it stores the query in the in-flight map. If the message is a command
// complete, it enqueues the transaction and deletes it from the in-flight map. If the message is not a command complete,
// it tries to read up to POSTGRES_MAX_MESSAGES messages, looking for a command complete message.
// If the message is not a new query or a command complete, it ignores the message.
static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) {
// Reads the first message header and decides what to do based on the
// message tag. If the message is a new query, it stores the query in the in-flight map.
// If the message is a parse message, we tail call to the dedicated process_parse_message program.
// If the message is a command complete, it calls the handle_command_complete program.
static __always_inline void postgres_handle_message(pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) {
// If the message is a parse message, we tail call to the dedicated function to handle it as it is too large to be
// inlined in the main function.
if (header->message_tag == POSTGRES_PARSE_MAGIC_BYTE) {
pktbuf_tail_call_option_t process_parse_tail_call_array[] = {
[PKTBUF_SKB] = {
.prog_array_map = &protocols_progs,
.index = PROG_POSTGRES_PROCESS_PARSE_MESSAGE,
},
[PKTBUF_TLS] = {
.prog_array_map = &tls_process_progs,
.index = PROG_POSTGRES_PROCESS_PARSE_MESSAGE,
},
};
pktbuf_tail_call_compact(pkt, process_parse_tail_call_array);
return;
}

// If the message is a new query, we store the query in the in-flight map.
// If we had a transaction for the connection, we override it and drop the previous one.
if (header->message_tag == POSTGRES_QUERY_MAGIC_BYTE) {
Expand All @@ -125,27 +142,25 @@ static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn
return;
}

// We didn't find a new query, thus we assume we're in the middle of a transaction.
// We look up the transaction in the in-flight map, and if it doesn't exist, we ignore the message.
postgres_transaction_t *transaction = bpf_map_lookup_elem(&postgres_in_flight, conn_tuple);
if (!transaction) {
const __u32 zero = 0;
postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero);
if (iteration_value == NULL) {
return;
}

#pragma unroll(POSTGRES_MAX_MESSAGES)
for (__u32 iteration = 0; iteration < POSTGRES_MAX_MESSAGES; ++iteration) {
if (!read_message_header(pkt, header)) {
break;
}
if (header->message_tag == POSTGRES_COMMAND_COMPLETE_MAGIC_BYTE) {
handle_command_complete(conn_tuple, transaction);
break;
}
// We didn't find a command complete message, so we advance the data offset to the end of the message.
// reminder, the message length includes the size of the payload, 4 bytes of the message length itself, but not
// the message tag. So we need to add 1 to the message length to jump over the entire message.
pktbuf_advance(pkt, header->message_len + 1);
}
iteration_value->iteration = 0;
iteration_value->data_off = 0;
pktbuf_tail_call_option_t handle_response_tail_call_array[] = {
[PKTBUF_SKB] = {
.prog_array_map = &protocols_progs,
.index = PROG_POSTGRES_HANDLE_RESPONSE,
},
[PKTBUF_TLS] = {
.prog_array_map = &tls_process_progs,
.index = PROG_POSTGRES_HANDLE_RESPONSE,
},
};
pktbuf_tail_call_compact(pkt, handle_response_tail_call_array);
return;
}

Expand Down Expand Up @@ -180,12 +195,83 @@ static __always_inline void postgres_handle_parse_message(pktbuf_t pkt, conn_tup
return;
}

// Scans response traffic (plaintext or TLS) for a Postgres command complete message that closes the in-flight
// transaction of the given connection.
//
// Up to POSTGRES_MAX_MESSAGES_PER_TAIL_CALL message headers are inspected per invocation. When no command complete
// message is found, the current read offset and an incremented invocation counter are saved in the per-CPU
// postgres_iterations map and the program tail calls PROG_POSTGRES_HANDLE_RESPONSE again, as long as fewer than
// POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES invocations have run.
//
// pkt        - packet buffer the messages are read from.
// conn_tuple - connection the traffic belongs to; used as the key into postgres_in_flight.
//
// Always returns false; the callers in this file ignore the result.
static __always_inline bool handle_response(pktbuf_t pkt, conn_tuple_t conn_tuple) {
    const __u32 zero = 0;
    bool found_command_complete = false;
    struct pg_message_header header;

    postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero);
    if (iteration_value == NULL) {
        // Without the tail call state we cannot track our progress, so the in-flight entry is dropped.
        bpf_map_delete_elem(&postgres_in_flight, &conn_tuple);
        return false;
    }

    // The tail call budget for this buffer has already been used up.
    if (iteration_value->iteration >= POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) {
        return false;
    }

    // A non-zero offset means a previous invocation already consumed part of the buffer; resume from there.
    if (iteration_value->data_off != 0) {
        pktbuf_set_offset(pkt, iteration_value->data_off);
    }

    // We didn't find a new query, thus we assume we're in the middle of a transaction.
    // We look up the transaction in the in-flight map, and if it doesn't exist, we ignore the message.
    postgres_transaction_t *transaction = bpf_map_lookup_elem(&postgres_in_flight, &conn_tuple);
    if (!transaction) {
        return false;
    }

#pragma unroll(POSTGRES_MAX_MESSAGES_PER_TAIL_CALL)
    for (__u32 iteration = 0; iteration < POSTGRES_MAX_MESSAGES_PER_TAIL_CALL; ++iteration) {
        if (!read_message_header(pkt, &header)) {
            break;
        }
        if (header.message_tag == POSTGRES_COMMAND_COMPLETE_MAGIC_BYTE) {
            handle_command_complete(&conn_tuple, transaction);
            found_command_complete = true;
            break;
        }
        // We didn't find a command complete message, so we advance the data offset to the end of the message.
        // Reminder: the message length includes the size of the payload and the 4 bytes of the message length
        // itself, but not the message tag. So we need to add 1 to the message length to jump over the entire message.
        pktbuf_advance(pkt, header.message_len + 1);
    }

    if (found_command_complete) {
        return false;
    }

    // We didn't find a command complete message, so we need to continue processing the packet.
    // We save the current data offset and increment the iteration counter.
    iteration_value->iteration += 1;
    iteration_value->data_off = pktbuf_data_offset(pkt);

    // If the maximum number of tail calls has been reached, we can skip invoking the next tail call.
    if (iteration_value->iteration >= POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) {
        return false;
    }

    pktbuf_tail_call_option_t handle_response_tail_call_array[] = {
        [PKTBUF_SKB] = {
            .prog_array_map = &protocols_progs,
            .index = PROG_POSTGRES_HANDLE_RESPONSE,
        },
        [PKTBUF_TLS] = {
            .prog_array_map = &tls_process_progs,
            .index = PROG_POSTGRES_HANDLE_RESPONSE,
        },
    };
    pktbuf_tail_call_compact(pkt, handle_response_tail_call_array);
    return false;
}

// Entrypoint to process plaintext Postgres traffic. Pulls the connection tuple and the packet buffer from the map and
// calls the main processing function. If the packet is a TCP termination, it calls the termination function.
// If the message is a parse message, it tail calls to the dedicated function to handle it as it is too large to be
// inlined in the main entrypoint. Otherwise, it calls the main processing function.
SEC("socket/postgres_process")
int socket__postgres_process(struct __sk_buff* skb) {
SEC("socket/postgres_handle")
int socket__postgres_handle(struct __sk_buff* skb) {
skb_info_t skb_info = {};
conn_tuple_t conn_tuple = {};

Expand All @@ -206,13 +292,30 @@ int socket__postgres_process(struct __sk_buff* skb) {
return 0;
}

// If the message is a parse message, we tail call to the dedicated function to handle it.
if (header.message_tag == POSTGRES_PARSE_MAGIC_BYTE) {
bpf_tail_call_compat(skb, &protocols_progs, PROG_POSTGRES_PROCESS_PARSE_MESSAGE);
postgres_handle_message(pkt, &conn_tuple, &header, NO_TAGS);
return 0;
}

// Handles command complete messages for plaintext Postgres traffic. Fetches the connection tuple and the skb info
// through fetch_dispatching_arguments, deletes the in-flight state on TCP termination, and otherwise normalizes the
// tuple and hands the packet buffer to handle_response.
SEC("socket/postgres_handle_response")
int socket__postgres_handle_response(struct __sk_buff* skb) {
    skb_info_t skb_info = {};
    conn_tuple_t conn_tuple = {};

    if (!fetch_dispatching_arguments(&conn_tuple, &skb_info)) {
        return 0;
    }

    if (is_tcp_termination(&skb_info)) {
        postgres_tcp_termination(&conn_tuple);
        return 0;
    }

    normalize_tuple(&conn_tuple);

    // The packet buffer is built only after the tuple has been normalized, right before it is consumed.
    pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info);
    handle_response(pkt, conn_tuple);
    return 0;
}

Expand All @@ -235,10 +338,9 @@ int socket__postgres_process_parse_message(struct __sk_buff* skb) {
}

// Entrypoint to process TLS Postgres traffic. Pulls the connection tuple and the packet buffer from the map and calls
// the main processing function. If the packet is a Parse message, it tail calls to the dedicated function to handle it.
// Otherwise, it calls the main processing function.
SEC("uprobe/postgres_tls_process")
int uprobe__postgres_tls_process(struct pt_regs *ctx) {
// the main processing function.
SEC("uprobe/postgres_tls_handle")
int uprobe__postgres_tls_handle(struct pt_regs *ctx) {
const __u32 zero = 0;

tls_dispatcher_arguments_t *args = bpf_map_lookup_elem(&tls_dispatcher_arguments, &zero);
Expand All @@ -255,12 +357,7 @@ int uprobe__postgres_tls_process(struct pt_regs *ctx) {
return 0;
}

// If the message is a parse message, we tail call to the dedicated function to handle it.
if (header.message_tag == POSTGRES_PARSE_MAGIC_BYTE) {
bpf_tail_call_compat(ctx, &tls_process_progs, PROG_POSTGRES_PROCESS_PARSE_MESSAGE);
return 0;
}
postgres_entrypoint(pkt, &tup, &header, (__u8)args->tags);
postgres_handle_message(pkt, &tup, &header, (__u8)args->tags);
return 0;
}

Expand Down Expand Up @@ -299,4 +396,21 @@ int uprobe__postgres_tls_termination(struct pt_regs *ctx) {
return 0;
}

// Handles Postgres response messages for TLS traffic. Looks up the TLS dispatcher arguments, builds a packet buffer
// from them and delegates the scan for a command complete message to handle_response.
// Always returns 0.
SEC("uprobe/postgres_tls_handle_response")
int uprobe__postgres_tls_handle_response(struct pt_regs *ctx) {
const __u32 zero = 0;

// Nothing can be processed without the dispatcher arguments.
tls_dispatcher_arguments_t *args = bpf_map_lookup_elem(&tls_dispatcher_arguments, &zero);
if (args == NULL) {
return 0;
}

// Copying the tuple to the stack to handle verifier issues on kernel 4.14.
conn_tuple_t tup = args->tup;
pktbuf_t pkt = pktbuf_from_tls(ctx, args);
// The return value of handle_response is intentionally ignored.
handle_response(pkt, tup);
return 0;
}

#endif
14 changes: 12 additions & 2 deletions pkg/network/ebpf/c/protocols/postgres/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
// Maximum length of Postgres query to send to userspace.
#define POSTGRES_BUFFER_SIZE 160

// Maximum number of Postgres messages we can parse for a single packet.
#define POSTGRES_MAX_MESSAGES 80
// Represents the maximum number of tail calls we can use to process the messages of a single packet.
#define POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES 1

// Represents the maximum number of messages we process in a single tail call.
#define POSTGRES_MAX_MESSAGES_PER_TAIL_CALL 80

// Postgres transaction information we store in the kernel.
typedef struct {
Expand All @@ -26,4 +29,11 @@ typedef struct {
postgres_transaction_t tx;
} postgres_event_t;

// State carried across chained handle_response invocations; stored in the per-CPU postgres_iterations map.
typedef struct {
// Number of handle_response invocations already completed for the current buffer. It is compared against
// POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES to bound the tail call chain.
__u8 iteration;
// Saving the packet data offset is crucial for maintaining the current read position and ensuring proper utilization
// of tail calls.
__u32 data_off;
} postgres_tail_call_state_t;

#endif
10 changes: 10 additions & 0 deletions pkg/network/protocols/ebpf_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package protocols

// #cgo CFLAGS: -I ../../ebpf/c -I ../ebpf/c
// #include "../ebpf/c/protocols/classification/defs.h"
// #include "../ebpf/c/protocols/postgres/types.h"
import "C"

const (
Expand All @@ -17,6 +18,13 @@ const (
layerEncryptionBit = C.LAYER_ENCRYPTION_BIT
)

const (
// PostgresMaxMessagesPerTailCall is the maximum number of messages that can be processed in a single tail call in our Postgres decoding solution
PostgresMaxMessagesPerTailCall = C.POSTGRES_MAX_MESSAGES_PER_TAIL_CALL
// PostgresMaxTailCalls is the maximum number of tail calls that can be made in our Postgres decoding solution
PostgresMaxTailCalls = C.POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES
)

// DispatcherProgramType is a C type to represent the eBPF programs used for tail calls.
type DispatcherProgramType C.dispatcher_prog_t

Expand Down Expand Up @@ -63,6 +71,8 @@ const (
ProgramKafkaTermination ProgramType = C.PROG_KAFKA_TERMINATION
// ProgramPostgres is the Golang representation of the C.PROG_POSTGRES enum
ProgramPostgres ProgramType = C.PROG_POSTGRES
// ProgramPostgresHandleResponse is the Golang representation of the C.PROG_POSTGRES_HANDLE_RESPONSE enum
ProgramPostgresHandleResponse ProgramType = C.PROG_POSTGRES_HANDLE_RESPONSE
// ProgramPostgresParseMessage is the Golang representation of the C.PROG_POSTGRES_PROCESS_PARSE_MESSAGE enum
ProgramPostgresParseMessage ProgramType = C.PROG_POSTGRES_PROCESS_PARSE_MESSAGE
// ProgramPostgresTermination is tail call to process Postgres termination.
Expand Down
16 changes: 12 additions & 4 deletions pkg/network/protocols/ebpf_types_linux.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading