diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java index d3523af5a7..598fbbf218 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java @@ -112,6 +112,11 @@ public void writeAll(Collection records, int timeoutInMillis) throws Exceptio } } + @Override + public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis) throws Exception { + throw new RuntimeException("not supported"); + } + /** * Records egress and time elapsed metrics, while calling the doRead function to * do the actual read diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java index e1ecb3311c..ff0d712889 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java @@ -38,6 +38,20 @@ public interface Buffer> { */ void writeAll(Collection records, int timeoutInMillis) throws Exception; + /** + * Atomically writes bytes into the buffer + * + * @param bytes the bytes to be written to the buffer + * @param key key to use when writing to the buffer + * @param timeoutInMillis how long to wait before giving up + * @throws TimeoutException Unable to write to the buffer within the timeout + * @throws SizeOverflowException The number of records exceeds the total capacity of the buffer. This cannot be retried. + * @throws RuntimeException Other exceptions + */ + default void writeBytes(final byte[] bytes, final String key, int timeoutInMillis) throws Exception { + throw new RuntimeException("Not supported"); + } + /** * Retrieves and removes the batch of records from the head of the queue. The batch size is defined/determined by * the configuration attribute "batch_size" or the @param timeoutInMillis @@ -53,12 +67,34 @@ public interface Buffer> { */ void checkpoint(CheckpointState checkpointState); + /** + * Checks if the buffer is empty + * + * @return true if the buffer is empty, false otherwise + */ boolean isEmpty(); + /** + * Checks if the buffer supports raw bytes + * + * @return true if the buffer supports raw bytes, false otherwise + */ + default boolean isByteBuffer() { + return false; + } + + /** + * Returns buffer's drain timeout as duration + * + * @return buffers drain timeout + */ default Duration getDrainTimeout() { return Duration.ZERO; } + /** + * shuts down the buffer + */ default void shutdown() { } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/ByteDecoder.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/ByteDecoder.java new file mode 100644 index 0000000000..46420ca7cc --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/ByteDecoder.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.codec; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.util.function.Consumer; + +public interface ByteDecoder extends Serializable { + /** + * Parses an {@link InputStream}. Implementors should call the {@link Consumer} for each + * {@link Record} loaded from the {@link InputStream}. + * + * @param inputStream The input stream for code to process + * @param eventConsumer The consumer which handles each event from the stream + * @throws IOException throws IOException when invalid input is received or incorrect codec name is provided + */ + void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException; + +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/HasByteDecoder.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/HasByteDecoder.java new file mode 100644 index 0000000000..8d9e907a54 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/HasByteDecoder.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.codec; + +public interface HasByteDecoder { + default ByteDecoder getDecoder() { + return null; + } +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java new file mode 100644 index 0000000000..1aba7e56ee --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.codec; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.record.Record; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; + +public class JsonDecoder implements ByteDecoder { + private final ObjectMapper objectMapper = new ObjectMapper(); + private final JsonFactory jsonFactory = new JsonFactory(); + + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + Objects.requireNonNull(inputStream); + Objects.requireNonNull(eventConsumer); + + final JsonParser jsonParser = jsonFactory.createParser(inputStream); + + while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) { + if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) { + parseRecordsArray(jsonParser, eventConsumer); + } + } + } + + private void parseRecordsArray(final JsonParser jsonParser, final Consumer> eventConsumer) throws IOException { + while (jsonParser.nextToken() != JsonToken.END_ARRAY) { + final Map innerJson = objectMapper.readValue(jsonParser, Map.class); + + final Record record = createRecord(innerJson); + eventConsumer.accept(record); + } + } + + private Record createRecord(final Map json) { + final JacksonEvent event = (JacksonEvent)JacksonLog.builder() + .withData(json) + .getThis() + .build(); + + return new Record<>(event); + } + +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginFactory.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginFactory.java index aa6c435920..b233d89043 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginFactory.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginFactory.java @@ -26,8 +26,9 @@ public interface PluginFactory { * @return A new instance of your plugin, configured * @since 1.2 */ - T loadPlugin(final Class baseClass, final PluginSetting pluginSetting); + T loadPlugin(final Class baseClass, final PluginSetting pluginSetting, final Object ... args); + /** /** * Loads a new instance of a plugin with SinkContext. * diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java index 1205bc625f..b753594fc5 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java @@ -7,12 +7,13 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.codec.HasByteDecoder; /** * Data Prepper source interface. Source acts as receiver of the events that flow * through the transformation pipeline. */ -public interface Source> { +public interface Source> extends HasByteDecoder { /** * Notifies the source to start writing the records into the buffer @@ -35,4 +36,5 @@ public interface Source> { default boolean areAcknowledgementsEnabled() { return false; } + } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java index 17f13e4ae8..0d4139ef6d 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java @@ -168,6 +168,13 @@ public void testCheckpointMetrics() throws Exception { 0.001)); } + @Test + public void testWriteBytes() throws TimeoutException { + final AbstractBuffer> abstractBuffer = new AbstractBufferTimeoutImpl(testPluginSetting); + byte[] bytes = new byte[2]; + Assert.assertThrows(RuntimeException.class, () -> abstractBuffer.writeBytes(bytes, "", 10)); + } + @Test public void testWriteTimeoutMetric() throws TimeoutException { // Given diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java index 01cb2e7f5a..bc9df29ba3 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java @@ -28,4 +28,21 @@ public void testShutdown() { final Buffer> buffer = spy(Buffer.class); buffer.shutdown(); } + + @Test + public void testIsByteBuffer() { + final Buffer> buffer = spy(Buffer.class); + + Assert.assertEquals(false, buffer.isByteBuffer()); + } + + @Test + public void testWriteBytes() { + final Buffer> buffer = spy(Buffer.class); + + byte[] bytes = new byte[2]; + Assert.assertThrows(RuntimeException.class, () -> buffer.writeBytes(bytes, "", 10)); + + } + } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/HasByteDecoderTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/HasByteDecoderTest.java new file mode 100644 index 0000000000..583b6b269c --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/HasByteDecoderTest.java @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.codec; + +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +import static org.mockito.Mockito.spy; + +public class HasByteDecoderTest { + + @Test + public void testGetDecoder() { + final HasByteDecoder hasByteDecoder = spy(HasByteDecoder.class); + + Assert.assertEquals(null, hasByteDecoder.getDecoder()); + } + +} + diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java new file mode 100644 index 0000000000..00b0f256cc --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java @@ -0,0 +1,50 @@ +package org.opensearch.dataprepper.model.codec; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.ByteArrayInputStream; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertNotEquals; + +import org.junit.jupiter.api.BeforeEach; + +public class JsonDecoderTest { + private JsonDecoder jsonDecoder; + private Record receivedRecord; + + private JsonDecoder createObjectUnderTest() { + return new JsonDecoder(); + } + + @BeforeEach + void setup() { + jsonDecoder = createObjectUnderTest(); + receivedRecord = null; + } + + @Test + void test_basicJsonDecoder() { + String stringValue = UUID.randomUUID().toString(); + Random r = new Random(); + int intValue = r.nextInt(); + String inputString = "[{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}]"; + try { + jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), (record) -> { + receivedRecord = record; + }); + } catch (Exception e){} + + assertNotEquals(receivedRecord, null); + Map map = receivedRecord.getData().toMap(); + assertThat(map.get("key1"), equalTo(stringValue)); + assertThat(map.get("key2"), equalTo(intValue)); + } + +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java index 1462a38049..d520fe5062 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java @@ -114,10 +114,8 @@ private void buildPipelineFromConfiguration( final Source source = pipelineSource.orElseGet(() -> pluginFactory.loadPlugin(Source.class, sourceSetting)); - - LOG.info("Building buffer for the pipeline [{}]", pipelineName); - final Buffer pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting()); + final Buffer pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting(), source.getDecoder()); LOG.info("Building processors for the pipeline [{}]", pipelineName); final int processorThreads = pipelineConfiguration.getWorkers(); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContext.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContext.java index 7f68405f3b..bb688abb8f 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContext.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContext.java @@ -77,21 +77,30 @@ private ComponentPluginArgumentsContext(final Builder builder) { } @Override - public Object[] createArguments(final Class[] parameterTypes) { + public Object[] createArguments(final Class[] parameterTypes, final Object ... args) { + Map, Supplier> optionalArgumentsSuppliers = new HashMap<>(); + for (final Object arg: args) { + if (Objects.nonNull(arg)) { + optionalArgumentsSuppliers.put(arg.getClass(), () -> arg); + for (final Class interfaceClass: arg.getClass().getInterfaces()) { + optionalArgumentsSuppliers.put(interfaceClass, () -> arg); + } + } + } return Arrays.stream(parameterTypes) - .map(this::getRequiredArgumentSupplier) + .map(parameterType -> getRequiredArgumentSupplier(parameterType, optionalArgumentsSuppliers)) .map(Supplier::get) .toArray(); } - private Supplier getRequiredArgumentSupplier(final Class parameterType) { + private Supplier getRequiredArgumentSupplier(final Class parameterType, Map, Supplier> optionalArgumentsSuppliers) { if(typedArgumentsSuppliers.containsKey(parameterType)) { return typedArgumentsSuppliers.get(parameterType); - } - else if (beanFactory != null) { + } else if(optionalArgumentsSuppliers.containsKey(parameterType)) { + return optionalArgumentsSuppliers.get(parameterType); + } else if (beanFactory != null) { return createBeanSupplier(parameterType, beanFactory); - } - else { + } else { throw new InvalidPluginDefinitionException(UNABLE_TO_CREATE_PLUGIN_PARAMETER + parameterType); } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java index 5a26c9518e..bd53b9678a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java @@ -72,13 +72,13 @@ public class DefaultPluginFactory implements PluginFactory { } @Override - public T loadPlugin(final Class baseClass, final PluginSetting pluginSetting) { + public T loadPlugin(final Class baseClass, final PluginSetting pluginSetting, final Object ... args) { final String pluginName = pluginSetting.getName(); final Class pluginClass = getPluginClass(baseClass, pluginName); final ComponentPluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass, null); - return pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName); + return pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName, args); } @Override diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java index ebe42ee129..5d5b675c13 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java @@ -74,7 +74,7 @@ static String classNameToPluginName(final String className) { protected static class NoArgumentsArgumentsContext implements PluginArgumentsContext { @Override - public Object[] createArguments(final Class[] parameterTypes) { + public Object[] createArguments(final Class[] parameterTypes, final Object ... args) { if(parameterTypes.length != 0) { throw new InvalidPluginDefinitionException("No arguments are permitted for extensions constructors."); } @@ -90,7 +90,7 @@ protected static class SingleConfigArgumentArgumentsContext implements PluginArg } @Override - public Object[] createArguments(Class[] parameterTypes) { + public Object[] createArguments(Class[] parameterTypes, final Object ... args) { if (parameterTypes.length != 1 && (Objects.nonNull(extensionPluginConfiguration) && !parameterTypes[0].equals(extensionPluginConfiguration.getClass()))) { throw new InvalidPluginDefinitionException(String.format( diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginArgumentsContext.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginArgumentsContext.java index b7736ce52a..c564238176 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginArgumentsContext.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginArgumentsContext.java @@ -6,5 +6,5 @@ package org.opensearch.dataprepper.plugin; interface PluginArgumentsContext { - Object[] createArguments(final Class[] parameterTypes); + Object[] createArguments(final Class[] parameterTypes, final Object ... args); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginCreator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginCreator.java index 947b8543ba..cb19233e8a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginCreator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginCreator.java @@ -35,14 +35,15 @@ class PluginCreator { T newPluginInstance(final Class pluginClass, final PluginArgumentsContext pluginArgumentsContext, - final String pluginName) { + final String pluginName, + final Object... args) { Objects.requireNonNull(pluginClass); Objects.requireNonNull(pluginArgumentsContext); Objects.requireNonNull(pluginName); final Constructor constructor = getConstructor(pluginClass, pluginName); - final Object[] constructorArguments = pluginArgumentsContext.createArguments(constructor.getParameterTypes()); + final Object[] constructorArguments = pluginArgumentsContext.createArguments(constructor.getParameterTypes(), args); pluginConfigurationObservableRegister.registerPluginConfigurationObservables(constructorArguments); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugins/MultiBufferDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugins/MultiBufferDecorator.java index b8be2e0256..cd199a7486 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugins/MultiBufferDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugins/MultiBufferDecorator.java @@ -41,6 +41,11 @@ public void writeAll(final Collection records, final int timeoutInMillis) thr primaryBuffer.writeAll(records, timeoutInMillis); } + @Override + public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis) throws Exception { + primaryBuffer.writeBytes(bytes, key, timeoutInMillis); + } + @Override public Map.Entry, CheckpointState> read(final int timeoutInMillis) { return primaryBuffer.read(timeoutInMillis); @@ -51,6 +56,11 @@ public void checkpoint(final CheckpointState checkpointState) { primaryBuffer.checkpoint(checkpointState); } + @Override + public boolean isByteBuffer() { + return primaryBuffer.isByteBuffer(); + } + @Override public boolean isEmpty() { return primaryBuffer.isEmpty() && secondaryBuffers.stream() @@ -70,4 +80,4 @@ public void shutdown() { primaryBuffer.shutdown(); secondaryBuffers.forEach(Buffer::shutdown); } -} \ No newline at end of file +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContextTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContextTest.java index 9d3b9bdf0b..7885b14981 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContextTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContextTest.java @@ -157,6 +157,19 @@ void createArguments_with_multiple_supplier_sources() { equalTo(new Object[] {testPluginConfiguration, pluginSetting, mock})); } + @Test + void createArguments_with_multiple_supplier_sources_with_varargs() { + final Object mock = mock(Object.class); + + final ComponentPluginArgumentsContext objectUnderTest = new ComponentPluginArgumentsContext.Builder() + .withPluginSetting(pluginSetting) + .withPluginConfiguration(testPluginConfiguration) + .build(); + + assertThat(objectUnderTest.createArguments(new Class[] { TestPluginConfiguration.class, PluginSetting.class, Object.class}, mock), + equalTo(new Object[] {testPluginConfiguration, pluginSetting, mock})); + } + @Test void createArguments_with_two_classes() { final ComponentPluginArgumentsContext objectUnderTest = new ComponentPluginArgumentsContext.Builder() diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java index ca43a31e48..943c582ac3 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import org.opensearch.dataprepper.plugins.TestPlugin; +import org.opensearch.dataprepper.plugins.TestObjectPlugin; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -40,6 +41,7 @@ class DefaultPluginFactoryIT { @Mock private DataPrepperConfiguration dataPrepperConfiguration; private String pluginName; + private String objectPluginName; private String pipelineName; private DefaultEventFactory eventFactory; @@ -49,6 +51,7 @@ class DefaultPluginFactoryIT { @BeforeEach void setUp() { pluginName = "test_plugin"; + objectPluginName = "test_object_plugin"; pipelineName = UUID.randomUUID().toString(); } @@ -93,6 +96,31 @@ void loadPlugin_should_return_a_new_plugin_instance_with_the_expected_configurat assertThat(configuration.getOptionalString(), equalTo(optionalStringValue)); } + @Test + void loadPlugin_should_return_a_new_plugin_instance_with_the_expected_configuration_variable_args() { + + final String requiredStringValue = UUID.randomUUID().toString(); + final String optionalStringValue = UUID.randomUUID().toString(); + + final Map pluginSettingMap = new HashMap<>(); + pluginSettingMap.put("required_string", requiredStringValue); + pluginSettingMap.put("optional_string", optionalStringValue); + final PluginSetting pluginSetting = createObjectPluginSettings(pluginSettingMap); + + final Object object = new Object(); + final TestPluggableInterface plugin = createObjectUnderTest().loadPlugin(TestPluggableInterface.class, pluginSetting, object); + + assertThat(plugin, instanceOf(TestObjectPlugin.class)); + + final TestObjectPlugin testPlugin = (TestObjectPlugin) plugin; + + final TestPluginConfiguration configuration = testPlugin.getConfiguration(); + + assertThat(testPlugin.getObject(), equalTo(object)); + assertThat(configuration.getRequiredString(), equalTo(requiredStringValue)); + assertThat(configuration.getOptionalString(), equalTo(optionalStringValue)); + } + @Test void loadPlugin_should_throw_when_a_plugin_configuration_is_invalid() { final String optionalStringValue = UUID.randomUUID().toString(); @@ -115,4 +143,10 @@ private PluginSetting createPluginSettings(final Map pluginSetti pluginSetting.setPipelineName(pipelineName); return pluginSetting; } + + private PluginSetting createObjectPluginSettings(final Map pluginSettingMap) { + final PluginSetting pluginSetting = new PluginSetting(objectPluginName, pluginSettingMap); + pluginSetting.setPipelineName(pipelineName); + return pluginSetting; + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java index 86b57af90b..a43b9c17c4 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java @@ -266,6 +266,32 @@ void loadPlugins_should_return_a_single_instance_when_the_the_numberOfInstances_ assertThat(plugins.get(0), equalTo(expectedInstance)); } + @Test + void loadPlugin_with_varargs_should_return_a_single_instance_when_the_the_numberOfInstances_is_1() { + final Object object = new Object(); + final TestSink expectedInstance = mock(TestSink.class); + final Object convertedConfiguration = mock(Object.class); + given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + .willReturn(convertedConfiguration); + given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(pluginName), eq(object))) + .willReturn(expectedInstance); + + final Object plugin = createObjectUnderTest().loadPlugin(baseClass, pluginSetting, object); + + verify(beanFactoryProvider).get(); + final ArgumentCaptor pluginArgumentsContextArgCapture = ArgumentCaptor.forClass(ComponentPluginArgumentsContext.class); + verify(pluginCreator).newPluginInstance(eq(expectedPluginClass), pluginArgumentsContextArgCapture.capture(), eq(pluginName), eq(object)); + final ComponentPluginArgumentsContext actualPluginArgumentsContext = pluginArgumentsContextArgCapture.getValue(); + final List classes = List.of(PipelineDescription.class); + final Object[] pipelineDescriptionObj = actualPluginArgumentsContext.createArguments(classes.toArray(new Class[1])); + assertThat(pipelineDescriptionObj.length, equalTo(1)); + assertThat(pipelineDescriptionObj[0], instanceOf(PipelineDescription.class)); + final PipelineDescription actualPipelineDescription = (PipelineDescription)pipelineDescriptionObj[0]; + assertThat(actualPipelineDescription.getPipelineName(), is(pipelineName)); + assertThat(plugin, notNullValue()); + assertThat(plugin, equalTo(expectedInstance)); + } + @Test void loadPlugins_should_return_an_instance_for_the_total_count() { final TestSink expectedInstance1 = mock(TestSink.class); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java index 956c343060..76ca6900b8 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java @@ -196,7 +196,7 @@ void loadExtensions_invokes_newPluginInstance_with_PluginArgumentsContext_which_ verify(pluginCreator).newPluginInstance( eq(pluginClass), contextArgumentCaptor.capture(), - anyString()); + any()); final PluginArgumentsContext actualPluginArgumentsContext = contextArgumentCaptor.getValue(); @@ -230,4 +230,4 @@ private interface TestExtension2 extends ExtensionPlugin { } private interface TestExtension3 extends ExtensionPlugin { } -} \ No newline at end of file +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginCreatorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginCreatorTest.java index daf5a2c57d..50617f50f1 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginCreatorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginCreatorTest.java @@ -73,6 +73,22 @@ public PluginClassWithMultipleConstructors(final PluginSetting pluginSetting, fi this.pluginSetting = pluginSetting; this.alternatePluginConfig = alternatePluginConfig; } + + } + + public static class PluginClassWithThreeArgs extends PluginClassWithMultipleConstructors { + private Object obj; + private PluginSetting pluginSetting; + private AlternatePluginConfig alternatePluginConfig; + + public PluginClassWithThreeArgs() {} + public PluginClassWithThreeArgs(final String ignored) { } + @DataPrepperPluginConstructor + public PluginClassWithThreeArgs(final PluginSetting pluginSetting, final AlternatePluginConfig alternatePluginConfig, Object obj) { + this.pluginSetting = pluginSetting; + this.alternatePluginConfig = alternatePluginConfig; + this.obj = obj; + } } public static class PluginClassWithPluginConfigurationObservableConstructor { @@ -125,6 +141,23 @@ void newPluginInstance_should_create_new_instance_from_annotated_constructor() { assertThat(instance.alternatePluginConfig, equalTo(alternatePluginConfig)); } + @Test + void newPluginInstance_should_create_new_instance_from_annotated_constructor_with_byte_decoder() { + + Object obj = new Object(); + final AlternatePluginConfig alternatePluginConfig = mock(AlternatePluginConfig.class); + given(pluginConstructionContext.createArguments(new Class[] {PluginSetting.class, AlternatePluginConfig.class, Object.class}, obj)) + .willReturn(new Object[] { pluginSetting, alternatePluginConfig, obj}); + + final PluginClassWithThreeArgs instance = createObjectUnderTest() + .newPluginInstance(PluginClassWithThreeArgs.class, pluginConstructionContext, pluginName, obj); + + assertThat(instance, notNullValue()); + assertThat(instance.pluginSetting, equalTo(pluginSetting)); + assertThat(instance.alternatePluginConfig, equalTo(alternatePluginConfig)); + assertThat(instance.obj, equalTo(obj)); + } + @Test void newPluginInstance_should_register_pluginConfigurationObservable() { final PluginCreator objectUnderTest = new PluginCreator(pluginConfigurationObservableRegister); @@ -185,4 +218,4 @@ void newPluginInstance_should_throw_if_plugin_throws_in_constructor() { assertThrows(PluginInvocationException.class, () -> objectUnderTest.newPluginInstance(AlwaysThrowingPluginClass.class, pluginConstructionContext, pluginName)); } -} \ No newline at end of file +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestObjectPlugin.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestObjectPlugin.java new file mode 100644 index 0000000000..46dd0cd490 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestObjectPlugin.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.plugin.TestPluggableInterface; +import org.opensearch.dataprepper.plugin.TestPluginConfiguration; + +/** + * Used for integration testing the plugin framework. + * TODO: Move this into the org.opensearch.dataprepper.plugin package once alternate packages are supported per #379. + */ +@DataPrepperPlugin(name = "test_object_plugin", pluginType = TestPluggableInterface.class, pluginConfigurationType = TestPluginConfiguration.class) +public class TestObjectPlugin implements TestPluggableInterface { + private final TestPluginConfiguration configuration; + private final Object object; + + @DataPrepperPluginConstructor + public TestObjectPlugin(final TestPluginConfiguration configuration, Object obj) { + this.configuration = configuration; + this.object = obj; + + } + public TestPluginConfiguration getConfiguration() { + return configuration; + } + + public Object getObject() { + return object; + } +} + diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPlugin.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPlugin.java index 67dffb2714..6d6ac8aa37 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPlugin.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPlugin.java @@ -17,13 +17,19 @@ @DataPrepperPlugin(name = "test_plugin", pluginType = TestPluggableInterface.class, pluginConfigurationType = TestPluginConfiguration.class) public class TestPlugin implements TestPluggableInterface { private final TestPluginConfiguration configuration; + private final Object obj; @DataPrepperPluginConstructor public TestPlugin(final TestPluginConfiguration configuration) { this.configuration = configuration; + this.obj = null; } public TestPluginConfiguration getConfiguration() { return configuration; } + + public Object getObject() { + return obj; + } } diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java index 0502c49bc5..1c80191c50 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java @@ -19,6 +19,8 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.codec.JsonDecoder; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; @@ -54,6 +56,7 @@ public class HTTPSource implements Source> { private Server server; private final PluginMetrics pluginMetrics; private static final String HTTP_HEALTH_CHECK_PATH = "/health"; + private ByteDecoder byteDecoder; @DataPrepperPluginConstructor public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, @@ -61,6 +64,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi this.sourceConfig = sourceConfig; this.pluginMetrics = pluginMetrics; this.pipelineName = pipelineDescription.getPipelineName(); + this.byteDecoder = new JsonDecoder(); this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig); final PluginModel authenticationConfiguration = sourceConfig.getAuthentication(); final PluginSetting authenticationPluginSetting; @@ -130,7 +134,7 @@ public void start(final Buffer> buffer) { final String httpSourcePath = sourceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler)); - final LogHTTPService logHTTPService = new LogHTTPService(sourceConfig.getBufferTimeoutInMillis(), buffer, pluginMetrics); + final LogHTTPService logHTTPService = new LogHTTPService(sourceConfig.getBufferTimeoutInMillis(), buffer, byteDecoder, pluginMetrics); if (CompressionOption.NONE.equals(sourceConfig.getCompression())) { sb.annotatedService(httpSourcePath, logHTTPService, httpRequestExceptionHandler); @@ -161,6 +165,11 @@ public void start(final Buffer> buffer) { LOG.info("Started http source on port " + sourceConfig.getPort() + "..."); } + @Override + public ByteDecoder getDecoder() { + return byteDecoder; + } + @Override public void stop() { if (server != null) { diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java index 7cdb107b3b..b43891108b 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.log.Log; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.codec.ByteDecoder; import com.linecorp.armeria.common.AggregatedHttpRequest; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpResponse; @@ -51,6 +52,7 @@ public class LogHTTPService { public LogHTTPService(final int bufferWriteTimeoutInMillis, final Buffer> buffer, + final ByteDecoder decoder, final PluginMetrics pluginMetrics) { this.buffer = buffer; this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; @@ -74,20 +76,26 @@ public HttpResponse doPost(final ServiceRequestContext serviceRequestContext, fi } private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) throws Exception { - List jsonList; final HttpData content = aggregatedHttpRequest.content(); + List jsonList; try { jsonList = jsonCodec.parse(content); } catch (IOException e) { - LOG.error("Failed to write the request of size {} due to: {}", content.length(), e.getMessage()); + LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage()); throw new IOException("Bad request data format. Needs to be json array.", e.getCause()); } - final List> records = jsonList.stream() - .map(this::buildRecordLog) - .collect(Collectors.toList()); try { - buffer.writeAll(records, bufferWriteTimeoutInMillis); + if (buffer.isByteBuffer()) { + // jsonList is ignored in this path but parse() was done to make + // sure that the data is in the expected json format + buffer.writeBytes(content.array(), null, bufferWriteTimeoutInMillis); + } else { + final List> records = jsonList.stream() + .map(this::buildRecordLog) + .collect(Collectors.toList()); + buffer.writeAll(records, bufferWriteTimeoutInMillis); + } } catch (Exception e) { LOG.error("Failed to write the request of size {} due to: {}", content.length(), e.getMessage()); throw e; diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java index 9a5ca43024..bd289de769 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java +++ b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java @@ -95,7 +95,7 @@ public void setUp() throws Exception { ); Buffer> blockingBuffer = new BlockingBuffer<>(TEST_BUFFER_CAPACITY, 8, "test-pipeline"); - logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, blockingBuffer, pluginMetrics); + logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, blockingBuffer, null, pluginMetrics); } @Test @@ -195,4 +195,4 @@ private AggregatedHttpRequest generateBadHTTPRequest() throws ExecutionException HttpData httpData = HttpData.ofUtf8("{"); return HttpRequest.of(requestHeaders, httpData).aggregate().get(); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index a9fac35ef9..4de082d415 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -91,7 +91,7 @@ void setUp() { } private KafkaBuffer> createObjectUnderTest() { - return new KafkaBuffer<>(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, null); + return new KafkaBuffer<>(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, null, null); } @Test diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index 491caf06fd..36fd90a100 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -330,7 +330,6 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception { kafkaSource = createObjectUnderTest(); kafkaSource.start(buffer); - assertThat(kafkaSource.getConsumer().groupMetadata().groupId(), equalTo(testGroup)); produceJsonRecords(bootstrapServers, testTopic, numRecords); int numRetries = 0; while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index 4d427d9ecf..b139fe5db9 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -3,6 +3,7 @@ import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -43,26 +44,38 @@ public class KafkaBuffer> extends AbstractBuffer { private final ExecutorService executorService; private final Duration drainTimeout; private AtomicBoolean shutdownInProgress; + private ByteDecoder byteDecoder; @DataPrepperPluginConstructor public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, final PluginFactory pluginFactory, final AcknowledgementSetManager acknowledgementSetManager, final PluginMetrics pluginMetrics, - final AwsCredentialsSupplier awsCredentialsSupplier) { + final ByteDecoder byteDecoder, final AwsCredentialsSupplier awsCredentialsSupplier) { super(pluginSetting); SerializationFactory serializationFactory = new SerializationFactory(); final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier); + this.byteDecoder = byteDecoder; producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting, null, null); final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory, awsCredentialsSupplier); innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName()); this.shutdownInProgress = new AtomicBoolean(false); final List consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(), - innerBuffer, pluginMetrics, acknowledgementSetManager, shutdownInProgress); + innerBuffer, pluginMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress); this.executorService = Executors.newFixedThreadPool(consumers.size()); consumers.forEach(this.executorService::submit); this.drainTimeout = kafkaBufferConfig.getDrainTimeout(); } + @Override + public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis) throws Exception { + try { + producer.produceRawData(bytes, key); + } catch (final Exception e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + @Override public void doWrite(T record, int timeoutInMillis) throws TimeoutException { try { @@ -73,6 +86,11 @@ public void doWrite(T record, int timeoutInMillis) throws TimeoutException { } } + @Override + public boolean isByteBuffer() { + return true; + } + @Override public void doWriteAll(Collection records, int timeoutInMillis) throws Exception { for ( T record: records ) { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java index bc80e7964b..79e50f0647 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java @@ -28,6 +28,7 @@ import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; @@ -38,6 +39,8 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.glue.model.AccessDeniedException; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -88,6 +91,7 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener private final AtomicInteger numberOfAcksPending; private long numRecordsCommitted = 0; private final LogRateLimiter errLogRateLimiter; + private final ByteDecoder byteDecoder; public KafkaCustomConsumer(final KafkaConsumer consumer, final AtomicBoolean shutdownInProgress, @@ -96,12 +100,14 @@ public KafkaCustomConsumer(final KafkaConsumer consumer, final TopicConfig topicConfig, final String schemaType, final AcknowledgementSetManager acknowledgementSetManager, + final ByteDecoder byteDecoder, KafkaTopicMetrics topicMetrics) { this.topicName = topicConfig.getName(); this.topicConfig = topicConfig; this.shutdownInProgress = shutdownInProgress; this.consumer = consumer; this.buffer = buffer; + this.byteDecoder = byteDecoder; this.topicMetrics = topicMetrics; this.topicMetrics.register(consumer); this.offsetsToCommit = new HashMap<>(); @@ -392,6 +398,31 @@ private Record getRecord(ConsumerRecord consumerRecord, in return new Record(event); } + private void processRecord(final AcknowledgementSet acknowledgementSet, final Record record) { + // Always add record to acknowledgementSet before adding to + // buffer because another thread may take and process + // buffer contents before the event record is added + // to acknowledgement set + if (acknowledgementSet != null) { + acknowledgementSet.add(record.getData()); + } + while (true) { + try { + bufferAccumulator.add(record); + break; + } catch (Exception e) { + if (e instanceof SizeOverflowException) { + topicMetrics.getNumberOfBufferSizeOverflows().increment(); + } else { + LOG.debug("Error while adding record to buffer, retrying ", e); + } + try { + Thread.sleep(100); + } catch (Exception ex) {} // ignore the exception because it only means the thread slept for shorter time + } + } + } + private void iterateRecordPartitions(ConsumerRecords records, final AcknowledgementSet acknowledgementSet, Map offsets) throws Exception { for (TopicPartition topicPartition : records.partitions()) { @@ -405,23 +436,15 @@ private void iterateRecordPartitions(ConsumerRecords records, fin List> partitionRecords = records.records(topicPartition); for (ConsumerRecord consumerRecord : partitionRecords) { - Record record = getRecord(consumerRecord, topicPartition.partition()); - if (record != null) { - // Always add record to acknowledgementSet before adding to - // buffer because another thread may take and process - // buffer contents before the event record is added - // to acknowledgement set - if (acknowledgementSet != null) { - acknowledgementSet.add(record.getData()); - } - while (true) { - try { - bufferAccumulator.add(record); - break; - } catch (SizeOverflowException e) { - topicMetrics.getNumberOfBufferSizeOverflows().increment(); - Thread.sleep(100); - } + if (schema == MessageFormat.BYTES && byteDecoder != null) { + InputStream inputStream = new ByteArrayInputStream((byte[])consumerRecord.value()); + byteDecoder.parse(inputStream, (record) -> { + processRecord(acknowledgementSet, record); + }); + } else { + Record record = getRecord(consumerRecord, topicPartition.partition()); + if (record != null) { + processRecord(acknowledgementSet, record); } } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java index a3094011c1..b686dcd113 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfig; import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfigAdapter; import org.opensearch.dataprepper.plugins.kafka.common.PlaintextKafkaDataConfig; @@ -64,6 +65,7 @@ public KafkaCustomConsumerFactory(SerializationFactory serializationFactory, Aws public List createConsumersForTopic(final KafkaConsumerConfig kafkaConsumerConfig, final TopicConfig topic, final Buffer> buffer, final PluginMetrics pluginMetrics, final AcknowledgementSetManager acknowledgementSetManager, + final ByteDecoder byteDecoder, final AtomicBoolean shutdownInProgress) { Properties authProperties = new Properties(); KafkaSecurityConfigurer.setAuthProperties(authProperties, kafkaConsumerConfig, LOG); @@ -91,7 +93,7 @@ public List createConsumersForTopic(final KafkaConsumerConf final KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProperties, keyDeserializer, valueDeserializer); consumers.add(new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, kafkaConsumerConfig, topic, - schemaType, acknowledgementSetManager, topicMetrics)); + schemaType, acknowledgementSetManager, byteDecoder, topicMetrics)); }); } catch (Exception e) { @@ -238,4 +240,4 @@ private void setupConfluentSchemaRegistry(final SchemaConfig schemaConfig, final throw new RuntimeException("RegistryURL must be specified for confluent schema registry"); } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java index 6417aaf670..ea0d6f6d59 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java @@ -85,8 +85,14 @@ public KafkaCustomProducer(final Producer producer, this.topicName = ObjectUtils.isEmpty(kafkaProducerConfig.getTopic()) ? null : kafkaProducerConfig.getTopic().getName(); this.serdeFormat = ObjectUtils.isEmpty(kafkaProducerConfig.getSerdeFormat()) ? null : kafkaProducerConfig.getSerdeFormat(); schemaService = new SchemaService.SchemaServiceBuilder().getFetchSchemaService(topicName, kafkaProducerConfig.getSchemaConfig()).build(); + } - + public void produceRawData(final byte[] bytes, final String key) { + try { + send(topicName, key, bytes).get(); + } catch (Exception e) { + LOG.error("Error occurred while publishing {}", e.getMessage()); + } } public void produceRecords(final Record record) { @@ -104,7 +110,7 @@ public void produceRecords(final Record record) { publishPlaintextMessage(record, key); } } catch (Exception e) { - LOG.error("Error occurred while publishing " + e.getMessage()); + LOG.error("Error occurred while publishing {}", e.getMessage()); releaseEventHandles(false); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index f439b6f3c8..3321e0d2c2 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -138,7 +138,7 @@ public void start(Buffer> buffer) { } } - consumer = new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics); + consumer = new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, null, topicMetrics); allTopicConsumers.add(consumer); executorService.submit(consumer); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java index b045a53ae3..6747ab4894 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java @@ -124,7 +124,7 @@ public KafkaBuffer> createObjectUnderTest() { })) { executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt())).thenReturn(executorService); - return new KafkaBuffer>(pluginSetting, bufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, awsCredentialsSupplier); + return new KafkaBuffer>(pluginSetting, bufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, null, awsCredentialsSupplier); } } @@ -273,4 +273,4 @@ public void testShutdown_InterruptedException() throws InterruptedException { verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS)); verify(executorService).shutdownNow(); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java index fda9252117..0d443e4413 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java @@ -146,7 +146,7 @@ public void setUp() { public KafkaCustomConsumer createObjectUnderTest(String schemaType, boolean acknowledgementsEnabled) { when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(acknowledgementsEnabled); - return new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topicConfig, schemaType, acknowledgementSetManager, topicMetrics); + return new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topicConfig, schemaType, acknowledgementSetManager, null, topicMetrics); } private BlockingBuffer> getBuffer() { diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java index b28d291b49..51aacb7b1e 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java @@ -5,22 +5,15 @@ package org.opensearch.dataprepper.plugins.codec.json; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.io.InputFile; -import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.codec.JsonDecoder; import java.io.IOException; -import java.io.InputStream; -import java.util.Map; import java.util.Objects; import java.util.function.Consumer; @@ -28,25 +21,7 @@ * An implementation of {@link InputCodec} which parses JSON Objects for arrays. */ @DataPrepperPlugin(name = "json", pluginType = InputCodec.class) -public class JsonInputCodec implements InputCodec { - - private final ObjectMapper objectMapper = new ObjectMapper(); - private final JsonFactory jsonFactory = new JsonFactory(); - - @Override - public void parse(final InputStream inputStream, final Consumer> eventConsumer) throws IOException { - - Objects.requireNonNull(inputStream); - Objects.requireNonNull(eventConsumer); - - final JsonParser jsonParser = jsonFactory.createParser(inputStream); - - while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) { - if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) { - parseRecordsArray(jsonParser, eventConsumer); - } - } - } +public class JsonInputCodec extends JsonDecoder implements InputCodec { @Override public void parse( @@ -59,22 +34,4 @@ public void parse( parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer); } - - private void parseRecordsArray(final JsonParser jsonParser, final Consumer> eventConsumer) throws IOException { - while (jsonParser.nextToken() != JsonToken.END_ARRAY) { - final Map innerJson = objectMapper.readValue(jsonParser, Map.class); - - final Record record = createRecord(innerJson); - eventConsumer.accept(record); - } - } - - private Record createRecord(final Map json) { - final JacksonEvent event = JacksonLog.builder() - .withData(json) - .build(); - - return new Record<>(event); - } - }