Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EXPORT_DATA2 support #17

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 129 additions & 97 deletions saleae/saleae.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ def __init__(self, host='localhost', port=10429):

def _build(self, s):
'''Convenience method for building up a command to send'''
self._to_send.append(s)
if type(s) is list:
self._to_send.extend(s)
else:
self._to_send.append(s)

def _abort(self):
self._to_send = []
Expand Down Expand Up @@ -326,7 +329,7 @@ def get_bandwidth(self, sample_rate, device = None, channels = None):
adc_width = 12

return sample_rate[0] * len(digital_channels) +\
sample_rate[1] * len(analog_channels) * adc_width
sample_rate[1] * len(analog_channels) * adc_width

def get_performance(self):
'''Get performance value. Performance controls USB traffic and quality.
Expand Down Expand Up @@ -467,16 +470,18 @@ def set_active_channels(self, digital=None, analog=None):
>>> s.set_active_channels([0,1,2,3], [0]) #doctest:+SKIP
'''
# TODO Enfore note from docstring
if digital is None and analog is None:
raise self.ImpossibleSettings("At least one digital or analog channel must be set")
digital_no = 0 if digital is None else len(digital)
analog_no = 0 if analog is None else len(analog)
if digital_no <= 0 and analog_no <= 0:
raise self.ImpossibleSettings('Logic requires at least one activate channel (digital or analog) and none are given')

self._build('SET_ACTIVE_CHANNELS')
self._build('digital_channels')
for ch in digital:
self._build('{}'.format(ch))
self._build('analog_channels')
for ch in analog:
self._build('{}'.format(ch))
if digital_no > 0:
self._build('digital_channels')
self._build(['{0:d}'.format(ch) for ch in digital])
if analog_no > 0:
self._build('analog_channels')
self._build(['{0:d}'.format(ch) for ch in analog])
self._finish()

def reset_active_channels(self):
Expand Down Expand Up @@ -544,112 +549,139 @@ def load_from_file(self, file_path_on_target_machine):
def close_all_tabs(self):
self._cmd('CLOSE_ALL_TABS')

def export_data(self,
file_path_on_target_machine,
digital_channels=None,
analog_channels=None,
analog_format="voltage",
time_span=None, # 'None-->all_time, [x.x, y.y]-->time_span'
format="csv", # 'csv, bin, vcd, matlab'
csv_column_headers=True,
csv_delimeter='comma', # 'comma' or 'tab'
csv_timestamp='time_stamp', # 'time_stamp, sample_number'
csv_combined=True, # 'combined' else 'separate'
csv_row_per_change=True, # 'row_per_change' else 'row_per_sample'
csv_number_format='hex', # dec, hex, bin, ascii
bin_per_change=True, # 'on_change' else 'each_sample'
bin_word_size='8' # 8, 16, 32, 64
):
# export_data, C:\temp_file, digital_channels, 0, 1, analog_channels, 1, voltage, all_time, adc, csv, headers, comma, time_stamp, separate, row_per_change, Dec
# export_data, C:\temp_file, all_channels, time_span, 0.2, 0.4, vcd
# export_data, C:\temp_file, analog_channels, 0, 1, 2, adc, all_time, matlab
def _export_data_analog_binary(self, analog_format='voltage'):
'''Binary analog: [VOLTAGE|ADC]'''

# Do argument verification
if analog_format.lower() not in ['voltage', 'adc']:
raise self.ImpossibleSettings('Unsupported binary analog format')

# Build arguments
self._build(analog_format.upper())

# NOTE: the [EACH_SAMPLE|ON_CHANGE] is the same as the CSV [ROW_PER_CHANGE|ROW_PER_SAMPLE], but I am using name convention from official C# API
def _export_data_digital_binary(self, each_sample=True, no_shift=True, word_size=16):
'''Binary digital: [EACH_SAMPLE|ON_CHANGE], [NO_SHIFT|RIGHT_SHIFT], [8|16|32|64]'''
# Do argument verification
if word_size not in [8, 16, 32, 64]:
raise self.ImpossibleSettings('Unsupported binary word size')

