Adding support for storing raw bytes in Kafka Buffer (opensearch-project#3519)

* Adding support for storing raw bytes in Kafka Buffer

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified to wait for the send() to finish before returning

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Removed unused imports

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed Kafka integration test

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed json processor checkstyle errors

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments and added a new test case

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments and added new tests

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
kkondaka and Krishna Kondaka authored Oct 20, 2023
1 parent 53a06bd commit aa81607
Showing 38 changed files with 531 additions and 106 deletions.
AbstractBuffer.java
@@ -112,6 +112,11 @@ public void writeAll(Collection<T> records, int timeoutInMillis) throws Exception {
}
}

@Override
public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis) throws Exception {
throw new RuntimeException("not supported");
}

/**
* Records egress and time elapsed metrics, while calling the doRead function to
* do the actual read
Buffer.java
@@ -38,6 +38,20 @@ public interface Buffer<T extends Record<?>> {
*/
void writeAll(Collection<T> records, int timeoutInMillis) throws Exception;

/**
* Atomically writes bytes into the buffer
*
* @param bytes the bytes to be written to the buffer
* @param key key to use when writing to the buffer
* @param timeoutInMillis how long to wait before giving up
* @throws TimeoutException Unable to write to the buffer within the timeout
* @throws SizeOverflowException The number of records exceeds the total capacity of the buffer. This cannot be retried.
* @throws RuntimeException Other exceptions
*/
default void writeBytes(final byte[] bytes, final String key, int timeoutInMillis) throws Exception {
throw new RuntimeException("Not supported");
}

/**
* Retrieves and removes the batch of records from the head of the queue. The batch size is defined/determined by
* the configuration attribute "batch_size" or the @param timeoutInMillis
@@ -53,12 +67,34 @@ public interface Buffer<T extends Record<?>> {
*/
void checkpoint(CheckpointState checkpointState);

/**
* Checks if the buffer is empty
*
* @return true if the buffer is empty, false otherwise
*/
boolean isEmpty();

/**
* Checks if the buffer supports raw bytes
*
* @return true if the buffer supports raw bytes, false otherwise
*/
default boolean isByteBuffer() {
return false;
}

/**
* Returns buffer's drain timeout as duration
*
* @return buffer's drain timeout
*/
default Duration getDrainTimeout() {
return Duration.ZERO;
}

/**
* shuts down the buffer
*/
default void shutdown() {
}
}
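The defaults above make raw-byte support strictly opt-in: writeBytes() throws unless overridden, isByteBuffer() returns false, and every existing Buffer implementation keeps its current behavior. Below is a minimal sketch of what a byte-capable implementation could look like; the InMemoryByteBuffer class is hypothetical, and only the Buffer interface it implements comes from this commit.

// Hypothetical byte-capable buffer: a minimal sketch, not part of this commit.
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;

public class InMemoryByteBuffer<T extends Record<?>> implements Buffer<T> {
    private final BlockingQueue<byte[]> byteQueue = new LinkedBlockingQueue<>();

    @Override
    public boolean isByteBuffer() {
        return true;  // advertises raw-byte support to the pipeline
    }

    @Override
    public void writeBytes(final byte[] bytes, final String key, final int timeoutInMillis) throws Exception {
        // Honor the interface contract: block up to the timeout, then fail.
        if (!byteQueue.offer(bytes, timeoutInMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("timed out writing " + bytes.length + " bytes");
        }
    }

    @Override
    public void write(final T record, final int timeoutInMillis) throws TimeoutException {
        throw new RuntimeException("this sketch stores raw bytes only");
    }

    @Override
    public void writeAll(final Collection<T> records, final int timeoutInMillis) throws Exception {
        throw new RuntimeException("this sketch stores raw bytes only");
    }

    @Override
    public Map.Entry<Collection<T>, CheckpointState> read(final int timeoutInMillis) {
        // A real implementation would decode queued bytes into records here.
        return new AbstractMap.SimpleEntry<>(new ArrayList<T>(), new CheckpointState(0));
    }

    @Override
    public void checkpoint(final CheckpointState checkpointState) {
    }

    @Override
    public boolean isEmpty() {
        return byteQueue.isEmpty();
    }
}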
ByteDecoder.java (new file)
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.codec;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.function.Consumer;

public interface ByteDecoder extends Serializable {
/**
* Parses an {@link InputStream}. Implementors should call the {@link Consumer} for each
* {@link Record} loaded from the {@link InputStream}.
*
* @param inputStream The input stream for code to process
* @param eventConsumer The consumer which handles each event from the stream
* @throws IOException when invalid input is received or an incorrect codec name is provided
*/
void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException;

}
HasByteDecoder.java (new file)
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.codec;

public interface HasByteDecoder {
default ByteDecoder getDecoder() {
return null;
}
}
JsonDecoder.java (new file)
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.codec;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

public class JsonDecoder implements ByteDecoder {
private final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();

public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputStream);
Objects.requireNonNull(eventConsumer);

final JsonParser jsonParser = jsonFactory.createParser(inputStream);

while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) {
if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) {
parseRecordsArray(jsonParser, eventConsumer);
}
}
}

private void parseRecordsArray(final JsonParser jsonParser, final Consumer<Record<Event>> eventConsumer) throws IOException {
while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);

final Record<Event> record = createRecord(innerJson);
eventConsumer.accept(record);
}
}

private Record<Event> createRecord(final Map<String, Object> json) {
final JacksonEvent event = (JacksonEvent)JacksonLog.builder()
.withData(json)
.getThis()
.build();

return new Record<>(event);
}

}
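A short usage sketch (the example class is hypothetical, not part of the commit): JsonDecoder descends into a top-level JSON array and emits one Record<Event> per array element, so other top-level shapes produce no events.

// Hypothetical usage of the JsonDecoder added above.
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import org.opensearch.dataprepper.model.codec.JsonDecoder;

public class JsonDecoderExample {
    public static void main(String[] args) throws Exception {
        final byte[] bytes = "[{\"status\":200},{\"status\":404}]"
                .getBytes(StandardCharsets.UTF_8);
        // Prints {status=200} then {status=404}: one record per array element.
        new JsonDecoder().parse(new ByteArrayInputStream(bytes),
                record -> System.out.println(record.getData().toMap()));
    }
}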
PluginFactory.java
@@ -26,8 +26,9 @@ public interface PluginFactory {
* @return A new instance of your plugin, configured
* @since 1.2
*/
-    <T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting);
+    <T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, final Object ... args);

    /**
* Loads a new instance of a plugin with SinkContext.
*
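Because the extra parameter is a vararg, every existing loadPlugin(baseClass, setting) call compiles unchanged; callers with extra objects to offer simply append them. A hedged sketch of both call styles (the wrapper class and method are illustrative only):

// Illustrative only: the two call styles after the vararg change.
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.source.Source;

public class LoadPluginExample {
    @SuppressWarnings("rawtypes")
    static Buffer buildBuffer(final PluginFactory pluginFactory,
                              final PluginSetting bufferSetting,
                              final Source source) {
        // Old style, unchanged by this commit:
        // return pluginFactory.loadPlugin(Buffer.class, bufferSetting);

        // New style: the source's decoder becomes a candidate constructor
        // argument for the buffer plugin; null arguments are skipped.
        return pluginFactory.loadPlugin(Buffer.class, bufferSetting, source.getDecoder());
    }
}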
Source.java
@@ -7,12 +7,13 @@

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.codec.HasByteDecoder;

/**
* Data Prepper source interface. Source acts as receiver of the events that flow
* through the transformation pipeline.
*/
-public interface Source<T extends Record<?>> {
+public interface Source<T extends Record<?>> extends HasByteDecoder {

/**
* Notifies the source to start writing the records into the buffer
@@ -35,4 +36,5 @@ public interface Source<T extends Record<?>> {
default boolean areAcknowledgementsEnabled() {
return false;
}

}
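With Source extending HasByteDecoder, a source whose wire format is JSON can expose a decoder by overriding getDecoder(); every other source inherits the null default and behaves exactly as before. A minimal hypothetical example:

// Hypothetical source, not part of this commit.
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;

public class RawJsonSource implements Source<Record<Event>> {
    @Override
    public ByteDecoder getDecoder() {
        // Lets the pipeline hand this decoder to a byte-oriented buffer.
        return new JsonDecoder();
    }

    @Override
    public void start(final Buffer<Record<Event>> buffer) {
        // Ingest logic omitted; a real source writes records here, or raw
        // bytes via buffer.writeBytes() when buffer.isByteBuffer() is true.
    }

    @Override
    public void stop() {
    }
}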
AbstractBufferTest.java
@@ -168,6 +168,13 @@ public void testCheckpointMetrics() throws Exception {
0.001));
}

@Test
public void testWriteBytes() throws TimeoutException {
final AbstractBuffer<Record<String>> abstractBuffer = new AbstractBufferTimeoutImpl(testPluginSetting);
byte[] bytes = new byte[2];
Assert.assertThrows(RuntimeException.class, () -> abstractBuffer.writeBytes(bytes, "", 10));
}

@Test
public void testWriteTimeoutMetric() throws TimeoutException {
// Given
BufferTest.java
@@ -28,4 +28,21 @@ public void testShutdown() {
final Buffer<Record<Event>> buffer = spy(Buffer.class);
buffer.shutdown();
}

@Test
public void testIsByteBuffer() {
final Buffer<Record<Event>> buffer = spy(Buffer.class);

Assert.assertEquals(false, buffer.isByteBuffer());
}

@Test
public void testWriteBytes() {
final Buffer<Record<Event>> buffer = spy(Buffer.class);

byte[] bytes = new byte[2];
Assert.assertThrows(RuntimeException.class, () -> buffer.writeBytes(bytes, "", 10));

}

}
HasByteDecoderTest.java (new file)
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.codec;

import org.junit.Assert;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.spy;

public class HasByteDecoderTest {

@Test
public void testGetDecoder() {
final HasByteDecoder hasByteDecoder = spy(HasByteDecoder.class);

Assert.assertEquals(null, hasByteDecoder.getDecoder());
}

}

JsonDecoderTest.java (new file)
@@ -0,0 +1,50 @@
package org.opensearch.dataprepper.model.codec;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.io.ByteArrayInputStream;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNotEquals;

import org.junit.jupiter.api.BeforeEach;

public class JsonDecoderTest {
private JsonDecoder jsonDecoder;
private Record<Event> receivedRecord;

private JsonDecoder createObjectUnderTest() {
return new JsonDecoder();
}

@BeforeEach
void setup() {
jsonDecoder = createObjectUnderTest();
receivedRecord = null;
}

@Test
void test_basicJsonDecoder() {
String stringValue = UUID.randomUUID().toString();
Random r = new Random();
int intValue = r.nextInt();
String inputString = "[{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}]";
try {
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), (record) -> {
receivedRecord = record;
});
} catch (Exception e){}

assertNotEquals(receivedRecord, null);
Map<String, Object> map = receivedRecord.getData().toMap();
assertThat(map.get("key1"), equalTo(stringValue));
assertThat(map.get("key2"), equalTo(intValue));
}

}
Pipeline builder (data-prepper-core)
@@ -114,10 +114,8 @@ private void buildPipelineFromConfiguration(
final Source source = pipelineSource.orElseGet(() ->
pluginFactory.loadPlugin(Source.class, sourceSetting));



LOG.info("Building buffer for the pipeline [{}]", pipelineName);
-final Buffer pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting());
+final Buffer pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting(), source.getDecoder());

LOG.info("Building processors for the pipeline [{}]", pipelineName);
final int processorThreads = pipelineConfiguration.getWorkers();
ComponentPluginArgumentsContext.java
@@ -77,21 +77,30 @@ private ComponentPluginArgumentsContext(final Builder builder) {
}

     @Override
-    public Object[] createArguments(final Class<?>[] parameterTypes) {
+    public Object[] createArguments(final Class<?>[] parameterTypes, final Object ... args) {
+        Map<Class<?>, Supplier<Object>> optionalArgumentsSuppliers = new HashMap<>();
+        for (final Object arg: args) {
+            if (Objects.nonNull(arg)) {
+                optionalArgumentsSuppliers.put(arg.getClass(), () -> arg);
+                for (final Class interfaceClass: arg.getClass().getInterfaces()) {
+                    optionalArgumentsSuppliers.put(interfaceClass, () -> arg);
+                }
+            }
+        }
         return Arrays.stream(parameterTypes)
-                .map(this::getRequiredArgumentSupplier)
+                .map(parameterType -> getRequiredArgumentSupplier(parameterType, optionalArgumentsSuppliers))
                 .map(Supplier::get)
                 .toArray();
     }

-    private Supplier<Object> getRequiredArgumentSupplier(final Class<?> parameterType) {
+    private Supplier<Object> getRequiredArgumentSupplier(final Class<?> parameterType, Map<Class<?>, Supplier<Object>> optionalArgumentsSuppliers) {
         if(typedArgumentsSuppliers.containsKey(parameterType)) {
             return typedArgumentsSuppliers.get(parameterType);
-        }
-        else if (beanFactory != null) {
+        } else if(optionalArgumentsSuppliers.containsKey(parameterType)) {
+            return optionalArgumentsSuppliers.get(parameterType);
+        } else if (beanFactory != null) {
             return createBeanSupplier(parameterType, beanFactory);
-        }
-        else {
+        } else {
             throw new InvalidPluginDefinitionException(UNABLE_TO_CREATE_PLUGIN_PARAMETER + parameterType);
         }
     }
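Net effect of the createArguments change: each extra object passed to loadPlugin is indexed under its concrete class and every interface it directly implements, and these optional suppliers are consulted only after typedArgumentsSuppliers misses, so explicitly registered types still win. A standalone sketch of the matching rule (not the framework code itself):

// Standalone illustration of the matching rule; not framework code.
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.codec.JsonDecoder;

public class OptionalArgMatching {
    public static void main(String[] args) {
        final Object decoder = new JsonDecoder();
        final Map<Class<?>, Supplier<Object>> suppliers = new HashMap<>();
        suppliers.put(decoder.getClass(), () -> decoder);        // JsonDecoder.class
        for (final Class<?> iface : decoder.getClass().getInterfaces()) {
            suppliers.put(iface, () -> decoder);                 // ByteDecoder.class
        }
        // A plugin constructor parameter declared as ByteDecoder now resolves:
        System.out.println(suppliers.containsKey(ByteDecoder.class));  // true
        // Note: getInterfaces() returns only directly implemented interfaces,
        // so interfaces inherited from a superclass are not indexed.
    }
}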
DefaultPluginFactory.java
@@ -72,13 +72,13 @@ public class DefaultPluginFactory implements PluginFactory {
}

@Override
-    public <T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting) {
+    public <T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, final Object ... args) {
final String pluginName = pluginSetting.getName();
final Class<? extends T> pluginClass = getPluginClass(baseClass, pluginName);

final ComponentPluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass, null);

-    return pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName);
+    return pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName, args);
}

@Override
ExtensionLoader.java (data-prepper-core)
@@ -74,7 +74,7 @@ static String classNameToPluginName(final String className) {

protected static class NoArgumentsArgumentsContext implements PluginArgumentsContext {
@Override
-        public Object[] createArguments(final Class<?>[] parameterTypes) {
+        public Object[] createArguments(final Class<?>[] parameterTypes, final Object ... args) {
if(parameterTypes.length != 0) {
throw new InvalidPluginDefinitionException("No arguments are permitted for extensions constructors.");
}
@@ -90,7 +90,7 @@ protected static class SingleConfigArgumentArgumentsContext implements PluginArgumentsContext {
}

@Override
-        public Object[] createArguments(Class<?>[] parameterTypes) {
+        public Object[] createArguments(Class<?>[] parameterTypes, final Object ... args) {
if (parameterTypes.length != 1 && (Objects.nonNull(extensionPluginConfiguration) &&
!parameterTypes[0].equals(extensionPluginConfiguration.getClass()))) {
throw new InvalidPluginDefinitionException(String.format(