From befd1dec67ea8e120e6d48bf211bf64805cec49e Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Mon, 28 Oct 2024 10:52:53 +0200 Subject: [PATCH 01/16] init structs for the tail calls --- .../ebpf/c/protocols/classification/defs.h | 1 + .../ebpf/c/protocols/postgres/decoding-maps.h | 3 + .../ebpf/c/protocols/postgres/decoding.h | 103 ++++++++++++++---- pkg/network/ebpf/c/protocols/postgres/types.h | 14 ++- pkg/network/protocols/ebpf_types.go | 2 + pkg/network/protocols/ebpf_types_linux.go | 10 +- pkg/network/protocols/postgres/protocol.go | 12 ++ 7 files changed, 115 insertions(+), 30 deletions(-) diff --git a/pkg/network/ebpf/c/protocols/classification/defs.h b/pkg/network/ebpf/c/protocols/classification/defs.h index 823112a4fb7e1..3d19b1933869e 100644 --- a/pkg/network/ebpf/c/protocols/classification/defs.h +++ b/pkg/network/ebpf/c/protocols/classification/defs.h @@ -152,6 +152,7 @@ typedef enum { PROG_KAFKA_TERMINATION, PROG_GRPC, PROG_POSTGRES, + PROG_POSTGRES_MESSAGE_PARSER, PROG_POSTGRES_PROCESS_PARSE_MESSAGE, PROG_POSTGRES_TERMINATION, PROG_REDIS, diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding-maps.h b/pkg/network/ebpf/c/protocols/postgres/decoding-maps.h index f90b61ef92048..67288e1243f9e 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding-maps.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding-maps.h @@ -12,4 +12,7 @@ BPF_HASH_MAP(postgres_in_flight, conn_tuple_t, postgres_transaction_t, 0) // Acts as a scratch buffer for Postgres events, for preparing events before they are sent to userspace. BPF_PERCPU_ARRAY_MAP(postgres_scratch_buffer, postgres_event_t, 1) +// Maintains the current state of tail calls for each Postgres message. +BPF_PERCPU_ARRAY_MAP(postgres_iterations, postgres_tail_call_state_t, 1) + #endif diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding.h b/pkg/network/ebpf/c/protocols/postgres/decoding.h index 550631ea1afdc..e64e49defeec9 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding.h @@ -110,9 +110,10 @@ static int __always_inline skip_string(pktbuf_t pkt, int message_len) { // Main processing logic for the Postgres protocol. It reads the first message header and decides what to do based on the // message tag. If the message is a new query, it stores the query in the in-flight map. If the message is a command // complete, it enqueues the transaction and deletes it from the in-flight map. If the message is not a command complete, -// it tries to read up to POSTGRES_MAX_MESSAGES messages, looking for a command complete message. +// it tries to read up to POSTGRES_MAX_MESSAGES_PAR_TAIL_CALL*POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES messages, looking for a command complete message. // If the message is not a new query or a command complete, it ignores the message. -static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) { +static __always_inline void postgres_entrypoint(struct __sk_buff* skb, pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) { + const __u32 zero = 0; // If the message is a new query, we store the query in the in-flight map. // If we had a transaction for the connection, we override it and drops the previous one. if (header->message_tag == POSTGRES_QUERY_MAGIC_BYTE) { @@ -124,28 +125,16 @@ static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn handle_new_query(pkt, conn_tuple, header->message_len - sizeof(__u32), tags); return; } - - // We didn't find a new query, thus we assume we're in the middle of a transaction. - // We look up the transaction in the in-flight map, and if it doesn't exist, we ignore the message. - postgres_transaction_t *transaction = bpf_map_lookup_elem(&postgres_in_flight, conn_tuple); - if (!transaction) { + postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero); + if (iteration_value == NULL) { return; } + iteration_value->iteration = 0; + iteration_value->data_off = 0; + bpf_map_update_elem(&postgres_iterations, &zero, iteration_value, BPF_ANY); + + bpf_tail_call_compat(skb, &protocols_progs, PROG_POSTGRES_MESSAGE_PARSER); -#pragma unroll(POSTGRES_MAX_MESSAGES) - for (__u32 iteration = 0; iteration < POSTGRES_MAX_MESSAGES; ++iteration) { - if (!read_message_header(pkt, header)) { - break; - } - if (header->message_tag == POSTGRES_COMMAND_COMPLETE_MAGIC_BYTE) { - handle_command_complete(conn_tuple, transaction); - break; - } - // We didn't find a command complete message, so we advance the data offset to the end of the message. - // reminder, the message length includes the size of the payload, 4 bytes of the message length itself, but not - // the message tag. So we need to add 1 to the message length to jump over the entire message. - pktbuf_advance(pkt, header->message_len + 1); - } return; } @@ -212,7 +201,73 @@ int socket__postgres_process(struct __sk_buff* skb) { return 0; } - postgres_entrypoint(pkt, &conn_tuple, &header, NO_TAGS); + postgres_entrypoint(skb, pkt, &conn_tuple, &header, NO_TAGS); + return 0; +} + +SEC("socket/postgres_message_parser") +int socket__postgres_message_parser(struct __sk_buff* skb) { + const __u32 zero = 0; + skb_info_t skb_info = {}; + conn_tuple_t conn_tuple = {}; + + if (!fetch_dispatching_arguments(&conn_tuple, &skb_info)) { + return 0; + } + + if (is_tcp_termination(&skb_info)) { + postgres_tcp_termination(&conn_tuple); + return 0; + } + + normalize_tuple(&conn_tuple); + + pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info); + struct pg_message_header header; + if (!read_message_header(pkt, &header)) { + return 0; + } + + // We didn't find a new query, thus we assume we're in the middle of a transaction. + // We look up the transaction in the in-flight map, and if it doesn't exist, we ignore the message. + postgres_transaction_t *transaction = bpf_map_lookup_elem(&postgres_in_flight, &conn_tuple); + if (!transaction) { + return 0; + } + + + postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero); + if (iteration_value == NULL) { + return 0; + } + if (iteration_value->iteration >= POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) { + return 0; + } + if (iteration_value->data_off != 0) { + pkt.skb_info->data_off = iteration_value->data_off; + } + + +#pragma unroll(POSTGRES_MAX_MESSAGES_PER_TAIL_CALL) + for (__u32 iteration = 0; iteration < POSTGRES_MAX_MESSAGES_PER_TAIL_CALL; ++iteration) { + if (!read_message_header(pkt, &header)) { + break; + } + if (header.message_tag == POSTGRES_COMMAND_COMPLETE_MAGIC_BYTE) { + handle_command_complete(&conn_tuple, transaction); + break; + } + // We didn't find a command complete message, so we advance the data offset to the end of the message. + // reminder, the message length includes the size of the payload, 4 bytes of the message length itself, but not + // the message tag. So we need to add 1 to the message length to jump over the entire message. + pktbuf_advance(pkt, header.message_len + 1); + } + + iteration_value->iteration += 1; + iteration_value->data_off = pktbuf_data_offset(pkt); + bpf_map_update_elem(&postgres_iterations, &zero, iteration_value, BPF_ANY); + bpf_tail_call_compat(skb, &protocols_progs, PROG_POSTGRES_MESSAGE_PARSER); + return 0; } @@ -247,7 +302,7 @@ int uprobe__postgres_tls_process(struct pt_regs *ctx) { } // Copying the tuple to the stack to handle verifier issues on kernel 4.14. - conn_tuple_t tup = args->tup; +// conn_tuple_t tup = args->tup; pktbuf_t pkt = pktbuf_from_tls(ctx, args); struct pg_message_header header; @@ -260,7 +315,7 @@ int uprobe__postgres_tls_process(struct pt_regs *ctx) { bpf_tail_call_compat(ctx, &tls_process_progs, PROG_POSTGRES_PROCESS_PARSE_MESSAGE); return 0; } - postgres_entrypoint(pkt, &tup, &header, (__u8)args->tags); +// postgres_entrypoint(pkt, &tup, &header, (__u8)args->tags); return 0; } diff --git a/pkg/network/ebpf/c/protocols/postgres/types.h b/pkg/network/ebpf/c/protocols/postgres/types.h index 71521972a22bb..c5f98768e6b0c 100644 --- a/pkg/network/ebpf/c/protocols/postgres/types.h +++ b/pkg/network/ebpf/c/protocols/postgres/types.h @@ -6,8 +6,11 @@ // Maximum length of Postgres query to send to userspace. #define POSTGRES_BUFFER_SIZE 160 -// Maximum number of Postgres messages we can parse for a single packet. -#define POSTGRES_MAX_MESSAGES 80 +// Represents the maximum number of tail calls we can use to process a single message. +#define POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES 4 + +// Represents the maximum number of messages we process in a single tail call. +#define POSTGRES_MAX_MESSAGES_PER_TAIL_CALL 20 // Postgres transaction information we store in the kernel. typedef struct { @@ -26,4 +29,11 @@ typedef struct { postgres_transaction_t tx; } postgres_event_t; +typedef struct { + __u16 iteration; + // Saving the data offset is crucial for maintaining the current read position and ensuring proper utilization + // of tail calls. + __u32 data_off; +} postgres_tail_call_state_t; + #endif diff --git a/pkg/network/protocols/ebpf_types.go b/pkg/network/protocols/ebpf_types.go index 9fcf544843b86..41e3642bf0318 100644 --- a/pkg/network/protocols/ebpf_types.go +++ b/pkg/network/protocols/ebpf_types.go @@ -63,6 +63,8 @@ const ( ProgramKafkaTermination ProgramType = C.PROG_KAFKA_TERMINATION // ProgramPostgres is the Golang representation of the C.PROG_POSTGRES enum ProgramPostgres ProgramType = C.PROG_POSTGRES + // ProgramPostgresMessageParser is the Golang representation of the C.PROG_POSTGRES enum + ProgramPostgresMessageParser ProgramType = C.PROG_POSTGRES_MESSAGE_PARSER // ProgramPostgresParseMessage is the Golang representation of the C.PROG_POSTGRES_PROCESS_PARSE_MESSAGE enum ProgramPostgresParseMessage ProgramType = C.PROG_POSTGRES_PROCESS_PARSE_MESSAGE // ProgramPostgresTermination is tail call to process Postgres termination. diff --git a/pkg/network/protocols/ebpf_types_linux.go b/pkg/network/protocols/ebpf_types_linux.go index 9cc859e489174..2af59f9c4d0cc 100644 --- a/pkg/network/protocols/ebpf_types_linux.go +++ b/pkg/network/protocols/ebpf_types_linux.go @@ -52,13 +52,15 @@ const ( ProgramPostgres ProgramType = 0x12 - ProgramPostgresParseMessage ProgramType = 0x13 + ProgramPostgresMessageParser ProgramType = 0x13 - ProgramPostgresTermination ProgramType = 0x14 + ProgramPostgresParseMessage ProgramType = 0x14 - ProgramRedis ProgramType = 0x15 + ProgramPostgresTermination ProgramType = 0x15 - ProgramRedisTermination ProgramType = 0x16 + ProgramRedis ProgramType = 0x16 + + ProgramRedisTermination ProgramType = 0x17 ) type ebpfProtocolType uint16 diff --git a/pkg/network/protocols/postgres/protocol.go b/pkg/network/protocols/postgres/protocol.go index af1a51d688536..fe25dfbefdbfc 100644 --- a/pkg/network/protocols/postgres/protocol.go +++ b/pkg/network/protocols/postgres/protocol.go @@ -31,7 +31,9 @@ const ( // InFlightMap is the name of the in-flight map. InFlightMap = "postgres_in_flight" scratchBufferMap = "postgres_scratch_buffer" + iterationsMap = "postgres_iterations" processTailCall = "socket__postgres_process" + messageParserCall = "socket__postgres_message_parser" parseMessageTailCall = "socket__postgres_process_parse_message" tlsProcessTailCall = "uprobe__postgres_tls_process" tlsParseMessageTailCall = "uprobe__postgres_tls_process_parse_message" @@ -58,6 +60,9 @@ var Spec = &protocols.ProtocolSpec{ { Name: scratchBufferMap, }, + { + Name: iterationsMap, + }, { Name: "postgres_batch_events", }, @@ -76,6 +81,13 @@ var Spec = &protocols.ProtocolSpec{ EBPFFuncName: processTailCall, }, }, + { + ProgArrayName: protocols.ProtocolDispatcherProgramsMap, + Key: uint32(protocols.ProgramPostgresMessageParser), + ProbeIdentificationPair: manager.ProbeIdentificationPair{ + EBPFFuncName: messageParserCall, + }, + }, { ProgArrayName: protocols.ProtocolDispatcherProgramsMap, Key: uint32(protocols.ProgramPostgresParseMessage), From baf1166d269f752ed8da2fbbb32e8f86c8bc895b Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Mon, 28 Oct 2024 11:51:16 +0200 Subject: [PATCH 02/16] use pktbuf --- .../ebpf/c/protocols/postgres/decoding.h | 41 ++++++++++++++----- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding.h b/pkg/network/ebpf/c/protocols/postgres/decoding.h index e64e49defeec9..12d546c2ad81b 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding.h @@ -112,7 +112,7 @@ static int __always_inline skip_string(pktbuf_t pkt, int message_len) { // complete, it enqueues the transaction and deletes it from the in-flight map. If the message is not a command complete, // it tries to read up to POSTGRES_MAX_MESSAGES_PAR_TAIL_CALL*POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES messages, looking for a command complete message. // If the message is not a new query or a command complete, it ignores the message. -static __always_inline void postgres_entrypoint(struct __sk_buff* skb, pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) { +static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) { const __u32 zero = 0; // If the message is a new query, we store the query in the in-flight map. // If we had a transaction for the connection, we override it and drops the previous one. @@ -131,9 +131,18 @@ static __always_inline void postgres_entrypoint(struct __sk_buff* skb, pktbuf_t } iteration_value->iteration = 0; iteration_value->data_off = 0; - bpf_map_update_elem(&postgres_iterations, &zero, iteration_value, BPF_ANY); - - bpf_tail_call_compat(skb, &protocols_progs, PROG_POSTGRES_MESSAGE_PARSER); + pktbuf_tail_call_option_t message_parser_tail_call_array[] = { + [PKTBUF_SKB] = { + .prog_array_map = &protocols_progs, + .index = PROG_POSTGRES_MESSAGE_PARSER, + }, + [PKTBUF_TLS] = { + .prog_array_map = &tls_process_progs, + .index = PROG_POSTGRES_MESSAGE_PARSER, + }, + }; + + pktbuf_tail_call_compact(pkt, message_parser_tail_call_array); return; } @@ -201,7 +210,7 @@ int socket__postgres_process(struct __sk_buff* skb) { return 0; } - postgres_entrypoint(skb, pkt, &conn_tuple, &header, NO_TAGS); + postgres_entrypoint(pkt, &conn_tuple, &header, NO_TAGS); return 0; } @@ -235,14 +244,15 @@ int socket__postgres_message_parser(struct __sk_buff* skb) { return 0; } - postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero); if (iteration_value == NULL) { return 0; } + if (iteration_value->iteration >= POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) { return 0; } + if (iteration_value->data_off != 0) { pkt.skb_info->data_off = iteration_value->data_off; } @@ -265,8 +275,19 @@ int socket__postgres_message_parser(struct __sk_buff* skb) { iteration_value->iteration += 1; iteration_value->data_off = pktbuf_data_offset(pkt); - bpf_map_update_elem(&postgres_iterations, &zero, iteration_value, BPF_ANY); - bpf_tail_call_compat(skb, &protocols_progs, PROG_POSTGRES_MESSAGE_PARSER); + + pktbuf_tail_call_option_t message_parser_tail_call_array[] = { + [PKTBUF_SKB] = { + .prog_array_map = &protocols_progs, + .index = PROG_POSTGRES_MESSAGE_PARSER, + }, + [PKTBUF_TLS] = { + .prog_array_map = &tls_process_progs, + .index = PROG_POSTGRES_MESSAGE_PARSER, + }, + }; + + pktbuf_tail_call_compact(pkt, message_parser_tail_call_array); return 0; } @@ -302,7 +323,7 @@ int uprobe__postgres_tls_process(struct pt_regs *ctx) { } // Copying the tuple to the stack to handle verifier issues on kernel 4.14. -// conn_tuple_t tup = args->tup; + conn_tuple_t tup = args->tup; pktbuf_t pkt = pktbuf_from_tls(ctx, args); struct pg_message_header header; @@ -315,7 +336,7 @@ int uprobe__postgres_tls_process(struct pt_regs *ctx) { bpf_tail_call_compat(ctx, &tls_process_progs, PROG_POSTGRES_PROCESS_PARSE_MESSAGE); return 0; } -// postgres_entrypoint(pkt, &tup, &header, (__u8)args->tags); + postgres_entrypoint(pkt, &tup, &header, (__u8)args->tags); return 0; } From 6a5e84f7eadbbe6080d9fcb53237041842529abf Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Mon, 28 Oct 2024 13:54:42 +0200 Subject: [PATCH 03/16] fix TLS --- .../ebpf/c/protocols/postgres/decoding.h | 78 ++++++++++++++++++- pkg/network/protocols/postgres/protocol.go | 8 ++ 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding.h b/pkg/network/ebpf/c/protocols/postgres/decoding.h index 12d546c2ad81b..7add8c70d0d52 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding.h @@ -125,10 +125,12 @@ static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn handle_new_query(pkt, conn_tuple, header->message_len - sizeof(__u32), tags); return; } + postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero); if (iteration_value == NULL) { return; } + iteration_value->iteration = 0; iteration_value->data_off = 0; pktbuf_tail_call_option_t message_parser_tail_call_array[] = { @@ -254,7 +256,7 @@ int socket__postgres_message_parser(struct __sk_buff* skb) { } if (iteration_value->data_off != 0) { - pkt.skb_info->data_off = iteration_value->data_off; + pktbuf_set_offset(pkt, iteration_value->data_off); } @@ -375,4 +377,78 @@ int uprobe__postgres_tls_termination(struct pt_regs *ctx) { return 0; } +// Handles message parsing for a TLS Postgres connection. +SEC("uprobe/postgres_tls_message_parser") +int uprobe__postgres_tls_message_parser(struct pt_regs *ctx) { + const __u32 zero = 0; + + tls_dispatcher_arguments_t *args = bpf_map_lookup_elem(&tls_dispatcher_arguments, &zero); + if (args == NULL) { + return 0; + } + + // Copying the tuple to the stack to handle verifier issues on kernel 4.14. + conn_tuple_t tup = args->tup; + + pktbuf_t pkt = pktbuf_from_tls(ctx, args); + struct pg_message_header header; + if (!read_message_header(pkt, &header)) { + return 0; + } + + // We didn't find a new query, thus we assume we're in the middle of a transaction. + // We look up the transaction in the in-flight map, and if it doesn't exist, we ignore the message. + postgres_transaction_t *transaction = bpf_map_lookup_elem(&postgres_in_flight, &tup); + if (!transaction) { + return 0; + } + + postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero); + if (iteration_value == NULL) { + return 0; + } + + if (iteration_value->iteration >= POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) { + return 0; + } + + if (iteration_value->data_off != 0) { + pktbuf_set_offset(pkt, iteration_value->data_off); + } + + +#pragma unroll(POSTGRES_MAX_MESSAGES_PER_TAIL_CALL) + for (__u32 iteration = 0; iteration < POSTGRES_MAX_MESSAGES_PER_TAIL_CALL; ++iteration) { + if (!read_message_header(pkt, &header)) { + break; + } + if (header.message_tag == POSTGRES_COMMAND_COMPLETE_MAGIC_BYTE) { + handle_command_complete(&tup, transaction); + break; + } + // We didn't find a command complete message, so we advance the data offset to the end of the message. + // reminder, the message length includes the size of the payload, 4 bytes of the message length itself, but not + // the message tag. So we need to add 1 to the message length to jump over the entire message. + pktbuf_advance(pkt, header.message_len + 1); + } + + iteration_value->iteration += 1; + iteration_value->data_off = pktbuf_data_offset(pkt); + + pktbuf_tail_call_option_t message_parser_tail_call_array[] = { + [PKTBUF_SKB] = { + .prog_array_map = &protocols_progs, + .index = PROG_POSTGRES_MESSAGE_PARSER, + }, + [PKTBUF_TLS] = { + .prog_array_map = &tls_process_progs, + .index = PROG_POSTGRES_MESSAGE_PARSER, + }, + }; + + pktbuf_tail_call_compact(pkt, message_parser_tail_call_array); + + return 0; +} + #endif diff --git a/pkg/network/protocols/postgres/protocol.go b/pkg/network/protocols/postgres/protocol.go index fe25dfbefdbfc..f4442089dde86 100644 --- a/pkg/network/protocols/postgres/protocol.go +++ b/pkg/network/protocols/postgres/protocol.go @@ -38,6 +38,7 @@ const ( tlsProcessTailCall = "uprobe__postgres_tls_process" tlsParseMessageTailCall = "uprobe__postgres_tls_process_parse_message" tlsTerminationTailCall = "uprobe__postgres_tls_termination" + tlsMessageParserCall = "uprobe__postgres_tls_message_parser" eventStream = "postgres" ) @@ -116,6 +117,13 @@ var Spec = &protocols.ProtocolSpec{ EBPFFuncName: tlsTerminationTailCall, }, }, + { + ProgArrayName: protocols.TLSDispatcherProgramsMap, + Key: uint32(protocols.ProgramPostgresMessageParser), + ProbeIdentificationPair: manager.ProbeIdentificationPair{ + EBPFFuncName: tlsMessageParserCall, + }, + }, }, } From 62a906ede3ae5ad0a0fbba00770062a18377669d Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Mon, 28 Oct 2024 14:25:07 +0200 Subject: [PATCH 04/16] reduced code duplication --- .../ebpf/c/protocols/postgres/decoding.h | 176 +++++++----------- 1 file changed, 65 insertions(+), 111 deletions(-) diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding.h b/pkg/network/ebpf/c/protocols/postgres/decoding.h index 7add8c70d0d52..995a739d05be7 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding.h @@ -180,61 +180,12 @@ static __always_inline void postgres_handle_parse_message(pktbuf_t pkt, conn_tup return; } -// Entrypoint to process plaintext Postgres traffic. Pulls the connection tuple and the packet buffer from the map and -// calls the main processing function. If the packet is a TCP termination, it calls the termination function. -// If the message is a parse message, it tail calls to the dedicated function to handle it as it is too large to be -// inlined in the main entrypoint. Otherwise, it calls the main processing function. -SEC("socket/postgres_process") -int socket__postgres_process(struct __sk_buff* skb) { - skb_info_t skb_info = {}; - conn_tuple_t conn_tuple = {}; - - if (!fetch_dispatching_arguments(&conn_tuple, &skb_info)) { - return 0; - } - - if (is_tcp_termination(&skb_info)) { - postgres_tcp_termination(&conn_tuple); - return 0; - } - - normalize_tuple(&conn_tuple); - - pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info); - struct pg_message_header header; - if (!read_message_header(pkt, &header)) { - return 0; - } - - // If the message is a parse message, we tail call to the dedicated function to handle it. - if (header.message_tag == POSTGRES_PARSE_MAGIC_BYTE) { - bpf_tail_call_compat(skb, &protocols_progs, PROG_POSTGRES_PROCESS_PARSE_MESSAGE); - return 0; - } - - postgres_entrypoint(pkt, &conn_tuple, &header, NO_TAGS); - return 0; -} - -SEC("socket/postgres_message_parser") -int socket__postgres_message_parser(struct __sk_buff* skb) { - const __u32 zero = 0; - skb_info_t skb_info = {}; - conn_tuple_t conn_tuple = {}; - - if (!fetch_dispatching_arguments(&conn_tuple, &skb_info)) { - return 0; - } - - if (is_tcp_termination(&skb_info)) { - postgres_tcp_termination(&conn_tuple); - return 0; - } - - normalize_tuple(&conn_tuple); - - pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info); - struct pg_message_header header; +// A function for processing multiple messages per call up to POSTGRES_MAX_MESSAGES_PER_TAIL_CALL +// and handles tail call chaining for processing additional messages beyond this limit. +// (max POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES), serving both for plaintext and TLS Postgres traffic. +static __always_inline bool process_postgres_messages(pktbuf_t pkt, conn_tuple_t conn_tuple) { + const __u32 zero = 0; + struct pg_message_header header; if (!read_message_header(pkt, &header)) { return 0; } @@ -294,6 +245,64 @@ int socket__postgres_message_parser(struct __sk_buff* skb) { return 0; } +// Entrypoint to process plaintext Postgres traffic. Pulls the connection tuple and the packet buffer from the map and +// calls the main processing function. If the packet is a TCP termination, it calls the termination function. +// If the message is a parse message, it tail calls to the dedicated function to handle it as it is too large to be +// inlined in the main entrypoint. Otherwise, it calls the main processing function. +SEC("socket/postgres_process") +int socket__postgres_process(struct __sk_buff* skb) { + skb_info_t skb_info = {}; + conn_tuple_t conn_tuple = {}; + + if (!fetch_dispatching_arguments(&conn_tuple, &skb_info)) { + return 0; + } + + if (is_tcp_termination(&skb_info)) { + postgres_tcp_termination(&conn_tuple); + return 0; + } + + normalize_tuple(&conn_tuple); + + pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info); + struct pg_message_header header; + if (!read_message_header(pkt, &header)) { + return 0; + } + + // If the message is a parse message, we tail call to the dedicated function to handle it. + if (header.message_tag == POSTGRES_PARSE_MAGIC_BYTE) { + bpf_tail_call_compat(skb, &protocols_progs, PROG_POSTGRES_PROCESS_PARSE_MESSAGE); + return 0; + } + + postgres_entrypoint(pkt, &conn_tuple, &header, NO_TAGS); + return 0; +} + +SEC("socket/postgres_message_parser") +int socket__postgres_message_parser(struct __sk_buff* skb) { + skb_info_t skb_info = {}; + conn_tuple_t conn_tuple = {}; + + if (!fetch_dispatching_arguments(&conn_tuple, &skb_info)) { + return 0; + } + + if (is_tcp_termination(&skb_info)) { + postgres_tcp_termination(&conn_tuple); + return 0; + } + + normalize_tuple(&conn_tuple); + + pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info); + process_postgres_messages(pkt, conn_tuple); + + return 0; +} + // Handles plaintext Postgres Parse messages. Pulls the connection tuple and the packet buffer from the map and calls the // dedicated function to handle the message. SEC("socket/postgres_process_parse_message") @@ -391,62 +400,7 @@ int uprobe__postgres_tls_message_parser(struct pt_regs *ctx) { conn_tuple_t tup = args->tup; pktbuf_t pkt = pktbuf_from_tls(ctx, args); - struct pg_message_header header; - if (!read_message_header(pkt, &header)) { - return 0; - } - - // We didn't find a new query, thus we assume we're in the middle of a transaction. - // We look up the transaction in the in-flight map, and if it doesn't exist, we ignore the message. - postgres_transaction_t *transaction = bpf_map_lookup_elem(&postgres_in_flight, &tup); - if (!transaction) { - return 0; - } - - postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero); - if (iteration_value == NULL) { - return 0; - } - - if (iteration_value->iteration >= POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) { - return 0; - } - - if (iteration_value->data_off != 0) { - pktbuf_set_offset(pkt, iteration_value->data_off); - } - - -#pragma unroll(POSTGRES_MAX_MESSAGES_PER_TAIL_CALL) - for (__u32 iteration = 0; iteration < POSTGRES_MAX_MESSAGES_PER_TAIL_CALL; ++iteration) { - if (!read_message_header(pkt, &header)) { - break; - } - if (header.message_tag == POSTGRES_COMMAND_COMPLETE_MAGIC_BYTE) { - handle_command_complete(&tup, transaction); - break; - } - // We didn't find a command complete message, so we advance the data offset to the end of the message. - // reminder, the message length includes the size of the payload, 4 bytes of the message length itself, but not - // the message tag. So we need to add 1 to the message length to jump over the entire message. - pktbuf_advance(pkt, header.message_len + 1); - } - - iteration_value->iteration += 1; - iteration_value->data_off = pktbuf_data_offset(pkt); - - pktbuf_tail_call_option_t message_parser_tail_call_array[] = { - [PKTBUF_SKB] = { - .prog_array_map = &protocols_progs, - .index = PROG_POSTGRES_MESSAGE_PARSER, - }, - [PKTBUF_TLS] = { - .prog_array_map = &tls_process_progs, - .index = PROG_POSTGRES_MESSAGE_PARSER, - }, - }; - - pktbuf_tail_call_compact(pkt, message_parser_tail_call_array); + process_postgres_messages(pkt, tup); return 0; } From eb1f4925945767d0be5e6eef3da51f02cef7c392 Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Mon, 28 Oct 2024 16:17:34 +0200 Subject: [PATCH 05/16] typos --- pkg/network/ebpf/c/protocols/postgres/decoding.h | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding.h b/pkg/network/ebpf/c/protocols/postgres/decoding.h index 995a739d05be7..9610f4767bea3 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding.h @@ -143,9 +143,7 @@ static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn .index = PROG_POSTGRES_MESSAGE_PARSER, }, }; - pktbuf_tail_call_compact(pkt, message_parser_tail_call_array); - return; } @@ -210,7 +208,6 @@ static __always_inline bool process_postgres_messages(pktbuf_t pkt, conn_tuple_t pktbuf_set_offset(pkt, iteration_value->data_off); } - #pragma unroll(POSTGRES_MAX_MESSAGES_PER_TAIL_CALL) for (__u32 iteration = 0; iteration < POSTGRES_MAX_MESSAGES_PER_TAIL_CALL; ++iteration) { if (!read_message_header(pkt, &header)) { @@ -241,7 +238,6 @@ static __always_inline bool process_postgres_messages(pktbuf_t pkt, conn_tuple_t }; pktbuf_tail_call_compact(pkt, message_parser_tail_call_array); - return 0; } @@ -281,6 +277,7 @@ int socket__postgres_process(struct __sk_buff* skb) { return 0; } +// Handles the message parsing for plaintext Postgres traffic. SEC("socket/postgres_message_parser") int socket__postgres_message_parser(struct __sk_buff* skb) { skb_info_t skb_info = {}; @@ -299,7 +296,6 @@ int socket__postgres_message_parser(struct __sk_buff* skb) { pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info); process_postgres_messages(pkt, conn_tuple); - return 0; } @@ -386,7 +382,7 @@ int uprobe__postgres_tls_termination(struct pt_regs *ctx) { return 0; } -// Handles message parsing for a TLS Postgres connection. +// Handles message parsing for a TLS Postgres traffic. SEC("uprobe/postgres_tls_message_parser") int uprobe__postgres_tls_message_parser(struct pt_regs *ctx) { const __u32 zero = 0; @@ -398,10 +394,8 @@ int uprobe__postgres_tls_message_parser(struct pt_regs *ctx) { // Copying the tuple to the stack to handle verifier issues on kernel 4.14. conn_tuple_t tup = args->tup; - pktbuf_t pkt = pktbuf_from_tls(ctx, args); process_postgres_messages(pkt, tup); - return 0; } From 4a5eedea7388d427a2b11a7d813d80b86811b48f Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Tue, 29 Oct 2024 15:56:37 +0200 Subject: [PATCH 06/16] update program names to make flow clearer --- .../ebpf/c/protocols/classification/defs.h | 2 +- .../ebpf/c/protocols/postgres/decoding.h | 115 +++++++++--------- pkg/network/ebpf/c/protocols/postgres/types.h | 2 +- pkg/network/protocols/ebpf_types.go | 4 +- pkg/network/protocols/ebpf_types_linux.go | 2 +- pkg/network/protocols/postgres/protocol.go | 34 +++--- 6 files changed, 80 insertions(+), 79 deletions(-) diff --git a/pkg/network/ebpf/c/protocols/classification/defs.h b/pkg/network/ebpf/c/protocols/classification/defs.h index 3d19b1933869e..83368c054aa72 100644 --- a/pkg/network/ebpf/c/protocols/classification/defs.h +++ b/pkg/network/ebpf/c/protocols/classification/defs.h @@ -152,7 +152,7 @@ typedef enum { PROG_KAFKA_TERMINATION, PROG_GRPC, PROG_POSTGRES, - PROG_POSTGRES_MESSAGE_PARSER, + PROG_POSTGRES_HANDLE_COMMAND_COMPLETE, PROG_POSTGRES_PROCESS_PARSE_MESSAGE, PROG_POSTGRES_TERMINATION, PROG_REDIS, diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding.h b/pkg/network/ebpf/c/protocols/postgres/decoding.h index 9610f4767bea3..62e3fd1a07711 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding.h @@ -107,13 +107,28 @@ static int __always_inline skip_string(pktbuf_t pkt, int message_len) { return SKIP_STRING_FAILED; } -// Main processing logic for the Postgres protocol. It reads the first message header and decides what to do based on the -// message tag. If the message is a new query, it stores the query in the in-flight map. If the message is a command -// complete, it enqueues the transaction and deletes it from the in-flight map. If the message is not a command complete, -// it tries to read up to POSTGRES_MAX_MESSAGES_PAR_TAIL_CALL*POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES messages, looking for a command complete message. -// If the message is not a new query or a command complete, it ignores the message. -static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) { +// postgres_handle_message reads the first message header and decides what to do based on the +// message tag. If the message is a new query, it stores the query in the in-flight map. +// If the message is a parse message, we tail call to the dedicated process_parse_message program. +// If the message is a command complete, it calls the handle_command_complete program. +static __always_inline void postgres_handle_message(pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) { const __u32 zero = 0; + // If the message is a parse message, we tail call to the dedicated function to handle it. + if (header->message_tag == POSTGRES_PARSE_MAGIC_BYTE) { + pktbuf_tail_call_option_t process_parse_tail_call_array[] = { + [PKTBUF_SKB] = { + .prog_array_map = &protocols_progs, + .index = PROG_POSTGRES_PROCESS_PARSE_MESSAGE, + }, + [PKTBUF_TLS] = { + .prog_array_map = &tls_process_progs, + .index = PROG_POSTGRES_PROCESS_PARSE_MESSAGE, + }, + }; + pktbuf_tail_call_compact(pkt, process_parse_tail_call_array); + return; + } + // If the message is a new query, we store the query in the in-flight map. // If we had a transaction for the connection, we override it and drops the previous one. if (header->message_tag == POSTGRES_QUERY_MAGIC_BYTE) { @@ -133,17 +148,17 @@ static __always_inline void postgres_entrypoint(pktbuf_t pkt, conn_tuple_t *conn iteration_value->iteration = 0; iteration_value->data_off = 0; - pktbuf_tail_call_option_t message_parser_tail_call_array[] = { + pktbuf_tail_call_option_t handle_command_complete_tail_call_array[] = { [PKTBUF_SKB] = { .prog_array_map = &protocols_progs, - .index = PROG_POSTGRES_MESSAGE_PARSER, + .index = PROG_POSTGRES_HANDLE_COMMAND_COMPLETE, }, [PKTBUF_TLS] = { .prog_array_map = &tls_process_progs, - .index = PROG_POSTGRES_MESSAGE_PARSER, + .index = PROG_POSTGRES_HANDLE_COMMAND_COMPLETE, }, }; - pktbuf_tail_call_compact(pkt, message_parser_tail_call_array); + pktbuf_tail_call_compact(pkt, handle_command_complete_tail_call_array); return; } @@ -178,12 +193,13 @@ static __always_inline void postgres_handle_parse_message(pktbuf_t pkt, conn_tup return; } -// A function for processing multiple messages per call up to POSTGRES_MAX_MESSAGES_PER_TAIL_CALL -// and handles tail call chaining for processing additional messages beyond this limit. -// (max POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES), serving both for plaintext and TLS Postgres traffic. -static __always_inline bool process_postgres_messages(pktbuf_t pkt, conn_tuple_t conn_tuple) { - const __u32 zero = 0; - struct pg_message_header header; +// Handles Postgres command complete messages by examining packet data for both plaintext and TLS traffic. +// This function handles multiple messages within a single packet, processing up to POSTGRES_MAX_MESSAGES_PER_TAIL_CALL +// messages per call. When more messages exist beyond this limit, it uses tail call chaining (up to +// POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) to continue processing. +static __always_inline bool handle_command_complete_messages(pktbuf_t pkt, conn_tuple_t conn_tuple) { + const __u32 zero = 0; + struct pg_message_header header; if (!read_message_header(pkt, &header)) { return 0; } @@ -225,28 +241,24 @@ static __always_inline bool process_postgres_messages(pktbuf_t pkt, conn_tuple_t iteration_value->iteration += 1; iteration_value->data_off = pktbuf_data_offset(pkt); - - pktbuf_tail_call_option_t message_parser_tail_call_array[] = { - [PKTBUF_SKB] = { - .prog_array_map = &protocols_progs, - .index = PROG_POSTGRES_MESSAGE_PARSER, - }, - [PKTBUF_TLS] = { - .prog_array_map = &tls_process_progs, - .index = PROG_POSTGRES_MESSAGE_PARSER, - }, - }; - - pktbuf_tail_call_compact(pkt, message_parser_tail_call_array); + pktbuf_tail_call_option_t handle_command_complete_tail_call_array[] = { + [PKTBUF_SKB] = { + .prog_array_map = &protocols_progs, + .index = PROG_POSTGRES_HANDLE_COMMAND_COMPLETE, + }, + [PKTBUF_TLS] = { + .prog_array_map = &tls_process_progs, + .index = PROG_POSTGRES_HANDLE_COMMAND_COMPLETE, + }, + }; + pktbuf_tail_call_compact(pkt, handle_command_complete_tail_call_array); return 0; } // Entrypoint to process plaintext Postgres traffic. Pulls the connection tuple and the packet buffer from the map and // calls the main processing function. If the packet is a TCP termination, it calls the termination function. -// If the message is a parse message, it tail calls to the dedicated function to handle it as it is too large to be -// inlined in the main entrypoint. Otherwise, it calls the main processing function. -SEC("socket/postgres_process") -int socket__postgres_process(struct __sk_buff* skb) { +SEC("socket/postgres_handle") +int socket__postgres_handle(struct __sk_buff* skb) { skb_info_t skb_info = {}; conn_tuple_t conn_tuple = {}; @@ -267,19 +279,14 @@ int socket__postgres_process(struct __sk_buff* skb) { return 0; } - // If the message is a parse message, we tail call to the dedicated function to handle it. - if (header.message_tag == POSTGRES_PARSE_MAGIC_BYTE) { - bpf_tail_call_compat(skb, &protocols_progs, PROG_POSTGRES_PROCESS_PARSE_MESSAGE); - return 0; - } - - postgres_entrypoint(pkt, &conn_tuple, &header, NO_TAGS); + postgres_handle_message(pkt, &conn_tuple, &header, NO_TAGS); return 0; } -// Handles the message parsing for plaintext Postgres traffic. -SEC("socket/postgres_message_parser") -int socket__postgres_message_parser(struct __sk_buff* skb) { +// Handles plain text command complete messages for plaintext Postgres traffic. Pulls the connection tuple and the +// packet buffer from the map and calls the dedicated function to handle the message. +SEC("socket/postgres_handle_command_complete") +int socket__postgres_handle_command_complete(struct __sk_buff* skb) { skb_info_t skb_info = {}; conn_tuple_t conn_tuple = {}; @@ -295,7 +302,7 @@ int socket__postgres_message_parser(struct __sk_buff* skb) { normalize_tuple(&conn_tuple); pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info); - process_postgres_messages(pkt, conn_tuple); + handle_command_complete_messages(pkt, conn_tuple); return 0; } @@ -318,10 +325,9 @@ int socket__postgres_process_parse_message(struct __sk_buff* skb) { } // Entrypoint to process TLS Postgres traffic. Pulls the connection tuple and the packet buffer from the map and calls -// the main processing function. If the packet is a Parse message, it tail calls to the dedicated function to handle it. -// Otherwise, it calls the main processing function. -SEC("uprobe/postgres_tls_process") -int uprobe__postgres_tls_process(struct pt_regs *ctx) { +// the main processing function. +SEC("uprobe/postgres_tls_handle") +int uprobe__postgres_tls_handle(struct pt_regs *ctx) { const __u32 zero = 0; tls_dispatcher_arguments_t *args = bpf_map_lookup_elem(&tls_dispatcher_arguments, &zero); @@ -338,12 +344,7 @@ int uprobe__postgres_tls_process(struct pt_regs *ctx) { return 0; } - // If the message is a parse message, we tail call to the dedicated function to handle it. - if (header.message_tag == POSTGRES_PARSE_MAGIC_BYTE) { - bpf_tail_call_compat(ctx, &tls_process_progs, PROG_POSTGRES_PROCESS_PARSE_MESSAGE); - return 0; - } - postgres_entrypoint(pkt, &tup, &header, (__u8)args->tags); + postgres_handle_message(pkt, &tup, &header, (__u8)args->tags); return 0; } @@ -383,8 +384,8 @@ int uprobe__postgres_tls_termination(struct pt_regs *ctx) { } // Handles message parsing for a TLS Postgres traffic. -SEC("uprobe/postgres_tls_message_parser") -int uprobe__postgres_tls_message_parser(struct pt_regs *ctx) { +SEC("uprobe/postgres_tls_handle_command_complete") +int uprobe__postgres_tls_handle_command_complete(struct pt_regs *ctx) { const __u32 zero = 0; tls_dispatcher_arguments_t *args = bpf_map_lookup_elem(&tls_dispatcher_arguments, &zero); @@ -395,7 +396,7 @@ int uprobe__postgres_tls_message_parser(struct pt_regs *ctx) { // Copying the tuple to the stack to handle verifier issues on kernel 4.14. conn_tuple_t tup = args->tup; pktbuf_t pkt = pktbuf_from_tls(ctx, args); - process_postgres_messages(pkt, tup); + handle_command_complete_messages(pkt, tup); return 0; } diff --git a/pkg/network/ebpf/c/protocols/postgres/types.h b/pkg/network/ebpf/c/protocols/postgres/types.h index c5f98768e6b0c..9deea8ae24118 100644 --- a/pkg/network/ebpf/c/protocols/postgres/types.h +++ b/pkg/network/ebpf/c/protocols/postgres/types.h @@ -30,7 +30,7 @@ typedef struct { } postgres_event_t; typedef struct { - __u16 iteration; + __u8 iteration; // Saving the data offset is crucial for maintaining the current read position and ensuring proper utilization // of tail calls. __u32 data_off; diff --git a/pkg/network/protocols/ebpf_types.go b/pkg/network/protocols/ebpf_types.go index 41e3642bf0318..ce5fea46afa6b 100644 --- a/pkg/network/protocols/ebpf_types.go +++ b/pkg/network/protocols/ebpf_types.go @@ -63,8 +63,8 @@ const ( ProgramKafkaTermination ProgramType = C.PROG_KAFKA_TERMINATION // ProgramPostgres is the Golang representation of the C.PROG_POSTGRES enum ProgramPostgres ProgramType = C.PROG_POSTGRES - // ProgramPostgresMessageParser is the Golang representation of the C.PROG_POSTGRES enum - ProgramPostgresMessageParser ProgramType = C.PROG_POSTGRES_MESSAGE_PARSER + // ProgramPostgresHandleCommandComplete is the Golang representation of the C.PROG_POSTGRES_HANDLE_COMMAND_COMPLETE enum + ProgramPostgresHandleCommandComplete ProgramType = C.PROG_POSTGRES_HANDLE_COMMAND_COMPLETE // ProgramPostgresParseMessage is the Golang representation of the C.PROG_POSTGRES_PROCESS_PARSE_MESSAGE enum ProgramPostgresParseMessage ProgramType = C.PROG_POSTGRES_PROCESS_PARSE_MESSAGE // ProgramPostgresTermination is tail call to process Postgres termination. diff --git a/pkg/network/protocols/ebpf_types_linux.go b/pkg/network/protocols/ebpf_types_linux.go index 2af59f9c4d0cc..826b559d48666 100644 --- a/pkg/network/protocols/ebpf_types_linux.go +++ b/pkg/network/protocols/ebpf_types_linux.go @@ -52,7 +52,7 @@ const ( ProgramPostgres ProgramType = 0x12 - ProgramPostgresMessageParser ProgramType = 0x13 + ProgramPostgresHandleCommandComplete ProgramType = 0x13 ProgramPostgresParseMessage ProgramType = 0x14 diff --git a/pkg/network/protocols/postgres/protocol.go b/pkg/network/protocols/postgres/protocol.go index f4442089dde86..b34ab6ca71892 100644 --- a/pkg/network/protocols/postgres/protocol.go +++ b/pkg/network/protocols/postgres/protocol.go @@ -29,17 +29,17 @@ import ( const ( // InFlightMap is the name of the in-flight map. - InFlightMap = "postgres_in_flight" - scratchBufferMap = "postgres_scratch_buffer" - iterationsMap = "postgres_iterations" - processTailCall = "socket__postgres_process" - messageParserCall = "socket__postgres_message_parser" - parseMessageTailCall = "socket__postgres_process_parse_message" - tlsProcessTailCall = "uprobe__postgres_tls_process" - tlsParseMessageTailCall = "uprobe__postgres_tls_process_parse_message" - tlsTerminationTailCall = "uprobe__postgres_tls_termination" - tlsMessageParserCall = "uprobe__postgres_tls_message_parser" - eventStream = "postgres" + InFlightMap = "postgres_in_flight" + scratchBufferMap = "postgres_scratch_buffer" + iterationsMap = "postgres_iterations" + handleTailCall = "socket__postgres_handle" + handleCommandCompleteTailCall = "socket__postgres_handle_command_complete" + parseMessageTailCall = "socket__postgres_process_parse_message" + tlsHandleTailCall = "uprobe__postgres_tls_handle" + tlsParseMessageTailCall = "uprobe__postgres_tls_process_parse_message" + tlsTerminationTailCall = "uprobe__postgres_tls_termination" + tlsHandleCommandCompleteTailCall = "uprobe__postgres_tls_handle_command_complete" + eventStream = "postgres" ) // protocol holds the state of the postgres protocol monitoring. @@ -79,14 +79,14 @@ var Spec = &protocols.ProtocolSpec{ ProgArrayName: protocols.ProtocolDispatcherProgramsMap, Key: uint32(protocols.ProgramPostgres), ProbeIdentificationPair: manager.ProbeIdentificationPair{ - EBPFFuncName: processTailCall, + EBPFFuncName: handleTailCall, }, }, { ProgArrayName: protocols.ProtocolDispatcherProgramsMap, - Key: uint32(protocols.ProgramPostgresMessageParser), + Key: uint32(protocols.ProgramPostgresHandleCommandComplete), ProbeIdentificationPair: manager.ProbeIdentificationPair{ - EBPFFuncName: messageParserCall, + EBPFFuncName: handleCommandCompleteTailCall, }, }, { @@ -100,7 +100,7 @@ var Spec = &protocols.ProtocolSpec{ ProgArrayName: protocols.TLSDispatcherProgramsMap, Key: uint32(protocols.ProgramPostgres), ProbeIdentificationPair: manager.ProbeIdentificationPair{ - EBPFFuncName: tlsProcessTailCall, + EBPFFuncName: tlsHandleTailCall, }, }, { @@ -119,9 +119,9 @@ var Spec = &protocols.ProtocolSpec{ }, { ProgArrayName: protocols.TLSDispatcherProgramsMap, - Key: uint32(protocols.ProgramPostgresMessageParser), + Key: uint32(protocols.ProgramPostgresHandleCommandComplete), ProbeIdentificationPair: manager.ProbeIdentificationPair{ - EBPFFuncName: tlsMessageParserCall, + EBPFFuncName: tlsHandleCommandCompleteTailCall, }, }, }, From 075d026e52fe15d939fcd4d2e66e5304ca06e759 Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Tue, 29 Oct 2024 16:22:01 +0200 Subject: [PATCH 07/16] increasing POSTGRES_MAX_MESSAGES_PER_TAIL_CALL to reach max limits --- pkg/network/ebpf/c/protocols/postgres/types.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/network/ebpf/c/protocols/postgres/types.h b/pkg/network/ebpf/c/protocols/postgres/types.h index 9deea8ae24118..5a90b908dcd76 100644 --- a/pkg/network/ebpf/c/protocols/postgres/types.h +++ b/pkg/network/ebpf/c/protocols/postgres/types.h @@ -7,10 +7,10 @@ #define POSTGRES_BUFFER_SIZE 160 // Represents the maximum number of tail calls we can use to process a single message. -#define POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES 4 +#define POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES 1 // Represents the maximum number of messages we process in a single tail call. -#define POSTGRES_MAX_MESSAGES_PER_TAIL_CALL 20 +#define POSTGRES_MAX_MESSAGES_PER_TAIL_CALL 100 // Postgres transaction information we store in the kernel. typedef struct { @@ -31,7 +31,7 @@ typedef struct { typedef struct { __u8 iteration; - // Saving the data offset is crucial for maintaining the current read position and ensuring proper utilization + // Saving the packet data offset is crucial for maintaining the current read position and ensuring proper utilization // of tail calls. __u32 data_off; } postgres_tail_call_state_t; From 8f7f385a856e4034267533177920166d4ada9ba9 Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Tue, 29 Oct 2024 17:41:52 +0200 Subject: [PATCH 08/16] reduced POSTGRES_MAX_MESSAGES_PER_TAIL_CALL to support older kernel versions --- pkg/network/ebpf/c/protocols/postgres/decoding.h | 1 + pkg/network/ebpf/c/protocols/postgres/types.h | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding.h b/pkg/network/ebpf/c/protocols/postgres/decoding.h index 62e3fd1a07711..602988e071f8e 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding.h @@ -65,6 +65,7 @@ static __always_inline void handle_command_complete(conn_tuple_t *conn_tuple, po bpf_map_delete_elem(&postgres_in_flight, conn_tuple); } +// Handles a TCP termination event by deleting the connection tuple from the in-flight map. static void __always_inline postgres_tcp_termination(conn_tuple_t *tup) { bpf_map_delete_elem(&postgres_in_flight, tup); flip_tuple(tup); diff --git a/pkg/network/ebpf/c/protocols/postgres/types.h b/pkg/network/ebpf/c/protocols/postgres/types.h index 5a90b908dcd76..bd5cdb06bd117 100644 --- a/pkg/network/ebpf/c/protocols/postgres/types.h +++ b/pkg/network/ebpf/c/protocols/postgres/types.h @@ -10,7 +10,7 @@ #define POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES 1 // Represents the maximum number of messages we process in a single tail call. -#define POSTGRES_MAX_MESSAGES_PER_TAIL_CALL 100 +#define POSTGRES_MAX_MESSAGES_PER_TAIL_CALL 90 // Postgres transaction information we store in the kernel. typedef struct { From c601f3c6cdeec1e390eb4f010288c38c2793ff68 Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Wed, 30 Oct 2024 09:52:15 +0200 Subject: [PATCH 09/16] reduced POSTGRES_MAX_MESSAGES_PER_TAIL_CALL to support older kernel versions --- pkg/network/ebpf/c/protocols/postgres/types.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/network/ebpf/c/protocols/postgres/types.h b/pkg/network/ebpf/c/protocols/postgres/types.h index bd5cdb06bd117..0e88332dc78a7 100644 --- a/pkg/network/ebpf/c/protocols/postgres/types.h +++ b/pkg/network/ebpf/c/protocols/postgres/types.h @@ -10,7 +10,7 @@ #define POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES 1 // Represents the maximum number of messages we process in a single tail call. -#define POSTGRES_MAX_MESSAGES_PER_TAIL_CALL 90 +#define POSTGRES_MAX_MESSAGES_PER_TAIL_CALL 80 // Postgres transaction information we store in the kernel. typedef struct { From 15f0b719cd42eb4d3bd971a511c139d0876dfa4d Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Thu, 31 Oct 2024 15:35:42 +0200 Subject: [PATCH 10/16] added uts and fix some cr notes --- .../ebpf/c/protocols/postgres/decoding.h | 7 +- pkg/network/protocols/ebpf_types.go | 9 ++- pkg/network/protocols/ebpf_types_linux.go | 7 +- pkg/network/usm/postgres_monitor_test.go | 69 +++++++++++++++++++ 4 files changed, 81 insertions(+), 11 deletions(-) diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding.h b/pkg/network/ebpf/c/protocols/postgres/decoding.h index 602988e071f8e..7b148595b653b 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding.h @@ -114,7 +114,8 @@ static int __always_inline skip_string(pktbuf_t pkt, int message_len) { // If the message is a command complete, it calls the handle_command_complete program. static __always_inline void postgres_handle_message(pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) { const __u32 zero = 0; - // If the message is a parse message, we tail call to the dedicated function to handle it. + // If the message is a parse message, we tail call to the dedicated function to handle it as it is too large to be + // inlined in the main function. if (header->message_tag == POSTGRES_PARSE_MAGIC_BYTE) { pktbuf_tail_call_option_t process_parse_tail_call_array[] = { [PKTBUF_SKB] = { @@ -201,10 +202,6 @@ static __always_inline void postgres_handle_parse_message(pktbuf_t pkt, conn_tup static __always_inline bool handle_command_complete_messages(pktbuf_t pkt, conn_tuple_t conn_tuple) { const __u32 zero = 0; struct pg_message_header header; - if (!read_message_header(pkt, &header)) { - return 0; - } - // We didn't find a new query, thus we assume we're in the middle of a transaction. // We look up the transaction in the in-flight map, and if it doesn't exist, we ignore the message. postgres_transaction_t *transaction = bpf_map_lookup_elem(&postgres_in_flight, &conn_tuple); diff --git a/pkg/network/protocols/ebpf_types.go b/pkg/network/protocols/ebpf_types.go index ce5fea46afa6b..52be2af2600e0 100644 --- a/pkg/network/protocols/ebpf_types.go +++ b/pkg/network/protocols/ebpf_types.go @@ -9,12 +9,14 @@ package protocols // #cgo CFLAGS: -I ../../ebpf/c -I ../ebpf/c // #include "../ebpf/c/protocols/classification/defs.h" +// #include "../ebpf/c/protocols/postgres/types.h" import "C" const ( - layerAPIBit = C.LAYER_API_BIT - layerApplicationBit = C.LAYER_APPLICATION_BIT - layerEncryptionBit = C.LAYER_ENCRYPTION_BIT + layerAPIBit = C.LAYER_API_BIT + layerApplicationBit = C.LAYER_APPLICATION_BIT + layerEncryptionBit = C.LAYER_ENCRYPTION_BIT + PostgresMaxMessagesPerTailCall = C.POSTGRES_MAX_MESSAGES_PER_TAIL_CALL ) // DispatcherProgramType is a C type to represent the eBPF programs used for tail calls. @@ -100,4 +102,5 @@ const ( ebpfMySQL ebpfProtocolType = C.PROTOCOL_MYSQL // GRPC protocol ebpfGRPC ebpfProtocolType = C.PROTOCOL_GRPC + // todo: fix doc ) diff --git a/pkg/network/protocols/ebpf_types_linux.go b/pkg/network/protocols/ebpf_types_linux.go index 826b559d48666..0814db01788ec 100644 --- a/pkg/network/protocols/ebpf_types_linux.go +++ b/pkg/network/protocols/ebpf_types_linux.go @@ -4,9 +4,10 @@ package protocols const ( - layerAPIBit = 0x2000 - layerApplicationBit = 0x4000 - layerEncryptionBit = 0x8000 + layerAPIBit = 0x2000 + layerApplicationBit = 0x4000 + layerEncryptionBit = 0x8000 + PostgresMaxMessagesPerTailCall = 0x50 ) type DispatcherProgramType uint32 diff --git a/pkg/network/usm/postgres_monitor_test.go b/pkg/network/usm/postgres_monitor_test.go index 94a188ab4d31b..129f7f0f9e245 100644 --- a/pkg/network/usm/postgres_monitor_test.go +++ b/pkg/network/usm/postgres_monitor_test.go @@ -586,6 +586,75 @@ func testDecoding(t *testing.T, isTLS bool) { validatePostgres(t, monitor, map[string]map[postgres.Operation]int{}, isTLS) }, }, + // The purpose of this test is to validate the POSTGRES_MAX_MESSAGES_PER_TAIL_CALL limit. + { + name: "validate supporting POSTGRES_MAX_MESSAGES_PER_TAIL_CALL limit", + preMonitorSetup: func(t *testing.T, ctx pgTestContext) { + pg, err := postgres.NewPGXClient(postgres.ConnectionOptions{ + ServerAddress: ctx.serverAddress, + EnableTLS: isTLS, + }) + require.NoError(t, err) + require.NoError(t, pg.Ping()) + ctx.extras["pg"] = pg + }, + postMonitorSetup: func(t *testing.T, ctx pgTestContext) { + pg, err := postgres.NewPGXClient(postgres.ConnectionOptions{ + ServerAddress: ctx.serverAddress, + EnableTLS: isTLS, + }) + require.NoError(t, err) + require.NoError(t, pg.Ping()) + ctx.extras["pg"] = pg + require.NoError(t, pg.RunQuery(createTableQuery)) + // We reduce the limit by 2 messages because the protocol adds messages at the beginning of the maximum message response. + require.NoError(t, pg.RunQuery(createInsertQuery(generateTestValues(1, protocols.PostgresMaxMessagesPerTailCall-3)...))) + require.NoError(t, pg.RunQuery(selectAllQuery)) + }, + validation: func(t *testing.T, _ pgTestContext, monitor *Monitor) { + validatePostgres(t, monitor, map[string]map[postgres.Operation]int{ + "dummy": { + postgres.SelectOP: adjustCount(1), + postgres.InsertOP: adjustCount(1), + postgres.CreateTableOP: adjustCount(1), + }, + }, isTLS) + }, + }, + // This test validates that when we exceed the POSTGRES_MAX_MESSAGES_PER_TAIL_CALL limit, + // the request is not captured as we will miss the response.In this case, it applies to the SELECT query. + { + name: "validate exceeding POSTGRES_MAX_MESSAGES_PER_TAIL_CALL limit is not supported", + preMonitorSetup: func(t *testing.T, ctx pgTestContext) { + pg, err := postgres.NewPGXClient(postgres.ConnectionOptions{ + ServerAddress: ctx.serverAddress, + EnableTLS: isTLS, + }) + require.NoError(t, err) + require.NoError(t, pg.Ping()) + ctx.extras["pg"] = pg + }, + postMonitorSetup: func(t *testing.T, ctx pgTestContext) { + pg, err := postgres.NewPGXClient(postgres.ConnectionOptions{ + ServerAddress: ctx.serverAddress, + EnableTLS: isTLS, + }) + require.NoError(t, err) + require.NoError(t, pg.Ping()) + ctx.extras["pg"] = pg + require.NoError(t, pg.RunQuery(createTableQuery)) + require.NoError(t, pg.RunQuery(createInsertQuery(generateTestValues(1, protocols.PostgresMaxMessagesPerTailCall+1)...))) + require.NoError(t, pg.RunQuery(selectAllQuery)) + }, + validation: func(t *testing.T, _ pgTestContext, monitor *Monitor) { + validatePostgres(t, monitor, map[string]map[postgres.Operation]int{ + "dummy": { + postgres.InsertOP: adjustCount(1), + postgres.CreateTableOP: adjustCount(1), + }, + }, isTLS) + }, + }, } for _, tt := range tests { From 2714b2cb9c32509a10853c7d0aadf7f0908fb41b Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Thu, 31 Oct 2024 15:36:44 +0200 Subject: [PATCH 11/16] removed doc --- pkg/network/protocols/ebpf_types.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/network/protocols/ebpf_types.go b/pkg/network/protocols/ebpf_types.go index 52be2af2600e0..9998b6b9db11c 100644 --- a/pkg/network/protocols/ebpf_types.go +++ b/pkg/network/protocols/ebpf_types.go @@ -102,5 +102,4 @@ const ( ebpfMySQL ebpfProtocolType = C.PROTOCOL_MYSQL // GRPC protocol ebpfGRPC ebpfProtocolType = C.PROTOCOL_GRPC - // todo: fix doc ) From 69c88406352a29dcbb0f619caee6fd1b2322ba03 Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Sun, 3 Nov 2024 10:35:43 +0200 Subject: [PATCH 12/16] updated uts --- pkg/network/protocols/ebpf_types.go | 11 ++++++++--- pkg/network/protocols/ebpf_types_linux.go | 10 +++++++--- pkg/network/usm/postgres_monitor_test.go | 11 ++++++----- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/pkg/network/protocols/ebpf_types.go b/pkg/network/protocols/ebpf_types.go index 9998b6b9db11c..4db716db11122 100644 --- a/pkg/network/protocols/ebpf_types.go +++ b/pkg/network/protocols/ebpf_types.go @@ -13,10 +13,15 @@ package protocols import "C" const ( - layerAPIBit = C.LAYER_API_BIT - layerApplicationBit = C.LAYER_APPLICATION_BIT - layerEncryptionBit = C.LAYER_ENCRYPTION_BIT + layerAPIBit = C.LAYER_API_BIT + layerApplicationBit = C.LAYER_APPLICATION_BIT + layerEncryptionBit = C.LAYER_ENCRYPTION_BIT +) + +// Represents the maximum number of messages that can be processed in our Postgres decoding solution. +const ( PostgresMaxMessagesPerTailCall = C.POSTGRES_MAX_MESSAGES_PER_TAIL_CALL + PostgresMaxTailCalls = C.POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES ) // DispatcherProgramType is a C type to represent the eBPF programs used for tail calls. diff --git a/pkg/network/protocols/ebpf_types_linux.go b/pkg/network/protocols/ebpf_types_linux.go index 0814db01788ec..2f57405a460ae 100644 --- a/pkg/network/protocols/ebpf_types_linux.go +++ b/pkg/network/protocols/ebpf_types_linux.go @@ -4,10 +4,14 @@ package protocols const ( - layerAPIBit = 0x2000 - layerApplicationBit = 0x4000 - layerEncryptionBit = 0x8000 + layerAPIBit = 0x2000 + layerApplicationBit = 0x4000 + layerEncryptionBit = 0x8000 +) + +const ( PostgresMaxMessagesPerTailCall = 0x50 + PostgresMaxTailCalls = 0x1 ) type DispatcherProgramType uint32 diff --git a/pkg/network/usm/postgres_monitor_test.go b/pkg/network/usm/postgres_monitor_test.go index 129f7f0f9e245..37e8dc6447aa4 100644 --- a/pkg/network/usm/postgres_monitor_test.go +++ b/pkg/network/usm/postgres_monitor_test.go @@ -45,6 +45,7 @@ const ( alterTableQuery = "ALTER TABLE dummy ADD test VARCHAR(255);" truncateTableQuery = "TRUNCATE TABLE dummy" showQuery = "SHOW search_path" + maxSupportedMessages = protocols.PostgresMaxMessagesPerTailCall * protocols.PostgresMaxTailCalls ) var ( @@ -586,7 +587,7 @@ func testDecoding(t *testing.T, isTLS bool) { validatePostgres(t, monitor, map[string]map[postgres.Operation]int{}, isTLS) }, }, - // The purpose of this test is to validate the POSTGRES_MAX_MESSAGES_PER_TAIL_CALL limit. + // The purpose of this test is to validate the POSTGRES_MAX_MESSAGES_PER_TAIL_CALL * POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES limit. { name: "validate supporting POSTGRES_MAX_MESSAGES_PER_TAIL_CALL limit", preMonitorSetup: func(t *testing.T, ctx pgTestContext) { @@ -608,7 +609,7 @@ func testDecoding(t *testing.T, isTLS bool) { ctx.extras["pg"] = pg require.NoError(t, pg.RunQuery(createTableQuery)) // We reduce the limit by 2 messages because the protocol adds messages at the beginning of the maximum message response. - require.NoError(t, pg.RunQuery(createInsertQuery(generateTestValues(1, protocols.PostgresMaxMessagesPerTailCall-3)...))) + require.NoError(t, pg.RunQuery(createInsertQuery(generateTestValues(1, maxSupportedMessages-3)...))) require.NoError(t, pg.RunQuery(selectAllQuery)) }, validation: func(t *testing.T, _ pgTestContext, monitor *Monitor) { @@ -621,10 +622,10 @@ func testDecoding(t *testing.T, isTLS bool) { }, isTLS) }, }, - // This test validates that when we exceed the POSTGRES_MAX_MESSAGES_PER_TAIL_CALL limit, + // This test validates that when we exceed the POSTGRES_MAX_MESSAGES_PER_TAIL_CALL * POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES limit, // the request is not captured as we will miss the response.In this case, it applies to the SELECT query. { - name: "validate exceeding POSTGRES_MAX_MESSAGES_PER_TAIL_CALL limit is not supported", + name: "validate exceeding max supported messages limit is not supported", preMonitorSetup: func(t *testing.T, ctx pgTestContext) { pg, err := postgres.NewPGXClient(postgres.ConnectionOptions{ ServerAddress: ctx.serverAddress, @@ -643,7 +644,7 @@ func testDecoding(t *testing.T, isTLS bool) { require.NoError(t, pg.Ping()) ctx.extras["pg"] = pg require.NoError(t, pg.RunQuery(createTableQuery)) - require.NoError(t, pg.RunQuery(createInsertQuery(generateTestValues(1, protocols.PostgresMaxMessagesPerTailCall+1)...))) + require.NoError(t, pg.RunQuery(createInsertQuery(generateTestValues(1, maxSupportedMessages+1)...))) require.NoError(t, pg.RunQuery(selectAllQuery)) }, validation: func(t *testing.T, _ pgTestContext, monitor *Monitor) { From 3b24f9c7ccd04af0b108a1468e3d7b6e63516409 Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Sun, 3 Nov 2024 10:51:20 +0200 Subject: [PATCH 13/16] changed handle command complete to handle response --- .../ebpf/c/protocols/classification/defs.h | 2 +- .../ebpf/c/protocols/postgres/decoding.h | 30 +++++++++---------- pkg/network/protocols/ebpf_types.go | 4 +-- pkg/network/protocols/ebpf_types_linux.go | 2 +- pkg/network/protocols/postgres/protocol.go | 30 +++++++++---------- 5 files changed, 34 insertions(+), 34 deletions(-) diff --git a/pkg/network/ebpf/c/protocols/classification/defs.h b/pkg/network/ebpf/c/protocols/classification/defs.h index 83368c054aa72..46dbfd8e7df51 100644 --- a/pkg/network/ebpf/c/protocols/classification/defs.h +++ b/pkg/network/ebpf/c/protocols/classification/defs.h @@ -152,7 +152,7 @@ typedef enum { PROG_KAFKA_TERMINATION, PROG_GRPC, PROG_POSTGRES, - PROG_POSTGRES_HANDLE_COMMAND_COMPLETE, + PROG_POSTGRES_HANDLE_RESPONSE, PROG_POSTGRES_PROCESS_PARSE_MESSAGE, PROG_POSTGRES_TERMINATION, PROG_REDIS, diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding.h b/pkg/network/ebpf/c/protocols/postgres/decoding.h index 7b148595b653b..c96cce7479803 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding.h @@ -150,17 +150,17 @@ static __always_inline void postgres_handle_message(pktbuf_t pkt, conn_tuple_t * iteration_value->iteration = 0; iteration_value->data_off = 0; - pktbuf_tail_call_option_t handle_command_complete_tail_call_array[] = { + pktbuf_tail_call_option_t handle_response_tail_call_array[] = { [PKTBUF_SKB] = { .prog_array_map = &protocols_progs, - .index = PROG_POSTGRES_HANDLE_COMMAND_COMPLETE, + .index = PROG_POSTGRES_HANDLE_RESPONSE, }, [PKTBUF_TLS] = { .prog_array_map = &tls_process_progs, - .index = PROG_POSTGRES_HANDLE_COMMAND_COMPLETE, + .index = PROG_POSTGRES_HANDLE_RESPONSE, }, }; - pktbuf_tail_call_compact(pkt, handle_command_complete_tail_call_array); + pktbuf_tail_call_compact(pkt, handle_response_tail_call_array); return; } @@ -199,7 +199,7 @@ static __always_inline void postgres_handle_parse_message(pktbuf_t pkt, conn_tup // This function handles multiple messages within a single packet, processing up to POSTGRES_MAX_MESSAGES_PER_TAIL_CALL // messages per call. When more messages exist beyond this limit, it uses tail call chaining (up to // POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) to continue processing. -static __always_inline bool handle_command_complete_messages(pktbuf_t pkt, conn_tuple_t conn_tuple) { +static __always_inline bool handle_response(pktbuf_t pkt, conn_tuple_t conn_tuple) { const __u32 zero = 0; struct pg_message_header header; // We didn't find a new query, thus we assume we're in the middle of a transaction. @@ -239,17 +239,17 @@ static __always_inline bool handle_command_complete_messages(pktbuf_t pkt, conn_ iteration_value->iteration += 1; iteration_value->data_off = pktbuf_data_offset(pkt); - pktbuf_tail_call_option_t handle_command_complete_tail_call_array[] = { + pktbuf_tail_call_option_t handle_response_tail_call_array[] = { [PKTBUF_SKB] = { .prog_array_map = &protocols_progs, - .index = PROG_POSTGRES_HANDLE_COMMAND_COMPLETE, + .index = PROG_POSTGRES_HANDLE_RESPONSE, }, [PKTBUF_TLS] = { .prog_array_map = &tls_process_progs, - .index = PROG_POSTGRES_HANDLE_COMMAND_COMPLETE, + .index = PROG_POSTGRES_HANDLE_RESPONSE, }, }; - pktbuf_tail_call_compact(pkt, handle_command_complete_tail_call_array); + pktbuf_tail_call_compact(pkt, handle_response_tail_call_array); return 0; } @@ -283,8 +283,8 @@ int socket__postgres_handle(struct __sk_buff* skb) { // Handles plain text command complete messages for plaintext Postgres traffic. Pulls the connection tuple and the // packet buffer from the map and calls the dedicated function to handle the message. -SEC("socket/postgres_handle_command_complete") -int socket__postgres_handle_command_complete(struct __sk_buff* skb) { +SEC("socket/postgres_handle_response") +int socket__postgres_handle_response(struct __sk_buff* skb) { skb_info_t skb_info = {}; conn_tuple_t conn_tuple = {}; @@ -300,7 +300,7 @@ int socket__postgres_handle_command_complete(struct __sk_buff* skb) { normalize_tuple(&conn_tuple); pktbuf_t pkt = pktbuf_from_skb(skb, &skb_info); - handle_command_complete_messages(pkt, conn_tuple); + handle_response(pkt, conn_tuple); return 0; } @@ -382,8 +382,8 @@ int uprobe__postgres_tls_termination(struct pt_regs *ctx) { } // Handles message parsing for a TLS Postgres traffic. -SEC("uprobe/postgres_tls_handle_command_complete") -int uprobe__postgres_tls_handle_command_complete(struct pt_regs *ctx) { +SEC("uprobe/postgres_tls_handle_response") +int uprobe__postgres_tls_handle_response(struct pt_regs *ctx) { const __u32 zero = 0; tls_dispatcher_arguments_t *args = bpf_map_lookup_elem(&tls_dispatcher_arguments, &zero); @@ -394,7 +394,7 @@ int uprobe__postgres_tls_handle_command_complete(struct pt_regs *ctx) { // Copying the tuple to the stack to handle verifier issues on kernel 4.14. conn_tuple_t tup = args->tup; pktbuf_t pkt = pktbuf_from_tls(ctx, args); - handle_command_complete_messages(pkt, tup); + handle_response(pkt, tup); return 0; } diff --git a/pkg/network/protocols/ebpf_types.go b/pkg/network/protocols/ebpf_types.go index 4db716db11122..87d1617c1f80d 100644 --- a/pkg/network/protocols/ebpf_types.go +++ b/pkg/network/protocols/ebpf_types.go @@ -70,8 +70,8 @@ const ( ProgramKafkaTermination ProgramType = C.PROG_KAFKA_TERMINATION // ProgramPostgres is the Golang representation of the C.PROG_POSTGRES enum ProgramPostgres ProgramType = C.PROG_POSTGRES - // ProgramPostgresHandleCommandComplete is the Golang representation of the C.PROG_POSTGRES_HANDLE_COMMAND_COMPLETE enum - ProgramPostgresHandleCommandComplete ProgramType = C.PROG_POSTGRES_HANDLE_COMMAND_COMPLETE + // ProgramPostgresHandleResponse is the Golang representation of the C.PROG_POSTGRES_HANDLE_RESPONSE enum + ProgramPostgresHandleResponse ProgramType = C.PROG_POSTGRES_HANDLE_RESPONSE // ProgramPostgresParseMessage is the Golang representation of the C.PROG_POSTGRES_PROCESS_PARSE_MESSAGE enum ProgramPostgresParseMessage ProgramType = C.PROG_POSTGRES_PROCESS_PARSE_MESSAGE // ProgramPostgresTermination is tail call to process Postgres termination. diff --git a/pkg/network/protocols/ebpf_types_linux.go b/pkg/network/protocols/ebpf_types_linux.go index 2f57405a460ae..1386e5c97a2a2 100644 --- a/pkg/network/protocols/ebpf_types_linux.go +++ b/pkg/network/protocols/ebpf_types_linux.go @@ -57,7 +57,7 @@ const ( ProgramPostgres ProgramType = 0x12 - ProgramPostgresHandleCommandComplete ProgramType = 0x13 + ProgramPostgresHandleResponse ProgramType = 0x13 ProgramPostgresParseMessage ProgramType = 0x14 diff --git a/pkg/network/protocols/postgres/protocol.go b/pkg/network/protocols/postgres/protocol.go index b34ab6ca71892..fd044eb715055 100644 --- a/pkg/network/protocols/postgres/protocol.go +++ b/pkg/network/protocols/postgres/protocol.go @@ -29,17 +29,17 @@ import ( const ( // InFlightMap is the name of the in-flight map. - InFlightMap = "postgres_in_flight" - scratchBufferMap = "postgres_scratch_buffer" - iterationsMap = "postgres_iterations" - handleTailCall = "socket__postgres_handle" - handleCommandCompleteTailCall = "socket__postgres_handle_command_complete" - parseMessageTailCall = "socket__postgres_process_parse_message" - tlsHandleTailCall = "uprobe__postgres_tls_handle" - tlsParseMessageTailCall = "uprobe__postgres_tls_process_parse_message" - tlsTerminationTailCall = "uprobe__postgres_tls_termination" - tlsHandleCommandCompleteTailCall = "uprobe__postgres_tls_handle_command_complete" - eventStream = "postgres" + InFlightMap = "postgres_in_flight" + scratchBufferMap = "postgres_scratch_buffer" + iterationsMap = "postgres_iterations" + handleTailCall = "socket__postgres_handle" + handleResponseTailCall = "socket__postgres_handle_response" + parseMessageTailCall = "socket__postgres_process_parse_message" + tlsHandleTailCall = "uprobe__postgres_tls_handle" + tlsParseMessageTailCall = "uprobe__postgres_tls_process_parse_message" + tlsTerminationTailCall = "uprobe__postgres_tls_termination" + tlsHandleResponseTailCall = "uprobe__postgres_tls_handle_response" + eventStream = "postgres" ) // protocol holds the state of the postgres protocol monitoring. @@ -84,9 +84,9 @@ var Spec = &protocols.ProtocolSpec{ }, { ProgArrayName: protocols.ProtocolDispatcherProgramsMap, - Key: uint32(protocols.ProgramPostgresHandleCommandComplete), + Key: uint32(protocols.ProgramPostgresHandleResponse), ProbeIdentificationPair: manager.ProbeIdentificationPair{ - EBPFFuncName: handleCommandCompleteTailCall, + EBPFFuncName: handleResponseTailCall, }, }, { @@ -119,9 +119,9 @@ var Spec = &protocols.ProtocolSpec{ }, { ProgArrayName: protocols.TLSDispatcherProgramsMap, - Key: uint32(protocols.ProgramPostgresHandleCommandComplete), + Key: uint32(protocols.ProgramPostgresHandleResponse), ProbeIdentificationPair: manager.ProbeIdentificationPair{ - EBPFFuncName: tlsHandleCommandCompleteTailCall, + EBPFFuncName: tlsHandleResponseTailCall, }, }, }, From 89b5b95144c5e142f415661cad5ac01e04955ed5 Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Sun, 3 Nov 2024 15:50:41 +0200 Subject: [PATCH 14/16] fixed cr notes --- .../ebpf/c/protocols/postgres/decoding.h | 86 ++++++++++--------- pkg/network/protocols/ebpf_types.go | 5 +- pkg/network/protocols/ebpf_types_linux.go | 3 +- pkg/network/usm/postgres_monitor_test.go | 20 +---- 4 files changed, 56 insertions(+), 58 deletions(-) diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding.h b/pkg/network/ebpf/c/protocols/postgres/decoding.h index c96cce7479803..ab7ef0348fe56 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding.h @@ -108,25 +108,24 @@ static int __always_inline skip_string(pktbuf_t pkt, int message_len) { return SKIP_STRING_FAILED; } -// postgres_handle_message reads the first message header and decides what to do based on the +// Reads the first message header and decides what to do based on the // message tag. If the message is a new query, it stores the query in the in-flight map. // If the message is a parse message, we tail call to the dedicated process_parse_message program. // If the message is a command complete, it calls the handle_command_complete program. static __always_inline void postgres_handle_message(pktbuf_t pkt, conn_tuple_t *conn_tuple, struct pg_message_header *header, __u8 tags) { - const __u32 zero = 0; // If the message is a parse message, we tail call to the dedicated function to handle it as it is too large to be // inlined in the main function. if (header->message_tag == POSTGRES_PARSE_MAGIC_BYTE) { pktbuf_tail_call_option_t process_parse_tail_call_array[] = { - [PKTBUF_SKB] = { - .prog_array_map = &protocols_progs, - .index = PROG_POSTGRES_PROCESS_PARSE_MESSAGE, - }, - [PKTBUF_TLS] = { - .prog_array_map = &tls_process_progs, - .index = PROG_POSTGRES_PROCESS_PARSE_MESSAGE, - }, - }; + [PKTBUF_SKB] = { + .prog_array_map = &protocols_progs, + .index = PROG_POSTGRES_PROCESS_PARSE_MESSAGE, + }, + [PKTBUF_TLS] = { + .prog_array_map = &tls_process_progs, + .index = PROG_POSTGRES_PROCESS_PARSE_MESSAGE, + }, + }; pktbuf_tail_call_compact(pkt, process_parse_tail_call_array); return; } @@ -143,6 +142,7 @@ static __always_inline void postgres_handle_message(pktbuf_t pkt, conn_tuple_t * return; } + const __u32 zero = 0; postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero); if (iteration_value == NULL) { return; @@ -151,15 +151,15 @@ static __always_inline void postgres_handle_message(pktbuf_t pkt, conn_tuple_t * iteration_value->iteration = 0; iteration_value->data_off = 0; pktbuf_tail_call_option_t handle_response_tail_call_array[] = { - [PKTBUF_SKB] = { - .prog_array_map = &protocols_progs, - .index = PROG_POSTGRES_HANDLE_RESPONSE, - }, - [PKTBUF_TLS] = { - .prog_array_map = &tls_process_progs, - .index = PROG_POSTGRES_HANDLE_RESPONSE, - }, - }; + [PKTBUF_SKB] = { + .prog_array_map = &protocols_progs, + .index = PROG_POSTGRES_HANDLE_RESPONSE, + }, + [PKTBUF_TLS] = { + .prog_array_map = &tls_process_progs, + .index = PROG_POSTGRES_HANDLE_RESPONSE, + }, + }; pktbuf_tail_call_compact(pkt, handle_response_tail_call_array); return; } @@ -201,16 +201,12 @@ static __always_inline void postgres_handle_parse_message(pktbuf_t pkt, conn_tup // POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) to continue processing. static __always_inline bool handle_response(pktbuf_t pkt, conn_tuple_t conn_tuple) { const __u32 zero = 0; + bool found_command_complete = false; struct pg_message_header header; - // We didn't find a new query, thus we assume we're in the middle of a transaction. - // We look up the transaction in the in-flight map, and if it doesn't exist, we ignore the message. - postgres_transaction_t *transaction = bpf_map_lookup_elem(&postgres_in_flight, &conn_tuple); - if (!transaction) { - return 0; - } postgres_tail_call_state_t *iteration_value = bpf_map_lookup_elem(&postgres_iterations, &zero); if (iteration_value == NULL) { + bpf_map_delete_elem(&postgres_in_flight, &conn_tuple); return 0; } @@ -222,6 +218,13 @@ static __always_inline bool handle_response(pktbuf_t pkt, conn_tuple_t conn_tupl pktbuf_set_offset(pkt, iteration_value->data_off); } + // We didn't find a new query, thus we assume we're in the middle of a transaction. + // We look up the transaction in the in-flight map, and if it doesn't exist, we ignore the message. + postgres_transaction_t *transaction = bpf_map_lookup_elem(&postgres_in_flight, &conn_tuple); + if (!transaction) { + return 0; + } + #pragma unroll(POSTGRES_MAX_MESSAGES_PER_TAIL_CALL) for (__u32 iteration = 0; iteration < POSTGRES_MAX_MESSAGES_PER_TAIL_CALL; ++iteration) { if (!read_message_header(pkt, &header)) { @@ -229,6 +232,7 @@ static __always_inline bool handle_response(pktbuf_t pkt, conn_tuple_t conn_tupl } if (header.message_tag == POSTGRES_COMMAND_COMPLETE_MAGIC_BYTE) { handle_command_complete(&conn_tuple, transaction); + found_command_complete = true; break; } // We didn't find a command complete message, so we advance the data offset to the end of the message. @@ -237,19 +241,23 @@ static __always_inline bool handle_response(pktbuf_t pkt, conn_tuple_t conn_tupl pktbuf_advance(pkt, header.message_len + 1); } - iteration_value->iteration += 1; - iteration_value->data_off = pktbuf_data_offset(pkt); - pktbuf_tail_call_option_t handle_response_tail_call_array[] = { - [PKTBUF_SKB] = { - .prog_array_map = &protocols_progs, - .index = PROG_POSTGRES_HANDLE_RESPONSE, - }, - [PKTBUF_TLS] = { - .prog_array_map = &tls_process_progs, - .index = PROG_POSTGRES_HANDLE_RESPONSE, - }, - }; - pktbuf_tail_call_compact(pkt, handle_response_tail_call_array); + if (!found_command_complete) { + // We didn't find a command complete message, so we need to continue processing the packet. + // We save the current data offset and increment the iteration counter. + iteration_value->iteration += 1; + iteration_value->data_off = pktbuf_data_offset(pkt); + pktbuf_tail_call_option_t handle_response_tail_call_array[] = { + [PKTBUF_SKB] = { + .prog_array_map = &protocols_progs, + .index = PROG_POSTGRES_HANDLE_RESPONSE, + }, + [PKTBUF_TLS] = { + .prog_array_map = &tls_process_progs, + .index = PROG_POSTGRES_HANDLE_RESPONSE, + }, + }; + pktbuf_tail_call_compact(pkt, handle_response_tail_call_array); + } return 0; } diff --git a/pkg/network/protocols/ebpf_types.go b/pkg/network/protocols/ebpf_types.go index 87d1617c1f80d..57bdb4539bf5e 100644 --- a/pkg/network/protocols/ebpf_types.go +++ b/pkg/network/protocols/ebpf_types.go @@ -18,10 +18,11 @@ const ( layerEncryptionBit = C.LAYER_ENCRYPTION_BIT ) -// Represents the maximum number of messages that can be processed in our Postgres decoding solution. const ( + // PostgresMaxMessagesPerTailCall is the maximum number of messages that can be processed in a single tail call in our Postgres decoding solution PostgresMaxMessagesPerTailCall = C.POSTGRES_MAX_MESSAGES_PER_TAIL_CALL - PostgresMaxTailCalls = C.POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES + // PostgresMaxTailCalls is the maximum number of tail calls that can be made in our Postgres decoding solution + PostgresMaxTailCalls = C.POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES ) // DispatcherProgramType is a C type to represent the eBPF programs used for tail calls. diff --git a/pkg/network/protocols/ebpf_types_linux.go b/pkg/network/protocols/ebpf_types_linux.go index 1386e5c97a2a2..ae855cfe48b77 100644 --- a/pkg/network/protocols/ebpf_types_linux.go +++ b/pkg/network/protocols/ebpf_types_linux.go @@ -11,7 +11,8 @@ const ( const ( PostgresMaxMessagesPerTailCall = 0x50 - PostgresMaxTailCalls = 0x1 + + PostgresMaxTailCalls = 0x1 ) type DispatcherProgramType uint32 diff --git a/pkg/network/usm/postgres_monitor_test.go b/pkg/network/usm/postgres_monitor_test.go index 37e8dc6447aa4..ccd8620940963 100644 --- a/pkg/network/usm/postgres_monitor_test.go +++ b/pkg/network/usm/postgres_monitor_test.go @@ -398,11 +398,7 @@ func testDecoding(t *testing.T, isTLS bool) { ctx.extras["pg"] = pg }, postMonitorSetup: func(t *testing.T, ctx pgTestContext) { - pg, err := postgres.NewPGXClient(postgres.ConnectionOptions{ - ServerAddress: ctx.serverAddress, - EnableTLS: isTLS, - }) - require.NoError(t, err) + pg := ctx.extras["pg"].(*postgres.PGXClient) require.NoError(t, pg.Ping()) ctx.extras["pg"] = pg require.NoError(t, pg.RunQuery(createTableQuery)) @@ -589,7 +585,7 @@ func testDecoding(t *testing.T, isTLS bool) { }, // The purpose of this test is to validate the POSTGRES_MAX_MESSAGES_PER_TAIL_CALL * POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES limit. { - name: "validate supporting POSTGRES_MAX_MESSAGES_PER_TAIL_CALL limit", + name: "validate supporting max supported messages limit", preMonitorSetup: func(t *testing.T, ctx pgTestContext) { pg, err := postgres.NewPGXClient(postgres.ConnectionOptions{ ServerAddress: ctx.serverAddress, @@ -600,11 +596,7 @@ func testDecoding(t *testing.T, isTLS bool) { ctx.extras["pg"] = pg }, postMonitorSetup: func(t *testing.T, ctx pgTestContext) { - pg, err := postgres.NewPGXClient(postgres.ConnectionOptions{ - ServerAddress: ctx.serverAddress, - EnableTLS: isTLS, - }) - require.NoError(t, err) + pg := ctx.extras["pg"].(*postgres.PGXClient) require.NoError(t, pg.Ping()) ctx.extras["pg"] = pg require.NoError(t, pg.RunQuery(createTableQuery)) @@ -636,11 +628,7 @@ func testDecoding(t *testing.T, isTLS bool) { ctx.extras["pg"] = pg }, postMonitorSetup: func(t *testing.T, ctx pgTestContext) { - pg, err := postgres.NewPGXClient(postgres.ConnectionOptions{ - ServerAddress: ctx.serverAddress, - EnableTLS: isTLS, - }) - require.NoError(t, err) + pg := ctx.extras["pg"].(*postgres.PGXClient) require.NoError(t, pg.Ping()) ctx.extras["pg"] = pg require.NoError(t, pg.RunQuery(createTableQuery)) From 764f177ac490133e9175798ce3d50009a95b8210 Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Sun, 3 Nov 2024 18:06:02 +0200 Subject: [PATCH 15/16] fixed cr note --- .../ebpf/c/protocols/postgres/decoding.h | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding.h b/pkg/network/ebpf/c/protocols/postgres/decoding.h index ab7ef0348fe56..35c8ab4710b5a 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding.h @@ -241,23 +241,24 @@ static __always_inline bool handle_response(pktbuf_t pkt, conn_tuple_t conn_tupl pktbuf_advance(pkt, header.message_len + 1); } - if (!found_command_complete) { - // We didn't find a command complete message, so we need to continue processing the packet. - // We save the current data offset and increment the iteration counter. - iteration_value->iteration += 1; - iteration_value->data_off = pktbuf_data_offset(pkt); - pktbuf_tail_call_option_t handle_response_tail_call_array[] = { - [PKTBUF_SKB] = { - .prog_array_map = &protocols_progs, - .index = PROG_POSTGRES_HANDLE_RESPONSE, - }, - [PKTBUF_TLS] = { - .prog_array_map = &tls_process_progs, - .index = PROG_POSTGRES_HANDLE_RESPONSE, - }, - }; - pktbuf_tail_call_compact(pkt, handle_response_tail_call_array); + if (found_command_complete) { + return 0; } + // We didn't find a command complete message, so we need to continue processing the packet. + // We save the current data offset and increment the iteration counter. + iteration_value->iteration += 1; + iteration_value->data_off = pktbuf_data_offset(pkt); + pktbuf_tail_call_option_t handle_response_tail_call_array[] = { + [PKTBUF_SKB] = { + .prog_array_map = &protocols_progs, + .index = PROG_POSTGRES_HANDLE_RESPONSE, + }, + [PKTBUF_TLS] = { + .prog_array_map = &tls_process_progs, + .index = PROG_POSTGRES_HANDLE_RESPONSE, + }, + }; + pktbuf_tail_call_compact(pkt, handle_response_tail_call_array); return 0; } From 3d8e492bf4c5fcdd90c381e40eabcdf593489373 Mon Sep 17 00:00:00 2001 From: "amit.slavin" Date: Mon, 4 Nov 2024 09:52:45 +0200 Subject: [PATCH 16/16] limit tail call when we reach max --- pkg/network/ebpf/c/protocols/postgres/decoding.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/network/ebpf/c/protocols/postgres/decoding.h b/pkg/network/ebpf/c/protocols/postgres/decoding.h index 35c8ab4710b5a..300ca6ba949a9 100644 --- a/pkg/network/ebpf/c/protocols/postgres/decoding.h +++ b/pkg/network/ebpf/c/protocols/postgres/decoding.h @@ -248,6 +248,12 @@ static __always_inline bool handle_response(pktbuf_t pkt, conn_tuple_t conn_tupl // We save the current data offset and increment the iteration counter. iteration_value->iteration += 1; iteration_value->data_off = pktbuf_data_offset(pkt); + + // If the maximum number of tail calls has been reached, we can skip invoking the next tail call. + if (iteration_value->iteration >= POSTGRES_MAX_TAIL_CALLS_FOR_MAX_MESSAGES) { + return 0; + } + pktbuf_tail_call_option_t handle_response_tail_call_array[] = { [PKTBUF_SKB] = { .prog_array_map = &protocols_progs,