Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PEP8 compliance #16

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 48 additions & 45 deletions EDF.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@
import re
import warnings


def padtrim(buf, num):
num -= len(buf)
if num>=0:
if num >= 0:
# pad the input to the specified length
return str(buf) + ' ' * num
else:
# trim the input to the specified length
return buf[0:num]


####################################################################################################
# the EDF header is represented as a tuple of (meas_info, chan_info)
# meas_info should have ['record_length', 'magic', 'hour', 'subject_id', 'recording_id', 'n_records', 'month', 'subtype', 'second', 'nchan', 'data_size', 'data_offset', 'lowpass', 'year', 'highpass', 'day', 'minute']
Expand Down Expand Up @@ -65,7 +67,6 @@ def close(self):
self.calibrate = None
self.offset = None
self.n_records = 0
return

def writeHeader(self, header):
meas_info = header[0]
Expand All @@ -76,18 +77,18 @@ def writeHeader(self, header):
assert(fid.tell() == 0)

# fill in the missing or incomplete information
if not 'subject_id' in meas_info:
if 'subject_id' not in meas_info:
meas_info['subject_id'] = ''
if not 'recording_id' in meas_info:
if 'recording_id' not in meas_info:
meas_info['recording_id'] = ''
if not 'subtype' in meas_info:
if 'subtype' not in meas_info:
meas_info['subtype'] = 'edf'
nchan = meas_info['nchan']
if not 'ch_names' in chan_info or len(chan_info['ch_names'])<nchan:
if 'ch_names' not in chan_info or len(chan_info['ch_names']) < nchan:
chan_info['ch_names'] = [str(i) for i in range(nchan)]
if not 'transducers' in chan_info or len(chan_info['transducers'])<nchan:
if 'transducers' not in chan_info or len(chan_info['transducers']) < nchan:
chan_info['transducers'] = ['' for i in range(nchan)]
if not 'units' in chan_info or len(chan_info['units'])<nchan:
if 'units' not in chan_info or len(chan_info['units']) < nchan:
chan_info['units'] = ['' for i in range(nchan)]

if meas_info['subtype'] in ('24BIT', 'bdf'):
Expand All @@ -111,11 +112,11 @@ def writeHeader(self, header):
chan_info[key] = np.asarray(chan_info[key])

for i in range(meas_info['nchan']):
fid.write(padtrim( chan_info['ch_names'][i], 16))
fid.write(padtrim(chan_info['ch_names'][i], 16))
for i in range(meas_info['nchan']):
fid.write(padtrim( chan_info['transducers'][i], 80))
fid.write(padtrim(chan_info['transducers'][i], 80))
for i in range(meas_info['nchan']):
fid.write(padtrim( chan_info['units'][i], 8))
fid.write(padtrim(chan_info['units'][i], 8))
for i in range(meas_info['nchan']):
fid.write(padtrim(str(chan_info['physical_min'][i]), 8))
for i in range(meas_info['nchan']):
Expand All @@ -125,22 +126,22 @@ def writeHeader(self, header):
for i in range(meas_info['nchan']):
fid.write(padtrim(str(chan_info['digital_max'][i]), 8))
for i in range(meas_info['nchan']):
fid.write(' ' * 80) # prefiltering
fid.write(' ' * 80) # prefiltering
for i in range(meas_info['nchan']):
fid.write(padtrim(str(chan_info['n_samps'][i]), 8))
for i in range(meas_info['nchan']):
fid.write(' ' * 32) # reserved
fid.write(' ' * 32) # reserved
meas_info['data_offset'] = fid.tell()

self.meas_info = meas_info
self.chan_info = chan_info
self.calibrate = (chan_info['physical_max'] - chan_info['physical_min'])/(chan_info['digital_max'] - chan_info['digital_min']);
self.offset = chan_info['physical_min'] - self.calibrate * chan_info['digital_min'];
self.calibrate = (chan_info['physical_max'] - chan_info['physical_min']) / (chan_info['digital_max'] - chan_info['digital_min'])
self.offset = chan_info['physical_min'] - self.calibrate * chan_info['digital_min']
channels = list(range(meas_info['nchan']))
for ch in channels:
if self.calibrate[ch]<0:
self.calibrate[ch] = 1;
self.offset[ch] = 0;
if self.calibrate[ch] < 0:
self.calibrate[ch] = 1
self.offset[ch] = 0

def writeBlock(self, data):
meas_info = self.meas_info
Expand All @@ -150,11 +151,11 @@ def writeBlock(self, data):
for i in range(meas_info['nchan']):
raw = deepcopy(data[i])

assert(len(raw)==chan_info['n_samps'][i])
if min(raw)<chan_info['physical_min'][i]:
warnings.warn('Value exceeds physical_min: ' + str(min(raw)) );
if max(raw)>chan_info['physical_max'][i]:
warnings.warn('Value exceeds physical_max: '+ str(max(raw)));
assert(len(raw) == chan_info['n_samps'][i])
if min(raw) < chan_info['physical_min'][i]:
warnings.warn('Value exceeds physical_min: ' + str(min(raw)))
if max(raw) > chan_info['physical_max'][i]:
warnings.warn('Value exceeds physical_max: ' + str(max(raw)))

raw -= self.offset[i] # FIXME I am not sure about the order of calibrate and offset
raw /= self.calibrate[i]
Expand All @@ -167,6 +168,7 @@ def writeBlock(self, data):

