Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to prepare for fixed-width column support. #219

Merged
merged 2 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/main/java/io/deephaven/csv/containers/ByteSlice.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ public CharSequence subSequence(final int start, final int end) {
return new ByteSlice(data, newBegin, newEnd);
}

/**
* Trim the padding bytes from the front and back of the slice.
*
* @param padding The padding byte.
*/
public void trimPadding(byte padding) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to be unused (even when reviewing other PR).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, deleted

while (begin != end && data[begin] == padding) {
++begin;
}
while (begin != end && data[end - 1] == padding) {
--end;
}
}

@Override
@NotNull
public String toString() {
Expand Down
109 changes: 23 additions & 86 deletions src/main/java/io/deephaven/csv/reading/CsvReader.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.deephaven.csv.reading;

import io.deephaven.csv.CsvSpecs;
import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.densestorage.DenseStorageReader;
import io.deephaven.csv.densestorage.DenseStorageWriter;
import io.deephaven.csv.parsers.DataType;
import io.deephaven.csv.parsers.Parser;
import io.deephaven.csv.reading.cells.CellGrabber;
import io.deephaven.csv.reading.cells.DelimitedCellGrabber;
import io.deephaven.csv.reading.headers.DelimitedHeaderFinder;
import io.deephaven.csv.sinks.Sink;
import io.deephaven.csv.sinks.SinkFactory;
import io.deephaven.csv.util.*;
Expand Down Expand Up @@ -54,22 +56,29 @@ private CsvReader() {}
* the CsvReader determines what the column type is, it will use the {@link SinkFactory} to create an
* appropriate Sink<T> for the type. Note that the CsvReader might guess wrong, so it might create a
* Sink, partially populate it, and then abandon it. The final set of fully-populated Sinks will be returned
* in in the CsvReader.Result. Thread safety: The {@link SinkFactory} may be invoked concurrently, therefore
* it must be thread safe.
* in the CsvReader.Result. Thread safety: The {@link SinkFactory} may be invoked concurrently, therefore it
* must be thread safe.
* @return A CsvReader.Result containing the column names, the number of columns, and the final set of
* fully-populated Sinks.
*/
public static Result read(final CsvSpecs specs, final InputStream stream, final SinkFactory sinkFactory)
throws CsvReaderException {
return delimitedReadLogic(specs, stream, sinkFactory);
}

private static Result delimitedReadLogic(
final CsvSpecs specs, final InputStream stream, final SinkFactory sinkFactory)
throws CsvReaderException {
// These two have already been validated by CsvSpecs to be 7-bit ASCII.
final byte quoteAsByte = (byte) specs.quote();
final byte delimiterAsByte = (byte) specs.delimiter();
final CellGrabber grabber =
new CellGrabber(stream, quoteAsByte, delimiterAsByte, specs.ignoreSurroundingSpaces(),
new DelimitedCellGrabber(stream, quoteAsByte, delimiterAsByte, specs.ignoreSurroundingSpaces(),
specs.trim());
// For an "out" parameter
final MutableObject<byte[][]> firstDataRowHolder = new MutableObject<>();
final String[] headersTemp = determineHeadersToUse(specs, grabber, firstDataRowHolder);
final String[] headersTemp = DelimitedHeaderFinder.determineHeadersToUse(specs, grabber,
firstDataRowHolder);
final byte[][] firstDataRow = firstDataRowHolder.getValue();
final int numInputCols = headersTemp.length;

Expand All @@ -85,6 +94,14 @@ public static Result read(final CsvSpecs specs, final InputStream stream, final
final int numOutputCols = headersTemp2.length;
final String[] headersToUse = canonicalizeHeaders(specs, headersTemp2);

return commonReadLogic(specs, grabber, firstDataRow, numInputCols, numOutputCols, headersToUse, sinkFactory);
}


private static Result commonReadLogic(final CsvSpecs specs, CellGrabber grabber, byte[][] optionalFirstDataRow,
int numInputCols, int numOutputCols,
String[] headersToUse, final SinkFactory sinkFactory)
throws CsvReaderException {
final String[][] nullValueLiteralsToUse = new String[numOutputCols][];
for (int ii = 0; ii < numOutputCols; ++ii) {
nullValueLiteralsToUse[ii] =
Expand Down Expand Up @@ -120,7 +137,7 @@ public static Result read(final CsvSpecs specs, final InputStream stream, final

// Start the writer.
final Future<Object> numRowsFuture = ecs.submit(() -> ParseInputToDenseStorage.doit(headersToUse,
firstDataRow, grabber, specs, nullValueLiteralsToUse, dsws));
optionalFirstDataRow, grabber, specs, nullValueLiteralsToUse, dsws));

// Start the readers, taking care to not hold a reference to the DenseStorageReader.
final ArrayList<Future<Object>> sinkFutures = new ArrayList<>();
Expand Down Expand Up @@ -199,62 +216,6 @@ private static List<String> calcNullValueLiteralsToUse(final CsvSpecs specs, fin
return specs.nullValueLiterals();
}

/**
* Determine which headers to use. The result comes from either the first row of the file or the user-specified
* overrides.
*/
private static String[] determineHeadersToUse(final CsvSpecs specs,
final CellGrabber grabber, final MutableObject<byte[][]> firstDataRowHolder)
throws CsvReaderException {
String[] headersToUse = null;
if (specs.hasHeaderRow()) {
long skipCount = specs.skipHeaderRows();
byte[][] headerRow;
while (true) {
headerRow = tryReadOneRow(grabber);
if (headerRow == null) {
throw new CsvReaderException(
"Can't proceed because hasHeaders is set but input file is empty");
}
if (skipCount == 0) {
break;
}
--skipCount;
}
headersToUse = Arrays.stream(headerRow).map(String::new).toArray(String[]::new);
}

// Whether or not the input had headers, maybe override with client-specified headers.
if (specs.headers().size() != 0) {
headersToUse = specs.headers().toArray(new String[0]);
}

// If we still have nothing, try to generate synthetic column headers (works only if the file is
// non-empty, because we need to infer the column count).
final byte[][] firstDataRow;
if (headersToUse == null) {
firstDataRow = tryReadOneRow(grabber);
if (firstDataRow == null) {
throw new CsvReaderException(
"Can't proceed because input file is empty and client has not specified headers");
}
headersToUse = new String[firstDataRow.length];
for (int ii = 0; ii < headersToUse.length; ++ii) {
headersToUse[ii] = "Column" + (ii + 1);
}
} else {
firstDataRow = null;
}

// Apply column specific overrides.
for (Map.Entry<Integer, String> entry : specs.headerForIndex().entrySet()) {
headersToUse[entry.getKey()] = entry.getValue();
}

firstDataRowHolder.setValue(firstDataRow);
return headersToUse;
}

private static String[] canonicalizeHeaders(CsvSpecs specs, final String[] headers) throws CsvReaderException {
final String[] legalized = specs.headerLegalizer().apply(headers);
final Set<String> unique = new HashSet<>();
Expand Down Expand Up @@ -286,30 +247,6 @@ private static String[] canonicalizeHeaders(CsvSpecs specs, final String[] heade
throw new CsvReaderException(sb.toString());
}

/**
* Try to read one row from the input. Returns null if the input is empty
*
* @return The first row as a byte[][] or null if the input was exhausted.
*/
private static byte[][] tryReadOneRow(final CellGrabber grabber) throws CsvReaderException {
final List<byte[]> headers = new ArrayList<>();

// Grab the header
final ByteSlice slice = new ByteSlice();
final MutableBoolean lastInRow = new MutableBoolean();
final MutableBoolean endOfInput = new MutableBoolean();
do {
grabber.grabNext(slice, lastInRow, endOfInput);
final byte[] item = new byte[slice.size()];
slice.copyTo(item, 0);
headers.add(item);
} while (!lastInRow.booleanValue());
if (headers.size() == 1 && headers.get(0).length == 0 && endOfInput.booleanValue()) {
return null;
}
return headers.toArray(new byte[0][]);
}

/** Result of {@link #read}. Represents a set of columns. */
public static final class Result implements Iterable<ResultColumn> {
private final long numRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.densestorage.DenseStorageReader;
import io.deephaven.csv.densestorage.DenseStorageWriter;
import io.deephaven.csv.reading.cells.CellGrabber;
import io.deephaven.csv.util.CsvReaderException;
import io.deephaven.csv.util.MutableBoolean;

Expand Down
93 changes: 93 additions & 0 deletions src/main/java/io/deephaven/csv/reading/ReaderUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.deephaven.csv.reading;

import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.tokenization.RangeTests;
import io.deephaven.csv.util.MutableInt;

public class ReaderUtil {
public static String[] makeSyntheticHeaders(int numHeaders) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be moved / made private to the single callsite?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. There were supposed to be two call sites. Now there are :-). Thanks, good catch.

final String[] result = new String[numHeaders];
for (int ii = 0; ii < result.length; ++ii) {
result[ii] = "Column" + (ii + 1);
}
return result;
}

/**
* Trim whitespace from the front and back of the slice.
*
* @param cs The slice, modified in-place to have whitespace (if any) removed.
*/
public static void trimWhitespace(final ByteSlice cs) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This becoming newly public, I'm going to complain about the name "whitespace", as that has a more strict definition, see java.lang.Character#isWhitespace(char).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Renamed to trimSpacesAndTabs. PTAL

final byte[] data = cs.data();
int begin = cs.begin();
int end = cs.end();
while (begin != end && RangeTests.isSpaceOrTab(data[begin])) {
++begin;
}
while (begin != end && RangeTests.isSpaceOrTab(data[end - 1])) {
--end;
}
cs.reset(data, begin, end);
}

/**
* Get the expected length of a UTF-8 sequence, given its first byte, and its corresponding length in the specified
* units (UTF-16 or UTF-32).
*
* @param firstByte The first byte of the UTF-8 sequence.
* @param numBytes The number of remaining bytes in the input field (including firstByte). If the UTF-8 sequence
* specifies a number of bytes larger than the number of remaining bytes, an exception is thrown.
* @param useUtf32CountingConvention Whether 'charCountResult' should be in units of UTF-32 or UTF-16.
* @param charCountResult The number of UTF-32 or UTF-16 units specified by the UTF-8 character.
* @return The length of the UTF-8 sequence.
*/
public static int getUtf8LengthAndCharLength(
byte firstByte, int numBytes,
boolean useUtf32CountingConvention, MutableInt charCountResult) {
final int utf8Length = getUtf8Length(firstByte);
if (utf8Length > numBytes) {
throw new RuntimeException(String.format(
"The next UTF-8 character needs %d bytes but there are only %d left in the field",
utf8Length, numBytes));
}
final int numChars = useUtf32CountingConvention || utf8Length < 4 ? 1 : 2;
charCountResult.setValue(numChars);
return utf8Length;
}

/**
* Calculate the expected length of a UTF-8 sequence, given its first byte.
*
* @param firstByte The first byte of the sequence.
* @return The length of the sequence, in the range 1..4 inclusive.
*/
private static int getUtf8Length(byte firstByte) {
if ((firstByte & 0x80) == 0) {
// 0xxxxxxx
// 1-byte UTF-8 character aka ASCII.
// Last code point U+007F
return 1;
}
if ((firstByte & 0xE0) == 0xC0) {
// 110xxxxx
// 2-byte UTF-8 character
// Last code point U+07FF
return 2;
}
if ((firstByte & 0xF0) == 0xE0) {
// 1110xxxx
// 3-byte UTF-8 character
// Last code point U+FFFF
return 3;
}
if ((firstByte & 0xF8) == 0xF0) {
// 11110xxx
// 4-byte UTF-8 character. Note: Java encodes all of these in two "char" variables.
// Last code point U+10FFFF
return 4;
}
throw new IllegalStateException(String.format("0x%x is not a valid starting byte for a UTF-8 sequence",
firstByte));
}
}
31 changes: 31 additions & 0 deletions src/main/java/io/deephaven/csv/reading/cells/CellGrabber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.deephaven.csv.reading.cells;

import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.util.CsvReaderException;
import io.deephaven.csv.util.MutableBoolean;

/**
* This class is used to traverse over text from a Reader, understanding both field and line delimiters, as well as the
* CSV quoting convention, and breaking the text into cells for use by the calling code.
*/
public interface CellGrabber {
/**
* Try to grab the next cell from the input, being aware of field delimiters, line delimiters, quoting, and
* trimming.
*
* @param dest The result, as a {@link ByteSlice}. The ByteSlice is invalidated by the next call to grabNext.
* @param lastInRow An out parameter which will be set to true if the cell just read was the last cell in the row,
* otherwise it will be set to false.
* @param endOfInput An out parameter which will be set to true if the cell just read encountered the end of the
* input, otherwise it will be set to false.
*/
void grabNext(final ByteSlice dest, final MutableBoolean lastInRow,
final MutableBoolean endOfInput) throws CsvReaderException;

/**
* Returns the "physical" row number, that is the row number of the input file. This differs from the "logical" row
* number, which is the row number of the CSV data being processed. The difference arises when, due to quotation
* marks, a single CSV row can span multiple lines of input.
*/
int physicalRowNum();
}
Loading