Expose information about incomplete final segments #327

Merged (1 commit) on Mar 13, 2024
6 changes: 6 additions & 0 deletions docs/apireference.rst
@@ -27,6 +27,12 @@ Reading TDMS Files
.. autoclass:: ChannelDataChunk()
    :members:

.. autoclass:: nptdms.tdms.FileStatus()
    :members:

.. autoclass:: nptdms.tdms.ChannelSegmentStatus()
    :members:

Writing TDMS Files
------------------
4 changes: 2 additions & 2 deletions nptdms/reader.py
@@ -275,10 +275,10 @@ def _read_segment_metadata(
            file, segment_position, is_index_file)

        segment = TdmsSegment(
-           position, toc_mask, next_segment_pos, data_position)
+           position, toc_mask, next_segment_pos, data_position, segment_incomplete)

        properties = segment.read_segment_objects(
-           file, self._prev_segment_objects, index_cache, previous_segment, segment_incomplete)
+           file, self._prev_segment_objects, index_cache, previous_segment)
        return segment, properties

    def _read_lead_in(self, file, segment_position, is_index_file=False):
48 changes: 48 additions & 0 deletions nptdms/tdms.py
@@ -164,6 +164,33 @@ def properties(self):

        return self._properties

    @property
    def file_status(self):
        """ Return information about the file status

        :rtype: FileStatus
        """

        incomplete_final_segment = False
        channel_statuses = None
        if self._reader._segments:
            last_segment = self._reader._segments[-1]
            incomplete_final_segment = last_segment.segment_incomplete
            last_chunk_overrides = last_segment.final_chunk_lengths_override
            if last_chunk_overrides is not None:
                channel_statuses = dict(
                    (obj.path, ChannelSegmentStatus(obj.number_values, last_chunk_overrides.get(obj.path, 0)))
                    for obj in last_segment.ordered_objects
                    if obj.has_data)
            elif incomplete_final_segment:
                # Data lengths match expected lengths
                channel_statuses = dict(
                    (obj.path, ChannelSegmentStatus(obj.number_values, obj.number_values))
                    for obj in last_segment.ordered_objects
                    if obj.has_data)

        return FileStatus(incomplete_final_segment, channel_statuses)

    def as_dataframe(self, time_index=False, absolute_time=False, scaled_data=True, arrow_dtypes=False):
        """
        Converts the TDMS file to a DataFrame. DataFrame columns are named using the TDMS object paths.
@@ -955,6 +982,27 @@ def _data(self):
        return self._raw_data.data


class FileStatus:
    """
    Contains status information about a read TDMS file
    """
    def __init__(self, incomplete_final_segment, channel_statuses):
        #: Boolean indicating whether the last data segment was not written completely,
        #: meaning it may contain less data than expected
        self.incomplete_final_segment = incomplete_final_segment
        #: Dictionary mapping from channel paths to ChannelSegmentStatus objects
        #: when the last segment is incomplete or had an unexpected length
        self.channel_statuses = channel_statuses


class ChannelSegmentStatus:
    def __init__(self, expected_length, read_length):
        #: Number of values expected in the segment
        self.expected_length = expected_length
        #: Number of values read from the segment
        self.read_length = read_length


def _convert_data_chunk(chunk, raw_timestamps):
    for channel_chunk in chunk.channel_data.values():
        _convert_channel_data_chunk(channel_chunk, raw_timestamps)
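
A minimal sketch of how calling code might use the `file_status` property added above (the file path and the truncation scenario are hypothetical):

```python
from nptdms import TdmsFile

# Hypothetical path to a TDMS file whose writer was interrupted mid-segment
tdms_file = TdmsFile.read("incomplete_acquisition.tdms")

status = tdms_file.file_status
if status.incomplete_final_segment:
    # channel_statuses maps channel paths to ChannelSegmentStatus objects
    for path, channel_status in status.channel_statuses.items():
        missing = channel_status.expected_length - channel_status.read_length
        print(f"{path}: read {channel_status.read_length} of "
              f"{channel_status.expected_length} expected values "
              f"({missing} missing)")
```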
23 changes: 12 additions & 11 deletions nptdms/tdms_segment.py
@@ -44,9 +44,10 @@ class TdmsSegment(object):
        'data_position',
        'final_chunk_lengths_override',
        'object_index',
+       'segment_incomplete',
    ]

-   def __init__(self, position, toc_mask, next_segment_pos, data_position):
+   def __init__(self, position, toc_mask, next_segment_pos, data_position, segment_incomplete):
        self.position = position
        self.toc_mask = toc_mask
        self.next_segment_pos = next_segment_pos
@@ -55,23 +56,23 @@ def __init__(self, position, toc_mask, next_segment_pos, data_position):
        self.final_chunk_lengths_override = None
        self.ordered_objects = None
        self.object_index = None
+       self.segment_incomplete = segment_incomplete

    def __repr__(self):
        return "<TdmsSegment at position %d>" % self.position

-   def read_segment_objects(self, file, previous_segment_objects, index_cache, previous_segment, segment_incomplete):
+   def read_segment_objects(self, file, previous_segment_objects, index_cache, previous_segment):
        """Read segment metadata section and update object information

        :param file: Open TDMS file
        :param previous_segment_objects: Dictionary of path to the most
            recently read segment object for a TDMS object.
        :param index_cache: A SegmentIndexCache instance, or None if segment indexes are not required.
        :param previous_segment: Previous segment in the file.
-       :param segment_incomplete: Whether the next segment offset was not set.
        """

        if not self.toc_mask & toc_properties['kTocMetaData']:
-           self._reuse_previous_segment_metadata(previous_segment, segment_incomplete)
+           self._reuse_previous_segment_metadata(previous_segment)
            return

        endianness = '>' if (self.toc_mask & toc_properties['kTocBigEndian']) else '<'
@@ -134,7 +135,7 @@ def read_segment_objects(self, file, previous_segment_objects, index_cache, prev

        if index_cache is not None:
            self.object_index = index_cache.get_index(self.ordered_objects)
-       self._calculate_chunks(segment_incomplete)
+       self._calculate_chunks()
        return properties

    def get_segment_object(self, object_path):
@@ -194,11 +195,11 @@ def _reuse_previous_object(
            segment_obj.read_raw_data_index(file, raw_data_index_header, endianness)
        self.ordered_objects.append(segment_obj)

-   def _reuse_previous_segment_metadata(self, previous_segment, segment_incomplete):
+   def _reuse_previous_segment_metadata(self, previous_segment):
        try:
            self.ordered_objects = previous_segment.ordered_objects
            self.object_index = previous_segment.object_index
-           self._calculate_chunks(segment_incomplete)
+           self._calculate_chunks()
        except AttributeError:
            raise ValueError(
                "kTocMetaData is not set for segment but "
@@ -269,7 +270,7 @@ def read_raw_data_for_channel(self, f, channel_path, chunk_offset=0, num_chunks=
        for chunk in self._read_channel_data_chunks(f, data_objects, channel_path, chunk_offset, stop_chunk):
            yield chunk

-   def _calculate_chunks(self, segment_incomplete):
+   def _calculate_chunks(self):
        """
        Work out the number of chunks the data is in, for cases
        where the meta data doesn't change at all so there is no
@@ -299,9 +300,9 @@
                total_data_size, data_size)
            self.num_chunks = 1 + int(total_data_size // data_size)
            self.final_chunk_lengths_override = self._compute_final_chunk_lengths(
-               data_size, chunk_remainder, segment_incomplete)
+               data_size, chunk_remainder)

-   def _compute_final_chunk_lengths(self, chunk_size, chunk_remainder, segment_incomplete):
+   def _compute_final_chunk_lengths(self, chunk_size, chunk_remainder):
        """Compute object data lengths for a final chunk that has less data than expected
        """
        if self._have_daqmx_objects():
@@ -314,7 +315,7 @@ def _compute_final_chunk_lengths(self, chunk_size, chunk_remainder, segment_inco
            return obj_chunk_sizes

        interleaved_data = self.toc_mask & toc_properties['kTocInterleavedData']
-       if interleaved_data or not segment_incomplete:
+       if interleaved_data or not self.segment_incomplete:
            for obj in self.ordered_objects:
                if not obj.has_data:
                    continue
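
For contiguous (non-interleaved) data, a chunk stores each channel's values one after another, so a truncated final chunk fills channels in segment order: earlier channels read fully before later ones get any data. A simplified sketch of that idea, not the code above (the helper name and tuple layout are invented for illustration):

```python
def truncated_chunk_lengths(channels, bytes_available):
    """Split the bytes of a truncated contiguous chunk across channels
    in segment order.

    channels: list of (path, expected_value_count, bytes_per_value)
    tuples in segment order. Returns {path: readable_value_count}.
    """
    lengths = {}
    for path, num_values, itemsize in channels:
        # Each channel consumes as many of the remaining bytes as it can,
        # up to its full expected size
        channel_bytes = min(bytes_available, num_values * itemsize)
        lengths[path] = channel_bytes // itemsize
        bytes_available -= channel_bytes
    return lengths


# Two channels of four 4-byte values each, but only 20 of 32 bytes written:
# the first channel reads all 4 values, the second reads only 1.
assert truncated_chunk_lengths(
    [("/'group'/'channel1'", 4, 4), ("/'group'/'channel2'", 4, 4)],
    bytes_available=20,
) == {"/'group'/'channel1'": 4, "/'group'/'channel2'": 1}
```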
16 changes: 16 additions & 0 deletions nptdms/test/test_daqmx.py
@@ -625,6 +625,22 @@ def test_incomplete_segment_with_different_length_buffers():
    np.testing.assert_array_equal(group["Channel5"][:], [5] * 2)
    np.testing.assert_array_equal(group["Channel6"][:], [6] * 2)

    file_status = tdms_data.file_status
    assert file_status.incomplete_final_segment
    assert file_status.channel_statuses is not None
    for channel in ["Channel1", "Channel2"]:
        channel_status = file_status.channel_statuses[f"/'Group'/'{channel}'"]
        assert channel_status.expected_length == 4
        assert channel_status.read_length == 4
    for channel in ["Channel3", "Channel4"]:
        channel_status = file_status.channel_statuses[f"/'Group'/'{channel}'"]
        assert channel_status.expected_length == 2
        assert channel_status.read_length == 1
    for channel in ["Channel5", "Channel6"]:
        channel_status = file_status.channel_statuses[f"/'Group'/'{channel}'"]
        assert channel_status.expected_length == 1
        assert channel_status.read_length == 0


def test_multiple_raw_data_buffers_with_scalers_split_across_buffers():
    """ DAQmx with scalers split across different raw data buffers
49 changes: 49 additions & 0 deletions nptdms/test/test_tdms_file.py
@@ -777,6 +777,12 @@ def test_incomplete_segment_with_string_data():
    channel = tdms_data["Group"]["StringChannel"]
    assert len(channel) == 0

    file_status = tdms_data.file_status
    assert file_status.incomplete_final_segment
    channel_status = file_status.channel_statuses["/'Group'/'StringChannel'"]
    assert channel_status.expected_length == 2
    assert channel_status.read_length == 0


def test_truncated_interleaved_data():
    """
@@ -807,6 +813,49 @@ def test_truncated_interleaved_data():
            assert len(chan) == 3
            assert len(chan_data) == 3

        file_status = tdms_file.file_status
        assert file_status.incomplete_final_segment
        chan1_status = file_status.channel_statuses["/'group'/'channel1'"]
        assert chan1_status.expected_length == 4
        assert chan1_status.read_length == 3
        chan2_status = file_status.channel_statuses["/'group'/'channel2'"]
        assert chan2_status.expected_length == 4
        assert chan2_status.read_length == 3


def test_incomplete_last_segment_with_all_data_present():
    """ Last segment doesn't have length set, but all data can be read
    """
    test_file = GeneratedFile()
    test_file.add_segment(
        ("kTocMetaData", "kTocRawData", "kTocNewObjList"),
        segment_objects_metadata(
            channel_metadata("/'group'/'channel1'", 3, 2),
            channel_metadata("/'group'/'channel2'", 3, 2),
        ),
        "01 00 00 00" "02 00 00 00"
        "05 00 00 00" "06 00 00 00"
    )
    test_file.add_segment(
        ("kTocRawData", ),
        "",
        "03 00 00 00" "04 00 00 00"
        "07 00 00 00" "08 00 00 00",
        incomplete=True
    )

    tdms_data = test_file.load()

    compare_arrays(tdms_data['group']['channel1'][:], np.array([1, 2, 3, 4], dtype=np.int32))
    compare_arrays(tdms_data['group']['channel2'][:], np.array([5, 6, 7, 8], dtype=np.int32))

    file_status = tdms_data.file_status
    assert file_status.incomplete_final_segment
    for channel in ['channel1', 'channel2']:
        chan_status = file_status.channel_statuses[f"/'group'/'{channel}'"]
        assert chan_status.expected_length == 2
        assert chan_status.read_length == 2

def test_truncated_metadata_in_last_segment():
    """ Test the scenario where writing the file was aborted with part of the metadata written
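
In the interleaved case exercised by test_truncated_interleaved_data above, each row of a chunk holds one value per channel, so truncation affects all channels together and only whole rows are recoverable. A sketch of that arithmetic, with the byte counts assumed for illustration:

```python
import numpy as np

n_channels = 2
itemsize = np.dtype(np.int32).itemsize   # 4 bytes per value
expected_rows = 4
row_size = n_channels * itemsize         # 8 bytes per interleaved row

# Suppose the writer stopped after 28 of the expected 32 bytes
bytes_written = 28
rows_read = min(bytes_written // row_size, expected_rows)

# Matches the test above: each channel reads 3 of its expected 4 values
assert rows_read == 3
```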