diff --git a/src/plugins/input/tcp/CMakeLists.txt b/src/plugins/input/tcp/CMakeLists.txt index 65c067e3..e373f63b 100644 --- a/src/plugins/input/tcp/CMakeLists.txt +++ b/src/plugins/input/tcp/CMakeLists.txt @@ -10,6 +10,7 @@ add_library(tcp-input MODULE src/DecoderFactory.cpp src/Acceptor.cpp src/Plugin.cpp + src/IpfixDecoder.cpp ) install( diff --git a/src/plugins/input/tcp/src/IpfixDecoder.cpp b/src/plugins/input/tcp/src/IpfixDecoder.cpp new file mode 100644 index 00000000..8129f95f --- /dev/null +++ b/src/plugins/input/tcp/src/IpfixDecoder.cpp @@ -0,0 +1,95 @@ +/** + * \file + * \author Jakub Antonín Štigler + * \brief IPFIX decoder for tcp plugin (header file) + * \date 2024 + * + * Copyright: (C) 2023 CESNET, z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "IpfixDecoder.hpp" + +#include // runtime_error +#include // string +#include // ERRNO + +#include // read + +#include // fds_ipfix_msg_header, ipx_strerror + +#include "DecodeBuffer.hpp" // DecodeBuffer + +namespace tcp_in { + +using namespace std; + +DecodeBuffer &IpfixDecoder::decode() { + while (!m_decoded.enough_data()) { + if (!read_header()) { + break; + } + + if (!read_body()) { + break; + } + } + + return m_decoded; +} + +bool IpfixDecoder::read_header() { + if (m_msg_size != 0) { + return true; + } + + if (!read_to_size(sizeof(fds_ipfix_msg_hdr))) { + return false; + } + + auto hdr = reinterpret_cast(m_part_readed.get()); + m_msg_size = hdr->length; + return true; +} + +bool IpfixDecoder::read_body() { + if (!read_to_size(m_msg_size)) { + return false; + } + + m_decoded.add(m_part_readed); + m_msg_size = 0; + return true; +} + +bool IpfixDecoder::read_to_size(size_t size) { + auto filled = m_part_readed.size(); + auto remaining = size - filled; + m_part_readed.resize(size); + + int res = read(m_fd, m_part_readed.get() + filled, remaining); + if (res == -1) { + m_part_readed.resize(filled); + int err = errno; + if (err == EWOULDBLOCK || err == EAGAIN) { + return false; + } + + const char *err_str; + ipx_strerror(err, err_str); + throw runtime_error( + "IPFIX Decoder: Failed to read from descriptor: " + + string(err_str) + ); + } + + if (res != remaining) { + m_part_readed.resize(filled + res); + return false; + } + + return true; +} + +} // namespace tcp_in + diff --git a/src/plugins/input/tcp/src/IpfixDecoder.hpp b/src/plugins/input/tcp/src/IpfixDecoder.hpp index 8b6a0c20..6945ea1e 100644 --- a/src/plugins/input/tcp/src/IpfixDecoder.hpp +++ b/src/plugins/input/tcp/src/IpfixDecoder.hpp @@ -12,6 +12,7 @@ #include "Decoder.hpp" // Decoder #include "DecodeBuffer.hpp" // DecodeBuffer +#include "ByteVector.hpp" // ByteVector namespace tcp_in { @@ -22,13 +23,23 @@ class IpfixDecoder : public Decoder { * @brief Creates ipfix decoder. * @param fd TCP connection file descriptor. */ - IpfixDecoder(int fd) : fd(fd), decoded() {} + IpfixDecoder(int fd) : m_fd(fd), m_decoded(), m_part_readed() {} virtual DecodeBuffer &decode() override; private: - int fd; - DecodeBuffer decoded; + /** returns true if there was enough data to read the header */ + bool read_header(); + /** returns true if there was enough data to read the body */ + bool read_body(); + /** returns true if there was enough data to read to the given amount */ + bool read_to_size(size_t size); + + int m_fd; + DecodeBuffer m_decoded; + + ByteVector m_part_readed; + size_t m_msg_size; }; } // namespace tcp_in diff --git a/src/plugins/input/tcp/src/Plugin.cpp b/src/plugins/input/tcp/src/Plugin.cpp index c01e85ed..a4800677 100644 --- a/src/plugins/input/tcp/src/Plugin.cpp +++ b/src/plugins/input/tcp/src/Plugin.cpp @@ -1,3 +1,13 @@ +/** + * \file + * \author Jakub Antonín Štigler + * \brief Tcp input plugin for ipfixcol2 (source file) + * \date 2024 + * + * Copyright: (C) 2023 CESNET, z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + #include "Plugin.hpp" #include // array