Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for certain non-trivial Stream searching #34

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/main/java/no/digipost/DiggCollectors.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import no.digipost.concurrent.OneTimeAssignment;
import no.digipost.stream.EmptyResultIfEmptySourceCollector;
import no.digipost.stream.NonEmptyStream;
import no.digipost.stream.SubjectFilter;
import no.digipost.tuple.Tuple;
import no.digipost.tuple.ViewableAsTuple;
import no.digipost.util.ViewableAsOptional;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -49,6 +51,37 @@
public final class DiggCollectors {


/**
* Create a collector used for finding and accumulating a specific result,
* where applying a filter is not adequate. The returned
* {@link SubjectFilter subject filter} is used for
* further specifying the final (compound) condition for the result to find.
* <p>
* When searching for a result <em>in context</em> of other elements, you must carefully
* ensure the source to have appropriate ordering and parallelity (or probably rather lack thereof)
* for the correct and expected operation of the collector.
* <p>
* Note: because {@link Collector collectors} are applied to <em>all</em> elements
* in a Stream, care should be taken to exclude non-applicable elements e.g. using
* a {@link Stream#filter(Predicate) filter}, and {@link Stream#limit(long) limit}
* especially for infinite Streams, before collecting.
*
*
* @param <T> The element type which is inspected by the subject filter. This type is
* typically the same as the element type of the Stream the final collector
* is applied to.
*
* @param subjectElement the predicate for selecting a subject element for further use
* in accumulating a result from applying the final collector.
*
* @return the subject filter for the collector, which must be further specified
* to build the final collector
*/
public static <T> SubjectFilter<T> find(Predicate<T> subjectElement) {
return new SubjectFilter<>(subjectElement);
}


/**
* A <em>multituple</em> is similar to a <em>multimap</em> in that it consists of one {@link Tuple#first() first} value and a List of
* values as the {@link Tuple#second() second} value,
Expand Down
61 changes: 61 additions & 0 deletions src/main/java/no/digipost/stream/AtomicReferenceFolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (C) Posten Norge AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package no.digipost.stream;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

/**
* A consumer function for "folding" a value with an {@link AtomicReference}.
*
* @param <T> The type of the value this function folds.
*/
@FunctionalInterface
interface AtomicReferenceFolder<T> extends BiConsumer<AtomicReference<T>, T> {

static <T> AtomicReferenceFolder<T> clearReference() {
return (ref, value) -> ref.set(null);
}

static <T> AtomicReferenceFolder<T> keepFirst(Predicate<? super T> predicate) {
return (currentRef, candidateElement) ->
currentRef.accumulateAndGet(candidateElement, (current, candidate) -> current == null && predicate.test(candidate) ? candidate : current);
}

static <T> AtomicReferenceFolder<T> keepLast(Predicate<? super T> predicate) {
return (currentRef, candidateElement) -> {
if (predicate.test(candidateElement)) {
currentRef.set(candidateElement);
}
};
}

default AtomicReferenceFolder<T> doInsteadIf(Predicate<? super T> valuePredicate, AtomicReferenceFolder<T> foldOperation) {
return doInsteadIf((ref, value) -> valuePredicate.test(value), foldOperation);
}

default AtomicReferenceFolder<T> doInsteadIf(BiPredicate<? super AtomicReference<T>, ? super T> refAndValuePredicate, AtomicReferenceFolder<T> foldOperation) {
return (ref, value) -> {
if (refAndValuePredicate.test(ref, value)) {
foldOperation.accept(ref, value);
} else {
this.accept(ref, value);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (C) Posten Norge AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package no.digipost.stream;

import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

import static java.util.Collections.unmodifiableSet;
import static java.util.stream.Collector.Characteristics.CONCURRENT;

final class AtomicReferenceFoldingCollector<T> implements Collector<T, AtomicReference<T>, Optional<T>> {

private final AtomicReferenceFolder<T> accumulator;
private final Set<Characteristics> characteristics;

AtomicReferenceFoldingCollector(AtomicReferenceFolder<T> accumulator) {
this(accumulator, EnumSet.of(CONCURRENT));
}

AtomicReferenceFoldingCollector(AtomicReferenceFolder<T> accumulator, Set<Characteristics> characteristics) {
this.accumulator = accumulator;
this.characteristics = unmodifiableSet(characteristics);
}


@Override
public AtomicReferenceFolder<T> accumulator() {
return accumulator;
}

@Override
public Set<Characteristics> characteristics() {
return characteristics;
}

/**
* The combiner, while thread-safe, has essentially undefined behavior if combining two
* found elements, because there is no way to tell if one or the other should be prioritized.
* In all cases of combining one found element with a not found, the found element will be
* returned from the function.
*/
@Override
public BinaryOperator<AtomicReference<T>> combiner() {
return (ref1, ref2) -> {
ref1.compareAndSet(null, ref2.get());
return ref1;
};
}

@Override
public Function<AtomicReference<T>, Optional<T>> finisher() {
return ref -> Optional.ofNullable(ref.get());
}

@Override
public Supplier<AtomicReference<T>> supplier() {
return AtomicReference::new;
}
}
60 changes: 60 additions & 0 deletions src/main/java/no/digipost/stream/SubjectFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) Posten Norge AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package no.digipost.stream;

import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collector;

import static java.util.function.Predicate.isEqual;
import static no.digipost.stream.AtomicReferenceFolder.clearReference;
import static no.digipost.stream.AtomicReferenceFolder.keepFirst;
import static no.digipost.stream.AtomicReferenceFolder.keepLast;

/**
* The initial subject filter for building a searching {@code Collector}.
* The {@link Collector} is acquired by invoking a method to finalize the
* (compound) condition for accumulating the result across the elements
* the collector will be applied to.
*
* @param <T> the type of elements this subject filter inspects
*
* @see #keepFirstNotFollowedBy(Predicate)
* @see #keepLastNotFollowedBy(Predicate)
*/
public final class SubjectFilter<T> {
private final Predicate<T> subjectElement;

public SubjectFilter(Predicate<T> subjectElement) {
this.subjectElement = subjectElement;
}

public Collector<T, ?, Optional<T>> keepFirstNotFollowedBy(T cancellingElement) {
return keepFirstNotFollowedBy(isEqual(cancellingElement));
}

public Collector<T, ?, Optional<T>> keepFirstNotFollowedBy(Predicate<? super T> cancellingElement) {
return new AtomicReferenceFoldingCollector<>(keepFirst(subjectElement).doInsteadIf(cancellingElement, clearReference()));
}

public Collector<T, ?, Optional<T>> keepLastNotFollowedBy(T cancellingElement) {
return keepLastNotFollowedBy(isEqual(cancellingElement));
}

public Collector<T, ?, Optional<T>> keepLastNotFollowedBy(Predicate<? super T> cancellingElement) {
return new AtomicReferenceFoldingCollector<>(keepLast(subjectElement).doInsteadIf(cancellingElement, clearReference()));
}
}
48 changes: 48 additions & 0 deletions src/test/java/no/digipost/DiggCollectorsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import no.digipost.tuple.Tuple;
import no.digipost.tuple.ViewableAsTuple;
import no.digipost.util.ViewableAsOptional;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.quicktheories.core.Gen;
import uk.co.probablyfine.matchers.OptionalMatchers;

Expand All @@ -31,6 +34,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.Arrays.asList;
Expand All @@ -39,6 +44,7 @@
import static no.digipost.DiggCollectors.allowAtMostOne;
import static no.digipost.DiggCollectors.allowAtMostOneOrElseThrow;
import static no.digipost.DiggCollectors.asSuppressedExceptionsOf;
import static no.digipost.DiggCollectors.find;
import static no.digipost.DiggCollectors.toMultimap;
import static no.digipost.DiggCollectors.toMultituple;
import static no.digipost.DiggCollectors.toSingleExceptionWithSuppressed;
Expand All @@ -50,6 +56,7 @@
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -207,4 +214,45 @@ public void allowAtMostOneFailsWithCustomException() {
assertThat(thrown, sameInstance(customException));
});
}

@Nested
class FindTest {

@Test
void emptyStream() {
assertThat(Stream.empty().collect(find(e -> true).keepLastNotFollowedBy(null)), whereNot(Optional::isPresent));
}

@Test
void findsLastItemInStreamNothingIsVoided() {
assertThat(Stream.of(1, 2, 3, 4).collect(find((Integer n) -> n > 2).keepLastNotFollowedBy(0)), contains(4));
}

@Test
void findsLastMatchingItemAndNotCancelledByAnyFollowingElement() {
assertThat(Stream.of(1, 2, 3, 4, 5, 6).collect(find((Integer n) -> n > 2 && n < 6).keepLastNotFollowedBy(0)), contains(5));
}

@Test
void findsLastMatchingItemNotFollowedByCertainValue() {
assertThat(Stream.of(1, 2, 3, 4, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7).collect(find((Integer n) -> n > 3 && n < 6).keepLastNotFollowedBy(8)), contains(5));
}

@Test
void findsFirstMatchingItemNotFollowedByCertainValue() {
assertThat(Stream.of(1, 2, 3, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7).collect(find((Integer n) -> n > 3 && n < 6).keepFirstNotFollowedBy(8)), contains(4));
}

@RepeatedTest(20) @Timeout(10)
void parallelStreamsWorksButBehaviorIsNotReallyDefined() {
Random random = new Random();
Optional<Integer> result = Optional.empty();
while(!result.isPresent()) {
result = IntStream.generate(() -> random.nextInt(1000)).limit(500_000).boxed()
.parallel()
.collect(find((Integer n) -> n < 100).keepLastNotFollowedBy(n -> n > 990));
}
assertThat(result, contains(lessThan(100)));
}
}
}