# Build arguments
self._build('EACH_SAMPLE' if each_sample else 'ON_CHANGE')
self._build('NO_SHIFT' if no_shift else 'RIGHT_SHIFT')
self._build(str(word_size))

def _export_data_analog_csv(self, column_headers=True, delimiter='comma', display_base='hex', analog_format='voltage'):
'''CVS export analog/mixed: [HEADERS|NO_HEADERS], [COMMA|TAB], [BIN|DEC|HEX|ASCII], [VOLTAGE|ADC]'''

# Do argument verification
if delimiter.lower() not in ['comma', 'tab']:
raise self.ImpossibleSettings('Unsupported CSV delimiter')
if display_base.lower() not in ['bin', 'dec', 'hex', 'ascii']:
raise self.ImpossibleSettings('Unsupported CSV display base')
if analog_format.lower() not in ['voltage', 'adc']:
raise self.ImpossibleSettings('Unsupported CSV analog format')

# Build arguments
self._build('HEADERS' if column_headers else 'NO_HEADERS')
self._build(delimiter.upper())
self._build(display_base.upper())
self._build(analog_format.upper())

def _export_data_digital_csv(self, column_headers=True, delimiter='comma', timestamp='time_stamp', display_base='hex', rows_per_change=True):
'''CVS export digital: [HEADERS|NO_HEADERS], [COMMA|TAB], [TIME_STAMP|SAMPLE_NUMBER], [COMBINED, [BIN|DEC|HEX|ASCII]|SEPARATE], [ROW_PER_CHANGE|ROW_PER_SAMPLE]'''

# Do argument verification
if delimiter.lower() not in ['comma', 'tab']:
raise self.ImpossibleSettings('Unsupported CSV delimiter')
if timestamp.lower() not in ['time_stamp', 'sample_number']:
raise self.ImpossibleSettings('Unsupported timestamp setting')
if display_base.lower() not in ['bin', 'dec', 'hex', 'ascii', 'separate']:
raise self.ImpossibleSettings('Unsupported CSV display base')

# Build arguments
self._build('HEADERS' if column_headers else 'NO_HEADERS')
self._build(delimiter.upper())
self._build(timestamp.upper())
self._build('SEPERATE' if display_base.lower() == 'SEPERATE' else ['COMBINED', display_base.upper()])
self._build('ROW_PER_CHANGE' if rows_per_change else 'ROW_PER_SAMPLE')

def _export_data_digital_vcd(self):
'''VCD digital: no arguments'''
pass

def _export_data_analog_matlab(self, analog_format='voltage'):
'''Matlab analog: [VOLTAGE|ADC]'''

# Do argument verification
if analog_format.lower() not in ['voltage', 'adc']:
raise self.ImpossibleSettings('Unsupported Matlab analog format')

# Build arguments
self._build(analog_format.upper())

def _export_data_digital_matlab(self):
'''Matlab digital: no arguments'''
pass


def export_data(self, file_path_on_target_machine, digital_channels=None, analog_channels=None, time_span=None, format='csv', **export_args):
'''Export command:
EXPORT_DATA2,
<filename>,
[ALL_CHANNELS|SPECIFIC_CHANNELS, [DIGITAL_ONLY|ANALOG_ONLY|ANALOG_AND_DIGITAL], <channel index> [ANALOG|DIGITAL], ..., <channel index> [ANALOG|DIGITAL]],
[ALL_TIME|TIME_SPAN, <(double)start>, <(double)end>],
[BINARY, <binary settings>|CSV, <csv settings>|VCD|MATLAB, <matlab settings>]
'''
while not self.is_processing_complete():
time.sleep(1)

