Skip to content

Commit

Permalink
feat: support setting udp buffer size in run time
Browse files Browse the repository at this point in the history
  • Loading branch information
birnevogel11 authored and aigarius committed Apr 12, 2024
1 parent da7de2c commit 60c0c74
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 5 deletions.
72 changes: 69 additions & 3 deletions dlt/dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
DLT_EMPTY_FILE_ERROR = "DLT TRACE FILE IS EMPTY"
cDLT_FILE_NOT_OPEN_ERROR = "Could not open DLT Trace file (libdlt)" # pylint: disable=invalid-name

DLT_UDP_MULTICAST_FD_BUFFER_SIZE = int(os.environ.get("PYDLT_UDP_MULTICAST_FD_BUFFER_SIZE", 2 * (2**20))) # 2 Mb
DLT_UDP_MULTICAST_BUFFER_SIZE = int(os.environ.get("PYDLT_UDP_MULTICAST_BUFFER_SIZE", 8 * (2**20))) # 8 Mb


class cached_property(object): # pylint: disable=invalid-name
"""
Expand Down Expand Up @@ -865,13 +868,23 @@ def __len__(self):


class DLTClient(cDltClient):
"""DLTClient class takes care about correct initialization and
cleanup
"""
"""DLTClient class takes care about correct initialization and cleanup"""

verbose = 0

def __init__(self, **kwords):
"""Initialize a DLTClient.
:param servIP: Optional[str] - dlt server IP.
:param hostIP: Optional[str] - Only available for udp multicast mode.
Set host interface address.
:param port: Optional[int] - dlt tcp daemon port.
:param verbose: Optional[bool] - Enable verbose output.
:param udp_fd_buffer_size_bytes: Optional[int] - Only available for udp
multicast mode. Set the UDP buffer size through setsockopt (unit: bytes).
:param udp_buffer_size_bytes: Optional[int] - Only available for udp
multicast mode. Set the DltReceiver's buffer size (unit: bytes).
"""
self.is_udp_multicast = False
self.verbose = kwords.pop("verbose", 0)
if dltlib.dlt_client_init(ctypes.byref(self), self.verbose) == DLT_RETURN_ERROR:
Expand Down Expand Up @@ -915,6 +928,9 @@ def __init__(self, **kwords):
# it ourselves elsewhere
self.port = kwords.get("port", DLT_DAEMON_TCP_PORT)

self._udp_fd_buffer_size_bytes = kwords.get("udp_fd_buffer_size_bytes", DLT_UDP_MULTICAST_FD_BUFFER_SIZE)
self._udp_buffer_size_bytes = kwords.get("udp_buffer_size_bytes", DLT_UDP_MULTICAST_BUFFER_SIZE)

def connect(self, timeout=None):
"""Connect to the server
Expand Down Expand Up @@ -973,7 +989,9 @@ def connect(self, timeout=None):
else:
if self.verbose:
logger.info("Connecting DLTClient using UDP Connection")

connected = dltlib.dlt_client_connect(ctypes.byref(self), self.verbose)
self._set_udp_multicast_buffer_size()

if self.verbose:
logger.info("DLT Connection return: %s", connected)
Expand Down Expand Up @@ -1051,6 +1069,54 @@ def client_loop(self):
dltlib.dlt_client_register_message_callback(self.msg_callback)
dltlib.dlt_client_main_loop(ctypes.byref(self), None, self.verbose)

def _set_udp_multicast_buffer_size(self, custom_fd_buffer_size_bytes=None, custom_buffer_size_bytes=None) -> None:
fd_buffer_size = int(self._udp_fd_buffer_size_bytes or custom_fd_buffer_size_bytes or 0)
buffer_size_bytes = int(self._udp_buffer_size_bytes or custom_buffer_size_bytes or 0)

if fd_buffer_size:
# Socket options are associated with an open file description. This
# means that file descriptors duplicated as a consequence of dup()
# (or similar) or fork() share the same set of socket options.
# -- Chapter 61.9 Socket Options.
# The Linux Programming Interface, p.1279
#
# The buffer size can be changed with a new fd which is created by
# dup system call (it's the internal implementation in
# `socket.fromfd`), so the code creates a socket instance first
# configures it and directly close it.
with socket.fromfd(self.sock, socket.AF_INET, socket.SOCK_DGRAM) as conf_socket:
logger.debug("Set UDP Multicast socket buffer size: %s kbytes", fd_buffer_size / 1024)
conf_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, fd_buffer_size)

