From 7d4e0b906328d7fc3b8eafa415bf9cccabdb364d Mon Sep 17 00:00:00 2001
From: Johannes Loibl <48917891+johannesloibl@users.noreply.github.com>
Date: Wed, 11 Dec 2024 21:04:10 +0100
Subject: [PATCH] Improve read performance when reading data subsets with a
 large number of channels (#342)

---
 .gitignore             |  1 +
 nptdms/tdms_segment.py | 72 +++++++++++++++++++++++++++++-------------
 2 files changed, 51 insertions(+), 22 deletions(-)

diff --git a/.gitignore b/.gitignore
index 384e350..8f159db 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,6 +13,7 @@ coverage.xml
 .ipynb_checkpoints
 .vscode
 *.ipynb
+.idea
 
 # Wercker directories
 _builds
diff --git a/nptdms/tdms_segment.py b/nptdms/tdms_segment.py
index a48893b..cacde39 100644
--- a/nptdms/tdms_segment.py
+++ b/nptdms/tdms_segment.py
@@ -45,6 +45,9 @@ class TdmsSegment(object):
         'final_chunk_lengths_override',
         'object_index',
         'segment_incomplete',
+        'has_daqmx_objects_cached',
+        'chunk_size_cached',
+        'data_objects_cached',
     ]
 
     def __init__(self, position, toc_mask, next_segment_pos, data_position, segment_incomplete):
@@ -57,6 +60,9 @@ def __init__(self, position, toc_mask, next_segment_pos, data_position, segment_
         self.ordered_objects = None
         self.object_index = None
         self.segment_incomplete = segment_incomplete
+        self.has_daqmx_objects_cached = None
+        self.chunk_size_cached = None
+        self.data_objects_cached = None
 
     def __repr__(self):
         return "<TdmsSegment at position %d>" % self.position
@@ -135,6 +141,7 @@ def read_segment_objects(self, file, previous_segment_objects, index_cache, prev
 
         if index_cache is not None:
             self.object_index = index_cache.get_index(self.ordered_objects)
 
+        self._calculate_chunks()
         return properties
 
@@ -261,18 +268,18 @@ def read_raw_data_for_channel(self, f, channel_path, chunk_offset=0, num_chunks=
 
         f.seek(self.data_position)
 
-        data_objects = [o for o in self.ordered_objects if o.has_data]
         chunk_size = self._get_chunk_size()
 
         # Ensure we're working with Python ints as np.int32 values could overflow
         # (https://github.com/adamreeve/npTDMS/issues/338)
-        chunk_size = int(chunk_size)
         chunk_offset = int(chunk_offset)
 
         if chunk_offset > 0:
             f.seek(chunk_size * chunk_offset, os.SEEK_CUR)
         stop_chunk = self.num_chunks if num_chunks is None else num_chunks + chunk_offset
-        for chunk in self._read_channel_data_chunks(f, data_objects, channel_path, chunk_offset, stop_chunk):
+        for chunk in self._read_channel_data_chunks(
+            f, self._get_data_objects(), channel_path, chunk_offset, stop_chunk, chunk_size
+        ):
             yield chunk
 
     def _calculate_chunks(self):
@@ -351,11 +358,15 @@ def _new_segment_object(self, object_path, raw_data_index_header):
         return TdmsSegmentObject(object_path)
 
     def _get_chunk_size(self):
+        if self.chunk_size_cached is not None:
+            return self.chunk_size_cached
+
         if self._have_daqmx_objects():
-            return get_daqmx_chunk_size(self.ordered_objects)
-        return sum(
-            o.data_size
-            for o in self.ordered_objects if o.has_data)
+            self.chunk_size_cached = int(get_daqmx_chunk_size(self.ordered_objects))
+            return self.chunk_size_cached
+
+        self.chunk_size_cached = int(sum(o.data_size for o in self.ordered_objects if o.has_data))
+        return self.chunk_size_cached
 
     def _read_data_chunks(self, file, data_objects, num_chunks):
         """ Read multiple data chunks at once
@@ -362,16 +373,20 @@ def _read_data_chunks(self, file, data_objects, num_chunks):
         In the base case we read each chunk individually but subclasses can override this
         """
         reader = self._get_data_reader()
         for chunk in reader.read_data_chunks(file, data_objects, num_chunks):
             yield chunk
 
-    def _read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offset, stop_chunk):
+    def _read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offset, stop_chunk, chunk_size):
         """ Read multiple data chunks for a single channel at once
         In the base case we read each chunk individually but subclasses can override this
         """
         reader = self._get_data_reader()
-        for chunk in reader.read_channel_data_chunks(file, data_objects, channel_path, chunk_offset, stop_chunk):
+        initial_position = file.tell()
+        for i, chunk in enumerate(reader.read_channel_data_chunks(
+            file, data_objects, channel_path, chunk_offset, stop_chunk
+        )):
             yield chunk
+            file.seek(initial_position + (i + 1) * chunk_size)
 
     def _get_data_reader(self):
         endianness = '>' if (self.toc_mask & toc_properties['kTocBigEndian']) else '<'
@@ -383,6 +398,9 @@
         return ContiguousDataReader(self.num_chunks, self.final_chunk_lengths_override, endianness)
 
     def _have_daqmx_objects(self):
+        if self.has_daqmx_objects_cached is not None:
+            return self.has_daqmx_objects_cached
+
         data_obj_count = 0
         daqmx_count = 0
         for o in self.ordered_objects:
@@ -391,12 +409,12 @@
             if isinstance(o, DaqmxSegmentObject):
                 daqmx_count += 1
         if daqmx_count == 0:
-            return False
-        if daqmx_count == data_obj_count:
-            return True
-        if daqmx_count > 0:
+            self.has_daqmx_objects_cached = False
+        elif daqmx_count == data_obj_count:
+            self.has_daqmx_objects_cached = True
+        elif daqmx_count > 0:
             raise Exception("Cannot read mixed DAQmx and non-DAQmx data")
-        return False
+        return self.has_daqmx_objects_cached
 
     def _have_interleaved_data(self):
         """ Whether data in this segment is interleaved. Assumes data is not DAQmx.
@@ -420,6 +438,13 @@
         else:
             raise ValueError("Cannot read interleaved segment containing channels with unsized types")
 
+    def _get_data_objects(self):
+        if self.data_objects_cached is not None:
+            return self.data_objects_cached
+
+        self.data_objects_cached = [o for o in self.ordered_objects if o.has_data]
+        return self.data_objects_cached
+
 
 class InterleavedDataReader(BaseDataReader):
     """ Reads data in a TDMS segment with interleaved data
@@ -492,24 +517,27 @@ def _read_channel_data_chunk(self, file, data_objects, chunk_index, channel_path
         """ Read data from a chunk for a single channel
         """
         channel_data = RawChannelDataChunk.empty()
+        current_position = file.tell()
         for obj in data_objects:
             number_values = self._get_channel_number_values(obj, chunk_index)
             if obj.path == channel_path:
+                file.seek(current_position)
                 channel_data = RawChannelDataChunk.channel_data(obj.read_values(file, number_values, self.endianness))
+                current_position = file.tell()
+                break
             elif number_values == obj.number_values:
                 # Seek over data for other channel data
-                file.seek(obj.data_size, os.SEEK_CUR)
-            else:
+                current_position += obj.data_size
+            elif obj.data_type.size is not None:
                 # In last chunk with reduced chunk size
-                if obj.data_type.size is None:
-                    # Type is unsized (eg. string), try reading number of values
-                    obj.read_values(file, number_values, self.endianness)
-                else:
-                    file.seek(obj.data_type.size * number_values, os.SEEK_CUR)
+                current_position += obj.data_type.size * number_values
+            else:
+                raise Exception("Cannot skip over channel with unsized type in a truncated segment")
+
         return channel_data
 
     def _get_channel_number_values(self, obj, chunk_index):
-        if chunk_index == (self.num_chunks - 1) and self.final_chunk_lengths_override is not None:
+        if self.final_chunk_lengths_override is not None and chunk_index == (self.num_chunks - 1):
            return self.final_chunk_lengths_override.get(obj.path, 0)
         else:
             return obj.number_values
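
Note: below is a minimal usage sketch (not part of the patch) of the access pattern this change speeds up, using the public npTDMS streaming API; the file, group and channel names are hypothetical. Reading a subset of a channel goes through TdmsSegment.read_raw_data_for_channel above, so with many channels the cached chunk size and data-object list, the early break after the target channel, and the seek to each chunk boundary avoid re-scanning every other channel's data in each chunk.

    from nptdms import TdmsFile

    # Stream from disk rather than loading the whole file, then read only a
    # slice of a single channel (this exercises read_raw_data_for_channel).
    with TdmsFile.open("many_channels.tdms") as tdms_file:
        channel = tdms_file["group_name"]["channel_name"]
        # Read 1000 values starting at value offset 5000 from this channel only
        data = channel.read_data(offset=5000, length=1000)
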