diff --git a/build.gradle b/build.gradle index 63c6670..f9589a8 100644 --- a/build.gradle +++ b/build.gradle @@ -56,7 +56,7 @@ dependencies { compileOnly 'org.slf4j:slf4j-api' compileOnly 'ch.qos.logback:logback-classic' - api 'de.siegmar:fastcsv:2.2.2' + api 'de.siegmar:fastcsv:3.2.0' api 'org.apache.avro:avro:1.11.3' api group: 'org.apache.parquet', name: 'parquet-avro', version: '1.14.0' api group: 'org.apache.poi', name: 'poi', version: '5.2.5' diff --git a/src/main/java/io/kestra/plugin/serdes/csv/CsvToIon.java b/src/main/java/io/kestra/plugin/serdes/csv/CsvToIon.java index 9105ce8..dfbd69e 100644 --- a/src/main/java/io/kestra/plugin/serdes/csv/CsvToIon.java +++ b/src/main/java/io/kestra/plugin/serdes/csv/CsvToIon.java @@ -1,5 +1,6 @@ package io.kestra.plugin.serdes.csv; +import de.siegmar.fastcsv.reader.CsvRecord; import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.swagger.v3.oas.annotations.media.Schema; @@ -115,16 +116,16 @@ public Output run(RunContext runContext) throws Exception { AtomicInteger skipped = new AtomicInteger(); try ( - de.siegmar.fastcsv.reader.CsvReader csvReader = this.csvReader(new InputStreamReader(runContext.storage().getFile(from), charset)); + de.siegmar.fastcsv.reader.CsvReader csvReader = this.csvReader(new InputStreamReader(runContext.storage().getFile(from), charset)); OutputStream output = new FileOutputStream(tempFile); ) { Map headers = new TreeMap<>(); Flux flowable = Flux .fromIterable(csvReader) - .filter(csvRow -> { - if (header && csvRow.getOriginalLineNumber() == 1) { - for (int i = 0; i < csvRow.getFieldCount(); i++) { - headers.put(i, csvRow.getField(i)); + .filter(csvRecord -> { + if (header && csvRecord.getStartingLineNumber() == 1) { + for (int i = 0; i < csvRecord.getFieldCount(); i++) { + headers.put(i, csvRecord.getField(i)); } return false; } @@ -170,7 +171,7 @@ public static class Output implements io.kestra.core.models.tasks.Output { private URI uri; } - private de.siegmar.fastcsv.reader.CsvReader csvReader(InputStreamReader inputStreamReader) { + private de.siegmar.fastcsv.reader.CsvReader csvReader(InputStreamReader inputStreamReader) { var builder = de.siegmar.fastcsv.reader.CsvReader.builder(); if (this.textDelimiter != null) { @@ -182,13 +183,13 @@ private de.siegmar.fastcsv.reader.CsvReader csvReader(InputStreamReader inputStr } if (this.skipEmptyRows != null) { - builder.skipEmptyRows(skipEmptyRows); + builder.skipEmptyLines(skipEmptyRows); } if (this.errorOnDifferentFieldCount != null) { - builder.errorOnDifferentFieldCount(errorOnDifferentFieldCount); + builder.ignoreDifferentFieldCount(!errorOnDifferentFieldCount); } - return builder.build(inputStreamReader); + return builder.ofCsvRecord(inputStreamReader); } } diff --git a/src/main/java/io/kestra/plugin/serdes/csv/IonToCsv.java b/src/main/java/io/kestra/plugin/serdes/csv/IonToCsv.java index 756a503..f03dce1 100644 --- a/src/main/java/io/kestra/plugin/serdes/csv/IonToCsv.java +++ b/src/main/java/io/kestra/plugin/serdes/csv/IonToCsv.java @@ -1,7 +1,7 @@ package io.kestra.plugin.serdes.csv; import de.siegmar.fastcsv.writer.LineDelimiter; -import de.siegmar.fastcsv.writer.QuoteStrategy; +import de.siegmar.fastcsv.writer.QuoteStrategies; import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.plugin.serdes.AbstractTextWriter; @@ -148,21 +148,21 @@ public void accept(Object row) { throw new IllegalArgumentException("Invalid data of type List with header"); } - var rows = casted.stream().map(field -> convert(field)).toList(); - csvWriter.writeRow(rows); + var record = casted.stream().map(field -> convert(field)).toList(); + csvWriter.writeRecord(record); } else if (row instanceof Map) { Map casted = (Map) row; if (!first) { this.first = true; if (header) { - var rows = casted.keySet().stream().map(field -> convert(field)).toList(); - csvWriter.writeRow(rows); + var record = casted.keySet().stream().map(field -> convert(field)).toList(); + csvWriter.writeRecord(record); } } - var rows = casted.values().stream().map(field -> convert(field)).toList(); - csvWriter.writeRow(rows); + var record = casted.values().stream().map(field -> convert(field)).toList(); + csvWriter.writeRecord(record); } } }); @@ -194,7 +194,9 @@ private de.siegmar.fastcsv.writer.CsvWriter csvWriter(Writer writer) { builder.quoteCharacter(this.textDelimiter); builder.fieldSeparator(this.fieldSeparator); builder.lineDelimiter(LineDelimiter.of(this.lineDelimiter)); - builder.quoteStrategy(this.alwaysDelimitText ? QuoteStrategy.ALWAYS : QuoteStrategy.REQUIRED); + if (this.alwaysDelimitText) { + builder.quoteStrategy(QuoteStrategies.ALWAYS); + } return builder.build(writer); }