diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java index 3c939222e5f..0183b57e6f2 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java @@ -41,7 +41,7 @@ public class QueryMessageRequest { private String streamId; @ApiModelProperty(value = "Message count") - private Integer messageCount = 100; + private Integer messageCount = 10; @ApiModelProperty(value = "Field name") private String fieldName; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java index 3fa9853e93a..de53dfb19c2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java @@ -52,12 +52,13 @@ public List parseFields(String str, InlongStreamInfo streamInfo) thro if (StringUtils.isNotBlank(streamInfo.getDataEscapeChar())) { escapeChar = streamInfo.getDataEscapeChar().charAt(0); } - String[][] rowValues = SplitUtils.splitCsv(str, separator, escapeChar, '\"', '\n', true); + String[][] rowValues = SplitUtils.splitCsv(str, separator, escapeChar, null, '\n', true); + int fieldIndex = 0; for (int i = 0; i < rowValues.length; i++) { String[] fieldValues = rowValues[i]; for (int j = 0; j < fieldValues.length; j++) { if (i + j < fields.size()) { - fields.get(i + j).setFieldValue(fieldValues[j]); + fields.get(fieldIndex++).setFieldValue(fieldValues[j]); } } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java index 8b0a40af379..900732f906d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java @@ -62,7 +62,7 @@ public List parseFields(String str, InlongStreamInfo streamInfo) thro lineSeparator = (char) Integer.parseInt(streamInfo.getLineSeparator()); } List> rowValues = - KvUtils.splitKv(str, separator, kvSeparator, escapeChar, '\"', lineSeparator); + KvUtils.splitKv(str, separator, kvSeparator, escapeChar, null, lineSeparator); for (Map row : rowValues) { for (FieldInfo fieldInfo : fields) { fieldInfo.setFieldValue(row.get(fieldInfo.getFieldName())); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java index a7ac1dcf9a1..0e4983214c5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java @@ -414,7 +414,7 @@ public List queryLatestMessage(PulsarClusterInfo pulsarClusterIn LOGGER.info("begin to query message for topic {}, adminUrl={}", topicFullName, pulsarClusterInfo.getAdminUrl()); List messageList = new ArrayList<>(); int partitionCount = getPartitionCount(pulsarClusterInfo, topicFullName); - for (int messageIndex = 0; messageIndex < 100; messageIndex++) { + for (int messageIndex = 0; messageIndex < request.getMessageCount(); messageIndex++) { int currentPartitionNum = messageIndex % partitionCount; int messagePosition = messageIndex / partitionCount + 1; String topicNameOfPartition = buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial);