diff --git a/pyulog/core.py b/pyulog/core.py index 0ad9896..c56ad00 100644 --- a/pyulog/core.py +++ b/pyulog/core.py @@ -3,6 +3,7 @@ import struct import copy import sys +import contextlib import numpy as np #pylint: disable=too-many-instance-attributes, unused-argument, missing-docstring #pylint: disable=protected-access, too-many-branches @@ -236,9 +237,14 @@ def get_dataset(self, name, multi_instance=0): return [elem for elem in self._data_list if elem.name == name and elem.multi_id == multi_instance][0] - def write_ulog(self, path): + def write_ulog(self, log_file): """ write current data back into a ulog file """ - with open(path, "wb") as ulog_file: + if isinstance(log_file, str): + handle = open(log_file, "wb") + else: + handle = contextlib.nullcontext(log_file) + + with handle as ulog_file: # Definition section self._write_file_header(ulog_file) self._write_flags(ulog_file) diff --git a/test/test_ulog.py b/test/test_ulog.py index 26d5ecf..d135ef3 100644 --- a/test/test_ulog.py +++ b/test/test_ulog.py @@ -6,6 +6,7 @@ import inspect import unittest import tempfile +from io import BytesIO from ddt import ddt, data @@ -59,4 +60,33 @@ def test_write_ulog(self, base_name): assert copied == original + @data('sample') + def test_write_ulog_memory(self, base_name): + ''' + Test that the write_ulog method can write bytes to memory. + ''' + ulog_file_name = os.path.join(TEST_PATH, base_name + '.ulg') + original = pyulog.ULog(ulog_file_name) + with BytesIO() as bytes_handle: + original.write_ulog(bytes_handle) + bytes_handle.seek(0) + copied = pyulog.ULog(bytes_handle) + + for original_key, original_value in original.__dict__.items(): + copied_value = getattr(copied, original_key) + if original_key == '_sync_seq_cnt': + # Sync messages are counted on parse, but otherwise dropped, so + # we don't rewrite them + assert copied_value == 0 + elif original_key == '_appended_offsets': + # Abruptly ended messages just before offsets are dropped, so + # we don't rewrite appended offsets + assert copied_value == [] + elif original_key == '_incompat_flags': + # Same reasoning on incompat_flags[0] as for '_appended_offsets' + assert copied_value[0] == original_value[0] & 0xFE # pylint: disable=unsubscriptable-object + assert copied_value[1:] == original_value[1:] # pylint: disable=unsubscriptable-object + else: + assert copied_value == original_value + # vim: set et fenc=utf-8 ft=python ff=unix sts=4 sw=4 ts=4