From 8bcee7b16af96bd826515679b32d708a5182b278 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Herman=20=C3=98ie=20Kolden?= Date: Sun, 30 Jun 2024 15:05:05 +0200 Subject: [PATCH] core: support writing ulog to bytes handle Previously only writing to files was possible. It's useful to be able to write to BytesIO for e.g. returning generated ULog files without having to write an intermediary file. --- pyulog/core.py | 10 ++++++++-- test/test_ulog.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/pyulog/core.py b/pyulog/core.py index 0ad9896..c56ad00 100644 --- a/pyulog/core.py +++ b/pyulog/core.py @@ -3,6 +3,7 @@ import struct import copy import sys +import contextlib import numpy as np #pylint: disable=too-many-instance-attributes, unused-argument, missing-docstring #pylint: disable=protected-access, too-many-branches @@ -236,9 +237,14 @@ def get_dataset(self, name, multi_instance=0): return [elem for elem in self._data_list if elem.name == name and elem.multi_id == multi_instance][0] - def write_ulog(self, path): + def write_ulog(self, log_file): """ write current data back into a ulog file """ - with open(path, "wb") as ulog_file: + if isinstance(log_file, str): + handle = open(log_file, "wb") + else: + handle = contextlib.nullcontext(log_file) + + with handle as ulog_file: # Definition section self._write_file_header(ulog_file) self._write_flags(ulog_file) diff --git a/test/test_ulog.py b/test/test_ulog.py index 26d5ecf..d135ef3 100644 --- a/test/test_ulog.py +++ b/test/test_ulog.py @@ -6,6 +6,7 @@ import inspect import unittest import tempfile +from io import BytesIO from ddt import ddt, data @@ -59,4 +60,33 @@ def test_write_ulog(self, base_name): assert copied == original + @data('sample') + def test_write_ulog_memory(self, base_name): + ''' + Test that the write_ulog method can write bytes to memory. + ''' + ulog_file_name = os.path.join(TEST_PATH, base_name + '.ulg') + original = pyulog.ULog(ulog_file_name) + with BytesIO() as bytes_handle: + original.write_ulog(bytes_handle) + bytes_handle.seek(0) + copied = pyulog.ULog(bytes_handle) + + for original_key, original_value in original.__dict__.items(): + copied_value = getattr(copied, original_key) + if original_key == '_sync_seq_cnt': + # Sync messages are counted on parse, but otherwise dropped, so + # we don't rewrite them + assert copied_value == 0 + elif original_key == '_appended_offsets': + # Abruptly ended messages just before offsets are dropped, so + # we don't rewrite appended offsets + assert copied_value == [] + elif original_key == '_incompat_flags': + # Same reasoning on incompat_flags[0] as for '_appended_offsets' + assert copied_value[0] == original_value[0] & 0xFE # pylint: disable=unsubscriptable-object + assert copied_value[1:] == original_value[1:] # pylint: disable=unsubscriptable-object + else: + assert copied_value == original_value + # vim: set et fenc=utf-8 ft=python ff=unix sts=4 sw=4 ts=4