diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/JobRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/JobRoutes.java index 31f55e5987..d420200d40 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/JobRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/JobRoutes.java @@ -15,6 +15,7 @@ import com.linkedin.venice.HttpConstants; import com.linkedin.venice.acl.DynamicAccessController; import com.linkedin.venice.controller.Admin; +import com.linkedin.venice.controller.server.endpoints.JobStatusRequest; import com.linkedin.venice.controllerapi.ControllerResponse; import com.linkedin.venice.controllerapi.IncrementalPushVersionsResponse; import com.linkedin.venice.controllerapi.JobStatusQueryResponse; @@ -53,20 +54,17 @@ public Route jobStatus(Admin admin) { try { // No ACL check for getting job metadata AdminSparkServer.validateParams(request, JOB.getParams(), admin); - String cluster = request.queryParams(CLUSTER); - String store = request.queryParams(NAME); - int versionNumber = Utils.parseIntFromString(request.queryParams(VERSION), VERSION); - String incrementalPushVersion = AdminSparkServer.getOptionalParameterValue(request, INCREMENTAL_PUSH_VERSION); - String targetedRegions = request.queryParams(TARGETED_REGIONS); - String region = AdminSparkServer.getOptionalParameterValue(request, FABRIC); - responseObject = populateJobStatus( - cluster, - store, - versionNumber, - admin, - Optional.ofNullable(incrementalPushVersion), - region, - targetedRegions); + + JobStatusRequest jobStatusRequest = new JobStatusRequest(); + jobStatusRequest.setCluster(request.queryParams(CLUSTER)); + jobStatusRequest.setStore(request.queryParams(NAME)); + jobStatusRequest.setVersionNumber(Utils.parseIntFromString(request.queryParams(VERSION), VERSION)); + jobStatusRequest + .setIncrementalPushVersion(AdminSparkServer.getOptionalParameterValue(request, INCREMENTAL_PUSH_VERSION)); + jobStatusRequest.setTargetedRegions(request.queryParams(TARGETED_REGIONS)); + jobStatusRequest.setRegion(AdminSparkServer.getOptionalParameterValue(request, FABRIC)); + + populateJobStatus(jobStatusRequest, admin, responseObject); } catch (Throwable e) { responseObject.setError(e); AdminSparkServer.handleError(e, request, response); @@ -75,20 +73,24 @@ public Route jobStatus(Admin admin) { }; } - JobStatusQueryResponse populateJobStatus( - String cluster, - String store, - int versionNumber, + public JobStatusQueryResponse populateJobStatus( + JobStatusRequest jobStatusRequest, Admin admin, - Optional incrementalPushVersion, - String region, - String targetedRegions) { - JobStatusQueryResponse responseObject = new JobStatusQueryResponse(); + JobStatusQueryResponse responseObject) { + String store = jobStatusRequest.getStore(); + int versionNumber = jobStatusRequest.getVersionNumber(); + String cluster = jobStatusRequest.getCluster(); + String incrementalPushVersion = jobStatusRequest.getIncrementalPushVersion(); + String region = jobStatusRequest.getRegion(); + String targetedRegions = jobStatusRequest.getTargetedRegions(); String kafkaTopicName = Version.composeKafkaTopic(store, versionNumber); - - Admin.OfflinePushStatusInfo offlineJobStatus = - admin.getOffLinePushStatus(cluster, kafkaTopicName, incrementalPushVersion, region, targetedRegions); + Admin.OfflinePushStatusInfo offlineJobStatus = admin.getOffLinePushStatus( + cluster, + kafkaTopicName, + Optional.ofNullable(incrementalPushVersion), + region, + targetedRegions); responseObject.setStatus(offlineJobStatus.getExecutionStatus().toString()); responseObject.setStatusUpdateTimestamp(offlineJobStatus.getStatusUpdateTimestamp()); responseObject.setStatusDetails(offlineJobStatus.getStatusDetails()); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/endpoints/JobStatusRequest.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/endpoints/JobStatusRequest.java new file mode 100644 index 0000000000..e27d631f9d --- /dev/null +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/endpoints/JobStatusRequest.java @@ -0,0 +1,74 @@ +package com.linkedin.venice.controller.server.endpoints; + +import static com.linkedin.venice.controllerapi.ControllerApiConstants.*; + +import com.linkedin.venice.utils.Utils; + + +public class JobStatusRequest { + private String cluster; + private String store; + private int versionNumber; + private String incrementalPushVersion; + private String targetedRegions; + private String region; + + public JobStatusRequest() { + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + + public String getCluster() { + return cluster; + } + + public void setStore(String store) { + this.store = store; + } + + public String getStore() { + return store; + } + + public void setVersionNumber(String versionNumber) { + this.versionNumber = parseVersionNumber(versionNumber); + } + + public void setVersionNumber(int versionNumber) { + this.versionNumber = versionNumber; + } + + private int parseVersionNumber(String versionNumber) { + return Utils.parseIntFromString(versionNumber, VERSION); + } + + public int getVersionNumber() { + return versionNumber; + } + + public void setIncrementalPushVersion(String incrementalPushVersion) { + this.incrementalPushVersion = incrementalPushVersion; + } + + public String getIncrementalPushVersion() { + return incrementalPushVersion; + } + + public void setTargetedRegions(String targetedRegions) { + this.targetedRegions = targetedRegions; + } + + public String getTargetedRegions() { + return targetedRegions; + } + + public void setRegion(String region) { + this.region = region; + } + + public String getRegion() { + return region; + } +} diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/JobRoutesTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/JobRoutesTest.java index f3ce901a72..3e8cbbf527 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/JobRoutesTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/JobRoutesTest.java @@ -7,6 +7,7 @@ import com.linkedin.venice.controller.Admin; import com.linkedin.venice.controller.VeniceParentHelixAdmin; +import com.linkedin.venice.controller.server.endpoints.JobStatusRequest; import com.linkedin.venice.controllerapi.JobStatusQueryResponse; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.utils.Utils; @@ -34,8 +35,17 @@ public void testPopulateJobStatus() { String store = Utils.getUniqueString("store"); int version = 5; JobRoutes jobRoutes = new JobRoutes(false, Optional.empty()); - JobStatusQueryResponse response = - jobRoutes.populateJobStatus(cluster, store, version, mockAdmin, Optional.empty(), null, null); + + JobStatusRequest jobStatusRequest = new JobStatusRequest(); + jobStatusRequest.setCluster(cluster); + jobStatusRequest.setStore(store); + jobStatusRequest.setVersionNumber(version); + jobStatusRequest.setIncrementalPushVersion(""); + jobStatusRequest.setTargetedRegions(null); + jobStatusRequest.setRegion(null); + + JobStatusQueryResponse response = new JobStatusQueryResponse(); + jobRoutes.populateJobStatus(jobStatusRequest, mockAdmin, response); Map extraInfo = response.getExtraInfo(); LOGGER.info("extraInfo: {}", extraInfo);