From 2a06ef4f60c4c0efde6728a59fb26514c9e7d91c Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Mon, 30 Sep 2024 14:30:20 +0200 Subject: [PATCH] Improve tracking of connectionless UDP syscalls (#1741) --- .editorconfig | 4 + collector/lib/CollectorConfig.cpp | 14 +- collector/lib/CollectorConfig.h | 10 + collector/lib/NetworkSignalHandler.cpp | 35 +- collector/lib/NetworkSignalHandler.h | 2 + collector/lib/system-inspector/Service.cpp | 1 + falcosecurity-libs | 2 +- integration-tests/container/QA_TAG | 2 +- integration-tests/container/udp/Containerfile | 17 + integration-tests/container/udp/Makefile | 25 ++ integration-tests/container/udp/udp-client.c | 372 ++++++++++++++++++ integration-tests/container/udp/udp-server.c | 338 ++++++++++++++++ integration-tests/images.yml | 1 + integration-tests/integration_test.go | 8 + integration-tests/suites/udp_networkflow.go | 284 +++++++++++++ 15 files changed, 1110 insertions(+), 5 deletions(-) create mode 100644 integration-tests/container/udp/Containerfile create mode 100644 integration-tests/container/udp/Makefile create mode 100644 integration-tests/container/udp/udp-client.c create mode 100644 integration-tests/container/udp/udp-server.c create mode 100644 integration-tests/suites/udp_networkflow.go diff --git a/.editorconfig b/.editorconfig index bb1d2806ca..4aeab816f3 100644 --- a/.editorconfig +++ b/.editorconfig @@ -10,6 +10,10 @@ trim_trailing_whitespace = true indent_style = space indent_size = 2 +[integration-tests/**.c] +indent_style = space +indent_size = 2 + [*.sh] indent_style = space indent_size = 4 diff --git a/collector/lib/CollectorConfig.cpp b/collector/lib/CollectorConfig.cpp index a876c872f7..38c283c7f0 100644 --- a/collector/lib/CollectorConfig.cpp +++ b/collector/lib/CollectorConfig.cpp @@ -60,6 +60,8 @@ BoolEnvVar use_podman_ce("ROX_COLLECTOR_CE_USE_PODMAN", false); BoolEnvVar enable_introspection("ROX_COLLECTOR_INTROSPECTION_ENABLE", false); +BoolEnvVar track_send_recv("ROX_COLLECTOR_TRACK_SEND_RECV", false); + // Collector arguments alternatives StringEnvVar log_level("ROX_COLLECTOR_LOG_LEVEL"); IntEnvVar scrape_interval("ROX_COLLECTOR_SCRAPE_INTERVAL"); @@ -103,9 +105,16 @@ void CollectorConfig::InitCollectorConfig(CollectorArgs* args) { use_docker_ce_ = use_docker_ce.value(); use_podman_ce_ = use_podman_ce.value(); enable_introspection_ = enable_introspection.value(); + track_send_recv_ = track_send_recv.value(); for (const auto& syscall : kSyscalls) { - syscalls_.push_back(syscall); + syscalls_.emplace_back(syscall); + } + + if (track_send_recv_) { + for (const auto& syscall : kSendRecvSyscalls) { + syscalls_.emplace_back(syscall); + } } // Get hostname @@ -454,7 +463,8 @@ std::ostream& operator<<(std::ostream& os, const CollectorConfig& c) { << ", set_import_users:" << c.ImportUsers() << ", collect_connection_status:" << c.CollectConnectionStatus() << ", enable_detailed_metrics:" << c.EnableDetailedMetrics() - << ", enable_external_ips:" << c.EnableExternalIPs(); + << ", enable_external_ips:" << c.EnableExternalIPs() + << ", track_send_recv:" << c.TrackingSendRecv(); } // Returns size of ring buffers to be allocated. diff --git a/collector/lib/CollectorConfig.h b/collector/lib/CollectorConfig.h index 1d803ed1e3..e62cdf8dca 100644 --- a/collector/lib/CollectorConfig.h +++ b/collector/lib/CollectorConfig.h @@ -48,6 +48,14 @@ class CollectorConfig { #endif "vfork", }; + static constexpr const char* kSendRecvSyscalls[] = { + "sendto", + "sendmsg", + "sendmmsg", + "recvfrom", + "recvmsg", + "recvmmsg", + }; static const UnorderedSet kIgnoredL4ProtoPortPairs; static constexpr bool kEnableProcessesListeningOnPorts = true; @@ -82,6 +90,7 @@ class CollectorConfig { bool UseDockerCe() const { return use_docker_ce_; } bool UsePodmanCe() const { return use_podman_ce_; } bool IsIntrospectionEnabled() const { return enable_introspection_; } + bool TrackingSendRecv() const { return track_send_recv_; } const std::vector& GetConnectionStatsQuantiles() const { return connection_stats_quantiles_; } double GetConnectionStatsError() const { return connection_stats_error_; } unsigned int GetConnectionStatsWindow() const { return connection_stats_window_; } @@ -122,6 +131,7 @@ class CollectorConfig { bool use_docker_ce_; bool use_podman_ce_; bool enable_introspection_; + bool track_send_recv_; std::vector connection_stats_quantiles_; double connection_stats_error_; unsigned int connection_stats_window_; diff --git a/collector/lib/NetworkSignalHandler.cpp b/collector/lib/NetworkSignalHandler.cpp index 9e35003960..8a1f4e2f71 100644 --- a/collector/lib/NetworkSignalHandler.cpp +++ b/collector/lib/NetworkSignalHandler.cpp @@ -24,6 +24,17 @@ EventMap modifiers = { {"connect<", Modifier::ADD}, {"accept<", Modifier::ADD}, {"getsockopt<", Modifier::ADD}, + {"sendto<", Modifier::ADD}, + {"sendto>", Modifier::ADD}, + {"sendmsg<", Modifier::ADD}, + {"sendmsg>", Modifier::ADD}, + {"sendmmsg<", Modifier::ADD}, + {"recvfrom<", Modifier::ADD}, + {"recvfrom>", Modifier::ADD}, + {"recvmsg<", Modifier::ADD}, + {"recvmsg>", Modifier::ADD}, + {"recvmmsg<", Modifier::ADD}, + {"recvmmsg>", Modifier::ADD}, }, Modifier::INVALID, }; @@ -31,7 +42,7 @@ EventMap modifiers = { } // namespace NetworkSignalHandler::NetworkSignalHandler(sinsp* inspector, std::shared_ptr conn_tracker, system_inspector::Stats* stats) - : event_extractor_(std::make_unique()), conn_tracker_(std::move(conn_tracker)), stats_(stats), collect_connection_status_(true) { + : event_extractor_(std::make_unique()), conn_tracker_(std::move(conn_tracker)), stats_(stats), collect_connection_status_(true), track_send_recv_(false) { event_extractor_->Init(inspector); } @@ -141,6 +152,28 @@ SignalHandler::Result NetworkSignalHandler::HandleSignal(sinsp_evt* evt) { } std::vector NetworkSignalHandler::GetRelevantEvents() { + if (track_send_recv_) { + return { + "close<", + "shutdown<", + "connect<", + "accept<", + "getsockopt<", + "sendto<", + "sendto>", + "sendmsg<", + "sendmsg>", + "sendmmsg<", + "recvfrom<", + "recvfrom>", + "recvmsg<", + "recvmsg>", + "recvmmsg<", + "recvmmsg>", + "recvmsg<", + "recvmsg>", + }; + } return {"close<", "shutdown<", "connect<", "accept<", "getsockopt<"}; } diff --git a/collector/lib/NetworkSignalHandler.h b/collector/lib/NetworkSignalHandler.h index 79c77b3682..a6fbba0085 100644 --- a/collector/lib/NetworkSignalHandler.h +++ b/collector/lib/NetworkSignalHandler.h @@ -29,6 +29,7 @@ class NetworkSignalHandler final : public SignalHandler { bool Stop() override; void SetCollectConnectionStatus(bool collect_connection_status) { collect_connection_status_ = collect_connection_status; } + void SetTrackSendRecv(bool track_send_recv) { track_send_recv_ = track_send_recv; } private: std::optional GetConnection(sinsp_evt* evt); @@ -38,6 +39,7 @@ class NetworkSignalHandler final : public SignalHandler { system_inspector::Stats* stats_; bool collect_connection_status_; + bool track_send_recv_; }; } // namespace collector diff --git a/collector/lib/system-inspector/Service.cpp b/collector/lib/system-inspector/Service.cpp index 99b9fd2cbf..54a7a45a79 100644 --- a/collector/lib/system-inspector/Service.cpp +++ b/collector/lib/system-inspector/Service.cpp @@ -46,6 +46,7 @@ void Service::Init(const CollectorConfig& config, std::shared_ptr(inspector_.get(), conn_tracker, &userspace_stats_); network_signal_handler_->SetCollectConnectionStatus(config.CollectConnectionStatus()); + network_signal_handler_->SetTrackSendRecv(config.TrackingSendRecv()); AddSignalHandler(std::move(network_signal_handler_)); } diff --git a/falcosecurity-libs b/falcosecurity-libs index 40fbddbbb4..4984d2703a 160000 --- a/falcosecurity-libs +++ b/falcosecurity-libs @@ -1 +1 @@ -Subproject commit 40fbddbbb43330c1a289123989a53b3943498165 +Subproject commit 4984d2703a1c66513a1e4b994f7c11e6831aded2 diff --git a/integration-tests/container/QA_TAG b/integration-tests/container/QA_TAG index 6085e94650..227cea2156 100644 --- a/integration-tests/container/QA_TAG +++ b/integration-tests/container/QA_TAG @@ -1 +1 @@ -1.2.1 +2.0.0 diff --git a/integration-tests/container/udp/Containerfile b/integration-tests/container/udp/Containerfile new file mode 100644 index 0000000000..eb63b6e6d9 --- /dev/null +++ b/integration-tests/container/udp/Containerfile @@ -0,0 +1,17 @@ +FROM fedora:40 AS builder + +WORKDIR /tmp +COPY udp-server.c . +COPY udp-client.c . + +RUN dnf install -y gcc && \ + gcc udp-server.c -Wall -Wpedantic -Werror -o udp-server && \ + gcc udp-client.c -Wall -Wpedantic -Werror -o udp-client + +FROM fedora:40 + +COPY --from=builder /tmp/udp-server /usr/local/bin +COPY --from=builder /tmp/udp-client /usr/local/bin +EXPOSE 9090 + +ENTRYPOINT ["udp-server"] diff --git a/integration-tests/container/udp/Makefile b/integration-tests/container/udp/Makefile new file mode 100644 index 0000000000..69e827082a --- /dev/null +++ b/integration-tests/container/udp/Makefile @@ -0,0 +1,25 @@ +BASE_PATH = . +include ../Makefile-constants.mk + +.DEFAULT_GOAL = all + +COLLECTOR_QA_UDP_TAG := udp + +ifneq ($(COLLECTOR_QA_TAG),) +COLLECTOR_QA_UDP_TAG=udp-$(COLLECTOR_QA_TAG) +endif + +.PHONY: all +all: build + +.PHONY: build +build: + @docker buildx build --load --platform ${PLATFORM} \ + -t quay.io/rhacs-eng/qa-multi-arch:$(COLLECTOR_QA_UDP_TAG) \ + -f Containerfile . + +.PHONY: build-and-push +build-and-push: + @docker buildx build --push --platform ${PLATFORM} \ + -t quay.io/rhacs-eng/qa-multi-arch:$(COLLECTOR_QA_UDP_TAG) \ + -f Containerfile . diff --git a/integration-tests/container/udp/udp-client.c b/integration-tests/container/udp/udp-client.c new file mode 100644 index 0000000000..496b646c3a --- /dev/null +++ b/integration-tests/container/udp/udp-client.c @@ -0,0 +1,372 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +static bool running = true; +static const char LOREM_IPSUM[] = + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. " + "Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. " + "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. " + "Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. " + "Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n"; +static const int LOREM_IPSUM_LEN = sizeof(LOREM_IPSUM) / sizeof(char); +static const size_t IOVEC_N = 32; +static const size_t MMSGHDR_N = 32; + +typedef enum send_method_e { + SENDTO = 0, + SENDMSG = 1, + SENDMMSG = 2, +} send_method_t; + +typedef struct target_array_s { + struct addrinfo** ptr; + unsigned int len; +} target_array_t; + +typedef struct config_s { + int ai_family; + int period; + send_method_t syscall; + unsigned int msgs_per_target; +} config_t; + +typedef struct run_data_s { + send_method_t syscall; + target_array_t targets; + int fd; + int period; + unsigned int msgs_per_target; +} run_data_t; + +void usage(char* prog) { + printf("%s [FLAGS] [... IP]\n", prog); + printf("\n:\n\tThe IP of the server to send messages to.\n\tCan specify multiple destinations."); + printf("\n[FLAGS]:\n"); + printf(" -6, --ipv6\n\tUse IPv6 instead of IPv4, the provided IP must be in the correct format\n\n"); + printf(" -h, --help\n\tShow this help message\n\n"); + printf(" -p, --period=2\n\tPeriod used to send data\n\n"); + printf(" -s, --syscall=\"sendto\"\n\tSyscall to be used for sending messages\n\tOne of: sendto, sendmsg, sendmmsg\n\n"); + printf(" -m, --messages=32\n\tMessages to send to each target when using sendmmsg\n\n"); +} + +void handle_stop(int sig) { + running = false; +} + +void iovec_free(struct iovec* iov) { + free(iov); +} + +struct iovec* iovec_new() { + struct iovec* iov = calloc(IOVEC_N, sizeof(struct iovec)); + if (iov == NULL) { + return NULL; + } + + for (size_t i = 0; i < IOVEC_N; i++) { + iov[i].iov_base = (void*)LOREM_IPSUM; + iov[i].iov_len = LOREM_IPSUM_LEN; + } + + return iov; +} + +void mmsghdr_free(struct mmsghdr* mmh, unsigned int len, unsigned int msgs_per_target) { + if (mmh == NULL) { + return; + } + + for (int i = 0; i < len * msgs_per_target; i++) { + iovec_free(mmh[i].msg_hdr.msg_iov); + } + + free(mmh); +} + +struct mmsghdr* mmsghdr_new(target_array_t* targets, unsigned int msgs_per_target) { + assert(targets != NULL); + + struct mmsghdr* mmh = calloc(targets->len * msgs_per_target, sizeof(struct mmsghdr)); + if (mmh == NULL) { + return NULL; + } + + for (int i = 0; i < targets->len; i++) { + struct addrinfo* server = targets->ptr[i]; + + for (int j = 0; j < msgs_per_target; j++) { + struct msghdr* mh = &mmh[j + (i * msgs_per_target)].msg_hdr; + mh->msg_iov = iovec_new(); + if (mh->msg_iov == NULL) { + goto fail; + } + mh->msg_iovlen = IOVEC_N; + mh->msg_name = server->ai_addr; + mh->msg_namelen = server->ai_addrlen; + } + } + + return mmh; + +fail: + mmsghdr_free(mmh, targets->len, msgs_per_target); + return NULL; +} + +ssize_t send_sendto(run_data_t* data) { + assert(data != NULL); + + for (int i = 0; i < data->targets.len; i++) { + struct addrinfo* addr = data->targets.ptr[i]; + + if (sendto(data->fd, LOREM_IPSUM, LOREM_IPSUM_LEN, 0, addr->ai_addr, addr->ai_addrlen) < 0) { + return errno; + } + } + return 0; +} + +ssize_t send_sendmsg(run_data_t* data) { + assert(data != NULL); + + int err = 0; + for (int i = 0; i < data->targets.len; i++) { + struct addrinfo* addr = data->targets.ptr[i]; + + struct iovec* iov = iovec_new(); + if (iov == NULL) { + return ENOMEM; + } + + struct msghdr mh = { + .msg_name = addr->ai_addr, + .msg_namelen = addr->ai_addrlen, + .msg_iov = iov, + .msg_iovlen = IOVEC_N, + }; + + if (sendmsg(data->fd, &mh, 0) < 0) { + err = errno; + } + + iovec_free(iov); + + if (err != 0) { + break; + } + } + return err; +} + +ssize_t send_sendmmsg(run_data_t* data) { + assert(data != NULL); + + // size that mmh will have on return from mmsghdr_new. + unsigned int vlen = data->targets.len * data->msgs_per_target; + struct mmsghdr* mmh = mmsghdr_new(&data->targets, data->msgs_per_target); + if (mmh == NULL) { + return ENOMEM; + } + + int err = 0; + if (sendmmsg(data->fd, mmh, vlen, 0) < 0) { + err = errno; + } + + mmsghdr_free(mmh, data->targets.len, data->msgs_per_target); + return err; +} + +ssize_t m_send(run_data_t* data) { + printf("Sending data...\n"); + switch (data->syscall) { + case SENDTO: + return send_sendto(data); + case SENDMSG: + return send_sendmsg(data); + case SENDMMSG: + return send_sendmmsg(data); + default: + fprintf(stderr, "Invalid syscall\n"); + return -1; + } +} + +struct addrinfo** get_targets(int argc, char** argv, int n_servers, int ai_family) { + struct addrinfo** targets = (struct addrinfo**)calloc(n_servers, sizeof(struct addrinfo*)); + if (targets == NULL) { + fprintf(stderr, "Failed to reserve memory for target servers: (%d) %s", errno, strerror(errno)); + return NULL; + } + + struct addrinfo hints = { + .ai_family = ai_family, + .ai_socktype = SOCK_DGRAM, + }; + + for (int i = 0; i < n_servers; i++) { + char* ip = argv[optind + i]; + char* port = NULL; + if (ai_family == AF_INET6) { + port = strrchr(ip, ']'); + if (port == NULL) { + port = "9090"; + } else if (*ip != '[' || *(port + 1) != ':') { + fprintf(stderr, "Invalid IPv6 address\n"); + goto error; + } else { + ip++; + *port = '\0'; + port += 2; + } + } else { + port = strrchr(ip, ':'); + if (port != NULL) { + *port = '\0'; + port++; + } else { + port = "9090"; + } + } + + struct addrinfo* servinfo = NULL; + ssize_t ret = getaddrinfo(ip, port, &hints, &servinfo); + + if (ret != 0) { + fprintf(stderr, "getaddrinfo failed: %zd\n", ret); + goto error; + } + + targets[i] = servinfo; + } + + return targets; + +error: + free(targets); + return NULL; +} + +int run(run_data_t* data) { + assert(data != NULL); + + while (running) { + ssize_t ret = m_send(data); + if (ret != 0) { + fprintf(stderr, "send failed: (%zd) %s\n", ret, strerror(ret)); + return -1; + } + + sleep(data->period); + } + return 0; +} + +int main(int argc, char* argv[]) { + struct option options[] = { + {.name = "ipv6", .has_arg = no_argument, .flag = NULL, .val = '6'}, + {.name = "help", .has_arg = no_argument, .flag = NULL, .val = 'h'}, + {.name = "period", .has_arg = required_argument, .flag = NULL, .val = 'p'}, + {.name = "syscall", .has_arg = required_argument, .flag = NULL, .val = 's'}, + {.name = "messages", .has_arg = required_argument, .flag = NULL, .val = 'm'}, + {.name = 0, .has_arg = 0, .flag = 0, .val = 0}}; + int opt = 0; + config_t config = { + .ai_family = AF_INET, + .period = 2, + .syscall = SENDTO, + .msgs_per_target = MMSGHDR_N, + }; + + while ((opt = getopt_long(argc, argv, "6hp:s:m:", options, NULL)) != -1) { + switch (opt) { + case '6': + config.ai_family = AF_INET6; + break; + + case 'h': + usage(argv[0]); + return 0; + + case 'p': + config.period = atoi(optarg); + break; + + case 's': + if (strcmp(optarg, "sendto") == 0) { + config.syscall = SENDTO; + } else if (strcmp(optarg, "sendmsg") == 0) { + config.syscall = SENDMSG; + } else if (strcmp(optarg, "sendmmsg") == 0) { + config.syscall = SENDMMSG; + } else { + fprintf(stderr, "Unknown send method: %s\n", optarg); + usage(argv[0]); + return -1; + } + break; + + case 'm': + config.msgs_per_target = atoi(optarg); + break; + + default: + fprintf(stderr, "Unknown option %s\n", argv[optind]); + usage(argv[0]); + return -1; + } + } + + int n_servers = argc - optind; + if (n_servers <= 0) { + fprintf(stderr, "Missing required argument\n"); + usage(argv[0]); + return -1; + } + + struct addrinfo** targets = get_targets(argc, argv, n_servers, config.ai_family); + if (targets == NULL) { + return -1; + } + + int fd = socket(config.ai_family, SOCK_DGRAM, 0); + if (fd < 0) { + fprintf(stderr, "Failed to create socket: (%d) %s\n", errno, strerror(errno)); + return -1; + } + + run_data_t run_data = { + .syscall = config.syscall, + .targets = {.ptr = targets, .len = n_servers}, + .fd = fd, + .period = config.period, + .msgs_per_target = config.msgs_per_target, + }; + + signal(SIGTERM, handle_stop); + signal(SIGINT, handle_stop); + + int ret = run(&run_data); + + for (int i = 0; i < run_data.targets.len; i++) { + struct addrinfo* addr = run_data.targets.ptr[i]; + freeaddrinfo(addr); + close(run_data.fd); + } + free(run_data.targets.ptr); + + return ret; +} diff --git a/integration-tests/container/udp/udp-server.c b/integration-tests/container/udp/udp-server.c new file mode 100644 index 0000000000..11fffff2b3 --- /dev/null +++ b/integration-tests/container/udp/udp-server.c @@ -0,0 +1,338 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +static const size_t BUF_SIZE = 4096; +static const size_t IOVEC_N = 32; +static const size_t MMSGHDR_N = 32; +static bool running = true; + +typedef enum recv_method_e { + RECVFROM = 0, + RECVMSG = 1, + RECVMMSG = 2, +} recv_method_t; + +void usage(char* prog) { + printf("%s [FLAGS]\n", prog); + printf("\nFLAGS:\n"); + printf(" -6, --ipv6:\n\tUse IPv6 instead of IPv4\n\n"); + printf(" -h, --help:\n\tShow this help message\n\n"); + printf(" -p, --port=\"9090\":\n\tUse the specified port\n\n"); + printf(" -s, --syscall=\"recvfrom\":\n\tSyscall to be used for receiving messages\n\tOne of: recvfrom, recvmsg, recvmmsg\n\n"); +} + +void handle_stop(int sig) { + running = false; +} + +void* get_in_addr(struct sockaddr* sa) { + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } + return &(((struct sockaddr_in6*)sa)->sin6_addr); +} + +void iovec_free(struct iovec* iov) { + if (iov == NULL) { + return; + } + + for (int i = 0; i < IOVEC_N; i++) { + free(iov[i].iov_base); + } + free(iov); +} + +struct iovec* iovec_new() { + struct iovec* ret = calloc(IOVEC_N, sizeof(struct iovec)); + if (ret == NULL) { + return NULL; + } + + for (int i = 0; i < IOVEC_N; i++) { + struct iovec* iov = &ret[i]; + iov->iov_base = malloc(BUF_SIZE + 1); + if (iov->iov_base == NULL) { + goto fail; + } + iov->iov_len = BUF_SIZE; + } + + return ret; + +fail: + iovec_free(ret); + return NULL; +} + +void mmsghdr_free(struct mmsghdr* mmh) { + if (mmh == NULL) { + return; + } + + for (int i = 0; i < MMSGHDR_N; i++) { + struct msghdr* mh = &mmh[i].msg_hdr; + free(mh->msg_name); + iovec_free(mh->msg_iov); + } + + free(mmh); +} + +struct mmsghdr* mmsghdr_new() { + struct mmsghdr* mmh = calloc(MMSGHDR_N, sizeof(struct mmsghdr)); + if (mmh == NULL) { + return NULL; + } + + for (int i = 0; i < MMSGHDR_N; i++) { + struct msghdr* mh = &mmh[i].msg_hdr; + mh->msg_namelen = sizeof(struct sockaddr); + mh->msg_name = malloc(mh->msg_namelen); + if (mh->msg_name == NULL) { + goto fail; + } + + mh->msg_iov = iovec_new(); + if (mh->msg_iov == NULL) { + goto fail; + } + mh->msg_iovlen = IOVEC_N; + } + + return mmh; + +fail: + mmsghdr_free(mmh); + return NULL; +} + +void print_msg(struct msghdr* mh, ssize_t remaining) { + struct sockaddr* from = mh->msg_name; + struct iovec* iov = mh->msg_iov; + char s[INET6_ADDRSTRLEN]; + + for (int i = 0; i < mh->msg_iovlen && remaining >= 0; i++) { + char* buf = iov[i].iov_base; + size_t terminator_index = remaining < iov[i].iov_len ? remaining : iov[i].iov_len; + buf[terminator_index] = '\0'; + + printf("%s: %s\n", inet_ntop(from->sa_family, get_in_addr(from), s, sizeof(s)), buf); + fflush(stdout); + + remaining -= iov[i].iov_len; + } +} + +ssize_t receive_recvfrom(int fd) { + char buf[BUF_SIZE + 1]; + char s[INET6_ADDRSTRLEN]; + struct sockaddr from; + socklen_t fromlen = sizeof(from); + ssize_t res = 0; + + res = recvfrom(fd, buf, BUF_SIZE, 0, &from, &fromlen); + if (res < 0) { + if (errno == EINTR || errno == EAGAIN) { + return 0; + } + return errno; + } + + buf[res] = '\0'; + printf("%s: %s\n", inet_ntop(from.sa_family, get_in_addr(&from), s, sizeof(s)), buf); + fflush(stdout); + + return 0; +} + +ssize_t receive_recvmsg(int fd) { + ssize_t res = 0; + int err = 0; + struct sockaddr from; + socklen_t fromlen = sizeof(from); + struct iovec* iov = iovec_new(); + if (iov == NULL) { + return ENOMEM; + } + struct msghdr mh = { + .msg_name = &from, + .msg_namelen = fromlen, + .msg_iov = iov, + .msg_iovlen = IOVEC_N, + }; + + res = recvmsg(fd, &mh, 0); + if (res < 0) { + if (errno != EINTR && errno != EAGAIN) { + err = errno; + } + goto end; + } + + print_msg(&mh, res); + +end: + iovec_free(iov); + return err; +} + +ssize_t receive_recvmmsg(int fd) { + ssize_t res = 0; + int err = 0; + struct mmsghdr* mmh = mmsghdr_new(); + if (mmh == NULL) { + return ENOMEM; + } + + res = recvmmsg(fd, mmh, MMSGHDR_N, 0, NULL); + if (res < 0) { + if (errno != EINTR && errno != EAGAIN) { + err = errno; + } + goto end; + } + + for (int i = 0; i < res; i++) { + struct msghdr* mh = &mmh[i].msg_hdr; + ssize_t size = mmh[i].msg_len; + + print_msg(mh, size); + } + +end: + mmsghdr_free(mmh); + return err; +} + +ssize_t receive(int fd, recv_method_t syscall) { + switch (syscall) { + case RECVFROM: + return receive_recvfrom(fd); + case RECVMSG: + return receive_recvmsg(fd); + case RECVMMSG: + return receive_recvmmsg(fd); + default: + fprintf(stderr, "Invalid syscall\n"); + return -1; + } +} + +int run(int ai_family, const char* port, recv_method_t syscall) { + int fd = -1; + struct addrinfo hints = { + .ai_family = ai_family, + .ai_socktype = SOCK_DGRAM, + .ai_flags = AI_PASSIVE, + }; + struct addrinfo* res = NULL; + ssize_t ret = getaddrinfo(NULL, port, &hints, &res); + + if (ret != 0) { + fprintf(stderr, "getaddrinfo failed: %zd\n", ret); + return -1; + } + + struct addrinfo* p = res; + for (; p != NULL; p = p->ai_next) { + fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (fd < 0) { + fprintf(stderr, "Failed to create socket: (%d) %s\n", errno, strerror(errno)); + continue; + } + + if (bind(fd, res->ai_addr, res->ai_addrlen) != 0) { + fprintf(stderr, "bind failed: (%d) %s\n", errno, strerror(errno)); + continue; + } + + break; + } + + if (p == NULL) { + fprintf(stderr, "failed to create socket\n"); + return -1; + } + + freeaddrinfo(res); + + struct timeval tv = { + .tv_sec = 1, .tv_usec = 0}; + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)); + + while (running) { + ret = receive(fd, syscall); + if (ret < 0) { + fprintf(stderr, "receive failed: (%zd) %s\n", ret, strerror(ret)); + continue; + } + } + + close(fd); + return 0; +} + +int main(int argc, char* argv[]) { + struct option options[] = { + {.name = "ipv6", .has_arg = no_argument, .flag = NULL, .val = '6'}, + {.name = "help", .has_arg = no_argument, .flag = NULL, .val = 'h'}, + {.name = "port", .has_arg = required_argument, .flag = NULL, .val = 'p'}, + {.name = "syscall", .has_arg = required_argument, .flag = NULL, .val = 's'}, + {.name = 0, .has_arg = 0, .flag = 0, .val = 0}}; + int opt = 0; + int ai_family = AF_INET; + char* port = "9090"; + recv_method_t syscall = RECVFROM; + + while ((opt = getopt_long(argc, argv, "6hp:s:", options, NULL)) != -1) { + switch (opt) { + case '6': + ai_family = AF_INET6; + break; + case 'h': + usage(argv[0]); + return 0; + case 'p': + port = optarg; + printf("Using port %s\n", port); + break; + case 's': + if (strcmp(optarg, "recvfrom") == 0) { + syscall = RECVFROM; + } else if (strcmp(optarg, "recvmsg") == 0) { + syscall = RECVMSG; + } else if (strcmp(optarg, "recvmmsg") == 0) { + syscall = RECVMMSG; + } else { + fprintf(stderr, "Unknown receive method: %s\n", optarg); + usage(argv[0]); + return -1; + } + + break; + default: + fprintf(stderr, "Unknown option %s\n", argv[optind]); + usage(argv[0]); + return -1; + } + } + + signal(SIGTERM, handle_stop); + signal(SIGINT, handle_stop); + + return run(ai_family, port, syscall); +} diff --git a/integration-tests/images.yml b/integration-tests/images.yml index b89115787c..a1e6df783d 100644 --- a/integration-tests/images.yml +++ b/integration-tests/images.yml @@ -13,6 +13,7 @@ qa: qa-schedule-curls: quay.io/rhacs-eng/qa-multi-arch:collector-schedule-curls qa-alpine-curl: quay.io/rhacs-eng/qa-multi-arch:alpine-curl qa-perf-event-open: quay.io/rhacs-eng/qa-multi-arch:collector-perf-event-open + qa-udp: quay.io/rhacs-eng/qa-multi-arch:udp non_qa: nginx: nginx:1.14-alpine diff --git a/integration-tests/integration_test.go b/integration-tests/integration_test.go index d63f6ea1ac..5f6015e099 100644 --- a/integration-tests/integration_test.go +++ b/integration-tests/integration_test.go @@ -1,6 +1,7 @@ package integrationtests import ( + "strings" "testing" "github.com/stretchr/testify/suite" @@ -534,3 +535,10 @@ func TestGperftools(t *testing.T) { func TestRingBuffer(t *testing.T) { suite.Run(t, new(suites.RingBufferTestSuite)) } + +func TestUdpNetworkFlow(t *testing.T) { + if strings.Contains(config.VMInfo().Config, "rhel-8-4-sap") { + t.Skip("Skipping test on RHEL 8.4 SAP due to a verifier issue") + } + suite.Run(t, new(suites.UdpNetworkFlow)) +} diff --git a/integration-tests/suites/udp_networkflow.go b/integration-tests/suites/udp_networkflow.go new file mode 100644 index 0000000000..adfd664d62 --- /dev/null +++ b/integration-tests/suites/udp_networkflow.go @@ -0,0 +1,284 @@ +package suites + +import ( + "fmt" + "strconv" + "time" + + "github.com/stackrox/collector/integration-tests/pkg/collector" + "github.com/stackrox/collector/integration-tests/pkg/common" + "github.com/stackrox/collector/integration-tests/pkg/config" + "github.com/stackrox/collector/integration-tests/pkg/log" + "github.com/stackrox/collector/integration-tests/pkg/types" +) + +const ( + UDP_CLIENT = "udp-client" + UDP_SERVER = "udp-server" + + // The number of containers to be created in + // multi destination/source tests + CONTAINER_COUNT = 3 +) + +type UdpNetworkFlow struct { + IntegrationTestSuiteBase + DNSEnabled bool +} + +type containerData struct { + id string + ip string + port uint16 +} + +func (c *containerData) String() string { + out := fmt.Sprintf("%s: %s", c.id, c.ip) + if c.port != 0 { + out += fmt.Sprintf(":%d", c.port) + } + + return out +} + +func (s *UdpNetworkFlow) SetupSuite() { + // The network needs to be removed after the containers, so its + // the first cleanup we register. + s.RegisterCleanup(UDP_CLIENT, UDP_SERVER) + s.StartContainerStats() + collectorOptions := collector.StartupOptions{ + Env: map[string]string{ + "ROX_COLLECTOR_TRACK_SEND_RECV": "true", + }, + } + s.StartCollector(false, &collectorOptions) + + image := config.Images().QaImageByKey("qa-udp") + + err := s.Executor().PullImage(image) + s.Require().NoError(err) +} + +func (s *UdpNetworkFlow) AfterTest(suiteName, testName string) { + containers := []string{ + UDP_CLIENT, + UDP_SERVER, + } + + for i := 0; i < CONTAINER_COUNT; i++ { + containers = append(containers, fmt.Sprintf("%s-%d", UDP_SERVER, i)) + } + + for i := 0; i < CONTAINER_COUNT; i++ { + containers = append(containers, fmt.Sprintf("%s-%d", UDP_CLIENT, i)) + } + s.cleanupContainers(containers...) +} + +func (s *UdpNetworkFlow) TearDownSubTest() { + s.cleanupContainers(UDP_CLIENT, UDP_SERVER) +} + +func (s *UdpNetworkFlow) TearDownSuite() { + s.WritePerfResults() +} + +func (s *UdpNetworkFlow) TestUdpNetorkflow() { + sendSyscalls := []string{"sendto", "sendmsg", "sendmmsg"} + recvSyscalls := []string{"recvfrom", "recvmsg", "recvmmsg"} + image := config.Images().QaImageByKey("qa-udp") + + port := uint16(9090) + for _, send := range sendSyscalls { + for _, recv := range recvSyscalls { + testName := fmt.Sprintf("%s_%s", send, recv) + s.Run(testName, func() { + s.runTest(image, recv, send, port) + }) + + port++ + } + } +} + +func (s *UdpNetworkFlow) runTest(image, recv, send string, port uint16) { + server := s.runServer(config.ContainerStartConfig{ + Name: UDP_SERVER, + Image: image, + Command: newServerCmd(recv, port), + }, port) + client := s.runClient(config.ContainerStartConfig{ + Name: UDP_CLIENT, + Image: image, + Command: newClientCmd(send, "", "", server), + Entrypoint: []string{"udp-client"}, + }) + log.Info("Server: %s - Client: %s\n", server.String(), client.String()) + + // Expected client connection + clientConnection := types.NetworkInfo{ + LocalAddress: "", + RemoteAddress: fmt.Sprintf("%s:%d", server.ip, server.port), + Role: "ROLE_CLIENT", + SocketFamily: "SOCKET_FAMILY_UNKNOWN", + CloseTimestamp: types.NilTimestamp, + } + + // Expected server connection + serverConnection := types.NetworkInfo{ + LocalAddress: fmt.Sprintf(":%d", server.port), + RemoteAddress: client.ip, + Role: "ROLE_SERVER", + SocketFamily: "SOCKET_FAMILY_UNKNOWN", + CloseTimestamp: types.NilTimestamp, + } + + s.Sensor().ExpectConnections(s.T(), client.id, 5*time.Second, clientConnection) + s.Sensor().ExpectConnections(s.T(), server.id, 5*time.Second, serverConnection) +} + +func (s *UdpNetworkFlow) TestMultipleDestinations() { + image := config.Images().QaImageByKey("qa-udp") + + servers := make([]containerData, CONTAINER_COUNT) + clientConnections := make([]types.NetworkInfo, CONTAINER_COUNT) + for i := 0; i < CONTAINER_COUNT; i++ { + name := fmt.Sprintf("%s-%d", UDP_SERVER, i) + port := uint16(9000 + i) + servers[i] = s.runServer(config.ContainerStartConfig{ + Name: name, + Image: image, + Command: newServerCmd("recvfrom", port), + }, port) + log.Info("Server: %s\n", servers[i].String()) + + // Load the client connection collector has to send for this server. + clientConnections[i] = types.NetworkInfo{ + LocalAddress: "", + RemoteAddress: fmt.Sprintf("%s:%d", servers[i].ip, servers[i].port), + Role: "ROLE_CLIENT", + SocketFamily: "SOCKET_FAMILY_UNKNOWN", + CloseTimestamp: types.NilTimestamp, + } + } + + // We give a big period here to ensure the syscall happens just once + // Due to an implementation restriction, the total number of messages + // sent must be less than 32. + client := s.runClient(config.ContainerStartConfig{ + Name: UDP_CLIENT, + Image: image, + Command: newClientCmd("sendmmsg", "300", "8", servers...), + Entrypoint: []string{"udp-client"}, + }) + log.Info("Client: %s\n", client.String()) + + for _, server := range servers { + serverConnection := types.NetworkInfo{ + LocalAddress: fmt.Sprintf(":%d", server.port), + RemoteAddress: client.ip, + Role: "ROLE_SERVER", + SocketFamily: "SOCKET_FAMILY_UNKNOWN", + CloseTimestamp: types.NilTimestamp, + } + s.Sensor().ExpectConnections(s.T(), server.id, 5*time.Second, serverConnection) + } + s.Sensor().ExpectConnections(s.T(), client.id, 5*time.Second, clientConnections...) +} + +func (s *UdpNetworkFlow) TestMultipleSources() { + image := config.Images().QaImageByKey("qa-udp") + port := uint16(9100) + + server := s.runServer(config.ContainerStartConfig{ + Name: UDP_SERVER, + Image: image, + Command: newServerCmd("recvmmsg", port), + }, port) + log.Info("Server: %s\n", server.String()) + + clients := make([]containerData, CONTAINER_COUNT) + serverConnections := make([]types.NetworkInfo, CONTAINER_COUNT) + for i := 0; i < CONTAINER_COUNT; i++ { + name := fmt.Sprintf("%s-%d", UDP_CLIENT, i) + clients[i] = s.runClient(config.ContainerStartConfig{ + Name: name, + Image: image, + Command: newClientCmd("sendto", "300", "", server), + Entrypoint: []string{"udp-client"}, + }) + log.Info("Client: %s\n", clients[i].String()) + + // Load the server connection collector has to send for this client. + serverConnections[i] = types.NetworkInfo{ + LocalAddress: fmt.Sprintf(":%d", server.port), + RemoteAddress: clients[i].ip, + Role: "ROLE_SERVER", + SocketFamily: "SOCKET_FAMILY_UNKNOWN", + CloseTimestamp: types.NilTimestamp, + } + } + + clientConnection := types.NetworkInfo{ + LocalAddress: "", + RemoteAddress: fmt.Sprintf("%s:%d", server.ip, server.port), + Role: "ROLE_CLIENT", + SocketFamily: "SOCKET_FAMILY_UNKNOWN", + CloseTimestamp: types.NilTimestamp, + } + + for _, client := range clients { + s.Sensor().ExpectConnections(s.T(), client.id, 5*time.Second, clientConnection) + } + s.Sensor().ExpectConnections(s.T(), server.id, 5*time.Second, serverConnections...) +} + +func newServerCmd(recv string, port uint16) []string { + return []string{ + "--syscall", recv, + "--port", strconv.FormatUint(uint64(port), 10), + } +} + +func (s *UdpNetworkFlow) runServer(cfg config.ContainerStartConfig, port uint16) containerData { + return s.runContainer(cfg, port) +} + +func newClientCmd(send, period, msgs string, servers ...containerData) []string { + cmd := []string{ + "--syscall", send, + } + + if period != "" { + cmd = append(cmd, "--period", period) + } + + if msgs != "" { + cmd = append(cmd, "--messages", msgs) + } + + for _, server := range servers { + serverLocation := server.ip + cmd = append(cmd, fmt.Sprintf("%s:%d", serverLocation, server.port)) + } + + return cmd +} + +func (s *UdpNetworkFlow) runClient(cfg config.ContainerStartConfig) containerData { + return s.runContainer(cfg, 0) +} + +func (s *UdpNetworkFlow) runContainer(cfg config.ContainerStartConfig, port uint16) containerData { + id, err := s.Executor().StartContainer(cfg) + s.Require().NoError(err) + + ip, err := s.getIPAddress(cfg.Name) + s.Require().NoError(err) + + return containerData{ + id: common.ContainerShortID(id), + ip: ip, + port: port, + } +}