Skip to content

Commit

Permalink
[KYUUBI #5717] Infer the proxy user automatically for delete batch op…
Browse files Browse the repository at this point in the history
…eration

# 🔍 Description
Infer the batch user from session or metadata, user do not need to specify the proxy user anymore.

This pr also align the behavior of BatchesResource with that of SessionsResource and OperationsResource(no proxy user parameter).

For Kyuubi Batch, Session and Operation, these resources have the user attiribute.

So we only need to check whether the authentication user has the permission to access the resource.

## Issue References 🔗

This pull request fixes #

## Describe Your Solution 🔧

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklists
## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [ ] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested

**Be nice. Be informative.**

Closes #5717 from turboFei/hive_server2_proxy_user.

Closes #5717

70ad7e7 [fwang12] comment
c721a75 [fwang12] ignore
da92bd5 [fwang12] fix ut
9a197d0 [fwang12] doc
c8ed5f9 [fwang12] ut
cef9e32 [fwang12] do not use proxy user

Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
  • Loading branch information
turboFei committed Nov 17, 2023
1 parent 0bcd107 commit 3478fc9
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 69 deletions.
10 changes: 0 additions & 10 deletions docs/client/rest/rest_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -410,16 +410,6 @@ The [Batch](#batch).

Kill the batch if it is still running.

#### Request Parameters

| Name | Description | Type |
|:------------------------|:------------------------------|:-----------------|
| proxyUser | the proxy user to impersonate | String(optional) |
| hive.server2.proxy.user | the proxy user to impersonate | String(optional) |

`proxyUser` is an alternative to `hive.server2.proxy.user`, and the current behavior is consistent with
`hive.server2.proxy.user`. When both parameters are set, `proxyUser` takes precedence.

#### Response Body

| Name | Description | Type |
Expand Down
4 changes: 4 additions & 0 deletions docs/deployment/migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

* Since Kyuubi 1.9.0, `kyuubi.session.conf.advisor` can be set as a sequence, Kyuubi supported chaining SessionConfAdvisors.

## Upgrading from Kyuubi 1.8.0 to 1.8.1

* Since Kyuubi 1.8.1, for `DELETE /batches/${batchId}`, `hive.server2.proxy.user` is not needed in the request parameters.

## Upgrading from Kyuubi 1.7 to 1.8

* Since Kyuubi 1.8, SQLite is added and becomes the default database type of Kyuubi metastore, as Derby has been deprecated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class DeleteBatchCommand(cliConfig: CliConfig) extends Command[Batch](cliConfig)
val batchRestApi: BatchRestApi = new BatchRestApi(kyuubiRestClient)
val batchId = normalizedCliConfig.batchOpts.batchId

val result = batchRestApi.deleteBatch(batchId, normalizedCliConfig.commonOpts.hs2ProxyUser)
val result = batchRestApi.deleteBatch(batchId)

info(JsonUtils.toJson(result))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,6 @@ class BatchCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
}
}

test("delete batch with hs2ProxyUser") {
val args = Array(
"delete",
"batch",
"f7fd702c-e54e-11ec-8fea-0242ac120002",
"--hs2ProxyUser",
"b_user")
val opArgs = new ControlCliArguments(args)
assert(opArgs.cliConfig.batchOpts.batchId == "f7fd702c-e54e-11ec-8fea-0242ac120002")
assert(opArgs.cliConfig.commonOpts.hs2ProxyUser == "b_user")
}

