Skip to content

Commit

Permalink
HSEARCH-5208 Add a fail-fast configuration option to the mass indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
marko-bekhta committed Sep 9, 2024
1 parent 296b513 commit 1cc218c
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,13 @@ Defaults to a threshold defined by the failure handler in use; see `MassIndexing
`FailureHandler#failureFloodingThreshold`.
For the default log-based failure handler, the default threshold is 100.

|`failFast(boolean)`
| `false`
|*This feature is _incubating_: it is still under active development.*
An option to stop the indexing right after an error is encountered during the process,
without waiting for the process to attempt indexing the remaining entities.
With fail-fast enabled, the mass indexer will attempt to cancel any mass-indexing internal processes after the first
error reported to the `MassIndexingFailureHandler`.
|===

[[indexing-massindexer-tuning]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,33 @@ public void entityLoading(Optional<Integer> failureFloodingThreshold) {
);
}

@Test
void failFast() {
String exceptionMessage = "Entity loading error";

SearchMapping mapping = setupWithThrowingEntityLoading( exceptionMessage );

MassIndexer massIndexer = mapping.scope( Object.class ).massIndexer()
.threadsToLoadObjects( 1 ) // Just to simplify the assertions
.batchSizeToLoadObjects( 1 )
.failFast( true );
doMassIndexingWithFailure(
massIndexer,
ThreadExpectation.CREATED_AND_TERMINATED,
throwable -> assertThat( throwable ).isInstanceOf( SearchException.class )
.hasMessageContainingAll(
"failure(s) occurred during mass indexing",
"See the logs for details.",
"First failure: ",
exceptionMessage
)
.hasCauseInstanceOf( SimulatedFailure.class ),
expectIndexScaleWork( StubIndexScaleWork.Type.PURGE, ExecutionExpectation.SUCCEED ),
expectIndexScaleWork( StubIndexScaleWork.Type.MERGE_SEGMENTS, ExecutionExpectation.SUCCEED )
// we do not expect flush or refresh since the indexing was stopped;
);
}

