Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement more flows ops methods part 3 #76

Merged
merged 1 commit into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 291 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.Callable;
Expand All @@ -31,10 +32,12 @@
import java.util.function.Supplier;

import com.softwaremill.jox.Channel;
import com.softwaremill.jox.ChannelClosedException;
import com.softwaremill.jox.ChannelDone;
import com.softwaremill.jox.ChannelError;
import com.softwaremill.jox.Sink;
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.CancellableFork;
import com.softwaremill.jox.structured.Fork;
import com.softwaremill.jox.structured.Scopes;
import com.softwaremill.jox.structured.UnsupervisedScope;
Expand Down Expand Up @@ -268,6 +271,138 @@ public <U> Flow<U> map(Function<T, U> mappingFunction) {
});
}


/**
* Functional interface used for {@link Flow#mapStateful} and {@link Flow#mapStatefulConcat}.
*
* @param <T> type of input flow elements
* @param <S> type of state
* @param <U> type of output flow
*/
@FunctionalInterface
public interface StatefulMapper<T, S, U> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall I've added those interfaces to make the code more readable. If it makes it more blurry for you, I can remove it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this looks good :)

/**
* @param state current state
* @param element current input flow element
* @return pair of new state and `element` mapped to new type `U`
*/
Map.Entry<S, U> apply(S state, T element);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can stick with Map.Entry or create custom Pair implementation (I believe there is no point in depending on other library for this).

The main problem with Map.Entry is that it does not allow null as first/key argument (which makes sense from Map implementation point of view), but it created some problems for next implementations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map.Entry sounds good - do we really use null anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ox we used null as marker for lines method.
I used Optional for workaround https://github.com/softwaremill/jox/pull/77/files#diff-46d2537d747d24775cf70630d584ac42326ea6902d3ddd10f5a861841c2c3b6cR722

Overall we can leave it as it is

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, Optional is better :) 👍

}

/**
* Functional interface used for {@link Flow#mapStateful(Supplier, StatefulMapper, OnComplete)} and {@link Flow#mapStatefulConcat(Supplier, StatefulMapper, OnComplete)}
* @param <S> State Type
* @param <U> Output Flow Type
*/
@FunctionalInterface
public interface OnComplete<S, U> {
/**
* @param state at the end of the flow
* @return {@link Optional#empty()} if value should be skipped, or value wrapped in {@link Optional}
*/
Optional<U> apply(S state);
}

/**
* Applies the given mapping function `f`, using additional state, to each element emitted by this flow. The results are emitted by the
* returned flow. Optionally the returned flow emits an additional element, possibly based on the final state, once this flow is done.
* <p>
* The `initializeState` function is called once when `statefulMap` is called.
* <p>
* The `onComplete` function is called once when this flow is done. If it returns a non-empty {@link Optional}, the value will be emitted by the
* flow, while an empty value will be ignored.
*
* @param initializeState
* A function that initializes the state.
* @param f
* A function that transforms the element from this flow and the state into a pair of the next state and the result which is emitted by
* the returned flow.
* @param onComplete
* A function that transforms the final state into an optional element emitted by the returned flow.
*/
public <S, U> Flow<U> mapStateful(Supplier<S> initializeState, StatefulMapper<T, S, U> f, OnComplete<S, U> onComplete) {
StatefulMapper<T, S, Iterable<U>> resultToSome = (state, element) -> {
var result = f.apply(state, element);
return Map.entry(result.getKey(), List.of(result.getValue()));
};

return mapStatefulConcat(initializeState, resultToSome, onComplete);
}

/**
* Applies the given mapping function `f`, using additional state, to each element emitted by this flow. The results are emitted by the
* returned flow.
* <p>
* The `initializeState` function is called once when `statefulMap` is called.
* <p>
* If you want to send additional element after the flow is done, use {@link Flow#mapStateful(Supplier, StatefulMapper, OnComplete)}
*
* @param initializeState
* A function that initializes the state.
* @param f
* A function that transforms the element from this flow and the state into a pair of the next state and the result which is emitted by
* the returned flow.
*/
public <S, U> Flow<U> mapStateful(Supplier<S> initializeState, StatefulMapper<T, S, U> f) {
return mapStateful(initializeState, f, _ -> Optional.empty());
}