test("test list batch option") {
val args = Array(
"list",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import org.apache.kyuubi.client.api.v1.dto.*;
import org.apache.kyuubi.client.util.JsonUtils;
import org.apache.kyuubi.client.util.VersionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchRestApi {
static final Logger LOG = LoggerFactory.getLogger(BatchRestApi.class);

private KyuubiRestClient client;

Expand Down Expand Up @@ -101,14 +104,27 @@ public OperationLog getBatchLocalLog(String batchId, int from, int size) {
return this.getClient().get(path, params, OperationLog.class, client.getAuthHeader());
}

/**
* hs2ProxyUser for delete batch is deprecated since 1.8.1, please use {@link
* #deleteBatch(String)} instead.
*/
@Deprecated
public CloseBatchResponse deleteBatch(String batchId, String hs2ProxyUser) {
LOG.warn(
"The method `deleteBatch(batchId, hs2ProxyUser)` is deprecated since 1.8.1, "
+ "using `deleteBatch(batchId)` instead.");
Map<String, Object> params = new HashMap<>();
params.put("hive.server2.proxy.user", hs2ProxyUser);

String path = String.format("%s/%s", API_BASE_PATH, batchId);
return this.getClient().delete(path, params, CloseBatchResponse.class, client.getAuthHeader());
}

public CloseBatchResponse deleteBatch(String batchId) {
String path = String.format("%s/%s", API_BASE_PATH, batchId);
return this.getClient().delete(path, null, CloseBatchResponse.class, client.getAuthHeader());
}

private IRestClient getClient() {
return this.client.getHttpClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,13 @@ public void getOperationLogTest() {
public void deleteBatchTest() {
// test spnego auth
BatchTestServlet.setAuthSchema(NEGOTIATE_AUTH);
CloseBatchResponse response = spnegoBatchRestApi.deleteBatch("71535", "b_test");
CloseBatchResponse response = spnegoBatchRestApi.deleteBatch("71535");
assertTrue(response.isSuccess());

// test basic auth
BatchTestServlet.setAuthSchema(BASIC_AUTH);
BatchTestServlet.allowAnonymous(false);
response = basicBatchRestApi.deleteBatch("71535", "b_test");
response = basicBatchRestApi.deleteBatch("71535");
assertTrue(response.isSuccess());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import java.util.{Collections, Locale, UUID}
import java.util.concurrent.ConcurrentHashMap
import javax.ws.rs._
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response.Status

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -448,19 +447,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
description = "close and cancel a batch session")
@DELETE
@Path("{batchId}")
def closeBatchSession(
@PathParam("batchId") batchId: String,
@QueryParam("proxyUser") kyuubiProxyUser: String,
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): CloseBatchResponse = {

def checkPermission(operator: String, owner: String): Unit = {
if (operator != owner) {
throw new WebApplicationException(
s"$operator is not allowed to close the session belong to $owner",
Status.METHOD_NOT_ALLOWED)
}
}

def closeBatchSession(@PathParam("batchId") batchId: String): CloseBatchResponse = {
def forceKill(
appMgrInfo: ApplicationManagerInfo,
batchId: String,
Expand All @@ -472,18 +459,15 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
(killed, message)
}

val activeProxyUser = Option(kyuubiProxyUser).getOrElse(hs2ProxyUser)
val userName = fe.getSessionUser(activeProxyUser)

val sessionHandle = formatSessionHandle(batchId)
sessionManager.getBatchSession(sessionHandle).map { batchSession =>
checkPermission(userName, batchSession.user)
fe.getSessionUser(batchSession.user)
sessionManager.closeSession(batchSession.handle)
val (killed, msg) = batchSession.batchJobSubmissionOp.getKillMessage
new CloseBatchResponse(killed, msg)
}.getOrElse {
sessionManager.getBatchMetadata(batchId).map { metadata =>
checkPermission(userName, metadata.username)
fe.getSessionUser(metadata.username)
if (OperationState.isTerminal(OperationState.withName(metadata.state))) {
new CloseBatchResponse(false, s"The batch[$metadata] has been terminated.")
} else if (batchV2Enabled(metadata.requestConf) && metadata.state == "INITIALIZED" &&
Expand All @@ -494,21 +478,21 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
} else if (batchV2Enabled(metadata.requestConf) && metadata.kyuubiInstance == null) {
// code goes here indicates metadata is outdated, recursively calls itself to refresh
// the metadata
closeBatchSession(batchId, kyuubiProxyUser, hs2ProxyUser)
closeBatchSession(batchId)
} else if (metadata.kyuubiInstance != fe.connectionUrl) {
info(s"Redirecting delete batch[$batchId] to ${metadata.kyuubiInstance}")
val internalRestClient = getInternalRestClient(metadata.kyuubiInstance)
try {
internalRestClient.deleteBatch(userName, batchId)
internalRestClient.deleteBatch(metadata.username, batchId)
} catch {
case e: KyuubiRestException =>
error(s"Error redirecting delete batch[$batchId] to ${metadata.kyuubiInstance}", e)
val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, userName)
val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, metadata.username)
new CloseBatchResponse(killed, if (killed) msg else Utils.stringifyException(e))
}
} else { // should not happen, but handle this for safe
warn(s"Something wrong on deleting batch[$batchId], try forcibly killing application")
val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, userName)
val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, metadata.username)
new CloseBatchResponse(killed, msg)
}
}.getOrElse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class InternalRestClient(

def deleteBatch(user: String, batchId: String): CloseBatchResponse = {
withAuthUser(user) {
internalBatchRestApi.deleteBatch(batchId, null)
internalBatchRestApi.deleteBatch(batchId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader(batch.getId))
.delete()
assert(deleteBatchResponse.getStatus === 405)
errorMessage = s"${batch.getId} is not allowed to close the session belong to anonymous"
errorMessage = s"Failed to validate proxy privilege of ${batch.getId} for anonymous"
assert(deleteBatchResponse.readEntity(classOf[String]).contains(errorMessage))

// invalid batchId
Expand All @@ -228,16 +228,6 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
.delete()
assert(deleteBatchResponse.getStatus === 404)

// invalid proxy user
deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId}")
.queryParam("hive.server2.proxy.user", "invalidProxy")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
.delete()
assert(deleteBatchResponse.getStatus === 405)
errorMessage = "Failed to validate proxy privilege of anonymous for invalidProxy"
assert(deleteBatchResponse.readEntity(classOf[String]).contains(errorMessage))

// check close batch session
deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId}")
.request(MediaType.APPLICATION_JSON_TYPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,9 @@ class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper {
}

// delete batch
val closeResp = batchRestApi.deleteBatch(batch.getId(), null)
val closeResp = batchRestApi.deleteBatch(batch.getId())
assert(closeResp.getMsg.nonEmpty)

// delete batch - error
val e = intercept[KyuubiRestException] {
batchRestApi.deleteBatch(batch.getId(), "fake")
}
assert(e.getCause.toString.contains(
s"Failed to validate proxy privilege of ${ldapUser} for fake"))

basicKyuubiRestClient.close()
}

Expand Down Expand Up @@ -170,7 +163,7 @@ class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper {
}

// delete batch
val closeResp = batchRestApi.deleteBatch(batch.getId(), proxyUser)
val closeResp = batchRestApi.deleteBatch(batch.getId())
assert(closeResp.getMsg.nonEmpty)

// list batches
Expand Down

0 comments on commit 3478fc9

Please sign in to comment.