Skip to content

Commit

Permalink
[controller] [refactor] [JobRoutes] refactor JobRoutes https request …
Browse files Browse the repository at this point in the history
…to JobStatusRequest
  • Loading branch information
Whitney Deng committed Oct 17, 2024
1 parent b106edb commit c82b043
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.linkedin.venice.HttpConstants;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controller.server.endpoints.JobStatusRequest;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.IncrementalPushVersionsResponse;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
Expand Down Expand Up @@ -53,20 +54,17 @@ public Route jobStatus(Admin admin) {
try {
// No ACL check for getting job metadata
AdminSparkServer.validateParams(request, JOB.getParams(), admin);
String cluster = request.queryParams(CLUSTER);
String store = request.queryParams(NAME);
int versionNumber = Utils.parseIntFromString(request.queryParams(VERSION), VERSION);
String incrementalPushVersion = AdminSparkServer.getOptionalParameterValue(request, INCREMENTAL_PUSH_VERSION);
String targetedRegions = request.queryParams(TARGETED_REGIONS);
String region = AdminSparkServer.getOptionalParameterValue(request, FABRIC);
responseObject = populateJobStatus(
cluster,
store,
versionNumber,
admin,
Optional.ofNullable(incrementalPushVersion),
region,
targetedRegions);

JobStatusRequest jobStatusRequest = new JobStatusRequest();
jobStatusRequest.setCluster(request.queryParams(CLUSTER));
jobStatusRequest.setStore(request.queryParams(NAME));
jobStatusRequest.setVersionNumber(Utils.parseIntFromString(request.queryParams(VERSION), VERSION));
jobStatusRequest
.setIncrementalPushVersion(AdminSparkServer.getOptionalParameterValue(request, INCREMENTAL_PUSH_VERSION));
jobStatusRequest.setTargetedRegions(request.queryParams(TARGETED_REGIONS));
jobStatusRequest.setRegion(AdminSparkServer.getOptionalParameterValue(request, FABRIC));

populateJobStatus(jobStatusRequest, admin, responseObject);
} catch (Throwable e) {
responseObject.setError(e);
AdminSparkServer.handleError(e, request, response);
Expand All @@ -75,20 +73,24 @@ public Route jobStatus(Admin admin) {
};
}

JobStatusQueryResponse populateJobStatus(
String cluster,
String store,
int versionNumber,
public JobStatusQueryResponse populateJobStatus(
JobStatusRequest jobStatusRequest,
Admin admin,
Optional<String> incrementalPushVersion,
String region,
String targetedRegions) {
JobStatusQueryResponse responseObject = new JobStatusQueryResponse();
JobStatusQueryResponse responseObject) {
String store = jobStatusRequest.getStore();
int versionNumber = jobStatusRequest.getVersionNumber();
String cluster = jobStatusRequest.getCluster();
String incrementalPushVersion = jobStatusRequest.getIncrementalPushVersion();
String region = jobStatusRequest.getRegion();
String targetedRegions = jobStatusRequest.getTargetedRegions();

String kafkaTopicName = Version.composeKafkaTopic(store, versionNumber);

Admin.OfflinePushStatusInfo offlineJobStatus =
admin.getOffLinePushStatus(cluster, kafkaTopicName, incrementalPushVersion, region, targetedRegions);
Admin.OfflinePushStatusInfo offlineJobStatus = admin.getOffLinePushStatus(
cluster,
kafkaTopicName,
Optional.ofNullable(incrementalPushVersion),
region,
targetedRegions);
responseObject.setStatus(offlineJobStatus.getExecutionStatus().toString());
responseObject.setStatusUpdateTimestamp(offlineJobStatus.getStatusUpdateTimestamp());
responseObject.setStatusDetails(offlineJobStatus.getStatusDetails());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.linkedin.venice.controller.server.endpoints;

import static com.linkedin.venice.controllerapi.ControllerApiConstants.*;

import com.linkedin.venice.utils.Utils;


public class JobStatusRequest {
private String cluster;
private String store;
private int versionNumber;
private String incrementalPushVersion;
private String targetedRegions;
private String region;

public JobStatusRequest() {
}

public void setCluster(String cluster) {
this.cluster = cluster;
}

public String getCluster() {
return cluster;
}

public void setStore(String store) {
this.store = store;
}

public String getStore() {
return store;
}

public void setVersionNumber(String versionNumber) {
this.versionNumber = parseVersionNumber(versionNumber);
}

public void setVersionNumber(int versionNumber) {
this.versionNumber = versionNumber;
}

private int parseVersionNumber(String versionNumber) {
return Utils.parseIntFromString(versionNumber, VERSION);
}

public int getVersionNumber() {
return versionNumber;
}

public void setIncrementalPushVersion(String incrementalPushVersion) {
this.incrementalPushVersion = incrementalPushVersion;
}

public String getIncrementalPushVersion() {
return incrementalPushVersion;
}

public void setTargetedRegions(String targetedRegions) {
this.targetedRegions = targetedRegions;
}

public String getTargetedRegions() {
return targetedRegions;
}

public void setRegion(String region) {
this.region = region;
}

public String getRegion() {
return region;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controller.VeniceParentHelixAdmin;
import com.linkedin.venice.controller.server.endpoints.JobStatusRequest;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.Utils;
Expand Down Expand Up @@ -34,8 +35,17 @@ public void testPopulateJobStatus() {
String store = Utils.getUniqueString("store");
int version = 5;
JobRoutes jobRoutes = new JobRoutes(false, Optional.empty());
JobStatusQueryResponse response =
jobRoutes.populateJobStatus(cluster, store, version, mockAdmin, Optional.empty(), null, null);

JobStatusRequest jobStatusRequest = new JobStatusRequest();
jobStatusRequest.setCluster(cluster);
jobStatusRequest.setStore(store);
jobStatusRequest.setVersionNumber(version);
jobStatusRequest.setIncrementalPushVersion("");
jobStatusRequest.setTargetedRegions(null);
jobStatusRequest.setRegion(null);

JobStatusQueryResponse response = new JobStatusQueryResponse();
jobRoutes.populateJobStatus(jobStatusRequest, mockAdmin, response);

Map<String, String> extraInfo = response.getExtraInfo();
LOGGER.info("extraInfo: {}", extraInfo);
Expand Down

0 comments on commit c82b043

Please sign in to comment.