[CELEBORN-1571] Fix flaky test - pushdata timeout will add to pushExcludedWorker

### What changes were proposed in this pull request?
The test now filters `WORKER_UNKNOWN` entries out of `excludedWorkers` before asserting on the remaining status codes.

### Why are the changes needed?

When a worker port is already in use, the driver's status for that worker may change from shutdown to unknown (`WORKER_UNKNOWN`), which causes the test's status-code assertion to fail.

https://github.com/apache/celeborn/actions/runs/10465286274/job/28980278764
```
- celeborn spark integration test - pushdata timeout will add to pushExcludedWorkers *** FAILED ***
  WORKER_UNKNOWN did not equal PUSH_DATA_TIMEOUT_PRIMARY, and WORKER_UNKNOWN did not equal PUSH_DATA_TIMEOUT_REPLICA (PushDataTimeoutTest.scala:150)
```

unit-tests.log
```
24/08/20 05:28:30,400 INFO [celeborn-dispatcher-7] Master: Receive ReportNodeFailure [
Host: localhost
RpcPort: 41487
PushPort: 34259
FetchPort: 45713
ReplicatePort: 35107
InternalPort: 41487

24/08/20 05:29:29,414 WARN [celeborn-client-lifecycle-manager-change-partition-executor-3] WorkerStatusTracker:
Reporting failed workers:
Host:localhost:RpcPort:42267:PushPort:43741:FetchPort:46483:ReplicatePort:43587   PUSH_DATA_TIMEOUT_PRIMARY   2024-08-19T22:29:29.414-0700
Current unknown workers:
Host:localhost:RpcPort:41487:PushPort:34259:FetchPort:45713:ReplicatePort:35107:InternalPort:41487   2024-08-19T22:29:29.108-0700
Current shutdown workers:
Host:localhost:RpcPort:41487:PushPort:34259:FetchPort:45713:ReplicatePort:35107:InternalPort:41487
```
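
The effect of the fix can be sketched outside the test suite. The following is a minimal, self-contained illustration, not Celeborn code: the `StatusCode` enumeration, the `ExcludedWorkersSketch` object, the `withoutUnknown` helper, and the string worker keys are all hypothetical stand-ins, and the map shape (worker to a pair of status code and timestamp) is an assumption inferred from the pattern match in the diff.

```scala
import java.util.concurrent.ConcurrentHashMap
// JavaConverters is assumed here for Scala 2.11/2.12 compatibility;
// on 2.13+ scala.jdk.CollectionConverters._ is the non-deprecated equivalent.
import scala.collection.JavaConverters._

// Hypothetical stand-in for the status codes named in the test; illustration only.
object StatusCode extends Enumeration {
  val WORKER_UNKNOWN, PUSH_DATA_TIMEOUT_PRIMARY, PUSH_DATA_TIMEOUT_REPLICA = Value
}

object ExcludedWorkersSketch {
  // Assumed shape: worker key -> (status code, timestamp in millis).
  type Excluded = ConcurrentHashMap[String, (StatusCode.Value, Long)]

  // Drop workers whose status is WORKER_UNKNOWN, keeping an immutable snapshot.
  def withoutUnknown(excluded: Excluded): Map[String, (StatusCode.Value, Long)] =
    excluded.asScala.filter { case (_, (code, _)) =>
      code != StatusCode.WORKER_UNKNOWN
    }.toMap

  def main(args: Array[String]): Unit = {
    val excluded = new Excluded()
    // One worker excluded for a push timeout, one left in the unknown state.
    excluded.put("localhost:42267", (StatusCode.PUSH_DATA_TIMEOUT_PRIMARY, 1L))
    excluded.put("localhost:41487", (StatusCode.WORKER_UNKNOWN, 2L))

    val remaining = withoutUnknown(excluded)
    assert(remaining.nonEmpty)
    // Only push-timeout codes should remain after filtering.
    remaining.foreach { case (_, (code, _)) =>
      assert(code == StatusCode.PUSH_DATA_TIMEOUT_PRIMARY ||
        code == StatusCode.PUSH_DATA_TIMEOUT_REPLICA)
    }
  }
}
```

Without the filter, the second entry would trip the assertion in the same way as the failure shown above, even though the push-timeout exclusion itself was recorded correctly.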

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GitHub Actions (GA)

Closes apache#2697 from cxzl25/CELEBORN-1571.

Authored-by: sychen <[email protected]>
Signed-off-by: Shuang <[email protected]>
cxzl25 authored and RexXiong committed Oct 11, 2024
1 parent 9f31087 commit 362865f
Showing 1 changed file with 5 additions and 2 deletions.
@@ -144,9 +144,12 @@ class PushDataTimeoutTest extends AnyFunSuite
       .getLifecycleManager
       .workerStatusTracker
       .excludedWorkers
+      .asScala.filter { case (_, (code, _)) =>
+        code != StatusCode.WORKER_UNKNOWN
+      }.toMap

-    assert(excludedWorkers.size() > 0)
-    excludedWorkers.asScala.foreach { case (_, (code, _)) =>
+    assert(excludedWorkers.size > 0)
+    excludedWorkers.foreach { case (_, (code, _)) =>
       assert(code == StatusCode.PUSH_DATA_TIMEOUT_PRIMARY ||
         code == StatusCode.PUSH_DATA_TIMEOUT_REPLICA)
     }