Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[USM] Introduce tail calls for the Postgres monitoring #30547

Merged
merged 16 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/network/ebpf/c/protocols/classification/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ typedef enum {
PROG_KAFKA_TERMINATION,
PROG_GRPC,
PROG_POSTGRES,
PROG_POSTGRES_MESSAGE_PARSER,
PROG_POSTGRES_PROCESS_PARSE_MESSAGE,
guyarb marked this conversation as resolved.
Show resolved Hide resolved
PROG_POSTGRES_TERMINATION,
PROG_REDIS,
Expand Down
3 changes: 3 additions & 0 deletions pkg/network/ebpf/c/protocols/postgres/decoding-maps.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ BPF_HASH_MAP(postgres_in_flight, conn_tuple_t, postgres_transaction_t, 0)
// Acts as a scratch buffer for Postgres events, for preparing events before they are sent to userspace.
BPF_PERCPU_ARRAY_MAP(postgres_scratch_buffer, postgres_event_t, 1)

// Maintains the tail-call state (iteration counter and read offset) used while parsing Postgres messages; a single per-CPU entry.
BPF_PERCPU_ARRAY_MAP(postgres_iterations, postgres_tail_call_state_t, 1)

#endif
138 changes: 119 additions & 19 deletions pkg/network/ebpf/c/protocols/postgres/decoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ static int __always_inline skip_string(pktbuf_t pkt, int message_len) {
// Main processing logic for the Postgres protocol. It reads the first message header and decides what to do based on the
// message tag. If the message is a new query, it stores the query in the in-flight map. If the message is a command
// complete, it enqueues the transaction and deletes it from the in-flight map. If the message is not a command complete,
// it tries to read up to POSTGRES_MAX_MESSAGES messages, looking for a command complete message.
// it tries to read up to POSTGRES_MAX_MESSAGES_PER_TAIL_CALL * POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES messages, looking for a command complete message.
guyarb marked this conversation as resolved.
Show resolved Hide resolved
// If the message is not a new query or a command complete, it ignores the message.
static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) {
const __u32 zero = 0;
guyarb marked this conversation as resolved.
Show resolved Hide resolved
// If the message is a new query, we store the query in the in-flight map.
// If we already had a transaction for the connection, we override it and drop the previous one.
if (header->message_tag == POSTGRES_QUERY_MAGIC_BYTE) {
Expand All @@ -125,27 +126,24 @@ static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn
return;
}

// We didn't find a new query, thus we assume we're in the middle of a transaction.
// We look up the transaction in the in-flight map, and if it doesn't exist, we ignore the message.
postgres_transaction_t *transaction = bpf_map_lookup_elem(&postgres_in_flight, conn_tuple);
if (!transaction) {
postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero);
if (iteration_value == NULL) {
return;
}

#pragma unroll(POSTGRES_MAX_MESSAGES)
for (__u32 iteration = 0; iteration < POSTGRES_MAX_MESSAGES; ++iteration) {
if (!read_message_header(pkt, header)) {
break;
}
if (header->message_tag == POSTGRES_COMMAND_COMPLETE_MAGIC_BYTE) {
handle_command_complete(conn_tuple, transaction);
break;
}
// We didn't find a command complete message, so we advance the data offset to the end of the message.
// reminder, the message length includes the size of the payload, 4 bytes of the message length itself, but not
// the message tag. So we need to add 1 to the message length to jump over the entire message.
pktbuf_advance(pkt, header->message_len + 1);
}
iteration_value->iteration = 0;
iteration_value->data_off = 0;
pktbuf_tail_call_option_t message_parser_tail_call_array[] = {
[PKTBUF_SKB] = {
.prog_array_map = &protocols_progs,
.index = PROG_POSTGRES_MESSAGE_PARSER,
},
[PKTBUF_TLS] = {
.prog_array_map = &tls_process_progs,
.index = PROG_POSTGRES_MESSAGE_PARSER,
},
};
pktbuf_tail_call_compact(pkt, message_parser_tail_call_array);
return;
}

Expand Down Expand Up @@ -180,6 +178,69 @@ static __always_inline void postgres_handle_parse_message(pktbuf_t pkt, conn_tup
return;
}

// Processes up to POSTGRES_MAX_MESSAGES_PER_TAIL_CALL Postgres messages per invocation, looking for a command
// complete message. If the per-call budget is exhausted without finding one, the function saves its progress in the
// postgres_iterations map and tail calls the message parser again, up to POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES
// invocations in total. Serves both plaintext and TLS Postgres traffic.
//
// pkt        - packet buffer positioned at the start of the payload.
// conn_tuple - (normalized) connection tuple used as the key of the in-flight map.
//
// The return value carries no information (it is always false); callers ignore it.
static __always_inline bool process_postgres_messages(pktbuf_t pkt, conn_tuple_t conn_tuple) {
    const __u32 zero = 0;
    struct pg_message_header header;

    // Nothing to do if we cannot even read a single message header from the current position.
    if (!read_message_header(pkt, &header)) {
        return false;
    }

    // We didn't find a new query, thus we assume we're in the middle of a transaction.
    // We look up the transaction in the in-flight map, and if it doesn't exist, we ignore the message.
    postgres_transaction_t *transaction = bpf_map_lookup_elem(&postgres_in_flight, &conn_tuple);
    if (!transaction) {
        return false;
    }

    postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero);
    if (iteration_value == NULL) {
        return false;
    }

    // We already used all the tail calls we are allowed to spend on this packet.
    if (iteration_value->iteration >= POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) {
        return false;
    }

    // Resume from where the previous invocation stopped. An offset of 0 means this is the first invocation,
    // in which case the packet buffer already points to the right position.
    if (iteration_value->data_off != 0) {
        pktbuf_set_offset(pkt, iteration_value->data_off);
    }

    // Tracks whether there is any reason to schedule another invocation. It is cleared as soon as we either
    // run out of readable headers or find the command complete message we were looking for.
    bool keep_parsing = true;

#pragma unroll(POSTGRES_MAX_MESSAGES_PER_TAIL_CALL)
    for (__u32 iteration = 0; iteration < POSTGRES_MAX_MESSAGES_PER_TAIL_CALL; ++iteration) {
        if (!read_message_header(pkt, &header)) {
            keep_parsing = false;
            break;
        }
        if (header.message_tag == POSTGRES_COMMAND_COMPLETE_MAGIC_BYTE) {
            handle_command_complete(&conn_tuple, transaction);
            keep_parsing = false;
            break;
        }
        // We didn't find a command complete message, so we advance the data offset to the end of the message.
        // reminder, the message length includes the size of the payload, 4 bytes of the message length itself, but not
        // the message tag. So we need to add 1 to the message length to jump over the entire message.
        pktbuf_advance(pkt, header.message_len + 1);
    }

    // Either the transaction was completed or there is nothing left to read: another tail call would only
    // repeat the same (failing) work, so we stop here.
    if (!keep_parsing) {
        return false;
    }

    iteration_value->iteration += 1;
    // Do not spend a tail call on an invocation that would immediately bail out on the limit check above.
    if (iteration_value->iteration >= POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) {
        return false;
    }
    // Persist the read position so the next invocation continues after the last message we skipped.
    iteration_value->data_off = pktbuf_data_offset(pkt);

    pktbuf_tail_call_option_t message_parser_tail_call_array[] = {
        [PKTBUF_SKB] = {
            .prog_array_map = &protocols_progs,
            .index = PROG_POSTGRES_MESSAGE_PARSER,
        },
        [PKTBUF_TLS] = {
            .prog_array_map = &tls_process_progs,
            .index = PROG_POSTGRES_MESSAGE_PARSER,
        },
    };

    pktbuf_tail_call_compact(pkt, message_parser_tail_call_array);
    return false;
}