####################################################################################################


class EDFReader():
def __init__(self, fname=None):
self.fname = None
Expand Down Expand Up @@ -203,8 +205,8 @@ def readHeader(self):
meas_info['subject_id'] = fid.read(80).strip().decode() # subject id
meas_info['recording_id'] = fid.read(80).strip().decode() # recording id

day, month, year = [int(x) for x in re.findall('(\d+)', fid.read(8).decode())]
hour, minute, second = [int(x) for x in re.findall('(\d+)', fid.read(8).decode())]
day, month, year = [int(x) for x in re.findall(r'(\d+)', fid.read(8).decode())]
hour, minute, second = [int(x) for x in re.findall(r'(\d+)', fid.read(8).decode())]
meas_info['day'] = day
meas_info['month'] = month
meas_info['year'] = year
Expand Down Expand Up @@ -249,14 +251,14 @@ def readHeader(self):
chan_info['digital_max'] = digital_max = np.array([float(fid.read(8).decode()) for ch in channels])

prefiltering = [fid.read(80).strip().decode() for ch in channels][:-1]
highpass = np.ravel([re.findall('HP:\s+(\w+)', filt) for filt in prefiltering])
lowpass = np.ravel([re.findall('LP:\s+(\w+)', filt) for filt in prefiltering])
high_pass_default = 0.
highpass = np.ravel([re.findall(r'HP:\s+(\w+)', filt) for filt in prefiltering])
lowpass = np.ravel([re.findall(r'LP:\s+(\w+)', filt) for filt in prefiltering])
highpass_default = 0.
if highpass.size == 0:
meas_info['highpass'] = high_pass_default
meas_info['highpass'] = highpass_default
elif all(highpass):
if highpass[0] == 'NaN':
meas_info['highpass'] = high_pass_default
meas_info['highpass'] = highpass_default
elif highpass[0] == 'DC':
meas_info['highpass'] = 0.
else:
Expand All @@ -280,27 +282,27 @@ def readHeader(self):
# number of samples per record
chan_info['n_samps'] = n_samps = np.array([int(fid.read(8).decode()) for ch in channels])

fid.read(32 *meas_info['nchan']).decode() # reserved
fid.read(32 * meas_info['nchan']).decode() # reserved
assert fid.tell() == header_nbytes

if meas_info['n_records']==-1:
if meas_info['n_records'] == -1:
# this happens if the n_records is not updated at the end of recording
tot_samps = (os.path.getsize(self.fname)-meas_info['data_offset'])/meas_info['data_size']
meas_info['n_records'] = tot_samps/sum(n_samps)
tot_samps = (os.path.getsize(self.fname) - meas_info['data_offset']) / meas_info['data_size']
meas_info['n_records'] = tot_samps / sum(n_samps)

self.calibrate = (chan_info['physical_max'] - chan_info['physical_min'])/(chan_info['digital_max'] - chan_info['digital_min']);
self.offset = chan_info['physical_min'] - self.calibrate * chan_info['digital_min'];
self.calibrate = (chan_info['physical_max'] - chan_info['physical_min']) / (chan_info['digital_max'] - chan_info['digital_min'])
self.offset = chan_info['physical_min'] - self.calibrate * chan_info['digital_min']
for ch in channels:
if self.calibrate[ch]<0:
self.calibrate[ch] = 1;
self.offset[ch] = 0;
if self.calibrate[ch] < 0:
self.calibrate[ch] = 1
self.offset[ch] = 0

self.meas_info = meas_info
self.chan_info = chan_info
return (meas_info, chan_info)

def readBlock(self, block):
assert(block>=0)
assert(block >= 0)
meas_info = self.meas_info
chan_info = self.chan_info
data = []
Expand All @@ -309,7 +311,7 @@ def readBlock(self, block):
blocksize = np.sum(chan_info['n_samps']) * meas_info['data_size']
fid.seek(meas_info['data_offset'] + block * blocksize)
for i in range(meas_info['nchan']):
buf = fid.read(chan_info['n_samps'][i]*meas_info['data_size'])
buf = fid.read(chan_info['n_samps'][i] * meas_info['data_size'])
raw = np.asarray(unpack('<{}h'.format(chan_info['n_samps'][i]), buf), dtype=np.float32)
raw *= self.calibrate[i]
raw += self.offset[i] # FIXME I am not sure about the order of calibrate and offset
Expand All @@ -325,8 +327,8 @@ def readSamples(self, channel, begsample, endsample):
data = self.readBlock(begblock)[channel]
for block in range(begblock+1, endblock+1):
data = np.append(data, self.readBlock(block)[channel])
begsample -= begblock*n_samps
endsample -= begblock*n_samps
begsample -= begblock * n_samps
endsample -= begblock * n_samps
return data[begsample:(endsample+1)]

####################################################################################################
Expand All @@ -348,12 +350,13 @@ def getNSamples(self):
return self.chan_info['n_samps'] * self.meas_info['n_records']

def readSignal(self, chanindx):
begsample = 0;
endsample = self.chan_info['n_samps'][chanindx] * self.meas_info['n_records'] - 1;
begsample = 0
endsample = self.chan_info['n_samps'][chanindx] * self.meas_info['n_records'] - 1
return self.readSamples(chanindx, begsample, endsample)

####################################################################################################


if False:
file_in = EDFReader()
file_in.open('/Users/roboos/day 01[10.03].edf')
Expand Down