From 3be25198a71d6cc3580e17a48f02e10e2a91d9b4 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Tue, 13 Aug 2024 21:46:24 +0000 Subject: [PATCH 01/20] Adding reprovision integration tests Signed-off-by: Joshua Palis --- .../FlowFrameworkRestTestCase.java | 19 ++ .../rest/FlowFrameworkRestApiIT.java | 206 ++++++++++++++++++ .../registerlocalmodel-createindex.json | 59 +++++ ...localmodel-ingestpipeline-createindex.json | 86 ++++++++ ...localmodel-ingestpipeline-updateindex.json | 86 ++++++++ .../registerlocalmodel-ingestpipeline.json | 52 +++++ .../template/registerlocalmodel.json | 29 +++ 7 files changed, 537 insertions(+) create mode 100644 src/test/resources/template/registerlocalmodel-createindex.json create mode 100644 src/test/resources/template/registerlocalmodel-ingestpipeline-createindex.json create mode 100644 src/test/resources/template/registerlocalmodel-ingestpipeline-updateindex.json create mode 100644 src/test/resources/template/registerlocalmodel-ingestpipeline.json create mode 100644 src/test/resources/template/registerlocalmodel.json diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java index dc25f44fa..877b6292a 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java @@ -415,6 +415,25 @@ protected Response createWorkflowValidation(RestClient client, Template template return TestHelpers.makeRequest(client, "POST", WORKFLOW_URI, Collections.emptyMap(), template.toJson(), null); } + /** + * Helper method to invoke the Reprovision Workflow API + * @param client the rest client + * @param workflowId the document id + * @param templateFields the template to reprovision + * @throws Exception if the request fails + * @return a rest response + */ + protected Response reprovisionWorkflow(RestClient client, String workflowId, Template template) throws Exception { + return TestHelpers.makeRequest( + client, + "PUT", + String.format(Locale.ROOT, "%s/%s?reprovision=true", WORKFLOW_URI, workflowId), + Collections.emptyMap(), + template.toJson(), + null + ); + } + /** * Helper method to invoke the Update Workflow API * @param client the rest client diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 6a454dc75..747d9eabf 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -363,6 +363,212 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { assertBusy(() -> { getAndAssertWorkflowStatusNotFound(client(), workflowId); }, 30, TimeUnit.SECONDS); } + public void testReprovisionWorkflow() throws Exception { + // Begin with a template to register a local pretrained model + Template template = TestHelpers.createTemplateFromFile("registerlocalmodel.json"); + + // Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter + Response response; + if (!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); + response = createWorkflowWithProvision(client(), template); + } else { + response = createWorkflowWithProvision(client(), template); + } + assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + String workflowId = (String) responseMap.get(WORKFLOW_ID); + // wait and ensure state is completed/done + assertBusy( + () -> { getAndAssertWorkflowStatus(client(), workflowId, State.COMPLETED, ProvisioningProgress.DONE); }, + 120, + TimeUnit.SECONDS + ); + + // Wait until provisioning has completed successfully before attempting to retrieve created resources + List resourcesCreated = getResourcesCreated(client(), workflowId, 30); + assertEquals(2, resourcesCreated.size()); + assertEquals("register_local_pretrained_model", resourcesCreated.get(0).workflowStepName()); + + // Reprovision template to add ingest pipeline which uses the model ID + template = TestHelpers.createTemplateFromFile("registerlocalmodel-ingestpipeline.json"); + response = reprovisionWorkflow(client(), workflowId, template); + assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); + + resourcesCreated = getResourcesCreated(client(), workflowId, 10); + assertEquals(3, resourcesCreated.size()); + List resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); + assertTrue(resourceIds.contains("register_local_pretrained_model")); + assertTrue(resourceIds.contains("create_ingest_pipeline")); + + // Retrieve pipeline by ID to ensure model ID is set correctly + String modelId = resourcesCreated.stream() + .filter(x -> x.workflowStepName().equals("register_local_pretrained_model")) + .findFirst() + .get() + .resourceId(); + String pipelineId = resourcesCreated.stream() + .filter(x -> x.workflowStepName().equals("create_ingest_pipeline")) + .findFirst() + .get() + .resourceId(); + GetPipelineResponse getPipelineResponse = getPipelines(pipelineId); + assertEquals(1, getPipelineResponse.pipelines().size()); + assertTrue(getPipelineResponse.pipelines().get(0).getConfigAsMap().toString().contains(modelId)); + + // Reprovision template to add index which uses default ingest pipeline + template = TestHelpers.createTemplateFromFile("registerlocalmodel-ingestpipeline-createindex.json"); + response = reprovisionWorkflow(client(), workflowId, template); + assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); + + resourcesCreated = getResourcesCreated(client(), workflowId, 10); + assertEquals(4, resourcesCreated.size()); + resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); + assertTrue(resourceIds.contains("register_local_pretrained_model")); + assertTrue(resourceIds.contains("create_ingest_pipeline")); + assertTrue(resourceIds.contains("create_index")); + + // Retrieve index settings to ensure pipeline ID is set correctly + String indexName = resourcesCreated.stream() + .filter(x -> x.workflowStepName().equals("create_index")) + .findFirst() + .get() + .resourceId(); + Map indexSettings = getIndexSettingsAsMap(indexName); + assertEquals(pipelineId, indexSettings.get("index.default_pipeline")); + + // Reprovision template to remove default ingest pipeline + template = TestHelpers.createTemplateFromFile("registerlocalmodel-ingestpipeline-updateindex.json"); + response = reprovisionWorkflow(client(), workflowId, template); + assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); + + resourcesCreated = getResourcesCreated(client(), workflowId, 10); + // resource count should remain unchanged when updating an existing node + assertEquals(4, resourcesCreated.size()); + + // Retrieve index settings to ensure default pipeline has been updated correctly + indexSettings = getIndexSettingsAsMap(indexName); + assertEquals("_none", indexSettings.get("index.default_pipeline")); + } + + public void testReprovisionWorkflowMidNodeAddition() throws Exception { + // Begin with a template to register a local pretrained model and create an index, no edges + Template template = TestHelpers.createTemplateFromFile("registerlocalmodel-createindex.json"); + + // Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter + Response response; + if (!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); + response = createWorkflowWithProvision(client(), template); + } else { + response = createWorkflowWithProvision(client(), template); + } + assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + String workflowId = (String) responseMap.get(WORKFLOW_ID); + // wait and ensure state is completed/done + assertBusy( + () -> { getAndAssertWorkflowStatus(client(), workflowId, State.COMPLETED, ProvisioningProgress.DONE); }, + 120, + TimeUnit.SECONDS + ); + + // Wait until provisioning has completed successfully before attempting to retrieve created resources + List resourcesCreated = getResourcesCreated(client(), workflowId, 30); + assertEquals(3, resourcesCreated.size()); + List resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); + assertTrue(resourceIds.contains("register_local_pretrained_model")); + assertTrue(resourceIds.contains("create_index")); + + // Reprovision template to add ingest pipeline which uses the model ID + template = TestHelpers.createTemplateFromFile("registerlocalmodel-ingestpipeline-createindex.json"); + response = reprovisionWorkflow(client(), workflowId, template); + assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); + + resourcesCreated = getResourcesCreated(client(), workflowId, 10); + assertEquals(4, resourcesCreated.size()); + resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); + assertTrue(resourceIds.contains("register_local_pretrained_model")); + assertTrue(resourceIds.contains("create_ingest_pipeline")); + assertTrue(resourceIds.contains("create_index")); + + // Ensure ingest pipeline configuration contains the model id and index settings have the ingest pipeline as default + String modelId = resourcesCreated.stream() + .filter(x -> x.workflowStepName().equals("register_local_pretrained_model")) + .findFirst() + .get() + .resourceId(); + String pipelineId = resourcesCreated.stream() + .filter(x -> x.workflowStepName().equals("create_ingest_pipeline")) + .findFirst() + .get() + .resourceId(); + GetPipelineResponse getPipelineResponse = getPipelines(pipelineId); + assertEquals(1, getPipelineResponse.pipelines().size()); + assertTrue(getPipelineResponse.pipelines().get(0).getConfigAsMap().toString().contains(modelId)); + + String indexName = resourcesCreated.stream() + .filter(x -> x.workflowStepName().equals("create_index")) + .findFirst() + .get() + .resourceId(); + Map indexSettings = getIndexSettingsAsMap(indexName); + assertEquals(pipelineId, indexSettings.get("index.default_pipeline")); + } + + public void testReprovisionWithNoChange() throws Exception { + Template template = TestHelpers.createTemplateFromFile("registerlocalmodel-ingestpipeline-createindex.json"); + + Response response; + if (!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); + response = createWorkflowWithProvision(client(), template); + } else { + response = createWorkflowWithProvision(client(), template); + } + assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + String workflowId = (String) responseMap.get(WORKFLOW_ID); + // wait and ensure state is completed/done + assertBusy( + () -> { getAndAssertWorkflowStatus(client(), workflowId, State.COMPLETED, ProvisioningProgress.DONE); }, + 120, + TimeUnit.SECONDS + ); + + // Attempt to reprovision the same template with no changes + ResponseException exception = expectThrows(ResponseException.class, () -> reprovisionWorkflow(client(), workflowId, template)); + assertEquals(RestStatus.BAD_REQUEST.getStatus(), exception.getResponse().getStatusLine().getStatusCode()); + assertTrue(exception.getMessage().contains("Template does not contain any modifications")); + } + + public void testReprovisionWithDeletion() throws Exception { + Template template = TestHelpers.createTemplateFromFile("registerlocalmodel-ingestpipeline-createindex.json"); + + Response response; + if (!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); + response = createWorkflowWithProvision(client(), template); + } else { + response = createWorkflowWithProvision(client(), template); + } + assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + String workflowId = (String) responseMap.get(WORKFLOW_ID); + // wait and ensure state is completed/done + assertBusy( + () -> { getAndAssertWorkflowStatus(client(), workflowId, State.COMPLETED, ProvisioningProgress.DONE); }, + 120, + TimeUnit.SECONDS + ); + + // Attempt to reprovision template without ingest pipeline node + Template templateWithoutIngestPipeline = TestHelpers.createTemplateFromFile("registerlocalmodel-createindex.json"); + ResponseException exception = expectThrows(ResponseException.class, () -> reprovisionWorkflow(client(), workflowId, templateWithoutIngestPipeline)); + assertEquals(RestStatus.BAD_REQUEST.getStatus(), exception.getResponse().getStatusLine().getStatusCode()); + assertTrue(exception.getMessage().contains("Workflow Step deletion is not supported when reprovisioning a template.")); + } + public void testTimestamps() throws Exception { Template noopTemplate = TestHelpers.createTemplateFromFile("noop.json"); // Create the template, should have created and updated matching diff --git a/src/test/resources/template/registerlocalmodel-createindex.json b/src/test/resources/template/registerlocalmodel-createindex.json new file mode 100644 index 000000000..8dad29432 --- /dev/null +++ b/src/test/resources/template/registerlocalmodel-createindex.json @@ -0,0 +1,59 @@ +{ + "name": "semantic search with local pretrained model", + "description": "Setting up semantic search, with a local pretrained embedding model", + "use_case": "SEMANTIC_SEARCH", + "version": { + "template": "1.0.0", + "compatibility": [ + "2.12.0", + "3.0.0" + ] + }, + "workflows": { + "provision": { + "nodes": [ + { + "id": "register_local_pretrained_model", + "type": "register_local_pretrained_model", + "user_inputs": { + "name": "huggingface/sentence-transformers/paraphrase-MiniLM-L3-v2", + "version": "1.0.1", + "description": "This is a sentence transformer model", + "model_format": "TORCH_SCRIPT", + "deploy": true + } + }, + { + "id": "create_index", + "type": "create_index", + "user_inputs": { + "index_name": "my-nlp-index", + "configurations": { + "settings": { + "index.knn": true, + "index.number_of_shards": "2" + }, + "mappings": { + "properties": { + "passage_embedding": { + "type": "knn_vector", + "dimension": "768", + "method": { + "engine": "lucene", + "space_type": "l2", + "name": "hnsw", + "parameters": {} + } + }, + "passage_text": { + "type": "text" + } + } + } + } + } + } + ] + } + } + } diff --git a/src/test/resources/template/registerlocalmodel-ingestpipeline-createindex.json b/src/test/resources/template/registerlocalmodel-ingestpipeline-createindex.json new file mode 100644 index 000000000..b60da54a2 --- /dev/null +++ b/src/test/resources/template/registerlocalmodel-ingestpipeline-createindex.json @@ -0,0 +1,86 @@ +{ + "name": "semantic search with local pretrained model", + "description": "Setting up semantic search, with a local pretrained embedding model", + "use_case": "SEMANTIC_SEARCH", + "version": { + "template": "1.0.0", + "compatibility": [ + "2.12.0", + "3.0.0" + ] + }, + "workflows": { + "provision": { + "nodes": [ + { + "id": "register_local_pretrained_model", + "type": "register_local_pretrained_model", + "user_inputs": { + "name": "huggingface/sentence-transformers/paraphrase-MiniLM-L3-v2", + "version": "1.0.1", + "description": "This is a sentence transformer model", + "model_format": "TORCH_SCRIPT", + "deploy": true + } + }, + { + "id": "create_ingest_pipeline", + "type": "create_ingest_pipeline", + "previous_node_inputs": { + "register_local_pretrained_model": "model_id" + }, + "user_inputs": { + "pipeline_id": "nlp-ingest-pipeline", + "configurations": { + "description": "A text embedding pipeline", + "processors": [ + { + "text_embedding": { + "model_id": "${{register_local_pretrained_model.model_id}}", + "field_map": { + "passage_text": "passage_embedding" + } + } + } + ] + } + } + }, + { + "id": "create_index", + "type": "create_index", + "previous_node_inputs": { + "create_ingest_pipeline": "pipeline_id" + }, + "user_inputs": { + "index_name": "my-nlp-index", + "configurations": { + "settings": { + "index.knn": true, + "default_pipeline": "${{create_ingest_pipeline.pipeline_id}}", + "index.number_of_shards": "2" + }, + "mappings": { + "properties": { + "passage_embedding": { + "type": "knn_vector", + "dimension": "768", + "method": { + "engine": "lucene", + "space_type": "l2", + "name": "hnsw", + "parameters": {} + } + }, + "passage_text": { + "type": "text" + } + } + } + } + } + } + ] + } + } + } diff --git a/src/test/resources/template/registerlocalmodel-ingestpipeline-updateindex.json b/src/test/resources/template/registerlocalmodel-ingestpipeline-updateindex.json new file mode 100644 index 000000000..5303af938 --- /dev/null +++ b/src/test/resources/template/registerlocalmodel-ingestpipeline-updateindex.json @@ -0,0 +1,86 @@ +{ + "name": "semantic search with local pretrained model", + "description": "Setting up semantic search, with a local pretrained embedding model", + "use_case": "SEMANTIC_SEARCH", + "version": { + "template": "1.0.0", + "compatibility": [ + "2.12.0", + "3.0.0" + ] + }, + "workflows": { + "provision": { + "nodes": [ + { + "id": "register_local_pretrained_model", + "type": "register_local_pretrained_model", + "user_inputs": { + "name": "huggingface/sentence-transformers/paraphrase-MiniLM-L3-v2", + "version": "1.0.1", + "description": "This is a sentence transformer model", + "model_format": "TORCH_SCRIPT", + "deploy": true + } + }, + { + "id": "create_ingest_pipeline", + "type": "create_ingest_pipeline", + "previous_node_inputs": { + "register_local_pretrained_model": "model_id" + }, + "user_inputs": { + "pipeline_id": "nlp-ingest-pipeline", + "configurations": { + "description": "A text embedding pipeline", + "processors": [ + { + "text_embedding": { + "model_id": "${{register_local_pretrained_model.model_id}}", + "field_map": { + "passage_text": "passage_embedding" + } + } + } + ] + } + } + }, + { + "id": "create_index", + "type": "create_index", + "previous_node_inputs": { + "create_ingest_pipeline": "pipeline_id" + }, + "user_inputs": { + "index_name": "my-nlp-index", + "configurations": { + "settings": { + "index.knn": true, + "default_pipeline": "_none", + "index.number_of_shards": "2" + }, + "mappings": { + "properties": { + "passage_embedding": { + "type": "knn_vector", + "dimension": "768", + "method": { + "engine": "lucene", + "space_type": "l2", + "name": "hnsw", + "parameters": {} + } + }, + "passage_text": { + "type": "text" + } + } + } + } + } + } + ] + } + } + } diff --git a/src/test/resources/template/registerlocalmodel-ingestpipeline.json b/src/test/resources/template/registerlocalmodel-ingestpipeline.json new file mode 100644 index 000000000..a4ceab116 --- /dev/null +++ b/src/test/resources/template/registerlocalmodel-ingestpipeline.json @@ -0,0 +1,52 @@ +{ + "name": "semantic search with local pretrained model", + "description": "Setting up semantic search, with a local pretrained embedding model", + "use_case": "SEMANTIC_SEARCH", + "version": { + "template": "1.0.0", + "compatibility": [ + "2.12.0", + "3.0.0" + ] + }, + "workflows": { + "provision": { + "nodes": [ + { + "id": "register_local_pretrained_model", + "type": "register_local_pretrained_model", + "user_inputs": { + "name": "huggingface/sentence-transformers/paraphrase-MiniLM-L3-v2", + "version": "1.0.1", + "description": "This is a sentence transformer model", + "model_format": "TORCH_SCRIPT", + "deploy": true + } + }, + { + "id": "create_ingest_pipeline", + "type": "create_ingest_pipeline", + "previous_node_inputs": { + "register_local_pretrained_model": "model_id" + }, + "user_inputs": { + "pipeline_id": "nlp-ingest-pipeline", + "configurations": { + "description": "A text embedding pipeline", + "processors": [ + { + "text_embedding": { + "model_id": "${{register_local_pretrained_model.model_id}}", + "field_map": { + "passage_text": "passage_embedding" + } + } + } + ] + } + } + } + ] + } + } + } diff --git a/src/test/resources/template/registerlocalmodel.json b/src/test/resources/template/registerlocalmodel.json new file mode 100644 index 000000000..8394f76ab --- /dev/null +++ b/src/test/resources/template/registerlocalmodel.json @@ -0,0 +1,29 @@ +{ + "name": "semantic search with local pretrained model", + "description": "Setting up semantic search, with a local pretrained embedding model", + "use_case": "SEMANTIC_SEARCH", + "version": { + "template": "1.0.0", + "compatibility": [ + "2.12.0", + "3.0.0" + ] + }, + "workflows": { + "provision": { + "nodes": [ + { + "id": "register_local_pretrained_model", + "type": "register_local_pretrained_model", + "user_inputs": { + "name": "huggingface/sentence-transformers/paraphrase-MiniLM-L3-v2", + "version": "1.0.1", + "description": "This is a sentence transformer model", + "model_format": "TORCH_SCRIPT", + "deploy": true + } + } + ] + } + } + } From 01c169a97717e1599bb811acb65ae83bb83fec03 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Tue, 13 Aug 2024 21:50:06 +0000 Subject: [PATCH 02/20] spotless Signed-off-by: Joshua Palis --- .../flowframework/rest/FlowFrameworkRestApiIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 747d9eabf..6eaea352d 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -564,7 +564,10 @@ public void testReprovisionWithDeletion() throws Exception { // Attempt to reprovision template without ingest pipeline node Template templateWithoutIngestPipeline = TestHelpers.createTemplateFromFile("registerlocalmodel-createindex.json"); - ResponseException exception = expectThrows(ResponseException.class, () -> reprovisionWorkflow(client(), workflowId, templateWithoutIngestPipeline)); + ResponseException exception = expectThrows( + ResponseException.class, + () -> reprovisionWorkflow(client(), workflowId, templateWithoutIngestPipeline) + ); assertEquals(RestStatus.BAD_REQUEST.getStatus(), exception.getResponse().getStatusLine().getStatusCode()); assertTrue(exception.getMessage().contains("Workflow Step deletion is not supported when reprovisioning a template.")); } From 0c0dae1f694af792f44b376d6dbfa1a34ce81bc7 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Tue, 13 Aug 2024 22:48:25 +0000 Subject: [PATCH 03/20] Adding deprovision/delete to reprovision integration tests Signed-off-by: Joshua Palis --- .../rest/FlowFrameworkRestApiIT.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 6eaea352d..5396669fc 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -449,6 +449,19 @@ public void testReprovisionWorkflow() throws Exception { // Retrieve index settings to ensure default pipeline has been updated correctly indexSettings = getIndexSettingsAsMap(indexName); assertEquals("_none", indexSettings.get("index.default_pipeline")); + + // Deprovision and delete all resources + Response deprovisionResponse = deprovisionWorkflowWithAllowDelete(client(), workflowId, pipelineId + "," + indexName); + assertBusy( + () -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); }, + 60, + TimeUnit.SECONDS + ); + assertEquals(RestStatus.OK, TestHelpers.restStatus(deprovisionResponse)); + + // Hit Delete API + Response deleteResponse = deleteWorkflow(client(), workflowId); + assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); } public void testReprovisionWorkflowMidNodeAddition() throws Exception { @@ -514,6 +527,19 @@ public void testReprovisionWorkflowMidNodeAddition() throws Exception { .resourceId(); Map indexSettings = getIndexSettingsAsMap(indexName); assertEquals(pipelineId, indexSettings.get("index.default_pipeline")); + + // Deprovision and delete all resources + Response deprovisionResponse = deprovisionWorkflowWithAllowDelete(client(), workflowId, pipelineId + "," + indexName); + assertBusy( + () -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); }, + 60, + TimeUnit.SECONDS + ); + assertEquals(RestStatus.OK, TestHelpers.restStatus(deprovisionResponse)); + + // Hit Delete API + Response deleteResponse = deleteWorkflow(client(), workflowId); + assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); } public void testReprovisionWithNoChange() throws Exception { From 6e5656e652a64c6137a8ef218abcba75ed2c88bc Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Tue, 13 Aug 2024 22:53:14 +0000 Subject: [PATCH 04/20] Adding deprovision/delete to reprovision failure tests Signed-off-by: Joshua Palis --- .../rest/FlowFrameworkRestApiIT.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 5396669fc..ba9f9f499 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -566,6 +566,23 @@ public void testReprovisionWithNoChange() throws Exception { ResponseException exception = expectThrows(ResponseException.class, () -> reprovisionWorkflow(client(), workflowId, template)); assertEquals(RestStatus.BAD_REQUEST.getStatus(), exception.getResponse().getStatusLine().getStatusCode()); assertTrue(exception.getMessage().contains("Template does not contain any modifications")); + + // Deprovision and delete all resources + Response deprovisionResponse = deprovisionWorkflowWithAllowDelete( + client(), + workflowId, + "nlp-ingest-pipeline" + "," + "my-nlp-index" + ); + assertBusy( + () -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); }, + 60, + TimeUnit.SECONDS + ); + assertEquals(RestStatus.OK, TestHelpers.restStatus(deprovisionResponse)); + + // Hit Delete API + Response deleteResponse = deleteWorkflow(client(), workflowId); + assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); } public void testReprovisionWithDeletion() throws Exception { @@ -596,6 +613,23 @@ public void testReprovisionWithDeletion() throws Exception { ); assertEquals(RestStatus.BAD_REQUEST.getStatus(), exception.getResponse().getStatusLine().getStatusCode()); assertTrue(exception.getMessage().contains("Workflow Step deletion is not supported when reprovisioning a template.")); + + // Deprovision and delete all resources + Response deprovisionResponse = deprovisionWorkflowWithAllowDelete( + client(), + workflowId, + "nlp-ingest-pipeline" + "," + "my-nlp-index" + ); + assertBusy( + () -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); }, + 60, + TimeUnit.SECONDS + ); + assertEquals(RestStatus.OK, TestHelpers.restStatus(deprovisionResponse)); + + // Hit Delete API + Response deleteResponse = deleteWorkflow(client(), workflowId); + assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); } public void testTimestamps() throws Exception { From 3dedcf1b2b8c05e8a6e8eb4197ca7ce0764b1379 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Tue, 13 Aug 2024 23:48:10 +0000 Subject: [PATCH 05/20] Using remote models rather than local models to reduce flakiness Signed-off-by: Joshua Palis --- .../rest/FlowFrameworkRestApiIT.java | 52 ++++---- .../registerlocalmodel-createindex.json | 59 ---------- ...localmodel-ingestpipeline-createindex.json | 86 -------------- .../registerlocalmodel-ingestpipeline.json | 52 -------- .../template/registerlocalmodel.json | 29 ----- ...n => registerremotemodel-createindex.json} | 54 ++++----- ...emotemodel-ingestpipeline-createindex.json | 111 ++++++++++++++++++ ...emotemodel-ingestpipeline-updateindex.json | 111 ++++++++++++++++++ .../registerremotemodel-ingestpipeline.json | 77 ++++++++++++ .../template/registerremotemodel.json | 54 +++++++++ 10 files changed, 408 insertions(+), 277 deletions(-) delete mode 100644 src/test/resources/template/registerlocalmodel-createindex.json delete mode 100644 src/test/resources/template/registerlocalmodel-ingestpipeline-createindex.json delete mode 100644 src/test/resources/template/registerlocalmodel-ingestpipeline.json delete mode 100644 src/test/resources/template/registerlocalmodel.json rename src/test/resources/template/{registerlocalmodel-ingestpipeline-updateindex.json => registerremotemodel-createindex.json} (57%) create mode 100644 src/test/resources/template/registerremotemodel-ingestpipeline-createindex.json create mode 100644 src/test/resources/template/registerremotemodel-ingestpipeline-updateindex.json create mode 100644 src/test/resources/template/registerremotemodel-ingestpipeline.json create mode 100644 src/test/resources/template/registerremotemodel.json diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index ba9f9f499..28ca9e0b8 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -365,7 +365,7 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { public void testReprovisionWorkflow() throws Exception { // Begin with a template to register a local pretrained model - Template template = TestHelpers.createTemplateFromFile("registerlocalmodel.json"); + Template template = TestHelpers.createTemplateFromFile("registerremotemodel.json"); // Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter Response response; @@ -387,23 +387,26 @@ public void testReprovisionWorkflow() throws Exception { // Wait until provisioning has completed successfully before attempting to retrieve created resources List resourcesCreated = getResourcesCreated(client(), workflowId, 30); - assertEquals(2, resourcesCreated.size()); - assertEquals("register_local_pretrained_model", resourcesCreated.get(0).workflowStepName()); + assertEquals(3, resourcesCreated.size()); + List resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); + assertTrue(resourceIds.contains("create_connector")); + assertTrue(resourceIds.contains("register_remote_model")); // Reprovision template to add ingest pipeline which uses the model ID - template = TestHelpers.createTemplateFromFile("registerlocalmodel-ingestpipeline.json"); + template = TestHelpers.createTemplateFromFile("registerremotemodel-ingestpipeline.json"); response = reprovisionWorkflow(client(), workflowId, template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); resourcesCreated = getResourcesCreated(client(), workflowId, 10); - assertEquals(3, resourcesCreated.size()); - List resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); - assertTrue(resourceIds.contains("register_local_pretrained_model")); + assertEquals(4, resourcesCreated.size()); + resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); + assertTrue(resourceIds.contains("create_connector")); + assertTrue(resourceIds.contains("register_remote_model")); assertTrue(resourceIds.contains("create_ingest_pipeline")); // Retrieve pipeline by ID to ensure model ID is set correctly String modelId = resourcesCreated.stream() - .filter(x -> x.workflowStepName().equals("register_local_pretrained_model")) + .filter(x -> x.workflowStepName().equals("register_remote_model")) .findFirst() .get() .resourceId(); @@ -417,14 +420,15 @@ public void testReprovisionWorkflow() throws Exception { assertTrue(getPipelineResponse.pipelines().get(0).getConfigAsMap().toString().contains(modelId)); // Reprovision template to add index which uses default ingest pipeline - template = TestHelpers.createTemplateFromFile("registerlocalmodel-ingestpipeline-createindex.json"); + template = TestHelpers.createTemplateFromFile("registerremotemodel-ingestpipeline-createindex.json"); response = reprovisionWorkflow(client(), workflowId, template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); resourcesCreated = getResourcesCreated(client(), workflowId, 10); - assertEquals(4, resourcesCreated.size()); + assertEquals(5, resourcesCreated.size()); resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); - assertTrue(resourceIds.contains("register_local_pretrained_model")); + assertTrue(resourceIds.contains("create_connector")); + assertTrue(resourceIds.contains("register_remote_model")); assertTrue(resourceIds.contains("create_ingest_pipeline")); assertTrue(resourceIds.contains("create_index")); @@ -438,13 +442,13 @@ public void testReprovisionWorkflow() throws Exception { assertEquals(pipelineId, indexSettings.get("index.default_pipeline")); // Reprovision template to remove default ingest pipeline - template = TestHelpers.createTemplateFromFile("registerlocalmodel-ingestpipeline-updateindex.json"); + template = TestHelpers.createTemplateFromFile("registerremotemodel-ingestpipeline-updateindex.json"); response = reprovisionWorkflow(client(), workflowId, template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); resourcesCreated = getResourcesCreated(client(), workflowId, 10); // resource count should remain unchanged when updating an existing node - assertEquals(4, resourcesCreated.size()); + assertEquals(5, resourcesCreated.size()); // Retrieve index settings to ensure default pipeline has been updated correctly indexSettings = getIndexSettingsAsMap(indexName); @@ -466,7 +470,7 @@ public void testReprovisionWorkflow() throws Exception { public void testReprovisionWorkflowMidNodeAddition() throws Exception { // Begin with a template to register a local pretrained model and create an index, no edges - Template template = TestHelpers.createTemplateFromFile("registerlocalmodel-createindex.json"); + Template template = TestHelpers.createTemplateFromFile("registerremotemodel-createindex.json"); // Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter Response response; @@ -488,26 +492,28 @@ public void testReprovisionWorkflowMidNodeAddition() throws Exception { // Wait until provisioning has completed successfully before attempting to retrieve created resources List resourcesCreated = getResourcesCreated(client(), workflowId, 30); - assertEquals(3, resourcesCreated.size()); + assertEquals(4, resourcesCreated.size()); List resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); - assertTrue(resourceIds.contains("register_local_pretrained_model")); + assertTrue(resourceIds.contains("create_connector")); + assertTrue(resourceIds.contains("register_remote_model")); assertTrue(resourceIds.contains("create_index")); // Reprovision template to add ingest pipeline which uses the model ID - template = TestHelpers.createTemplateFromFile("registerlocalmodel-ingestpipeline-createindex.json"); + template = TestHelpers.createTemplateFromFile("registerremotemodel-ingestpipeline-createindex.json"); response = reprovisionWorkflow(client(), workflowId, template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); resourcesCreated = getResourcesCreated(client(), workflowId, 10); - assertEquals(4, resourcesCreated.size()); + assertEquals(5, resourcesCreated.size()); resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); - assertTrue(resourceIds.contains("register_local_pretrained_model")); + assertTrue(resourceIds.contains("create_connector")); + assertTrue(resourceIds.contains("register_remote_model")); assertTrue(resourceIds.contains("create_ingest_pipeline")); assertTrue(resourceIds.contains("create_index")); // Ensure ingest pipeline configuration contains the model id and index settings have the ingest pipeline as default String modelId = resourcesCreated.stream() - .filter(x -> x.workflowStepName().equals("register_local_pretrained_model")) + .filter(x -> x.workflowStepName().equals("register_remote_model")) .findFirst() .get() .resourceId(); @@ -543,7 +549,7 @@ public void testReprovisionWorkflowMidNodeAddition() throws Exception { } public void testReprovisionWithNoChange() throws Exception { - Template template = TestHelpers.createTemplateFromFile("registerlocalmodel-ingestpipeline-createindex.json"); + Template template = TestHelpers.createTemplateFromFile("registerremotemodel-ingestpipeline-createindex.json"); Response response; if (!indexExistsWithAdminClient(".plugins-ml-config")) { @@ -586,7 +592,7 @@ public void testReprovisionWithNoChange() throws Exception { } public void testReprovisionWithDeletion() throws Exception { - Template template = TestHelpers.createTemplateFromFile("registerlocalmodel-ingestpipeline-createindex.json"); + Template template = TestHelpers.createTemplateFromFile("registerremotemodel-ingestpipeline-createindex.json"); Response response; if (!indexExistsWithAdminClient(".plugins-ml-config")) { @@ -606,7 +612,7 @@ public void testReprovisionWithDeletion() throws Exception { ); // Attempt to reprovision template without ingest pipeline node - Template templateWithoutIngestPipeline = TestHelpers.createTemplateFromFile("registerlocalmodel-createindex.json"); + Template templateWithoutIngestPipeline = TestHelpers.createTemplateFromFile("registerremotemodel-createindex.json"); ResponseException exception = expectThrows( ResponseException.class, () -> reprovisionWorkflow(client(), workflowId, templateWithoutIngestPipeline) diff --git a/src/test/resources/template/registerlocalmodel-createindex.json b/src/test/resources/template/registerlocalmodel-createindex.json deleted file mode 100644 index 8dad29432..000000000 --- a/src/test/resources/template/registerlocalmodel-createindex.json +++ /dev/null @@ -1,59 +0,0 @@ -{ - "name": "semantic search with local pretrained model", - "description": "Setting up semantic search, with a local pretrained embedding model", - "use_case": "SEMANTIC_SEARCH", - "version": { - "template": "1.0.0", - "compatibility": [ - "2.12.0", - "3.0.0" - ] - }, - "workflows": { - "provision": { - "nodes": [ - { - "id": "register_local_pretrained_model", - "type": "register_local_pretrained_model", - "user_inputs": { - "name": "huggingface/sentence-transformers/paraphrase-MiniLM-L3-v2", - "version": "1.0.1", - "description": "This is a sentence transformer model", - "model_format": "TORCH_SCRIPT", - "deploy": true - } - }, - { - "id": "create_index", - "type": "create_index", - "user_inputs": { - "index_name": "my-nlp-index", - "configurations": { - "settings": { - "index.knn": true, - "index.number_of_shards": "2" - }, - "mappings": { - "properties": { - "passage_embedding": { - "type": "knn_vector", - "dimension": "768", - "method": { - "engine": "lucene", - "space_type": "l2", - "name": "hnsw", - "parameters": {} - } - }, - "passage_text": { - "type": "text" - } - } - } - } - } - } - ] - } - } - } diff --git a/src/test/resources/template/registerlocalmodel-ingestpipeline-createindex.json b/src/test/resources/template/registerlocalmodel-ingestpipeline-createindex.json deleted file mode 100644 index b60da54a2..000000000 --- a/src/test/resources/template/registerlocalmodel-ingestpipeline-createindex.json +++ /dev/null @@ -1,86 +0,0 @@ -{ - "name": "semantic search with local pretrained model", - "description": "Setting up semantic search, with a local pretrained embedding model", - "use_case": "SEMANTIC_SEARCH", - "version": { - "template": "1.0.0", - "compatibility": [ - "2.12.0", - "3.0.0" - ] - }, - "workflows": { - "provision": { - "nodes": [ - { - "id": "register_local_pretrained_model", - "type": "register_local_pretrained_model", - "user_inputs": { - "name": "huggingface/sentence-transformers/paraphrase-MiniLM-L3-v2", - "version": "1.0.1", - "description": "This is a sentence transformer model", - "model_format": "TORCH_SCRIPT", - "deploy": true - } - }, - { - "id": "create_ingest_pipeline", - "type": "create_ingest_pipeline", - "previous_node_inputs": { - "register_local_pretrained_model": "model_id" - }, - "user_inputs": { - "pipeline_id": "nlp-ingest-pipeline", - "configurations": { - "description": "A text embedding pipeline", - "processors": [ - { - "text_embedding": { - "model_id": "${{register_local_pretrained_model.model_id}}", - "field_map": { - "passage_text": "passage_embedding" - } - } - } - ] - } - } - }, - { - "id": "create_index", - "type": "create_index", - "previous_node_inputs": { - "create_ingest_pipeline": "pipeline_id" - }, - "user_inputs": { - "index_name": "my-nlp-index", - "configurations": { - "settings": { - "index.knn": true, - "default_pipeline": "${{create_ingest_pipeline.pipeline_id}}", - "index.number_of_shards": "2" - }, - "mappings": { - "properties": { - "passage_embedding": { - "type": "knn_vector", - "dimension": "768", - "method": { - "engine": "lucene", - "space_type": "l2", - "name": "hnsw", - "parameters": {} - } - }, - "passage_text": { - "type": "text" - } - } - } - } - } - } - ] - } - } - } diff --git a/src/test/resources/template/registerlocalmodel-ingestpipeline.json b/src/test/resources/template/registerlocalmodel-ingestpipeline.json deleted file mode 100644 index a4ceab116..000000000 --- a/src/test/resources/template/registerlocalmodel-ingestpipeline.json +++ /dev/null @@ -1,52 +0,0 @@ -{ - "name": "semantic search with local pretrained model", - "description": "Setting up semantic search, with a local pretrained embedding model", - "use_case": "SEMANTIC_SEARCH", - "version": { - "template": "1.0.0", - "compatibility": [ - "2.12.0", - "3.0.0" - ] - }, - "workflows": { - "provision": { - "nodes": [ - { - "id": "register_local_pretrained_model", - "type": "register_local_pretrained_model", - "user_inputs": { - "name": "huggingface/sentence-transformers/paraphrase-MiniLM-L3-v2", - "version": "1.0.1", - "description": "This is a sentence transformer model", - "model_format": "TORCH_SCRIPT", - "deploy": true - } - }, - { - "id": "create_ingest_pipeline", - "type": "create_ingest_pipeline", - "previous_node_inputs": { - "register_local_pretrained_model": "model_id" - }, - "user_inputs": { - "pipeline_id": "nlp-ingest-pipeline", - "configurations": { - "description": "A text embedding pipeline", - "processors": [ - { - "text_embedding": { - "model_id": "${{register_local_pretrained_model.model_id}}", - "field_map": { - "passage_text": "passage_embedding" - } - } - } - ] - } - } - } - ] - } - } - } diff --git a/src/test/resources/template/registerlocalmodel.json b/src/test/resources/template/registerlocalmodel.json deleted file mode 100644 index 8394f76ab..000000000 --- a/src/test/resources/template/registerlocalmodel.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "name": "semantic search with local pretrained model", - "description": "Setting up semantic search, with a local pretrained embedding model", - "use_case": "SEMANTIC_SEARCH", - "version": { - "template": "1.0.0", - "compatibility": [ - "2.12.0", - "3.0.0" - ] - }, - "workflows": { - "provision": { - "nodes": [ - { - "id": "register_local_pretrained_model", - "type": "register_local_pretrained_model", - "user_inputs": { - "name": "huggingface/sentence-transformers/paraphrase-MiniLM-L3-v2", - "version": "1.0.1", - "description": "This is a sentence transformer model", - "model_format": "TORCH_SCRIPT", - "deploy": true - } - } - ] - } - } - } diff --git a/src/test/resources/template/registerlocalmodel-ingestpipeline-updateindex.json b/src/test/resources/template/registerremotemodel-createindex.json similarity index 57% rename from src/test/resources/template/registerlocalmodel-ingestpipeline-updateindex.json rename to src/test/resources/template/registerremotemodel-createindex.json index 5303af938..3005eeed1 100644 --- a/src/test/resources/template/registerlocalmodel-ingestpipeline-updateindex.json +++ b/src/test/resources/template/registerremotemodel-createindex.json @@ -13,51 +13,49 @@ "provision": { "nodes": [ { - "id": "register_local_pretrained_model", - "type": "register_local_pretrained_model", + "id": "create_openai_connector", + "type": "create_connector", "user_inputs": { - "name": "huggingface/sentence-transformers/paraphrase-MiniLM-L3-v2", - "version": "1.0.1", - "description": "This is a sentence transformer model", - "model_format": "TORCH_SCRIPT", - "deploy": true + "name": "OpenAI Chat Connector", + "description": "The connector to public OpenAI model service for text embedding model", + "version": "1", + "protocol": "http", + "parameters": { + "endpoint": "api.openai.com", + "model": "gpt-3.5-turbo", + "response_filter": "$.choices[0].message.content" + }, + "credential": { + "openAI_key": "12345" + }, + "actions": [ + { + "action_type": "predict", + "method": "POST", + "url": "https://${parameters.endpoint}/v1/chat/completions" + } + ] } }, { - "id": "create_ingest_pipeline", - "type": "create_ingest_pipeline", + "id": "register_openai_model", + "type": "register_remote_model", "previous_node_inputs": { - "register_local_pretrained_model": "model_id" + "create_openai_connector": "connector_id" }, "user_inputs": { - "pipeline_id": "nlp-ingest-pipeline", - "configurations": { - "description": "A text embedding pipeline", - "processors": [ - { - "text_embedding": { - "model_id": "${{register_local_pretrained_model.model_id}}", - "field_map": { - "passage_text": "passage_embedding" - } - } - } - ] - } + "name": "openAI-gpt-3.5-turbo", + "deploy": true } }, { "id": "create_index", "type": "create_index", - "previous_node_inputs": { - "create_ingest_pipeline": "pipeline_id" - }, "user_inputs": { "index_name": "my-nlp-index", "configurations": { "settings": { "index.knn": true, - "default_pipeline": "_none", "index.number_of_shards": "2" }, "mappings": { diff --git a/src/test/resources/template/registerremotemodel-ingestpipeline-createindex.json b/src/test/resources/template/registerremotemodel-ingestpipeline-createindex.json new file mode 100644 index 000000000..767da07b6 --- /dev/null +++ b/src/test/resources/template/registerremotemodel-ingestpipeline-createindex.json @@ -0,0 +1,111 @@ +{ + "name": "semantic search with local pretrained model", + "description": "Setting up semantic search, with a local pretrained embedding model", + "use_case": "SEMANTIC_SEARCH", + "version": { + "template": "1.0.0", + "compatibility": [ + "2.12.0", + "3.0.0" + ] + }, + "workflows": { + "provision": { + "nodes": [ + { + "id": "create_openai_connector", + "type": "create_connector", + "user_inputs": { + "name": "OpenAI Chat Connector", + "description": "The connector to public OpenAI model service for text embedding model", + "version": "1", + "protocol": "http", + "parameters": { + "endpoint": "api.openai.com", + "model": "gpt-3.5-turbo", + "response_filter": "$.choices[0].message.content" + }, + "credential": { + "openAI_key": "12345" + }, + "actions": [ + { + "action_type": "predict", + "method": "POST", + "url": "https://${parameters.endpoint}/v1/chat/completions" + } + ] + } + }, + { + "id": "register_openai_model", + "type": "register_remote_model", + "previous_node_inputs": { + "create_openai_connector": "connector_id" + }, + "user_inputs": { + "name": "openAI-gpt-3.5-turbo", + "deploy": true + } + }, + { + "id": "create_ingest_pipeline", + "type": "create_ingest_pipeline", + "previous_node_inputs": { + "register_openai_model": "model_id" + }, + "user_inputs": { + "pipeline_id": "nlp-ingest-pipeline", + "configurations": { + "description": "A text embedding pipeline", + "processors": [ + { + "text_embedding": { + "model_id": "${{register_openai_model.model_id}}", + "field_map": { + "passage_text": "passage_embedding" + } + } + } + ] + } + } + }, + { + "id": "create_index", + "type": "create_index", + "previous_node_inputs": { + "create_ingest_pipeline": "pipeline_id" + }, + "user_inputs": { + "index_name": "my-nlp-index", + "configurations": { + "settings": { + "index.knn": true, + "default_pipeline": "${{create_ingest_pipeline.pipeline_id}}", + "index.number_of_shards": "2" + }, + "mappings": { + "properties": { + "passage_embedding": { + "type": "knn_vector", + "dimension": "768", + "method": { + "engine": "lucene", + "space_type": "l2", + "name": "hnsw", + "parameters": {} + } + }, + "passage_text": { + "type": "text" + } + } + } + } + } + } + ] + } + } +} diff --git a/src/test/resources/template/registerremotemodel-ingestpipeline-updateindex.json b/src/test/resources/template/registerremotemodel-ingestpipeline-updateindex.json new file mode 100644 index 000000000..fc873ae66 --- /dev/null +++ b/src/test/resources/template/registerremotemodel-ingestpipeline-updateindex.json @@ -0,0 +1,111 @@ +{ + "name": "semantic search with local pretrained model", + "description": "Setting up semantic search, with a local pretrained embedding model", + "use_case": "SEMANTIC_SEARCH", + "version": { + "template": "1.0.0", + "compatibility": [ + "2.12.0", + "3.0.0" + ] + }, + "workflows": { + "provision": { + "nodes": [ + { + "id": "create_openai_connector", + "type": "create_connector", + "user_inputs": { + "name": "OpenAI Chat Connector", + "description": "The connector to public OpenAI model service for text embedding model", + "version": "1", + "protocol": "http", + "parameters": { + "endpoint": "api.openai.com", + "model": "gpt-3.5-turbo", + "response_filter": "$.choices[0].message.content" + }, + "credential": { + "openAI_key": "12345" + }, + "actions": [ + { + "action_type": "predict", + "method": "POST", + "url": "https://${parameters.endpoint}/v1/chat/completions" + } + ] + } + }, + { + "id": "register_openai_model", + "type": "register_remote_model", + "previous_node_inputs": { + "create_openai_connector": "connector_id" + }, + "user_inputs": { + "name": "openAI-gpt-3.5-turbo", + "deploy": true + } + }, + { + "id": "create_ingest_pipeline", + "type": "create_ingest_pipeline", + "previous_node_inputs": { + "register_openai_model": "model_id" + }, + "user_inputs": { + "pipeline_id": "nlp-ingest-pipeline", + "configurations": { + "description": "A text embedding pipeline", + "processors": [ + { + "text_embedding": { + "model_id": "${{register_openai_model.model_id}}", + "field_map": { + "passage_text": "passage_embedding" + } + } + } + ] + } + } + }, + { + "id": "create_index", + "type": "create_index", + "previous_node_inputs": { + "create_ingest_pipeline": "pipeline_id" + }, + "user_inputs": { + "index_name": "my-nlp-index", + "configurations": { + "settings": { + "index.knn": true, + "default_pipeline": "_none", + "index.number_of_shards": "2" + }, + "mappings": { + "properties": { + "passage_embedding": { + "type": "knn_vector", + "dimension": "768", + "method": { + "engine": "lucene", + "space_type": "l2", + "name": "hnsw", + "parameters": {} + } + }, + "passage_text": { + "type": "text" + } + } + } + } + } + } + ] + } + } +} diff --git a/src/test/resources/template/registerremotemodel-ingestpipeline.json b/src/test/resources/template/registerremotemodel-ingestpipeline.json new file mode 100644 index 000000000..dede163c1 --- /dev/null +++ b/src/test/resources/template/registerremotemodel-ingestpipeline.json @@ -0,0 +1,77 @@ +{ + "name": "semantic search with local pretrained model", + "description": "Setting up semantic search, with a local pretrained embedding model", + "use_case": "SEMANTIC_SEARCH", + "version": { + "template": "1.0.0", + "compatibility": [ + "2.12.0", + "3.0.0" + ] + }, + "workflows": { + "provision": { + "nodes": [ + { + "id": "create_openai_connector", + "type": "create_connector", + "user_inputs": { + "name": "OpenAI Chat Connector", + "description": "The connector to public OpenAI model service for text embedding model", + "version": "1", + "protocol": "http", + "parameters": { + "endpoint": "api.openai.com", + "model": "gpt-3.5-turbo", + "response_filter": "$.choices[0].message.content" + }, + "credential": { + "openAI_key": "12345" + }, + "actions": [ + { + "action_type": "predict", + "method": "POST", + "url": "https://${parameters.endpoint}/v1/chat/completions" + } + ] + } + }, + { + "id": "register_openai_model", + "type": "register_remote_model", + "previous_node_inputs": { + "create_openai_connector": "connector_id" + }, + "user_inputs": { + "name": "openAI-gpt-3.5-turbo", + "deploy": true + } + }, + { + "id": "create_ingest_pipeline", + "type": "create_ingest_pipeline", + "previous_node_inputs": { + "register_openai_model": "model_id" + }, + "user_inputs": { + "pipeline_id": "nlp-ingest-pipeline", + "configurations": { + "description": "A text embedding pipeline", + "processors": [ + { + "text_embedding": { + "model_id": "${{register_openai_model.model_id}}", + "field_map": { + "passage_text": "passage_embedding" + } + } + } + ] + } + } + } + ] + } + } +} diff --git a/src/test/resources/template/registerremotemodel.json b/src/test/resources/template/registerremotemodel.json new file mode 100644 index 000000000..58c520af4 --- /dev/null +++ b/src/test/resources/template/registerremotemodel.json @@ -0,0 +1,54 @@ +{ + "name": "semantic search with local pretrained model", + "description": "Setting up semantic search, with a local pretrained embedding model", + "use_case": "SEMANTIC_SEARCH", + "version": { + "template": "1.0.0", + "compatibility": [ + "2.12.0", + "3.0.0" + ] + }, + "workflows": { + "provision": { + "nodes": [ + { + "id": "create_openai_connector", + "type": "create_connector", + "user_inputs": { + "name": "OpenAI Chat Connector", + "description": "The connector to public OpenAI model service for text embedding model", + "version": "1", + "protocol": "http", + "parameters": { + "endpoint": "api.openai.com", + "model": "gpt-3.5-turbo", + "response_filter": "$.choices[0].message.content" + }, + "credential": { + "openAI_key": "12345" + }, + "actions": [ + { + "action_type": "predict", + "method": "POST", + "url": "https://${parameters.endpoint}/v1/chat/completions" + } + ] + } + }, + { + "id": "register_openai_model", + "type": "register_remote_model", + "previous_node_inputs": { + "create_openai_connector": "connector_id" + }, + "user_inputs": { + "name": "openAI-gpt-3.5-turbo", + "deploy": true + } + } + ] + } + } + } From 0e87cd94727f77c4ad8527c799184452e15d31b4 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Wed, 14 Aug 2024 18:43:09 +0000 Subject: [PATCH 06/20] Fixing forbiddenApis check Signed-off-by: Joshua Palis --- .../java/org/opensearch/flowframework/model/Template.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/model/Template.java b/src/main/java/org/opensearch/flowframework/model/Template.java index f4d2ae600..988f49161 100644 --- a/src/main/java/org/opensearch/flowframework/model/Template.java +++ b/src/main/java/org/opensearch/flowframework/model/Template.java @@ -8,7 +8,6 @@ */ package org.opensearch.flowframework.model; -import org.apache.logging.log4j.util.Strings; import org.opensearch.Version; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.json.JsonXContent; @@ -372,10 +371,10 @@ public static Template updateExistingTemplate(Template existingTemplate, Templat if (templateWithNewFields.name() != null) { builder.name(templateWithNewFields.name()); } - if (!Strings.isBlank(templateWithNewFields.description())) { + if (!templateWithNewFields.description().isBlank()) { builder.description(templateWithNewFields.description()); } - if (!Strings.isBlank(templateWithNewFields.useCase())) { + if (!templateWithNewFields.useCase().isBlank()) { builder.useCase(templateWithNewFields.useCase()); } if (templateWithNewFields.templateVersion() != null) { From 019f45d2a91a804a2f69240eeb40815481462c1e Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Wed, 14 Aug 2024 19:11:46 +0000 Subject: [PATCH 07/20] Fixing forbiddenAPI check, addressing PR comments Signed-off-by: Joshua Palis --- .../opensearch/flowframework/TestHelpers.java | 3 +- .../rest/FlowFrameworkRestApiIT.java | 103 +++--------------- 2 files changed, 14 insertions(+), 92 deletions(-) diff --git a/src/test/java/org/opensearch/flowframework/TestHelpers.java b/src/test/java/org/opensearch/flowframework/TestHelpers.java index 6c4f3534b..22a2bb82f 100644 --- a/src/test/java/org/opensearch/flowframework/TestHelpers.java +++ b/src/test/java/org/opensearch/flowframework/TestHelpers.java @@ -11,7 +11,6 @@ import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpEntity; import org.apache.hc.core5.http.io.entity.StringEntity; -import org.apache.logging.log4j.util.Strings; import org.opensearch.client.Request; import org.opensearch.client.RequestOptions; import org.opensearch.client.Response; @@ -74,7 +73,7 @@ public static Response makeRequest( String jsonEntity, List
headers ) throws IOException { - HttpEntity httpEntity = Strings.isBlank(jsonEntity) ? null : new StringEntity(jsonEntity, APPLICATION_JSON); + HttpEntity httpEntity = jsonEntity.isBlank() ? null : new StringEntity(jsonEntity, APPLICATION_JSON); return makeRequest(client, method, endpoint, params, httpEntity, headers); } diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 28ca9e0b8..da55517b6 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -37,9 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.opensearch.flowframework.common.CommonValue.CREATE_CONNECTOR_CREDENTIAL_KEY; @@ -48,15 +46,12 @@ import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; public class FlowFrameworkRestApiIT extends FlowFrameworkRestTestCase { - private static AtomicBoolean waitToStart = new AtomicBoolean(true); @Before public void waitToStart() throws Exception { // ML Commons cron job runs every 10 seconds and takes 20+ seconds to initialize .plugins-ml-config index - // Delay on the first attempt for 25 seconds to allow this initialization and prevent flaky tests - if (waitToStart.getAndSet(false)) { - CountDownLatch latch = new CountDownLatch(1); - latch.await(25, TimeUnit.SECONDS); + if (!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); } } @@ -93,14 +88,7 @@ public void testFailedUpdateWorkflow() throws Exception { Map responseMap = entityAsMap(response); String workflowId = (String) responseMap.get(WORKFLOW_ID); - // Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status - Response provisionResponse; - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); - provisionResponse = provisionWorkflow(client(), workflowId); - } else { - provisionResponse = provisionWorkflow(client(), workflowId); - } + Response provisionResponse = provisionResponse = provisionWorkflow(client(), workflowId); assertEquals(RestStatus.OK, TestHelpers.restStatus(provisionResponse)); getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS); @@ -122,14 +110,7 @@ public void testUpdateWorkflowUsingFields() throws Exception { Map responseMap = entityAsMap(response); String workflowId = (String) responseMap.get(WORKFLOW_ID); - // Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status - Response provisionResponse; - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); - provisionResponse = provisionWorkflow(client(), workflowId); - } else { - provisionResponse = provisionWorkflow(client(), workflowId); - } + Response provisionResponse = provisionWorkflow(client(), workflowId); assertEquals(RestStatus.OK, TestHelpers.restStatus(provisionResponse)); getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS); @@ -259,14 +240,7 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception { String workflowId = (String) responseMap.get(WORKFLOW_ID); getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); - // Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); - response = provisionWorkflow(client(), workflowId); - } else { - response = provisionWorkflow(client(), workflowId); - } - + response = provisionWorkflow(client(), workflowId); assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS); @@ -294,13 +268,7 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { Template template = TestHelpers.createTemplateFromFile("agent-framework.json"); // Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter - Response response; - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); - response = createWorkflowWithProvision(client(), template); - } else { - response = createWorkflowWithProvision(client(), template); - } + Response response = createWorkflowWithProvision(client(), template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); Map responseMap = entityAsMap(response); String workflowId = (String) responseMap.get(WORKFLOW_ID); @@ -368,13 +336,7 @@ public void testReprovisionWorkflow() throws Exception { Template template = TestHelpers.createTemplateFromFile("registerremotemodel.json"); // Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter - Response response; - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); - response = createWorkflowWithProvision(client(), template); - } else { - response = createWorkflowWithProvision(client(), template); - } + Response response = createWorkflowWithProvision(client(), template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); Map responseMap = entityAsMap(response); String workflowId = (String) responseMap.get(WORKFLOW_ID); @@ -473,13 +435,7 @@ public void testReprovisionWorkflowMidNodeAddition() throws Exception { Template template = TestHelpers.createTemplateFromFile("registerremotemodel-createindex.json"); // Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter - Response response; - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); - response = createWorkflowWithProvision(client(), template); - } else { - response = createWorkflowWithProvision(client(), template); - } + Response response = createWorkflowWithProvision(client(), template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); Map responseMap = entityAsMap(response); String workflowId = (String) responseMap.get(WORKFLOW_ID); @@ -551,13 +507,7 @@ public void testReprovisionWorkflowMidNodeAddition() throws Exception { public void testReprovisionWithNoChange() throws Exception { Template template = TestHelpers.createTemplateFromFile("registerremotemodel-ingestpipeline-createindex.json"); - Response response; - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); - response = createWorkflowWithProvision(client(), template); - } else { - response = createWorkflowWithProvision(client(), template); - } + Response response = createWorkflowWithProvision(client(), template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); Map responseMap = entityAsMap(response); String workflowId = (String) responseMap.get(WORKFLOW_ID); @@ -594,13 +544,7 @@ public void testReprovisionWithNoChange() throws Exception { public void testReprovisionWithDeletion() throws Exception { Template template = TestHelpers.createTemplateFromFile("registerremotemodel-ingestpipeline-createindex.json"); - Response response; - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); - response = createWorkflowWithProvision(client(), template); - } else { - response = createWorkflowWithProvision(client(), template); - } + Response response = createWorkflowWithProvision(client(), template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); Map responseMap = entityAsMap(response); String workflowId = (String) responseMap.get(WORKFLOW_ID); @@ -696,14 +640,7 @@ public void testCreateAndProvisionIngestAndSearchPipeline() throws Exception { String workflowId = (String) responseMap.get(WORKFLOW_ID); getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); - // Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); - response = provisionWorkflow(client(), workflowId); - } else { - response = provisionWorkflow(client(), workflowId); - } - + response = provisionWorkflow(client(), workflowId); assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS); @@ -750,14 +687,7 @@ public void testDefaultCohereUseCase() throws Exception { String workflowId = (String) responseMap.get(WORKFLOW_ID); getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); - // Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); - response = provisionWorkflow(client(), workflowId); - } else { - response = provisionWorkflow(client(), workflowId); - } - + response = provisionWorkflow(client(), workflowId); assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS); @@ -801,14 +731,7 @@ public void testDefaultSemanticSearchUseCaseWithFailureExpected() throws Excepti String workflowId = (String) responseMap.get(WORKFLOW_ID); getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); - // Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); - response = provisionWorkflow(client(), workflowId); - } else { - response = provisionWorkflow(client(), workflowId); - } - + response = provisionWorkflow(client(), workflowId); assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); getAndAssertWorkflowStatus(client(), workflowId, State.FAILED, ProvisioningProgress.FAILED); } From 5ae86038473a91c847bf4cc7f56152482ba8c940 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Wed, 14 Aug 2024 20:30:35 +0000 Subject: [PATCH 08/20] Fixing forbiddenAPIs main Signed-off-by: Joshua Palis --- .../java/org/opensearch/flowframework/model/Template.java | 5 +++-- src/test/java/org/opensearch/flowframework/TestHelpers.java | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/model/Template.java b/src/main/java/org/opensearch/flowframework/model/Template.java index 988f49161..4c8c2c9ef 100644 --- a/src/main/java/org/opensearch/flowframework/model/Template.java +++ b/src/main/java/org/opensearch/flowframework/model/Template.java @@ -13,6 +13,7 @@ import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.common.xcontent.yaml.YamlXContent; import org.opensearch.commons.authuser.User; +import org.opensearch.core.common.Strings; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContentObject; @@ -371,10 +372,10 @@ public static Template updateExistingTemplate(Template existingTemplate, Templat if (templateWithNewFields.name() != null) { builder.name(templateWithNewFields.name()); } - if (!templateWithNewFields.description().isBlank()) { + if (Strings.hasText(templateWithNewFields.description())) { builder.description(templateWithNewFields.description()); } - if (!templateWithNewFields.useCase().isBlank()) { + if (Strings.hasText(templateWithNewFields.useCase())) { builder.useCase(templateWithNewFields.useCase()); } if (templateWithNewFields.templateVersion() != null) { diff --git a/src/test/java/org/opensearch/flowframework/TestHelpers.java b/src/test/java/org/opensearch/flowframework/TestHelpers.java index 22a2bb82f..6d136f7a6 100644 --- a/src/test/java/org/opensearch/flowframework/TestHelpers.java +++ b/src/test/java/org/opensearch/flowframework/TestHelpers.java @@ -23,6 +23,7 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; import org.opensearch.commons.authuser.User; +import org.opensearch.core.common.Strings; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; @@ -73,7 +74,7 @@ public static Response makeRequest( String jsonEntity, List
headers ) throws IOException { - HttpEntity httpEntity = jsonEntity.isBlank() ? null : new StringEntity(jsonEntity, APPLICATION_JSON); + HttpEntity httpEntity = !Strings.hasText(jsonEntity) ? null : new StringEntity(jsonEntity, APPLICATION_JSON); return makeRequest(client, method, endpoint, params, httpEntity, headers); } From 85c8e187e1541c69d6bd6400b21528cac8a35136 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Wed, 14 Aug 2024 21:04:39 +0000 Subject: [PATCH 09/20] increasing getResource timeout Signed-off-by: Joshua Palis --- .../flowframework/rest/FlowFrameworkRestApiIT.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index da55517b6..0a01e5181 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -359,7 +359,7 @@ public void testReprovisionWorkflow() throws Exception { response = reprovisionWorkflow(client(), workflowId, template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); - resourcesCreated = getResourcesCreated(client(), workflowId, 10); + resourcesCreated = getResourcesCreated(client(), workflowId, 30); assertEquals(4, resourcesCreated.size()); resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); assertTrue(resourceIds.contains("create_connector")); @@ -386,7 +386,7 @@ public void testReprovisionWorkflow() throws Exception { response = reprovisionWorkflow(client(), workflowId, template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); - resourcesCreated = getResourcesCreated(client(), workflowId, 10); + resourcesCreated = getResourcesCreated(client(), workflowId, 30); assertEquals(5, resourcesCreated.size()); resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); assertTrue(resourceIds.contains("create_connector")); @@ -408,7 +408,7 @@ public void testReprovisionWorkflow() throws Exception { response = reprovisionWorkflow(client(), workflowId, template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); - resourcesCreated = getResourcesCreated(client(), workflowId, 10); + resourcesCreated = getResourcesCreated(client(), workflowId, 30); // resource count should remain unchanged when updating an existing node assertEquals(5, resourcesCreated.size()); @@ -459,7 +459,7 @@ public void testReprovisionWorkflowMidNodeAddition() throws Exception { response = reprovisionWorkflow(client(), workflowId, template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); - resourcesCreated = getResourcesCreated(client(), workflowId, 10); + resourcesCreated = getResourcesCreated(client(), workflowId, 30); assertEquals(5, resourcesCreated.size()); resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); assertTrue(resourceIds.contains("create_connector")); From 62c7852469897fa54308e95f6c28240746865b65 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Wed, 14 Aug 2024 21:41:50 +0000 Subject: [PATCH 10/20] Addressing PR comments Signed-off-by: Joshua Palis --- .../rest/FlowFrameworkRestApiIT.java | 82 +++++++------------ 1 file changed, 29 insertions(+), 53 deletions(-) diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 0a01e5181..d176adc3b 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -335,7 +335,6 @@ public void testReprovisionWorkflow() throws Exception { // Begin with a template to register a local pretrained model Template template = TestHelpers.createTemplateFromFile("registerremotemodel.json"); - // Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter Response response = createWorkflowWithProvision(client(), template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); Map responseMap = entityAsMap(response); @@ -350,9 +349,10 @@ public void testReprovisionWorkflow() throws Exception { // Wait until provisioning has completed successfully before attempting to retrieve created resources List resourcesCreated = getResourcesCreated(client(), workflowId, 30); assertEquals(3, resourcesCreated.size()); - List resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); - assertTrue(resourceIds.contains("create_connector")); - assertTrue(resourceIds.contains("register_remote_model")); + Map resourceMap = resourcesCreated.stream() + .collect(Collectors.toMap(ResourceCreated::workflowStepName, r -> r)); + assertTrue(resourceMap.containsKey("create_connector")); + assertTrue(resourceMap.containsKey("register_remote_model")); // Reprovision template to add ingest pipeline which uses the model ID template = TestHelpers.createTemplateFromFile("registerremotemodel-ingestpipeline.json"); @@ -361,22 +361,14 @@ public void testReprovisionWorkflow() throws Exception { resourcesCreated = getResourcesCreated(client(), workflowId, 30); assertEquals(4, resourcesCreated.size()); - resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); - assertTrue(resourceIds.contains("create_connector")); - assertTrue(resourceIds.contains("register_remote_model")); - assertTrue(resourceIds.contains("create_ingest_pipeline")); + resourceMap = resourcesCreated.stream().collect(Collectors.toMap(ResourceCreated::workflowStepName, r -> r)); + assertTrue(resourceMap.containsKey("create_connector")); + assertTrue(resourceMap.containsKey("register_remote_model")); + assertTrue(resourceMap.containsKey("create_ingest_pipeline")); // Retrieve pipeline by ID to ensure model ID is set correctly - String modelId = resourcesCreated.stream() - .filter(x -> x.workflowStepName().equals("register_remote_model")) - .findFirst() - .get() - .resourceId(); - String pipelineId = resourcesCreated.stream() - .filter(x -> x.workflowStepName().equals("create_ingest_pipeline")) - .findFirst() - .get() - .resourceId(); + String modelId = resourceMap.get("register_remote_model").resourceId(); + String pipelineId = resourceMap.get("create_ingest_pipeline").resourceId(); GetPipelineResponse getPipelineResponse = getPipelines(pipelineId); assertEquals(1, getPipelineResponse.pipelines().size()); assertTrue(getPipelineResponse.pipelines().get(0).getConfigAsMap().toString().contains(modelId)); @@ -388,18 +380,14 @@ public void testReprovisionWorkflow() throws Exception { resourcesCreated = getResourcesCreated(client(), workflowId, 30); assertEquals(5, resourcesCreated.size()); - resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); - assertTrue(resourceIds.contains("create_connector")); - assertTrue(resourceIds.contains("register_remote_model")); - assertTrue(resourceIds.contains("create_ingest_pipeline")); - assertTrue(resourceIds.contains("create_index")); + resourceMap = resourcesCreated.stream().collect(Collectors.toMap(ResourceCreated::workflowStepName, r -> r)); + assertTrue(resourceMap.containsKey("create_connector")); + assertTrue(resourceMap.containsKey("register_remote_model")); + assertTrue(resourceMap.containsKey("create_ingest_pipeline")); + assertTrue(resourceMap.containsKey("create_index")); // Retrieve index settings to ensure pipeline ID is set correctly - String indexName = resourcesCreated.stream() - .filter(x -> x.workflowStepName().equals("create_index")) - .findFirst() - .get() - .resourceId(); + String indexName = resourceMap.get("create_index").resourceId(); Map indexSettings = getIndexSettingsAsMap(indexName); assertEquals(pipelineId, indexSettings.get("index.default_pipeline")); @@ -434,7 +422,6 @@ public void testReprovisionWorkflowMidNodeAddition() throws Exception { // Begin with a template to register a local pretrained model and create an index, no edges Template template = TestHelpers.createTemplateFromFile("registerremotemodel-createindex.json"); - // Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter Response response = createWorkflowWithProvision(client(), template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); Map responseMap = entityAsMap(response); @@ -449,10 +436,11 @@ public void testReprovisionWorkflowMidNodeAddition() throws Exception { // Wait until provisioning has completed successfully before attempting to retrieve created resources List resourcesCreated = getResourcesCreated(client(), workflowId, 30); assertEquals(4, resourcesCreated.size()); - List resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); - assertTrue(resourceIds.contains("create_connector")); - assertTrue(resourceIds.contains("register_remote_model")); - assertTrue(resourceIds.contains("create_index")); + Map resourceMap = resourcesCreated.stream() + .collect(Collectors.toMap(ResourceCreated::workflowStepName, r -> r)); + assertTrue(resourceMap.containsKey("create_connector")); + assertTrue(resourceMap.containsKey("register_remote_model")); + assertTrue(resourceMap.containsKey("create_index")); // Reprovision template to add ingest pipeline which uses the model ID template = TestHelpers.createTemplateFromFile("registerremotemodel-ingestpipeline-createindex.json"); @@ -461,32 +449,20 @@ public void testReprovisionWorkflowMidNodeAddition() throws Exception { resourcesCreated = getResourcesCreated(client(), workflowId, 30); assertEquals(5, resourcesCreated.size()); - resourceIds = resourcesCreated.stream().map(x -> x.workflowStepName()).collect(Collectors.toList()); - assertTrue(resourceIds.contains("create_connector")); - assertTrue(resourceIds.contains("register_remote_model")); - assertTrue(resourceIds.contains("create_ingest_pipeline")); - assertTrue(resourceIds.contains("create_index")); + resourceMap = resourcesCreated.stream().collect(Collectors.toMap(ResourceCreated::workflowStepName, r -> r)); + assertTrue(resourceMap.containsKey("create_connector")); + assertTrue(resourceMap.containsKey("register_remote_model")); + assertTrue(resourceMap.containsKey("create_ingest_pipeline")); + assertTrue(resourceMap.containsKey("create_index")); // Ensure ingest pipeline configuration contains the model id and index settings have the ingest pipeline as default - String modelId = resourcesCreated.stream() - .filter(x -> x.workflowStepName().equals("register_remote_model")) - .findFirst() - .get() - .resourceId(); - String pipelineId = resourcesCreated.stream() - .filter(x -> x.workflowStepName().equals("create_ingest_pipeline")) - .findFirst() - .get() - .resourceId(); + String modelId = resourceMap.get("register_remote_model").resourceId(); + String pipelineId = resourceMap.get("create_ingest_pipeline").resourceId(); GetPipelineResponse getPipelineResponse = getPipelines(pipelineId); assertEquals(1, getPipelineResponse.pipelines().size()); assertTrue(getPipelineResponse.pipelines().get(0).getConfigAsMap().toString().contains(modelId)); - String indexName = resourcesCreated.stream() - .filter(x -> x.workflowStepName().equals("create_index")) - .findFirst() - .get() - .resourceId(); + String indexName = resourceMap.get("create_index").resourceId(); Map indexSettings = getIndexSettingsAsMap(indexName); assertEquals(pipelineId, indexSettings.get("index.default_pipeline")); From fb5a3f578b520d83b3828f45865ea515298b6c21 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Thu, 15 Aug 2024 21:48:09 +0000 Subject: [PATCH 11/20] Fixing multi-node integration tests Signed-off-by: Joshua Palis --- .../java/org/opensearch/flowframework/util/ParseUtils.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index e20b2ed3b..3e06bbd71 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -529,6 +529,11 @@ public static void flattenSettings(String prefix, Map settings, if (value instanceof Map) { flattenSettings(key, (Map) value, flattenedSettings); } else { + // Create index setting configuration can be a mix of flattened or expanded settings + // prepend index. to ensure successful setting comparison + if (!key.startsWith("index.")) { + key = "index." + key; + } flattenedSettings.put(key, value.toString()); } } From 4e525abaf186ea91d4a7da609ca9284f10743328 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Thu, 15 Aug 2024 22:12:05 +0000 Subject: [PATCH 12/20] fixing multi-node integration tests Signed-off-by: Joshua Palis --- .../org/opensearch/flowframework/util/ParseUtils.java | 5 ----- .../flowframework/workflow/UpdateIndexStep.java | 10 +++++++++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index 3e06bbd71..e20b2ed3b 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -529,11 +529,6 @@ public static void flattenSettings(String prefix, Map settings, if (value instanceof Map) { flattenSettings(key, (Map) value, flattenedSettings); } else { - // Create index setting configuration can be a mix of flattened or expanded settings - // prepend index. to ensure successful setting comparison - if (!key.startsWith("index.")) { - key = "index." + key; - } flattenedSettings.put(key, value.toString()); } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java index 9d35a32ce..88a78fd1e 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java @@ -113,7 +113,15 @@ public PlainActionFuture execute( if (updatedSettings.containsKey("index")) { ParseUtils.flattenSettings("", updatedSettings, flattenedSettings); } else { - flattenedSettings.putAll(updatedSettings); + // Create index setting configuration can be a mix of flattened or expanded settings + // prepend index. to ensure successful setting comparison + updatedSettings.entrySet().stream().map(x -> { + if (!x.getKey().startsWith("index.")) { + flattenedSettings.put("index." + x.getKey(), x.getValue()); + } else { + flattenedSettings.put(x.getKey(), x.getValue()); + } + }); } Map filteredSettings = new HashMap<>(); From ca6b1830f6ee38cc7f11a7d471b27a5d746c1cd0 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Thu, 15 Aug 2024 22:18:02 +0000 Subject: [PATCH 13/20] Fixing syntax error Signed-off-by: Joshua Palis --- .../org/opensearch/flowframework/workflow/UpdateIndexStep.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java index 88a78fd1e..03950b6dc 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java @@ -115,7 +115,7 @@ public PlainActionFuture execute( } else { // Create index setting configuration can be a mix of flattened or expanded settings // prepend index. to ensure successful setting comparison - updatedSettings.entrySet().stream().map(x -> { + updatedSettings.entrySet().stream().forEach(x -> { if (!x.getKey().startsWith("index.")) { flattenedSettings.put("index." + x.getKey(), x.getValue()); } else { From d6a95bc2d7660fd6265223d2314fadc5bdc37527 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Thu, 15 Aug 2024 22:32:44 +0000 Subject: [PATCH 14/20] Blocking reprovision requests with substitution params Signed-off-by: Joshua Palis --- .../rest/RestCreateWorkflowAction.java | 7 +++++++ .../rest/RestCreateWorkflowActionTests.java | 14 ++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java index 032b4b898..8acfab16a 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java @@ -138,6 +138,13 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli ); return processError(ffe, params, request); } + if (reprovision && !params.isEmpty()) { + FlowFrameworkException ffe = new FlowFrameworkException( + "Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.", + RestStatus.BAD_REQUEST + ); + return processError(ffe, params, request); + } try { Template template; Map useCaseDefaultsMap = Collections.emptyMap(); diff --git a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java index e4f22e947..f6b1a5fc7 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java @@ -178,6 +178,20 @@ public void testCreateWorkflowRequestWithCreateAndReprovision() throws Exception ); } + public void testCreateWorkflowRequestWithReprovisionAndSubstitutionParams() throws Exception { + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + .withPath(this.createWorkflowPath) + .withParams(Map.ofEntries(Map.entry(REPROVISION_WORKFLOW, "true"), Map.entry("open_ai_key", "1234"))) + .withContent(new BytesArray(validTemplate), MediaTypeRegistry.JSON) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + createWorkflowRestAction.handleRequest(request, channel, nodeClient); + assertEquals(RestStatus.BAD_REQUEST, channel.capturedResponse().status()); + assertTrue( + channel.capturedResponse().content().utf8ToString().contains("are permitted unless the provision parameter is set to true.") + ); + } + public void testCreateWorkflowRequestWithUpdateAndParams() throws Exception { RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) .withPath(this.createWorkflowPath) From d02955bc67d6276986cc11676710c004c1873de7 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Thu, 15 Aug 2024 23:15:22 +0000 Subject: [PATCH 15/20] Fixes update settings request issue for multi-node Signed-off-by: Joshua Palis --- .../workflow/UpdateIndexStep.java | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java index 03950b6dc..dbd683d6e 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java @@ -141,35 +141,39 @@ public PlainActionFuture execute( filteredSettings.put(e.getKey(), e.getValue()); } } + + // Create and send the update settings request + updateSettingsRequest.settings(filteredSettings); + if (updateSettingsRequest.settings().size() == 0) { + String errorMessage = "Failed to update index settings for index " + + indexName + + ", no settings have been updated"; + updateIndexFuture.onFailure(new WorkflowStepException(errorMessage, RestStatus.BAD_REQUEST)); + } else { + client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(acknowledgedResponse -> { + String resourceName = getResourceByWorkflowStep(getName()); + logger.info("Updated index settings for index {}", indexName); + updateIndexFuture.onResponse( + new WorkflowData(Map.of(resourceName, indexName), currentNodeInputs.getWorkflowId(), currentNodeId) + ); + + }, ex -> { + Exception e = getSafeException(ex); + String errorMessage = (e == null + ? "Failed to update the index settings for index " + indexName + : e.getMessage()); + logger.error(errorMessage, e); + updateIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); + })); + } }, ex -> { Exception e = getSafeException(ex); String errorMessage = (e == null ? "Failed to retrieve the index settings for index " + indexName : e.getMessage()); logger.error(errorMessage, e); updateIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); })); - - updateSettingsRequest.settings(filteredSettings); } } - - if (updateSettingsRequest.settings().size() == 0) { - String errorMessage = "Failed to update index settings for index " + indexName + ", no settings have been updated"; - throw new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST); - } else { - client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(acknowledgedResponse -> { - String resourceName = getResourceByWorkflowStep(getName()); - logger.info("Updated index settings for index {}", indexName); - updateIndexFuture.onResponse( - new WorkflowData(Map.of(resourceName, indexName), currentNodeInputs.getWorkflowId(), currentNodeId) - ); - - }, ex -> { - Exception e = getSafeException(ex); - String errorMessage = (e == null ? "Failed to update the index settings for index " + indexName : e.getMessage()); - logger.error(errorMessage, e); - updateIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); - })); - } } catch (Exception e) { updateIndexFuture.onFailure(new WorkflowStepException(e.getMessage(), ExceptionsHelper.status(e))); } From 9e4e61db0fd444d6fc4abb81af71f23f708920c4 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Fri, 16 Aug 2024 00:00:46 +0000 Subject: [PATCH 16/20] Increasing test coverage Signed-off-by: Joshua Palis --- .../flowframework/util/ParseUtils.java | 16 ++++++++++++++++ .../flowframework/workflow/UpdateIndexStep.java | 9 ++------- .../flowframework/util/ParseUtilsTests.java | 17 +++++++++++++++++ 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index e20b2ed3b..a63220433 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -533,4 +533,20 @@ public static void flattenSettings(String prefix, Map settings, } } } + + /** + * Ensures index is prepended to flattened setting keys + * @param originalSettings the original settings map + */ + public static Map prependIndexToSettings(Map originalSettings) { + Map newSettings = new HashMap<>(); + originalSettings.entrySet().stream().forEach(x -> { + if (!x.getKey().startsWith("index.")) { + newSettings.put("index." + x.getKey(), x.getValue()); + } else { + newSettings.put(x.getKey(), x.getValue()); + } + }); + return newSettings; + } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java index dbd683d6e..719ef7237 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java @@ -115,13 +115,8 @@ public PlainActionFuture execute( } else { // Create index setting configuration can be a mix of flattened or expanded settings // prepend index. to ensure successful setting comparison - updatedSettings.entrySet().stream().forEach(x -> { - if (!x.getKey().startsWith("index.")) { - flattenedSettings.put("index." + x.getKey(), x.getValue()); - } else { - flattenedSettings.put(x.getKey(), x.getValue()); - } - }); + + flattenedSettings.putAll(ParseUtils.prependIndexToSettings(updatedSettings)); } Map filteredSettings = new HashMap<>(); diff --git a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java index 1cdb0c50e..ade0f872d 100644 --- a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java +++ b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java @@ -330,4 +330,21 @@ public void testFlattenSettings() throws Exception { assertTrue(flattenedSettings.entrySet().stream().allMatch(x -> x.getKey().startsWith("index."))); } + + public void testPrependIndexToSettings() throws Exception { + + Map indexSettingsMap = Map.ofEntries( + Map.entry("knn", "true"), + Map.entry("number_of_shards", "2"), + Map.entry("number_of_replicas", "1"), + Map.entry("default_pipeline", "_none"), + Map.entry("search", Map.of("default_pipeine", "_none")) + ); + Map prependedSettings = ParseUtils.prependIndexToSettings(indexSettingsMap); + assertEquals(5, prependedSettings.size()); + + // every setting should start with index + assertTrue(prependedSettings.entrySet().stream().allMatch(x -> x.getKey().startsWith("index."))); + + } } From 45d01b8d6fe22378c12629af60883ee9b866e449 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Fri, 16 Aug 2024 00:03:28 +0000 Subject: [PATCH 17/20] Adding return to javadoc Signed-off-by: Joshua Palis --- src/main/java/org/opensearch/flowframework/util/ParseUtils.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index a63220433..16e8b25e1 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -537,6 +537,7 @@ public static void flattenSettings(String prefix, Map settings, /** * Ensures index is prepended to flattened setting keys * @param originalSettings the original settings map + * @return new map with keys prepended with index */ public static Map prependIndexToSettings(Map originalSettings) { Map newSettings = new HashMap<>(); From f66524d7eb72ba18253d1eca842ce0f6a0f834e8 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Fri, 16 Aug 2024 00:31:26 +0000 Subject: [PATCH 18/20] Adding test coverage Signed-off-by: Joshua Palis --- .../flowframework/util/ParseUtilsTests.java | 2 +- .../workflow/UpdateIndexStepTests.java | 52 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java index ade0f872d..8237a7a93 100644 --- a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java +++ b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java @@ -337,7 +337,7 @@ public void testPrependIndexToSettings() throws Exception { Map.entry("knn", "true"), Map.entry("number_of_shards", "2"), Map.entry("number_of_replicas", "1"), - Map.entry("default_pipeline", "_none"), + Map.entry("index.default_pipeline", "_none"), Map.entry("search", Map.of("default_pipeine", "_none")) ); Map prependedSettings = ParseUtils.prependIndexToSettings(indexSettingsMap); diff --git a/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java index 7dade5607..68d800cef 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java @@ -12,6 +12,7 @@ import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.client.IndicesAdminClient; @@ -107,6 +108,57 @@ public void testUpdateIndexStepWithUpdatedSettings() throws ExecutionException, assertEquals("_none", settingsToUpdate.get("index.search.default_pipeline")); } + public void testFailedToUpdateIndexSettings() throws ExecutionException, InterruptedException, IOException { + + UpdateIndexStep updateIndexStep = new UpdateIndexStep(client); + + String indexName = "test-index"; + + // Create existing settings for default pipelines + Settings.Builder builder = Settings.builder(); + builder.put("index.number_of_shards", 2); + builder.put("index.number_of_replicas", 1); + builder.put("index.knn", true); + builder.put("index.default_pipeline", "ingest_pipeline_id"); + builder.put("index.search.default_pipeline", "search_pipeline_id"); + Map indexToSettings = new HashMap<>(); + indexToSettings.put(indexName, builder.build()); + + // Stub get index settings request/response + doAnswer(invocation -> { + ActionListener getSettingsResponseListener = invocation.getArgument(1); + getSettingsResponseListener.onResponse(new GetSettingsResponse(indexToSettings, indexToSettings)); + return null; + }).when(indicesAdminClient).getSettings(any(), any()); + + doAnswer(invocation -> { + ActionListener ackResponseListener = invocation.getArgument(1); + ackResponseListener.onFailure(new Exception("")); + return null; + }).when(indicesAdminClient).updateSettings(any(), any()); + + // Configurations has updated search/ingest pipeline default values of _none + String configurations = + "{\"settings\":{\"index\":{\"knn\":true,\"number_of_shards\":2,\"number_of_replicas\":1,\"default_pipeline\":\"_none\",\"search\":{\"default_pipeline\":\"_none\"}}},\"mappings\":{\"properties\":{\"age\":{\"type\":\"integer\"}}},\"aliases\":{\"sample-alias1\":{}}}"; + WorkflowData data = new WorkflowData( + Map.ofEntries(Map.entry(INDEX_NAME, indexName), Map.entry(CONFIGURATIONS, configurations)), + "test-id", + "test-node-id" + ); + PlainActionFuture future = updateIndexStep.execute( + data.getNodeId(), + data, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + assertTrue(future.isDone()); + ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get()); + assertTrue(exception.getCause() instanceof Exception); + assertEquals("Failed to update the index settings for index test-index", exception.getCause().getMessage()); + } + public void testMissingSettings() throws InterruptedException { UpdateIndexStep updateIndexStep = new UpdateIndexStep(client); From d8179028a8d5a6ac900b514f3094318b2c6a5219 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Fri, 16 Aug 2024 00:48:21 +0000 Subject: [PATCH 19/20] Increasing test coverage Signed-off-by: Joshua Palis --- .../workflow/UpdateIndexStepTests.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java index 68d800cef..ebbf9ebca 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java @@ -188,6 +188,55 @@ public void testMissingSettings() throws InterruptedException { ); } + public void testUpdateMixedSettings() throws InterruptedException { + UpdateIndexStep updateIndexStep = new UpdateIndexStep(client); + + String indexName = "test-index"; + + // Create existing settings for default pipelines + Settings.Builder builder = Settings.builder(); + builder.put("index.number_of_shards", 2); + builder.put("index.number_of_replicas", 1); + builder.put("index.knn", true); + builder.put("index.default_pipeline", "ingest_pipeline_id"); + Map indexToSettings = new HashMap<>(); + indexToSettings.put(indexName, builder.build()); + + // Stub get index settings request/response + doAnswer(invocation -> { + ActionListener getSettingsResponseListener = invocation.getArgument(1); + getSettingsResponseListener.onResponse(new GetSettingsResponse(indexToSettings, indexToSettings)); + return null; + }).when(indicesAdminClient).getSettings(any(), any()); + + // validate update settings request content + @SuppressWarnings({ "unchecked" }) + ArgumentCaptor updateSettingsRequestCaptor = ArgumentCaptor.forClass(UpdateSettingsRequest.class); + + // Configurations has updated ingest pipeline default values of _none. Settings have regular and full names + String configurations = + "{\"settings\":{\"index.knn\":true,\"default_pipeline\":\"_none\",\"index.number_of_shards\":2,\"index.number_of_replicas\":1},\"mappings\":{\"properties\":{\"age\":{\"type\":\"integer\"}}},\"aliases\":{\"sample-alias1\":{}}}"; + WorkflowData data = new WorkflowData( + Map.ofEntries(Map.entry(INDEX_NAME, indexName), Map.entry(CONFIGURATIONS, configurations)), + "test-id", + "test-node-id" + ); + PlainActionFuture future = updateIndexStep.execute( + data.getNodeId(), + data, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + verify(indicesAdminClient, times(1)).getSettings(any(GetSettingsRequest.class), any()); + verify(indicesAdminClient, times(1)).updateSettings(updateSettingsRequestCaptor.capture(), any()); + + Settings settingsToUpdate = updateSettingsRequestCaptor.getValue().settings(); + assertEquals(1, settingsToUpdate.size()); + assertEquals("_none", settingsToUpdate.get("index.default_pipeline")); + } + public void testEmptyConfiguration() throws InterruptedException { UpdateIndexStep updateIndexStep = new UpdateIndexStep(client); From 558e37423a3bca19a2b27824beb18fadb15a7dd6 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Fri, 16 Aug 2024 01:13:08 +0000 Subject: [PATCH 20/20] Increasing test coverage Signed-off-by: Joshua Palis --- .../flowframework/workflow/UpdateIndexStepTests.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java index ebbf9ebca..e4ea939ea 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java @@ -79,6 +79,12 @@ public void testUpdateIndexStepWithUpdatedSettings() throws ExecutionException, return null; }).when(indicesAdminClient).getSettings(any(), any()); + doAnswer(invocation -> { + ActionListener ackResponseListener = invocation.getArgument(1); + ackResponseListener.onResponse(new AcknowledgedResponse(true)); + return null; + }).when(indicesAdminClient).updateSettings(any(), any()); + // validate update settings request content @SuppressWarnings({ "unchecked" }) ArgumentCaptor updateSettingsRequestCaptor = ArgumentCaptor.forClass(UpdateSettingsRequest.class); @@ -106,6 +112,12 @@ public void testUpdateIndexStepWithUpdatedSettings() throws ExecutionException, assertEquals(2, settingsToUpdate.size()); assertEquals("_none", settingsToUpdate.get("index.default_pipeline")); assertEquals("_none", settingsToUpdate.get("index.search.default_pipeline")); + + assertTrue(future.isDone()); + WorkflowData returnedData = (WorkflowData) future.get(); + assertEquals(Map.ofEntries(Map.entry(INDEX_NAME, indexName)), returnedData.getContent()); + assertEquals(data.getWorkflowId(), returnedData.getWorkflowId()); + assertEquals(data.getNodeId(), returnedData.getNodeId()); } public void testFailedToUpdateIndexSettings() throws ExecutionException, InterruptedException, IOException {