Skip to content

Commit

Permalink
Add UT; minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
smengcl committed Jan 9, 2024
1 parent 9a5b8a8 commit 96d1747
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,14 @@ List<OmBucketInfo> listBuckets(String volumeName,

ServiceInfoEx getServiceInfo() throws IOException;

// TODO: Rename arg list
/**
* List open files in OM.
* @param path root "/", path to a bucket, key path, or key prefix
* @param maxKeys Limit the number of keys that can be returned in this batch.
* @param contToken Continuation token.
* @return ListOpenFilesResult
* @throws IOException
*/
ListOpenFilesResult listOpenFiles(String path, long maxKeys, String contToken)
throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,14 +586,15 @@ public void testOzoneAdminCmdListOpenFiles()
streams[i].write(1);
}

String path = "/" + volumeName + "/" + bucketName;
try {
// Wait for flush to DB table
// Wait for DB flush
cluster.getOzoneManager().awaitDoubleBufferFlush();

String[] args = new String[] {"om", "listopenfiles",
String[] args = new String[] {"om", "lof",
"-id", omServiceId,
"-l", String.valueOf(numKeys + 1), // pagination
"-p", "/volumelof/buck1"};
"-p", path};
// Run listopenfiles
execute(ozoneAdminShell, args);
String cmdRes = getStdOut();
Expand All @@ -603,10 +604,10 @@ public void testOzoneAdminCmdListOpenFiles()
}

// Try pagination
args = new String[] {"om", "listopenfiles",
args = new String[] {"om", "lof",
"-id", omServiceId,
"-l", String.valueOf(pageSize), // pagination
"-p", "/volumelof/buck1"};
"-p", path};
execute(ozoneAdminShell, args);
cmdRes = getStdOut();

Expand All @@ -627,10 +628,10 @@ public void testOzoneAdminCmdListOpenFiles()
String contToken =
nextCmd.substring(nextCmd.lastIndexOf(kw) + kw.length());

args = new String[] {"om", "listopenfiles",
args = new String[] {"om", "lof",
"-id", omServiceId,
"-l", String.valueOf(pageSize), // pagination
"-p", "/volumelof/buck1",
"-p", path,
"-s", contToken};
execute(ozoneAdminShell, args);
cmdRes = getStdOut();
Expand All @@ -646,6 +647,8 @@ public void testOzoneAdminCmdListOpenFiles()

// hsync last key
streams[numKeys - 1].hsync();
// Wait for flush
cluster.getOzoneManager().awaitDoubleBufferFlush();

execute(ozoneAdminShell, args);
cmdRes = getStdOut();
Expand All @@ -660,7 +663,6 @@ public void testOzoneAdminCmdListOpenFiles()
}
}

// TODO: In UT, test with OBS/LEGACY bucket
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1536,25 +1536,15 @@ message ListOpenFilesRequest {
}

message ListOpenFilesResponse {
// size of openKeyTable (may not be reliable due to how RocksDB returns count)
optional uint64 globalTotal = 1;
// size of openKeyTable and openFileTable combined
optional uint64 globalTotal = 1; // TODO: Rename to totalOpenKeyCount
// if true, indicates that there are remaining entries under the path
optional bool hasMore = 2;
// result
repeated uint64 clientID = 3;
repeated KeyInfo keyInfo = 4;
// row count with the path prefix
// optional uint64 totalUnderPath = 2;
// remaining rows that are not returned after this batch of result
// optional uint64 remainingUnderPath = 3;
// repeated OpenKeyInfoWithClientID openFiles = 4;
}

//message OpenKeyInfoWithClientID {
// optional uint64 clientID = 1;
// optional KeyInfo keyInfo = 2;
//}

