diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsDynamicTableSource.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsDynamicTableSource.java index 1d712cd6e1..651639e0aa 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsDynamicTableSource.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsDynamicTableSource.java @@ -48,10 +48,17 @@ public class HdfsDynamicTableSource implements ScanTableSource { private final HdfsConf hdfsConf; private final TableSchema tableSchema; + private final List partitionKeyList; public HdfsDynamicTableSource(HdfsConf hdfsConf, TableSchema tableSchema) { + this(hdfsConf, tableSchema, new ArrayList<>()); + } + + public HdfsDynamicTableSource( + HdfsConf hdfsConf, TableSchema tableSchema, List partitionKeyList) { this.hdfsConf = hdfsConf; this.tableSchema = tableSchema; + this.partitionKeyList = partitionKeyList; } @Override @@ -61,10 +68,14 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon String[] fieldNames = tableSchema.getFieldNames(); List columnList = new ArrayList<>(fieldNames.length); for (int i = 0; i < fieldNames.length; i++) { + String fieldName = fieldNames[i]; FieldConf field = new FieldConf(); - field.setName(fieldNames[i]); + field.setName(fieldName); field.setType(rowType.getTypeAt(i).asSummaryString()); field.setIndex(i); + if (partitionKeyList.contains(fieldName)) { + field.setPart(true); + } columnList.add(field); } hdfsConf.setColumn(columnList); diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/table/HdfsDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/table/HdfsDynamicTableFactory.java index bece47db28..d48d2c22a8 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/table/HdfsDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/table/HdfsDynamicTableFactory.java @@ -36,6 +36,7 @@ import org.apache.flink.table.utils.TableSchemaUtils; import java.util.HashSet; +import java.util.List; import java.util.Set; /** @@ -94,11 +95,12 @@ public DynamicTableSource createDynamicTableSource(Context context) { TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); HdfsConf hdfsConf = getHdfsConf(config); + List partitionKeys = context.getCatalogTable().getPartitionKeys(); hdfsConf.setParallelism(config.get(SourceOptions.SCAN_PARALLELISM)); hdfsConf.setHadoopConfig( HdfsOptions.getHadoopConfig(context.getCatalogTable().getOptions())); - return new HdfsDynamicTableSource(hdfsConf, physicalSchema); + return new HdfsDynamicTableSource(hdfsConf, physicalSchema, partitionKeys); } @Override