Skip to content

Commit

Permalink
rf: Read TSV files as streams
Browse files Browse the repository at this point in the history
  • Loading branch information
effigies committed Jan 10, 2025
1 parent 1e6d650 commit f36d1fb
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 33 deletions.
67 changes: 67 additions & 0 deletions src/files/tsv.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { assert, assertEquals, assertObjectMatch } from '@std/assert'
import { pathToFile } from './filetree.ts'
import { loadTSV } from './tsv.ts'
import { streamFromString } from '../tests/utils.ts'
import { ColumnsMap } from '../types/columns.ts'

/**
 * Exercises `loadTSV` against in-memory streams.
 *
 * Each step builds a file handle with `pathToFile`, replaces its `stream`
 * with string-backed content, and checks the resulting column map (or the
 * issue object thrown for malformed input).
 */
Deno.test('TSV loading', async (t) => {
  await t.step('Empty file produces empty map', async () => {
    const file = pathToFile('/empty.tsv')
    file.stream = streamFromString('')

    const map = await loadTSV(file)
    // map.size looks for a column called size, so work around it
    assertEquals(Object.keys(map).length, 0)
  })

  await t.step('Single row file produces header-only map', async () => {
    const file = pathToFile('/single_row.tsv')
    file.stream = streamFromString('a\tb\tc\n')

    const map = await loadTSV(file)
    assertEquals(map.a, [])
    assertEquals(map.b, [])
    assertEquals(map.c, [])
  })

  await t.step('Single column file produces single column map', async () => {
    const file = pathToFile('/single_column.tsv')
    file.stream = streamFromString('a\n1\n2\n3\n')

    const map = await loadTSV(file)
    assertEquals(map.a, ['1', '2', '3'])
  })

  await t.step('Missing final newline is ignored', async () => {
    const file = pathToFile('/missing_newline.tsv')
    file.stream = streamFromString('a\n1\n2\n3')

    const map = await loadTSV(file)
    assertEquals(map.a, ['1', '2', '3'])
  })

  await t.step('Empty row throws issue', async () => {
    const file = pathToFile('/empty_row.tsv')
    file.stream = streamFromString('a\tb\tc\n1\t2\t3\n\n4\t5\t6\n')

    // Track whether the catch branch ran: without this, a loadTSV that
    // resolves normally would let the step pass without checking anything.
    let thrown = false
    try {
      await loadTSV(file)
    } catch (e: any) {
      thrown = true
      assertObjectMatch(e, { key: 'TSV_EMPTY_LINE', line: 3 })
    }
    assert(thrown, 'Expected loadTSV to throw TSV_EMPTY_LINE for an empty row')
  })

  await t.step('Mismatched row length throws issue', async () => {
    const file = pathToFile('/mismatched_row.tsv')
    file.stream = streamFromString('a\tb\tc\n1\t2\t3\n4\t5\n')

    // Same guard as above: the step must fail if nothing is thrown.
    let thrown = false
    try {
      await loadTSV(file)
    } catch (e: any) {
      thrown = true
      assertObjectMatch(e, { key: 'TSV_EQUAL_ROWS', line: 3 })
    }
    assert(thrown, 'Expected loadTSV to throw TSV_EQUAL_ROWS for a short row')
  })

  // Tests will have populated the memoization cache
  await loadTSV.cache.clear()
})
72 changes: 43 additions & 29 deletions src/files/tsv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,58 @@
* TSV
* Module for parsing TSV
*/
import { TextLineStream } from '@std/streams'
import { ColumnsMap } from '../types/columns.ts'
import type { BIDSFile } from '../types/filetree.ts'
import { filememoizeAsync } from '../utils/memoize.ts'
import type { WithCache } from '../utils/memoize.ts'

const normalizeEOL = (str: string): string => str.replace(/\r\n/g, '\n').replace(/\r/g, '\n')
// Typescript resolved `row && !/^\s*$/.test(row)` as `string | boolean`
const isContentfulRow = (row: string): boolean => !!(row && !/^\s*$/.test(row))
import { createUTF8Stream } from './streams.ts'

async function _loadTSV(file: BIDSFile): Promise<ColumnsMap> {
return await file.text().then(parseTSV)
}
const reader = file.stream
.pipeThrough(createUTF8Stream())
.pipeThrough(new TextLineStream())
.getReader()

