From 694546b3b2970cdd6553bce2331e2ffd8e5ea0bf Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Fri, 6 Dec 2024 13:40:48 -0800 Subject: [PATCH] Make RecordEnricher extend RecordTransformer (#14601) --- .../flink/sink/FlinkSegmentWriter.java | 7 +- .../realtime/RealtimeSegmentDataManager.java | 15 +- .../framework/SegmentProcessorFramework.java | 10 +- .../processing/mapper/SegmentMapper.java | 7 +- .../DimensionValueTransformer.java | 2 +- .../mergerollup/MergeRollupTaskExecutor.java | 2 +- .../DimensionValueTransformerTest.java | 2 +- .../filebased/FileBasedSegmentWriter.java | 3 +- .../converter/RealtimeSegmentConverter.java | 4 +- .../ComplexTypeTransformer.java | 10 +- .../CompositeTransformer.java | 70 ++++++--- .../DataTypeTransformer.java | 7 + .../ExpressionTransformer.java | 16 +++ .../recordtransformer/FilterTransformer.java | 7 + .../NullValueTransformer.java | 1 + .../SanitizationTransformer.java | 1 + .../SchemaConformingTransformer.java | 1 + .../SchemaConformingTransformerV2.java | 1 + .../SpecialValueTransformer.java | 1 + .../TimeValidationTransformer.java | 1 + .../enricher/clp/CLPEncodingEnricher.java | 8 +- .../clp/CLPEncodingEnricherConfig.java} | 6 +- .../clp/CLPEncodingEnricherFactory.java | 12 +- .../function/CustomFunctionEnricher.java | 10 +- .../CustomFunctionEnricherConfig.java | 7 +- .../CustomFunctionEnricherFactory.java | 10 +- ...RecordReaderSegmentCreationDataSource.java | 9 -- .../segment/creator/TransformPipeline.java | 14 +- .../impl/SegmentIndexCreationDriverImpl.java | 27 +--- .../segment/local/utils/IngestionUtils.java | 133 +++++------------- .../segment/local/utils/TableConfigUtils.java | 4 +- .../RecordTransformerTest.java | 1 + .../SchemaConformingTransformerTest.java | 1 + .../local/utils/IngestionUtilsTest.java | 63 +++++---- .../annotations/RecordEnricherFactory.java | 30 ---- .../recordenricher/RecordEnricherConfig.java | 23 --- .../RecordEnricherPipeline.java | 75 ---------- .../recordtransformer/RecordTransformer.java | 12 +- .../enricher}/RecordEnricher.java | 21 +-- .../enricher}/RecordEnricherFactory.java | 20 ++- .../enricher}/RecordEnricherRegistry.java | 40 +++--- .../RecordEnricherValidationConfig.java | 2 +- 42 files changed, 302 insertions(+), 394 deletions(-) rename pinot-segment-local/src/main/java/org/apache/pinot/{plugin/record => segment/local/recordtransformer}/enricher/clp/CLPEncodingEnricher.java (92%) rename pinot-segment-local/src/main/java/org/apache/pinot/{plugin/record/enricher/clp/ClpEnricherConfig.java => segment/local/recordtransformer/enricher/clp/CLPEncodingEnricherConfig.java} (85%) rename pinot-segment-local/src/main/java/org/apache/pinot/{plugin/record => segment/local/recordtransformer}/enricher/clp/CLPEncodingEnricherFactory.java (80%) rename pinot-segment-local/src/main/java/org/apache/pinot/{plugin/record => segment/local/recordtransformer}/enricher/function/CustomFunctionEnricher.java (86%) rename pinot-segment-local/src/main/java/org/apache/pinot/{plugin/record => segment/local/recordtransformer}/enricher/function/CustomFunctionEnricherConfig.java (89%) rename pinot-segment-local/src/main/java/org/apache/pinot/{plugin/record => segment/local/recordtransformer}/enricher/function/CustomFunctionEnricherFactory.java (87%) delete mode 100644 pinot-spi/src/main/java/org/apache/pinot/spi/annotations/RecordEnricherFactory.java delete mode 100644 pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java delete mode 100644 pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java rename {pinot-segment-local/src/main/java/org/apache/pinot/segment/local => pinot-spi/src/main/java/org/apache/pinot/spi}/recordtransformer/RecordTransformer.java (82%) rename pinot-spi/src/main/java/org/apache/pinot/spi/{recordenricher => recordtransformer/enricher}/RecordEnricher.java (65%) rename pinot-spi/src/main/java/org/apache/pinot/spi/{recordenricher => recordtransformer/enricher}/RecordEnricherFactory.java (75%) rename pinot-spi/src/main/java/org/apache/pinot/spi/{recordenricher => recordtransformer/enricher}/RecordEnricherRegistry.java (69%) rename pinot-spi/src/main/java/org/apache/pinot/spi/{recordenricher => recordtransformer/enricher}/RecordEnricherValidationConfig.java (95%) diff --git a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java index f43bd2e67dc6..5468afad9452 100644 --- a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java +++ b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java @@ -42,7 +42,6 @@ import org.apache.pinot.common.utils.TarCompressionUtils; import org.apache.pinot.core.util.SegmentProcessorAvroUtils; import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.utils.IngestionUtils; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -54,7 +53,7 @@ import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.ingestion.batch.spec.Constants; import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +80,6 @@ public class FlinkSegmentWriter implements SegmentWriter { private String _outputDirURI; private Schema _schema; private Set _fieldsToRead; - private RecordEnricherPipeline _recordEnricherPipeline; private RecordTransformer _recordTransformer; private File _stagingDir; @@ -167,7 +165,6 @@ public void init(TableConfig tableConfig, Schema schema, Map bat _schema = schema; _fieldsToRead = _schema.getColumnNames(); - _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(_tableConfig); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); _avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema); _reusableRecord = new GenericData.Record(_avroSchema); @@ -203,7 +200,7 @@ private void resetBuffer() public void collect(GenericRow row) throws IOException { long startTime = System.currentTimeMillis(); - _recordEnricherPipeline.run(row); + // TODO: Revisit whether we should transform the row GenericRow transform = _recordTransformer.transform(row); SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(transform, _reusableRecord, _fieldsToRead); _rowCount++; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 06f46495337b..de0c87e7bb1f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -81,7 +81,6 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.metrics.PinotMeter; import org.apache.pinot.spi.plugin.PluginManager; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.stream.ConsumerPartitionState; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; @@ -286,7 +285,6 @@ public void deleteSegmentFile() { private final int _partitionGroupId; private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus; final String _clientId; - private final RecordEnricherPipeline _recordEnricherPipeline; private final TransformPipeline _transformPipeline; private PartitionGroupConsumer _partitionGroupConsumer = null; private StreamMetadataProvider _partitionMetadataProvider = null; @@ -623,7 +621,6 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee _numBytesDropped += rowSizeInBytes; } else { try { - _recordEnricherPipeline.run(decodedRow.getResult()); _transformPipeline.processRow(decodedRow.getResult(), reusedResult); } catch (Exception e) { _numRowsErrored++; @@ -1580,7 +1577,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf .setFieldConfigList(tableConfig.getFieldConfigList()); // Create message decoder - Set fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema); + Set fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig, _schema); RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f); AtomicReference localStreamDataDecoder = new AtomicReference<>(); try { @@ -1595,20 +1592,20 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf } }); } catch (Exception e) { - _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), - "Failed to initialize the StreamMessageDecoder", e)); + _realtimeTableDataManager.addSegmentError(_segmentNameStr, + new SegmentErrorInfo(now(), "Failed to initialize the StreamMessageDecoder", e)); throw e; } _streamDataDecoder = localStreamDataDecoder.get(); try { - _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig); + _transformPipeline = new TransformPipeline(tableConfig, schema); } catch (Exception e) { _realtimeTableDataManager.addSegmentError(_segmentNameStr, - new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e)); + new SegmentErrorInfo(now(), "Failed to initialize the TransformPipeline", e)); throw e; } - _transformPipeline = new TransformPipeline(tableConfig, schema); + // Acquire semaphore to create stream consumers try { _partitionGroupConsumerSemaphore.acquire(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java index 4923d5be94b3..378d79fbabfa 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java @@ -33,7 +33,6 @@ import org.apache.pinot.core.segment.processing.mapper.SegmentMapper; import org.apache.pinot.core.segment.processing.reducer.Reducer; import org.apache.pinot.core.segment.processing.reducer.ReducerFactory; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; @@ -43,7 +42,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -164,8 +163,8 @@ private List doProcess() while (nextRecordReaderIndexToBeProcessed < numRecordReaders) { // Initialise the mapper. Eliminate the record readers that have been processed in the previous iterations. - SegmentMapper mapper = getSegmentMapper( - _recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders)); + SegmentMapper mapper = + getSegmentMapper(_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders)); // Log start of iteration details only if intermediate file size threshold is set. if (isMapperOutputSizeThresholdEnabled) { @@ -256,7 +255,7 @@ private void doReduce(Map partitionToFileManagerM throws Exception { LOGGER.info("Beginning reduce phase on partitions: {}", partitionToFileManagerMap.keySet()); Consumer observer = _segmentProcessorConfig.getProgressObserver(); - int totalCount = partitionToFileManagerMap.keySet().size(); + int totalCount = partitionToFileManagerMap.size(); int count = 1; for (Map.Entry entry : partitionToFileManagerMap.entrySet()) { String partitionId = entry.getKey(); @@ -316,7 +315,6 @@ private List generateSegment(Map partitionT GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId); SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange), - RecordEnricherPipeline.getPassThroughPipeline(), TransformPipeline.getPassThroughPipeline()); driver.build(); outputSegmentDirs.add(driver.getOutputDirectory()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java index 49dfc42431f5..3ec0a539c71c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java @@ -40,7 +40,6 @@ import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils; import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.FieldSpec; @@ -48,7 +47,7 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.apache.pinot.spi.utils.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +68,6 @@ public class SegmentMapper { private final List _fieldSpecs; private final boolean _includeNullFields; private final int _numSortFields; - private final RecordEnricherPipeline _recordEnricherPipeline; private final TransformPipeline _transformPipeline; private final TimeHandler _timeHandler; private final Partitioner[] _partitioners; @@ -103,7 +101,6 @@ public SegmentMapper(List recordReaderFileConfigs, Trans _numSortFields = pair.getRight(); _includeNullFields = schema.isEnableColumnBasedNullHandling() || tableConfig.getIndexingConfig().isNullHandlingEnabled(); - _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(processorConfig.getTableConfig()); _transformPipeline = transformPipeline; _timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig); List partitionerConfigs = processorConfig.getPartitionerConfigs(); @@ -185,8 +182,6 @@ protected boolean completeMapAndTransformRow(RecordReader recordReader, GenericR while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) { try { reuse = recordReader.next(reuse); - _recordEnricherPipeline.run(reuse); - _transformPipeline.processRow(reuse, reusedResult); for (GenericRow transformedRow : reusedResult.getTransformedRows()) { writeRecord(transformedRow); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformer.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformer.java index 4ec70ff43d47..e631bcf1730f 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformer.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformer.java @@ -21,10 +21,10 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java index ce56192f1f31..bebedb3ff2a9 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java @@ -34,11 +34,11 @@ import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor; import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils; import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformerTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformerTest.java index 43b72c271f16..83979431a09e 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformerTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformerTest.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.Set; import org.apache.pinot.core.common.MinionConstants; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.DimensionFieldSpec; @@ -31,6 +30,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.Test; diff --git a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java index ee0550cac4fd..3601e7610704 100644 --- a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java +++ b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java @@ -39,7 +39,6 @@ import org.apache.pinot.common.utils.TarCompressionUtils; import org.apache.pinot.core.util.SegmentProcessorAvroUtils; import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.utils.IngestionUtils; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -51,6 +50,7 @@ import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.ingestion.batch.spec.Constants; import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -141,6 +141,7 @@ private void resetBuffer() @Override public void collect(GenericRow row) throws IOException { + // TODO: Revisit whether we should transform the row GenericRow transform = _recordTransformer.transform(row); SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(transform, _reusableRecord, _fieldsToRead); _recordWriter.append(_reusableRecord); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java index c7e4be93151d..026276515031 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java @@ -37,7 +37,6 @@ import org.apache.pinot.spi.config.table.SegmentZKPropsConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; public class RealtimeSegmentConverter { @@ -108,8 +107,7 @@ public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverM recordReader.init(_realtimeSegmentImpl, sortedDocIds); RealtimeSegmentSegmentCreationDataSource dataSource = new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader); - driver.init(genConfig, dataSource, RecordEnricherPipeline.getPassThroughPipeline(), - TransformPipeline.getPassThroughPipeline()); + driver.init(genConfig, dataSource, TransformPipeline.getPassThroughPipeline()); if (!_enableColumnMajor) { driver.build(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java index 2bec0ced4272..d462c7a77fd7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java @@ -30,6 +30,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,8 +98,8 @@ public class ComplexTypeTransformer implements RecordTransformer { private final boolean _continueOnError; public ComplexTypeTransformer(TableConfig tableConfig) { - this(parseFieldsToUnnest(tableConfig), parseDelimiter(tableConfig), - parseCollectionNotUnnestedToJson(tableConfig), parsePrefixesToRename(tableConfig), tableConfig); + this(parseFieldsToUnnest(tableConfig), parseDelimiter(tableConfig), parseCollectionNotUnnestedToJson(tableConfig), + parsePrefixesToRename(tableConfig), tableConfig); } @VisibleForTesting @@ -170,6 +171,11 @@ private static Map parsePrefixesToRename(TableConfig tableConfig } } + @Override + public List getInputColumns() { + return _fieldsToUnnest; + } + @Override public GenericRow transform(GenericRow record) { try { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java index a1bfcba52a20..abadfd98fd53 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java @@ -18,16 +18,21 @@ */ package org.apache.pinot.segment.local.recordtransformer; +import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashSet; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.Set; import javax.annotation.Nullable; import org.apache.pinot.segment.local.utils.IngestionUtils; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; +import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricher; +import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricherRegistry; /** @@ -42,8 +47,11 @@ public class CompositeTransformer implements RecordTransformer { *

NOTE: DO NOT CHANGE THE ORDER OF THE RECORD TRANSFORMERS *

    *
  • - * Optional {@link ExpressionTransformer} before everyone else, so that we get the real columns for other - * transformers to work on + * Optional list of {@link RecordEnricher}s to enrich the record before any other transformation. + *
  • + *
  • + * Optional {@link ExpressionTransformer} after enrichers, so that we get the real columns for other transformers + * to work on *
  • *
  • * Optional {@link FilterTransformer} after {@link ExpressionTransformer}, so that we have source as well as @@ -82,12 +90,37 @@ public class CompositeTransformer implements RecordTransformer { *
*/ public static List getDefaultTransformers(TableConfig tableConfig, Schema schema) { - return Stream.of(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig), - new SchemaConformingTransformer(tableConfig, schema), - new SchemaConformingTransformerV2(tableConfig, schema), new DataTypeTransformer(tableConfig, schema), - new TimeValidationTransformer(tableConfig, schema), new SpecialValueTransformer(schema), - new NullValueTransformer(tableConfig, schema), new SanitizationTransformer(schema)).filter(t -> !t.isNoOp()) - .collect(Collectors.toList()); + List transformers = new ArrayList<>(); + IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); + if (ingestionConfig != null) { + List enrichmentConfigs = ingestionConfig.getEnrichmentConfigs(); + if (enrichmentConfigs != null) { + for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) { + try { + addIfNotNoOp(transformers, RecordEnricherRegistry.createRecordEnricher(enrichmentConfig)); + } catch (IOException e) { + throw new RuntimeException("Failed to instantiate record enricher " + enrichmentConfig.getEnricherType(), + e); + } + } + } + } + addIfNotNoOp(transformers, new ExpressionTransformer(tableConfig, schema)); + addIfNotNoOp(transformers, new FilterTransformer(tableConfig)); + addIfNotNoOp(transformers, new SchemaConformingTransformer(tableConfig, schema)); + addIfNotNoOp(transformers, new SchemaConformingTransformerV2(tableConfig, schema)); + addIfNotNoOp(transformers, new DataTypeTransformer(tableConfig, schema)); + addIfNotNoOp(transformers, new TimeValidationTransformer(tableConfig, schema)); + addIfNotNoOp(transformers, new SpecialValueTransformer(schema)); + addIfNotNoOp(transformers, new NullValueTransformer(tableConfig, schema)); + addIfNotNoOp(transformers, new SanitizationTransformer(schema)); + return transformers; + } + + private static void addIfNotNoOp(List transformers, RecordTransformer transformer) { + if (!transformer.isNoOp()) { + transformers.add(transformer); + } } public static CompositeTransformer getDefaultTransformer(TableConfig tableConfig, Schema schema) { @@ -96,10 +129,6 @@ public static CompositeTransformer getDefaultTransformer(TableConfig tableConfig /** * Includes custom and default transformers. - * @param customTransformers - * @param tableConfig - * @param schema - * @return */ public static CompositeTransformer composeAllTransformers(List customTransformers, TableConfig tableConfig, Schema schema) { @@ -112,13 +141,22 @@ public static CompositeTransformer composeAllTransformers(List transformers) { _transformers = transformers; } + @Override + public Set getInputColumns() { + Set inputColumns = new HashSet<>(); + for (RecordTransformer transformer : _transformers) { + inputColumns.addAll(transformer.getInputColumns()); + } + return inputColumns; + } + @Nullable @Override public GenericRow transform(GenericRow record) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java index a91cd9f85c0f..65019549ece2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java @@ -26,12 +26,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; import org.apache.pinot.common.utils.PinotDataType; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +63,11 @@ public DataTypeTransformer(TableConfig tableConfig, Schema schema) { } } + @Override + public Set getInputColumns() { + return _dataTypes.keySet(); + } + @Override public GenericRow transform(GenericRow record) { for (Map.Entry entry : _dataTypes.entrySet()) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java index 0f9783851776..c8bbbc71d90a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java @@ -34,6 +34,7 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,7 @@ * The {@code ExpressionTransformer} class will evaluate the function expressions. *

NOTE: should put this before the {@link DataTypeTransformer}. After this, transformed column can be treated as * regular column for other record transformers. + * TODO: Merge this and CustomFunctionEnricher */ public class ExpressionTransformer implements RecordTransformer { private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionTransformer.class); @@ -112,6 +114,20 @@ public boolean isNoOp() { return _expressionEvaluators.isEmpty(); } + @Override + public Set getInputColumns() { + if (_expressionEvaluators.isEmpty()) { + return Set.of(); + } + Set inputColumns = new HashSet<>(); + for (Map.Entry entry : _expressionEvaluators.entrySet()) { + inputColumns.addAll(entry.getValue().getArguments()); + // NOTE: Add the column itself too, so that if it is already transformed, we won't transform again + inputColumns.add(entry.getKey()); + } + return inputColumns; + } + @Override public GenericRow transform(GenericRow record) { for (Map.Entry entry : _expressionEvaluators.entrySet()) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java index 28bea304d5cd..3835d412e211 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java @@ -18,10 +18,12 @@ */ package org.apache.pinot.segment.local.recordtransformer; +import java.util.List; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +54,11 @@ public boolean isNoOp() { return _evaluator == null; } + @Override + public List getInputColumns() { + return _evaluator != null ? _evaluator.getArguments() : List.of(); + } + @Override public GenericRow transform(GenericRow record) { if (_evaluator != null) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/NullValueTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/NullValueTransformer.java index 823f449a00f4..05cf5d60fcd0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/NullValueTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/NullValueTransformer.java @@ -28,6 +28,7 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.apache.pinot.spi.utils.TimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java index eb407fbfc231..c1ab3b24b542 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java @@ -25,6 +25,7 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.apache.pinot.spi.utils.StringUtil; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java index b9cfdce5a8aa..6a16bdc1cf75 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java @@ -34,6 +34,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.apache.pinot.spi.stream.StreamDataDecoderImpl; import org.apache.pinot.spi.utils.JsonUtils; import org.slf4j.Logger; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java index 78962fd5eea3..8ad1fe980a4c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java @@ -45,6 +45,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.metrics.PinotMeter; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.apache.pinot.spi.stream.StreamDataDecoderImpl; import org.apache.pinot.spi.utils.JsonUtils; import org.slf4j.Logger; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java index 1075ff349722..be3b3a3abefe 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java @@ -26,6 +26,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java index 9a8cd26c2dc0..4a5acc8c931c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java @@ -26,6 +26,7 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.apache.pinot.spi.utils.TimeUtils; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/clp/CLPEncodingEnricher.java similarity index 92% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/clp/CLPEncodingEnricher.java index 790015d8af31..c1c735c6cc3f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/clp/CLPEncodingEnricher.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.record.enricher.clp; +package org.apache.pinot.segment.local.recordtransformer.enricher.clp; import com.fasterxml.jackson.databind.JsonNode; import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions; @@ -25,7 +25,7 @@ import java.io.IOException; import java.util.List; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.recordenricher.RecordEnricher; +import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricher; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.sql.parsers.rewriter.ClpRewriter; import org.slf4j.Logger; @@ -41,12 +41,12 @@ */ public class CLPEncodingEnricher implements RecordEnricher { private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class); - private final ClpEnricherConfig _config; + private final CLPEncodingEnricherConfig _config; private final EncodedMessage _clpEncodedMessage; private final MessageEncoder _clpMessageEncoder; public CLPEncodingEnricher(JsonNode enricherProperties) throws IOException { - _config = JsonUtils.jsonNodeToObject(enricherProperties, ClpEnricherConfig.class); + _config = JsonUtils.jsonNodeToObject(enricherProperties, CLPEncodingEnricherConfig.class); _clpEncodedMessage = new EncodedMessage(); _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2, BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/ClpEnricherConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/clp/CLPEncodingEnricherConfig.java similarity index 85% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/ClpEnricherConfig.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/clp/CLPEncodingEnricherConfig.java index 93439602ecb2..40ed7300ccbe 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/ClpEnricherConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/clp/CLPEncodingEnricherConfig.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.record.enricher.clp; +package org.apache.pinot.segment.local.recordtransformer.enricher.clp; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -26,11 +26,11 @@ /** * Configuration for the CLP enricher. */ -public class ClpEnricherConfig { +public class CLPEncodingEnricherConfig { private final List _fields; @JsonCreator - public ClpEnricherConfig(@JsonProperty("fields") List fields) { + public CLPEncodingEnricherConfig(@JsonProperty("fields") List fields) { _fields = fields; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/clp/CLPEncodingEnricherFactory.java similarity index 80% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/clp/CLPEncodingEnricherFactory.java index cacc4fdbc9b4..b50f774c9516 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/clp/CLPEncodingEnricherFactory.java @@ -16,19 +16,21 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.record.enricher.clp; +package org.apache.pinot.segment.local.recordtransformer.enricher.clp; import com.fasterxml.jackson.databind.JsonNode; import com.google.auto.service.AutoService; import java.io.IOException; -import org.apache.pinot.spi.recordenricher.RecordEnricher; -import org.apache.pinot.spi.recordenricher.RecordEnricherFactory; -import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig; +import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricher; +import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricherFactory; +import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricherValidationConfig; import org.apache.pinot.spi.utils.JsonUtils; + @AutoService(RecordEnricherFactory.class) public class CLPEncodingEnricherFactory implements RecordEnricherFactory { private static final String ENRICHER_TYPE = "clpEnricher"; + @Override public String getEnricherType() { return ENRICHER_TYPE; @@ -43,7 +45,7 @@ public RecordEnricher createEnricher(JsonNode enricherProps) @Override public void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig) { try { - ClpEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, ClpEnricherConfig.class); + JsonUtils.jsonNodeToObject(enricherProps, CLPEncodingEnricherConfig.class); } catch (IOException e) { throw new IllegalArgumentException("Failed to parse clp enricher config", e); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricher.java similarity index 86% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricher.java index 92cf565220d9..2e769e858247 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricher.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pinot.plugin.record.enricher.function; +package org.apache.pinot.segment.local.recordtransformer.enricher.function; import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; @@ -28,18 +28,20 @@ import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.recordenricher.RecordEnricher; +import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricher; import org.apache.pinot.spi.utils.JsonUtils; /** * Enriches the record with custom functions. + * TODO: Merge this and ExpressionTransformer */ public class CustomFunctionEnricher implements RecordEnricher { - private final Map _fieldToFunctionEvaluator; + private final LinkedHashMap _fieldToFunctionEvaluator; private final List _fieldsToExtract; - public CustomFunctionEnricher(JsonNode enricherProps) throws IOException { + public CustomFunctionEnricher(JsonNode enricherProps) + throws IOException { CustomFunctionEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class); _fieldToFunctionEvaluator = new LinkedHashMap<>(); _fieldsToExtract = new ArrayList<>(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricherConfig.java similarity index 89% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherConfig.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricherConfig.java index cc4270f60185..e1b3bd338e1b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricherConfig.java @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.record.enricher.function; +package org.apache.pinot.segment.local.recordtransformer.enricher.function; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.LinkedHashMap; + /** * Configuration for the custom function enricher. */ @@ -30,8 +31,8 @@ public class CustomFunctionEnricherConfig { @JsonCreator public CustomFunctionEnricherConfig( - @JsonProperty("fieldToFunctionMap") LinkedHashMap columnTofunctionMap) { - _fieldToFunctionMap = columnTofunctionMap; + @JsonProperty("fieldToFunctionMap") LinkedHashMap fieldToFunctionMap) { + _fieldToFunctionMap = fieldToFunctionMap; } public LinkedHashMap getFieldToFunctionMap() { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricherFactory.java similarity index 87% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricherFactory.java index f77308190359..54659a702fe1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricherFactory.java @@ -16,20 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.record.enricher.function; +package org.apache.pinot.segment.local.recordtransformer.enricher.function; import com.fasterxml.jackson.databind.JsonNode; import com.google.auto.service.AutoService; import java.io.IOException; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; -import org.apache.pinot.spi.recordenricher.RecordEnricher; -import org.apache.pinot.spi.recordenricher.RecordEnricherFactory; -import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig; +import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricher; +import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricherFactory; +import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricherValidationConfig; import org.apache.pinot.spi.utils.JsonUtils; + @AutoService(RecordEnricherFactory.class) public class CustomFunctionEnricherFactory implements RecordEnricherFactory { private static final String TYPE = "generateColumn"; + @Override public String getEnricherType() { return TYPE; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java index 71915122e0f9..3412a01ad237 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java @@ -25,7 +25,6 @@ import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,17 +38,12 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat private static final Logger LOGGER = LoggerFactory.getLogger(RecordReaderSegmentCreationDataSource.class); private final RecordReader _recordReader; - private RecordEnricherPipeline _recordEnricherPipeline; private TransformPipeline _transformPipeline; public RecordReaderSegmentCreationDataSource(RecordReader recordReader) { _recordReader = recordReader; } - public void setRecordEnricherPipeline(RecordEnricherPipeline recordEnricherPipeline) { - _recordEnricherPipeline = recordEnricherPipeline; - } - public void setTransformPipeline(TransformPipeline transformPipeline) { _transformPipeline = transformPipeline; } @@ -57,8 +51,6 @@ public void setTransformPipeline(TransformPipeline transformPipeline) { @Override public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsCollectorConfig) { try { - RecordEnricherPipeline recordEnricherPipeline = _recordEnricherPipeline != null ? _recordEnricherPipeline - : RecordEnricherPipeline.fromTableConfig(statsCollectorConfig.getTableConfig()); TransformPipeline transformPipeline = _transformPipeline != null ? _transformPipeline : new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema()); @@ -75,7 +67,6 @@ public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsColle reuse.clear(); try { reuse = _recordReader.next(reuse); - recordEnricherPipeline.run(reuse); transformPipeline.processRow(reuse, reusedResult); for (GenericRow row : reusedResult.getTransformedRows()) { collector.collectRow(row); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java index b72f22226726..b566d4534033 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java @@ -20,16 +20,18 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.utils.IngestionUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; /** @@ -71,6 +73,16 @@ public static TransformPipeline getPassThroughPipeline() { return new TransformPipeline(CompositeTransformer.getPassThroughTransformer(), null); } + public Collection getInputColumns() { + if (_complexTypeTransformer == null) { + return _recordTransformer.getInputColumns(); + } else { + Set inputColumns = new HashSet<>(_recordTransformer.getInputColumns()); + inputColumns.addAll(_complexTypeTransformer.getInputColumns()); + return inputColumns; + } + } + /** * Process and validate the decoded row against schema. * @param decodedRow the row data to pass in diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java index ae7c0e71a263..612b696e27bb 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -36,8 +36,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource; -import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.index.converter.SegmentFormatConverterFactory; @@ -82,7 +80,6 @@ import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFactory; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.ByteArray; import org.apache.pinot.spi.utils.ReadMode; import org.slf4j.Logger; @@ -105,7 +102,6 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive private SegmentIndexCreationInfo _segmentIndexCreationInfo; private SegmentCreationDataSource _dataSource; private Schema _dataSchema; - private RecordEnricherPipeline _recordEnricherPipeline; private TransformPipeline _transformPipeline; private IngestionSchemaValidator _ingestionSchemaValidator; private int _totalDocs = 0; @@ -131,8 +127,8 @@ private RecordReader getRecordReader(SegmentGeneratorConfig segmentGeneratorConf TableConfig tableConfig = segmentGeneratorConfig.getTableConfig(); FileFormat fileFormat = segmentGeneratorConfig.getFormat(); String recordReaderClassName = segmentGeneratorConfig.getRecordReaderPath(); - Set sourceFields = IngestionUtils.getFieldsForRecordExtractor(tableConfig.getIngestionConfig(), - segmentGeneratorConfig.getSchema()); + Set sourceFields = + IngestionUtils.getFieldsForRecordExtractor(tableConfig, segmentGeneratorConfig.getSchema()); // Allow for instantiation general record readers from a record reader path passed into segment generator config // If this is set, this will override the file format @@ -162,21 +158,11 @@ public RecordReader getRecordReader() { public void init(SegmentGeneratorConfig config, RecordReader recordReader) throws Exception { - SegmentCreationDataSource dataSource = new RecordReaderSegmentCreationDataSource(recordReader); - init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()), + init(config, new RecordReaderSegmentCreationDataSource(recordReader), new TransformPipeline(config.getTableConfig(), config.getSchema())); } - @Deprecated public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource, - RecordTransformer recordTransformer, @Nullable ComplexTypeTransformer complexTypeTransformer) - throws Exception { - init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()), - new TransformPipeline(recordTransformer, complexTypeTransformer)); - } - - public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource, - RecordEnricherPipeline enricherPipeline, TransformPipeline transformPipeline) throws Exception { _config = config; @@ -187,11 +173,9 @@ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSo if (config.isFailOnEmptySegment()) { Preconditions.checkState(_recordReader.hasNext(), "No record in data source"); } - _recordEnricherPipeline = enricherPipeline; _transformPipeline = transformPipeline; // Use the same transform pipeline if the data source is backed by a record reader if (dataSource instanceof RecordReaderSegmentCreationDataSource) { - ((RecordReaderSegmentCreationDataSource) dataSource).setRecordEnricherPipeline(enricherPipeline); ((RecordReaderSegmentCreationDataSource) dataSource).setTransformPipeline(transformPipeline); } @@ -280,12 +264,7 @@ public void build() try { GenericRow decodedRow = _recordReader.next(reuse); long recordReadStartTimeNs = System.nanoTime(); - - // Should not be needed anymore. - // Add row to indexes - _recordEnricherPipeline.run(decodedRow); _transformPipeline.processRow(decodedRow, reusedResult); - recordReadStopTimeNs = System.nanoTime(); _totalRecordReadTimeNs += (recordReadStopTimeNs - recordReadStartTimeNs); } catch (Exception e) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java index d34fd20cb572..3736231324f4 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java @@ -32,9 +32,8 @@ import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.RequestContextUtils; -import org.apache.pinot.segment.local.function.FunctionEvaluator; -import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; +import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator; @@ -47,14 +46,12 @@ import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; -import org.apache.pinot.spi.config.table.ingestion.FilterConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; -import org.apache.pinot.spi.config.table.ingestion.TransformConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; -import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordExtractor; import org.apache.pinot.spi.data.readers.RecordReaderFactory; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.LocalPinotFS; @@ -66,7 +63,6 @@ import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.TableSpec; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; @@ -110,10 +106,9 @@ public static SegmentGeneratorConfig generateSegmentGeneratorConfig(TableConfig BatchIngestionConfig batchIngestionConfig) throws ClassNotFoundException, IOException { Preconditions.checkState(batchIngestionConfig != null && batchIngestionConfig.getBatchConfigMaps() != null - && batchIngestionConfig.getBatchConfigMaps().size() == 1, + && batchIngestionConfig.getBatchConfigMaps().size() == 1, "Must provide batchIngestionConfig and contains exactly 1 batchConfigMap for table: %s, " - + "for generating SegmentGeneratorConfig", - tableConfig.getTableName()); + + "for generating SegmentGeneratorConfig", tableConfig.getTableName()); // apply config override provided by user. BatchConfig batchConfig = @@ -126,8 +121,8 @@ public static SegmentGeneratorConfig generateSegmentGeneratorConfig(TableConfig segmentGeneratorConfig.setOutDir(batchConfig.getOutputDirURI()); // Reader configs - segmentGeneratorConfig - .setRecordReaderPath(RecordReaderFactory.getRecordReaderClassName(batchConfig.getInputFormat().toString())); + segmentGeneratorConfig.setRecordReaderPath( + RecordReaderFactory.getRecordReaderClassName(batchConfig.getInputFormat().toString())); Map recordReaderProps = batchConfig.getRecordReaderProps(); segmentGeneratorConfig.setReaderConfig(RecordReaderFactory.getRecordReaderConfig(batchConfig.getInputFormat(), IngestionConfigUtils.getRecordReaderProps(recordReaderProps))); @@ -181,8 +176,8 @@ private static SegmentNameGenerator getSegmentNameGenerator(BatchConfig batchCon return new UploadedRealtimeSegmentNameGenerator(rawTableName, uploadedRealtimePartitionId, batchConfig.getSegmentUploadTimeMs(), batchConfig.getSegmentNamePrefix(), batchConfig.getSequenceId()); default: - throw new IllegalStateException(String - .format("Unsupported segmentNameGeneratorType: %s for table: %s", segmentNameGeneratorType, + throw new IllegalStateException( + String.format("Unsupported segmentNameGeneratorType: %s for table: %s", segmentNameGeneratorType, tableConfig.getTableName())); } } @@ -209,9 +204,8 @@ public static String buildSegment(SegmentGeneratorConfig segmentGeneratorConfig) public static void uploadSegment(String tableNameWithType, BatchConfig batchConfig, List segmentTarURIs, @Nullable AuthProvider authProvider) throws Exception { - - SegmentGenerationJobSpec segmentUploadSpec = generateSegmentUploadSpec(tableNameWithType, batchConfig, - authProvider); + SegmentGenerationJobSpec segmentUploadSpec = + generateSegmentUploadSpec(tableNameWithType, batchConfig, authProvider); List segmentTarURIStrs = segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList()); String pushMode = batchConfig.getPushMode(); @@ -220,8 +214,8 @@ public static void uploadSegment(String tableNameWithType, BatchConfig batchConf try { SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS, segmentTarURIStrs); } catch (RetriableOperationException | AttemptsExceededException e) { - throw new RuntimeException(String - .format("Caught exception while uploading segments. Push mode: TAR, segment tars: [%s]", + throw new RuntimeException( + String.format("Caught exception while uploading segments. Push mode: TAR, segment tars: [%s]", segmentTarURIStrs), e); } break; @@ -240,8 +234,9 @@ public static void uploadSegment(String tableNameWithType, BatchConfig batchConf } SegmentPushUtils.sendSegmentUris(segmentUploadSpec, segmentUris); } catch (RetriableOperationException | AttemptsExceededException e) { - throw new RuntimeException(String - .format("Caught exception while uploading segments. Push mode: URI, segment URIs: [%s]", segmentUris), e); + throw new RuntimeException( + String.format("Caught exception while uploading segments. Push mode: URI, segment URIs: [%s]", + segmentUris), e); } break; case METADATA: @@ -251,12 +246,13 @@ public static void uploadSegment(String tableNameWithType, BatchConfig batchConf outputSegmentDirURI = URI.create(batchConfig.getOutputSegmentDirURI()); } PinotFS outputFileFS = getOutputPinotFS(batchConfig, outputSegmentDirURI); - Map segmentUriToTarPathMap = SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, - segmentUploadSpec.getPushJobSpec(), segmentTarURIStrs.toArray(new String[0])); + Map segmentUriToTarPathMap = + SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, segmentUploadSpec.getPushJobSpec(), + segmentTarURIStrs.toArray(new String[0])); SegmentPushUtils.sendSegmentUriAndMetadata(segmentUploadSpec, outputFileFS, segmentUriToTarPathMap); } catch (RetriableOperationException | AttemptsExceededException e) { - throw new RuntimeException(String - .format("Caught exception while uploading segments. Push mode: METADATA, segment URIs: [%s]", + throw new RuntimeException( + String.format("Caught exception while uploading segments. Push mode: METADATA, segment URIs: [%s]", segmentTarURIStrs), e); } break; @@ -311,27 +307,34 @@ private static void registerPinotFS(String fileURIScheme, String fsClass, PinotC } /** - * Extracts all fields required by the {@link org.apache.pinot.spi.data.readers.RecordExtractor} from the given - * TableConfig and Schema + * Extracts all fields required by the {@link RecordExtractor} from the given TableConfig and Schema. * Fields for ingestion come from 2 places: * 1. The schema * 2. The ingestion config in the table config. The ingestion config (e.g. filter, complexType) can have fields which * are not in the schema. */ - public static Set getFieldsForRecordExtractor(@Nullable IngestionConfig ingestionConfig, Schema schema) { - Set fieldsForRecordExtractor = new HashSet<>(); - - if (null != ingestionConfig && (null != ingestionConfig.getSchemaConformingTransformerConfig() - || null != ingestionConfig.getSchemaConformingTransformerV2Config())) { + public static Set getFieldsForRecordExtractor(TableConfig tableConfig, Schema schema) { + IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); + if (ingestionConfig != null && (ingestionConfig.getSchemaConformingTransformerConfig() != null + || ingestionConfig.getSchemaConformingTransformerV2Config() != null)) { // The SchemaConformingTransformer requires that all fields are extracted, indicated by returning an empty set // here. Compared to extracting the fields specified below, extracting all fields should be a superset. - return fieldsForRecordExtractor; + return Set.of(); } - extractFieldsFromIngestionConfig(ingestionConfig, fieldsForRecordExtractor); - extractFieldsFromSchema(schema, fieldsForRecordExtractor); - fieldsForRecordExtractor = getFieldsToReadWithComplexType(fieldsForRecordExtractor, ingestionConfig); - return fieldsForRecordExtractor; + Set fields = new HashSet<>(); + if (ingestionConfig != null) { + List aggregationConfigs = ingestionConfig.getAggregationConfigs(); + if (aggregationConfigs != null) { + for (AggregationConfig aggregationConfig : aggregationConfigs) { + ExpressionContext expressionContext = + RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction()); + expressionContext.getColumns(fields); + } + } + } + fields.addAll(new TransformPipeline(tableConfig, schema).getInputColumns()); + return getFieldsToReadWithComplexType(fields, ingestionConfig); } /** @@ -353,64 +356,6 @@ private static Set getFieldsToReadWithComplexType(Set fieldsToRe return result; } - /** - * Extracts all the fields needed by the {@link org.apache.pinot.spi.data.readers.RecordExtractor} from the given - * Schema - * TODO: for now, we assume that arguments to transform function are in the source i.e. no columns are derived from - * transformed columns - */ - private static void extractFieldsFromSchema(Schema schema, Set fields) { - for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { - if (!fieldSpec.isVirtualColumn()) { - FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(fieldSpec); - if (functionEvaluator != null) { - fields.addAll(functionEvaluator.getArguments()); - } - fields.add(fieldSpec.getName()); - } - } - } - - /** - * Extracts the fields needed by a RecordExtractor from given {@link IngestionConfig} - */ - private static void extractFieldsFromIngestionConfig(@Nullable IngestionConfig ingestionConfig, Set fields) { - if (ingestionConfig != null) { - FilterConfig filterConfig = ingestionConfig.getFilterConfig(); - if (filterConfig != null) { - String filterFunction = filterConfig.getFilterFunction(); - if (filterFunction != null) { - FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction); - fields.addAll(functionEvaluator.getArguments()); - } - } - List aggregationConfigs = ingestionConfig.getAggregationConfigs(); - if (aggregationConfigs != null) { - for (AggregationConfig aggregationConfig : aggregationConfigs) { - ExpressionContext expressionContext = - RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction()); - expressionContext.getColumns(fields); - } - } - - fields.addAll(RecordEnricherPipeline.fromIngestionConfig(ingestionConfig).getColumnsToExtract()); - List transformConfigs = ingestionConfig.getTransformConfigs(); - if (transformConfigs != null) { - for (TransformConfig transformConfig : transformConfigs) { - FunctionEvaluator expressionEvaluator = - FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction()); - fields.addAll(expressionEvaluator.getArguments()); - // add the column itself too, so that if it is already transformed, we won't transform again - fields.add(transformConfig.getColumnName()); - } - } - ComplexTypeConfig complexTypeConfig = ingestionConfig.getComplexTypeConfig(); - if (complexTypeConfig != null && complexTypeConfig.getFieldsToUnnest() != null) { - fields.addAll(complexTypeConfig.getFieldsToUnnest()); - } - } - } - /** * Returns false if the record contains key {@link GenericRow#SKIP_RECORD_KEY} with value true */ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 90bae24cfacd..387f69a44269 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -84,8 +84,8 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.ingestion.batch.BatchConfig; -import org.apache.pinot.spi.recordenricher.RecordEnricherRegistry; -import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig; +import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricherRegistry; +import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricherValidationConfig; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java index 696f2b1f48bb..eb0eb1217db3 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java @@ -36,6 +36,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.apache.pinot.spi.utils.BytesUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.Assert; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java index e9b3ec3d6d97..dc862ef64fab 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java @@ -37,6 +37,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.Assert; import org.testng.annotations.Test; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java index 817ce0362a34..1dcdadd283d4 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; @@ -36,6 +38,7 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.TimeGranularitySpec; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.Assert; import org.testng.annotations.Test; @@ -50,7 +53,7 @@ public class IngestionUtilsTest { */ @Test public void testExtractFieldsSchema() { - + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build(); Schema schema; // from groovy function @@ -58,7 +61,7 @@ public void testExtractFieldsSchema() { DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("d1", FieldSpec.DataType.STRING, true); dimensionFieldSpec.setTransformFunction("Groovy({function}, argument1, argument2)"); schema.addField(dimensionFieldSpec); - List extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null, schema)); + List extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 3); Assert.assertTrue(extract.containsAll(Arrays.asList("d1", "argument1", "argument2"))); @@ -67,7 +70,7 @@ public void testExtractFieldsSchema() { dimensionFieldSpec = new DimensionFieldSpec("d1", FieldSpec.DataType.STRING, true); dimensionFieldSpec.setTransformFunction("Groovy({function})"); schema.addField(dimensionFieldSpec); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 1); Assert.assertTrue(extract.contains("d1")); @@ -75,7 +78,7 @@ public void testExtractFieldsSchema() { schema = new Schema(); dimensionFieldSpec = new DimensionFieldSpec("map__KEYS", FieldSpec.DataType.INT, false); schema.addField(dimensionFieldSpec); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Arrays.asList("map", "map__KEYS"))); @@ -83,7 +86,7 @@ public void testExtractFieldsSchema() { schema = new Schema(); dimensionFieldSpec = new DimensionFieldSpec("map__VALUES", FieldSpec.DataType.LONG, false); schema.addField(dimensionFieldSpec); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Arrays.asList("map", "map__VALUES"))); @@ -91,7 +94,7 @@ public void testExtractFieldsSchema() { // only incoming schema = new Schema.SchemaBuilder().addTime( new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "time"), null).build(); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 1); Assert.assertTrue(extract.contains("time")); @@ -99,7 +102,7 @@ public void testExtractFieldsSchema() { schema = new Schema.SchemaBuilder().addTime( new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "in"), new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "out")).build(); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Arrays.asList("in", "out"))); @@ -108,7 +111,7 @@ public void testExtractFieldsSchema() { dimensionFieldSpec = new DimensionFieldSpec("hoursSinceEpoch", FieldSpec.DataType.LONG, true); dimensionFieldSpec.setTransformFunction("toEpochHours(\"timestamp\")"); schema.addField(dimensionFieldSpec); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Arrays.asList("timestamp", "hoursSinceEpoch"))); @@ -117,7 +120,7 @@ public void testExtractFieldsSchema() { dimensionFieldSpec = new DimensionFieldSpec("tenMinutesSinceEpoch", FieldSpec.DataType.LONG, true); dimensionFieldSpec.setTransformFunction("toEpochMinutesBucket(\"timestamp\", 10)"); schema.addField(dimensionFieldSpec); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Lists.newArrayList("tenMinutesSinceEpoch", "timestamp"))); @@ -127,24 +130,26 @@ public void testExtractFieldsSchema() { new DateTimeFieldSpec("date", FieldSpec.DataType.STRING, "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS"); dateTimeFieldSpec.setTransformFunction("toDateTime(\"timestamp\", 'yyyy-MM-dd')"); schema.addField(dateTimeFieldSpec); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Lists.newArrayList("date", "timestamp"))); } @Test public void testExtractFieldsIngestionConfig() { + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build(); Schema schema = new Schema(); // filter config IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setFilterConfig(new FilterConfig("Groovy({x > 100}, x)")); - Set fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema); + tableConfig.setIngestionConfig(ingestionConfig); + Set fields = IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema); Assert.assertEquals(fields.size(), 1); Assert.assertTrue(fields.containsAll(Sets.newHashSet("x"))); schema.addField(new DimensionFieldSpec("y", FieldSpec.DataType.STRING, true)); - fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema); + fields = IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema); Assert.assertEquals(fields.size(), 2); Assert.assertTrue(fields.containsAll(Sets.newHashSet("x", "y"))); @@ -153,13 +158,14 @@ public void testExtractFieldsIngestionConfig() { ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs( Collections.singletonList(new TransformConfig("d1", "Groovy({function}, argument1, argument2)"))); - List extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); + tableConfig.setIngestionConfig(ingestionConfig); + List extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 3); Assert.assertTrue(extract.containsAll(Arrays.asList("d1", "argument1", "argument2"))); // groovy function, no arguments ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("d1", "Groovy({function})"))); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 1); Assert.assertTrue(extract.contains("d1")); @@ -167,7 +173,7 @@ public void testExtractFieldsIngestionConfig() { schema = new Schema.SchemaBuilder().addSingleValueDimension("hoursSinceEpoch", FieldSpec.DataType.LONG).build(); ingestionConfig.setTransformConfigs( Collections.singletonList(new TransformConfig("hoursSinceEpoch", "toEpochHours(timestampColumn)"))); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Arrays.asList("timestampColumn", "hoursSinceEpoch"))); @@ -176,7 +182,7 @@ public void testExtractFieldsIngestionConfig() { new Schema.SchemaBuilder().addSingleValueDimension("tenMinutesSinceEpoch", FieldSpec.DataType.LONG).build(); ingestionConfig.setTransformConfigs(Collections.singletonList( new TransformConfig("tenMinutesSinceEpoch", "toEpochMinutesBucket(timestampColumn, 10)"))); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Lists.newArrayList("tenMinutesSinceEpoch", "timestampColumn"))); @@ -185,7 +191,7 @@ public void testExtractFieldsIngestionConfig() { "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build(); ingestionConfig.setTransformConfigs( Collections.singletonList(new TransformConfig("dateColumn", "toDateTime(timestampColumn, 'yyyy-MM-dd')"))); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Lists.newArrayList("dateColumn", "timestampColumn"))); @@ -195,7 +201,7 @@ public void testExtractFieldsIngestionConfig() { .addDateTime("dateColumn", FieldSpec.DataType.STRING, "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build(); schema.getFieldSpecFor("d2").setTransformFunction("reverse(xy)"); ingestionConfig.setFilterConfig(new FilterConfig("Groovy({d1 == \"10\"}, d1)")); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 6); Assert.assertTrue(extract.containsAll(Lists.newArrayList("d1", "d2", "m1", "dateColumn", "xy", "timestampColumn"))); @@ -206,7 +212,7 @@ public void testExtractFieldsIngestionConfig() { schema.getFieldSpecFor("d2").setTransformFunction("reverse(xy)"); ingestionConfig.setComplexTypeConfig(new ComplexTypeConfig(Arrays.asList("before.test", "after.test"), ".", ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, Collections.singletonMap("before", "after"))); - extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); + extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema)); Assert.assertEquals(extract.size(), 8); List expectedColumns = Arrays.asList("d1", "d2", "m1", "dateColumn", "xy", "timestampColumn", "before", "after"); @@ -216,20 +222,22 @@ public void testExtractFieldsIngestionConfig() { @Test public void testExtractFieldsAggregationConfig() { IngestionConfig ingestionConfig = new IngestionConfig(); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build(); Schema schema = new Schema(); ingestionConfig.setAggregationConfigs(Collections.singletonList(new AggregationConfig("d1", "SUM(s1)"))); - Set fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema); + Set fields = IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema); Assert.assertEquals(fields.size(), 1); Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1"))); ingestionConfig.setAggregationConfigs(Collections.singletonList(new AggregationConfig("d1", "MIN(s1)"))); - fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema); + fields = IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema); Assert.assertEquals(fields.size(), 1); Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1"))); ingestionConfig.setAggregationConfigs(Collections.singletonList(new AggregationConfig("d1", "MAX(s1)"))); - fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema); + fields = IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema); Assert.assertEquals(fields.size(), 1); Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1"))); } @@ -237,15 +245,16 @@ public void testExtractFieldsAggregationConfig() { @Test public void testComplexTypeConfig() { IngestionConfig ingestionConfig = new IngestionConfig(); - ComplexTypeConfig complexTypeConfig = new ComplexTypeConfig(null, "__", - ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, null); - Schema schema = new Schema(); - + ComplexTypeConfig complexTypeConfig = + new ComplexTypeConfig(null, "__", ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, null); ingestionConfig.setComplexTypeConfig(complexTypeConfig); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build(); + Schema schema = new Schema(); schema.addField(new DimensionFieldSpec("a_b__c_d", FieldSpec.DataType.STRING, true)); schema.addField(new DimensionFieldSpec("f_d", FieldSpec.DataType.STRING, false)); schema.addField(new DimensionFieldSpec("ab__cd", FieldSpec.DataType.STRING, true)); - Set fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema); + Set fields = IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema); Assert.assertEquals(fields.size(), 3); Assert.assertTrue(fields.containsAll(Sets.newHashSet("a_b", "f_d", "ab"))); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/RecordEnricherFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/RecordEnricherFactory.java deleted file mode 100644 index 5e58c3a3fde1..000000000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/RecordEnricherFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.annotations; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - - -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.TYPE) -public @interface RecordEnricherFactory { -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java deleted file mode 100644 index adac697bdb6d..000000000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.recordenricher; - -public interface RecordEnricherConfig { - void parse(); -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java deleted file mode 100644 index 5a50d685cafe..000000000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.recordenricher; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; -import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; -import org.apache.pinot.spi.data.readers.GenericRow; - - -public class RecordEnricherPipeline { - private final List _enrichers = new ArrayList<>(); - private final Set _columnsToExtract = new HashSet<>(); - - public static RecordEnricherPipeline getPassThroughPipeline() { - return new RecordEnricherPipeline(); - } - - public static RecordEnricherPipeline fromIngestionConfig(IngestionConfig ingestionConfig) { - RecordEnricherPipeline pipeline = new RecordEnricherPipeline(); - if (null == ingestionConfig || null == ingestionConfig.getEnrichmentConfigs()) { - return pipeline; - } - List enrichmentConfigs = ingestionConfig.getEnrichmentConfigs(); - for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) { - try { - RecordEnricher enricher = RecordEnricherRegistry.createRecordEnricher(enrichmentConfig); - pipeline.add(enricher); - } catch (IOException e) { - throw new RuntimeException("Failed to instantiate record enricher " + enrichmentConfig.getEnricherType(), e); - } - } - return pipeline; - } - - public static RecordEnricherPipeline fromTableConfig(TableConfig tableConfig) { - return fromIngestionConfig(tableConfig.getIngestionConfig()); - } - - public Set getColumnsToExtract() { - return _columnsToExtract; - } - - public void add(RecordEnricher enricher) { - _enrichers.add(enricher); - _columnsToExtract.addAll(enricher.getInputColumns()); - } - - public void run(GenericRow record) { - for (RecordEnricher enricher : _enrichers) { - enricher.enrich(record); - } - } -} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/RecordTransformer.java similarity index 82% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/RecordTransformer.java index 72065132ae49..e5dae660dc63 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/RecordTransformer.java @@ -16,9 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.recordtransformer; +package org.apache.pinot.spi.recordtransformer; import java.io.Serializable; +import java.util.Collection; +import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.spi.data.readers.GenericRow; @@ -35,6 +37,14 @@ default boolean isNoOp() { return false; } + /** + * Returns the input columns required for the transformer. This is used to make sure the required input fields are + * extracted. + */ + default Collection getInputColumns() { + return List.of(); + } + /** * Transforms a record based on some custom rules. * diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/enricher/RecordEnricher.java similarity index 65% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/enricher/RecordEnricher.java index 8e48ed71023a..8baca1e6a017 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/enricher/RecordEnricher.java @@ -16,25 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.spi.recordtransformer.enricher; -import java.util.List; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.recordtransformer.RecordTransformer; /** - * Interface for enriching records. - * If a column with the same name as the input column already exists in the record, it will be overwritten. + * Record enricher is a special {@link RecordTransformer} which is applied before other transformers to enrich the + * columns. If a column with the same name as the input column already exists in the record, it will be overwritten. */ -public interface RecordEnricher { - /** - * Returns the list of input columns required for enriching the record. - * This is used to make sure the required input fields are extracted. - */ - List getInputColumns(); +public interface RecordEnricher extends RecordTransformer { /** * Enriches the given record, by adding new columns to the same record. */ void enrich(GenericRow record); + + @Override + default GenericRow transform(GenericRow record) { + enrich(record); + return record; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/enricher/RecordEnricherFactory.java similarity index 75% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/enricher/RecordEnricherFactory.java index 4b55d0426013..7106698b07a1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/enricher/RecordEnricherFactory.java @@ -16,14 +16,30 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.spi.recordtransformer.enricher; import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; +/** + * Factory for {@link RecordEnricher}. + */ public interface RecordEnricherFactory { + + /** + * Returns the type of the enricher. + */ String getEnricherType(); - RecordEnricher createEnricher(JsonNode enricherProps) throws IOException; + + /** + * Creates a new instance of the enricher. + */ + RecordEnricher createEnricher(JsonNode enricherProps) + throws IOException; + + /** + * Validates the enrichment properties. + */ void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/enricher/RecordEnricherRegistry.java similarity index 69% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/enricher/RecordEnricherRegistry.java index 59c1e6fb348d..68a14889a8db 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/enricher/RecordEnricherRegistry.java @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.spi.recordtransformer.enricher; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -28,35 +29,32 @@ public class RecordEnricherRegistry { + private RecordEnricherRegistry() { + } + private static final Logger LOGGER = LoggerFactory.getLogger(RecordEnricherRegistry.class); private static final Map RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); - private RecordEnricherRegistry() { + static { + for (RecordEnricherFactory recordEnricherFactory : ServiceLoader.load(RecordEnricherFactory.class)) { + LOGGER.info("Registered record enricher factory type: {}", recordEnricherFactory.getEnricherType()); + RECORD_ENRICHER_FACTORY_MAP.put(recordEnricherFactory.getEnricherType(), recordEnricherFactory); + } } public static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig, - RecordEnricherValidationConfig config) { - if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { - throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); - } - - RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) - .validateEnrichmentConfig(enrichmentConfig.getProperties(), config); + RecordEnricherValidationConfig validationConfig) { + String type = enrichmentConfig.getEnricherType(); + RecordEnricherFactory factory = RECORD_ENRICHER_FACTORY_MAP.get(type); + Preconditions.checkArgument(factory != null, "No record enricher found for type: %s", type); + factory.validateEnrichmentConfig(enrichmentConfig.getProperties(), validationConfig); } public static RecordEnricher createRecordEnricher(EnrichmentConfig enrichmentConfig) throws IOException { - if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { - throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); - } - return RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) - .createEnricher(enrichmentConfig.getProperties()); - } - - static { - for (RecordEnricherFactory recordEnricherFactory : ServiceLoader.load(RecordEnricherFactory.class)) { - LOGGER.info("Registered record enricher factory type: {}", recordEnricherFactory.getEnricherType()); - RECORD_ENRICHER_FACTORY_MAP.put(recordEnricherFactory.getEnricherType(), recordEnricherFactory); - } + String type = enrichmentConfig.getEnricherType(); + RecordEnricherFactory factory = RECORD_ENRICHER_FACTORY_MAP.get(type); + Preconditions.checkArgument(factory != null, "No record enricher found for type: %s", type); + return factory.createEnricher(enrichmentConfig.getProperties()); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/enricher/RecordEnricherValidationConfig.java similarity index 95% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/enricher/RecordEnricherValidationConfig.java index 86fb74155e59..21e3d20e66ed 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordtransformer/enricher/RecordEnricherValidationConfig.java @@ -1,4 +1,4 @@ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.spi.recordtransformer.enricher; /** * Licensed to the Apache Software Foundation (ASF) under one