# The path needs to be absolute. This is hard to check reliably since we
# don't know the OS on the target machine, but we can do a basic check
# for something that will definitely fail
# NOTE: Note to Saleae, Logic should resolve relative paths, I do not see reasons not to do this ...
if file_path_on_target_machine[0] in ('~', '.'):
raise NotImplementedError('File path must be absolute')
raise ValueError('File path must be absolute')

self._build('EXPORT_DATA')
self._build('EXPORT_DATA2')
self._build(file_path_on_target_machine)

# Channel selection
is_analog = False
if (digital_channels is None) and (analog_channels is None):
self._build('all_channels')
analog_channels = self.get_active_channels()[1]
self._build('ALL_CHANNELS')
is_analog = len(self.get_active_channels()[1]) > 0
else:
self._build('SPECIFIC_CHANNELS')
# Check for mixed mode
# NOTE: This feel redundant, we can see if we digital only, analog only or mixed from parsing the channels right?!
# especially given the fact that only ANALOG_AND_DIGITAL is printed and never DIGITAL_ONLY or ANALOG_ONLY (according to Saleae C# implementation)
if digital_channels is not None and len(digital_channels) and analog_channels is not None and len(analog_channels):
self._build('ANALOG_AND_DIGITAL')

# Add in the channels
if digital_channels is not None and len(digital_channels):
self._build('digital_channels')
for ch in digital_channels:
self._build(str(ch))
self._build(['{0:d} DIGITAL'.format(ch) for ch in digital_channels])
if analog_channels is not None and len(analog_channels):
self._build('analog_channels')
for ch in analog_channels:
self._build(str(ch))
if analog_channels is not None and len(analog_channels):
if analog_format not in ('voltage', 'adc'):
raise NotImplementedError("bad analog_format")
self._build(analog_format)
self._build(['{0:d} ANALOG'.format(ch) for ch in analog_channels])
is_analog = True

# Time selection
if time_span is None:
self._build('all_time')
self._build('ALL_TIME')
elif len(time_span) == 2:
self._build('time_span')
self._build(str(time_span[0]))
self._build(str(time_span[1]))
self._build(['TIME_SPAN', '{0:f}'.format(time_span[0]), '{0:f}'.format(time_span[0])])
else:
raise NotImplementedError('invalid time format')

if format == 'csv':
self._build(format)

if csv_column_headers:
self._build('headers')
else:
self._build('no_headers')

if csv_delimeter not in ('comma', 'tab'):
raise NotImplementedError('bad csv delimeter')
self._build(csv_delimeter)

if csv_timestamp not in ('time_stamp', 'sample_number'):
raise NotImplementedError('bad csv timestamp')
self._build(csv_timestamp)

if csv_combined:
self._build('combined')
else:
self._build('separate')

if csv_row_per_change:
self._build('row_per_change')
else:
self._build('row_per_sample')

if csv_number_format not in ('dec', 'hex', 'bin', 'ascii'):
raise NotImplementedError('bad csv number format')
self._build(csv_number_format)
elif format == 'bin':
self._build(format)

if bin_per_change:
self._build('on_change')
else:
self._build('each_sample')
raise self.ImpossibleSettings('Unsupported time span')

if bin_word_size not in ('8', '16', '32', '64'):
raise NotImplementedError('bad bin word size')
self._build(bin_word_size)
# Find exporter
export_name = '_export_data_{0:s}_{1:s}'.format('analog' if is_analog else 'digital', format.lower())
if not hasattr(self, export_name):
raise NotImplementedError('Unsupported export format given ({0:s})'.format(export_name))

elif format in ('vcd', 'matlab'):
# No options for these
self._build(format)
else:
raise NotImplementedError('unknown format')
# Let specific export function handle arguments
self._build(format.upper())
getattr(self, export_name)(**export_args)

self._finish()

time.sleep(0.050) # HACK: Delete me when Logic (saleae) race conditions are fixed

def get_analyzers(self):
'''Return a list of analyzers currently in use, with indexes.'''
Expand Down