Skip to content

Commit

Permalink
Add schema as input to the decoder. (apache#12981)
Browse files Browse the repository at this point in the history
  • Loading branch information
rseetham authored Apr 24, 2024
1 parent 2ca6666 commit 099a86c
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
Expand All @@ -91,7 +92,6 @@
import org.apache.pinot.spi.stream.StreamDataDecoder;
import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
import org.apache.pinot.spi.stream.StreamDataDecoderResult;
import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
Expand Down Expand Up @@ -1505,7 +1505,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
// Create message decoder
Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
try {
StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(_streamConfig, fieldsToRead);
StreamMessageDecoder streamMessageDecoder = createMessageDecoder(fieldsToRead);
_streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
} catch (Exception e) {
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
Expand Down Expand Up @@ -1780,6 +1780,26 @@ private void updateCurrentDocumentCountMetrics() {
}
}

/**
* Creates a {@link StreamMessageDecoder} using properties in {@link StreamConfig}.
*
* @param streamConfig The stream config from the table config
* @param fieldsToRead The fields to read from the source stream
* @return The initialized StreamMessageDecoder
*/
private StreamMessageDecoder createMessageDecoder(Set<String> fieldsToRead) {
String decoderClass = _streamConfig.getDecoderClass();
try {
Map<String, String> decoderProperties = _streamConfig.getDecoderProperties();
StreamMessageDecoder decoder = PluginManager.get().createInstance(decoderClass);
decoder.init(fieldsToRead, _streamConfig, _tableConfig, _schema);
return decoder;
} catch (Exception e) {
throw new RuntimeException(
"Caught exception while creating StreamMessageDecoder from stream config: " + _streamConfig, e);
}
}

@Override
public MutableSegment getSegment() {
return _realtimeSegment;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import javax.annotation.Nullable;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;


/**
* Interface for a decoder of messages fetched from the stream
* @param <T>
Expand All @@ -46,8 +47,22 @@ public interface StreamMessageDecoder<T> {
* @param topicName Topic name of the stream
* @throws Exception If an error occurs
*/
void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
throws Exception;
default void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
throws Exception {
throw new UnsupportedOperationException("init method not implemented");
}

/**
* Initializes the decoder.
* @param streamConfig Can be derived from tableConfig but is passed explicitly to avoid redundant computation
* @param tableConfig Table Config of the table
* @param schema Schema of the table
* @throws Exception
*/
default void init(Set<String> fieldsToRead, StreamConfig streamConfig, TableConfig tableConfig, Schema schema)
throws Exception {
init(streamConfig.getDecoderProperties(), fieldsToRead, streamConfig.getTopicName());
}

/**
* Decodes a row.
Expand Down

0 comments on commit 099a86c

Please sign in to comment.