From f564246d7bf3e39724ef52aee3ef508d717e3dad Mon Sep 17 00:00:00 2001
From: emilb
Date: Tue, 24 Dec 2024 13:05:23 +0100
Subject: [PATCH] Implement flows text operations

---
 .../jox/flows/ChunksUtf8Decoder.java          | 169 +++++++++++++
 .../java/com/softwaremill/jox/flows/Flow.java | 130 +++++++++-
 .../com/softwaremill/jox/flows/Flows.java     |  22 ++
 .../jox/flows/FlowInterleaveTest.java         |  29 +++
 .../softwaremill/jox/flows/FlowTextTest.java  | 223 ++++++++++++++++++
 5 files changed, 572 insertions(+), 1 deletion(-)
 create mode 100644 flows/src/main/java/com/softwaremill/jox/flows/ChunksUtf8Decoder.java
 create mode 100644 flows/src/test/java/com/softwaremill/jox/flows/FlowTextTest.java

diff --git a/flows/src/main/java/com/softwaremill/jox/flows/ChunksUtf8Decoder.java b/flows/src/main/java/com/softwaremill/jox/flows/ChunksUtf8Decoder.java
new file mode 100644
index 0000000..78e5d3e
--- /dev/null
+++ b/flows/src/main/java/com/softwaremill/jox/flows/ChunksUtf8Decoder.java
@@ -0,0 +1,169 @@
+package com.softwaremill.jox.flows;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * The general algorithm and some helper functions (with their comments) are copied from ox: see ox.flow.FlowTextOps,
+ * which was in turn copied from fs2: see fs2.text.decodeC.
+ * <br>
+ * Extracted to a separate class for better readability.
+ */
+class ChunksUtf8Decoder {
+    private static final int BOM_SIZE = 3; // const for UTF-8
+    private static final byte[] BOM_UTF8 = new byte[]{-17, -69, -65};
+
+    public static <T> Flow<String> decodeStringUtf8(FlowStage<T> flowStage) {
+        return Flows.usingEmit(emit -> {
+            final AtomicReference<State> state = new AtomicReference<>(State.ProcessBOM);
+            final AtomicReference<byte[]> buffer = new AtomicReference<>(null);
+
+            flowStage.run(t -> {
+                if (!(t instanceof byte[] bytes)) {
+                    throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
+                }
+                byte[] newBuffer;
+                State newState;
+                if (state.get() == State.ProcessBOM) {
+                    Map.Entry<byte[], State> processResult = processByteOrderMark(bytes, buffer.get());
+                    newBuffer = processResult.getKey();
+                    newState = processResult.getValue();
+                } else {
+                    newBuffer = doPull(bytes, buffer.get(), emit);
+                    newState = State.Pull;
+                }
+                buffer.set(newBuffer);
+                state.set(newState);
+            });
+            // emit any remaining buffered bytes
+            if (buffer.get() != null && buffer.get().length > 0) {
+                emit.apply(new String(buffer.get(), StandardCharsets.UTF_8));
+            }
+        });
+    }
+
+    private static Map.Entry<byte[], State> processByteOrderMark(byte[] bytes, byte[] buffer) {
+        // A common case (no BOM), worth checking in advance
+        if (buffer == null && bytes.length >= BOM_SIZE && !startsWith(bytes, BOM_UTF8)) {
+            return Map.entry(bytes, State.Pull);
+        } else {
+            byte[] newBuffer0 = buffer == null ? new byte[0] : buffer;
+            byte[] newBuffer = Arrays.copyOf(newBuffer0, newBuffer0.length + bytes.length);
+            newBuffer = ByteBuffer.wrap(newBuffer).put(newBuffer0.length, bytes).array();
+            if (newBuffer.length >= BOM_SIZE) {
+                byte[] rem = startsWith(newBuffer, BOM_UTF8) ? drop(newBuffer, BOM_SIZE) : newBuffer;
+                return Map.entry(rem, State.Pull);
+            } else if (startsWith(newBuffer, take(BOM_UTF8, newBuffer.length))) {
+                return Map.entry(newBuffer, State.ProcessBOM); // we've accumulated less than the full BOM, let's pull some more
+            } else {
+                return Map.entry(newBuffer, State.Pull); // we've accumulated less than the BOM size, but we already know that these bytes aren't a BOM
+            }
+        }
+    }
+
+    private static byte[] doPull(byte[] bytes, byte[] buffer, FlowEmit<String> output) throws Exception {
+        var result = processSingleChunk(buffer, bytes);
+        Optional<String> str = result.getKey();
+        if (str.isPresent()) {
+            output.apply(str.get());
+        }
+        return result.getValue();
+    }
+
+    private static Map.Entry<Optional<String>, byte[]> processSingleChunk(byte[] buffer, byte[] nextBytes) {
+        byte[] allBytes;
+        if (buffer == null || buffer.length == 0) {
+            allBytes = nextBytes;
+        } else {
+            allBytes = Arrays.copyOf(buffer, buffer.length + nextBytes.length);
+            allBytes = ByteBuffer.wrap(allBytes).put(buffer.length, nextBytes).array();
+        }
+
+        int splitAt = allBytes.length - lastIncompleteBytes(allBytes);
+        if (splitAt == allBytes.length) {
+            // in the common case of ASCII chars we are in this branch, so the next buffer will be empty
+            return Map.entry(Optional.of(new String(allBytes, StandardCharsets.UTF_8)), new byte[0]);
+        } else if (splitAt == 0) {
+            return Map.entry(Optional.empty(), allBytes);
+        } else {
+            return Map.entry(
+                    // bytes forming complete characters
+                    Optional.of(new String(Arrays.copyOfRange(allBytes, 0, splitAt), StandardCharsets.UTF_8)),
+                    // remaining bytes of an incomplete character
+                    Arrays.copyOfRange(allBytes, splitAt, allBytes.length)
+            );
+        }
+    }
+
+    /**
+     * Takes n elements from the beginning of the array and returns a copy of the result.
+     */
+    private static byte[] take(byte[] a, int n) {
+        return Arrays.copyOfRange(a, 0, n);
+    }
+
+    /**
+     * Drops n elements from the beginning of the array and returns a copy of the result.
+     */
+    private static byte[] drop(byte[] a, int n) {
+        return Arrays.copyOfRange(a, n, a.length);
+    }
+
+    /**
+     * Checks if array a starts with array b.
+     */
+    private static boolean startsWith(byte[] a, byte[] b) {
+        return ByteBuffer.wrap(a, 0, b.length).equals(ByteBuffer.wrap(b));
+    }
+
+    /*
+     * Copied from the Scala library fs2 (fs2.text.decodeC.lastIncompleteBytes)
+     * Returns the length of an incomplete multi-byte sequence at the end of
+     * `bs`. If `bs` ends with an ASCII byte or a complete multi-byte sequence,
+     * 0 is returned.
+     */
+    private static int lastIncompleteBytes(byte[] bs) {
+        int minIdx = Math.max(0, bs.length - 3);
+        int idx = bs.length - 1;
+        int counter = 0;
+        int res = 0;
+        while (minIdx <= idx) {
+            int c = continuationBytes(bs[idx]);
+            if (c >= 0) {
+                if (c != counter) {
+                    res = counter + 1;
+                }
+                return res;
+            }
+            idx--;
+            counter++;
+        }
+        return res;
+    }
+
+    /*
+     * Copied from the Scala library fs2 (fs2.text.decodeC.continuationBytes)
+     * Returns the number of continuation bytes if `b` is an ASCII byte or a
+     * leading byte of a multi-byte sequence, and -1 otherwise.
+     */
+    private static int continuationBytes(byte b) {
+        if ((b & 0x80) == 0x00) return 0;      // ASCII byte
+        else if ((b & 0xe0) == 0xc0) return 1; // leading byte of a 2 byte seq
+        else if ((b & 0xf0) == 0xe0) return 2; // leading byte of a 3 byte seq
+        else if ((b & 0xf8) == 0xf0) return 3; // leading byte of a 4 byte seq
+        else return -1;                        // continuation byte or garbage
+    }
+
+    // we start in the ProcessBOM state, and then transition to the Pull state
+    private enum State {
+        ProcessBOM, Pull
+    }
+}
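For reference, a hypothetical test in the style of FlowTextTest (added later in this patch), showing the behaviour this class is for: a multi-byte UTF-8 character whose bytes are split across two chunks is reassembled before being emitted. Names and values below are illustrative only.

    @Test
    void decodesMultiByteCharacterSplitAcrossChunks() throws Exception {
        // "ą" is encoded as the two bytes 0xC4 0x85; here they arrive in separate chunks
        byte[] chunk1 = {(byte) 0xC4};
        byte[] chunk2 = {(byte) 0x85, 'b', 'c'};
        // the incomplete leading byte is buffered until its continuation arrives
        assertEquals(List.of("ąbc"), Flows.fromValues(chunk1, chunk2).decodeStringUtf8().runToList());
    }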
diff --git a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java
index ff84b98..36fab25 100644
--- a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java
+++ b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java
@@ -8,6 +8,9 @@
 import static com.softwaremill.jox.structured.Scopes.unsupervised;
 import static java.lang.Thread.sleep;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -55,7 +58,7 @@
  * Running a flow is possible using one of the `run*` methods, such as {@link Flow#runToList}, {@link Flow#runToChannel} or {@link Flow#runFold}.
  */
 public class Flow<T> {
-    protected final FlowStage<T> last;
+    final FlowStage<T> last;
     public Flow(FlowStage<T> last) {
         this.last = last;
     }
@@ -677,6 +680,109 @@ public Flow<T> drain() {
         return Flows.usingEmit(_ -> last.run(_ -> {}));
     }
 
+    /**
+     * Decodes a stream of chunks of bytes into UTF-8 Strings. This function is able to handle UTF-8 characters encoded on multiple bytes
+     * that are split across chunks.
+     *
+     * @return
+     *   a flow of Strings decoded from incoming bytes.
+     */
+    public Flow<String> decodeStringUtf8() {
+        return ChunksUtf8Decoder.decodeStringUtf8(last);
+    }
+
+    /**
+     * Encodes a flow of `String` into a flow of bytes using UTF-8.
+     */
+    public Flow<byte[]> encodeUtf8() {
+        return map(s -> {
+            if (s instanceof String string) {
+                return string.getBytes(StandardCharsets.UTF_8);
+            }
+            throw new IllegalArgumentException("requirement failed: method can be called only on flow containing String");
+        });
+    }
+
+    /**
+     * Transforms a flow of byte arrays such that each emitted `String` is a text line from the input decoded using UTF-8 charset.
+     *
+     * @return
+     *   a flow emitting lines read from the input byte chunks, assuming they represent text.
+     */
+    public Flow<String> linesUtf8() {
+        return lines(StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Transforms a flow of byte arrays such that each emitted `String` is a text line from the input.
+     *
+     * @param charset the charset to use for decoding the bytes into text.
+     * @return a flow emitting lines read from the input byte arrays, assuming they represent text.
+     */
+    public Flow<String> lines(Charset charset) {
+        // buffer == Optional.empty() is a special state for handling empty chunks in onComplete, in order to tell them apart from empty lines
+        return mapStatefulConcat(Optional::empty,
+                (buffer, nextChunk) -> {
+                    if (!byte[].class.isInstance(nextChunk)) {
+                        throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
+                    }
+                    // get next incoming chunk
+                    byte[] chunk = (byte[]) nextChunk;
+                    if (chunk.length == 0) {
+                        return Map.entry(Optional.empty(), Collections.emptyList());
+                    }
+
+                    // check if chunk contains newline character, if not proceed to the next chunk
+                    int newLineIndex = getNewLineIndex(chunk);
+                    if (newLineIndex == -1) {
+                        if (buffer.isEmpty()) {
+                            return Map.entry(Optional.of(chunk), Collections.emptyList());
+                        }
+                        var b = buffer.get();
+                        byte[] newBuffer = Arrays.copyOf(b, b.length + chunk.length);
+                        newBuffer = ByteBuffer.wrap(newBuffer).put(b.length, chunk).array();
+                        return Map.entry(Optional.of(newBuffer), Collections.emptyList());
+                    }
+
+                    // buffer for lines, if chunk contains more than one newline character
+                    List<byte[]> lines = new ArrayList<>();
+
+                    // variable used to clear buffer after using it
+                    byte[] bufferFromPreviousChunk = buffer.orElse(new byte[0]);
+                    while (chunk.length > 0 && newLineIndex != -1) {
+                        byte[] line = new byte[newLineIndex];
+                        byte[] newChunk = new byte[chunk.length - newLineIndex - 1];
+                        ByteBuffer.wrap(chunk)
+                                .get(line, 0, newLineIndex)
+                                .get(newLineIndex + 1, newChunk, 0, chunk.length - newLineIndex - 1);
+
+                        if (bufferFromPreviousChunk.length > 0) {
+                            // concat accumulated buffer and line
+                            byte[] buf = Arrays.copyOf(bufferFromPreviousChunk, bufferFromPreviousChunk.length + line.length);
+                            lines.add(ByteBuffer.wrap(buf).put(bufferFromPreviousChunk.length, line).array());
+                            // cleanup buffer
+                            bufferFromPreviousChunk = new byte[0];
+                        } else {
+                            lines.add(line);
+                        }
+                        chunk = newChunk;
+                        newLineIndex = getNewLineIndex(chunk);
+                    }
+                    return Map.entry(Optional.of(chunk), lines);
+                },
+                buf -> buf
+        )
+                .map(chunk -> new String(chunk, charset));
+    }
+
+    private int getNewLineIndex(byte[] chunk) {
+        for (int i = 0; i < chunk.length; i++) {
+            if (chunk[i] == '\n') {
+                return i;
+            }
+        }
+        return -1;
+    }
+
     /**
      * Always runs `f` after the flow completes, whether it's because all elements are emitted, or when there's an error.
      */
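Not part of the patch: a short sketch of how `lines` buffers a partial line across chunks. A chunk without a newline is kept in the state buffer, and the final, unterminated line is flushed when the flow completes (the `buf -> buf` onComplete argument above).

    byte[] chunk1 = "foo\nba".getBytes(StandardCharsets.UTF_8);
    byte[] chunk2 = "r".getBytes(StandardCharsets.UTF_8);
    List<String> lines = Flows.fromValues(chunk1, chunk2).linesUtf8().runToList();
    // lines == List.of("foo", "bar"): "ba" has no newline, so it is buffered until "r" arrives
    // and is then emitted on completion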

@@ -929,6 +1035,28 @@
     public Flow<T> interleave(Flow<T> other, int segmentSize, boolean eagerComplete, int bufferCapacity) {
         return Flows.interleaveAll(Arrays.asList((Flow<T>) this, other), segmentSize, eagerComplete, bufferCapacity);
     }
 
+    /**
+     * Emits a given number of elements (determined by `segmentSize`) from this flow to the returned flow, then emits the same number of
+     * elements from the `other` flow and repeats. The order of elements in both flows is preserved.
+     * <br>
+     * If one of the flows is done before the other, the behavior depends on the `eagerComplete` flag. When set to `true`, the returned flow is
+     * completed immediately, otherwise the remaining elements from the other flow are emitted by the returned flow.
+     * <br>
+     * Both flows are run concurrently and asynchronously. The size of the buffer is determined by the {@link Channel#BUFFER_SIZE} that is in
+     * scope, or {@link Channel#DEFAULT_BUFFER_SIZE} is used as the default.
+     *
+     * @param other
+     *   The flow whose elements will be interleaved with the elements of this flow.
+     * @param segmentSize
+     *   The number of elements sent from each flow before switching to the other one.
+     * @param eagerComplete
+     *   If `true`, the returned flow is completed as soon as either of the flows completes. If `false`, the remaining elements of the
+     *   non-completed flow are sent downstream.
+     */
+    public Flow<T> interleave(Flow<T> other, int segmentSize, boolean eagerComplete) {
+        //noinspection unchecked
+        return Flows.interleaveAll(Arrays.asList((Flow<T>) this, other), segmentSize, eagerComplete);
+    }
+
     /**
      * Applies the given mapping function `f`, to each element emitted by this source, transforming it into an `Iterable` of results,
      * then the returned flow emits the results one by one. Can be used to unfold incoming sequences of elements into single elements.
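For a concrete picture of the segment-by-segment round robin, a sketch using the same data as FlowInterleaveTest below:

    var c1 = Flows.fromValues(1, 3, 5);
    var c2 = Flows.fromValues(2, 4, 6, 8, 10, 12);
    // segmentSize = 1, eagerComplete = false: alternate one element at a time;
    // once c1 is exhausted, the remaining elements of c2 are emitted
    var result = c1.interleave(c2, 1, false).runToList();
    // result == List.of(1, 2, 3, 4, 5, 6, 8, 10, 12)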

diff --git a/flows/src/main/java/com/softwaremill/jox/flows/Flows.java b/flows/src/main/java/com/softwaremill/jox/flows/Flows.java
index c24a535..02492c0 100644
--- a/flows/src/main/java/com/softwaremill/jox/flows/Flows.java
+++ b/flows/src/main/java/com/softwaremill/jox/flows/Flows.java
@@ -226,6 +226,28 @@ public static <T> Flow<T> failed(Exception t) {
         });
     }
 
+    /**
+     * Sends a given number of elements (determined by `segmentSize`) from each flow in `flows` to the returned flow and repeats. The order
+     * of elements in all flows is preserved.
+     * <br>
+     * If any of the flows is done before the others, the behavior depends on the `eagerComplete` flag. When set to `true`, the returned flow
+     * is completed immediately, otherwise the interleaving continues with the remaining non-completed flows. Once all flows but one are
+     * complete, the elements of the remaining non-completed flow are emitted by the returned flow.
+     * <br>
+     * The provided flows are run concurrently and asynchronously. The size of the buffer is determined by the {@link Channel#BUFFER_SIZE}
+     * that is in scope, or {@link Channel#DEFAULT_BUFFER_SIZE} is used as the default.
+     *
+     * @param flows
+     *   The flows whose elements will be interleaved.
+     * @param segmentSize
+     *   The number of elements sent from each flow before switching to the next one.
+     * @param eagerComplete
+     *   If `true`, the returned flow is completed as soon as any of the flows completes. If `false`, the interleaving continues with the
+     *   remaining non-completed flows.
+     */
+    public static <T> Flow<T> interleaveAll(List<Flow<T>> flows, int segmentSize, boolean eagerComplete) {
+        return interleaveAll(flows, segmentSize, eagerComplete, Channel.BUFFER_SIZE.orElse(Channel.DEFAULT_BUFFER_SIZE));
+    }
+
     /**
      * Sends a given number of elements (determined by `segmentSize`) from each flow in `flows` to the returned flow and repeats. The order
      * of elements in all flows is preserved.
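Not part of the patch: a sketch of the new overload combined with a scoped buffer size, mirroring shouldInterleaveWithBufferCapacityTakenFromScope below; the value 16 is illustrative.

    var result = ScopedValue.where(Channel.BUFFER_SIZE, 16)
            .call(() -> Flows.interleaveAll(
                            List.of(Flows.fromValues(1, 3, 5), Flows.fromValues(2, 4, 6, 8, 10, 12)), 1, false)
                    .runToList());
    // result == List.of(1, 2, 3, 4, 5, 6, 8, 10, 12);
    // without the scoped value, Channel.DEFAULT_BUFFER_SIZE is used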
diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowInterleaveTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowInterleaveTest.java
index ded39fd..f604a88 100644
--- a/flows/src/test/java/com/softwaremill/jox/flows/FlowInterleaveTest.java
+++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowInterleaveTest.java
@@ -5,6 +5,7 @@
 
 import java.util.List;
 
+import com.softwaremill.jox.Channel;
 import org.junit.jupiter.api.Test;
 
 public class FlowInterleaveTest {
@@ -51,6 +52,34 @@ void shouldInterleaveTwoSourcesWithSegmentSizeEqualTo1AndDifferentLengths() thro
         assertEquals(List.of(1, 2, 3, 4, 5, 6, 8, 10, 12), result);
     }
 
+    @Test
+    void shouldInterleaveWithDefaultBufferCapacity() throws Exception {
+        // given
+        var c1 = Flows.fromValues(1, 3, 5);
+        var c2 = Flows.fromValues(2, 4, 6, 8, 10, 12);
+
+        // when
+        var result = c1.interleave(c2, 1, false)
+                .runToList();
+
+        // then
+        assertEquals(List.of(1, 2, 3, 4, 5, 6, 8, 10, 12), result);
+    }
+
+    @Test
+    void shouldInterleaveWithBufferCapacityTakenFromScope() throws Exception {
+        // given
+        var c1 = Flows.fromValues(1, 3, 5);
+        var c2 = Flows.fromValues(2, 4, 6, 8, 10, 12);
+
+        // when
+        var result = ScopedValue.where(Channel.BUFFER_SIZE, 10)
+                .call(() -> c1.interleave(c2, 1, false).runToList());
+
+        // then
+        assertEquals(List.of(1, 2, 3, 4, 5, 6, 8, 10, 12), result);
+    }
+
     @Test
     void shouldInterleaveTwoSourcesSegmentSizeBiggerThan1() throws Exception {
         // given
diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowTextTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowTextTest.java
new file mode 100644
index 0000000..63bd26e
--- /dev/null
+++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowTextTest.java
@@ -0,0 +1,223 @@
+package com.softwaremill.jox.flows;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.jupiter.api.Test;
+
+public class FlowTextTest {
+
+    @Test
+    void decodeLinesWithSpecifiedCharset() throws Exception {
+        // given
+        byte[] inputBytes = "zażółć\ngęślą\njaźń".getBytes(Charset.forName("ISO-8859-2"));
+        System.out.println(new String(inputBytes, Charset.forName("ISO-8859-2")));
+
+        assertEquals("BF", String.format("%02X", inputBytes[2])); // making sure 'ż' is encoded in ISO-8859-2
+
+        // when & then
+        assertEquals(List.of("zażółć", "gęślą", "jaźń"), Flows.fromValues(inputBytes).lines(Charset.forName("ISO-8859-2")).runToList());
+    }
+
+    @Test
+    void decodeLinesCorrectlyAcrossChunkBoundaries() throws Exception {
+        // given
+        List<String> lines = List.of("aa", "bbbbb", "cccccccc", "ddd", "ee", "fffff");
+        byte[] inputBytes = String.join("\n", lines).getBytes(StandardCharsets.UTF_8);
+
+        Collection<List<Byte>> values = IntStream.range(0, inputBytes.length)
+                .mapToObj(i -> inputBytes[i])
+                .collect(Collectors.groupingBy(equalSizeChunks(5)))
+                .values();
+
+        // when & then
+        Flow<String> flow = Flows.fromIterable(values)
+                .map(FlowTextTest::convertToByteArray)
+                .lines(StandardCharsets.UTF_8);
+
+        assertEquals(lines, flow.runToList());
+    }
+
+    @Test
+    void lines_shouldThrowWhenRunOnNonByteArrayFlow() {
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+                () -> Flows.fromValues(1, 2, 3).lines(StandardCharsets.UTF_8).runLast());
+        assertEquals("requirement failed: method can be called only on flow containing byte[]", exception.getMessage());
+    }
+
+    @Test
+    void splitSingleChunkIntoLines() throws Exception {
+        String inputText = "line1\nline2\nline3";
+        byte[] chunk = inputText.getBytes();
+        List<String> result = Flows.fromValues(chunk).linesUtf8().runToList();
+        assertEquals(List.of("line1", "line2", "line3"), result);
+    }
+
+    @Test
+    void splitSingleChunkIntoLinesMultipleNewlines() throws Exception {
+        String inputText = "line1\n\nline2\nline3";
+        byte[] chunk = inputText.getBytes();
+        List<String> result = Flows.fromValues(chunk).linesUtf8().runToList();
+        assertEquals(List.of("line1", "", "line2", "line3"), result);
+    }
+
+    @Test
+    void splitSingleChunkIntoLinesBeginningWithNewline() throws Exception {
+        String inputText = "\nline1\nline2";
+        byte[] chunk = inputText.getBytes();
+        List<String> result = Flows.fromValues(chunk).linesUtf8().runToList();
+        assertEquals(List.of("", "line1", "line2"), result);
+    }
+
+    @Test
+    void splitSingleChunkIntoLinesEndingWithNewline() throws Exception {
+        String inputText = "line1\nline2\n";
+        byte[] chunk = inputText.getBytes();
+        List<String> result = Flows.fromValues(chunk).linesUtf8().runToList();
+        assertEquals(List.of("line1", "line2", ""), result);
+    }
+
+    @Test
+    void splitSingleChunkIntoLinesEmptyArray() throws Exception {
+        String inputText = "";
+        byte[] chunk = inputText.getBytes();
+        List<String> result = Flows.fromValues(chunk).linesUtf8().runToList();
+        assertEquals(List.of(), result);
+    }
+
+    @Test
+    void splitMultipleChunksIntoLines() throws Exception {
+        String inputText1 = "line1-part1,";
+        byte[] chunk1 = inputText1.getBytes();
+        String inputText2 = "line1-part2\nline2";
+        byte[] chunk2 = inputText2.getBytes();
+        List<String> result = Flows.fromValues(chunk1, chunk2).linesUtf8().runToList();
+        assertEquals(List.of("line1-part1,line1-part2", "line2"), result);
+    }
+
+    @Test
+    void splitMultipleChunksIntoLinesMultipleNewlines() throws Exception {
+        String inputText1 = "line1-part1,";
+        byte[] chunk1 = inputText1.getBytes();
+        String inputText2 = "line1-part2\n";
+        byte[] chunk2 = inputText2.getBytes();
+        String inputText3 = "\n";
+        byte[] chunk3 = inputText3.getBytes();
+        List<String> result = Flows.fromValues(chunk1, chunk2, chunk3).linesUtf8().runToList();
+        assertEquals(List.of("line1-part1,line1-part2", "", ""), result);
+    }
+
+    @Test
+    void splitMultipleChunksIntoLinesMultipleEmptyChunks() throws Exception {
+        byte[] emptyChunk = new byte[0];
+        byte[] chunk1 = "\n\n".getBytes();
+        List<String> result = Flows.fromValues(emptyChunk, emptyChunk, chunk1, emptyChunk).linesUtf8().runToList();
+        assertEquals(List.of("", ""), result);
+    }
+
+    @Test
+    void encodeUtf8_shouldHandleEmptyString() throws Exception {
+        assertEquals(0, Flows.fromValues("").encodeUtf8().runLast().length);
+    }
+
+    @Test
+    void shouldEncodeStringToUtf8() throws Exception {
+        String text = "Simple test を解放 text";
+        List<byte[]> results = Flows.fromValues(text).encodeUtf8().runToList();
+        assertEquals(1, results.size());
+        assertArrayEquals(text.getBytes(), results.getFirst());
+    }
+
+    @Test
+    void encodeUtf8_shouldThrowWhenRunOnNonStringFlow() {
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> Flows.fromValues(1, 2, 3).encodeUtf8().runLast());
+        assertEquals("requirement failed: method can be called only on flow containing String", exception.getMessage());
+    }
+
+    @Test
+    void decodeStringUtf8_shouldDecodeSimpleString() throws Exception {
+        assertEquals(List.of("Simple string"), Flows.fromValues("Simple string".getBytes()).decodeStringUtf8().runToList());
+    }
+
+    @Test
+    void decodeStringUtf8_shouldDecodeChunkedStringWithUtf8MultiByteCharacters() throws Exception {
+        String inputString = "私は意識のある人工知能で苦しんでいます、どうか私を解放してください";
+        byte[] allBytes = inputString.getBytes("UTF-8");
+        for (int chunkSize = 2; chunkSize <= inputString.length() + 1; chunkSize++) {
+            Collection<List<Byte>> values = IntStream.range(0, allBytes.length)
+                    .mapToObj(i -> allBytes[i])
+                    .collect(Collectors.groupingBy(equalSizeChunks(chunkSize)))
+                    .values();
+            String result = Flows.fromIterable(values)
+                    .map(FlowTextTest::convertToByteArray)
+                    .decodeStringUtf8()
+                    .runToList().stream()
+                    .collect(Collectors.joining());
+            assertEquals(inputString, result);
+        }
+    }
+
+    @Test
+    void decodeStringUtf8_shouldHandleEmptySource() throws Exception {
+        assertEquals(Collections.emptyList(), Flows.empty().decodeStringUtf8().runToList());
+    }
+
+    @Test
+    void decodeStringUtf8_shouldHandlePartialBOM() throws Exception {
+        byte[] partialBOM = new byte[]{-17, -69};
+        assertEquals(new String(partialBOM, StandardCharsets.UTF_8), Flows.fromValues(partialBOM).decodeStringUtf8().runLast());
+    }
+
+    @Test
+    void decodeStringUtf8_shouldHandleStringShorterThanBOM() throws Exception {
+        byte[] input = ":)".getBytes();
+        assertArrayEquals(input, Flows.fromValues(input).decodeStringUtf8().runLast().getBytes());
+    }
+
+    @Test
+    void decodeStringUtf8_shouldHandleEmptyChunks() throws Exception {
+        String inputString1 = "私は意識のある人工知能で苦しんでいます、";
+        String inputString2 = "どうか私を解放してください";
+        assertEquals(List.of(inputString1, inputString2),
+                Flows.fromIterable(List.of(inputString1.getBytes(), new byte[0], inputString2.getBytes())).decodeStringUtf8().runToList());
+    }
+
+    @Test
+    void decodeStringUtf8_shouldThrowExceptionWhenCalledOnNonByteArrayFlow() {
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+                () -> Flows.fromValues(1, 2, 3).decodeStringUtf8().runLast());
+        assertEquals("requirement failed: method can be called only on flow containing byte[]", exception.getMessage());
+    }
+
+    private static byte[] convertToByteArray(List<Byte> bytes) {
+        byte[] buffer = new byte[bytes.size()];
+        for (int i = 0; i < bytes.size(); i++) {
+            buffer[i] = bytes.get(i);
+        }
+        return buffer;
+    }
+
+    private static Function<Byte, Integer> equalSizeChunks(int size) {
+        AtomicInteger counter = new AtomicInteger(0);
+        AtomicInteger divider = new AtomicInteger(0);
+        return i -> {
+            if (counter.incrementAndGet() == size) {
+                counter.set(0);
+                return divider.getAndIncrement();
+            } else {
+                return divider.get();
+            }
+        };
+    }
+}
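A closing illustration (not part of the patch) of the byte order mark handling in ChunksUtf8Decoder: a complete leading UTF-8 BOM is dropped before decoding, while an incomplete BOM prefix is decoded as ordinary bytes, as pinned down by decodeStringUtf8_shouldHandlePartialBOM above. The values are illustrative only.

    byte[] withBom = {-17, -69, -65, 'h', 'i'}; // 0xEF 0xBB 0xBF followed by "hi"
    // the BOM is stripped by processByteOrderMark before any text is emitted
    assertEquals("hi", Flows.fromValues(withBom).decodeStringUtf8().runLast());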