Skip to content

Commit

Permalink
KE-42300 try something 3
Browse files (browse the repository at this point in the history)
  • Loading branch information
Yu Gan committed Sep 25, 2023
1 parent 9ac0e0f commit 24b9a8d
Showing 1 changed file with 55 additions and 23 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -27,7 +27,11 @@
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Checksum;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -174,30 +178,60 @@ public void write(Iterator<Product2<K, V>> records) throws Exception {
// included in the shuffle write time.
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

final long starttttt = System.currentTimeMillis();
if (useMultiThreadedShuffle) {
final LinkedList<Future<Void>> writeFutures = new LinkedList<>();
try {
while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
Map.Entry<Integer, DiskBlockObjectWriter> entry =
partitionSlotWriters.get(partitioner.getPartition(key));
final int slotNum = entry.getKey();
final DiskBlockObjectWriter writer = entry.getValue();

writeFutures.add(SortShuffleManager.queueWriteTask(slotNum, () -> {
writer.write(key, record._2());
long inflight = 0;
final Map<Integer, LinkedList<Product2<K, V>>>
partitionRecords = new HashMap<>(numPartitions);
final BlockingQueue<Integer> finQueue = new LinkedBlockingQueue<>();
final BlockingQueue<Throwable> errQueue = new LinkedBlockingQueue<>();
while (records.hasNext()) {
inflight++;
final Product2<K, V> record = records.next();
final K key = record._1();
final int partition = partitioner.getPartition(key);
partitionRecords.putIfAbsent(partition, new LinkedList<>());
LinkedList<Product2<K, V>> recordBatch = partitionRecords.get(partition);
if (recordBatch.size() > 500) {
partitionRecords.remove(partition);
final LinkedList<Product2<K, V>> batch = recordBatch;
SortShuffleManager.queueWriteTask(partitionSlotWriters.get(partition).getKey(),
() -> {
try {
final DiskBlockObjectWriter writer = partitionSlotWriters.get(partition).getValue();
batch.forEach(r -> writer.write(r._1(), r._2()));
finQueue.put(batch.size());
} catch (Throwable t) {
errQueue.put(t);
}
return null;
}));
});
}
} finally {
try {
while (!writeFutures.isEmpty()) {
writeFutures.remove().get();
}
} finally {
writeFutures.forEach(f -> f.cancel(true));
}

partitionRecords.forEach((partition, batch) ->
SortShuffleManager.queueWriteTask(partitionSlotWriters.get(partition).getKey(),
() -> {
try {
final DiskBlockObjectWriter writer =
partitionSlotWriters.get(partition).getValue();
batch.forEach(r -> writer.write(r._1(), r._2()));
finQueue.put(batch.size());
} catch (Throwable t) {
errQueue.put(t);
}
return null;
}));

Integer size;
Throwable thr;
while (inflight > 0) {
thr = errQueue.poll(50, TimeUnit.MILLISECONDS);
if (thr != null) {
throw new IOException(thr);
}
size = finQueue.poll(50, TimeUnit.MILLISECONDS);
if(size != null) {
inflight -= size;
}
}
} else {
Expand All @@ -208,8 +242,6 @@ public void write(Iterator<Product2<K, V>> records) throws Exception {
}
}

final long timeSpanned = System.currentTimeMillis() - starttttt;

for (int i = 0; i < numPartitions; i++) {
try (DiskBlockObjectWriter writer = partitionSlotWriters.get(i).getValue()) {
partitionWriterSegments[i] = writer.commitAndGet();
Expand Down

0 comments on commit 24b9a8d

Please sign in to comment.