Skip to content

Commit

Permalink
Fix pagination; finish integration test.
Browse files Browse the repository at this point in the history
  • Loading branch information
smengcl committed Jan 6, 2024
1 parent 41a566e commit 98ccdf7
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -556,52 +556,120 @@ public void testOzoneAdminCmdList() throws UnsupportedEncodingException {
@Test
public void testOzoneAdminCmdListOpenFiles()
throws IOException, InterruptedException {
final String volumeName = "volumelof";

OzoneConfiguration conf = cluster.getConf();
final String hostPrefix = OZONE_OFS_URI_SCHEME + "://" + omServiceId;
// OzoneConfiguration conf = new OzoneConfiguration();
// conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
// final String hostPrefix = OZONE_OFS_URI_SCHEME + "://localhost:9862";

OzoneConfiguration clientConf = getClientConfForOFS(hostPrefix, conf);
FileSystem fs = FileSystem.get(clientConf);

final String volumeName = "volumelof";
String dir1 = hostPrefix + OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX +
"buck1" + OM_KEY_PREFIX + "dir1";
// Create volume, bucket, dir
assertTrue(fs.mkdirs(new Path(dir1)));
String key1 = dir1 + OM_KEY_PREFIX + "key1";
String keyPrefix = OM_KEY_PREFIX + "key";

final int numKeys = 5;
String[] keys = new String[numKeys];

for (int i = 0; i < numKeys; i++) {
keys[i] = dir1 + keyPrefix + i;
}

int pageSize = 3;

String[] args = new String[] {"om", "listopenfiles",
"-id", omServiceId,
"-p", "/volumelof/buck1"};
FSDataOutputStream[] streams = new FSDataOutputStream[numKeys];
// Create multiple keys and hold them open
for (int i = 0; i < numKeys; i++) {
streams[i] = fs.create(new Path(keys[i]));
streams[i].write(1);
}

// Create key1
try (FSDataOutputStream stream = fs.create(new Path(key1))) {
stream.write(1);
try {
// Wait for flush to DB table
cluster.getOzoneManager().awaitDoubleBufferFlush();

String[] args = new String[] {"om", "listopenfiles",
"-id", omServiceId,
"-l", String.valueOf(numKeys + 1), // pagination
"-p", "/volumelof/buck1"};
// Run listopenfiles
execute(ozoneAdminShell, args);
String res1 = out.toString();
out.reset();
String cmdRes = getStdOut();
// Should have retrieved all 5 open keys
for (int i = 0; i < numKeys; i++) {
assertTrue(cmdRes.contains(keyPrefix + i));
}

// Try hsync
stream.hsync();
// Wait for flush
// cluster.getOzoneManager().awaitDoubleBufferFlush();
// Try pagination
args = new String[] {"om", "listopenfiles",
"-id", omServiceId,
"-l", String.valueOf(pageSize), // pagination
"-p", "/volumelof/buck1"};
execute(ozoneAdminShell, args);
cmdRes = getStdOut();

// Run listopenfiles again
// Should have retrieved the 1st page only (3 keys)
for (int i = 0; i < pageSize; i++) {
assertTrue(cmdRes.contains(keyPrefix + i));
}
for (int i = pageSize; i < numKeys; i++) {
assertFalse(cmdRes.contains(keyPrefix + i));
}
// No hsync'ed file/key at this point
assertFalse(cmdRes.contains("\tYes\t"));

// Get last line of the output which has the continuation token
String[] lines = cmdRes.split("\n");
String nextCmd = lines[lines.length - 1].trim();
String kw = "--start=";
String contToken =
nextCmd.substring(nextCmd.lastIndexOf(kw) + kw.length());

args = new String[] {"om", "listopenfiles",
"-id", omServiceId,
"-l", String.valueOf(pageSize), // pagination
"-p", "/volumelof/buck1",
"-s", contToken};
execute(ozoneAdminShell, args);
String res2 = out.toString();
out.reset();
// Verify that result has key1
}
cmdRes = getStdOut();

// TODO: Test pagination
// Should have retrieved the 2nd page only (2 keys)
for (int i = 0; i < pageSize - 1; i++) {
assertFalse(cmdRes.contains(keyPrefix + i));
}
// Note: key2 is shown in the continuation token prompt
for (int i = pageSize - 1; i < numKeys; i++) {
assertTrue(cmdRes.contains(keyPrefix + i));
}

// hsync last key
streams[numKeys - 1].hsync();

execute(ozoneAdminShell, args);
cmdRes = getStdOut();

// Verify that only one key is hsync'ed
assertTrue(cmdRes.contains("\tYes\t"));
assertTrue(cmdRes.contains("\tNo\t"));
} finally {
// Cleanup
for (int i = 0; i < numKeys; i++) {
streams[i].close();
}
}

// TODO: Test with OBS/LEGACY bucket
// TODO: In UT, test with OBS/LEGACY bucket
}

// TODO: Cleanup
/**
* Return stdout as a String, then clears exising output.
*/
private String getStdOut() throws UnsupportedEncodingException {
String res = out.toString(UTF_8.name());
out.reset();
return res;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,12 @@ boolean recoverTrash(String volumeName, String bucketName,
List<OmVolumeArgs> listVolumes(String userName, String prefix,
String startKey, int maxKeys) throws IOException;

/**
* Get total open key count (estimated, due to the nature of RocksDB impl)
* of both OpenKeyTable and OpenFileTable.
*/
long getOpenKeyCount() throws IOException;

/**
* Returns the names of up to {@code count} open keys whose age is
* greater than or equal to {@code expireThreshold}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,13 @@ private boolean isOpenMultipartKey(OmKeyInfo openKeyInfo, String openDbKey)
return getMultipartInfoTable().isExist(multipartInfoDbKey);
}

@Override
public long getOpenKeyCount() throws IOException {
// Get an estimated key count of OpenKeyTable + OpenFileTable
return openKeyTable.getEstimatedKeyCount()
+ openFileTable.getEstimatedKeyCount();
}

@Override
public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
int count, BucketLayout bucketLayout) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3214,17 +3214,23 @@ public ListOpenFilesResult listOpenFiles(String path,
throw omEx;
}

final String dbOpenKeyPrefix;
// Mark as final to make sure they are assigned once and only once in
// every branch.
final String dbOpenKeyPrefix, dbContTokenPrefix;
final String volumeName, bucketName;
final BucketLayout bucketLayout;

// Process path prefix
if (path == null || path.isEmpty() || path.equals(OM_KEY_PREFIX)) {
// path is root
dbOpenKeyPrefix = "";
// default to FSO. TODO: Add client option to pass OBS/LEGACY?
volumeName = "";
bucketName = "";
// default to FSO's OpenFileTable. TODO: client option to pass OBS/LEGACY?
bucketLayout = BucketLayout.FILE_SYSTEM_OPTIMIZED;
} else {
// path is bucket or key prefix, break it down to volume, bucket, prefix
StringTokenizer tokenizer = new StringTokenizer(path, OM_KEY_PREFIX);

// Validate path to avoid NoSuchElementException
if (tokenizer.countTokens() < 2) {
metrics.incNumListOpenFilesFails();
Expand All @@ -3233,8 +3239,8 @@ public ListOpenFilesResult listOpenFiles(String path,
INVALID_PATH);
}

final String volumeName = tokenizer.nextToken();
final String bucketName = tokenizer.nextToken();
volumeName = tokenizer.nextToken();
bucketName = tokenizer.nextToken();

OmBucketInfo bucketInfo;
try {
Expand All @@ -3245,7 +3251,6 @@ public ListOpenFilesResult listOpenFiles(String path,
throw ex;
}

bucketLayout = bucketInfo.getBucketLayout();

final String keyPrefix;
if (tokenizer.hasMoreTokens()) {
Expand All @@ -3255,32 +3260,56 @@ public ListOpenFilesResult listOpenFiles(String path,
keyPrefix = "";
}

if (contToken == null || contToken.isEmpty()) {
// TODO: Add helper method for this
switch (bucketInfo.getBucketLayout()) {
case FILE_SYSTEM_OPTIMIZED:
final long volumeId = metadataManager.getVolumeId(volumeName);
final long bucketId = metadataManager.getBucketId(volumeName,
bucketName);
// FSO keyPrefix could look like: -9223372036854774527/key1
dbOpenKeyPrefix = metadataManager.getOzoneKey(
Long.toString(volumeId),
Long.toString(bucketId),
keyPrefix);
break;
case OBJECT_STORE:
case LEGACY:
dbOpenKeyPrefix = metadataManager.getOzoneKey(
volumeName,
bucketName,
keyPrefix);
break;
default:
throw new OMException("Unsupported bucket layout: " +
bucketInfo.getBucketLayout(), NOT_SUPPORTED_OPERATION);
}
// Determine dbKey prefix based on the bucket type
bucketLayout = bucketInfo.getBucketLayout();
switch (bucketLayout) {
case FILE_SYSTEM_OPTIMIZED:
dbOpenKeyPrefix = getDbKeyFSO(volumeName, bucketName, keyPrefix);
break;
case OBJECT_STORE:
case LEGACY:
dbOpenKeyPrefix =
metadataManager.getOzoneKey(volumeName, bucketName, keyPrefix);
break;
default:
metrics.incNumListOpenFilesFails();
throw new OMException("Unsupported bucket layout: " +
bucketInfo.getBucketLayout(), NOT_SUPPORTED_OPERATION);
}
}

// Process cont. token
if (contToken == null || contToken.isEmpty()) {
// if a continuation token is not specified
dbContTokenPrefix = dbOpenKeyPrefix;
} else {
// need to translate FSO bucket cont. token's volume and bucket names
// into object IDs

StringTokenizer tokenizer = new StringTokenizer(contToken, OM_KEY_PREFIX);
// Validate cont. token to avoid NoSuchElementException
if (tokenizer.countTokens() < 2) {
metrics.incNumListOpenFilesFails();
throw new OMException("Invalid continuation token: " + contToken,
INVALID_PATH);
}
final String ctVolumeName = tokenizer.nextToken();
final String ctBucketName = tokenizer.nextToken();
// Validate that path and cont. token is in the same bucket
Preconditions.checkArgument(volumeName.equals(ctVolumeName),
"Path volume name '" + volumeName +
"' and token volume name '" + ctVolumeName + "' does not match");
Preconditions.checkArgument(bucketName.equals(ctBucketName),
"Path bucket name '" + bucketName +
"' and token bucket name '" + ctBucketName + "' does not match");

if (bucketLayout.equals(BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
final String ctKeyPrefix = tokenizer.hasMoreTokens() ?
tokenizer.nextToken("").substring(1) : "";
dbContTokenPrefix =
getDbKeyFSO(ctVolumeName, ctBucketName, ctKeyPrefix);
} else {
dbOpenKeyPrefix = contToken;
dbContTokenPrefix = contToken;
}
}

Expand All @@ -3303,9 +3332,15 @@ public ListOpenFilesResult listOpenFiles(String path,
TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
keyIter = keyTable.iterator()
) {
openKeyIter.seek(dbOpenKeyPrefix);
KeyValue<String, OmKeyInfo> kv;
while (currentCount < maxKeys + 1 && openKeyIter.hasNext()) {
kv = openKeyIter.seek(dbContTokenPrefix);
if (contToken != null && !contToken.isEmpty() &&
kv.getKey().startsWith(dbContTokenPrefix + OM_KEY_PREFIX)) {
// Skip one entry when cont token is specified and the current entry
// has the same prefix (less the client ID) as cont token.
openKeyIter.next();
}
while (currentCount < maxKeys && openKeyIter.hasNext()) {
kv = openKeyIter.next();
if (kv != null && kv.getKey().startsWith(dbOpenKeyPrefix)) {
String dbKey = kv.getKey();
Expand All @@ -3319,10 +3354,9 @@ public ListOpenFilesResult listOpenFiles(String path,
checkAndUpdateKeyHsyncStatus(omKeyInfo, ktDbKey, keyIter);

openKeySessionList.add(
new OpenKeySession(
clientID,
omKeyInfo,
new OpenKeySession(clientID, omKeyInfo,
omKeyInfo.getLatestVersionLocations().getVersion()));
currentCount++;
}
}

Expand All @@ -3338,7 +3372,22 @@ public ListOpenFilesResult listOpenFiles(String path,
return new ListOpenFilesResult(
openKeySessionList,
hasMore,
openKeyTable.getEstimatedKeyCount());
metadataManager.getOpenKeyCount());
}

/**
* For FSO buckets, retrieve object ID for volume name and bucket name,
* and return the dbKey prefixed by volume and bucket object ID.
*/
private String getDbKeyFSO(String volumeName,
String bucketName,
String keyPrefix)
throws IOException {
final long volumeId = metadataManager.getVolumeId(volumeName);
final long bucketId = metadataManager.getBucketId(volumeName, bucketName);
// FSO keyPrefix could look like: -9223372036854774527/key1
return metadataManager.getOzoneKey(
Long.toString(volumeId), Long.toString(bucketId), keyPrefix);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ private void printOpenKeysList(ListOpenFilesResult res) {
line += "No\t\t";
}

System.out.println(line + getFullPathFromKeyInfo(omKeyInfo)
// + "\t" + omKeyInfo // TODO: Remove full KeyInfo print
);
line += getFullPathFromKeyInfo(omKeyInfo);

System.out.println(line);
}

// Compose next batch's command
Expand All @@ -181,22 +181,20 @@ private void printOpenKeysList(ListOpenFilesResult res) {
*/
private String getCmdForNextBatch(String lastElementFullPath) {
String nextBatchCmd = "ozone admin om lof";
if (!omServiceId.isEmpty()) {
if (omServiceId != null && !omServiceId.isEmpty()) {
nextBatchCmd += " -id=" + omServiceId;
}
if (!omHost.isEmpty()) {
if (omHost != null && !omHost.isEmpty()) {
nextBatchCmd += " -host=" + omHost;
}
if (json) {
nextBatchCmd += " --json";
}
nextBatchCmd += " --length=" + limit;
if (!pathPrefix.isEmpty()) {
if (pathPrefix != null && !pathPrefix.isEmpty()) {
nextBatchCmd += " --prefix=" + pathPrefix;
}
if (!startItem.isEmpty()) {
nextBatchCmd += " --start=" + lastElementFullPath;
}
nextBatchCmd += " --start=" + lastElementFullPath;
return nextBatchCmd;
}

Expand Down

0 comments on commit 98ccdf7

Please sign in to comment.