/**
* Applies the given mapping function `f`, using additional state, to each element emitted by this flow. The returned flow emits the
* results one by one. Optionally the returned flow emits an additional element, possibly based on the final state, once this flow is
* done.
* <p>
* The `initializeState` function is called once when `statefulMap` is called.
* <p>
* The `onComplete` function is called once when this flow is done. If it returns a non-empty value, the value will be emitted by the
* returned flow, while an empty value will be ignored.
*
* @param initializeState
* A function that initializes the state.
* @param f
* A function that transforms the element from this flow and the state into a pair of the next state and a
* {@code Iterable} of results which are emitted one by one by the returned flow. If the result of `f` is empty,
* nothing is emitted by the returned flow.
* @param onComplete
* A function that transforms the final state into an optional element emitted by the returned flow.
*/
public <S, U> Flow<U> mapStatefulConcat(Supplier<S> initializeState, StatefulMapper<T, S, Iterable<U>> f, OnComplete<S, U> onComplete) {
AtomicReference<S> state = new AtomicReference<>(initializeState.get());
return Flows.usingEmit(emit -> {
last.run(t -> {
Map.Entry<S, Iterable<U>> result = f.apply(state.get(), t);
for (U u : result.getValue()) {
emit.apply(u);
}
state.set(result.getKey());
});

Optional<U> onCompleteResult = onComplete.apply(state.get());
if (onCompleteResult.isPresent()) {
emit.apply(onCompleteResult.get());
}
});
}

/**
* Applies the given mapping function `f`, using additional state, to each element emitted by this flow. The returned flow emits the
* results one by one.
* <p>
* The `initializeState` function is called once when `statefulMap` is called.
* <p>
* If you want to send additional element after the flow is done, use {@link Flow#mapStatefulConcat(Supplier, StatefulMapper, OnComplete)}.
*
* @param initializeState
* A function that initializes the state.
* @param f
* A function that transforms the element from this flow and the state into a pair of the next state and a
* {@code Iterable} of results which are emitted one by one by the returned flow. If the result of `f` is empty,
* nothing is emitted by the returned flow.
*/
public <S, U> Flow<U> mapStatefulConcat(Supplier<S> initializeState, StatefulMapper<T, S, Iterable<U>> f) {
return mapStatefulConcat(initializeState, f, _ -> Optional.empty());
}

/**
* Emits only those elements emitted by this flow, for which `filteringPredicate` returns `true`.
*/
Expand Down Expand Up @@ -339,6 +474,162 @@ public Flow<T> take(int n) {
});
}

/** Groups elements emitted by this flow into child flows. Elements for which `groupingFunction` returns the same value (of type `V`) end up in
* the same child flow. `childFlowTransform` is applied to each created child flow, and the resulting flow is run in the background.
* Finally, the child flows are merged back, that is any elements that they emit are emitted by the returned flow.
* <p>
* Up to `parallelism` child flows are run concurrently in the background. When the limit is reached, the child flow which didn't
* receive a new element the longest is completed as done.
* <p>
* Child flows for `V` values might be created multiple times (if, after completing a child flow because of parallelism limit, new
* elements arrive, mapped to a given `V` value). However, it is guaranteed that for a given `V` value, there will be at most one child
* flow running at any time.
* <p>
* Child flows should only complete as done when the flow of received `T` elements completes. Otherwise, the entire stream will fail with
* an error.
* <p>
* Errors that occur in this flow, or in any child flows, become errors of the returned flow (exceptions are wrapped in
* {@link ChannelClosedException}.
* <p>
* The size of the buffers for the elements emitted by this flow (which is also run in the background) and the child flows are determined
* by the {@link Channel#BUFFER_SIZE} that is in scope, or default {@link Channel#DEFAULT_BUFFER_SIZE} is used.
*
* @param parallelism
* An upper bound on the number of child flows that run in parallel at any time.
* @param groupingFunction
* Function used to determine the group for an element of type `T`. Each group is represented by a value of type `V`.
* @param childFlowTransform
* The function that is used to create a child flow, which is later in the background. The arguments are the group value, for which the
* flow is created, and a flow of `T` elements in that group (each such element has the same group value `V` returned by `predicated`).
*/
public <V, U> Flow<U> groupBy(int parallelism, Function<T, V> groupingFunction, ChildFlowTransformer<T, V, U> childFlowTransform) {
return new GroupByImpl<>(this, parallelism, groupingFunction, childFlowTransform)
.run();
}

/**
* Functional interface used in {@link Flow#groupBy} for transforming the child flows.
*/
@FunctionalInterface
public interface ChildFlowTransformer<T, V, U> {
Function<Flow<T>, Flow<U>> apply(V group);
}

