Skip to content

Commit

Permalink
Add continuationToken to make continuation token work with OBS/LEGA…
Browse files Browse the repository at this point in the history
…CY as well, and to make it easier for JSON output consumers.
  • Loading branch information
smengcl committed Jan 9, 2024
1 parent ca910f7 commit 12e87d6
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,52 @@
*/
public class ListOpenFilesResult {
/**
* List of open files. Each has client ID and OmKeyInfo.
* Number of total open files globally.
*/
private final List<OpenKeySession> openKeySessionList;
@JsonProperty("totalOpenKeyCount")
private final long totalOpenKeyCount;
/**
* True if there are more entries after this batch under the given path.
*/
@JsonProperty("hasMore")
private final boolean hasMore;
/**
* Number of total open files globally.
* True if there are more entries after this batch under the given path.
*/
@JsonProperty("totalOpenKeyCount")
private final long totalOpenKeyCount;
@JsonProperty("contToken")
private final String continuationToken;
/**
* List of open files. Each has client ID and OmKeyInfo.
*/
private final List<OpenKeySession> openKeySessionList;

public ListOpenFilesResult(List<OpenKeySession> openKeySessionList,
boolean hasMore, long totalOpenKeyCount) {
public ListOpenFilesResult(long totalOpenKeyCount,
boolean hasMore,
String continuationToken,
List<OpenKeySession> openKeySessionList) {
this.openKeySessionList = openKeySessionList;
this.hasMore = hasMore;
this.continuationToken = continuationToken;
this.totalOpenKeyCount = totalOpenKeyCount;
}

public ListOpenFilesResult(List<Long> clientIDsList,
List<KeyInfo> keyInfosList,
boolean hasMore, long totalOpenKeyCount)
public ListOpenFilesResult(long totalOpenKeyCount,
boolean hasMore,
String continuationToken,
List<Long> clientIDsList,
List<KeyInfo> keyInfosList)
throws IOException {
this.openKeySessionList = getOpenKeySessionListFromPB(clientIDsList,
keyInfosList);
this.hasMore = hasMore;
this.continuationToken = continuationToken;
this.totalOpenKeyCount = totalOpenKeyCount;
}

/**
* Combines clientIDsList and keyInfosList into OpenKeySessionList for
* transfer to the client.
*/
private List<OpenKeySession> getOpenKeySessionListFromPB(
List<Long> clientIDsList, List<KeyInfo> keyInfosList)
throws IOException {
Expand All @@ -82,15 +97,19 @@ private List<OpenKeySession> getOpenKeySessionListFromPB(
return res;
}

public List<OpenKeySession> getOpenKeys() {
return openKeySessionList;
public long getTotalOpenKeyCount() {
return totalOpenKeyCount;
}

public boolean hasMore() {
return hasMore;
}

public long getTotalOpenKeyCount() {
return totalOpenKeyCount;
public String getContinuationToken() {
return continuationToken;
}

public List<OpenKeySession> getOpenKeys() {
return openKeySessionList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ List<OmBucketInfo> listBuckets(String volumeName,

/**
* List open files in OM.
* @param path root "/", path to a bucket, key path, or key prefix
* @param path One of: root "/", path to a bucket, key path, or key prefix
* @param maxKeys Limit the number of keys that can be returned in this batch.
* @param contToken Continuation token.
* @return ListOpenFilesResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1812,10 +1812,12 @@ public ListOpenFilesResult listOpenFiles(String path,
final ListOpenFilesResponse resp = handleError(submitRequest(omRequest))
.getListOpenFilesResponse();

return new ListOpenFilesResult(resp.getClientIDList(),
resp.getKeyInfoList(),
return new ListOpenFilesResult(
resp.getTotalOpenKeyCount(),
resp.getHasMore(),
resp.getTotalOpenKeyCount());
resp.hasContinuationToken() ? resp.getContinuationToken() : null,
resp.getClientIDList(),
resp.getKeyInfoList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,9 +563,11 @@ public void testOzoneAdminCmdListOpenFiles()
OzoneConfiguration clientConf = getClientConfForOFS(hostPrefix, conf);
FileSystem fs = FileSystem.get(clientConf);

final String volumeName = "volumelof";
final String volumeName = "volume-lof";
final String bucketName = "buck1";

String dir1 = hostPrefix + OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX +
"buck1" + OM_KEY_PREFIX + "dir1";
bucketName + OM_KEY_PREFIX + "dir1";
// Create volume, bucket, dir
assertTrue(fs.mkdirs(new Path(dir1)));
String keyPrefix = OM_KEY_PREFIX + "key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1540,9 +1540,11 @@ message ListOpenFilesResponse {
optional uint64 totalOpenKeyCount = 1;
// indicates if there are more entries to be retrieved under the given path
optional bool hasMore = 2;
// continuation token should match a dbKey in openKeyTable or openFileTable
optional string continuationToken = 3;
// result
repeated uint64 clientID = 3;
repeated KeyInfo keyInfo = 4;
repeated uint64 clientID = 4;
repeated KeyInfo keyInfo = 5;
}

message ServicePort {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1201,10 +1201,11 @@ public ListOpenFilesResult listOpenFiles(BucketLayout bucketLayout,
List<OpenKeySession> openKeySessionList = new ArrayList<>();
int currentCount = 0;
final boolean hasMore;
final String retContToken;

// TODO: If we want "better" results, we may want to iterate cache like
// TODO: If we want "better" results, we want to iterate cache like
// listKeys do. But that complicates the iteration logic by quite a bit.
// If we do that, we would want to refactor listKeys as well to dedup.
// And if we do that, we need to refactor listKeys as well to dedup.

final Table<String, OmKeyInfo> okTable, kTable;
okTable = getOpenKeyTable(bucketLayout);
Expand All @@ -1213,16 +1214,12 @@ public ListOpenFilesResult listOpenFiles(BucketLayout bucketLayout,

// No lock required since table iterator creates a "snapshot"
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
openKeyIter = okTable.iterator();
TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
keyIter = kTable.iterator()
) {
openKeyIter = okTable.iterator()) {
KeyValue<String, OmKeyInfo> kv;
kv = openKeyIter.seek(dbContTokenPrefix);
if (hasContToken &&
kv.getKey().startsWith(dbContTokenPrefix + OM_KEY_PREFIX)) {
if (hasContToken && kv.getKey().equals(dbContTokenPrefix)) {
// Skip one entry when cont token is specified and the current entry
// has the same prefix (less the client ID) as cont token.
// key is exactly the same as cont token.
openKeyIter.next();
}
while (currentCount < maxKeys && openKeyIter.hasNext()) {
Expand All @@ -1235,8 +1232,8 @@ public ListOpenFilesResult listOpenFiles(BucketLayout bucketLayout,
// Trim client ID to get the keyTable dbKey
int lastSlashIdx = dbKey.lastIndexOf(OM_KEY_PREFIX);
String ktDbKey = dbKey.substring(0, lastSlashIdx);
// Check whether the key has been hsync'ed by seeking keyTable
checkAndUpdateKeyHsyncStatus(omKeyInfo, ktDbKey, keyIter);
// Check whether the key has been hsync'ed by checking keyTable
checkAndUpdateKeyHsyncStatus(omKeyInfo, ktDbKey, kTable);

openKeySessionList.add(
new OpenKeySession(clientID, omKeyInfo,
Expand All @@ -1247,34 +1244,34 @@ public ListOpenFilesResult listOpenFiles(BucketLayout bucketLayout,

// Set hasMore flag as a hint for client-side pagination
if (openKeyIter.hasNext()) {
kv = openKeyIter.next();
hasMore = kv != null && kv.getKey().startsWith(dbOpenKeyPrefix);
KeyValue<String, OmKeyInfo> nextKv = openKeyIter.next();
hasMore = nextKv != null && nextKv.getKey().startsWith(dbOpenKeyPrefix);
} else {
hasMore = false;
}

// Set continuation token
retContToken = hasMore ? kv.getKey() : null;
}

return new ListOpenFilesResult(
openKeySessionList,
getTotalOpenKeyCount(),
hasMore,
getTotalOpenKeyCount());
retContToken,
openKeySessionList);
}

/**
* Check and update OmKeyInfo from OpenKeyTable with hsync status in KeyTable.
*/
private void checkAndUpdateKeyHsyncStatus(OmKeyInfo omKeyInfo,
String dbKey,
TableIterator<String, ? extends
KeyValue<String, OmKeyInfo>>
keyIter)
Table<String, OmKeyInfo> kTable)
throws IOException {
// TODO: do keyTable.get() directly
KeyValue<String, OmKeyInfo> kv = keyIter.seek(dbKey);
if (kv != null && kv.getKey().equals(dbKey)) {
OmKeyInfo ktOmKeyInfo = kTable.get(dbKey);
if (ktOmKeyInfo != null) {
// The same key in OpenKeyTable also exists in KeyTable, indicating
// the key has been hsync'ed
OmKeyInfo ktOmKeyInfo = kv.getValue();
String hsyncClientId = ktOmKeyInfo.getMetadata().get(HSYNC_CLIENT_ID);
// Append HSYNC_CLIENT_ID to OmKeyInfo to be returned to the client
omKeyInfo.getMetadata().put(HSYNC_CLIENT_ID, hsyncClientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3269,34 +3269,7 @@ public ListOpenFilesResult listOpenFiles(String path,
// if a continuation token is not specified
dbContTokenPrefix = dbOpenKeyPrefix;
} else {
// need to translate FSO bucket cont. token's volume and bucket names
// into object IDs

StringTokenizer tokenizer = new StringTokenizer(contToken, OM_KEY_PREFIX);
// Validate cont. token to avoid NoSuchElementException
if (tokenizer.countTokens() < 2) {
metrics.incNumListOpenFilesFails();
throw new OMException("Invalid continuation token: " + contToken,
INVALID_PATH);
}
final String ctVolumeName = tokenizer.nextToken();
final String ctBucketName = tokenizer.nextToken();
// Validate that path and cont. token refers to the same bucket
Preconditions.checkArgument(volumeName.equals(ctVolumeName),
"Path volume name '" + volumeName +
"' and token volume name '" + ctVolumeName + "' does not match");
Preconditions.checkArgument(bucketName.equals(ctBucketName),
"Path bucket name '" + bucketName +
"' and token bucket name '" + ctBucketName + "' does not match");

if (bucketLayout.equals(BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
final String ctKeyPrefix = tokenizer.hasMoreTokens() ?
tokenizer.nextToken("").substring(1) : "";
dbContTokenPrefix = metadataManager.getOzoneKeyFSO(
ctVolumeName, ctBucketName, ctKeyPrefix);
} else {
dbContTokenPrefix = contToken;
}
dbContTokenPrefix = contToken;
}

// arg processing done. call inner impl (table iteration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,9 @@ private ListOpenFilesResponse listOpenFiles(ListOpenFilesRequest req,

resp.setTotalOpenKeyCount(res.getTotalOpenKeyCount());
resp.setHasMore(res.hasMore());
if (res.getContinuationToken() != null) {
resp.setContinuationToken(res.getContinuationToken());
}

for (OpenKeySession e : res.getOpenKeys()) {
resp.addClientID(e.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,7 @@ private void printOpenKeysList(ListOpenFilesResult res) {

// Compose next batch's command
if (res.hasMore()) {
OpenKeySession lastElement = openFileList.get(openFileList.size() - 1);
String nextBatchCmd =
getCmdForNextBatch(getFullPathFromKeyInfo(lastElement.getKeyInfo()));
String nextBatchCmd = getCmdForNextBatch(res.getContinuationToken());

System.out.println("\n" +
"To get the next batch of open keys, run:\n " + nextBatchCmd);
Expand Down

0 comments on commit 12e87d6

Please sign in to comment.