message ServicePort {
enum Type {
RPC = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,12 @@ public interface OMMetadataManager extends DBStoreHAManager {
* @param key - key name
* @return DB key as String.
*/

String getOzoneKey(String volume, String bucket, String key);

/**
* Get DB key for a key or prefix in an FSO bucket given existing
* volume and bucket names.
*/
String getOzoneKeyFSO(String volumeName,
String bucketName,
String keyPrefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,18 @@ public String getOzoneKey(String volume, String bucket, String key) {
return builder.toString();
}

@Override
public String getOzoneKeyFSO(String volumeName,
String bucketName,
String keyPrefix)
throws IOException {
final long volumeId = getVolumeId(volumeName);
final long bucketId = getBucketId(volumeName, bucketName);
// FSO keyPrefix could look like: -9223372036854774527/key1
return getOzoneKey(Long.toString(volumeId),
Long.toString(bucketId), keyPrefix);
}

@Override
public String getOzoneDirKey(String volume, String bucket, String key) {
key = OzoneFSUtils.addTrailingSlashIfNeeded(key);
Expand Down Expand Up @@ -1245,7 +1257,7 @@ public ListOpenFilesResult listOpenFiles(BucketLayout bucketLayout,
return new ListOpenFilesResult(
openKeySessionList,
hasMore,
getOpenKeyCount());
getTotalOpenKeyCount());
}

/**
Expand All @@ -1257,6 +1269,7 @@ private void checkAndUpdateKeyHsyncStatus(OmKeyInfo omKeyInfo,
KeyValue<String, OmKeyInfo>>
keyIter)
throws IOException {
// TODO: do keyTable.get() directly
KeyValue<String, OmKeyInfo> kv = keyIter.seek(dbKey);
if (kv != null && kv.getKey().equals(dbKey)) {
// The same key in OpenKeyTable also exists in KeyTable, indicating
Expand Down Expand Up @@ -1826,7 +1839,7 @@ private boolean isOpenMultipartKey(OmKeyInfo openKeyInfo, String openDbKey)
}

@Override
public long getOpenKeyCount() throws IOException {
public long getTotalOpenKeyCount() throws IOException {
// Get an estimated key count of OpenKeyTable + OpenFileTable
return openKeyTable.getEstimatedKeyCount()
+ openFileTable.getEstimatedKeyCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3200,7 +3200,7 @@ public ListOpenFilesResult listOpenFiles(String path,
metrics.incNumListOpenFiles();
checkAdminUserPrivilege("list open files.");

// Mark as final to make sure they are assigned once and only once in
// Using final to make sure they are assigned once and only once in
// every branch.
final String dbOpenKeyPrefix, dbContTokenPrefix;
final String volumeName, bucketName;
Expand Down Expand Up @@ -3249,12 +3249,13 @@ public ListOpenFilesResult listOpenFiles(String path,
bucketLayout = bucketInfo.getBucketLayout();
switch (bucketLayout) {
case FILE_SYSTEM_OPTIMIZED:
dbOpenKeyPrefix = getDbKeyFSO(volumeName, bucketName, keyPrefix);
dbOpenKeyPrefix = metadataManager.getOzoneKeyFSO(
volumeName, bucketName, keyPrefix);
break;
case OBJECT_STORE:
case LEGACY:
dbOpenKeyPrefix =
metadataManager.getOzoneKey(volumeName, bucketName, keyPrefix);
dbOpenKeyPrefix = metadataManager.getOzoneKey(
volumeName, bucketName, keyPrefix);
break;
default:
metrics.incNumListOpenFilesFails();
Expand Down Expand Up @@ -3291,8 +3292,8 @@ public ListOpenFilesResult listOpenFiles(String path,
if (bucketLayout.equals(BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
final String ctKeyPrefix = tokenizer.hasMoreTokens() ?
tokenizer.nextToken("").substring(1) : "";
dbContTokenPrefix =
getDbKeyFSO(ctVolumeName, ctBucketName, ctKeyPrefix);
dbContTokenPrefix = metadataManager.getOzoneKeyFSO(
ctVolumeName, ctBucketName, ctKeyPrefix);
} else {
dbContTokenPrefix = contToken;
}
Expand All @@ -3304,21 +3305,6 @@ public ListOpenFilesResult listOpenFiles(String path,
!StringUtils.isEmpty(contToken), dbContTokenPrefix);
}

/**
* For FSO buckets, retrieve object ID for volume name and bucket name,
* and return the dbKey prefixed by volume and bucket object ID.
*/
private String getDbKeyFSO(String volumeName,
String bucketName,
String keyPrefix)
throws IOException {
final long volumeId = metadataManager.getVolumeId(volumeName);
final long bucketId = metadataManager.getBucketId(volumeName, bucketName);
// FSO keyPrefix could look like: -9223372036854774527/key1
return metadataManager.getOzoneKey(
Long.toString(volumeId), Long.toString(bucketId), keyPrefix);
}

@Override
public void transferLeadership(String newLeaderId)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
Expand All @@ -49,6 +51,7 @@
import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
Expand All @@ -61,6 +64,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_EXPIRE_THRESHOLD;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_EXPIRE_THRESHOLD_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
Expand Down Expand Up @@ -558,6 +562,141 @@ public void testListKeysWithFewDeleteEntriesInCache() throws Exception {

}

@Test
public void testListOpenFilesFSO() throws Exception {
testListOpenFiles(BucketLayout.FILE_SYSTEM_OPTIMIZED);
}

@Test
public void testListOpenFilesOBS() throws Exception {
testListOpenFiles(BucketLayout.OBJECT_STORE);
}

@Test
public void testListOpenFilesLegacy() throws Exception {
// OBS and LEGACY should share the same internal structure for the most part
// still, testing both here for the sake of completeness
testListOpenFiles(BucketLayout.LEGACY);
}

/**
* Tests inner impl of listOpenFiles with different bucket types with and
* without pagination. NOTE: This UT does NOT test hsync in this since hsync
* status check is done purely on the client side.
* @param bucketLayout BucketLayout
*/
public void testListOpenFiles(BucketLayout bucketLayout) throws Exception {
final long clientID = 1000L;

String volumeName = "volume-lof";
String bucketName = "bucket-" + bucketLayout.name().toLowerCase();
String keyPrefix = "key";

OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager, bucketLayout);

long volumeId = -1L, bucketId = -1L;
if (bucketLayout.isFileSystemOptimized()) {
volumeId = omMetadataManager.getVolumeId(volumeName);
bucketId = omMetadataManager.getBucketId(volumeName, bucketName);
}

int numOpenKeys = 3;
List<String> openKeys = new ArrayList<>();
for (int i = 0; i < numOpenKeys; i++) {
final OmKeyInfo keyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
bucketName, keyPrefix + i, HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE, 0L, Time.now());

final String dbOpenKeyName;
if (bucketLayout.isFileSystemOptimized()) {
keyInfo.setParentObjectID(i);
keyInfo.setFileName(OzoneFSUtils.getFileName(keyInfo.getKeyName()));
OMRequestTestUtils.addFileToKeyTable(true, false,
keyInfo.getFileName(), keyInfo, clientID, 0L, omMetadataManager);
dbOpenKeyName = omMetadataManager.getOpenFileName(volumeId, bucketId,
keyInfo.getParentObjectID(), keyInfo.getFileName(), clientID);
} else {
OMRequestTestUtils.addKeyToTable(true, false,
keyInfo, clientID, 0L, omMetadataManager);
dbOpenKeyName = omMetadataManager.getOpenKey(volumeName, bucketName,
keyInfo.getKeyName(), clientID);
}
openKeys.add(dbOpenKeyName);
}

String dbPrefix;
if (bucketLayout.isFileSystemOptimized()) {
dbPrefix = omMetadataManager.getOzoneKeyFSO(volumeName, bucketName, "");
} else {
dbPrefix = omMetadataManager.getOzoneKey(volumeName, bucketName, "");
}

// Without pagination
ListOpenFilesResult res = omMetadataManager.listOpenFiles(
bucketLayout, 100L, dbPrefix, false, dbPrefix);

assertEquals(numOpenKeys, res.getTotalOpenKeyCount());
assertEquals(false, res.hasMore());
List<OpenKeySession> keySessionList = res.getOpenKeys();
assertEquals(numOpenKeys, keySessionList.size());
// Verify that every single open key shows up in the result, and in order
for (int i = 0; i < numOpenKeys; i++) {
OpenKeySession keySession = keySessionList.get(i);
assertEquals(keyPrefix + i, keySession.getKeyInfo().getKeyName());
assertEquals(clientID, keySession.getId());
assertEquals(0, keySession.getOpenVersion());
}

// With pagination
long pageSize = 2;
int numExpectedKeys = (int)pageSize;
res = omMetadataManager.listOpenFiles(
bucketLayout, pageSize, dbPrefix, false, dbPrefix);

// total open key count should still be 3
assertEquals(numOpenKeys, res.getTotalOpenKeyCount());
// hasMore should have been set
assertEquals(true, res.hasMore());
keySessionList = res.getOpenKeys();
assertEquals(numExpectedKeys, keySessionList.size());
for (int i = 0; i < numExpectedKeys; i++) {
OpenKeySession keySession = keySessionList.get(i);
assertEquals(keyPrefix + i, keySession.getKeyInfo().getKeyName());
assertEquals(clientID, keySession.getId());
assertEquals(0, keySession.getOpenVersion());
}

// Get the second page
OmKeyInfo lastKeyInfo = keySessionList.get((int)pageSize - 1).getKeyInfo();
String dbContTokenPrefix;
if (bucketLayout.isFileSystemOptimized()) {
dbContTokenPrefix = OM_KEY_PREFIX + volumeId +
OM_KEY_PREFIX + bucketId +
OM_KEY_PREFIX + lastKeyInfo.getPath();
} else {
dbContTokenPrefix = OM_KEY_PREFIX + volumeName +
OM_KEY_PREFIX + bucketName +
OM_KEY_PREFIX + lastKeyInfo.getFileName();
}
res = omMetadataManager.listOpenFiles(
bucketLayout, pageSize, dbPrefix, true, dbContTokenPrefix);

numExpectedKeys = numOpenKeys - (int)pageSize;
// total open key count should still be 3
assertEquals(numOpenKeys, res.getTotalOpenKeyCount());
assertEquals(false, res.hasMore());
keySessionList = res.getOpenKeys();
assertEquals(numExpectedKeys, keySessionList.size());
for (int i = 0; i < numExpectedKeys; i++) {
OpenKeySession keySession = keySessionList.get(i);
assertEquals(keyPrefix + (pageSize + i),
keySession.getKeyInfo().getKeyName());
assertEquals(clientID, keySession.getId());
assertEquals(0, keySession.getOpenVersion());
}
}

private static BucketLayout getDefaultBucketLayout() {
return BucketLayout.DEFAULT;
}
Expand Down

0 comments on commit 96d1747

Please sign in to comment.