diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/codec/OCSFCodec.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/codec/OCSFCodec.java index 3fb31b2226..0d963f0135 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/codec/OCSFCodec.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/codec/OCSFCodec.java @@ -1,6 +1,8 @@ package org.opensearch.dataprepper.plugins.source.s3.codec; +import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.InputCodec; @@ -8,15 +10,13 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.processor.model.datatypes.ocsf.OCSF; -import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.util.function.Consumer; @DataPrepperPlugin(name = "ocsf", pluginType = InputCodec.class, pluginConfigurationType = OCSFCodecConfig.class) public class OCSFCodec implements InputCodec { - private final ObjectMapper objectMapper = new ObjectMapper(); + private static final ObjectReader OBJECT_READER = new ObjectMapper().readerFor(OCSF.class); @DataPrepperPluginConstructor public OCSFCodec(final OCSFCodecConfig config) { @@ -25,16 +25,7 @@ public OCSFCodec(final OCSFCodecConfig config) { @Override public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { - try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { - parseBufferedReader(reader, eventConsumer); - } - } - - private void parseBufferedReader(final BufferedReader reader, final Consumer> eventConsumer) throws IOException { - String line; - while ((line = reader.readLine()) != null) { - final OCSF ocsf = objectMapper.readValue(line, OCSF.class); - eventConsumer.accept(new Record<>(ocsf)); - } + final MappingIterator mappingIterator = OBJECT_READER.readValues(inputStream); + mappingIterator.forEachRemaining(ocsf -> eventConsumer.accept(new Record<>(ocsf))); } }