real_buffer_size = int(conf_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) / 2)
if real_buffer_size != fd_buffer_size:
logger.warning(
(
"Failed to set UDP Multicast buffer size. set_size: %s, real_size: %s. "
"Bypass the error and continue"
),
fd_buffer_size / 1024,
real_buffer_size / 1024,
)
logger.warning(
(
"Please run command `sysctl -w net.core.rmem_max=%s` with root permission to "
"set the maximum size and restart dlt again."
),
fd_buffer_size,
)

if buffer_size_bytes:
logger.debug("Set UDP Multicast DltReceiver buffer size: %s kbytes", buffer_size_bytes / 1024)
ret = dltlib.dlt_receiver_init(
ctypes.byref(self.receiver), self.sock, self.receiver.type, buffer_size_bytes
)
if ret < 0:
raise RuntimeError(
f"Failed to set UDP Multicast DltReceiver buffer size. return code: {ret}, "
f"buffer_size_bytes: {buffer_size_bytes}"
)


def py_dlt_file_main_loop(dlt_reader, limit=None, callback=None):
"""Main loop to read dlt messages from dlt file."""
Expand Down
12 changes: 11 additions & 1 deletion dlt/dlt_broker_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from dlt.dlt import (
DLTClient,
DLT_DAEMON_TCP_PORT,
DLT_UDP_MULTICAST_BUFFER_SIZE,
DLT_UDP_MULTICAST_FD_BUFFER_SIZE,
cDLT_FILE_NOT_OPEN_ERROR,
load,
py_dlt_client_main_loop,
Expand Down Expand Up @@ -402,6 +404,8 @@ def __init__(
self.tracefile = None
self.last_connected = time.time()
self.last_message = time.time() - 120.0
self._udp_fd_buffer_size_bytes = client_cfg.get("udp_fd_buffer_size_bytes", DLT_UDP_MULTICAST_FD_BUFFER_SIZE)
self._udp_buffer_size_bytes = client_cfg.get("udp_buffer_size_bytes", DLT_UDP_MULTICAST_BUFFER_SIZE)

def is_valid_message(self, message):
return message and (message.apid != "" or message.ctid != "")
Expand All @@ -420,7 +424,13 @@ def _client_connect(self):
self._port,
self._filename,
)
self._client = DLTClient(servIP=self._ip_address, port=self._port, verbose=self.verbose)
self._client = DLTClient(
servIP=self._ip_address,
port=self._port,
verbose=self.verbose,
udp_fd_buffer_size_bytes=self._udp_fd_buffer_size_bytes,
udp_buffer_size_bytes=self._udp_buffer_size_bytes,
)
connected = self._client.connect(self.timeout)
if connected:
logger.info("DLTClient connected to %s", self._client.servIP)
Expand Down
22 changes: 21 additions & 1 deletion dlt/py_dlt_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import time

from dlt.dlt import DLT_UDP_MULTICAST_FD_BUFFER_SIZE, DLT_UDP_MULTICAST_BUFFER_SIZE
from dlt.dlt_broker import DLTBroker

logging.basicConfig(format="%(asctime)s %(name)s %(levelname)-8s %(message)s")
Expand All @@ -18,13 +19,32 @@ def parse_args():
parser = argparse.ArgumentParser(description="Receive DLT messages")
parser.add_argument("--host", required=True, help="hostname or ip address to connect to")
parser.add_argument("--file", required=True, help="The file into which the messages will be written")
parser.add_argument(
"--udp-fd-buffer-size",
dest="udp_fd_buffer_size",
default=DLT_UDP_MULTICAST_FD_BUFFER_SIZE,
type=int,
help=f"Set the socket buffer size in udp multicast mode. default: {DLT_UDP_MULTICAST_FD_BUFFER_SIZE} bytes",
)
parser.add_argument(
"--udp-buffer-size",
dest="udp_buffer_size",
default=DLT_UDP_MULTICAST_BUFFER_SIZE,
type=int,
help=f"Set the DltReceiver buffer size in udp multicast mode. default: {DLT_UDP_MULTICAST_BUFFER_SIZE} bytes",
)
return parser.parse_args()


def dlt_receive(options):
"""Receive DLT messages via DLTBroker"""
logger.info("Creating DLTBroker instance")
broker = DLTBroker(ip_address=options.host, filename=options.file)
broker = DLTBroker(
ip_address=options.host,
filename=options.file,
udp_fd_buffer_size_bytes=options.udp_buffer_size,
udp_buffer_size_bytes=options.udp_fd_buffer_size,
)

logger.info("Starting DLTBroker")
broker.start() # start the loop
Expand Down

0 comments on commit 60c0c74

Please sign in to comment.