Skip to content

Commit

Permalink
[FLINK-37182][tests] Save partitionedFile as a class member in the Pa…
Browse files Browse the repository at this point in the history
…rtitionedFileWriteReadTest.
  • Loading branch information
aoli-al authored and JunRuiLee committed Jan 22, 2025
1 parent 085f37b commit abbfc66
Showing 1 changed file with 72 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.IOUtils;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -58,6 +59,15 @@
*/
class PartitionedFileWriteReadTest {
private @TempDir Path tempPath;
// We need a reference to the PartitionedFile to call deleteQuietly() after the test
private PartitionedFile partitionedFile;

@AfterEach
void tearDown() {
if (partitionedFile != null) {
partitionedFile.deleteQuietly();
}
}

@Test
void testWriteAndReadPartitionedFile() throws Exception {
Expand All @@ -77,18 +87,17 @@ void testWriteAndReadPartitionedFile() throws Exception {
}

int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
PartitionedFile partitionedFile =
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex,
random.nextBoolean(),
writeOrder);
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex,
random.nextBoolean(),
writeOrder);

FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
Expand All @@ -111,7 +120,6 @@ void testWriteAndReadPartitionedFile() throws Exception {
}
}
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();

for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
assertThat(buffersWritten[subpartition]).hasSameSizeAs(buffersRead[subpartition]);
Expand Down Expand Up @@ -144,31 +152,28 @@ void testComputeReadablePosition(boolean randomSubpartitionOrder, boolean broadc
randomSubpartitionOrder
? DataBufferTest.getRandomSubpartitionOrder(numSubpartitions)
: new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
PartitionedFile nonBroadcastPartitionedFile =
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex,
broadcastRegion,
writeOrder);

FileChannel dataFileChannel =
openFileChannel(nonBroadcastPartitionedFile.getDataFilePath());
FileChannel indexFileChannel =
openFileChannel(nonBroadcastPartitionedFile.getIndexFilePath());
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex,
broadcastRegion,
writeOrder);

FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());

verifyReadablePosition(
0,
numSubpartitions - 1,
writeOrder[0],
dataFileChannel,
indexFileChannel,
nonBroadcastPartitionedFile,
partitionedFile,
regionStat,
broadcastRegion);

Expand All @@ -178,7 +183,7 @@ void testComputeReadablePosition(boolean randomSubpartitionOrder, boolean broadc
writeOrder[0],
dataFileChannel,
indexFileChannel,
nonBroadcastPartitionedFile,
partitionedFile,
regionStat,
broadcastRegion);

Expand All @@ -188,10 +193,9 @@ void testComputeReadablePosition(boolean randomSubpartitionOrder, boolean broadc
writeOrder[0],
dataFileChannel,
indexFileChannel,
nonBroadcastPartitionedFile,
partitionedFile,
regionStat,
broadcastRegion);
nonBroadcastPartitionedFile.deleteQuietly();
}

private void verifyReadablePosition(
Expand Down Expand Up @@ -280,18 +284,17 @@ void testWriteAndReadPartitionedFileForSubpartitionRange() throws Exception {
}

int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
PartitionedFile partitionedFile =
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex / 2,
false,
writeOrder);
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex / 2,
false,
writeOrder);

FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
Expand All @@ -315,7 +318,6 @@ void testWriteAndReadPartitionedFileForSubpartitionRange() throws Exception {
}
}
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();

for (int subpartition = 0; subpartition < numSubpartitions; subpartition += 2) {
assertThat(buffersWritten[subpartition / 2])
Expand All @@ -329,7 +331,7 @@ void testWriteAndReadPartitionedFileForSubpartitionRange() throws Exception {
}
}

private PartitionedFile createPartitionedFile(
private void createPartitionedFile(
int numSubpartitions,
int bufferSize,
int numBuffers,
Expand Down Expand Up @@ -391,7 +393,7 @@ private PartitionedFile createPartitionedFile(
}
}
}
return fileWriter.finish();
partitionedFile = fileWriter.finish();
}

private static long getTotalBytes(List<BufferWithSubpartition> bufferWithSubpartitions) {
Expand Down Expand Up @@ -459,7 +461,7 @@ void testWriteAndReadWithEmptySubpartition() throws Exception {
}
}
}
PartitionedFile partitionedFile = fileWriter.finish();
partitionedFile = fileWriter.finish();

FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
Expand All @@ -486,7 +488,6 @@ void testWriteAndReadWithEmptySubpartition() throws Exception {
assertThat(subpartitionBuffers[subpartition]).isEmpty();
}
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();
}

@Test
Expand Down Expand Up @@ -516,7 +517,7 @@ void testWriteAndReadWithEmptySubpartitionForMultipleSubpartitions() throws Exce
}
}
}
PartitionedFile partitionedFile = fileWriter.finish();
partitionedFile = fileWriter.finish();

FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
Expand Down Expand Up @@ -549,7 +550,6 @@ void testWriteAndReadWithEmptySubpartitionForMultipleSubpartitions() throws Exce
}
}
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();
}

private void assertBufferEquals(Buffer expected, Buffer actual) {
Expand Down Expand Up @@ -591,7 +591,7 @@ void testNotWriteDataOfTheSameSubpartitionTogether() throws Exception {
.isInstanceOf(IllegalStateException.class);

} finally {
partitionedFileWriter.finish().deleteQuietly();
partitionedFile = partitionedFileWriter.finish();
}
}

Expand Down Expand Up @@ -621,7 +621,7 @@ void testReadEmptyPartitionedFile() throws Exception {
int bufferSize = 1024;
int numSubpartitions = 2;
int targetSubpartition = 1;
PartitionedFile partitionedFile = createEmptyPartitionedFile();
createEmptyPartitionedFile();

List<Buffer>[] buffersRead = new List[numSubpartitions];
for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
Expand All @@ -646,7 +646,6 @@ void testReadEmptyPartitionedFile() throws Exception {
buffer -> addReadBuffer(buffer, buffersRead[targetSubpartition]));
assertThat(buffersRead[targetSubpartition]).isEmpty();
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();
}

/**
Expand All @@ -671,22 +670,21 @@ void testMultipleThreadGetIndexEntry() throws Exception {
}

int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
PartitionedFile partitionedFile =
createPartitionedFile(
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(
numSubpartitions,
PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
writeOrder),
subpartitionIndex -> subpartitionIndex,
random.nextBoolean(),
writeOrder);
PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
writeOrder),
subpartitionIndex -> subpartitionIndex,
random.nextBoolean(),
writeOrder);

FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
Expand Down Expand Up @@ -721,7 +719,6 @@ public void go() throws Exception {
}

IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();
}

private FileChannel openFileChannel(Path path) throws IOException {
Expand All @@ -733,9 +730,9 @@ private List<BufferWithSubpartition> getBufferWithSubpartitions(
return Collections.singletonList(new BufferWithSubpartition(buffer, subpartitionIndex));
}

private PartitionedFile createEmptyPartitionedFile() throws IOException {
private void createEmptyPartitionedFile() throws IOException {
PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2, new int[0]);
return partitionedFileWriter.finish();
partitionedFile = partitionedFileWriter.finish();
}

private PartitionedFileWriter createPartitionedFileWriter(
Expand All @@ -762,7 +759,7 @@ private PartitionedFileWriter createPartitionedFileWriter(

private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException {
PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(1, new int[0]);
partitionedFileWriter.finish().deleteQuietly();
partitionedFile = partitionedFileWriter.finish();
return partitionedFileWriter;
}

Expand Down

0 comments on commit abbfc66

Please sign in to comment.