@Test
void indexing() {
SearchMapping mapping = setup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,17 @@ public interface MassIndexer {
@Incubating
MassIndexer failureFloodingThreshold(long threshold);

/**
* Enables the fail-fast option for this mass indexer.
* <p>
* With fail-fast option enabled, the mass indexer will request cancelling all internal mass-indexing processes
* right after the first error is reported to the {@link MassIndexingFailureHandler}.
*
* @param failFast Whether to enabled fail fast option for this mass indexer.
*
* @return {@code this} for method chaining
*/
@Incubating
MassIndexer failFast(boolean failFast);

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ public MassIndexer failureFloodingThreshold(long threshold) {
return this;
}

@Override
public MassIndexer failFast(boolean failFast) {
delegate.failFast( failFast );
return this;
}

ConditionalExpression reindexOnly(Class<?> type, String conditionalExpression) {
return context.reindexOnly( type, conditionalExpression );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.ObjectPath;
import org.hibernate.search.mapper.pojo.mapping.impl.PojoContainedTypeManager;
import org.hibernate.search.mapper.pojo.mapping.impl.PojoIndexedTypeManager;
import org.hibernate.search.mapper.pojo.massindexing.impl.MassIndexingOperationHandledFailureException;
import org.hibernate.search.mapper.pojo.model.path.PojoModelPathValueNode;
import org.hibernate.search.mapper.pojo.model.path.spi.ProjectionConstructorPath;
import org.hibernate.search.mapper.pojo.model.spi.PojoConstructorModel;
Expand Down Expand Up @@ -1023,4 +1024,8 @@ SearchException invalidParameterTypeForDistanceProjectionInProjectionConstructor
+ "Remove the `convert` attribute and keep only the `valueModel=ValueModel.%1$s`.")
SearchException usingNonDefaultValueConvertAndValueModelNotAllowed(String model,
String convert, @Param EventContext eventContext);

@Message(id = ID_OFFSET + 163,
value = "Mass indexer running in a fail fast mode encountered a problem. Stopping the process.")
MassIndexingOperationHandledFailureException massIndexerFailFast();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ public class MassIndexingOperationHandledFailureException extends SearchExceptio
public MassIndexingOperationHandledFailureException(Throwable cause) {
super( cause );
}

public MassIndexingOperationHandledFailureException(String message) {
super( message );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void afterExecution(Context context) {
private Boolean dropAndCreateSchemaOnStart;
private Boolean purgeAtStart;
private Boolean mergeSegmentsAfterPurge;
private Boolean failFast;
private Long failureFloodingThreshold = null;

private MassIndexingFailureHandler failureHandler;
Expand Down Expand Up @@ -226,13 +227,21 @@ public PojoMassIndexer failureFloodingThreshold(long threshold) {
return this;
}

@Override
public PojoMassIndexer failFast(boolean failFast) {
this.failFast = failFast;
return this;
}

private MassIndexingFailureHandler getOrCreateFailureHandler() {
MassIndexingFailureHandler result = failureHandler;
if ( result == null ) {
result = new PojoMassIndexingDelegatingFailureHandler( mappingContext.failureHandler() );
MassIndexingFailureHandler handler = failureHandler;
if ( handler == null ) {
handler = new PojoMassIndexingDelegatingFailureHandler( mappingContext.failureHandler() );
}
result = new PojoMassIndexingFailSafeFailureHandlerWrapper( result );
return result;
return new PojoMassIndexingFailSafeFailureHandlerWrapper(
handler,
Boolean.TRUE.equals( failFast )
);
}

private MassIndexingMonitor getOrCreateMonitor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ public class PojoMassIndexingFailSafeFailureHandlerWrapper implements MassIndexi
private static final Log log = LoggerFactory.make( Log.class, MethodHandles.lookup() );

private final MassIndexingFailureHandler delegate;
private final boolean failFast;

public PojoMassIndexingFailSafeFailureHandlerWrapper(MassIndexingFailureHandler delegate) {
public PojoMassIndexingFailSafeFailureHandlerWrapper(MassIndexingFailureHandler delegate, boolean failFast) {
this.delegate = delegate;
this.failFast = failFast;
}

@Override
Expand All @@ -30,6 +32,9 @@ public void handle(MassIndexingFailureContext context) {
catch (Throwable t) {
log.failureInMassIndexingFailureHandler( t );
}
finally {
failFastIfNeeded();
}
}

@Override
Expand All @@ -40,6 +45,9 @@ public void handle(MassIndexingEntityFailureContext context) {
catch (Throwable t) {
log.failureInMassIndexingFailureHandler( t );
}
finally {
failFastIfNeeded();
}
}

@Override
Expand All @@ -52,4 +60,10 @@ public long failureFloodingThreshold() {
return MassIndexingFailureHandler.super.failureFloodingThreshold();
}
}

private void failFastIfNeeded() {
if ( failFast ) {
throw log.massIndexerFailFast();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,17 @@ public interface PojoMassIndexer {
*/
@Incubating
PojoMassIndexer failureFloodingThreshold(long threshold);

/**
* Enables the fail-fast option for this mass indexer.
* <p>
* With fail-fast option enabled, the mass indexer will request cancelling all internal mass-indexing processes
* right after the first error is reported to the {@link MassIndexingFailureHandler}.
*
* @param failFast Whether to enabled fail fast option for this mass indexer.
*
* @return {@code this} for method chaining
*/
@Incubating
PojoMassIndexer failFast(boolean failFast);
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,17 @@ public interface MassIndexer {
*/
@Incubating
MassIndexer environment(MassIndexingEnvironment environment);

/**
* Enables the fail-fast option for this mass indexer.
* <p>
* With fail-fast option enabled, the mass indexer will request cancelling all internal mass-indexing processes
* right after the first error is reported to the {@link MassIndexingFailureHandler}.
*
* @param failFast Whether to enabled fail fast option for this mass indexer.
*
* @return {@code this} for method chaining
*/
@Incubating
MassIndexer failFast(boolean failFast);
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,10 @@ public MassIndexer environment(MassIndexingEnvironment environment) {
delegate.environment( environment );
return this;
}

@Override
public MassIndexer failFast(boolean failFast) {
delegate.failFast( failFast );
return this;
}
}

0 comments on commit 1cc218c

Please sign in to comment.