Skip to content

Commit

Permalink
[CELEBORN-1686][FOLLOWUP] Avoid NPE when clean up data pusher
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

when code execute in this path , an unexpected NPE occurs .

1. write data
``` java
// org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter#write
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
    boolean needCleanupPusher = true;
    try {
       ....
       // write data
      close();
      needCleanupPusher = false;
    } catch (InterruptedException e) {
      TaskInterruptedHelper.throwTaskKillException();
    } finally {
      if (needCleanupPusher) {
        cleanupPusher();
      }
    }
  }
```
2. close data pusher, exception may throw when push merge data after set `IdleQueue` to null.
```java
org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter#close
private void close() throws IOException, InterruptedException {
    long pushMergedDataTime = System.nanoTime();
    dataPusher.waitOnTermination();
    // ** set `IdleQueue` to null here
    sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue());
    shuffleClient.prepareForMergeData(shuffleId, mapId, encodedAttemptId);
    closeWrite();
    // ** exception occurs when push merge data
    shuffleClient.pushMergedData(shuffleId, mapId, encodedAttemptId);
    .....
  }
```
3.  cleanup DataPusher in  `finally`
``` java
// org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter#write{finally_block}
//   ->org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter#cleanupPusher
//     -> org.apache.celeborn.client.write.DataPusher#waitOnTermination
//       -> org.apache.celeborn.client.write.DataPusher#waitIdleQueueFullWithLock
private void waitIdleQueueFullWithLock() throws InterruptedException {
    try {
      while (idleQueue.remainingCapacity() > 0 // ** where npe occurs **
          && exceptionRef.get() == null
          && (pushThread != null && pushThread.isAlive())) {
        idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
      }
    } catch (InterruptedException e) {
      logger.error("Thread interrupted while waitIdleQueueFullWithLock.", e);
      throw e;
    } finally {
      idleLock.unlock();
    }
  }

```

error stacktrace

![image](https://github.com/user-attachments/assets/b05b3483-cf90-4a7d-b4cd-114ea1621508)

### Why are the changes needed?

avoid potential NPE exception when clean up data pusher.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Closes #2926 from Z1Wu/fix/npe_shuffle_write.

Authored-by: wuziyi <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
wuziyi authored and pan3793 committed Dec 2, 2024
1 parent 93bdda5 commit c1175f8
Showing 1 changed file with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ protected void pushData(PushTask task) throws IOException {

private void waitIdleQueueFullWithLock() throws InterruptedException {
try {
while (idleQueue.remainingCapacity() > 0
while (idleQueue != null
&& idleQueue.remainingCapacity() > 0
&& exceptionRef.get() == null
&& (pushThread != null && pushThread.isAlive())) {
idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
Expand Down

0 comments on commit c1175f8

Please sign in to comment.