From b5b1340f91d6cd2deb563eb67d8cbb3d1852544d Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Wed, 9 Aug 2023 08:58:04 +0200 Subject: [PATCH] [FLINK-32751][runtime] Refactors CollectSinkOperatorCoordinator to enable case-specific logging. shutdownNow() is added to immediately stop any already submitted requests. That makes the close logic more explicit rather than relying on the Runnables to finish implicitly through the connection loss. Any ongoing requests are now also cancelled when the coordinator is reset without checkpoint data. This allows us to log specific cases of unexpected errors properly. --- .../CollectSinkOperatorCoordinator.java | 140 ++++++++++-------- 1 file changed, 82 insertions(+), 58 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java index e926d625e0a0b..806ed180496d2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java @@ -25,8 +25,10 @@ import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +43,10 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -63,6 +68,9 @@ public class CollectSinkOperatorCoordinator private DataInputViewStreamWrapper inStream; private DataOutputViewStreamWrapper outStream; + private final Set> ongoingRequests = + ConcurrentHashMap.newKeySet(); + private ExecutorService executorService; public CollectSinkOperatorCoordinator(int socketTimeout) { @@ -79,8 +87,9 @@ public void start() throws Exception { @Override public void close() throws Exception { + LOG.info("Closing the CollectSinkOperatorCoordinator."); + this.executorService.shutdownNow(); closeConnection(); - this.executorService.shutdown(); } @Override @@ -101,75 +110,82 @@ public CompletableFuture handleCoordinationRequest( "Coordination request must be a CollectCoordinationRequest"); CollectCoordinationRequest collectRequest = (CollectCoordinationRequest) request; - CompletableFuture responseFuture = new CompletableFuture<>(); - if (address == null) { - completeWithEmptyResponse(collectRequest, responseFuture); - return responseFuture; + return CompletableFuture.completedFuture(createEmptyResponse(collectRequest)); } - executorService.submit(() -> handleRequestImpl(collectRequest, responseFuture, address)); - return responseFuture; + final CompletableFuture responseFuture = + FutureUtils.supplyAsync( + () -> handleRequestImpl(collectRequest, address), executorService); + + ongoingRequests.add(responseFuture); + return responseFuture.handle( + (response, error) -> { + ongoingRequests.remove(responseFuture); + + if (response != null) { + return response; + } + + // cancelling the future implies that the error handling happens somewhere else + if (!ExceptionUtils.findThrowable(error, CancellationException.class) + .isPresent()) { + // Request failed: Close current connection and send back empty results + // we catch every exception here because the Socket might suddenly become + // null. 
We don't want the coordinator to fail if the sink fails. + if (LOG.isDebugEnabled()) { + LOG.warn( + "Collect sink coordinator encountered an unexpected error.", + error); + } else { + LOG.warn( + "Collect sink coordinator encounters a {}: {}", + error.getClass().getSimpleName(), + error.getMessage()); + } + + closeConnection(); + } + + return createEmptyResponse(collectRequest); + }); } - private void handleRequestImpl( - CollectCoordinationRequest request, - CompletableFuture responseFuture, - InetSocketAddress sinkAddress) { + private CoordinationResponse handleRequestImpl( + CollectCoordinationRequest request, InetSocketAddress sinkAddress) throws IOException { if (sinkAddress == null) { - closeConnection(); - completeWithEmptyResponse(request, responseFuture); - return; + throw new NullPointerException("No sinkAddress available."); } - try { - if (socket == null) { - socket = new Socket(); - socket.setSoTimeout(socketTimeout); - socket.setKeepAlive(true); - socket.setTcpNoDelay(true); - - socket.connect(sinkAddress); - inStream = new DataInputViewStreamWrapper(socket.getInputStream()); - outStream = new DataOutputViewStreamWrapper(socket.getOutputStream()); - LOG.info("Sink connection established"); - } + if (socket == null) { + socket = new Socket(); + socket.setSoTimeout(socketTimeout); + socket.setKeepAlive(true); + socket.setTcpNoDelay(true); - // send version and offset to sink server - if (LOG.isDebugEnabled()) { - LOG.debug("Forwarding request to sink socket server"); - } - request.serialize(outStream); - - // fetch back serialized results - if (LOG.isDebugEnabled()) { - LOG.debug("Fetching serialized result from sink socket server"); - } - responseFuture.complete(new CollectCoordinationResponse(inStream)); - } catch (Exception e) { - // request failed, close current connection and send back empty results - // we catch every exception here because socket might suddenly becomes null if the sink - // fails - // and we do not want the coordinator to fail - 
if (LOG.isDebugEnabled()) { - // this is normal when sink restarts or job ends, so we print a debug log - LOG.debug("Collect sink coordinator encounters an exception", e); - } - closeConnection(); - completeWithEmptyResponse(request, responseFuture); + socket.connect(sinkAddress); + inStream = new DataInputViewStreamWrapper(socket.getInputStream()); + outStream = new DataOutputViewStreamWrapper(socket.getOutputStream()); + LOG.info("Sink connection established"); } + + // send version and offset to sink server + LOG.debug("Forwarding request to sink socket server"); + request.serialize(outStream); + + // fetch back serialized results + LOG.debug("Fetching serialized result from sink socket server"); + return new CollectCoordinationResponse(inStream); } - private void completeWithEmptyResponse( - CollectCoordinationRequest request, CompletableFuture future) { - future.complete( - new CollectCoordinationResponse( - request.getVersion(), - // this lastCheckpointedOffset is OK - // because client will only expose results to the users when the - // checkpointed offset increases - -1, - Collections.emptyList())); + private CollectCoordinationResponse createEmptyResponse(CollectCoordinationRequest request) { + return new CollectCoordinationResponse( + request.getVersion(), + // this lastCheckpointedOffset is OK + // because client will only expose results to the users when the + // checkpointed offset increases + -1, + Collections.emptyList()); } private void closeConnection() { @@ -217,6 +233,9 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData throws Exception { if (checkpointData == null) { // restore before any checkpoint completed + LOG.info("Any ongoing requests are cancelled due to a coordinator reset."); + cancelOngoingRequests(); + closeConnection(); } else { ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData); @@ -225,6 +244,11 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData } 
} + private void cancelOngoingRequests() { + ongoingRequests.forEach(ft -> ft.cancel(true)); + ongoingRequests.clear(); + } + /** Provider for {@link CollectSinkOperatorCoordinator}. */ public static class Provider implements OperatorCoordinator.Provider {