// Entrypoint to process plaintext Postgres traffic. Pulls the connection tuple and the packet buffer from the map and
// calls the main processing function. If the packet is a TCP termination, it calls the termination function.
// If the message is a parse message, it tail calls to the dedicated function to handle it as it is too large to be
Expand Down Expand Up @@ -216,6 +277,28 @@ int socket__postgres_process(struct __sk_buff* skb) {
return 0;
}

// Tail-call target that parses Postgres messages for plaintext traffic. Fetches the dispatcher arguments (connection
// tuple and skb info); on a TCP termination it runs the Postgres termination handler, otherwise it normalizes the
// tuple and hands the packet buffer to the shared message-processing routine.
SEC("socket/postgres_message_parser")
int socket__postgres_message_parser(struct __sk_buff* skb) {
    skb_info_t info = {};
    conn_tuple_t tup = {};

    if (fetch_dispatching_arguments(&tup, &info)) {
        if (is_tcp_termination(&info)) {
            postgres_tcp_termination(&tup);
        } else {
            normalize_tuple(&tup);

            pktbuf_t buf = pktbuf_from_skb(skb, &info);
            process_postgres_messages(buf, tup);
        }
    }

    return 0;
}

// Handles plaintext Postgres Parse messages. Pulls the connection tuple and the packet buffer from the map and calls the
// dedicated function to handle the message.
SEC("socket/postgres_process_parse_message")
Expand Down Expand Up @@ -299,4 +382,21 @@ int uprobe__postgres_tls_termination(struct pt_regs *ctx) {
return 0;
}

// Tail-call target that parses Postgres messages for TLS traffic. Reads the TLS dispatcher arguments from their
// map and, when present, hands the packet buffer and connection tuple to the shared message-processing routine.
SEC("uprobe/postgres_tls_message_parser")
int uprobe__postgres_tls_message_parser(struct pt_regs *ctx) {
    const __u32 key = 0;
    tls_dispatcher_arguments_t *dispatcher_args = bpf_map_lookup_elem(&tls_dispatcher_arguments, &key);

    if (dispatcher_args != NULL) {
        // The tuple is copied to the stack to handle verifier issues on kernel 4.14.
        conn_tuple_t tuple_copy = dispatcher_args->tup;
        pktbuf_t buf = pktbuf_from_tls(ctx, dispatcher_args);
        process_postgres_messages(buf, tuple_copy);
    }

    return 0;
}

