Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement flows text operations #77

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/flows/ChunksUtf8Decoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package com.softwaremill.jox.flows;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;


/**
* The general algorithm and some helper functions (with their comments) are copied from ox:
* see <a href="https://github.com/softwaremill/ox/blob/master/core/src/main/scala/ox/flow/FlowTextOps.scala">ox.flow.FlowTextOps</a>
* Which was copied from fs2: see fs2.text.decodeC <a href="https://github.com/typelevel/fs2/blob/9b1b27cf7a8d7027df852d890555b341da70ef9e/core/shared/src/main/scala/fs2/text.scala"">link</a>
* <p>
* Extracted to separate class for better readability
*/
class ChunksUtf8Decoder {
private static final int BOM_SIZE = 3; // const for UTF-8
private static final byte[] BOM_UTF8 = new byte[]{-17, -69, -65};

public static <T> Flow<String> decodeStringUtf8(FlowStage<T> flowStage) {
return Flows.usingEmit(emit -> {
final AtomicReference<State> state = new AtomicReference<>(State.ProcessBOM);
final AtomicReference<byte[]> buffer = new AtomicReference<>(null);

flowStage.run(t -> {
if (!(t instanceof byte[] bytes)) {
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
}
byte[] newBuffer;
State newState;
if (state.get() == State.ProcessBOM) {
Map.Entry<byte[], State> processResult = processByteOrderMark(bytes, buffer.get());
newBuffer = processResult.getKey();
newState = processResult.getValue();
} else {
newBuffer = doPull(bytes, buffer.get(), emit);
newState = State.Pull;
}
buffer.set(newBuffer);
state.set(newState);
});
// A common case, worth checking in advance

if (buffer.get() != null && buffer.get().length > 0) {
emit.apply(new String(buffer.get(), StandardCharsets.UTF_8));
}
});
}

private static Map.Entry<byte[], State> processByteOrderMark(byte[] bytes, byte[] buffer) {
// A common case, worth checking in advance
if (buffer == null && bytes.length >= BOM_SIZE && startsWith(bytes, BOM_UTF8)) {
return Map.entry(bytes, State.Pull);
} else {
byte[] newBuffer0 = buffer == null ? new byte[0] : buffer;
byte[] newBuffer = Arrays.copyOf(newBuffer0, newBuffer0.length + bytes.length);
newBuffer = ByteBuffer.wrap(newBuffer).put(newBuffer0.length, bytes).array();
if (newBuffer.length >= BOM_SIZE) {
byte[] rem = startsWith(newBuffer, BOM_UTF8) ? drop(newBuffer, BOM_SIZE) : newBuffer;
return Map.entry(rem, State.Pull);
} else if (startsWith(newBuffer, take(BOM_UTF8, newBuffer.length))) {
return Map.entry(newBuffer, State.ProcessBOM); // we've accumulated less than the full BOM, let's pull some more
} else {
return Map.entry(newBuffer, State.Pull); // We've accumulated less than BOM size but we already know that these bytes aren't BOM
}
}
}

private static byte[] doPull(byte[] bytes, byte[] buffer, FlowEmit<String> output) throws Exception {
var result = processSingleChunk(buffer, bytes);
Optional<String> str = result.getKey();
if (str.isPresent()) {
output.apply(str.get());
}
return result.getValue();
}

private static Map.Entry<Optional<String>, byte[]> processSingleChunk(byte[] buffer, byte[] nextBytes) {
byte[] allBytes;
if (buffer == null || buffer.length == 0) {
allBytes = nextBytes;
} else {
allBytes = Arrays.copyOf(buffer, buffer.length + nextBytes.length);
allBytes = ByteBuffer.wrap(allBytes).put(buffer.length, nextBytes).array();
}

int splitAt = allBytes.length - lastIncompleteBytes(allBytes);
if (splitAt == allBytes.length) {
// in the common case of ASCII chars
// we are in this branch so the next buffer will
// be empty
return Map.entry(Optional.of(new String(allBytes, StandardCharsets.UTF_8)), new byte[0]);
} else if (splitAt == 0) {
return Map.entry(Optional.empty(), allBytes);
} else {
return Map.entry(
// character
Optional.of(new String(Arrays.copyOfRange(allBytes, 0, splitAt), StandardCharsets.UTF_8)),
// remaining bytes
Arrays.copyOfRange(allBytes, splitAt, allBytes.length)
);
}
}

/**
* Takes n elements from the beginning of the array and returns copy of the result
*/
private static byte[] take(byte[] a, int n) {
return Arrays.copyOfRange(a, 0, n);
}

/**
* Drops n elements from the beginning of the array and returns copy of the result
*/
private static byte[] drop(byte[] a, int n) {
return Arrays.copyOfRange(a, n, a.length);
}

/**
* Checks if array a starts with array b
*/
private static boolean startsWith(byte[] a, byte[] b) {
return ByteBuffer.wrap(a, 0, b.length).equals(ByteBuffer.wrap(b));
}

/*
* Copied from scala lib fs2 (fs2.text.decodeC.lastIncompleteBytes)
* Returns the length of an incomplete multi-byte sequence at the end of
* `bs`. If `bs` ends with an ASCII byte or a complete multi-byte sequence,
* 0 is returned.
*/
private static int lastIncompleteBytes(byte[] bs) {
int minIdx = Math.max(0, bs.length - 3);
int idx = bs.length - 1;
int counter = 0;
int res = 0;
while (minIdx <= idx) {
int c = continuationBytes(bs[idx]);
if (c >= 0) {
if (c != counter) {
res = counter + 1;
}
return res;
}
idx--;
counter++;
}
return res;
}

/*
* Copied from scala lib fs2 (fs2.text.decodeC.continuationBytes)
* Returns the number of continuation bytes if `b` is an ASCII byte or a
* leading byte of a multi-byte sequence, and -1 otherwise.
*/
private static int continuationBytes(byte b) {
if ((b & 0x80) == 0x00) return 0; // ASCII byte
else if ((b & 0xe0) == 0xc0) return 1; // leading byte of a 2 byte seq
else if ((b & 0xf0) == 0xe0) return 2; // leading byte of a 3 byte seq
else if ((b & 0xf8) == 0xf0) return 3; // leading byte of a 4 byte seq
else return -1; // continuation byte or garbage
}

// we start in the ProcessBOM state, and then transit to the Pull state
private enum State {
ProcessBOM, Pull
}
}
130 changes: 129 additions & 1 deletion flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import static com.softwaremill.jox.structured.Scopes.unsupervised;
import static java.lang.Thread.sleep;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -55,7 +58,7 @@
* Running a flow is possible using one of the `run*` methods, such as {@link Flow#runToList}, {@link Flow#runToChannel} or {@link Flow#runFold}.
*/
public class Flow<T> {
protected final FlowStage<T> last;
final FlowStage<T> last;

public Flow(FlowStage<T> last) {
this.last = last;
Expand Down Expand Up @@ -677,6 +680,109 @@ public Flow<Void> drain() {
return Flows.usingEmit(_ -> last.run(_ -> {}));
}

/** Decodes a stream of chunks of bytes into UTF-8 Strings. This function is able to handle UTF-8 characters encoded on multiple bytes
* that are split across chunks.
*
* @return
* a flow of Strings decoded from incoming bytes.
*/
public Flow<String> decodeStringUtf8() {
return ChunksUtf8Decoder.decodeStringUtf8(last);
}

/**
* Encodes a flow of `String` into a flow of bytes using UTF-8.
*/
public Flow<byte[]> encodeUtf8() {
return map(s -> {
if (s instanceof String string) {
return string.getBytes(StandardCharsets.UTF_8);
}
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing String");
});
}

/**
* Transforms a flow of byte arrays such that each emitted `String` is a text line from the input decoded using UTF-8 charset.
*
* @return
* a flow emitting lines read from the input byte chunks, assuming they represent text.
*/
public Flow<String> linesUtf8() {
return lines(StandardCharsets.UTF_8);
}

/**
* Transforms a flow of byte arrays such that each emitted `String` is a text line from the input.
*
* @param charset the charset to use for decoding the bytes into text.
* @return a flow emitting lines read from the input byte arrays, assuming they represent text.
*/
public Flow<String> lines(Charset charset) {
// buffer == Optional.empty() is a special state for handling empty chunks in onComplete, in order to tell them apart from empty lines
return mapStatefulConcat(Optional::<byte[]>empty,
(buffer, nextChunk) -> {
if (!byte[].class.isInstance(nextChunk)) {
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
}
// get next incoming chunk
byte[] chunk = (byte[]) nextChunk;
if (chunk.length == 0) {
return Map.entry(Optional.empty(), Collections.emptyList());
}

// check if chunk contains newline character, if not proceed to the next chunk
int newLineIndex = getNewLineIndex(chunk);
if (newLineIndex == -1) {
if (buffer.isEmpty()) {
return Map.entry(Optional.of(chunk), Collections.emptyList());
}
var b = buffer.get();
byte[] newBuffer = Arrays.copyOf(b, b.length + chunk.length);
newBuffer = ByteBuffer.wrap(newBuffer).put(b.length, chunk).array();
return Map.entry(Optional.of(newBuffer), Collections.emptyList());
}

// buffer for lines, if chunk contains more than one newline character
List<byte[]> lines = new ArrayList<>();

// variable used to clear buffer after using it
byte[] bufferFromPreviousChunk = buffer.orElse(new byte[0]);
while (chunk.length > 0 && newLineIndex != -1) {
byte[] line = new byte[newLineIndex];
byte[] newChunk = new byte[chunk.length - newLineIndex - 1];
ByteBuffer.wrap(chunk)
.get(line, 0, newLineIndex)
.get(newLineIndex + 1, newChunk, 0, chunk.length - newLineIndex - 1);

if (bufferFromPreviousChunk.length > 0) {
// concat accumulated buffer and line
byte[] buf = Arrays.copyOf(bufferFromPreviousChunk, bufferFromPreviousChunk.length + line.length);
lines.add(ByteBuffer.wrap(buf).put(bufferFromPreviousChunk.length, line).array());
// cleanup buffer
bufferFromPreviousChunk = new byte[0];
} else {
lines.add(line);
}
chunk = newChunk;
newLineIndex = getNewLineIndex(chunk);
}
return Map.entry(Optional.of(chunk), lines);
},
buf -> buf
)
.map(chunk -> new String(chunk, charset));
}

private int getNewLineIndex(byte[] chunk) {
for (int i = 0; i < chunk.length; i++) {
if (chunk[i] == '\n') {
return i;
}
}
return -1;
}

/**
* Always runs `f` after the flow completes, whether it's because all elements are emitted, or when there's an error.
*/
Expand Down Expand Up @@ -929,6 +1035,28 @@ public <U> Flow<U> interleave(Flow<U> other, int segmentSize, boolean eagerCompl
return Flows.interleaveAll(Arrays.asList((Flow<U>) this, other), segmentSize, eagerComplete, bufferCapacity);
}

/**
* Emits a given number of elements (determined by `segmentSize`) from this flow to the returned flow, then emits the same number of
* elements from the `other` flow and repeats. The order of elements in both flows is preserved.
* <p>
* If one of the flows is done before the other, the behavior depends on the `eagerComplete` flag. When set to `true`, the returned flow is
* completed immediately, otherwise the remaining elements from the other flow are emitted by the returned flow.
* <p>
* Both flows are run concurrently and asynchronously. The size of used buffer is determined by the {@link Channel#BUFFER_SIZE} that is in scope, or default {@link Channel#DEFAULT_BUFFER_SIZE} is used.
*
* @param other
* The flow whose elements will be interleaved with the elements of this flow.
* @param segmentSize
* The number of elements sent from each flow before switching to the other one.
* @param eagerComplete
* If `true`, the returned flow is completed as soon as either of the flow completes. If `false`, the remaining elements of the
* non-completed flow are sent downstream.
*/
public <U> Flow<U> interleave(Flow<U> other, int segmentSize, boolean eagerComplete) {
//noinspection unchecked
return Flows.interleaveAll(Arrays.asList((Flow<U>) this, other), segmentSize, eagerComplete);
}

/**
* Applies the given mapping function `f`, to each element emitted by this source, transforming it into an `Iterable` of results,
* then the returned flow emits the results one by one. Can be used to unfold incoming sequences of elements into single elements.
Expand Down
22 changes: 22 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/flows/Flows.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,28 @@ public static <T> Flow<T> failed(Exception t) {
});
}

/**
* Sends a given number of elements (determined by `segmentSize`) from each flow in `flows` to the returned flow and repeats. The order
* of elements in all flows is preserved.
* <p>
* If any of the flows is done before the others, the behavior depends on the `eagerComplete` flag. When set to `true`, the returned flow
* is completed immediately, otherwise the interleaving continues with the remaining non-completed flows. Once all but one flows are
* complete, the elements of the remaining non-complete flow are emitted by the returned flow.
* <p>
* The provided flows are run concurrently and asynchronously. The size of used buffer is determined by the {@link Channel#BUFFER_SIZE} that is in scope, or default {@link Channel#DEFAULT_BUFFER_SIZE} is used.
*
* @param flows
* The flows whose elements will be interleaved.
* @param segmentSize
* The number of elements sent from each flow before switching to the next one.
* @param eagerComplete
* If `true`, the returned flow is completed as soon as any of the flows completes. If `false`, the interleaving continues with the
* remaining non-completed flows.
*/
public static <T> Flow<T> interleaveAll(List<Flow<T>> flows, int segmentSize, boolean eagerComplete) {
return interleaveAll(flows, segmentSize, eagerComplete, Channel.BUFFER_SIZE.orElse(Channel.DEFAULT_BUFFER_SIZE));
}

/**
* Sends a given number of elements (determined by `segmentSize`) from each flow in `flows` to the returned flow and repeats. The order
* of elements in all flows is preserved.
Expand Down
Loading
Loading