Skip to content

Commit

Permalink
Potentially faster jackson
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Mar 19, 2024
1 parent 410d17c commit 9df58e2
Showing 1 changed file with 5 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package org.opensearch.dataprepper.plugins.source.s3.codec;

import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.model.datatypes.ocsf.OCSF;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.function.Consumer;

@DataPrepperPlugin(name = "ocsf", pluginType = InputCodec.class, pluginConfigurationType = OCSFCodecConfig.class)
public class OCSFCodec implements InputCodec {
private final ObjectMapper objectMapper = new ObjectMapper();
private static final ObjectReader OBJECT_READER = new ObjectMapper().readerFor(OCSF.class);

@DataPrepperPluginConstructor
public OCSFCodec(final OCSFCodecConfig config) {
Expand All @@ -25,16 +25,7 @@ public OCSFCodec(final OCSFCodecConfig config) {

@Override
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
parseBufferedReader(reader, eventConsumer);
}
}

private void parseBufferedReader(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
String line;
while ((line = reader.readLine()) != null) {
final OCSF ocsf = objectMapper.readValue(line, OCSF.class);
eventConsumer.accept(new Record<>(ocsf));
}
final MappingIterator<OCSF> mappingIterator = OBJECT_READER.readValues(inputStream);
mappingIterator.forEachRemaining(ocsf -> eventConsumer.accept(new Record<>(ocsf)));
}
}

0 comments on commit 9df58e2

Please sign in to comment.