Implement flows text operations
emil-bar committed Dec 30, 2024
1 parent 2498438 commit f564246
Showing 5 changed files with 572 additions and 1 deletion.
169 changes: 169 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/flows/ChunksUtf8Decoder.java
@@ -0,0 +1,169 @@
package com.softwaremill.jox.flows;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;


/**
* The general algorithm and some helper functions (with their comments) are copied from ox (see ox.flow.FlowTextOps),
* which was in turn copied from fs2 (see fs2.text.decodeC, <a href="https://github.com/typelevel/fs2/blob/9b1b27cf7a8d7027df852d890555b341da70ef9e/core/shared/src/main/scala/fs2/text.scala">link</a>)
* <p>
* Extracted to a separate class for better readability
*/
class ChunksUtf8Decoder {
private static final int BOM_SIZE = 3; // const for UTF-8
private static final byte[] BOM_UTF8 = new byte[]{-17, -69, -65};

public static <T> Flow<String> decodeStringUtf8(FlowStage<T> flowStage) {
return Flows.usingEmit(emit -> {
final AtomicReference<State> state = new AtomicReference<>(State.ProcessBOM);
final AtomicReference<byte[]> buffer = new AtomicReference<>(null);

flowStage.run(t -> {
if (!(t instanceof byte[] bytes)) {
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
}
byte[] newBuffer;
State newState;
if (state.get() == State.ProcessBOM) {
Map.Entry<byte[], State> processResult = processByteOrderMark(bytes, buffer.get());
newBuffer = processResult.getKey();
newState = processResult.getValue();
} else {
newBuffer = doPull(bytes, buffer.get(), emit);
newState = State.Pull;
}
buffer.set(newBuffer);
state.set(newState);
});
// emit any remaining (not yet decoded) bytes left in the buffer
if (buffer.get() != null && buffer.get().length > 0) {
emit.apply(new String(buffer.get(), StandardCharsets.UTF_8));
}
});
}

private static Map.Entry<byte[], State> processByteOrderMark(byte[] bytes, byte[] buffer) {
// A common case, worth checking in advance
if (buffer == null && bytes.length >= BOM_SIZE && !startsWith(bytes, BOM_UTF8)) {
return Map.entry(bytes, State.Pull);
} else {
byte[] newBuffer0 = buffer == null ? new byte[0] : buffer;
byte[] newBuffer = Arrays.copyOf(newBuffer0, newBuffer0.length + bytes.length);
newBuffer = ByteBuffer.wrap(newBuffer).put(newBuffer0.length, bytes).array();
if (newBuffer.length >= BOM_SIZE) {
byte[] rem = startsWith(newBuffer, BOM_UTF8) ? drop(newBuffer, BOM_SIZE) : newBuffer;
return Map.entry(rem, State.Pull);
} else if (startsWith(newBuffer, take(BOM_UTF8, newBuffer.length))) {
return Map.entry(newBuffer, State.ProcessBOM); // we've accumulated less than the full BOM, let's pull some more
} else {
return Map.entry(newBuffer, State.Pull); // We've accumulated less than BOM size but we already know that these bytes aren't BOM
}
}
}
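// Worked example (illustrative, not from the original source): a BOM split across chunks is still stripped by
// processByteOrderMark above. For the chunks [0xEF] and [0xBB, 0xBF, 'h', 'i'], the first call buffers [0xEF] and
// stays in ProcessBOM (it matches a prefix of the BOM); the second call accumulates all five bytes, detects the
// full BOM, drops it and switches to Pull, leaving only "hi" in the buffer to be decoded later.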

private static byte[] doPull(byte[] bytes, byte[] buffer, FlowEmit<String> output) throws Exception {
var result = processSingleChunk(buffer, bytes);
Optional<String> str = result.getKey();
if (str.isPresent()) {
output.apply(str.get());
}
return result.getValue();
}

private static Map.Entry<Optional<String>, byte[]> processSingleChunk(byte[] buffer, byte[] nextBytes) {
byte[] allBytes;
if (buffer == null || buffer.length == 0) {
allBytes = nextBytes;
} else {
allBytes = Arrays.copyOf(buffer, buffer.length + nextBytes.length);
allBytes = ByteBuffer.wrap(allBytes).put(buffer.length, nextBytes).array();
}

int splitAt = allBytes.length - lastIncompleteBytes(allBytes);
if (splitAt == allBytes.length) {
// common case (e.g. all-ASCII input): every byte forms a complete character,
// so nothing is carried over and the next buffer will be empty
return Map.entry(Optional.of(new String(allBytes, StandardCharsets.UTF_8)), new byte[0]);
} else if (splitAt == 0) {
return Map.entry(Optional.empty(), allBytes);
} else {
return Map.entry(
// complete characters
Optional.of(new String(Arrays.copyOfRange(allBytes, 0, splitAt), StandardCharsets.UTF_8)),
// remaining bytes
Arrays.copyOfRange(allBytes, splitAt, allBytes.length)
);
}
}

/**
* Takes n elements from the beginning of the array and returns a copy of the result
*/
private static byte[] take(byte[] a, int n) {
return Arrays.copyOfRange(a, 0, n);
}

/**
* Drops n elements from the beginning of the array and returns a copy of the result
*/
private static byte[] drop(byte[] a, int n) {
return Arrays.copyOfRange(a, n, a.length);
}

/**
* Checks if array a starts with array b
*/
private static boolean startsWith(byte[] a, byte[] b) {
return ByteBuffer.wrap(a, 0, b.length).equals(ByteBuffer.wrap(b));
}

/*
* Copied from the Scala library fs2 (fs2.text.decodeC.lastIncompleteBytes).
* Returns the length of an incomplete multi-byte sequence at the end of
* `bs`. If `bs` ends with an ASCII byte or a complete multi-byte sequence,
* 0 is returned.
*/
private static int lastIncompleteBytes(byte[] bs) {
int minIdx = Math.max(0, bs.length - 3);
int idx = bs.length - 1;
int counter = 0;
int res = 0;
while (minIdx <= idx) {
int c = continuationBytes(bs[idx]);
if (c >= 0) {
if (c != counter) {
res = counter + 1;
}
return res;
}
idx--;
counter++;
}
return res;
}
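// Worked example (illustrative, not from the original source) for lastIncompleteBytes above: for a chunk ending
// with 0xE2 0x82 -- the first two bytes of the three-byte encoding of '€' (0xE2 0x82 0xAC) -- the backwards scan
// first sees 0x82 (a continuation byte, counter becomes 1), then 0xE2, a leading byte expecting 2 continuation
// bytes; since 2 != 1 the method returns counter + 1 = 2, so both bytes are carried over to the next chunk
// instead of being decoded prematurely.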

/*
* Copied from the Scala library fs2 (fs2.text.decodeC.continuationBytes).
* Returns the number of continuation bytes if `b` is an ASCII byte or a
* leading byte of a multi-byte sequence, and -1 otherwise.
*/
private static int continuationBytes(byte b) {
if ((b & 0x80) == 0x00) return 0; // ASCII byte
else if ((b & 0xe0) == 0xc0) return 1; // leading byte of a 2 byte seq
else if ((b & 0xf0) == 0xe0) return 2; // leading byte of a 3 byte seq
else if ((b & 0xf8) == 0xf0) return 3; // leading byte of a 4 byte seq
else return -1; // continuation byte or garbage
}

// we start in the ProcessBOM state, and then transition to the Pull state
private enum State {
ProcessBOM, Pull
}
}
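
A minimal usage sketch (not part of the commit diff), put together from the Flows.fromValues, decodeStringUtf8 and runToList APIs used elsewhere in this changeset: a multi-byte UTF-8 character split across two chunks is decoded into a single String.

// illustrative only; '€' is encoded as 0xE2 0x82 0xAC and split across two byte[] chunks
List<String> decoded = Flows
        .fromValues(new byte[]{(byte) 0xE2, (byte) 0x82}, new byte[]{(byte) 0xAC})
        .decodeStringUtf8()
        .runToList();
// decoded == List.of("€")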
130 changes: 129 additions & 1 deletion flows/src/main/java/com/softwaremill/jox/flows/Flow.java
@@ -8,6 +8,9 @@
import static com.softwaremill.jox.structured.Scopes.unsupervised;
import static java.lang.Thread.sleep;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -55,7 +58,7 @@
* Running a flow is possible using one of the `run*` methods, such as {@link Flow#runToList}, {@link Flow#runToChannel} or {@link Flow#runFold}.
*/
public class Flow<T> {
protected final FlowStage<T> last;
final FlowStage<T> last;

public Flow(FlowStage<T> last) {
this.last = last;
@@ -677,6 +680,109 @@ public Flow<Void> drain() {
return Flows.usingEmit(_ -> last.run(_ -> {}));
}

/**
* Decodes a stream of chunks of bytes into UTF-8 Strings. This function is able to handle UTF-8 characters encoded on
* multiple bytes that are split across chunks.
*
* @return a flow of Strings decoded from incoming bytes.
*/
public Flow<String> decodeStringUtf8() {
return ChunksUtf8Decoder.decodeStringUtf8(last);
}

/**
* Encodes a flow of `String` into a flow of bytes using UTF-8.
*/
public Flow<byte[]> encodeUtf8() {
return map(s -> {
if (s instanceof String string) {
return string.getBytes(StandardCharsets.UTF_8);
}
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing String");
});
}
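// Usage sketch (illustrative, not part of this file): encoding a String flow to UTF-8 bytes and decoding it
// back yields the original text, e.g.
// Flows.fromValues("jox").encodeUtf8().decodeStringUtf8().runToList() returns List.of("jox").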

/**
* Transforms a flow of byte arrays such that each emitted `String` is a text line from the input decoded using UTF-8 charset.
*
* @return a flow emitting lines read from the input byte chunks, assuming they represent text.
*/
public Flow<String> linesUtf8() {
return lines(StandardCharsets.UTF_8);
}

/**
* Transforms a flow of byte arrays such that each emitted `String` is a text line from the input.
*
* @param charset the charset to use for decoding the bytes into text.
* @return a flow emitting lines read from the input byte arrays, assuming they represent text.
*/
public Flow<String> lines(Charset charset) {
// buffer == Optional.empty() is a special state for handling empty chunks in onComplete, in order to tell them apart from empty lines
return mapStatefulConcat(Optional::<byte[]>empty,
(buffer, nextChunk) -> {
if (!byte[].class.isInstance(nextChunk)) {
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
}
// get next incoming chunk
byte[] chunk = (byte[]) nextChunk;
if (chunk.length == 0) {
return Map.entry(Optional.empty(), Collections.emptyList());
}

// check if chunk contains newline character, if not proceed to the next chunk
int newLineIndex = getNewLineIndex(chunk);
if (newLineIndex == -1) {
if (buffer.isEmpty()) {
return Map.entry(Optional.of(chunk), Collections.emptyList());
}
var b = buffer.get();
byte[] newBuffer = Arrays.copyOf(b, b.length + chunk.length);
newBuffer = ByteBuffer.wrap(newBuffer).put(b.length, chunk).array();
return Map.entry(Optional.of(newBuffer), Collections.emptyList());
}

// buffer for lines, if chunk contains more than one newline character
List<byte[]> lines = new ArrayList<>();

// buffer accumulated from previous chunks; cleared once it has been prepended to the first line
byte[] bufferFromPreviousChunk = buffer.orElse(new byte[0]);
while (chunk.length > 0 && newLineIndex != -1) {
byte[] line = new byte[newLineIndex];
byte[] newChunk = new byte[chunk.length - newLineIndex - 1];
ByteBuffer.wrap(chunk)
.get(line, 0, newLineIndex)
.get(newLineIndex + 1, newChunk, 0, chunk.length - newLineIndex - 1);

if (bufferFromPreviousChunk.length > 0) {
// concat accumulated buffer and line
byte[] buf = Arrays.copyOf(bufferFromPreviousChunk, bufferFromPreviousChunk.length + line.length);
lines.add(ByteBuffer.wrap(buf).put(bufferFromPreviousChunk.length, line).array());
// cleanup buffer
bufferFromPreviousChunk = new byte[0];
} else {
lines.add(line);
}
chunk = newChunk;
newLineIndex = getNewLineIndex(chunk);
}
return Map.entry(Optional.of(chunk), lines);
},
buf -> buf
)
.map(chunk -> new String(chunk, charset));
}

private int getNewLineIndex(byte[] chunk) {
for (int i = 0; i < chunk.length; i++) {
if (chunk[i] == '\n') {
return i;
}
}
return -1;
}
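// Usage sketch (illustrative, not part of this file): lines split across chunk boundaries are reassembled, e.g.
// Flows.fromValues("hel".getBytes(StandardCharsets.UTF_8), "lo\nwor".getBytes(StandardCharsets.UTF_8), "ld".getBytes(StandardCharsets.UTF_8))
//         .linesUtf8().runToList() returns List.of("hello", "world").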

/**
* Always runs `f` after the flow completes, whether it's because all elements are emitted, or when there's an error.
*/
@@ -929,6 +1035,28 @@ public <U> Flow<U> interleave(Flow<U> other, int segmentSize, boolean eagerCompl
return Flows.interleaveAll(Arrays.asList((Flow<U>) this, other), segmentSize, eagerComplete, bufferCapacity);
}

/**
* Emits a given number of elements (determined by `segmentSize`) from this flow to the returned flow, then emits the same number of
* elements from the `other` flow and repeats. The order of elements in both flows is preserved.
* <p>
* If one of the flows is done before the other, the behavior depends on the `eagerComplete` flag. When set to `true`, the returned flow is
* completed immediately, otherwise the remaining elements from the other flow are emitted by the returned flow.
* <p>
* Both flows are run concurrently and asynchronously. The size of the buffer used is determined by the {@link Channel#BUFFER_SIZE} value in scope, or {@link Channel#DEFAULT_BUFFER_SIZE} is used by default.
*
* @param other
* The flow whose elements will be interleaved with the elements of this flow.
* @param segmentSize
* The number of elements sent from each flow before switching to the other one.
* @param eagerComplete
* If `true`, the returned flow is completed as soon as either of the flow completes. If `false`, the remaining elements of the
* non-completed flow are sent downstream.
*/
public <U> Flow<U> interleave(Flow<U> other, int segmentSize, boolean eagerComplete) {
//noinspection unchecked
return Flows.interleaveAll(Arrays.asList((Flow<U>) this, other), segmentSize, eagerComplete);
}
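// Usage sketch (illustrative, not part of this file): with segmentSize = 2,
// Flows.fromValues(1, 2, 3, 4).interleave(Flows.fromValues(10, 20, 30, 40), 2, false).runToList()
// returns List.of(1, 2, 10, 20, 3, 4, 30, 40).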

/**
* Applies the given mapping function `f`, to each element emitted by this source, transforming it into an `Iterable` of results,
* then the returned flow emits the results one by one. Can be used to unfold incoming sequences of elements into single elements.
22 changes: 22 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/flows/Flows.java
@@ -226,6 +226,28 @@ public static <T> Flow<T> failed(Exception t) {
});
}

/**
* Sends a given number of elements (determined by `segmentSize`) from each flow in `flows` to the returned flow and repeats. The order
* of elements in all flows is preserved.
* <p>
* If any of the flows is done before the others, the behavior depends on the `eagerComplete` flag. When set to `true`, the returned flow
* is completed immediately, otherwise the interleaving continues with the remaining non-completed flows. Once all flows but one are
* complete, the elements of the remaining non-completed flow are emitted by the returned flow.
* <p>
* The provided flows are run concurrently and asynchronously. The size of the buffer used is determined by the {@link Channel#BUFFER_SIZE} value in scope, or {@link Channel#DEFAULT_BUFFER_SIZE} is used by default.
*
* @param flows
* The flows whose elements will be interleaved.
* @param segmentSize
* The number of elements sent from each flow before switching to the next one.
* @param eagerComplete
* If `true`, the returned flow is completed as soon as any of the flows completes. If `false`, the interleaving continues with the
* remaining non-completed flows.
*/
public static <T> Flow<T> interleaveAll(List<Flow<T>> flows, int segmentSize, boolean eagerComplete) {
return interleaveAll(flows, segmentSize, eagerComplete, Channel.BUFFER_SIZE.orElse(Channel.DEFAULT_BUFFER_SIZE));
}
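// Usage sketch (illustrative, not part of this file): with segmentSize = 1 the flows are drained round-robin, e.g.
// Flows.interleaveAll(List.of(Flows.fromValues(1, 2), Flows.fromValues(10, 20)), 1, false).runToList()
// returns List.of(1, 10, 2, 20).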

/**
* Sends a given number of elements (determined by `segmentSize`) from each flow in `flows` to the returned flow and repeats. The order
* of elements in all flows is preserved.
@@ -5,6 +5,7 @@

import java.util.List;

import com.softwaremill.jox.Channel;
import org.junit.jupiter.api.Test;

public class FlowInterleaveTest {
@@ -51,6 +52,34 @@ void shouldInterleaveTwoSourcesWithSegmentSizeEqualTo1AndDifferentLengths() thro
assertEquals(List.of(1, 2, 3, 4, 5, 6, 8, 10, 12), result);
}

@Test
void shouldInterleaveWithDefaultBufferCapacity() throws Exception {
// given
var c1 = Flows.fromValues(1, 3, 5);
var c2 = Flows.fromValues(2, 4, 6, 8, 10, 12);

// when
var result = c1.interleave(c2, 1, false)
.runToList();

// then
assertEquals(List.of(1, 2, 3, 4, 5, 6, 8, 10, 12), result);
}

@Test
void shouldInterleaveWithBufferCapacityTakenFromScope() throws Exception {
// given
var c1 = Flows.fromValues(1, 3, 5);
var c2 = Flows.fromValues(2, 4, 6, 8, 10, 12);

// when
var result = ScopedValue.where(Channel.BUFFER_SIZE, 10)
.call(() -> c1.interleave(c2, 1, false).runToList());

// then
assertEquals(List.of(1, 2, 3, 4, 5, 6, 8, 10, 12), result);
}

@Test
void shouldInterleaveTwoSourcesSegmentSizeBiggerThan1() throws Exception {
// given
