diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a2c0552e..cc6ec38b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,3 +28,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Maintenance ### Refactoring - Improve error messages for workflow states other than NOT_STARTED ([#642](https://github.com/opensearch-project/flow-framework/pull/642)) +- Create a Config XContent model for Config index ([#679](https://github.com/opensearch-project/flow-framework/pull/679)) diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 4c8486b7e..2a728fcb1 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -112,7 +112,7 @@ public Collection createComponents( Settings settings = environment.settings(); flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings); MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client); - EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client); + EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client, xContentRegistry); FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler( client, clusterService, diff --git a/src/main/java/org/opensearch/flowframework/model/Config.java b/src/main/java/org/opensearch/flowframework/model/Config.java new file mode 100644 index 000000000..b0b8ef032 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/model/Config.java @@ -0,0 +1,100 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.model; + +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.util.ParseUtils; + +import java.io.IOException; +import java.time.Instant; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.flowframework.common.CommonValue.CREATE_TIME; +import static org.opensearch.flowframework.common.CommonValue.MASTER_KEY; + +/** + * Flow Framework Configuration + */ +public class Config implements ToXContentObject { + + private final String masterKey; + private final Instant createTime; + + /** + * Instantiate this object + * + * @param masterKey The encryption master key + * @param createTime The config creation time + */ + public Config(String masterKey, Instant createTime) { + this.masterKey = masterKey; + this.createTime = createTime; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + XContentBuilder xContentBuilder = builder.startObject(); + xContentBuilder.field(MASTER_KEY, this.masterKey); + xContentBuilder.field(CREATE_TIME, this.createTime.toEpochMilli()); + return xContentBuilder.endObject(); + } + + /** + * Parse raw xContent into a Config instance. + * + * @param parser xContent based content parser + * @return an instance of the config + * @throws IOException if content can't be parsed correctly + */ + public static Config parse(XContentParser parser) throws IOException { + String masterKey = null; + Instant createTime = Instant.now(); + + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case MASTER_KEY: + masterKey = parser.text(); + break; + case CREATE_TIME: + createTime = ParseUtils.parseInstant(parser); + break; + default: + throw new FlowFrameworkException( + "Unable to parse field [" + fieldName + "] in a config object.", + RestStatus.BAD_REQUEST + ); + } + } + if (masterKey == null) { + throw new FlowFrameworkException("The config object requires a master key.", RestStatus.BAD_REQUEST); + } + return new Config(masterKey, createTime); + } + + /** + * @return the masterKey + */ + public String masterKey() { + return masterKey; + } + + /** + * @return the createTime + */ + public Instant createTime() { + return createTime; + } +} diff --git a/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java b/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java index df7e66e07..d8dbf7750 100644 --- a/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java @@ -17,9 +17,15 @@ import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.model.Config; import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.model.Workflow; import org.opensearch.flowframework.model.WorkflowNode; @@ -41,8 +47,8 @@ import com.amazonaws.encryptionsdk.CryptoResult; import com.amazonaws.encryptionsdk.jce.JceMasterKey; +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.flowframework.common.CommonValue.CONFIG_INDEX; -import static org.opensearch.flowframework.common.CommonValue.CREATE_TIME; import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD; import static org.opensearch.flowframework.common.CommonValue.MASTER_KEY; @@ -59,19 +65,21 @@ public class EncryptorUtils { // https://github.com/aws/aws-encryption-sdk-java/issues/1879 private static final String WRAPPING_ALGORITHM = "AES/GCM/NOPADDING"; - private ClusterService clusterService; - private Client client; + private final ClusterService clusterService; + private final Client client; private String masterKey; + private final NamedXContentRegistry xContentRegistry; /** * Instantiates a new EncryptorUtils object * @param clusterService the cluster service * @param client the node client */ - public EncryptorUtils(ClusterService clusterService, Client client) { + public EncryptorUtils(ClusterService clusterService, Client client, NamedXContentRegistry xContentRegistry) { this.masterKey = null; this.clusterService = clusterService; this.client = client; + this.xContentRegistry = xContentRegistry; } /** @@ -239,23 +247,24 @@ public void initializeMasterKey(ActionListener listener) { // generate // This is necessary in case of global context index restoration from snapshot, will need to use the same master key to decrypt // stored credentials - try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - + try ( + XContentBuilder builder = XContentFactory.jsonBuilder(); + ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext() + ) { + // Using the master_key string as the document id GetRequest getRequest = new GetRequest(CONFIG_INDEX).id(MASTER_KEY); client.get(getRequest, ActionListener.wrap(getResponse -> { if (!getResponse.isExists()) { - - // Generate new key and index - final String generatedKey = generateMasterKey(); + Config config = new Config(generateMasterKey(), Instant.now()); IndexRequest masterKeyIndexRequest = new IndexRequest(CONFIG_INDEX).id(MASTER_KEY) - .source(Map.ofEntries(Map.entry(MASTER_KEY, generatedKey), Map.entry(CREATE_TIME, Instant.now().toEpochMilli()))) + .source(config.toXContent(builder, ToXContent.EMPTY_PARAMS)) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); client.index(masterKeyIndexRequest, ActionListener.wrap(indexResponse -> { // Set generated key to master logger.info("Config has been initialized successfully"); - this.masterKey = generatedKey; + this.masterKey = config.masterKey(); listener.onResponse(true); }, indexException -> { logger.error("Failed to index config", indexException); @@ -264,9 +273,20 @@ public void initializeMasterKey(ActionListener listener) { } else { // Set existing key to master - logger.info("Config has already been initialized"); - this.masterKey = (String) getResponse.getSourceAsMap().get(MASTER_KEY); - listener.onResponse(true); + logger.debug("Config has already been initialized, fetching key"); + try ( + XContentParser parser = ParseUtils.createXContentParserFromRegistry( + xContentRegistry, + getResponse.getSourceAsBytesRef() + ) + ) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + Config config = Config.parse(parser); + this.masterKey = config.masterKey(); + listener.onResponse(true); + } catch (FlowFrameworkException e) { + listener.onFailure(e); + } } }, getRequestException -> { logger.error("Failed to search for config from config index", getRequestException); @@ -294,7 +314,16 @@ void initializeMasterKeyIfAbsent() { GetRequest getRequest = new GetRequest(CONFIG_INDEX).id(MASTER_KEY); client.get(getRequest, ActionListener.wrap(response -> { if (response.isExists()) { - this.masterKey = (String) response.getSourceAsMap().get(MASTER_KEY); + try ( + XContentParser parser = ParseUtils.createXContentParserFromRegistry( + xContentRegistry, + response.getSourceAsBytesRef() + ) + ) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + Config config = Config.parse(parser); + this.masterKey = config.masterKey(); + } } else { throw new FlowFrameworkException("Master key has not been initialized in config index", RestStatus.NOT_FOUND); } diff --git a/src/test/java/org/opensearch/flowframework/model/ConfigTests.java b/src/test/java/org/opensearch/flowframework/model/ConfigTests.java new file mode 100644 index 000000000..390803aae --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/model/ConfigTests.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.model; + +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.flowframework.util.ParseUtils; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.mockito.Mockito.mock; + +public class ConfigTests extends OpenSearchTestCase { + private NamedXContentRegistry xContentRegistry; + + @Override + public void setUp() throws Exception { + super.setUp(); + this.xContentRegistry = mock(NamedXContentRegistry.class); + } + + public void testConfig() throws IOException { + String masterKey = "foo"; + Instant createTime = Instant.now().truncatedTo(ChronoUnit.MILLIS); + Config config = new Config(masterKey, createTime); + + assertEquals(masterKey, config.masterKey()); + assertEquals(createTime, config.createTime()); + + BytesReference bytesRef; + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + XContentBuilder source = config.toXContent(builder, ToXContent.EMPTY_PARAMS); + bytesRef = BytesReference.bytes(source); + } + try (XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, bytesRef)) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + config = Config.parse(parser); + } + assertEquals(masterKey, config.masterKey()); + assertEquals(createTime, config.createTime()); + } +} diff --git a/src/test/java/org/opensearch/flowframework/transport/GetWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/GetWorkflowTransportActionTests.java index cdc4e5814..7a88199fb 100644 --- a/src/test/java/org/opensearch/flowframework/transport/GetWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/GetWorkflowTransportActionTests.java @@ -19,6 +19,7 @@ import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.TestHelpers; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; @@ -50,8 +51,8 @@ public class GetWorkflowTransportActionTests extends OpenSearchTestCase { - private ThreadPool threadPool; private Client client; + private NamedXContentRegistry xContentRegistry; private GetWorkflowTransportAction getTemplateTransportAction; private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; private Template template; @@ -60,10 +61,10 @@ public class GetWorkflowTransportActionTests extends OpenSearchTestCase { @Override public void setUp() throws Exception { super.setUp(); - this.threadPool = mock(ThreadPool.class); this.client = mock(Client.class); + this.xContentRegistry = mock(NamedXContentRegistry.class); this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); - this.encryptorUtils = new EncryptorUtils(mock(ClusterService.class), client); + this.encryptorUtils = new EncryptorUtils(mock(ClusterService.class), client, xContentRegistry); this.getTemplateTransportAction = new GetWorkflowTransportAction( mock(TransportService.class), mock(ActionFilters.class), diff --git a/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java b/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java index cae595430..ce46407b5 100644 --- a/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java +++ b/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java @@ -17,21 +17,28 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.TestHelpers; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.model.Config; import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.model.Workflow; import org.opensearch.flowframework.model.WorkflowNode; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; +import java.time.Instant; import java.util.Collections; import java.util.List; import java.util.Map; import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD; -import static org.opensearch.flowframework.common.CommonValue.MASTER_KEY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -42,6 +49,7 @@ public class EncryptorUtilsTests extends OpenSearchTestCase { private ClusterService clusterService; private Client client; + private NamedXContentRegistry xContentRegistry; private EncryptorUtils encryptorUtils; private String testMasterKey; private Template testTemplate; @@ -53,7 +61,8 @@ public void setUp() throws Exception { super.setUp(); this.clusterService = mock(ClusterService.class); this.client = mock(Client.class); - this.encryptorUtils = new EncryptorUtils(clusterService, client); + this.xContentRegistry = mock(NamedXContentRegistry.class); + this.encryptorUtils = new EncryptorUtils(clusterService, client, xContentRegistry); this.testMasterKey = encryptorUtils.generateMasterKey(); this.testCredentialKey = "credential_key"; this.testCredentialValue = "12345"; @@ -125,17 +134,23 @@ public void testEncryptWithDifferentMasterKey() { assertNotEquals(encrypted1, encrypted2); } - public void testInitializeMasterKeySuccess() { + public void testInitializeMasterKeySuccess() throws IOException { encryptorUtils.setMasterKey(null); String masterKey = encryptorUtils.generateMasterKey(); + BytesReference bytesRef; + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + Config config = new Config(masterKey, Instant.now()); + XContentBuilder source = config.toXContent(builder, ToXContent.EMPTY_PARAMS); + bytesRef = BytesReference.bytes(source); + } doAnswer(invocation -> { ActionListener getRequestActionListener = invocation.getArgument(1); // Stub get response for success case GetResponse getResponse = mock(GetResponse.class); when(getResponse.isExists()).thenReturn(true); - when(getResponse.getSourceAsMap()).thenReturn(Map.of(MASTER_KEY, masterKey)); + when(getResponse.getSourceAsBytesRef()).thenReturn(bytesRef); getRequestActionListener.onResponse(getResponse); return null;