diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java index 7a2d34ea..faa462b0 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java @@ -72,12 +72,7 @@ public DorisFlightSqlReader(DorisReaderPartition partition) throws Exception { throw new DorisException("init adbc connection failed", e); } } - String tableIdentifier = config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER); - String[] arr = tableIdentifier.split("\\."); - - Schema tableSchema = frontendClient.getTableSchema(arr[0], arr[1]); - this.schema = processDorisSchema(partition, tableSchema); - log.debug("origin flight sql read Schema: " + tableSchema + ", processed schema: " + schema); + this.schema = processDorisSchema(partition); this.arrowReader = executeQuery(); } @@ -153,7 +148,7 @@ protected String generateQuerySql(DorisReaderPartition partition) throws OptionR return String.format("SELECT %s FROM %s %s%s%s", columns, fullTableName, tablets, predicates, limit); } - protected Schema processDorisSchema(DorisReaderPartition partition, final Schema originSchema) throws Exception { + protected Schema processDorisSchema(DorisReaderPartition partition) throws Exception { Schema processedSchema = new Schema(); Schema tableSchema = frontendClient.getTableSchema(partition.getDatabase(), partition.getTable()); Map fieldTypeMap = tableSchema.getProperties().stream() @@ -170,7 +165,12 @@ protected Schema processDorisSchema(DorisReaderPartition partition, final Schema newFieldList.add(new Field(realColumn, TPrimitiveType.VARCHAR.name(), null, 0, 0, null)); } } else { - newFieldList.add(fieldTypeMap.get(readColumn.trim().replaceAll("`", ""))); + String colName = readColumn.trim().replaceAll("`", ""); + if ("JSON".equalsIgnoreCase(fieldTypeMap.get(colName).getType())) { + newFieldList.add(new Field(colName, TPrimitiveType.JSONB.name(), null, 0, 0, null)); + } else { + newFieldList.add(fieldTypeMap.get(colName)); + } } } processedSchema.setProperties(newFieldList);