/**
* Chunks up the emitted elements into groups, within a time window, or limited by the specified number of elements, whatever happens
* first. The timeout is reset after a group is emitted. If timeout expires and the buffer is empty, nothing is emitted. As soon as a new
* element is emitted, the flow will emit it as a single-element group and reset the timer.
* <p>
* The size of buffers used by this method is determined by {@link Channel#BUFFER_SIZE} that is in scope, or default {@link Channel#DEFAULT_BUFFER_SIZE} is used.
*
* @param n
* The maximum number of elements in a group.
* @param duration
* The time window in which the elements are grouped.
*/
public Flow<List<T>> groupedWithin(int n, Duration duration) {
return groupedWeightedWithin(n, duration, _ -> 1L);
}

/**
* Chunks up the emitted elements into groups, within a time window, or limited by the cumulative weight being greater or equal to the
* `minWeight`, whatever happens first. The timeout is reset after a group is emitted. If timeout expires and the buffer is empty,
* nothing is emitted. As soon as a new element is received, the flow will emit it as a single-element group and reset the timer.
* <p>
* The size of buffer used by this method is determined by {@link Channel#BUFFER_SIZE} that is in scope, or default {@link Channel#DEFAULT_BUFFER_SIZE} is used.
*
* @param minWeight
* The minimum cumulative weight of elements in a group if no timeout happens.
* @param duration
* The time window in which the elements are grouped.
* @param costFn
* The function that calculates the weight of an element.
*/
@SuppressWarnings("unchecked")
public Flow<List<T>> groupedWeightedWithin(long minWeight, Duration duration, Function<T, Long> costFn) {
if (minWeight <= 0) throw new IllegalArgumentException("requirement failed: minWeight must be > 0");
if (duration.toMillis() <= 0) throw new IllegalArgumentException("requirement failed: duration must be > 0");

return Flows.usingEmit(emit -> {
Scopes.unsupervised(scope -> {
Source<T> flowSource = runToChannel(scope);
Channel<List<T>> outputChannel = Channel.withScopedBufferSize();
Channel<GroupingTimeout> timerChannel = Channel.withScopedBufferSize();

forkPropagate(scope, outputChannel, () -> {
List<T> buffer = new ArrayList<>();
final AtomicLong accumulatedCost = new AtomicLong(0);

CancellableFork<GroupingTimeout> timeoutFork = forkTimeout(scope, timerChannel, duration);

Callable<CancellableFork<Void>> sendBufferAndCleanupCost = () -> {
outputChannel.send(new ArrayList<>(buffer));
buffer.clear();
accumulatedCost.set(0);
return null;
};

boolean shouldRun = true;
while (shouldRun) {
shouldRun = switch (selectOrClosed(flowSource.receiveClause(), timerChannel.receiveClause())) {
case ChannelDone _:
// source is done, emit the buffer and finish
if (timeoutFork != null) timeoutFork.cancelNow();
if (!buffer.isEmpty()) outputChannel.send(buffer);
outputChannel.done();
yield false;
case ChannelError(Throwable cause):
// source returned error, propagate it and finish
if (timeoutFork != null) timeoutFork.cancelNow();
outputChannel.error(cause);
yield false;
case GroupingTimeout _:
timeoutFork = null; // enter 'timed out state', may stay in this state if buffer is empty
if (!buffer.isEmpty()) {
sendBufferAndCleanupCost.call();
// cancel existing timeout and start a new one
if (timeoutFork != null) timeoutFork.cancelNow();
timeoutFork = forkTimeout(scope, timerChannel, duration);
}
yield true;
case Object t:
buffer.add((T) t);
try {
long cost = accumulatedCost.updateAndGet(v -> v + costFn.apply((T) t));
if (timeoutFork == null || cost >= minWeight) {
// timeout passed when buffer was empty or buffer full
sendBufferAndCleanupCost.call();
// cancel existing timeout and start a new one
if (timeoutFork != null) timeoutFork.cancelNow();
timeoutFork = forkTimeout(scope, timerChannel, duration);
}
yield true;
} catch (Exception e) {
if (timeoutFork != null) timeoutFork.cancelNow();
throw e;
}
};
}
return null;
});
FlowEmit.channelToEmit(outputChannel, emit);
return null;
});
});
}

private CancellableFork<GroupingTimeout> forkTimeout(UnsupervisedScope scope, Channel<GroupingTimeout> timerChannel, Duration duration) {
return scope.forkCancellable(() -> {
Thread.sleep(duration);
timerChannel.sendOrClosed(GroupingTimeout.INSTANCE);
return null;
});
}

private enum GroupingTimeout {
INSTANCE
}

/**
* Chunks up the elements into groups of the specified size. The last group may be smaller due to the flow being complete.
*
Expand Down
Loading
Loading