Make RecordEnricher extend RecordTransformer (#14601)
Jackie-Jiang authored Dec 6, 2024
1 parent a506df3 commit 694546b
Showing 42 changed files with 302 additions and 394 deletions.
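As a rough sketch of what the refactor amounts to (illustrative only, with simplified types; the real interfaces live in org.apache.pinot.spi.recordtransformer and org.apache.pinot.spi.data.readers and carry more methods than shown here), record enrichment is folded into the existing record-transformer abstraction, so ingestion call sites run a single transform pipeline instead of a separate RecordEnricherPipeline followed by the transformers:

// Illustrative sketch only -- simplified stand-ins, not the actual Pinot source.
import java.util.Collections;
import java.util.List;

class GenericRow { /* simplified stand-in for org.apache.pinot.spi.data.readers.GenericRow */ }

interface RecordTransformer {
  // Columns this transformer reads as input (e.g. ComplexTypeTransformer returns its fields to unnest).
  default List<String> getInputColumns() {
    return Collections.emptyList();
  }

  // Transforms a row and returns the result.
  GenericRow transform(GenericRow record);
}

// After #14601 an enricher is just another transformer, so it can be registered in the
// same pipeline (e.g. CompositeTransformer / TransformPipeline) as every other transform step.
interface RecordEnricher extends RecordTransformer {
  // Enrichers typically add derived columns; nothing extra is needed for this sketch.
}

With that relationship in place, call sites such as FlinkSegmentWriter.collect(), SegmentMapper, and RealtimeSegmentDataManager no longer build and run a RecordEnricherPipeline before transforming; the diffs below delete the _recordEnricherPipeline fields and their run(row) calls and keep only the transform/TransformPipeline path.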
@@ -42,7 +42,6 @@
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -54,7 +53,7 @@
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -81,7 +80,6 @@ public class FlinkSegmentWriter implements SegmentWriter {
private String _outputDirURI;
private Schema _schema;
private Set<String> _fieldsToRead;
private RecordEnricherPipeline _recordEnricherPipeline;
private RecordTransformer _recordTransformer;

private File _stagingDir;
@@ -167,7 +165,6 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat

_schema = schema;
_fieldsToRead = _schema.getColumnNames();
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(_tableConfig);
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
_avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema);
_reusableRecord = new GenericData.Record(_avroSchema);
@@ -203,7 +200,7 @@ private void resetBuffer()
public void collect(GenericRow row)
throws IOException {
long startTime = System.currentTimeMillis();
_recordEnricherPipeline.run(row);
// TODO: Revisit whether we should transform the row
GenericRow transform = _recordTransformer.transform(row);
SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(transform, _reusableRecord, _fieldsToRead);
_rowCount++;
@@ -81,7 +81,6 @@
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
@@ -286,7 +285,6 @@ public void deleteSegmentFile() {
private final int _partitionGroupId;
private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus;
final String _clientId;
private final RecordEnricherPipeline _recordEnricherPipeline;
private final TransformPipeline _transformPipeline;
private PartitionGroupConsumer _partitionGroupConsumer = null;
private StreamMetadataProvider _partitionMetadataProvider = null;
@@ -623,7 +621,6 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee
_numBytesDropped += rowSizeInBytes;
} else {
try {
_recordEnricherPipeline.run(decodedRow.getResult());
_transformPipeline.processRow(decodedRow.getResult(), reusedResult);
} catch (Exception e) {
_numRowsErrored++;
@@ -1580,7 +1577,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
.setFieldConfigList(tableConfig.getFieldConfigList());

// Create message decoder
Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig, _schema);
RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f);
AtomicReference<StreamDataDecoder> localStreamDataDecoder = new AtomicReference<>();
try {
@@ -1595,20 +1592,20 @@
}
});
} catch (Exception e) {
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
"Failed to initialize the StreamMessageDecoder", e));
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
new SegmentErrorInfo(now(), "Failed to initialize the StreamMessageDecoder", e));
throw e;
}
_streamDataDecoder = localStreamDataDecoder.get();

try {
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
_transformPipeline = new TransformPipeline(tableConfig, schema);
} catch (Exception e) {
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e));
new SegmentErrorInfo(now(), "Failed to initialize the TransformPipeline", e));
throw e;
}
_transformPipeline = new TransformPipeline(tableConfig, schema);

// Acquire semaphore to create stream consumers
try {
_partitionGroupConsumerSemaphore.acquire();
@@ -33,7 +33,6 @@
import org.apache.pinot.core.segment.processing.mapper.SegmentMapper;
import org.apache.pinot.core.segment.processing.reducer.Reducer;
import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -43,7 +42,7 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -164,8 +163,8 @@ private List<File> doProcess()

while (nextRecordReaderIndexToBeProcessed < numRecordReaders) {
// Initialise the mapper. Eliminate the record readers that have been processed in the previous iterations.
SegmentMapper mapper = getSegmentMapper(
_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders));
SegmentMapper mapper =
getSegmentMapper(_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders));

// Log start of iteration details only if intermediate file size threshold is set.
if (isMapperOutputSizeThresholdEnabled) {
@@ -256,7 +255,7 @@ private void doReduce(Map<String, GenericRowFileManager> partitionToFileManagerM
throws Exception {
LOGGER.info("Beginning reduce phase on partitions: {}", partitionToFileManagerMap.keySet());
Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
int totalCount = partitionToFileManagerMap.keySet().size();
int totalCount = partitionToFileManagerMap.size();
int count = 1;
for (Map.Entry<String, GenericRowFileManager> entry : partitionToFileManagerMap.entrySet()) {
String partitionId = entry.getKey();
@@ -316,7 +315,6 @@ private List<File> generateSegment(Map<String, GenericRowFileManager> partitionT
GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
RecordEnricherPipeline.getPassThroughPipeline(),
TransformPipeline.getPassThroughPipeline());
driver.build();
outputSegmentDirs.add(driver.getOutputDirectory());
@@ -40,15 +40,14 @@
import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +68,6 @@ public class SegmentMapper {
private final List<FieldSpec> _fieldSpecs;
private final boolean _includeNullFields;
private final int _numSortFields;
private final RecordEnricherPipeline _recordEnricherPipeline;
private final TransformPipeline _transformPipeline;
private final TimeHandler _timeHandler;
private final Partitioner[] _partitioners;
@@ -103,7 +101,6 @@ public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs, Trans
_numSortFields = pair.getRight();
_includeNullFields =
schema.isEnableColumnBasedNullHandling() || tableConfig.getIndexingConfig().isNullHandlingEnabled();
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(processorConfig.getTableConfig());
_transformPipeline = transformPipeline;
_timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig);
List<PartitionerConfig> partitionerConfigs = processorConfig.getPartitionerConfigs();
@@ -185,8 +182,6 @@ protected boolean completeMapAndTransformRow(RecordReader recordReader, GenericR
while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
try {
reuse = recordReader.next(reuse);
_recordEnricherPipeline.run(reuse);

_transformPipeline.processRow(reuse, reusedResult);
for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
writeRecord(transformedRow);
@@ -21,10 +21,10 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -34,11 +34,11 @@
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -23,14 +23,14 @@
import java.util.Map;
import java.util.Set;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;

@@ -39,7 +39,6 @@
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -51,6 +50,7 @@
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -141,6 +141,7 @@ private void resetBuffer()
@Override
public void collect(GenericRow row)
throws IOException {
// TODO: Revisit whether we should transform the row
GenericRow transform = _recordTransformer.transform(row);
SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(transform, _reusableRecord, _fieldsToRead);
_recordWriter.append(_reusableRecord);
@@ -37,7 +37,6 @@
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;


public class RealtimeSegmentConverter {
@@ -108,8 +107,7 @@ public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverM
recordReader.init(_realtimeSegmentImpl, sortedDocIds);
RealtimeSegmentSegmentCreationDataSource dataSource =
new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader);
driver.init(genConfig, dataSource, RecordEnricherPipeline.getPassThroughPipeline(),
TransformPipeline.getPassThroughPipeline());
driver.init(genConfig, dataSource, TransformPipeline.getPassThroughPipeline());

if (!_enableColumnMajor) {
driver.build();
@@ -30,6 +30,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -97,8 +98,8 @@ public class ComplexTypeTransformer implements RecordTransformer {
private final boolean _continueOnError;

public ComplexTypeTransformer(TableConfig tableConfig) {
this(parseFieldsToUnnest(tableConfig), parseDelimiter(tableConfig),
parseCollectionNotUnnestedToJson(tableConfig), parsePrefixesToRename(tableConfig), tableConfig);
this(parseFieldsToUnnest(tableConfig), parseDelimiter(tableConfig), parseCollectionNotUnnestedToJson(tableConfig),
parsePrefixesToRename(tableConfig), tableConfig);
}

@VisibleForTesting
@@ -170,6 +171,11 @@ private static Map<String, String> parsePrefixesToRename(TableConfig tableConfig
}
}

@Override
public List<String> getInputColumns() {
return _fieldsToUnnest;
}

@Override
public GenericRow transform(GenericRow record) {
try {