Skip to content

Commit

Permalink
Support Update Reporting for batches
Browse files Browse the repository at this point in the history
  • Loading branch information
mreyescdl committed Dec 12, 2024
1 parent 6146aae commit 9bfbd32
Showing 1 changed file with 26 additions and 1 deletion.
27 changes: 26 additions & 1 deletion src/main/java/org/cdlib/mrt/zk/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public static Batch acquirePendingBatch(ZooKeeper client) throws MerrittZKNodeIn
}
return null;
}

public static Batch acquireBatchForReporting(ZooKeeper client) throws MerrittZKNodeInvalid, KeeperException, InterruptedException, MerrittStateError {
List<String> batches = client.getChildren(QueueItem.ZkPaths.Batch.path, false);
batches.sort(String::compareTo);
Expand All @@ -142,7 +143,7 @@ public static Batch acquireBatchForReporting(ZooKeeper client) throws MerrittZKN
Batch b = new Batch(cp);
b.load(client);

if (b.status() == BatchState.Completed || b.status() == BatchState.Failed) {
if (b.status() == BatchState.Completed || b.status() == BatchState.Failed || b.status() == BatchState.UpdateReporting) {
continue;
}

Expand All @@ -156,6 +157,30 @@ public static Batch acquireBatchForReporting(ZooKeeper client) throws MerrittZKN
return null;
}

public static Batch acquireUpdateBatchForReporting(ZooKeeper client) throws MerrittZKNodeInvalid, KeeperException, InterruptedException, MerrittStateError {
List<String> batches = client.getChildren(QueueItem.ZkPaths.Batch.path, false);
batches.sort(String::compareTo);
for(String cp: batches) {
String p = String.format("%s/%s/states/%s", QueueItem.ZkPaths.Batch.path, cp, BatchJobStates.Processing.path);
if (QueueItemHelper.exists(client, p)) {
if (client.getChildren(p, false).isEmpty()) {
Batch b = new Batch(cp);
b.load(client);

if (b.status() == BatchState.UpdateReporting) {
if (b.lock(client)) {
// b.setStatus(client, BatchState.Reporting);
return b;
}
else
continue;
}
}
}
}
return null;
}

public static List<String> deleteCompletedBatches(ZooKeeper client) throws MerrittZKNodeInvalid, KeeperException, InterruptedException, MerrittStateError {
List<String> deleted = new ArrayList<>();
if (!QueueItemHelper.exists(client, QueueItem.ZkPaths.Batch.path)) {
Expand Down

0 comments on commit 9bfbd32

Please sign in to comment.