# -*- coding: utf-8 -*-
"""
Created on Mon May 5 12:32:17 2014
Three functions for converting data generated by an E-Prime experiment to a
more usable csv format.
1. etext_to_rcsv: Converts an exported "E-Prime text" file to a reduced csv
   based on the desired column headers. Make sure, when exporting the edat
   file as "E-Prime text", that Unicode is turned off.
2. text_to_csv: Converts the text file produced by successful completion of
   an E-Prime experiment to csv. Output from text_to_csv can be used to
   deduce the information necessary for text_to_rcsv (e.g. columns to merge,
   columns to rename, etc.). These variables would then be saved in the
   headers.pickle file.
3. text_to_rcsv: Converts the text file produced by successful completion of
   an E-Prime experiment to a reduced csv, using the variables contained in
   headers.pickle. Considerably more complex than text_to_csv, but if used
   properly the output should be indistinguishable from the output of
   etext_to_rcsv, only without the tedious step of exporting the
   "E-Prime text" file by hand.
Command-line usage: python convert_eprime.py [function_name] [inputs]
@author: tsalo
"""
import os
import sys
import csv
import pickle
import inspect

import numpy as np
import pandas as pd
# Read global variables from the pickle file. Note that pickle files must be
# opened in binary mode.
code_dir = os.path.dirname(os.path.abspath(inspect.stack()[0][1]))
with open(os.path.join(code_dir, "headers.pickle"), "rb") as file_:
    [headers, remnulls, replace_dict, fill_block, merge_cols, merge_col_names,
     null_cols] = pickle.load(file_)
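# Illustrative sketch (not part of the original pipeline): headers.pickle is
# expected to hold a seven-element list in the order unpacked above. A script
# along these lines could generate it; the task name and column names below
# are hypothetical placeholders.
#
#     import pickle
#     headers = {"TEST_STOPSIGNAL": ["Subject", "Session", "Trial"]}
#     remnulls = {"TEST_STOPSIGNAL": True}
#     replace_dict = {"TEST_STOPSIGNAL": {".edat2": {"TextName": "EdatName"}}}
#     fill_block = []
#     merge_cols = {"TEST_STOPSIGNAL": []}
#     merge_col_names = {"TEST_STOPSIGNAL": []}
#     null_cols = {"TEST_STOPSIGNAL": []}
#     with open("headers.pickle", "wb") as fo:
#         pickle.dump([headers, remnulls, replace_dict, fill_block,
#                      merge_cols, merge_col_names, null_cols], fo)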
def etext_to_rcsv(in_file, task):
    """
    Reads an exported "E-Prime text" file, reduces columns based on a
    task-specific list of headers, and writes out a reduced csv.

    Parameters
    ----------
    in_file : str
        Exported E-Prime text file to convert and reduce.
    task : str
        Task name, used with the dictionary from the headers.pickle file to
        determine which columns to keep.

    Examples
    --------
    >>> file_ = "subj0001_stop_signal_task-0.txt"
    >>> task = "TEST_STOPSIGNAL"
    >>> ce.etext_to_rcsv(file_, task)
    Output file successfully created- subj0001_stop_signal_task-0.csv
    """
header_list = headers.get(task)
filename, suffix = os.path.splitext(in_file)
if suffix == ".txt":
rem_lines = range(3)
delimiter_ = "\t"
elif suffix == ".csv":
rem_lines = []
delimiter_ = ","
else:
        raise ValueError("File not txt or csv: {0}".format(in_file))
df = pd.read_csv(in_file, skiprows=rem_lines, sep=delimiter_)
df = df[header_list]
if remnulls.get(task):
df = df.dropna(axis=0)
out_file = filename + ".csv"
df.to_csv(out_file, index=False)
print("Output file successfully created- {0}".format(out_file))
def text_to_csv(text_file, out_file):
"""
Converts text file produced by successful completion of E-Prime experiment
to csv. Output from text_to_csv can be used to determine information
necessary for text_to_rcsv (e.g. columns to merge, columns to rename,
etc.).
Parameters
----------
text_file : str
Raw E-Prime text file to convert.
out_file : str
Name of output file (csv format) to generate.
Examples
----------
>>> in_file = "subj0001_stop_signal_task-0.txt"
>>> out_file = "subj0001_0.csv"
>>> ce.text_to_csv(in_file, out_file)
Output file successfully created- subj0001_0.csv
"""
df = _text_to_df(text_file)
df.to_csv(out_file, index=False)
print("Output file successfully created- {0}".format(out_file))
def text_to_rcsv(text_file, edat_file, out_file, task):
"""
Converts text file produced by successful completion of E-Prime experiment
to reduced csv. Considerably more complex than text_to_csv.
Parameters
----------
text_file : str
Raw E-Prime text file to convert.
edat_file : str
Raw E-Prime edat file paired with text_file. Only used for its file
type, because sometimes files will differ between version of E-Prime
(edat vs. edat2 suffix).
out_file : str
Name of output file (csv format) to generate.
task : str
Task name, used with dictionary from headers.pickle file to determine
columns to keep.
Examples
----------
>>> in_file = "subj0001_stop_signal_task-0.txt"
>>> edat_file = "subj0001_stop_signal_task-0.edat2"
>>> out_file = "subj0001_0.csv"
>>> task = "TEST_STOPSIGNAL"
>>> ce.text_to_rcsv(in_file, edat_file, out_file, task)
Output file successfully created- subj0001_0.csv
"""
# [_, edat_suffix] = os.path.splitext(edat_file)
# header_list = headers.get(task)
# replacements = replace_dict.get(task).get(edat_suffix)
# Load the text file as a list.
with open(text_file, "r") as fo:
text_data = list(fo)
# Remove unicode characters.
filtered_data = [_strip(row) for row in text_data]
    # Remove empty rows.
filtered_data = [string for string in filtered_data if string != '']
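    # Each log frame in the raw text looks roughly like this (key names and
    # values here are illustrative; the "Key: Value" layout is what the
    # parsing below relies on):
    #     *** LogFrame Start ***
    #     Procedure: TrialProc
    #     StopSignal.RT: 523
    #     *** LogFrame End ***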
# Determine where rows begin and end.
start_index = [i_row for i_row, row in enumerate(filtered_data) if row == "*** LogFrame Start ***"]
end_index = [i_row for i_row, row in enumerate(filtered_data) if row == "*** LogFrame End ***"]
if (len(start_index) != len(end_index) or start_index[0] >= end_index[0]):
print("Warning: LogFrame Starts and Ends do not match up.")
n_rows = min(len(start_index), len(end_index))
# Find column headers and remove duplicates.
all_headers = []
data_by_rows = []
for i_row in range(n_rows):
one_row = filtered_data[start_index[i_row]+1:end_index[i_row]]
data_by_rows.append(one_row)
for j_col in range(len(one_row)):
split_header_idx = one_row[j_col].index(":")
all_headers.append(one_row[j_col][:split_header_idx])
unique_headers = list(set(all_headers))
# Preallocate list of lists composed of NULLs.
null_col = ["NULL"] * (n_rows+1)
data_matrix = [null_col[:] for i_col in range(len(unique_headers))]
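    # data_matrix holds one inner list per column; each column gets its
    # header in the first cell and row values after it, e.g. (illustrative):
    #     ["Subject", "1", "1", "NULL"]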
# Fill list of lists with relevant data from data_by_rows and
# unique_headers.
for i_col in range(len(unique_headers)):
data_matrix[i_col][0] = unique_headers[i_col]
for i_row in range(n_rows):
for j_col in range(len(data_by_rows[i_row])):
split_header_idx = data_by_rows[i_row][j_col].index(":")
for k_header in range(len(unique_headers)):
if (data_by_rows[i_row][j_col][:split_header_idx] == unique_headers[k_header]):
data_matrix[k_header][i_row+1] = data_by_rows[i_row][j_col][split_header_idx+1:].lstrip()
# If a column is all NULLs except for the header and one value at the
# bottom, fill the column up with that bottom value.
# THIS SECTION NEEDS CLEANUP!
    for i_col, col in enumerate(data_matrix):
        rows_w_vals = [j_cell for j_cell, cell in enumerate(col)
                       if cell != "NULL"]
        if (len(rows_w_vals) == 2 and
                rows_w_vals[1] in (1, len(col) - 2, len(col) - 1)):
            data_matrix[i_col][1:] = [col[rows_w_vals[1]]] * (len(col) - 1)
# elif any([header in col[0] for header in fill_block]):
# for null_row in range(1, len(rows_w_vals)):
# first = rows_w_vals[null_row-1] + 1
# last = rows_w_vals[null_row]
# n_rows_to_fill = len(range(rows_w_vals[null_row-1] + 1, rows_w_vals[null_row]))
# data_matrix[i_col][first:last] = (col[rows_w_vals[null_row]] * n_rows_to_fill)
        data_matrix[i_col] = col[:-2]
# Transpose data_matrix.
t_data_matrix = _transpose(data_matrix)
# # Replace text headers with edat headers (replacement dict). Unnecessary if
# # your processing scripts are built around text files instead of edat
# # files.
# t_data_matrix[0] = [replacements.get(item, item) for item in t_data_matrix[0]]
#
# # Pare data_matrix down based on desired headers
# # Create list of columns with relevant headers.
# header_index = [t_data_matrix[0].index(header) for header in header_list]
#
# # Merge any columns that need to be merged.
# columns_to_merge = merge_cols.get(task)
# merge_col_names_list = merge_col_names.get(task)
# merged_data = []
# for i_merge in range(len(merge_col_names_list)):
# merge_col_nums = [t_data_matrix[0].index(hed) for hed in columns_to_merge[i_merge]]
# data_to_merge = [data_matrix[col] for col in merge_col_nums]
# merged_data.append(_merge_lists(data_to_merge, "all_else"))
# merged_data[i_merge][0] = merge_col_names_list[i_merge]
#
    # out_matrix = [[t_data_matrix[i_row][col] for col in header_index] for i_row in range(len(t_data_matrix))]
#
# # Transpose merged_data and append them to out_matrix.
# if len(merged_data) != 0:
# t_merged_data = _transpose(merged_data)
# for i_row in range(len(out_matrix)):
# out_matrix[i_row] = out_matrix[i_row] + t_merged_data[i_row]
# # Create column from which null index will be created.
# # Remove all instances of NULL by creating an index of NULL occurrences
# # and removing them from out_matrix.
# null_column_names = null_cols.get(task)
# null_column_index = [header_index[header_list.index(column)] for column in null_column_names]
# nulls_to_merge = [data_matrix[col_num] for col_num in null_column_index]
# merged_nulls_list = _merge_lists(nulls_to_merge, "all_null")
# null_index = sorted([i_row for i_row in range(len(merged_nulls_list)) if merged_nulls_list[i_row] == "NULL"], reverse=True)
# [out_matrix.pop(null_row) for null_row in null_index]
try:
with open(out_file, "w", newline='') as fo:
file_ = csv.writer(fo)
for row in t_data_matrix:
file_.writerow(row)
print("Output file successfully created- {0}".format(out_file))
except IOError:
print("Can't open output file- {0}".format(out_file))
def _merge_lists(lists, option):
"""
Merges multiple lists into one list, with the default being the values of
the first list. It either replaces values with NULL if NULL is in that
position in another list or replaces NULL with values if values are in that
position in another list.
"""
    if not isinstance(lists[0], list):
        return lists
    merged = lists[0]
    for i_col in range(1, len(lists)):
        if option == "all_null":
            merged = [lists[i_col][i_row] if lists[i_col][i_row] == "NULL"
                      else merged[i_row] for i_row in range(len(merged))]
        elif option == "all_else":
            merged = [lists[i_col][i_row] if lists[i_col][i_row] != "NULL"
                      else merged[i_row] for i_row in range(len(merged))]
    return merged
def _strip(string):
"""
    Removes non-printable and non-ASCII characters (including tabs and
    newlines) from a string.
"""
return "".join([val for val in string if 31 < ord(val) < 127])
def _text_to_df(text_file):
    """
    Converts the raw E-Prime text file to a pandas DataFrame.
    """
# Load the text file as a list.
with open(text_file, "r") as fo:
text_data = list(fo)
# Remove unicode characters.
filtered_data = [_strip(row) for row in text_data]
# Determine where rows begin and end.
start_index = [i_row for i_row, row in enumerate(filtered_data) if row == "*** LogFrame Start ***"]
end_index = [i_row for i_row, row in enumerate(filtered_data) if row == "*** LogFrame End ***"]
if (len(start_index) != len(end_index) or start_index[0] >= end_index[0]):
print("Warning: LogFrame Starts and Ends do not match up.")
n_rows = min(len(start_index), len(end_index))
# Find column headers and remove duplicates.
all_headers = []
data_by_rows = []
for i_row in range(n_rows):
one_row = filtered_data[start_index[i_row]+1:end_index[i_row]]
data_by_rows.append(one_row)
for j_col in range(len(one_row)):
split_header_idx = one_row[j_col].index(":")
all_headers.append(one_row[j_col][:split_header_idx])
unique_headers = list(set(all_headers))
    # Preallocate an object array filled with NaNs.
    data_matrix = np.empty((n_rows, len(unique_headers)), dtype=object)
    data_matrix[:] = np.nan
    # Fill the array with relevant data from data_by_rows and
    # unique_headers.
for i_row in range(n_rows):
for j_col in range(len(data_by_rows[i_row])):
split_header_idx = data_by_rows[i_row][j_col].index(":")
for k_header in range(len(unique_headers)):
if (data_by_rows[i_row][j_col][:split_header_idx] == unique_headers[k_header]):
data_matrix[i_row, k_header] = data_by_rows[i_row][j_col][split_header_idx+1:].lstrip()
df = pd.DataFrame(columns=unique_headers, data=data_matrix)
return df
def _transpose(list_):
"""
Transposes a list of lists.
"""
transposed_ = [[row[col] for row in list_] for col in range(len(list_[0]))]
transposed = [col for col in transposed_ if col]
return transposed
def _try_index(list_, val):
"""
Indexes a list without throwing an error if the value isn't found.
"""
    try:
        return list_.index(val)
    except ValueError:
        print(val)
        return None
if __name__ == "__main__":
    # If called from the command line, the desired function should be the
    # first argument.
    function_name = sys.argv[1]
    module_functions = [name for name, obj in inspect.getmembers(sys.modules[__name__])
                        if (inspect.isfunction(obj) and not name.startswith("_"))]
    if function_name not in module_functions:
        raise ValueError("Function {0} not in convert_eprime.".format(function_name))
    function = globals()[function_name]
    n_args = len(inspect.getfullargspec(function).args)
    if n_args != len(sys.argv) - 2:
        raise ValueError("Function {0} takes {1} args, not {2}.".format(
            function_name, n_args, len(sys.argv) - 2))
    function(*sys.argv[2:])
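# Example command-line invocation (hypothetical file names, matching the
# docstring examples above):
#     python convert_eprime.py etext_to_rcsv subj0001_stop_signal_task-0.txt TEST_STOPSIGNAL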