Skip to content

Commit

Permalink
unordered read
Browse files Browse the repository at this point in the history
author: gloway
Signed-off-by: mingji <[email protected]>
  • Loading branch information
FMX committed Sep 4, 2024
1 parent 23ab184 commit c51f2eb
Show file tree
Hide file tree
Showing 5 changed files with 1,129 additions and 592 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.tez.runtime.library.common.shuffle.impl;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchResult;
Expand All @@ -34,40 +34,106 @@
import org.apache.celeborn.common.exception.CelebornIOException;

public class CelebornFetcher extends CallableWithNdc<FetchResult> {

private static final Logger LOG = LoggerFactory.getLogger(CelebornFetcher.class);

private final FetcherCallback fetcherCallback;

private final FetchedInputAllocator inputManager;

private long copyBlockCount = 0;

private volatile boolean stopped = false;

private final CelebornTezReader celebornTezReader;
private long readTime = 0;
private long decompressTime = 0;
private long serializeTime = 0;
private long waitTime = 0;
private long copyTime = 0; // the sum of readTime + decompressTime + serializeTime + waitTime
private static int uniqueMapId = 0;
CelebornTezReader reader;
private final AtomicBoolean isShutDown = new AtomicBoolean(false);
final int partition;

public CelebornFetcher(
FetchedInputAllocator inputManager,
private boolean hasPendingData = false;
private long startWait;
private int waitCount = 0;
private byte[] shuffleData = null;

CelebornFetcher(
FetcherCallback fetcherCallback,
CelebornTezReader reader,
int partition) {
FetchedInputAllocator inputManager,
CelebornTezReader celebornTezReader) {
this.fetcherCallback = fetcherCallback;
this.inputManager = inputManager;
this.partition = partition;
this.reader = reader;
this.celebornTezReader = celebornTezReader;
}

public void fetchAllBlocksInOnPartition() throws IOException {
try {
byte[] data = reader.fetchData();
long blockStartFetchTime = System.currentTimeMillis();
issueMapOutputMerge(data.length, blockStartFetchTime, data);
} catch (Exception e) {
LOG.error("Failed to fetchAllCelebornBlocks.", e);
throw e;
public void fetchAllRssBlocks() throws IOException {
while (!stopped) {
try {
copyFromRssServer();
} catch (Exception e) {
LOG.error("Failed to fetchAllRssBlocks.", e);
throw e;
}
}
}

private boolean issueMapOutputMerge(int compressedLength, long blockStartFetch, byte[] data)
throws IOException {
@VisibleForTesting
public void copyFromRssServer() throws IOException {
long blockStartFetch = 0;
// fetch a block
if (!hasPendingData) {
final long startFetch = System.currentTimeMillis();
blockStartFetch = System.currentTimeMillis();
shuffleData = celebornTezReader.getShuffleBlock();
long fetchDuration = System.currentTimeMillis() - startFetch;
readTime += fetchDuration;
}

if (shuffleData != null) {
// start to merge
final long startSerialization = System.currentTimeMillis();
if (issueMapOutputMerge(blockStartFetch)) {
long serializationDuration = System.currentTimeMillis() - startSerialization;
serializeTime += serializationDuration;
// if reserve successes, reset status for next fetch
if (hasPendingData) {
waitTime += System.currentTimeMillis() - startWait;
}
hasPendingData = false;
shuffleData = null;
} else {
LOG.info("UncompressedData is null");
// if reserve fail, return and wait
waitCount++;
startWait = System.currentTimeMillis();
return;
}

// update some status
copyBlockCount++;
copyTime = readTime + decompressTime + serializeTime + waitTime;
} else {
LOG.info("UncompressedData is null");
// finish reading data, close related reader and check data consistent
celebornTezReader.close();
LOG.info(
"Reduce task partition:"
+ celebornTezReader.getPartitionId()
+ " read block cnt: "
+ copyBlockCount
+ " cost "
+ readTime
+ " ms to fetch and "
+ serializeTime
+ " ms to serialize and "
+ waitTime
+ " ms to wait resource, total copy time: "
+ copyTime);
LOG.info("Stop fetcher");
stopFetch();
}
}

private boolean issueMapOutputMerge(long blockStartFetch) throws IOException {
// Allocate a MapOutput (either in-memory or on-disk) to put uncompressed block
// In Rss, a MapOutput is sent as multiple blocks, so the reducer needs to
// treat each "block" as a faked "mapout".
Expand All @@ -78,28 +144,29 @@ private boolean issueMapOutputMerge(int compressedLength, long blockStartFetch,

try {
fetchedInput =
inputManager.allocate(compressedLength, compressedLength, uniqueInputAttemptIdentifier);
inputManager.allocate(
shuffleData.length, shuffleData.length, uniqueInputAttemptIdentifier);
} catch (IOException ioe) {
// kill this reduce attempt
throw ioe;
}

// Allocated space and then write data to mapOutput
try {
CelebornTezBypassWriter.write(fetchedInput, data);
CelebornTezBypassWriter.write(fetchedInput, shuffleData);
// let the merger knows this block is ready for merging
fetcherCallback.fetchSucceeded(
null,
uniqueInputAttemptIdentifier,
fetchedInput,
compressedLength,
compressedLength,
shuffleData.length,
shuffleData.length,
System.currentTimeMillis() - blockStartFetch);
} catch (Throwable t) {
LOG.error("Failed to write fetchedInput.", t);
throw new CelebornIOException(
"Partition: "
+ partition
+ celebornTezReader.getPartitionId()
+ " cannot write block to "
+ fetchedInput.getClass().getSimpleName()
+ " due to: "
Expand All @@ -112,21 +179,25 @@ private InputAttemptIdentifier getNextUniqueInputAttemptIdentifier() {
return new InputAttemptIdentifier(uniqueMapId++, 0);
}

private void stopFetch() {
stopped = true;
}

@VisibleForTesting
public int getRetryCount() {
return waitCount;
}

@Override
protected FetchResult callInternal() throws Exception {
reader.init();
if (!isShutDown.get()) {
fetchAllBlocksInOnPartition();
isShutDown.getAndSet(true);
}
celebornTezReader.init();
fetchAllRssBlocks();
return null;
}

public void close() throws IOException {
isShutDown.getAndSet(true);
}
public void shutdown() {}

public int getPartition() {
return partition;
public Integer getPartitionId() {
return celebornTezReader.getPartitionId();
}
}
Loading

0 comments on commit c51f2eb

Please sign in to comment.