From 1b6348a2a6357b2f3cd030e2a9b213fc032e1c4e Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Mon, 6 Jan 2025 16:17:22 +0700 Subject: [PATCH] Add new command to admin-tool --- .../java/com/linkedin/venice/AdminTool.java | 13 +++ .../main/java/com/linkedin/venice/Arg.java | 5 +- .../java/com/linkedin/venice/Command.java | 5 + .../controllerapi/ControllerClient.java | 8 ++ .../venice/controllerapi/ControllerRoute.java | 6 +- .../com/linkedin/venice/controller/Admin.java | 5 +- .../venice/controller/VeniceHelixAdmin.java | 31 +++-- .../controller/VeniceParentHelixAdmin.java | 11 +- .../controller/server/AdminSparkServer.java | 107 +----------------- .../server/AdminTopicMetadataRoutes.java | 40 +++++-- 10 files changed, 100 insertions(+), 131 deletions(-) diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index c9733d099b9..85017d84acd 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -584,6 +584,9 @@ public static void main(String[] args) throws Exception { case DUMP_HOST_HEARTBEAT: dumpHostHeartbeat(cmd); break; + case UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION: + updateAdminOperationProtocolVersion(cmd); + break; default: StringJoiner availableCommands = new StringJoiner(", "); for (Command c: Command.values()) { @@ -3147,6 +3150,16 @@ private static void dumpHostHeartbeat(CommandLine cmd) throws Exception { } } + private static void updateAdminOperationProtocolVersion(CommandLine cmd) throws Exception { + String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION); + String protocolVersionInString = + getRequiredArgument(cmd, Arg.ADMIN_OPERATION_PROTOCOL_VERSION, Command.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION); + long protocolVersion = + Utils.parseLongFromString(protocolVersionInString, Arg.ADMIN_OPERATION_PROTOCOL_VERSION.name()); + ControllerResponse response = controllerClient.updateAdminOperationProtocolVersion(clusterName, protocolVersion); + printObject(response); + } + private static void migrateVeniceZKPaths(CommandLine cmd) throws Exception { Set clusterNames = Utils.parseCommaSeparatedStringToSet(getRequiredArgument(cmd, Arg.CLUSTER_LIST)); String srcZKUrl = getRequiredArgument(cmd, Arg.SRC_ZOOKEEPER_URL); diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java index 75b2115a495..2a0cad48ec2 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java @@ -294,7 +294,10 @@ public enum Arg { ), DAVINCI_HEARTBEAT_REPORTED( "dvc-heartbeat-reported", "dvchb", true, "Flag to indicate whether DVC is bootstrapping and sending heartbeats" - ), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config"); + ), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config"), + ADMIN_OPERATION_PROTOCOL_VERSION( + "admin-operation-protocol-version", "aopv", true, "Admin operation protocol version" + ),; private final String argName; private final String first; diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java index 51828eb3ae7..d45b72e0a11 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java @@ -3,6 +3,7 @@ import static com.linkedin.venice.Arg.ACCESS_CONTROL; import static com.linkedin.venice.Arg.ACL_PERMS; import static com.linkedin.venice.Arg.ACTIVE_ACTIVE_REPLICATION_ENABLED; +import static com.linkedin.venice.Arg.ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.Arg.ALLOW_STORE_MIGRATION; import static com.linkedin.venice.Arg.AUTO_SCHEMA_REGISTER_FOR_PUSHJOB_ENABLED; import static com.linkedin.venice.Arg.BACKUP_FOLDER; @@ -562,6 +563,10 @@ public enum Command { "dump-host-heartbeat", "Dump all heartbeat belong to a certain storage node. You can use topic/partition to filter specific resource, and you can choose to filter resources that are lagging.", new Arg[] { SERVER_URL, KAFKA_TOPIC_NAME }, new Arg[] { PARTITION, LAG_FILTER_ENABLED } + ), + UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION( + "update-admin-operation-protocol-version", "Update the admin operation protocol version", + new Arg[] { CLUSTER, ADMIN_OPERATION_PROTOCOL_VERSION } ); private final String commandName; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java index 7f521ae5c86..9e282888a21 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java @@ -1359,6 +1359,14 @@ public ControllerResponse updateAdminTopicMetadata( return request(ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA, params, ControllerResponse.class); } + public ControllerResponse updateAdminOperationProtocolVersion( + String clusterName, + Long adminOperationProtocolVersion) { + QueryParams params = + newParams().add(CLUSTER, clusterName).add(ADMIN_OPERATION_PROTOCOL_VERSION, adminOperationProtocolVersion); + return request(ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION, params, ControllerResponse.class); + } + public ControllerResponse deleteKafkaTopic(String topicName) { QueryParams params = newParams().add(TOPIC, topicName); return request(ControllerRoute.DELETE_KAFKA_TOPIC, params, ControllerResponse.class); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java index 454e6a35590..31fafb0237b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java @@ -284,7 +284,11 @@ public enum ControllerRoute { ), GET_ADMIN_TOPIC_METADATA("/get_admin_topic_metadata", HttpMethod.GET, Collections.singletonList(CLUSTER), NAME), UPDATE_ADMIN_TOPIC_METADATA( "/update_admin_topic_metadata", HttpMethod.POST, Arrays.asList(CLUSTER, EXECUTION_ID), NAME, OFFSET, - UPSTREAM_OFFSET, ADMIN_OPERATION_PROTOCOL_VERSION + UPSTREAM_OFFSET + ), + UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION( + "/update_admin_operation_protocol_version", HttpMethod.POST, Collections.singletonList(CLUSTER), + ADMIN_OPERATION_PROTOCOL_VERSION ), DELETE_KAFKA_TOPIC("/delete_kafka_topic", HttpMethod.POST, Arrays.asList(CLUSTER, TOPIC)), CREATE_STORAGE_PERSONA( diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java index 1fd43b4cb13..1921a817c15 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java @@ -964,8 +964,9 @@ void updateAdminTopicMetadata( long executionId, Optional storeName, Optional offset, - Optional upstreamOffset, - Optional adminOperationProtocolVersion); + Optional upstreamOffset); + + void updateAdminOperationProtocolVersion(String clusterName, Long adminOperationProtocolVersion); void createStoragePersona( String clusterName, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 919f072866d..06bfbe02265 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -7133,27 +7133,16 @@ public Map getAdminTopicMetadata(String clusterName, Optional storeName, Optional offset, - Optional upstreamOffset, - Optional adminOperationProtocolVersion) { + Optional upstreamOffset) { Map metadata = adminConsumerServices.get(clusterName).getAdminTopicMetadata(clusterName); if (storeName.isPresent()) { executionIdAccessor.updateLastSucceededExecutionIdMap(clusterName, storeName.get(), executionId); - } else if (adminOperationProtocolVersion.isPresent()) { - Pair currentOffsets = AdminTopicMetadataAccessor.getOffsets(metadata); - adminConsumerServices.get(clusterName) - .updateAdminTopicMetadata( - clusterName, - executionId, - currentOffsets.getFirst(), - currentOffsets.getSecond(), - adminOperationProtocolVersion.get()); } else { if (!offset.isPresent() || !upstreamOffset.isPresent()) { throw new VeniceException("Offsets must be provided to update cluster-level admin topic metadata"); @@ -7170,6 +7159,24 @@ public void updateAdminTopicMetadata( } } + /** + * Update the version of admin operation protocol in admin topic metadata + */ + public void updateAdminOperationProtocolVersion(String clusterName, Long adminOperationProtocolVersion) { + Map metadata = adminConsumerServices.get(clusterName).getAdminTopicMetadata(clusterName); + + Pair currentOffsets = AdminTopicMetadataAccessor.getOffsets(metadata); + Long executionId = AdminTopicMetadataAccessor.getExecutionId(metadata); + + adminConsumerServices.get(clusterName) + .updateAdminTopicMetadata( + clusterName, + executionId, + currentOffsets.getFirst(), + currentOffsets.getSecond(), + adminOperationProtocolVersion); + } + /** * @see Admin#getRoutersClusterConfig(String) */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 4c1e65873ad..755cf17e950 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -4274,8 +4274,15 @@ public void updateAdminTopicMetadata( long executionId, Optional storeName, Optional offset, - Optional upstreamOffset, - Optional adminOperationProtocolVersion) { + Optional upstreamOffset) { + throw new VeniceUnsupportedOperationException("updateAdminTopicMetadata"); + } + + /** + * Unsupported operation in the parent controller. + */ + @Override + public void updateAdminOperationProtocolVersion(String clusterName, Long adminOperationProtocolVersion) { throw new VeniceUnsupportedOperationException("updateAdminTopicMetadata"); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java index c77d7fafab5..f24b06d2716 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java @@ -1,109 +1,7 @@ package com.linkedin.venice.controller.server; import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER; -import static com.linkedin.venice.controllerapi.ControllerRoute.ABORT_MIGRATION; -import static com.linkedin.venice.controllerapi.ControllerRoute.ADD_DERIVED_SCHEMA; -import static com.linkedin.venice.controllerapi.ControllerRoute.ADD_VALUE_SCHEMA; -import static com.linkedin.venice.controllerapi.ControllerRoute.ADD_VERSION; -import static com.linkedin.venice.controllerapi.ControllerRoute.AGGREGATED_HEALTH_STATUS; -import static com.linkedin.venice.controllerapi.ControllerRoute.ALLOW_LIST_ADD_NODE; -import static com.linkedin.venice.controllerapi.ControllerRoute.ALLOW_LIST_REMOVE_NODE; -import static com.linkedin.venice.controllerapi.ControllerRoute.BACKUP_VERSION; -import static com.linkedin.venice.controllerapi.ControllerRoute.CHECK_RESOURCE_CLEANUP_FOR_STORE_CREATION; -import static com.linkedin.venice.controllerapi.ControllerRoute.CLEANUP_INSTANCE_CUSTOMIZED_STATES; -import static com.linkedin.venice.controllerapi.ControllerRoute.CLUSTER_DISCOVERY; -import static com.linkedin.venice.controllerapi.ControllerRoute.CLUSTER_HEALTH_STORES; -import static com.linkedin.venice.controllerapi.ControllerRoute.COMPARE_STORE; -import static com.linkedin.venice.controllerapi.ControllerRoute.COMPLETE_MIGRATION; -import static com.linkedin.venice.controllerapi.ControllerRoute.CONFIGURE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER; -import static com.linkedin.venice.controllerapi.ControllerRoute.CREATE_STORAGE_PERSONA; -import static com.linkedin.venice.controllerapi.ControllerRoute.ClUSTER_HEALTH_INSTANCES; -import static com.linkedin.venice.controllerapi.ControllerRoute.DATA_RECOVERY; -import static com.linkedin.venice.controllerapi.ControllerRoute.DELETE_ACL; -import static com.linkedin.venice.controllerapi.ControllerRoute.DELETE_ALL_VERSIONS; -import static com.linkedin.venice.controllerapi.ControllerRoute.DELETE_KAFKA_TOPIC; -import static com.linkedin.venice.controllerapi.ControllerRoute.DELETE_OLD_VERSION; -import static com.linkedin.venice.controllerapi.ControllerRoute.DELETE_STORAGE_PERSONA; -import static com.linkedin.venice.controllerapi.ControllerRoute.DELETE_STORE; -import static com.linkedin.venice.controllerapi.ControllerRoute.EMPTY_PUSH; -import static com.linkedin.venice.controllerapi.ControllerRoute.ENABLE_MAX_CAPACITY_PROTECTION; -import static com.linkedin.venice.controllerapi.ControllerRoute.ENABLE_QUOTA_REBALANCED; -import static com.linkedin.venice.controllerapi.ControllerRoute.ENABLE_STORE; -import static com.linkedin.venice.controllerapi.ControllerRoute.ENABLE_THROTTLING; -import static com.linkedin.venice.controllerapi.ControllerRoute.END_OF_PUSH; -import static com.linkedin.venice.controllerapi.ControllerRoute.EXECUTION; -import static com.linkedin.venice.controllerapi.ControllerRoute.FUTURE_VERSION; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_ACL; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_ADMIN_TOPIC_METADATA; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_ALL_MIGRATION_PUSH_STRATEGIES; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_ALL_REPLICATION_METADATA_SCHEMAS; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_ALL_VALUE_AND_DERIVED_SCHEMA; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_ALL_VALUE_SCHEMA; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_CLUSTER_STORAGE_PERSONAS; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_DELETABLE_STORE_TOPICS; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_HEARTBEAT_TIMESTAMP_FROM_SYSTEM_STORE; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_INUSE_SCHEMA_IDS; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_KAFKA_TOPIC_CONFIGS; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_KEY_SCHEMA; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_ONGOING_INCREMENTAL_PUSH_VERSIONS; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_REGION_PUSH_DETAILS; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_REPUSH_INFO; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_ROUTERS_CLUSTER_CONFIG; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STALE_STORES_IN_CLUSTER; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STORAGE_PERSONA; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STORAGE_PERSONA_ASSOCIATED_WITH_STORE; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STORES_IN_CLUSTER; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STORE_LARGEST_USED_VERSION; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_VALUE_OR_DERIVED_SCHEMA_ID; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_VALUE_SCHEMA; -import static com.linkedin.venice.controllerapi.ControllerRoute.GET_VALUE_SCHEMA_ID; -import static com.linkedin.venice.controllerapi.ControllerRoute.IS_STORE_VERSION_READY_FOR_DATA_RECOVERY; -import static com.linkedin.venice.controllerapi.ControllerRoute.JOB; -import static com.linkedin.venice.controllerapi.ControllerRoute.KILL_OFFLINE_PUSH_JOB; -import static com.linkedin.venice.controllerapi.ControllerRoute.LAST_SUCCEED_EXECUTION_ID; -import static com.linkedin.venice.controllerapi.ControllerRoute.LEADER_CONTROLLER; -import static com.linkedin.venice.controllerapi.ControllerRoute.LIST_BOOTSTRAPPING_VERSIONS; -import static com.linkedin.venice.controllerapi.ControllerRoute.LIST_CHILD_CLUSTERS; -import static com.linkedin.venice.controllerapi.ControllerRoute.LIST_NODES; -import static com.linkedin.venice.controllerapi.ControllerRoute.LIST_REPLICAS; -import static com.linkedin.venice.controllerapi.ControllerRoute.LIST_STORES; -import static com.linkedin.venice.controllerapi.ControllerRoute.LIST_STORE_PUSH_INFO; -import static com.linkedin.venice.controllerapi.ControllerRoute.MASTER_CONTROLLER; -import static com.linkedin.venice.controllerapi.ControllerRoute.MIGRATE_STORE; -import static com.linkedin.venice.controllerapi.ControllerRoute.NEW_STORE; -import static com.linkedin.venice.controllerapi.ControllerRoute.NODE_REMOVABLE; -import static com.linkedin.venice.controllerapi.ControllerRoute.NODE_REPLICAS; -import static com.linkedin.venice.controllerapi.ControllerRoute.NODE_REPLICAS_READINESS; -import static com.linkedin.venice.controllerapi.ControllerRoute.OFFLINE_PUSH_INFO; -import static com.linkedin.venice.controllerapi.ControllerRoute.PREPARE_DATA_RECOVERY; -import static com.linkedin.venice.controllerapi.ControllerRoute.REMOVE_DERIVED_SCHEMA; -import static com.linkedin.venice.controllerapi.ControllerRoute.REMOVE_NODE; -import static com.linkedin.venice.controllerapi.ControllerRoute.REMOVE_STORE_FROM_GRAVEYARD; -import static com.linkedin.venice.controllerapi.ControllerRoute.REPLICATE_META_DATA; -import static com.linkedin.venice.controllerapi.ControllerRoute.REQUEST_TOPIC; -import static com.linkedin.venice.controllerapi.ControllerRoute.ROLLBACK_TO_BACKUP_VERSION; -import static com.linkedin.venice.controllerapi.ControllerRoute.ROLL_FORWARD_TO_FUTURE_VERSION; -import static com.linkedin.venice.controllerapi.ControllerRoute.SEND_HEARTBEAT_TIMESTAMP_TO_SYSTEM_STORE; -import static com.linkedin.venice.controllerapi.ControllerRoute.SEND_PUSH_JOB_DETAILS; -import static com.linkedin.venice.controllerapi.ControllerRoute.SET_MIGRATION_PUSH_STRATEGY; -import static com.linkedin.venice.controllerapi.ControllerRoute.SET_OWNER; -import static com.linkedin.venice.controllerapi.ControllerRoute.SET_PARTITION_COUNT; -import static com.linkedin.venice.controllerapi.ControllerRoute.SET_TOPIC_COMPACTION; -import static com.linkedin.venice.controllerapi.ControllerRoute.SET_VERSION; -import static com.linkedin.venice.controllerapi.ControllerRoute.SKIP_ADMIN; -import static com.linkedin.venice.controllerapi.ControllerRoute.STORAGE_ENGINE_OVERHEAD_RATIO; -import static com.linkedin.venice.controllerapi.ControllerRoute.STORE; -import static com.linkedin.venice.controllerapi.ControllerRoute.STORE_MIGRATION_ALLOWED; -import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_ACL; -import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA; -import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_CLUSTER_CONFIG; -import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_KAFKA_TOPIC_LOG_COMPACTION; -import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_KAFKA_TOPIC_MIN_IN_SYNC_REPLICA; -import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_KAFKA_TOPIC_RETENTION; -import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_STORAGE_PERSONA; -import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_STORE; -import static com.linkedin.venice.controllerapi.ControllerRoute.UPLOAD_PUSH_JOB_STATUS; -import static com.linkedin.venice.controllerapi.ControllerRoute.WIPE_CLUSTER; +import static com.linkedin.venice.controllerapi.ControllerRoute.*; import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.venice.HttpConstants; @@ -616,6 +514,9 @@ public boolean startInner() throws Exception { httpService.post( UPDATE_ADMIN_TOPIC_METADATA.getPath(), new VeniceParentControllerRegionStateHandler(admin, adminTopicMetadataRoutes.updateAdminTopicMetadata(admin))); + httpService.post( + UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION.getPath(), + new VeniceParentControllerRegionStateHandler(admin, adminTopicMetadataRoutes.updateAdminTopicMetadata(admin))); httpService.post( DELETE_KAFKA_TOPIC.getPath(), diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java index 16fbd7cf0ac..4dccc797fb9 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java @@ -84,15 +84,13 @@ public Route updateAdminTopicMetadata(Admin admin) { Optional storeName = Optional.ofNullable(request.queryParams(NAME)); Optional offset = Optional.ofNullable(request.queryParams(OFFSET)).map(Long::parseLong); Optional upstreamOffset = Optional.ofNullable(request.queryParams(UPSTREAM_OFFSET)).map(Long::parseLong); - Optional adminOperationProtocolVersion = - Optional.ofNullable(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)).map(Long::parseLong); if (storeName.isPresent()) { if (offset.isPresent() || upstreamOffset.isPresent()) { throw new VeniceException("There is no store-level offsets to be updated"); } } else { - if (!adminOperationProtocolVersion.isPresent() && (!offset.isPresent() || !upstreamOffset.isPresent())) { + if (!offset.isPresent() || !upstreamOffset.isPresent()) { throw new VeniceException( "Offsets must be provided to update cluster-level admin topic metadata if no store name and admin operation version provided."); } @@ -101,13 +99,35 @@ public Route updateAdminTopicMetadata(Admin admin) { responseObject.setCluster(clusterName); storeName.ifPresent(responseObject::setName); - admin.updateAdminTopicMetadata( - clusterName, - executionId, - storeName, - offset, - upstreamOffset, - adminOperationProtocolVersion); + admin.updateAdminTopicMetadata(clusterName, executionId, storeName, offset, upstreamOffset); + } catch (Throwable e) { + responseObject.setError(e); + AdminSparkServer.handleError(new VeniceException(e), request, response); + } + return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject); + }; + } + + public Route updateAdminOperationProtocolVersion(Admin admin) { + return (request, response) -> { + ControllerResponse responseObject = new ControllerResponse(); + response.type(HttpConstants.JSON); + try { + if (!isAllowListUser(request)) { + response.status(HttpStatus.SC_FORBIDDEN); + responseObject.setError("Only admin users are allowed to run " + request.url()); + responseObject.setErrorType(ErrorType.BAD_REQUEST); + return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject); + } + + AdminSparkServer.validateParams(request, UPDATE_ADMIN_TOPIC_METADATA.getParams(), admin); + String clusterName = request.queryParams(CLUSTER); + Long adminOperationProtocolVersion = Long.parseLong(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)); + + responseObject.setCluster(clusterName); + + admin.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion); + } catch (Throwable e) { responseObject.setError(e); AdminSparkServer.handleError(new VeniceException(e), request, response);