Skip to content

Commit

Permalink
fix flight sql read json type
Browse files Browse the repository at this point in the history
  • Loading branch information
gnehil committed Feb 8, 2025
1 parent 3c3df39 commit 8f3e93b
Showing 1 changed file with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,7 @@ public DorisFlightSqlReader(DorisReaderPartition partition) throws Exception {
throw new DorisException("init adbc connection failed", e);
}
}
String tableIdentifier = config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER);
String[] arr = tableIdentifier.split("\\.");

Schema tableSchema = frontendClient.getTableSchema(arr[0], arr[1]);
this.schema = processDorisSchema(partition, tableSchema);
log.debug("origin flight sql read Schema: " + tableSchema + ", processed schema: " + schema);
this.schema = processDorisSchema(partition);
this.arrowReader = executeQuery();
}

Expand Down Expand Up @@ -153,7 +148,7 @@ protected String generateQuerySql(DorisReaderPartition partition) throws OptionR
return String.format("SELECT %s FROM %s %s%s%s", columns, fullTableName, tablets, predicates, limit);
}

protected Schema processDorisSchema(DorisReaderPartition partition, final Schema originSchema) throws Exception {
protected Schema processDorisSchema(DorisReaderPartition partition) throws Exception {
Schema processedSchema = new Schema();
Schema tableSchema = frontendClient.getTableSchema(partition.getDatabase(), partition.getTable());
Map<String, Field> fieldTypeMap = tableSchema.getProperties().stream()
Expand All @@ -170,7 +165,12 @@ protected Schema processDorisSchema(DorisReaderPartition partition, final Schema
newFieldList.add(new Field(realColumn, TPrimitiveType.VARCHAR.name(), null, 0, 0, null));
}
} else {
newFieldList.add(fieldTypeMap.get(readColumn.trim().replaceAll("`", "")));
String colName = readColumn.trim().replaceAll("`", "");
if ("JSON".equalsIgnoreCase(fieldTypeMap.get(colName).getType())) {
newFieldList.add(new Field(colName, TPrimitiveType.JSONB.name(), null, 0, 0, null));
} else {
newFieldList.add(fieldTypeMap.get(colName));
}
}
}
processedSchema.setProperties(newFieldList);
Expand Down

0 comments on commit 8f3e93b

Please sign in to comment.