From 095a2c180f096d6b035d089b1f7780e5e5e6deea Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Thu, 14 Mar 2024 11:01:32 +1300
Subject: [PATCH] Expose information about incomplete segments (#327)

---
 docs/apireference.rst         |  6 +++++
 nptdms/reader.py              |  4 +--
 nptdms/tdms.py                | 48 ++++++++++++++++++++++++++++++++++
 nptdms/tdms_segment.py        | 23 ++++++++--------
 nptdms/test/test_daqmx.py     | 16 ++++++++++++
 nptdms/test/test_tdms_file.py | 49 +++++++++++++++++++++++++++++++++++
 6 files changed, 133 insertions(+), 13 deletions(-)

diff --git a/docs/apireference.rst b/docs/apireference.rst
index aa77446..6d889f8 100644
--- a/docs/apireference.rst
+++ b/docs/apireference.rst
@@ -27,6 +27,12 @@ Reading TDMS Files
 .. autoclass:: ChannelDataChunk()
     :members:
 
+.. autoclass:: nptdms.tdms.FileStatus()
+    :members:
+
+.. autoclass:: nptdms.tdms.ChannelSegmentStatus()
+    :members:
+
 
 Writing TDMS Files
 ------------------
diff --git a/nptdms/reader.py b/nptdms/reader.py
index 12f284b..8c7b031 100644
--- a/nptdms/reader.py
+++ b/nptdms/reader.py
@@ -275,10 +275,10 @@ def _read_segment_metadata(
             file, segment_position, is_index_file)
 
         segment = TdmsSegment(
-            position, toc_mask, next_segment_pos, data_position)
+            position, toc_mask, next_segment_pos, data_position, segment_incomplete)
 
         properties = segment.read_segment_objects(
-            file, self._prev_segment_objects, index_cache, previous_segment, segment_incomplete)
+            file, self._prev_segment_objects, index_cache, previous_segment)
         return segment, properties
 
     def _read_lead_in(self, file, segment_position, is_index_file=False):
diff --git a/nptdms/tdms.py b/nptdms/tdms.py
index df2e1cb..1c1270e 100644
--- a/nptdms/tdms.py
+++ b/nptdms/tdms.py
@@ -164,6 +164,33 @@ def properties(self):
 
         return self._properties
 
+    @property
+    def file_status(self):
+        """ Return information about the file status
+
+        :rtype: FileStatus
+        """
+
+        incomplete_final_segment = False
+        channel_statuses = None
+        if self._reader._segments:
+            last_segment = self._reader._segments[-1]
+            incomplete_final_segment = last_segment.segment_incomplete
+            last_chunk_overrides = last_segment.final_chunk_lengths_override
+            if last_chunk_overrides is not None:
+                channel_statuses = dict(
+                    (obj.path, ChannelSegmentStatus(obj.number_values, last_chunk_overrides.get(obj.path, 0)))
+                    for obj in last_segment.ordered_objects
+                    if obj.has_data)
+            elif incomplete_final_segment:
+                # Data lengths match expected lengths
+                channel_statuses = dict(
+                    (obj.path, ChannelSegmentStatus(obj.number_values, obj.number_values))
+                    for obj in last_segment.ordered_objects
+                    if obj.has_data)
+
+        return FileStatus(incomplete_final_segment, channel_statuses)
+
     def as_dataframe(self, time_index=False, absolute_time=False, scaled_data=True, arrow_dtypes=False):
         """
         Converts the TDMS file to a DataFrame. DataFrame columns are named using the TDMS object paths.
@@ -955,6 +982,27 @@ def _data(self):
 
         return self._raw_data.data
 
+class FileStatus:
+    """
+    Contains status information about a read TDMS file
+    """
+    def __init__(self, incomplete_final_segment, channel_statuses):
+        #: Boolean indicating whether the last data segment was not written completely,
+        #: meaning it may contain less data than expected
+        self.incomplete_final_segment = incomplete_final_segment
+        #: Dictionary mapping from channel paths to ChannelSegmentStatus objects
+        #: when the last segment is incomplete or had an unexpected length
+        self.channel_statuses = channel_statuses
+
+
+class ChannelSegmentStatus:
+    def __init__(self, expected_length, read_length):
+        #: Number of values expected in the segment
+        self.expected_length = expected_length
+        #: Number of values read from the segment
+        self.read_length = read_length
+
+
 def _convert_data_chunk(chunk, raw_timestamps):
     for channel_chunk in chunk.channel_data.values():
         _convert_channel_data_chunk(channel_chunk, raw_timestamps)
diff --git a/nptdms/tdms_segment.py b/nptdms/tdms_segment.py
index 6c753ff..0e6110a 100644
--- a/nptdms/tdms_segment.py
+++ b/nptdms/tdms_segment.py
@@ -44,9 +44,10 @@ class TdmsSegment(object):
         'data_position',
         'final_chunk_lengths_override',
         'object_index',
+        'segment_incomplete',
     ]
 
-    def __init__(self, position, toc_mask, next_segment_pos, data_position):
+    def __init__(self, position, toc_mask, next_segment_pos, data_position, segment_incomplete):
         self.position = position
         self.toc_mask = toc_mask
         self.next_segment_pos = next_segment_pos
@@ -55,11 +56,12 @@ def __init__(self, position, toc_mask, next_segment_pos, data_position):
         self.final_chunk_lengths_override = None
         self.ordered_objects = None
         self.object_index = None
+        self.segment_incomplete = segment_incomplete
 
     def __repr__(self):
         return "<TdmsSegment at position %d>" % self.position
 
-    def read_segment_objects(self, file, previous_segment_objects, index_cache, previous_segment, segment_incomplete):
+    def read_segment_objects(self, file, previous_segment_objects, index_cache, previous_segment):
         """Read segment metadata section and update object information
 
         :param file: Open TDMS file
@@ -67,11 +69,10 @@ def read_segment_objects(self, file, previous_segment_objects, index_cache, prev
             recently read segment object for a TDMS object.
         :param index_cache: A SegmentIndexCache instance, or None if segment indexes are not required.
         :param previous_segment: Previous segment in the file.
-        :param segment_incomplete: Whether the next segment offset was not set.
""" if not self.toc_mask & toc_properties['kTocMetaData']: - self._reuse_previous_segment_metadata(previous_segment, segment_incomplete) + self._reuse_previous_segment_metadata(previous_segment) return endianness = '>' if (self.toc_mask & toc_properties['kTocBigEndian']) else '<' @@ -134,7 +135,7 @@ def read_segment_objects(self, file, previous_segment_objects, index_cache, prev if index_cache is not None: self.object_index = index_cache.get_index(self.ordered_objects) - self._calculate_chunks(segment_incomplete) + self._calculate_chunks() return properties def get_segment_object(self, object_path): @@ -194,11 +195,11 @@ def _reuse_previous_object( segment_obj.read_raw_data_index(file, raw_data_index_header, endianness) self.ordered_objects.append(segment_obj) - def _reuse_previous_segment_metadata(self, previous_segment, segment_incomplete): + def _reuse_previous_segment_metadata(self, previous_segment): try: self.ordered_objects = previous_segment.ordered_objects self.object_index = previous_segment.object_index - self._calculate_chunks(segment_incomplete) + self._calculate_chunks() except AttributeError: raise ValueError( "kTocMetaData is not set for segment but " @@ -269,7 +270,7 @@ def read_raw_data_for_channel(self, f, channel_path, chunk_offset=0, num_chunks= for chunk in self._read_channel_data_chunks(f, data_objects, channel_path, chunk_offset, stop_chunk): yield chunk - def _calculate_chunks(self, segment_incomplete): + def _calculate_chunks(self): """ Work out the number of chunks the data is in, for cases where the meta data doesn't change at all so there is no @@ -299,9 +300,9 @@ def _calculate_chunks(self, segment_incomplete): total_data_size, data_size) self.num_chunks = 1 + int(total_data_size // data_size) self.final_chunk_lengths_override = self._compute_final_chunk_lengths( - data_size, chunk_remainder, segment_incomplete) + data_size, chunk_remainder) - def _compute_final_chunk_lengths(self, chunk_size, chunk_remainder, segment_incomplete): + def _compute_final_chunk_lengths(self, chunk_size, chunk_remainder): """Compute object data lengths for a final chunk that has less data than expected """ if self._have_daqmx_objects(): @@ -314,7 +315,7 @@ def _compute_final_chunk_lengths(self, chunk_size, chunk_remainder, segment_inco return obj_chunk_sizes interleaved_data = self.toc_mask & toc_properties['kTocInterleavedData'] - if interleaved_data or not segment_incomplete: + if interleaved_data or not self.segment_incomplete: for obj in self.ordered_objects: if not obj.has_data: continue diff --git a/nptdms/test/test_daqmx.py b/nptdms/test/test_daqmx.py index 882b9c0..782687f 100644 --- a/nptdms/test/test_daqmx.py +++ b/nptdms/test/test_daqmx.py @@ -625,6 +625,22 @@ def test_incomplete_segment_with_different_length_buffers(): np.testing.assert_array_equal(group["Channel5"][:], [5] * 2) np.testing.assert_array_equal(group["Channel6"][:], [6] * 2) + file_status = tdms_data.file_status + assert file_status.incomplete_final_segment + assert file_status.channel_statuses is not None + for channel in ["Channel1", "Channel2"]: + channel_status = file_status.channel_statuses[f"/'Group'/'{channel}'"] + assert channel_status.expected_length == 4 + assert channel_status.read_length == 4 + for channel in ["Channel3", "Channel4"]: + channel_status = file_status.channel_statuses[f"/'Group'/'{channel}'"] + assert channel_status.expected_length == 2 + assert channel_status.read_length == 1 + for channel in ["Channel5", "Channel6"]: + channel_status = 
+        channel_status = file_status.channel_statuses[f"/'Group'/'{channel}'"]
+        assert channel_status.expected_length == 1
+        assert channel_status.read_length == 0
+
 
 def test_multiple_raw_data_buffers_with_scalers_split_across_buffers():
     """ DAQmx with scalers split across different raw data buffers
diff --git a/nptdms/test/test_tdms_file.py b/nptdms/test/test_tdms_file.py
index e4b1cc6..7c6c67c 100644
--- a/nptdms/test/test_tdms_file.py
+++ b/nptdms/test/test_tdms_file.py
@@ -777,6 +777,12 @@ def test_incomplete_segment_with_string_data():
     channel = tdms_data["Group"]["StringChannel"]
     assert len(channel) == 0
 
+    file_status = tdms_data.file_status
+    assert file_status.incomplete_final_segment
+    channel_status = file_status.channel_statuses["/'Group'/'StringChannel'"]
+    assert channel_status.expected_length == 2
+    assert channel_status.read_length == 0
+
 
 def test_truncated_interleaved_data():
     """
@@ -807,6 +813,49 @@
             assert len(chan) == 3
             assert len(chan_data) == 3
 
+        file_status = tdms_file.file_status
+        assert file_status.incomplete_final_segment
+        chan1_status = file_status.channel_statuses["/'group'/'channel1'"]
+        assert chan1_status.expected_length == 4
+        assert chan1_status.read_length == 3
+        chan2_status = file_status.channel_statuses["/'group'/'channel2'"]
+        assert chan2_status.expected_length == 4
+        assert chan2_status.read_length == 3
+
+
+def test_incomplete_last_segment_with_all_data_present():
+    """ Last segment doesn't have length set, but all data can be read
+    """
+    test_file = GeneratedFile()
+    test_file.add_segment(
+        ("kTocMetaData", "kTocRawData", "kTocNewObjList"),
+        segment_objects_metadata(
+            channel_metadata("/'group'/'channel1'", 3, 2),
+            channel_metadata("/'group'/'channel2'", 3, 2),
+        ),
+        "01 00 00 00" "02 00 00 00"
+        "05 00 00 00" "06 00 00 00"
+    )
+    test_file.add_segment(
+        ("kTocRawData", ),
+        "",
+        "03 00 00 00" "04 00 00 00"
+        "07 00 00 00" "08 00 00 00",
+        incomplete=True
+    )
+
+    tdms_data = test_file.load()
+
+    compare_arrays(tdms_data['group']['channel1'][:], np.array([1, 2, 3, 4], dtype=np.int32))
+    compare_arrays(tdms_data['group']['channel2'][:], np.array([5, 6, 7, 8], dtype=np.int32))
+
+    file_status = tdms_data.file_status
+    assert file_status.incomplete_final_segment
+    for channel in ['channel1', 'channel2']:
+        chan_status = file_status.channel_statuses[f"/'group'/'{channel}'"]
+        assert chan_status.expected_length == 2
+        assert chan_status.read_length == 2
+
 
 def test_truncated_metadata_in_last_segment():
     """ Test the scenario where writing the file was aborted with part of the metadata written
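
Below is a minimal usage sketch of the API added by this patch, for illustration only (it is not part of the patch; the file name "my_data.tdms" is hypothetical, and it assumes a build of npTDMS that includes this change):

    from nptdms import TdmsFile

    tdms_file = TdmsFile.read("my_data.tdms")
    status = tdms_file.file_status
    if status.incomplete_final_segment:
        # The final segment's next-segment offset was never written (for
        # example, acquisition was interrupted), so it may contain less
        # data than its metadata indicates.
        for path, channel_status in (status.channel_statuses or {}).items():
            print("%s: read %d of %d expected values" % (
                path, channel_status.read_length, channel_status.expected_length))

Note that channel_statuses is only populated when the last segment is incomplete or had an unexpected length; otherwise it is None, which the "or {}" guard above accounts for.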