
HDDS-8276. EC file checksum calculation fails with topology-aware read enabled (apache#4495)
adoroszlai authored Mar 29, 2023
1 parent b5137a0 commit 4cb0e68
Showing 3 changed files with 94 additions and 70 deletions.
@@ -73,7 +73,7 @@ protected void checksumBlocks() throws IOException {
if (!checksumBlock(keyLocationInfo)) {
throw new PathIOException(getSrc(),
"Fail to get block checksum for " + keyLocationInfo
+ ", checksum combine mode : {}" + getCombineMode());
+ ", checksum combine mode: " + getCombineMode());
}

currentLength += keyLocationInfo.getLength();
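
Note: the one-line message fix above (applied again in a second checksum helper below) removes an SLF4J-style "{}" placeholder that had slipped into plain string concatenation, so the literal braces ended up in the exception text. A minimal standalone sketch of the difference, with a hypothetical mode value:

    // "{}" is expanded only by SLF4J loggers, never by string concatenation.
    String mode = "COMPOSITE_CRC"; // hypothetical value
    String broken = "checksum combine mode : {}" + mode;
    // -> "checksum combine mode : {}COMPOSITE_CRC"
    String fixed = "checksum combine mode: " + mode;
    // -> "checksum combine mode: COMPOSITE_CRC"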
@@ -173,11 +173,16 @@ private List<ContainerProtos.ChunkInfo> getChunkInfos(OmKeyLocationInfo
nodes.add(dn);
}
}
List<DatanodeDetails> nodesInOrder =
new ArrayList<>(pipeline.getNodesInOrder());
nodesInOrder.retainAll(nodes);

pipeline = Pipeline.newBuilder(pipeline)
.setReplicationConfig(StandaloneReplicationConfig
.getInstance(HddsProtos.ReplicationFactor.THREE))
.setNodes(nodes)
.build();
pipeline.setNodesInOrder(nodesInOrder);

List<ContainerProtos.ChunkInfo> chunks;
XceiverClientSpi xceiverClientSpi = null;
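
Note: this hunk is the substance of the fix. Pipeline.newBuilder(pipeline) copies the source pipeline's topology-sorted read order (getNodesInOrder()), which can name datanodes that are not in the reduced node list passed to setNodes(); judging from the commit title, the topology-aware read path follows that order, so the stale entries made checksum reads fail. retainAll() keeps the sorted order but restricts it to the nodes actually in the rebuilt pipeline. A self-contained sketch of the retainAll semantics, with hypothetical node names:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RetainAllDemo {
      public static void main(String[] args) {
        // Topology-sorted read order copied from the original pipeline.
        List<String> nodesInOrder =
            new ArrayList<>(Arrays.asList("dn3", "dn1", "dn4", "dn2"));
        // Nodes that actually host this block in the rebuilt pipeline.
        List<String> nodes = Arrays.asList("dn1", "dn2", "dn3");

        // retainAll preserves relative order while dropping entries
        // absent from the rebuilt pipeline ("dn4" here).
        nodesInOrder.retainAll(nodes);
        System.out.println(nodesInOrder); // [dn3, dn1, dn2]
      }
    }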
@@ -78,7 +78,7 @@ protected void checksumBlocks() throws IOException {
if (!checksumBlock(keyLocationInfo)) {
throw new PathIOException(getSrc(),
"Fail to get block checksum for " + keyLocationInfo
+ ", checksum combine mode : {}" + getCombineMode());
+ ", checksum combine mode: " + getCombineMode());
}

currentLength += keyLocationInfo.getLength();
@@ -18,7 +18,6 @@
package org.apache.hadoop.fs.ozone;

import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
@@ -31,38 +30,47 @@
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.util.StringUtils;
import org.junit.Rule;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.rules.Timeout;

import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY;
import static org.apache.hadoop.ozone.TestDataUtil.createBucket;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* Test FileChecksum API.
*/
@Timeout(300)
public class TestOzoneFileChecksum {

@Rule
public Timeout timeout = Timeout.seconds(100);
private static final boolean[] TOPOLOGY_AWARENESS = new boolean[] {
true, false
};

private static final int[] DATA_SIZES = DoubleStream.of(0.5, 1, 1.5, 2, 7, 8)
.mapToInt(mb -> (int) (1024 * 1024 * mb))
.toArray();

private OzoneConfiguration conf;
private MiniOzoneCluster cluster = null;
@@ -73,7 +81,7 @@ public class TestOzoneFileChecksum {
private OzoneClient client;

@BeforeEach
public void setup() throws IOException,
void setup() throws IOException,
InterruptedException, TimeoutException {
conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf)
@@ -83,99 +91,110 @@ public void setup() throws IOException,
client = cluster.newClient();
rootPath = String.format("%s://%s/",
OzoneConsts.OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
String disableCache = String.format("fs.%s.impl.disable.cache",
OzoneConsts.OZONE_OFS_URI_SCHEME);
conf.setBoolean(disableCache, true);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
fs = FileSystem.get(conf);
ofs = (RootedOzoneFileSystem) fs;
adapter = (BasicRootedOzoneClientAdapterImpl) ofs.getAdapter();
}

@AfterEach
public void teardown() {
IOUtils.closeQuietly(client);
void teardown() {
IOUtils.closeQuietly(client, fs);
if (cluster != null) {
cluster.shutdown();
}
IOUtils.closeQuietly(fs);
}

/**
* Test EC checksum with Replicated checksum.
*/
@ParameterizedTest
@MethodSource("dataSizeMissingIndexes")
public void testEcFileChecksum(double size, List<Integer> missingIndexes)
throws IOException {
@MethodSource("missingIndexes")
void testEcFileChecksum(List<Integer> missingIndexes) throws IOException {

String volumeName = UUID.randomUUID().toString();
String legacyBucket = UUID.randomUUID().toString();
String ecBucketName = UUID.randomUUID().toString();

// Size in multiples of MB
int dataLen = (int) (1024 * 1024 * size);
byte[] data = RandomStringUtils.randomAlphabetic(dataLen)
.getBytes(UTF_8);
client.getObjectStore().createVolume(volumeName);

BucketArgs omBucketArgs1 = BucketArgs.newBuilder()
BucketArgs.Builder bucketArgs = BucketArgs.newBuilder()
.setStorageType(StorageType.DISK)
.setBucketLayout(BucketLayout.LEGACY)
.build();
.setBucketLayout(BucketLayout.LEGACY);

String vol2 = UUID.randomUUID().toString();
String legacyBucket = UUID.randomUUID().toString();
TestDataUtil.createVolumeAndBucket(client, vol2,
legacyBucket, omBucketArgs1);
createBucket(client, volumeName, bucketArgs.build(), legacyBucket);

try (OzoneFSOutputStream file = adapter.createFile(vol2 +
"/" + legacyBucket + "/test", (short) 3, true, false)) {
file.write(data);
}
bucketArgs.setDefaultReplicationConfig(
new DefaultReplicationConfig(
new ECReplicationConfig("RS-3-2-1024k")));

Path parent1 = new Path("/" + vol2 + "/" + legacyBucket + "/");
Path replicatedKey = new Path(parent1, "test");
FileChecksum replicatedChecksum = fs.getFileChecksum(replicatedKey);
String replicatedChecksumString = StringUtils.byteToHexString(
replicatedChecksum.getBytes(), 0, replicatedChecksum.getLength());
final OzoneBucket ecBucket =
createBucket(client, volumeName, bucketArgs.build(), ecBucketName);

BucketArgs omBucketArgs = BucketArgs.newBuilder()
.setStorageType(StorageType.DISK)
.setBucketLayout(BucketLayout.LEGACY)
.setDefaultReplicationConfig(
new DefaultReplicationConfig(
new ECReplicationConfig("RS-3-2-1024k")))
.build();
assertEquals(ReplicationType.EC.name(),
ecBucket.getReplicationConfig().getReplicationType().name());

Map<Integer, String> replicatedChecksums = new HashMap<>();

String vol = UUID.randomUUID().toString();
String ecBucket = UUID.randomUUID().toString();
final OzoneBucket bucket101 = TestDataUtil
.createVolumeAndBucket(client, vol, ecBucket,
omBucketArgs);
for (int dataLen : DATA_SIZES) {
byte[] data = randomAlphabetic(dataLen).getBytes(UTF_8);

Assertions.assertEquals(ReplicationType.EC.name(),
bucket101.getReplicationConfig().getReplicationType().name());
try (OutputStream file = adapter.createFile(volumeName + "/"
+ legacyBucket + "/test" + dataLen, (short) 3, true, false)) {
file.write(data);
}

try (OzoneFSOutputStream file = adapter
.createFile(vol + "/" + ecBucket + "/test", (short) 3, true, false)) {
file.write(data);
Path parent1 = new Path("/" + volumeName + "/" + legacyBucket + "/");
Path replicatedKey = new Path(parent1, "test" + dataLen);
FileChecksum replicatedChecksum = fs.getFileChecksum(replicatedKey);
String replicatedChecksumString = StringUtils.byteToHexString(
replicatedChecksum.getBytes(), 0, replicatedChecksum.getLength());
replicatedChecksums.put(dataLen, replicatedChecksumString);

try (OutputStream file = adapter.createFile(volumeName + "/"
+ ecBucketName + "/test" + dataLen, (short) 3, true, false)) {
file.write(data);
}
}

// Fail DataNodes
for (int index: missingIndexes) {
cluster.shutdownHddsDatanode(index);
}

// Compute checksum after failed DNs
Path parent = new Path("/" + vol + "/" + ecBucket + "/");
Path ecKey = new Path(parent, "test");
FileChecksum ecChecksum = fs.getFileChecksum(ecKey);
String ecChecksumString = StringUtils.byteToHexString(
ecChecksum.getBytes(), 0, ecChecksum.getLength());

Assertions.assertEquals(replicatedChecksumString, ecChecksumString);
for (boolean topologyAware : TOPOLOGY_AWARENESS) {
OzoneConfiguration clientConf = new OzoneConfiguration(conf);
clientConf.setBoolean(OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
topologyAware);
try (FileSystem fsForRead = FileSystem.get(clientConf)) {
for (int dataLen : DATA_SIZES) {
// Compute checksum after failed DNs
Path parent = new Path("/" + volumeName + "/" + ecBucketName + "/");
Path ecKey = new Path(parent, "test" + dataLen);
FileChecksum ecChecksum = fsForRead.getFileChecksum(ecKey);
String ecChecksumString = StringUtils.byteToHexString(
ecChecksum.getBytes(), 0, ecChecksum.getLength());

assertEquals(replicatedChecksums.get(dataLen), ecChecksumString,
() -> "Checksum mismatch for data size: " + dataLen +
", topologyAware: " + topologyAware +
", failed nodes: " + missingIndexes);
}
}
}
}
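
Note: the rewritten test covers both values of OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY in a single run. That only works because setup() disables the FileSystem cache for the ofs scheme, so FileSystem.get() hands back a fresh client per Configuration instead of a cached one. The pattern in isolation (assuming conf already carries the cluster address and fs.ofs.impl.disable.cache=true):

    OzoneConfiguration clientConf = new OzoneConfiguration(conf);
    clientConf.setBoolean(OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
    try (FileSystem fsForRead = FileSystem.get(clientConf)) {
      // Reads issued through fsForRead use topology-aware replica
      // ordering; a second client built with "false" reads in the
      // default order, and both must produce the same checksum.
    }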

public static Stream<Arguments> dataSizeMissingIndexes() {
static Stream<List<Integer>> missingIndexes() {
return Stream.of(
arguments(0.5, ImmutableList.of(0, 1)),
arguments(1, ImmutableList.of(1, 2)),
arguments(1.5, ImmutableList.of(2, 3)),
arguments(2, ImmutableList.of(3, 4)),
arguments(7, ImmutableList.of(0, 3)),
arguments(8, ImmutableList.of(0, 4)));
ImmutableList.of(0, 1),
ImmutableList.of(1, 2),
ImmutableList.of(2, 3),
ImmutableList.of(3, 4),
ImmutableList.of(0, 3),
ImmutableList.of(0, 4)
);
}
}
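
Note: the parameter source was reduced from (size, missingIndexes) pairs to plain missing-index lists, since each invocation now iterates over all DATA_SIZES itself. JUnit 5 resolves a static method returning Stream<List<Integer>> directly into the test's single List<Integer> parameter. A standalone sketch with hypothetical class and method names:

    import java.util.List;
    import java.util.stream.Stream;

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.MethodSource;

    import static org.junit.jupiter.api.Assertions.assertEquals;

    class MethodSourceSketch {
      // Each emitted list becomes one parameterized-test invocation.
      static Stream<List<Integer>> indexPairs() {
        return Stream.of(List.of(0, 1), List.of(1, 2));
      }

      @ParameterizedTest
      @MethodSource("indexPairs")
      void eachPairNamesTwoDatanodes(List<Integer> indexes) {
        assertEquals(2, indexes.size());
      }
    }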
