From 59ce4f5f2e81515d448fd91e433ad1d1e5a7cf17 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 25 Dec 2024 23:01:56 -0800 Subject: [PATCH] Add Tenant aware Rest Tests for Workflows Signed-off-by: Daniel Widdis --- .github/workflows/CI.yml | 16 +- build.gradle | 33 +- .../flowframework/model/Template.java | 1 + .../rest/RestCreateWorkflowAction.java | 94 +++--- .../resources/mappings/global-context.json | 5 +- .../FlowFrameworkRestTestCase.java | 14 + .../FlowFrameworkTenantAwareRestTestCase.java | 207 ++++++++++++ .../rest/FlowFrameworkRestApiIT.java | 2 +- .../rest/RestWorkflowTenantAwareIT.java | 309 ++++++++++++++++++ .../GetWorkflowStateTransportActionTests.java | 4 +- .../flowframework/util/ParseUtilsTests.java | 2 - 11 files changed, 634 insertions(+), 53 deletions(-) create mode 100644 src/test/java/org/opensearch/flowframework/FlowFrameworkTenantAwareRestTestCase.java create mode 100644 src/test/java/org/opensearch/flowframework/rest/RestWorkflowTenantAwareIT.java diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 25820be34..70c0e4507 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -26,11 +26,22 @@ jobs: if: github.repository == 'opensearch-project/flow-framework' runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - - uses: actions/setup-java@v4 + # TEMPORARY until this is on Maven + - name: Set up JDK 21 + uses: actions/setup-java@v4 with: java-version: 21 distribution: temurin + - name: Checkout Metadata Client + uses: actions/checkout@v4 + with: + repository: dbwiddis/opensearch-remote-metadata-sdk + ref: main + path: opensearch-remote-metadata-sdk + - name: Publish to maven local + run: ./gradlew publishToMavenLocal + # end TEMPORARY code + - uses: actions/checkout@v4 - name: Javadoc CheckStyle run: ./gradlew checkstyleMain - name: Javadoc Check @@ -98,3 +109,4 @@ jobs: - name: Build and Run Tests run: | ./gradlew integTest -PnumNodes=3 + ./gradlew integTest -PnumNodes=3 -Dtests.rest.tenantaware=true diff --git a/build.gradle b/build.gradle index d61d53a88..812b19db6 100644 --- a/build.gradle +++ b/build.gradle @@ -288,10 +288,19 @@ integTest { systemProperty('user', user) systemProperty('password', password) + // Only tenant aware test if set + if (System.getProperty("tests.rest.tenantaware") == "true") { + filter { + includeTestsMatching "org.opensearch.flowframework.*TenantAwareIT" + } + systemProperty "plugins.flow_framework.multi_tenancy_enabled", "true" + } + // Only rest case can run with remote cluster - if (System.getProperty("tests.rest.cluster") != null) { + if (System.getProperty("tests.rest.cluster") != null && System.getProperty("tests.rest.tenantaware") == null) { filter { includeTestsMatching "org.opensearch.flowframework.rest.*IT" + excludeTestsMatching "org.opensearch.ml.rest.*TenantAwareIT" } } @@ -319,6 +328,28 @@ integTest { // doFirst delays this block until execution time doFirst { + if (System.getProperty("tests.rest.tenantaware") == "true") { + def ymlFile = file("$buildDir/testclusters/integTest-0/config/opensearch.yml") + if (ymlFile.exists()) { + ymlFile.withWriterAppend { + writer -> + writer.write("\n# Set multitenancy\n") + writer.write("plugins.flow_framework.multi_tenancy_enabled: true\n") + } + // TODO this properly uses the remote client factory but needs a remote cluster set up + // TODO get the endpoint from a system property + if (System.getProperty("tests.rest.cluster") != null) { + ymlFile.withWriterAppend { writer -> + writer.write("\n# Use a remote cluster\n") + writer.write("plugins.flow_framework.remote_metadata_type: RemoteOpenSearch\n") + writer.write("plugins.flow_framework.remote_metadata_endpoint: https://127.0.0.1:9200\n") + } + } + } else { + throw new GradleException("opensearch.yml not found at: $ymlFile") + } + } + // Tell the test JVM if the cluster JVM is running under a debugger so that tests can // use longer timeouts for requests. def isDebuggingCluster = getDebug() || System.getProperty("test.debug") != null diff --git a/src/main/java/org/opensearch/flowframework/model/Template.java b/src/main/java/org/opensearch/flowframework/model/Template.java index 28b36377b..0402a4f04 100644 --- a/src/main/java/org/opensearch/flowframework/model/Template.java +++ b/src/main/java/org/opensearch/flowframework/model/Template.java @@ -166,6 +166,7 @@ private Builder(Template t) { this.createdTime = t.createdTime(); this.lastUpdatedTime = t.lastUpdatedTime(); this.lastProvisionedTime = t.lastProvisionedTime(); + this.tenantId = t.getTenantId(); } /** diff --git a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java index 68774884d..e225e09f2 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java @@ -102,52 +102,56 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli .stream() .filter(e -> !request.consumedParams().contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - if (!flowFrameworkSettings.isFlowFrameworkEnabled()) { - FlowFrameworkException ffe = new FlowFrameworkException( - "This API is disabled. To enable it, set [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.", - RestStatus.FORBIDDEN - ); - return channel -> channel.sendResponse( - new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)) - ); - } - String tenantId = RestActionUtils.getTenantID(flowFrameworkSettings.isMultiTenancyEnabled(), request); - if (!provision && !params.isEmpty()) { - FlowFrameworkException ffe = new FlowFrameworkException( - "Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.", - RestStatus.BAD_REQUEST - ); - return processError(ffe, params, request); - } - if (provision && updateFields) { - FlowFrameworkException ffe = new FlowFrameworkException( - "You can not use both the " + PROVISION_WORKFLOW + " and " + UPDATE_WORKFLOW_FIELDS + " parameters in the same request.", - RestStatus.BAD_REQUEST - ); - return processError(ffe, params, request); - } - if (reprovision && workflowId == null) { - FlowFrameworkException ffe = new FlowFrameworkException( - "You can not use the " + REPROVISION_WORKFLOW + " parameter to create a new template.", - RestStatus.BAD_REQUEST - ); - return processError(ffe, params, request); - } - if (reprovision && useCase != null) { - FlowFrameworkException ffe = new FlowFrameworkException( - "You cannot use the " + REPROVISION_WORKFLOW + " and " + USE_CASE + " parameters in the same request.", - RestStatus.BAD_REQUEST - ); - return processError(ffe, params, request); - } - if (reprovision && !params.isEmpty()) { - FlowFrameworkException ffe = new FlowFrameworkException( - "Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.", - RestStatus.BAD_REQUEST - ); - return processError(ffe, params, request); - } try { + if (!flowFrameworkSettings.isFlowFrameworkEnabled()) { + FlowFrameworkException ffe = new FlowFrameworkException( + "This API is disabled. To enable it, set [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.", + RestStatus.FORBIDDEN + ); + return channel -> channel.sendResponse( + new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + String tenantId = RestActionUtils.getTenantID(flowFrameworkSettings.isMultiTenancyEnabled(), request); + if (!provision && !params.isEmpty()) { + FlowFrameworkException ffe = new FlowFrameworkException( + "Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.", + RestStatus.BAD_REQUEST + ); + return processError(ffe, params, request); + } + if (provision && updateFields) { + FlowFrameworkException ffe = new FlowFrameworkException( + "You can not use both the " + + PROVISION_WORKFLOW + + " and " + + UPDATE_WORKFLOW_FIELDS + + " parameters in the same request.", + RestStatus.BAD_REQUEST + ); + return processError(ffe, params, request); + } + if (reprovision && workflowId == null) { + FlowFrameworkException ffe = new FlowFrameworkException( + "You can not use the " + REPROVISION_WORKFLOW + " parameter to create a new template.", + RestStatus.BAD_REQUEST + ); + return processError(ffe, params, request); + } + if (reprovision && useCase != null) { + FlowFrameworkException ffe = new FlowFrameworkException( + "You cannot use the " + REPROVISION_WORKFLOW + " and " + USE_CASE + " parameters in the same request.", + RestStatus.BAD_REQUEST + ); + return processError(ffe, params, request); + } + if (reprovision && !params.isEmpty()) { + FlowFrameworkException ffe = new FlowFrameworkException( + "Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.", + RestStatus.BAD_REQUEST + ); + return processError(ffe, params, request); + } Template template; Map useCaseDefaultsMap = Collections.emptyMap(); if (useCase != null) { diff --git a/src/main/resources/mappings/global-context.json b/src/main/resources/mappings/global-context.json index 544b4a9af..b79ba07df 100644 --- a/src/main/resources/mappings/global-context.json +++ b/src/main/resources/mappings/global-context.json @@ -1,7 +1,7 @@ { "dynamic": false, "_meta": { - "schema_version": 3 + "schema_version": 4 }, "properties": { "workflow_id": { @@ -86,6 +86,9 @@ "type": "date", "format": "strict_date_time||epoch_millis" }, + "tenant_id": { + "type": "keyword" + }, "ui_metadata": { "type": "object", "enabled": false diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java index 3570dccf6..f1b6d338b 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java @@ -18,6 +18,7 @@ import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; import org.apache.hc.client5.http.ssl.NoopHostnameVerifier; import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.message.BasicHeader; @@ -146,6 +147,19 @@ protected String getProtocol() { return isHttps() ? "https" : "http"; } + public static Map responseToMap(Response response) throws IOException { + HttpEntity entity = response.getEntity(); + assertNotNull(response); + String entityString = TestHelpers.httpEntityToString(entity); + XContentParser parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + entityString + ); + parser.nextToken(); + return parser.map(); + } + // Utility fn for deleting indices. Should only be used when not allowed in a regular context // (e.g., deleting system indices) protected static void deleteIndexWithAdminClient(String name) throws IOException { diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkTenantAwareRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkTenantAwareRestTestCase.java new file mode 100644 index 000000000..72126c8d5 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkTenantAwareRestTestCase.java @@ -0,0 +1,207 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework; + +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.message.BasicHeader; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.DeprecationHandler; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static org.opensearch.common.xcontent.XContentType.JSON; +import static org.opensearch.flowframework.common.CommonValue.*; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.*; + +public abstract class FlowFrameworkTenantAwareRestTestCase extends FlowFrameworkRestTestCase { + + // Toggle to run DDB tests + // TODO: Get this from a property + protected static final boolean DDB = false; + + protected static final String DOC_ID = "_id"; + + // REST methods + protected static final String POST = RestRequest.Method.POST.name(); + protected static final String GET = RestRequest.Method.GET.name(); + protected static final String PUT = RestRequest.Method.PUT.name(); + protected static final String DELETE = RestRequest.Method.DELETE.name(); + + // REST body + protected static final String MATCH_ALL_QUERY = "{\"query\":{\"match_all\":{}}}"; + protected static final String EMPTY_CONTENT = "{}"; + + // REST Response error reasons + protected static final String MISSING_TENANT_REASON = "Tenant ID header is missing"; + protected static final String NO_PERMISSION_REASON = "You don't have permission to access this resource"; + + protected String tenantId = randomAlphaOfLength(5); + protected String otherTenantId = randomAlphaOfLength(6); + + protected final RestRequest tenantRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withHeaders( + Map.of(TENANT_ID_HEADER, singletonList(tenantId)) + ).build(); + protected final RestRequest otherTenantRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withHeaders( + Map.of(TENANT_ID_HEADER, singletonList(otherTenantId)) + ).build(); + protected final RestRequest nullTenantRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withHeaders(emptyMap()) + .build(); + + protected final RestRequest tenantMatchAllRequest = getRestRequestWithHeadersAndContent(tenantId, MATCH_ALL_QUERY); + protected final RestRequest otherTenantMatchAllRequest = getRestRequestWithHeadersAndContent(otherTenantId, MATCH_ALL_QUERY); + protected final RestRequest nullTenantMatchAllRequest = getRestRequestWithHeadersAndContent(null, MATCH_ALL_QUERY); + + protected static boolean isMultiTenancyEnabled() throws IOException { + // pass -Dtests.rest.tenantaware=true on gradle command line to enable + return Boolean.parseBoolean(System.getProperty(FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED.getKey())) + || Boolean.parseBoolean(System.getenv(FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED.getKey())); + } + + protected static Response makeRequest(RestRequest request, String method, String path) throws IOException { + return TestHelpers.makeRequest( + client(), + method, + path, + request.params(), + request.content().utf8ToString(), + getHeadersFromRequest(request) + ); + } + + private static List
getHeadersFromRequest(RestRequest request) { + return request.getHeaders() + .entrySet() + .stream() + .map(e -> new BasicHeader(e.getKey(), e.getValue().stream().collect(Collectors.joining(",")))) + .collect(Collectors.toList()); + } + + protected static RestRequest getRestRequestWithHeadersAndContent(String tenantId, String requestContent) { + Map> headers = new HashMap<>(); + if (tenantId != null) { + headers.put(TENANT_ID_HEADER, singletonList(tenantId)); + } + return new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withHeaders(headers) + .withContent(new BytesArray(requestContent), JSON) + .build(); + } + + @SuppressWarnings("unchecked") + protected static String getErrorReasonFromResponseMap(Map map) { + // FlowFrameworkExceptions have a simple error field + if (map.get("error") instanceof String) { + return (String) map.get("error"); + } + + // OpenSearchStatusExceptions have different possibilities based on client + String type = ((Map) map.get("error")).get("type"); + + // { + // "error": { + // "root_cause": [ + // { + // "type": "status_exception", + // "reason": "You don't have permission to access this resource" + // } + // ], + // "type": "status_exception", + // "reason": "You don't have permission to access this resource" + // }, + // "status": 403 + // } + if ("status_exception".equals(type)) { + return ((Map) map.get("error")).get("reason"); + } + + // { + // "error": { + // "reason": "System Error", + // "details": "You don't have permission to access this resource", + // "type": "OpenSearchStatusException" + // }, + // "status": 403 + // } + return ((Map) map.get("error")).get("details"); + } + + protected static SearchResponse searchResponseFromResponse(Response response) throws IOException { + XContentParser parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.IGNORE_DEPRECATIONS, + TestHelpers.httpEntityToString(response.getEntity()).getBytes(UTF_8) + ); + return SearchResponse.fromXContent(parser); + } + + protected static void assertBadRequest(Response response) { + assertEquals(RestStatus.BAD_REQUEST.getStatus(), response.getStatusLine().getStatusCode()); + } + + protected static void assertNotFound(Response response) { + assertEquals(RestStatus.NOT_FOUND.getStatus(), response.getStatusLine().getStatusCode()); + } + + protected static void assertForbidden(Response response) { + assertEquals(RestStatus.FORBIDDEN.getStatus(), response.getStatusLine().getStatusCode()); + } + + protected static void assertUnauthorized(Response response) { + assertEquals(RestStatus.UNAUTHORIZED.getStatus(), response.getStatusLine().getStatusCode()); + } + + /** + * Delete the specified document and wait until a search matches only the specified number of hits + * @param tenantId The tenant ID to filter the search by + * @param restPath The base path for the REST API + * @param id The document ID to be appended to the REST API for deletion + * @param hits The number of hits to expect after the deletion is processed + * @throws Exception on failures with building or making the request + */ + protected static void deleteAndWaitForSearch(String tenantId, String restPath, String id, int hits) throws Exception { + RestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withHeaders( + Map.of(TENANT_ID_HEADER, singletonList(tenantId)) + ).build(); + // First process the deletion. Dependent resources (e.g. model with connector) may cause 409 status until they are deleted + assertBusy(() -> { + try { + Response deleteResponse = makeRequest(request, DELETE, restPath + id); + // first successful deletion should produce an OK + assertOK(deleteResponse); + } catch (ResponseException e) { + // repeat deletions can produce a 404, treat as a success + assertNotFound(e.getResponse()); + } + }, 20, TimeUnit.SECONDS); + // Deletion processed, now wait for it to disappear from search + RestRequest searchRequest = getRestRequestWithHeadersAndContent(tenantId, MATCH_ALL_QUERY); + assertBusy(() -> { + Response response = makeRequest(searchRequest, GET, restPath + "_search"); + assertOK(response); + SearchResponse searchResponse = searchResponseFromResponse(response); + assertEquals(hits, searchResponse.getHits().getTotalHits().value); + }, 20, TimeUnit.SECONDS); + } +} diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 5b8ccae9e..285cb7b45 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -88,7 +88,7 @@ public void testFailedUpdateWorkflow() throws Exception { Map responseMap = entityAsMap(response); String workflowId = (String) responseMap.get(WORKFLOW_ID); - Response provisionResponse = provisionResponse = provisionWorkflow(client(), workflowId); + Response provisionResponse = provisionWorkflow(client(), workflowId); assertEquals(RestStatus.OK, TestHelpers.restStatus(provisionResponse)); getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS); diff --git a/src/test/java/org/opensearch/flowframework/rest/RestWorkflowTenantAwareIT.java b/src/test/java/org/opensearch/flowframework/rest/RestWorkflowTenantAwareIT.java new file mode 100644 index 000000000..ba9f3383e --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/rest/RestWorkflowTenantAwareIT.java @@ -0,0 +1,309 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.rest; + +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.flowframework.FlowFrameworkTenantAwareRestTestCase; +import org.opensearch.flowframework.util.ParseUtils; +import org.opensearch.rest.RestRequest; + +import java.io.IOException; +import java.util.Map; + +import static org.opensearch.flowframework.common.CommonValue.TENANT_ID_FIELD; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; + +public class RestWorkflowTenantAwareIT extends FlowFrameworkTenantAwareRestTestCase { + + private static final String WORKFLOW_PATH = WORKFLOW_URI + "/"; + + public void testWorkflowCRUD() throws Exception { + boolean multiTenancyEnabled = isMultiTenancyEnabled(); + + /* + * Create + */ + // Create a workflow with a tenant id + RestRequest createWorkflowRequest = getRestRequestWithHeadersAndContent(tenantId, createNoOpTemplate()); + Response response = makeRequest(createWorkflowRequest, POST, WORKFLOW_PATH); + assertOK(response); + Map map = responseToMap(response); + assertTrue(map.containsKey(WORKFLOW_ID)); + String workflowId = map.get(WORKFLOW_ID).toString(); + + /* + * Get + */ + // Now try to get that workflow + response = makeRequest(tenantRequest, GET, WORKFLOW_PATH + workflowId); + assertOK(response); + map = responseToMap(response); + assertEquals("noop", map.get("name")); + if (multiTenancyEnabled) { + assertEquals(tenantId, map.get(TENANT_ID_FIELD)); + } else { + assertNull(map.get(TENANT_ID_FIELD)); + } + + // Now try again with an other ID + if (multiTenancyEnabled) { + ResponseException ex = assertThrows( + ResponseException.class, + () -> makeRequest(otherTenantRequest, GET, WORKFLOW_PATH + workflowId) + ); + response = ex.getResponse(); + map = responseToMap(response); + if (DDB) { + assertNotFound(response); + assertEquals("Failed to retrieve template (" + workflowId + ") from global context.", getErrorReasonFromResponseMap(map)); + } else { + assertForbidden(response); + assertEquals(NO_PERMISSION_REASON, getErrorReasonFromResponseMap(map)); + } + } else { + response = makeRequest(otherTenantRequest, GET, WORKFLOW_PATH + workflowId); + assertOK(response); + map = responseToMap(response); + assertEquals("noop", map.get("name")); + } + + // Now try again with a null ID + if (multiTenancyEnabled) { + ResponseException ex = assertThrows( + ResponseException.class, + () -> makeRequest(nullTenantRequest, GET, WORKFLOW_PATH + workflowId) + ); + response = ex.getResponse(); + map = responseToMap(response); + assertForbidden(response); + assertEquals(MISSING_TENANT_REASON, getErrorReasonFromResponseMap(map)); + } else { + response = makeRequest(nullTenantRequest, GET, WORKFLOW_PATH + workflowId); + assertOK(response); + map = responseToMap(response); + assertEquals("noop", map.get("name")); + } + + /* + * Update + */ + // Now attempt to update the workflow name + RestRequest updateRequest = getRestRequestWithHeadersAndContent(tenantId, "{\"name\":\"Updated name\"}"); + response = makeRequest(updateRequest, PUT, WORKFLOW_PATH + workflowId); + assertOK(response); + map = responseToMap(response); + assertEquals(workflowId, map.get(WORKFLOW_ID).toString()); + + // Verify the update + response = makeRequest(tenantRequest, GET, WORKFLOW_PATH + workflowId); + assertOK(response); + map = responseToMap(response); + assertEquals("Updated name", map.get("name")); + + // Try the update with other tenant ID + RestRequest otherUpdateRequest = getRestRequestWithHeadersAndContent(otherTenantId, "{\"name\":\"Other tenant name\"}"); + if (multiTenancyEnabled) { + ResponseException ex = assertThrows( + ResponseException.class, + () -> makeRequest(otherUpdateRequest, PUT, WORKFLOW_PATH + workflowId) + ); + response = ex.getResponse(); + map = responseToMap(response); + if (DDB) { + assertNotFound(response); + assertEquals("Failed to retrieve template (" + workflowId + ") from global context.", getErrorReasonFromResponseMap(map)); + } else { + assertForbidden(response); + assertEquals(NO_PERMISSION_REASON, getErrorReasonFromResponseMap(map)); + } + } else { + response = makeRequest(otherUpdateRequest, PUT, WORKFLOW_PATH + workflowId); + assertOK(response); + // Verify the update + response = makeRequest(otherTenantRequest, GET, WORKFLOW_PATH + workflowId); + assertOK(response); + map = responseToMap(response); + assertEquals("Other tenant name", map.get("name")); + } + + // Try the update with no tenant ID + RestRequest nullUpdateRequest = getRestRequestWithHeadersAndContent(null, "{\"name\":\"Null tenant name\"}"); + if (multiTenancyEnabled) { + ResponseException ex = assertThrows( + ResponseException.class, + () -> makeRequest(nullUpdateRequest, PUT, WORKFLOW_PATH + workflowId) + ); + response = ex.getResponse(); + map = responseToMap(response); + assertForbidden(response); + assertEquals(MISSING_TENANT_REASON, getErrorReasonFromResponseMap(map)); + } else { + response = makeRequest(nullUpdateRequest, PUT, WORKFLOW_PATH + workflowId); + assertOK(response); + // Verify the update + response = makeRequest(tenantRequest, GET, WORKFLOW_PATH + workflowId); + assertOK(response); + map = responseToMap(response); + assertEquals("Null tenant name", map.get("name")); + } + + // Verify no change from original update when multiTenancy enabled + if (multiTenancyEnabled) { + response = makeRequest(tenantRequest, GET, WORKFLOW_PATH + workflowId); + assertOK(response); + map = responseToMap(response); + assertEquals("Updated name", map.get("name")); + } + + /* + * Search + */ + // Create a second workflow using otherTenantId + RestRequest otherWorkflowRequest = getRestRequestWithHeadersAndContent(otherTenantId, createNoOpTemplate()); + response = makeRequest(otherWorkflowRequest, POST, WORKFLOW_PATH); + assertOK(response); + map = responseToMap(response); + assertTrue(map.containsKey(WORKFLOW_ID)); + String otherWorkflowId = map.get(WORKFLOW_ID).toString(); + + // Verify it + response = makeRequest(otherTenantRequest, GET, WORKFLOW_PATH + otherWorkflowId); + assertOK(response); + map = responseToMap(response); + assertEquals("noop", map.get("name")); + + // Retry these tests until they pass. Search requires refresh, can take 15s on DDB + refreshAllIndices(); + + /* Search not yet implemented TODO + assertBusy(() -> { + // Search should show only the workflow for tenant + Response restResponse = makeRequest(tenantMatchAllRequest, GET, WORKFLOW_PATH + "_search"); + assertOK(restResponse); + SearchResponse searchResponse = searchResponseFromResponse(restResponse); + if (multiTenancyEnabled) { + assertEquals(1, searchResponse.getHits().getTotalHits().value); + assertEquals(tenantId, searchResponse.getHits().getHits()[0].getSourceAsMap().get(TENANT_ID_FIELD)); + } else { + assertEquals(2, searchResponse.getHits().getTotalHits().value); + assertNull(searchResponse.getHits().getHits()[0].getSourceAsMap().get(TENANT_ID_FIELD)); + assertNull(searchResponse.getHits().getHits()[1].getSourceAsMap().get(TENANT_ID_FIELD)); + } + }, 20, TimeUnit.SECONDS); + + assertBusy(() -> { + // Search should show only the workflow for other tenant + Response restResponse = makeRequest(otherTenantMatchAllRequest, GET, WORKFLOW_PATH + "_search"); + assertOK(restResponse); + SearchResponse searchResponse = searchResponseFromResponse(restResponse); + if (multiTenancyEnabled) { + assertEquals(1, searchResponse.getHits().getTotalHits().value); + assertEquals(otherTenantId, searchResponse.getHits().getHits()[0].getSourceAsMap().get(TENANT_ID_FIELD)); + } else { + assertEquals(2, searchResponse.getHits().getTotalHits().value); + assertNull(searchResponse.getHits().getHits()[0].getSourceAsMap().get(TENANT_ID_FIELD)); + assertNull(searchResponse.getHits().getHits()[1].getSourceAsMap().get(TENANT_ID_FIELD)); + } + }, 20, TimeUnit.SECONDS); + + // Search should fail without a tenant id + if (multiTenancyEnabled) { + ResponseException ex = assertThrows( + ResponseException.class, + () -> makeRequest(nullTenantMatchAllRequest, GET, WORKFLOW_PATH + "_search") + ); + response = ex.getResponse(); + assertForbidden(response); + map = responseToMap(response); + assertEquals(MISSING_TENANT_REASON, getErrorReasonFromResponseMap(map)); + } else { + response = makeRequest(nullTenantMatchAllRequest, GET, WORKFLOW_PATH + "_search"); + assertOK(response); + SearchResponse searchResponse = searchResponseFromResponse(response); + assertEquals(2, searchResponse.getHits().getTotalHits().value); + assertNull(searchResponse.getHits().getHits()[0].getSourceAsMap().get(TENANT_ID_FIELD)); + assertNull(searchResponse.getHits().getHits()[1].getSourceAsMap().get(TENANT_ID_FIELD)); + } + + /* + * Delete + */ + // Delete the workflows + // First test that we can't delete other tenant workflows + if (multiTenancyEnabled) { + ResponseException ex = assertThrows( + ResponseException.class, + () -> makeRequest(tenantRequest, DELETE, WORKFLOW_PATH + otherWorkflowId) + ); + response = ex.getResponse(); + map = responseToMap(response); + if (DDB) { + assertNotFound(response); + assertEquals( + "Failed to find workflow with the provided workflow id: " + otherWorkflowId, + getErrorReasonFromResponseMap(map) + ); + } else { + assertForbidden(response); + assertEquals(NO_PERMISSION_REASON, getErrorReasonFromResponseMap(map)); + } + + ex = assertThrows(ResponseException.class, () -> makeRequest(otherTenantRequest, DELETE, WORKFLOW_PATH + workflowId)); + response = ex.getResponse(); + map = responseToMap(response); + if (DDB) { + assertNotFound(response); + assertEquals("Failed to retrieve template (" + workflowId + ") from global context.", getErrorReasonFromResponseMap(map)); + } else { + assertForbidden(response); + assertEquals(NO_PERMISSION_REASON, getErrorReasonFromResponseMap(map)); + } + + // and can't delete without a tenant ID either + ex = assertThrows(ResponseException.class, () -> makeRequest(nullTenantRequest, DELETE, WORKFLOW_PATH + workflowId)); + response = ex.getResponse(); + map = responseToMap(response); + assertForbidden(response); + assertEquals(MISSING_TENANT_REASON, getErrorReasonFromResponseMap(map)); + } + + // Now actually do the deletions. Same result whether multi-tenancy is enabled. + // Delete from tenant + response = makeRequest(tenantRequest, DELETE, WORKFLOW_PATH + workflowId); + assertOK(response); + map = responseToMap(response); + assertEquals(workflowId, map.get(DOC_ID).toString()); + + // Verify the deletion + ResponseException ex = assertThrows(ResponseException.class, () -> makeRequest(tenantRequest, GET, WORKFLOW_PATH + workflowId)); + response = ex.getResponse(); + assertNotFound(response); + map = responseToMap(response); + assertEquals("Failed to retrieve template (" + workflowId + ") from global context.", getErrorReasonFromResponseMap(map)); + + // Delete from other tenant + response = makeRequest(otherTenantRequest, DELETE, WORKFLOW_PATH + otherWorkflowId); + assertOK(response); + map = responseToMap(response); + assertEquals(otherWorkflowId, map.get(DOC_ID).toString()); + + // Verify the deletion + ex = assertThrows(ResponseException.class, () -> makeRequest(otherTenantRequest, GET, WORKFLOW_PATH + otherWorkflowId)); + response = ex.getResponse(); + assertNotFound(response); + map = responseToMap(response); + assertEquals("Failed to retrieve template (" + otherWorkflowId + ") from global context.", getErrorReasonFromResponseMap(map)); + } + + private static String createNoOpTemplate() throws IOException { + return ParseUtils.resourceToString("/template/noop.json"); + } +} diff --git a/src/test/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportActionTests.java index 8641654a5..f6dd0bd47 100644 --- a/src/test/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportActionTests.java @@ -63,7 +63,6 @@ import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -196,6 +195,7 @@ public void testGetWorkflowStateResponse() throws IOException { public void testExecuteGetWorkflowStateRequestFailure() throws IOException, InterruptedException { String workflowId = "test-workflow"; GetWorkflowStateRequest request = new GetWorkflowStateRequest(workflowId, false, null); + @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); PlainActionFuture future = PlainActionFuture.newFuture(); @@ -217,6 +217,7 @@ public void testExecuteGetWorkflowStateRequestFailure() throws IOException, Inte public void testExecuteGetWorkflowStateRequestIndexNotFound() throws IOException, InterruptedException { String workflowId = "test-workflow"; GetWorkflowStateRequest request = new GetWorkflowStateRequest(workflowId, false, null); + @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); PlainActionFuture future = PlainActionFuture.newFuture(); @@ -238,6 +239,7 @@ public void testExecuteGetWorkflowStateRequestIndexNotFound() throws IOException public void testExecuteGetWorkflowStateRequestParseFailure() throws IOException, InterruptedException { String workflowId = "test-workflow"; GetWorkflowStateRequest request = new GetWorkflowStateRequest(workflowId, false, null); + @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); XContentBuilder builder = XContentFactory.jsonBuilder(); diff --git a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java index 3644ab0ee..fb65ab2c0 100644 --- a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java +++ b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java @@ -31,8 +31,6 @@ import java.util.Set; import static org.opensearch.flowframework.util.ParseUtils.isAdmin; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; public class ParseUtilsTests extends OpenSearchTestCase { public void testResourceToStringToJson() throws IOException {