Improve read performance when reading data subsets with a large number of channels (#342)
johannesloibl authored Dec 11, 2024
1 parent 88ea672 commit 7d4e0b9
Showing 2 changed files with 51 additions and 22 deletions.
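For context, the access pattern this commit speeds up is reading only a small subset of data from a file that contains a large number of channels, for example through the streaming API. A minimal sketch of that usage follows; the file name, group name and channel name are placeholders, not taken from this commit.

from nptdms import TdmsFile

# Open the file lazily and read a slice of a single channel.
# With many channels in the file, such a read previously walked over
# every channel's data in every chunk of the relevant segments.
with TdmsFile.open("measurement.tdms") as tdms_file:
    channel = tdms_file["Group1"]["Channel42"]
    subset = channel[1000:2000]

print(len(subset))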
1 change: 1 addition & 0 deletions .gitignore
@@ -13,6 +13,7 @@ coverage.xml
.ipynb_checkpoints
.vscode
*.ipynb
.idea

# Wercker directories
_builds
72 changes: 50 additions & 22 deletions nptdms/tdms_segment.py
@@ -45,6 +45,9 @@ class TdmsSegment(object):
'final_chunk_lengths_override',
'object_index',
'segment_incomplete',
'has_daqmx_objects_cached',
'chunk_size_cached',
'data_objects_cached',
]

def __init__(self, position, toc_mask, next_segment_pos, data_position, segment_incomplete):
@@ -57,6 +60,9 @@ def __init__(self, position, toc_mask, next_segment_pos, data_position, segment_
self.ordered_objects = None
self.object_index = None
self.segment_incomplete = segment_incomplete
self.has_daqmx_objects_cached = None
self.chunk_size_cached = None
self.data_objects_cached = None

def __repr__(self):
return "<TdmsSegment at position %d>" % self.position
@@ -135,6 +141,7 @@ def read_segment_objects(self, file, previous_segment_objects, index_cache, prev

if index_cache is not None:
self.object_index = index_cache.get_index(self.ordered_objects)

self._calculate_chunks()
return properties

@@ -261,18 +268,18 @@ def read_raw_data_for_channel(self, f, channel_path, chunk_offset=0, num_chunks=

f.seek(self.data_position)

data_objects = [o for o in self.ordered_objects if o.has_data]
chunk_size = self._get_chunk_size()

# Ensure we're working with Python ints as np.int32 values could overflow
# (https://github.com/adamreeve/npTDMS/issues/338)
chunk_size = int(chunk_size)
chunk_offset = int(chunk_offset)

if chunk_offset > 0:
f.seek(chunk_size * chunk_offset, os.SEEK_CUR)
stop_chunk = self.num_chunks if num_chunks is None else num_chunks + chunk_offset
for chunk in self._read_channel_data_chunks(f, data_objects, channel_path, chunk_offset, stop_chunk):
for chunk in self._read_channel_data_chunks(
f, self._get_data_objects(), channel_path, chunk_offset, stop_chunk, chunk_size
):
yield chunk

def _calculate_chunks(self):
@@ -351,11 +358,15 @@ def _new_segment_object(self, object_path, raw_data_index_header):
return TdmsSegmentObject(object_path)

def _get_chunk_size(self):
if self.chunk_size_cached is not None:
return self.chunk_size_cached

if self._have_daqmx_objects():
return get_daqmx_chunk_size(self.ordered_objects)
return sum(
o.data_size
for o in self.ordered_objects if o.has_data)
self.chunk_size_cached = int(get_daqmx_chunk_size(self.ordered_objects))
return self.chunk_size_cached

self.chunk_size_cached = int(sum(o.data_size for o in self.ordered_objects if o.has_data))
return self.chunk_size_cached

def _read_data_chunks(self, file, data_objects, num_chunks):
""" Read multiple data chunks at once
@@ -365,13 +376,17 @@ def _read_data_chunks(self, file, data_objects, num_chunks):
for chunk in reader.read_data_chunks(file, data_objects, num_chunks):
yield chunk

def _read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offset, stop_chunk):
def _read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offset, stop_chunk, chunk_size):
""" Read multiple data chunks for a single channel at once
In the base case we read each chunk individually but subclasses can override this
"""
reader = self._get_data_reader()
for chunk in reader.read_channel_data_chunks(file, data_objects, channel_path, chunk_offset, stop_chunk):
initial_position = file.tell()
for i, chunk in enumerate(reader.read_channel_data_chunks(
file, data_objects, channel_path, chunk_offset, stop_chunk
)):
yield chunk
file.seek(initial_position + (i + 1) * chunk_size)

def _get_data_reader(self):
endianness = '>' if (self.toc_mask & toc_properties['kTocBigEndian']) else '<'
@@ -383,6 +398,9 @@ def _get_data_reader(self):
return ContiguousDataReader(self.num_chunks, self.final_chunk_lengths_override, endianness)

def _have_daqmx_objects(self):
if self.has_daqmx_objects_cached is not None:
return self.has_daqmx_objects_cached

data_obj_count = 0
daqmx_count = 0
for o in self.ordered_objects:
@@ -391,12 +409,12 @@
if isinstance(o, DaqmxSegmentObject):
daqmx_count += 1
if daqmx_count == 0:
return False
if daqmx_count == data_obj_count:
return True
if daqmx_count > 0:
self.has_daqmx_objects_cached = False
elif daqmx_count == data_obj_count:
self.has_daqmx_objects_cached = True
elif daqmx_count > 0:
raise Exception("Cannot read mixed DAQmx and non-DAQmx data")
return False
return self.has_daqmx_objects_cached

def _have_interleaved_data(self):
""" Whether data in this segment is interleaved. Assumes data is not DAQmx.
Expand All @@ -420,6 +438,13 @@ def _have_interleaved_data(self):
else:
raise ValueError("Cannot read interleaved segment containing channels with unsized types")

def _get_data_objects(self):
if self.data_objects_cached is not None:
return self.data_objects_cached

self.data_objects_cached = [o for o in self.ordered_objects if o.has_data]
return self.data_objects_cached


class InterleavedDataReader(BaseDataReader):
""" Reads data in a TDMS segment with interleaved data
@@ -492,24 +517,27 @@ def _read_channel_data_chunk(self, file, data_objects, chunk_index, channel_path
""" Read data from a chunk for a single channel
"""
channel_data = RawChannelDataChunk.empty()
current_position = file.tell()
for obj in data_objects:
number_values = self._get_channel_number_values(obj, chunk_index)
if obj.path == channel_path:
file.seek(current_position)
channel_data = RawChannelDataChunk.channel_data(obj.read_values(file, number_values, self.endianness))
current_position = file.tell()
break
elif number_values == obj.number_values:
# Seek over data for other channel data
file.seek(obj.data_size, os.SEEK_CUR)
else:
current_position += obj.data_size
elif obj.data_type.size is not None:
# In last chunk with reduced chunk size
if obj.data_type.size is None:
# Type is unsized (eg. string), try reading number of values
obj.read_values(file, number_values, self.endianness)
else:
file.seek(obj.data_type.size * number_values, os.SEEK_CUR)
current_position += obj.data_type.size * number_values
else:
raise Exception("Cannot skip over channel with unsized type in a truncated segment")

return channel_data

def _get_channel_number_values(self, obj, chunk_index):
if chunk_index == (self.num_chunks - 1) and self.final_chunk_lengths_override is not None:
if self.final_chunk_lengths_override is not None and chunk_index == (self.num_chunks - 1):
return self.final_chunk_lengths_override.get(obj.path, 0)
else:
return obj.number_values
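Taken together, the tdms_segment.py changes do two things: values derived from a segment's object list (the chunk size, the list of objects with data, and whether the segment holds DAQmx data) are now computed once and cached on the segment, and when reading a single channel the reader tracks file positions and seeks over the other channels' data instead of reading through it. The standalone sketch below illustrates that cache-and-seek pattern on an invented fixed-layout binary format; it is not npTDMS code, and every name in it is made up for illustration.

import io
import struct

# Toy layout: each chunk stores 4 little-endian float64 values per channel,
# with the channel blocks laid out one after another (similar in spirit to a
# non-interleaved TDMS segment, but entirely invented here).
CHANNELS = ["ch0", "ch1", "ch2"]
VALUES_PER_CHUNK = 4
VALUE_SIZE = 8  # bytes per float64

def build_toy_file(num_chunks):
    buf = io.BytesIO()
    for chunk in range(num_chunks):
        for ci in range(len(CHANNELS)):
            for v in range(VALUES_PER_CHUNK):
                buf.write(struct.pack("<d", chunk * 100 + ci * 10 + v))
    buf.seek(0)
    return buf

class ToySegmentReader:
    def __init__(self):
        # Cache the derived chunk size instead of recomputing it on every read.
        self._chunk_size_cached = None

    def _chunk_size(self):
        if self._chunk_size_cached is None:
            self._chunk_size_cached = len(CHANNELS) * VALUES_PER_CHUNK * VALUE_SIZE
        return self._chunk_size_cached

    def read_channel(self, f, channel, num_chunks):
        # Read one channel by seeking past the other channels' data
        # rather than reading every value in every chunk.
        chunk_size = self._chunk_size()
        target = CHANNELS.index(channel)
        values = []
        for chunk in range(num_chunks):
            chunk_start = chunk * chunk_size
            f.seek(chunk_start + target * VALUES_PER_CHUNK * VALUE_SIZE)
            raw = f.read(VALUES_PER_CHUNK * VALUE_SIZE)
            values.extend(struct.unpack("<%dd" % VALUES_PER_CHUNK, raw))
            # Jump straight to the start of the next chunk.
            f.seek(chunk_start + chunk_size)
        return values

reader = ToySegmentReader()
f = build_toy_file(num_chunks=3)
print(reader.read_channel(f, "ch1", num_chunks=3))

On a real file the savings come from avoiding both the repeated recomputation of per-segment values and the reads of data the caller never asked for.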