#endif
14 changes: 12 additions & 2 deletions pkg/network/ebpf/c/protocols/postgres/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
// Maximum length of Postgres query to send to userspace.
#define POSTGRES_BUFFER_SIZE 160

// Maximum number of Postgres messages we can parse for a single packet.
#define POSTGRES_MAX_MESSAGES 80
// Represents the maximum number of tail calls we can use to process the messages of a single packet.
#define POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES 4

// Represents the maximum number of messages we process in a single tail call.
#define POSTGRES_MAX_MESSAGES_PER_TAIL_CALL 20
guyarb marked this conversation as resolved.
Show resolved Hide resolved

// Postgres transaction information we store in the kernel.
typedef struct {
Expand All @@ -26,4 +29,11 @@ typedef struct {
postgres_transaction_t tx;
} postgres_event_t;

// State carried between consecutive message-parser tail calls. It is stored in the single-entry per-CPU
// postgres_iterations map, reset by the Postgres entrypoint before the first tail call, and updated by the
// message parser after each pass.
typedef struct {
// Number of message-parser passes already performed; compared against POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES.
__u16 iteration;
guyarb marked this conversation as resolved.
Show resolved Hide resolved
// Saving the data offset is crucial for maintaining the current read position and ensuring proper utilization
guyarb marked this conversation as resolved.
Show resolved Hide resolved
// of tail calls. A value of 0 means no offset has been saved yet.
__u32 data_off;
} postgres_tail_call_state_t;

#endif
2 changes: 2 additions & 0 deletions pkg/network/protocols/ebpf_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ const (
ProgramKafkaTermination ProgramType = C.PROG_KAFKA_TERMINATION
// ProgramPostgres is the Golang representation of the C.PROG_POSTGRES enum
ProgramPostgres ProgramType = C.PROG_POSTGRES
// ProgramPostgresMessageParser is the Golang representation of the C.PROG_POSTGRES_MESSAGE_PARSER enum
guyarb marked this conversation as resolved.
Show resolved Hide resolved
ProgramPostgresMessageParser ProgramType = C.PROG_POSTGRES_MESSAGE_PARSER
// ProgramPostgresParseMessage is the Golang representation of the C.PROG_POSTGRES_PROCESS_PARSE_MESSAGE enum
ProgramPostgresParseMessage ProgramType = C.PROG_POSTGRES_PROCESS_PARSE_MESSAGE
// ProgramPostgresTermination is tail call to process Postgres termination.
Expand Down
10 changes: 6 additions & 4 deletions pkg/network/protocols/ebpf_types_linux.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions pkg/network/protocols/postgres/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ const (
// InFlightMap is the name of the in-flight map.
InFlightMap = "postgres_in_flight"
scratchBufferMap = "postgres_scratch_buffer"
iterationsMap = "postgres_iterations"
processTailCall = "socket__postgres_process"
messageParserCall = "socket__postgres_message_parser"
parseMessageTailCall = "socket__postgres_process_parse_message"
tlsProcessTailCall = "uprobe__postgres_tls_process"
tlsParseMessageTailCall = "uprobe__postgres_tls_process_parse_message"
tlsTerminationTailCall = "uprobe__postgres_tls_termination"
tlsMessageParserCall = "uprobe__postgres_tls_message_parser"
eventStream = "postgres"
)

Expand All @@ -58,6 +61,9 @@ var Spec = &protocols.ProtocolSpec{
{
Name: scratchBufferMap,
},
{
Name: iterationsMap,
},
{
Name: "postgres_batch_events",
},
Expand All @@ -76,6 +82,13 @@ var Spec = &protocols.ProtocolSpec{
EBPFFuncName: processTailCall,
guyarb marked this conversation as resolved.
Show resolved Hide resolved
},
},
{
ProgArrayName: protocols.ProtocolDispatcherProgramsMap,
Key: uint32(protocols.ProgramPostgresMessageParser),
ProbeIdentificationPair: manager.ProbeIdentificationPair{
EBPFFuncName: messageParserCall,
},
},
{
ProgArrayName: protocols.ProtocolDispatcherProgramsMap,
Key: uint32(protocols.ProgramPostgresParseMessage),
Expand Down Expand Up @@ -104,6 +117,13 @@ var Spec = &protocols.ProtocolSpec{
EBPFFuncName: tlsTerminationTailCall,
},
},
{
ProgArrayName: protocols.TLSDispatcherProgramsMap,
Key: uint32(protocols.ProgramPostgresMessageParser),
ProbeIdentificationPair: manager.ProbeIdentificationPair{
EBPFFuncName: tlsMessageParserCall,
},
},
},
}

Expand Down
Loading