Skip to content

Commit

Permalink
Merge pull request #43 from noleme/feature/currentout-interrupt
Browse files Browse the repository at this point in the history
Flow: added missing nonFatal helper for Loaders, interrupt and interr…
  • Loading branch information
eledhwen authored Oct 27, 2022
2 parents c14cd31 + 80c6eeb commit 7f161d1
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 64 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.noleme</groupId>
<artifactId>noleme-flow</artifactId>
<version>0.16</version>
<version>0.17</version>
<packaging>jar</packaging>

<name>Noleme Flow</name>
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/com/noleme/flow/CurrentOut.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
import com.noleme.flow.actor.transformer.BiTransformer;
import com.noleme.flow.actor.transformer.Transformer;
import com.noleme.flow.annotation.Experimental;
import com.noleme.flow.interruption.Interruption;
import com.noleme.flow.node.Node;
import com.noleme.flow.slice.SinkSlice;
import com.noleme.flow.slice.PipeSlice;
import com.noleme.flow.stream.StreamOut;

import java.util.function.Predicate;

/**
* Concept representing a {@link Node} with a potential downstream.
* CurrentOut nodes include {@link FlowOut} and {@link com.noleme.flow.stream.StreamOut} subtypes.
Expand All @@ -35,6 +38,29 @@ public interface CurrentOut<O> extends Node
*/
CurrentIn<O> into(Loader<O> loader);

/**
* Binds the current node into an {@link Interruption}.
* It will trigger a local interruption and allow the rest of the flow to continue.
*
* @return
*/
default CurrentOut<O> interrupt()
{
return this.into(new Interruption<>());
}

/**
* Binds the current node into an {@link Interruption}.
* It will trigger a local interruption if the provided predicate is satisfied, and allow the rest of the flow to continue.
*
* @param predicate
* @return
*/
default CurrentOut<O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>());
}

/**
* Synonymous with into(Transformer), has the advantage of not allowing ambiguous lambdas.
* @see #into(Transformer)
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/com/noleme/flow/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,35 @@ public static <I1, I2, O> BiTransformer<I1, I2, O> nonFatal(BiTransformer<I1, I2
};
}

/**
* @see #nonFatal(Loader, Consumer)
*/
public static <I> Loader<I> nonFatal(Loader<I> extractor)
{
return nonFatal(extractor, e -> {});
}

/**
* An adapter function for absorbing {@link Exception} and replace them with a log line and the control {@link InterruptionException}.
*
* @param loader A loader instance to be wrapped
* @param <I> the type of the upstream flow
* @return the resulting wrapper Extractor node
*/
public static <I> Loader<I> nonFatal(Loader<I> loader, Consumer<Exception> handler)
{
return input -> {
try {
loader.load(input);
}
catch (Exception e) {
logger.error(e.getMessage(), e);
handler.accept(e);
throw InterruptionException.interrupt();
}
};
}

/**
* An adapter function for leveraging a given {@link Transformer} over a collection of its inputs.
* It essentially produces a {@link Transformer} that will iterate over the input collection and delegate each item to the provided {@link Transformer} implementation.
Expand Down
11 changes: 2 additions & 9 deletions src/main/java/com/noleme/flow/Join.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,13 @@ public Join<I1, I2, O> sample(String name)
}


/**
*
* @return
*/
@Override
public Pipe<O, O> interrupt()
{
return this.into(new Interruption<>());
}

/**
*
* @param predicate
* @return
*/
@Override
public Pipe<O, O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>(predicate));
Expand Down
11 changes: 2 additions & 9 deletions src/main/java/com/noleme/flow/Pipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,13 @@ public Pipe<I, O> sample(String name)
return this;
}

/**
*
* @return
*/
@Override
public Pipe<O, O> interrupt()
{
return this.into(new Interruption<>());
}

/**
*
* @param predicate
* @return
*/
@Override
public Pipe<O, O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>(predicate));
Expand Down
11 changes: 2 additions & 9 deletions src/main/java/com/noleme/flow/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,13 @@ public Source<O> sample(String name)
return this;
}

/**
*
* @return
*/
@Override
public Pipe<O, O> interrupt()
{
return this.into(new Interruption<>());
}

/**
*
* @param predicate
* @return
*/
@Override
public Pipe<O, O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>(predicate));
Expand Down
11 changes: 2 additions & 9 deletions src/main/java/com/noleme/flow/stream/StreamGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,13 @@ public StreamGenerator<I, O> driftSink(Loader<O> loader)
return this;
}

/**
*
* @return
*/
@Override
public StreamPipe<O, O> interrupt()
{
return this.into(new Interruption<>());
}

/**
*
* @param predicate
* @return
*/
@Override
public StreamPipe<O, O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>(predicate));
Expand Down
11 changes: 2 additions & 9 deletions src/main/java/com/noleme/flow/stream/StreamJoin.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,13 @@ public StreamJoin<I1, I2, O> driftSink(Loader<O> loader)
return this;
}

/**
*
* @return
*/
@Override
public StreamPipe<O, O> interrupt()
{
return this.into(new Interruption<>());
}

/**
*
* @param predicate
* @return
*/
@Override
public StreamPipe<O, O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>(predicate));
Expand Down
11 changes: 2 additions & 9 deletions src/main/java/com/noleme/flow/stream/StreamPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,13 @@ public StreamPipe<I, O> driftSink(Loader<O> loader)
}


/**
*
* @return
*/
@Override
public StreamPipe<O, O> interrupt()
{
return this.into(new Interruption<>());
}

/**
*
* @param predicate
* @return
*/
@Override
public StreamPipe<O, O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>(predicate));
Expand Down
11 changes: 2 additions & 9 deletions src/main/java/com/noleme/flow/stream/StreamSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,13 @@ public StreamSource<O> driftSink(Loader<O> loader)
}


/**
*
* @return
*/
@Override
public StreamPipe<O, O> interrupt()
{
return this.into(new Interruption<>());
}

/**
*
* @param predicate
* @return
*/
@Override
public StreamPipe<O, O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>(predicate));
Expand Down

0 comments on commit 7f161d1

Please sign in to comment.