Skip to content

Commit

Permalink
[FLINK-32751][runtime] Refactors CollectSinkOperatorCoordinator to en…
Browse files Browse the repository at this point in the history
…able case-specific logging

shutdownNow() is added to immediately stop any already submitted requests. That makes
the close logic more explicit rather than relying on the Runnables to finish implicitly
through the connection loss.

Any ongoing requests are now also cancelled. This allows us to log specific cases of unexpected
errors properly.
  • Loading branch information
XComp committed Aug 21, 2023
1 parent 1c76850 commit b5b1340
Showing 1 changed file with 82 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,7 +43,10 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -63,6 +68,9 @@ public class CollectSinkOperatorCoordinator
private DataInputViewStreamWrapper inStream;
private DataOutputViewStreamWrapper outStream;

private final Set<CompletableFuture<CoordinationResponse>> ongoingRequests =
ConcurrentHashMap.newKeySet();

private ExecutorService executorService;

public CollectSinkOperatorCoordinator(int socketTimeout) {
Expand All @@ -79,8 +87,9 @@ public void start() throws Exception {

@Override
public void close() throws Exception {
LOG.info("Closing the CollectSinkOperatorCoordinator.");
this.executorService.shutdownNow();
closeConnection();
this.executorService.shutdown();
}

@Override
Expand All @@ -101,75 +110,82 @@ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
"Coordination request must be a CollectCoordinationRequest");

CollectCoordinationRequest collectRequest = (CollectCoordinationRequest) request;
CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();

if (address == null) {
completeWithEmptyResponse(collectRequest, responseFuture);
return responseFuture;
return CompletableFuture.completedFuture(createEmptyResponse(collectRequest));
}

executorService.submit(() -> handleRequestImpl(collectRequest, responseFuture, address));
return responseFuture;
final CompletableFuture<CoordinationResponse> responseFuture =
FutureUtils.supplyAsync(
() -> handleRequestImpl(collectRequest, address), executorService);

ongoingRequests.add(responseFuture);
return responseFuture.handle(
(response, error) -> {
ongoingRequests.remove(responseFuture);

if (response != null) {
return response;
}

// cancelling the future implies that the error handling happens somewhere else
if (!ExceptionUtils.findThrowable(error, CancellationException.class)
.isPresent()) {
// Request failed: Close current connection and send back empty results
// we catch every exception here because the Socket might suddenly become
// null. We don't want the coordinator to fail if the sink fails.
if (LOG.isDebugEnabled()) {
LOG.warn(
"Collect sink coordinator encountered an unexpected error.",
error);
} else {
LOG.warn(
"Collect sink coordinator encounters a {}: {}",
error.getClass().getSimpleName(),
error.getMessage());
}

closeConnection();
}

return createEmptyResponse(collectRequest);
});
}

private void handleRequestImpl(
CollectCoordinationRequest request,
CompletableFuture<CoordinationResponse> responseFuture,
InetSocketAddress sinkAddress) {
private CoordinationResponse handleRequestImpl(
CollectCoordinationRequest request, InetSocketAddress sinkAddress) throws IOException {
if (sinkAddress == null) {
closeConnection();
completeWithEmptyResponse(request, responseFuture);
return;
throw new NullPointerException("No sinkAddress available.");
}

try {
if (socket == null) {
socket = new Socket();
socket.setSoTimeout(socketTimeout);
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);

socket.connect(sinkAddress);
inStream = new DataInputViewStreamWrapper(socket.getInputStream());
outStream = new DataOutputViewStreamWrapper(socket.getOutputStream());
LOG.info("Sink connection established");
}
if (socket == null) {
socket = new Socket();
socket.setSoTimeout(socketTimeout);
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);

// send version and offset to sink server
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding request to sink socket server");
}
request.serialize(outStream);

// fetch back serialized results
if (LOG.isDebugEnabled()) {
LOG.debug("Fetching serialized result from sink socket server");
}
responseFuture.complete(new CollectCoordinationResponse(inStream));
} catch (Exception e) {
// request failed, close current connection and send back empty results
// we catch every exception here because socket might suddenly becomes null if the sink
// fails
// and we do not want the coordinator to fail
if (LOG.isDebugEnabled()) {
// this is normal when sink restarts or job ends, so we print a debug log
LOG.debug("Collect sink coordinator encounters an exception", e);
}
closeConnection();
completeWithEmptyResponse(request, responseFuture);
socket.connect(sinkAddress);
inStream = new DataInputViewStreamWrapper(socket.getInputStream());
outStream = new DataOutputViewStreamWrapper(socket.getOutputStream());
LOG.info("Sink connection established");
}

// send version and offset to sink server
LOG.debug("Forwarding request to sink socket server");
request.serialize(outStream);

// fetch back serialized results
LOG.debug("Fetching serialized result from sink socket server");
return new CollectCoordinationResponse(inStream);
}

private void completeWithEmptyResponse(
CollectCoordinationRequest request, CompletableFuture<CoordinationResponse> future) {
future.complete(
new CollectCoordinationResponse(
request.getVersion(),
// this lastCheckpointedOffset is OK
// because client will only expose results to the users when the
// checkpointed offset increases
-1,
Collections.emptyList()));
private CollectCoordinationResponse createEmptyResponse(CollectCoordinationRequest request) {
return new CollectCoordinationResponse(
request.getVersion(),
// this lastCheckpointedOffset is OK
// because client will only expose results to the users when the
// checkpointed offset increases
-1,
Collections.emptyList());
}

private void closeConnection() {
Expand Down Expand Up @@ -217,6 +233,9 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
throws Exception {
if (checkpointData == null) {
// restore before any checkpoint completed
LOG.info("Any ongoing requests are cancelled due to a coordinator reset.");
cancelOngoingRequests();

closeConnection();
} else {
ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData);
Expand All @@ -225,6 +244,11 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
}
}

private void cancelOngoingRequests() {
ongoingRequests.forEach(ft -> ft.cancel(true));
ongoingRequests.clear();
}

/** Provider for {@link CollectSinkOperatorCoordinator}. */
public static class Provider implements OperatorCoordinator.Provider {

Expand Down

0 comments on commit b5b1340

Please sign in to comment.