Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[USM] Introduce tail cals for the Postgres monitoring #30547

Merged
merged 16 commits into from
Nov 4, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update program names to make flow clearer
  • Loading branch information
amitslavin committed Oct 29, 2024
commit 4a5eedea7388d427a2b11a7d813d80b86811b48f
2 changes: 1 addition & 1 deletion pkg/network/ebpf/c/protocols/classification/defs.h
Original file line number Diff line number Diff line change
@@ -152,7 +152,7 @@ typedef enum {
PROG_KAFKA_TERMINATION,
PROG_GRPC,
PROG_POSTGRES,
PROG_POSTGRES_MESSAGE_PARSER,
PROG_POSTGRES_HANDLE_COMMAND_COMPLETE,
PROG_POSTGRES_PROCESS_PARSE_MESSAGE,
PROG_POSTGRES_TERMINATION,
PROG_REDIS,
115 changes: 58 additions & 57 deletions pkg/network/ebpf/c/protocols/postgres/decoding.h
Original file line number Diff line number Diff line change
@@ -107,13 +107,28 @@ static int __always_inline skip_string(pktbuf_t pkt, int message_len) {
return SKIP_STRING_FAILED;
}

// Main processing logic for the Postgres protocol. It reads the first message header and decides what to do based on the
// message tag. If the message is a new query, it stores the query in the in-flight map. If the message is a command
// complete, it enqueues the transaction and deletes it from the in-flight map. If the message is not a command complete,
// it tries to read up to POSTGRES_MAX_MESSAGES_PAR_TAIL_CALL*POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES messages, looking for a command complete message.
// If the message is not a new query or a command complete, it ignores the message.
static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) {
// postgres_handle_message reads the first message header and decides what to do based on the
// message tag. If the message is a new query, it stores the query in the in-flight map.
// If the message is a parse message, we tail call to the dedicated process_parse_message program.
// If the message is a command complete, it calls the handle_command_complete program.
static __always_inline void postgres_handle_message(pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) {
const __u32 zero = 0;
// If the message is a parse message, we tail call to the dedicated function to handle it.
if (header->message_tag == POSTGRES_PARSE_MAGIC_BYTE) {
pktbuf_tail_call_option_t process_parse_tail_call_array[] = {
[PKTBUF_SKB] = {
.prog_array_map = &protocols_progs,
.index = PROG_POSTGRES_PROCESS_PARSE_MESSAGE,
},
[PKTBUF_TLS] = {
.prog_array_map = &tls_process_progs,
.index = PROG_POSTGRES_PROCESS_PARSE_MESSAGE,
},
};
pktbuf_tail_call_compact(pkt, process_parse_tail_call_array);
return;
}

// If the message is a new query, we store the query in the in-flight map.
// If we had a transaction for the connection, we override it and drops the previous one.
if (header->message_tag == POSTGRES_QUERY_MAGIC_BYTE) {
@@ -133,17 +148,17 @@ static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn

iteration_value->iteration = 0;
iteration_value->data_off = 0;
pktbuf_tail_call_option_t message_parser_tail_call_array[] = {
pktbuf_tail_call_option_t handle_command_complete_tail_call_array[] = {
[PKTBUF_SKB] = {
.prog_array_map = &protocols_progs,
.index = PROG_POSTGRES_MESSAGE_PARSER,
.index = PROG_POSTGRES_HANDLE_COMMAND_COMPLETE,
},
[PKTBUF_TLS] = {
.prog_array_map = &tls_process_progs,
.index = PROG_POSTGRES_MESSAGE_PARSER,
.index = PROG_POSTGRES_HANDLE_COMMAND_COMPLETE,
},
};
pktbuf_tail_call_compact(pkt, message_parser_tail_call_array);
pktbuf_tail_call_compact(pkt, handle_command_complete_tail_call_array);
return;
}

@@ -178,12 +193,13 @@ static __always_inline void postgres_handle_parse_message(pktbuf_t pkt, conn_tup
return;
}

// A function for processing multiple messages per call up to POSTGRES_MAX_MESSAGES_PER_TAIL_CALL
// and handles tail call chaining for processing additional messages beyond this limit.
// (max POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES), serving both for plaintext and TLS Postgres traffic.
static __always_inline bool process_postgres_messages(pktbuf_t pkt, conn_tuple_t conn_tuple) {
const __u32 zero = 0;
struct pg_message_header header;
// Handles Postgres command complete messages by examining packet data for both plaintext and TLS traffic.
// This function handles multiple messages within a single packet, processing up to POSTGRES_MAX_MESSAGES_PER_TAIL_CALL
// messages per call. When more messages exist beyond this limit, it uses tail call chaining (up to
// POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) to continue processing.
static __always_inline bool handle_command_complete_messages(pktbuf_t pkt, conn_tuple_t conn_tuple) {
const __u32 zero = 0;
struct pg_message_header header;
if (!read_message_header(pkt, &header)) {
return 0;
}
@@ -225,28 +241,24 @@ static __always_inline bool process_postgres_messages(pktbuf_t pkt, conn_tuple_t

iteration_value->iteration += 1;
iteration_value->data_off = pktbuf_data_offset(pkt);

pktbuf_tail_call_option_t message_parser_tail_call_array[] = {
[PKTBUF_SKB] = {
.prog_array_map = &protocols_progs,
.index = PROG_POSTGRES_MESSAGE_PARSER,
},
[PKTBUF_TLS] = {
.prog_array_map = &tls_process_progs,
.index = PROG_POSTGRES_MESSAGE_PARSER,
},
};

pktbuf_tail_call_compact(pkt, message_parser_tail_call_array);
pktbuf_tail_call_option_t handle_command_complete_tail_call_array[] = {
[PKTBUF_SKB] = {
.prog_array_map = &protocols_progs,
.index = PROG_POSTGRES_HANDLE_COMMAND_COMPLETE,
},
[PKTBUF_TLS] = {
.prog_array_map = &tls_process_progs,
.index = PROG_POSTGRES_HANDLE_COMMAND_COMPLETE,
},
};
pktbuf_tail_call_compact(pkt, handle_command_complete_tail_call_array);
return 0;
}

// Entrypoint to process plaintext Postgres traffic. Pulls the connection tuple and the packet buffer from the map and
// calls the main processing function. If the packet is a TCP termination, it calls the termination function.
// If the message is a parse message, it tail calls to the dedicated function to handle it as it is too large to be
// inlined in the main entrypoint. Otherwise, it calls the main processing function.
SEC("socket/postgres_process")
int socket__postgres_process(struct __sk_buff* skb) {
SEC("socket/postgres_handle")
int socket__postgres_handle(struct __sk_buff* skb) {
skb_info_t skb_info = {};
conn_tuple_t conn_tuple = {};

@@ -267,19 +279,14 @@ int socket__postgres_process(struct __sk_buff* skb) {
return 0;
}

// If the message is a parse message, we tail call to the dedicated function to handle it.
if (header.message_tag == POSTGRES_PARSE_MAGIC_BYTE) {
bpf_tail_call_compat(skb, &protocols_progs, PROG_POSTGRES_PROCESS_PARSE_MESSAGE);
return 0;
}

postgres_entrypoint(pkt, &conn_tuple, &header, NO_TAGS);
postgres_handle_message(pkt, &conn_tuple, &header, NO_TAGS);
return 0;
}

// Handles the message parsing for plaintext Postgres traffic.
SEC("socket/postgres_message_parser")
int socket__postgres_message_parser(struct __sk_buff* skb) {
// Handles plain text command complete messages for plaintext Postgres traffic. Pulls the connection tuple and the
// packet buffer from the map and calls the dedicated function to handle the message.
SEC("socket/postgres_handle_command_complete")
int socket__postgres_handle_command_complete(struct __sk_buff* skb) {
skb_info_t skb_info = {};
conn_tuple_t conn_tuple = {};

@@ -295,7 +302,7 @@ int socket__postgres_message_parser(struct __sk_buff* skb) {
normalize_tuple(&conn_tuple);

pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info);
process_postgres_messages(pkt, conn_tuple);
handle_command_complete_messages(pkt, conn_tuple);
return 0;
}

@@ -318,10 +325,9 @@ int socket__postgres_process_parse_message(struct __sk_buff* skb) {
}

// Entrypoint to process TLS Postgres traffic. Pulls the connection tuple and the packet buffer from the map and calls
// the main processing function. If the packet is a Parse message, it tail calls to the dedicated function to handle it.
// Otherwise, it calls the main processing function.
SEC("uprobe/postgres_tls_process")
int uprobe__postgres_tls_process(struct pt_regs *ctx) {
// the main processing function.
SEC("uprobe/postgres_tls_handle")
int uprobe__postgres_tls_handle(struct pt_regs *ctx) {
const __u32 zero = 0;

tls_dispatcher_arguments_t *args = bpf_map_lookup_elem(&tls_dispatcher_arguments, &zero);
@@ -338,12 +344,7 @@ int uprobe__postgres_tls_process(struct pt_regs *ctx) {
return 0;
}

// If the message is a parse message, we tail call to the dedicated function to handle it.
if (header.message_tag == POSTGRES_PARSE_MAGIC_BYTE) {
bpf_tail_call_compat(ctx, &tls_process_progs, PROG_POSTGRES_PROCESS_PARSE_MESSAGE);
return 0;
}
postgres_entrypoint(pkt, &tup, &header, (__u8)args->tags);
postgres_handle_message(pkt, &tup, &header, (__u8)args->tags);
return 0;
}

@@ -383,8 +384,8 @@ int uprobe__postgres_tls_termination(struct pt_regs *ctx) {
}

// Handles message parsing for a TLS Postgres traffic.
SEC("uprobe/postgres_tls_message_parser")
int uprobe__postgres_tls_message_parser(struct pt_regs *ctx) {
SEC("uprobe/postgres_tls_handle_command_complete")
int uprobe__postgres_tls_handle_command_complete(struct pt_regs *ctx) {
const __u32 zero = 0;

tls_dispatcher_arguments_t *args = bpf_map_lookup_elem(&tls_dispatcher_arguments, &zero);
@@ -395,7 +396,7 @@ int uprobe__postgres_tls_message_parser(struct pt_regs *ctx) {
// Copying the tuple to the stack to handle verifier issues on kernel 4.14.
conn_tuple_t tup = args->tup;
pktbuf_t pkt = pktbuf_from_tls(ctx, args);
process_postgres_messages(pkt, tup);
handle_command_complete_messages(pkt, tup);
return 0;
}

2 changes: 1 addition & 1 deletion pkg/network/ebpf/c/protocols/postgres/types.h
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ typedef struct {
} postgres_event_t;

typedef struct {
__u16 iteration;
__u8 iteration;
// Saving the data offset is crucial for maintaining the current read position and ensuring proper utilization
// of tail calls.
__u32 data_off;
4 changes: 2 additions & 2 deletions pkg/network/protocols/ebpf_types.go
Original file line number Diff line number Diff line change
@@ -63,8 +63,8 @@ const (
ProgramKafkaTermination ProgramType = C.PROG_KAFKA_TERMINATION
// ProgramPostgres is the Golang representation of the C.PROG_POSTGRES enum
ProgramPostgres ProgramType = C.PROG_POSTGRES
// ProgramPostgresMessageParser is the Golang representation of the C.PROG_POSTGRES enum
ProgramPostgresMessageParser ProgramType = C.PROG_POSTGRES_MESSAGE_PARSER
// ProgramPostgresHandleCommandComplete is the Golang representation of the C.PROG_POSTGRES_HANDLE_COMMAND_COMPLETE enum
ProgramPostgresHandleCommandComplete ProgramType = C.PROG_POSTGRES_HANDLE_COMMAND_COMPLETE
// ProgramPostgresParseMessage is the Golang representation of the C.PROG_POSTGRES_PROCESS_PARSE_MESSAGE enum
ProgramPostgresParseMessage ProgramType = C.PROG_POSTGRES_PROCESS_PARSE_MESSAGE
// ProgramPostgresTermination is tail call to process Postgres termination.
2 changes: 1 addition & 1 deletion pkg/network/protocols/ebpf_types_linux.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 17 additions & 17 deletions pkg/network/protocols/postgres/protocol.go
Original file line number Diff line number Diff line change
@@ -29,17 +29,17 @@ import (

const (
// InFlightMap is the name of the in-flight map.
InFlightMap = "postgres_in_flight"
scratchBufferMap = "postgres_scratch_buffer"
iterationsMap = "postgres_iterations"
processTailCall = "socket__postgres_process"
messageParserCall = "socket__postgres_message_parser"
parseMessageTailCall = "socket__postgres_process_parse_message"
tlsProcessTailCall = "uprobe__postgres_tls_process"
tlsParseMessageTailCall = "uprobe__postgres_tls_process_parse_message"
tlsTerminationTailCall = "uprobe__postgres_tls_termination"
tlsMessageParserCall = "uprobe__postgres_tls_message_parser"
eventStream = "postgres"
InFlightMap = "postgres_in_flight"
scratchBufferMap = "postgres_scratch_buffer"
iterationsMap = "postgres_iterations"
handleTailCall = "socket__postgres_handle"
handleCommandCompleteTailCall = "socket__postgres_handle_command_complete"
parseMessageTailCall = "socket__postgres_process_parse_message"
tlsHandleTailCall = "uprobe__postgres_tls_handle"
tlsParseMessageTailCall = "uprobe__postgres_tls_process_parse_message"
tlsTerminationTailCall = "uprobe__postgres_tls_termination"
tlsHandleCommandCompleteTailCall = "uprobe__postgres_tls_handle_command_complete"
eventStream = "postgres"
)

// protocol holds the state of the postgres protocol monitoring.
@@ -79,14 +79,14 @@ var Spec = &protocols.ProtocolSpec{
ProgArrayName: protocols.ProtocolDispatcherProgramsMap,
Key: uint32(protocols.ProgramPostgres),
ProbeIdentificationPair: manager.ProbeIdentificationPair{
EBPFFuncName: processTailCall,
EBPFFuncName: handleTailCall,
},
},
{
ProgArrayName: protocols.ProtocolDispatcherProgramsMap,
Key: uint32(protocols.ProgramPostgresMessageParser),
Key: uint32(protocols.ProgramPostgresHandleCommandComplete),
ProbeIdentificationPair: manager.ProbeIdentificationPair{
EBPFFuncName: messageParserCall,
EBPFFuncName: handleCommandCompleteTailCall,
},
},
{
@@ -100,7 +100,7 @@ var Spec = &protocols.ProtocolSpec{
ProgArrayName: protocols.TLSDispatcherProgramsMap,
Key: uint32(protocols.ProgramPostgres),
ProbeIdentificationPair: manager.ProbeIdentificationPair{
EBPFFuncName: tlsProcessTailCall,
EBPFFuncName: tlsHandleTailCall,
},
},
{
@@ -119,9 +119,9 @@ var Spec = &protocols.ProtocolSpec{
},
{
ProgArrayName: protocols.TLSDispatcherProgramsMap,
Key: uint32(protocols.ProgramPostgresMessageParser),
Key: uint32(protocols.ProgramPostgresHandleCommandComplete),
ProbeIdentificationPair: manager.ProbeIdentificationPair{
EBPFFuncName: tlsMessageParserCall,
EBPFFuncName: tlsHandleCommandCompleteTailCall,
},
},
},