export const loadTSV = filememoizeAsync(_loadTSV)
try {
const headerRow = await reader.read()
const headers = (headerRow.done || !headerRow.value) ? [] : headerRow.value.split('\t')

function parseTSV(contents: string) {
const columns = new ColumnsMap()
const rows: string[][] = normalizeEOL(contents)
.split('\n')
.filter(isContentfulRow)
.map((str) => str.split('\t'))
const headers = rows.length ? rows[0] : []
// Initialize columns in array for construction efficiency
const initialCapacity = 1000
const columns: string[][] = headers.map(() => new Array<string>(initialCapacity))

if (rows.some((row) => row.length !== headers.length)) {
throw { key: 'TSV_EQUAL_ROWS' }
}
let rowIndex = 0 // Keep in scope after loop
for (; ; rowIndex++) {
const { done, value } = await reader.read()
if (done) break

headers.map((x) => {
columns[x] = []
})
if (headers.length !== Object.keys(columns).length) {
throw { key: 'TSV_COLUMN_HEADER_DUPLICATE', evidence: headers.join(', ') }
}
for (let i = 1; i < rows.length; i++) {
for (let j = 0; j < headers.length; j++) {
const col = columns[headers[j]] as string[]
col.push(rows[i][j])
// Expect a newline at the end of the file, but otherwise error on empty lines
if (!value) {
const nextRow = await reader.read()
if (nextRow.done) break
throw { key: 'TSV_EMPTY_LINE', line: rowIndex + 2 }
}

const values = value.split('\t')
if (values.length !== headers.length) {
throw { key: 'TSV_EQUAL_ROWS', line: rowIndex + 2 }
}
columns.forEach((column, columnIndex) => {
// Double array size if we exceed the current capacity
if (rowIndex >= column.length) {
column.length = column.length * 2
}
column[rowIndex] = values[columnIndex]
})
}

// Construct map, truncating columns to number of rows read
return new ColumnsMap(
headers.map((header, index) => [header, columns[index].slice(0, rowIndex)]),
)
} finally {
reader.releaseLock()
}
return columns
}

export const loadTSV = filememoizeAsync(_loadTSV)
4 changes: 4 additions & 0 deletions src/issues/list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ export const bidsIssues: IssueDefinitionRecord = {
severity: 'error',
reason: 'All rows must have the same number of columns as there are headers.',
},
TSV_EMPTY_LINE: {
severity: 'error',
reason: 'An empty line was found in the TSV file.',
},
TSV_COLUMN_MISSING: {
severity: 'error',
reason: 'A required column is missing',
Expand Down
5 changes: 3 additions & 2 deletions src/tests/regression.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { assert } from '@std/assert'
import { pathsToTree } from '../files/filetree.ts'
import { validate } from '../validators/bids.ts'
import type { BIDSFile } from '../types/filetree.ts'
import { streamFromString } from './utils.ts'

Deno.test('Regression tests', async (t) => {
await t.step('Verify ignored files in scans.tsv do not trigger error', async () => {
Expand All @@ -17,7 +18,7 @@ Deno.test('Regression tests', async (t) => {
// Without ignore, NOT_INCLUDED is triggered for CT, but the scans file is happy
let ds = pathsToTree(paths)
let scans_tsv = ds.get('sub-01/sub-01_scans.tsv') as BIDSFile
scans_tsv.text = () => Promise.resolve(scans_content)
scans_tsv.stream = streamFromString(scans_content)
let result = await validate(ds, {
datasetPath: '/dataset',
debug: 'ERROR',
Expand All @@ -30,7 +31,7 @@ Deno.test('Regression tests', async (t) => {
// With ignore, NOT_INCLUDED is not triggered for CT, and the scans file is still happy
ds = pathsToTree(paths, ignore)
scans_tsv = ds.get('sub-01/sub-01_scans.tsv') as BIDSFile
scans_tsv.text = () => Promise.resolve(scans_content)
scans_tsv.stream = streamFromString(scans_content)
result = await validate(ds, {
datasetPath: '/dataset',
debug: 'ERROR',
Expand Down
4 changes: 2 additions & 2 deletions src/types/columns.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Allow ColumnsMap to be accessed as an object too
export class ColumnsMap extends Map<string, string[]> {
[key: string]: Map<string, string[]>[keyof Map<string, string[]>] | string[]
constructor() {
constructor(iterable?: Iterable<readonly [string, string[]]>) {
super()
const columns = new Map<string, string[]>() as ColumnsMap
const columns = new Map<string, string[]>(iterable) as ColumnsMap
return new Proxy<ColumnsMap>(columns, columnMapAccessorProxy)
}
}
Expand Down

0 comments on commit f36d1fb

Please sign in to comment.