Skip to content

Commit 70343a4

Browse files
author
Robin Duda
committed
Add support for parsing CSV files.
1 parent e30e468 commit 70343a4

25 files changed

+913
-493
lines changed

README.md

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ Tested with ElasticSearch 5.6.2 and 6.2.3.
2727

2828
To run the application, a filename and an index are required. To import from the terminal, run:
2929
```
30-
java -Xmx2g -jar excelastic-1.2.7.jar <fileName> <indexName> --mapping mappingName --clear
30+
java -Xmx2g -jar excelastic-1.3.0.jar <fileName> <indexName> --mapping mappingName --clear
3131
```
3232
If running with --clear, then the existing index will be cleared before the import starts.
3333

3434
To run with the web interface, run the following in your terminal:
3535
```
36-
java -Xmx2g -jar excelastic-1.2.7.jar
36+
java -Xmx2g -jar excelastic-1.3.0.jar
3737
```
3838
When the application successfully connects to the ElasticSearch server, the browser will automatically open a new tab.
3939

@@ -67,10 +67,7 @@ If no configuration file is present a new configuration file will be created usi
6767

6868
## Contributing
6969

70-
If you want to contribute to this project, open an issue or pull request. ::
71-
72-
In the 1.2.7 release we have cleaned up the code and added even more javadoc
73-
in order to promote contributions! :astonished:
70+
If you want to contribute to this project, open an issue or pull request. :heart_eyes_cat: :metal:
7471

7572
---
7673

excelastic.png

1.84 KB
Loading

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
<groupId>com.codingchili</groupId>
1212
<artifactId>excelastic</artifactId>
13-
<version>1.2.7</version>
13+
<version>1.3.0</version>
1414
<build>
1515
<plugins>
1616
<plugin>

src/main/java/com/codingchili/ApplicationLauncher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
*/
2222
public class ApplicationLauncher {
2323
private final ApplicationLogger logger = new ApplicationLogger(getClass());
24-
public static String VERSION = "1.2.7";
24+
public static String VERSION = "1.3.0";
2525
private Vertx vertx;
2626

2727
public static void main(String[] args) {

src/main/java/com/codingchili/Controller/CommandLine.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@ private void importFile(ImportEvent event, String fileName) {
4646
logger.loadingFromFilesystem(fileName);
4747
logger.parsingStarted();
4848
try {
49-
FileParser parser = new FileParser(new File(fileName), 1, fileName);
49+
FileParser parser = ParserFactory.getByFilename(fileName);
50+
parser.setFileData(fileName, 1, fileName);
51+
5052
event.setParser(parser);
51-
parser.assertFileParsable();
53+
parser.initialize();
5254

5355
logger.importStarted(event.getIndex());
5456
vertx.eventBus().send(Configuration.INDEXING_ELASTICSEARCH, event, getDeliveryOpts(),

src/main/java/com/codingchili/Controller/Website.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import static com.codingchili.ApplicationLauncher.VERSION;
2121
import static com.codingchili.Model.Configuration.INDEXING_ELASTICSEARCH;
2222
import static com.codingchili.Model.ElasticWriter.*;
23-
import static com.codingchili.Model.FileParser.INDEX;
23+
import static com.codingchili.Model.ExcelParser.INDEX;
2424

2525
/**
2626
* @author Robin Duda
@@ -172,8 +172,10 @@ private void parse(String uploadedFileName, MultiMap params, String fileName, Fu
172172
vertx.executeBlocking(blocking -> {
173173
try {
174174
ImportEvent event = ImportEvent.fromParams(params);
175-
FileParser parser = new FileParser(new File(uploadedFileName), event.getOffset(), fileName);
176-
parser.assertFileParsable();
175+
FileParser parser = ParserFactory.getByFilename(fileName);
176+
parser.setFileData(uploadedFileName, event.getOffset(), fileName);
177+
178+
parser.initialize();
177179
event.setParser(parser);
178180

179181
// submit an import event.
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package com.codingchili.Model;
2+
3+
import io.vertx.core.json.JsonObject;
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
7+
import java.io.*;
8+
import java.nio.ByteBuffer;
9+
import java.nio.MappedByteBuffer;
10+
import java.nio.channels.FileChannel;
11+
import java.util.*;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
14+
/**
15+
* @author Robin Duda
16+
* <p>
17+
* Parses CSV files.
18+
*/
19+
public class CSVParser implements FileParser {
20+
private static final int MAX_LINE_LENGTH = 16384;
21+
private static final int PAGE_16MB = 16777216;
22+
23+
private static final char TOKEN_NULL = '\0';
24+
private static final char TOKEN_CR = '\r';
25+
private static final char TOKEN_LF = '\n';
26+
private static final char TOKEN_QUOTE = '\"';
27+
private static final char TOKEN_SEPARATOR = ',';
28+
29+
private ByteBuffer buffer = ByteBuffer.allocate(MAX_LINE_LENGTH);
30+
private JsonObject headers = new JsonObject();
31+
private Iterator<String> header;
32+
private RandomAccessFile file;
33+
private MappedByteBuffer map;
34+
private long fileSize;
35+
private int index = 0;
36+
private int rows = 0;
37+
38+
@Override
39+
public void setFileData(String localFileName, int offset, String fileName) throws FileNotFoundException {
40+
file = new RandomAccessFile(localFileName, "rw");
41+
try {
42+
map = file.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, PAGE_16MB);
43+
fileSize = file.length();
44+
readRowCount();
45+
readHeaders();
46+
} catch (IOException e) {
47+
throw new ParserException(e);
48+
}
49+
}
50+
51+
@Override
52+
public Set<String> getSupportedFileExtensions() {
53+
return new HashSet<>(Collections.singletonList(".csv"));
54+
}
55+
56+
@Override
57+
public void initialize() {
58+
index = 0;
59+
map.position(0);
60+
readRow(); // skip headers row.
61+
for (int i = 0; i < rows; i++) {
62+
readRow();
63+
}
64+
}
65+
66+
private int readRowCount() {
67+
for (int i = map.position(); i < fileSize; i++) {
68+
if (map.get(i) == '\n') {
69+
rows++;
70+
}
71+
}
72+
return rows;
73+
}
74+
75+
private void readHeaders() throws IOException {
76+
map.position(0);
77+
78+
for (int i = map.position(); i < file.length(); i++) {
79+
if (map.get(i) == '\n') {
80+
Arrays.stream(new String(buffer.array()).split(","))
81+
.map(header -> header.replaceAll("\"", ""))
82+
.map(String::trim).forEach(header -> {
83+
headers.put(header, "<empty>");
84+
});
85+
break;
86+
} else {
87+
buffer.put(map.get(i));
88+
}
89+
}
90+
buffer.clear();
91+
}
92+
93+
private void process(AtomicInteger columnsRead, ByteBuffer buffer, JsonObject json) {
94+
columnsRead.incrementAndGet();
95+
96+
if (columnsRead.get() > headers.size()) {
97+
throw new ColumnsExceededHeadersException(columnsRead.get(), headers.size(), index + 1);
98+
} else {
99+
int read = buffer.position();
100+
byte[] line = new byte[read + 1];
101+
102+
buffer.position(0);
103+
buffer.get(line, 0, read);
104+
line[line.length - 1] = '\0';
105+
106+
json.put(header.next(), parseDatatype(line));
107+
buffer.clear();
108+
}
109+
}
110+
111+
private JsonObject readRow() {
112+
// reset current header.
113+
header = headers.fieldNames().iterator();
114+
115+
AtomicInteger columnsRead = new AtomicInteger(0);
116+
JsonObject json = headers.copy();
117+
boolean quoted = false;
118+
boolean done = false;
119+
120+
while (!done) {
121+
byte current = map.get();
122+
123+
switch (current) {
124+
case TOKEN_NULL:
125+
// EOF call process.
126+
process(columnsRead, buffer, json);
127+
done = true;
128+
break;
129+
case TOKEN_CR:
130+
case TOKEN_LF:
131+
// final header is being read and EOL appears.
132+
if (columnsRead.get() == headers.size() - 1) {
133+
process(columnsRead, buffer, json);
134+
done = true;
135+
break;
136+
} else {
137+
// skip token if not all headers read.
138+
continue;
139+
}
140+
case TOKEN_QUOTE:
141+
// toggle quoted to support commas within quotes.
142+
quoted = !quoted;
143+
break;
144+
case TOKEN_SEPARATOR:
145+
if (!quoted) {
146+
process(columnsRead, buffer, json);
147+
break;
148+
}
149+
default:
150+
// store the current token in the buffer until the column ends.
151+
buffer.put(current);
152+
}
153+
}
154+
155+
if (!(columnsRead.get() == headers.size())) {
156+
throw new ParserException(
157+
String.format("Error at line %d, values (%d) does not match headers (%d).",
158+
index, columnsRead.get(), headers.size()));
159+
} else {
160+
index++;
161+
}
162+
163+
// parse json object.
164+
return json;
165+
}
166+
167+
private Object parseDatatype(byte[] data) {
168+
String line = new String(data).trim();
169+
170+
if (line.matches("[0-9]*")) {
171+
return Integer.parseInt(line);
172+
} else if (line.matches("true|false")) {
173+
return Boolean.parseBoolean(line);
174+
} else {
175+
return line;
176+
}
177+
}
178+
179+
@Override
180+
public int getNumberOfElements() {
181+
return rows;
182+
}
183+
184+
@Override
185+
public void subscribe(Subscriber<? super JsonObject> subscriber) {
186+
map.position(0);
187+
readRow();
188+
index = 0;
189+
190+
subscriber.onSubscribe(new Subscription() {
191+
private boolean complete = false;
192+
private int index = 0;
193+
194+
@Override
195+
public void request(long count) {
196+
for (int i = 0; i < count && i < rows; i++) {
197+
JsonObject result = readRow();
198+
199+
if (result != null) {
200+
subscriber.onNext(result);
201+
} else {
202+
complete = true;
203+
subscriber.onComplete();
204+
}
205+
}
206+
207+
index += count;
208+
209+
if (index >= rows && !complete) {
210+
subscriber.onComplete();
211+
}
212+
}
213+
214+
@Override
215+
public void cancel() {
216+
// send no more items!
217+
}
218+
});
219+
}
220+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.codingchili.Model;
2+
3+
/**
4+
* @author Robin Duda
5+
*
6+
* Thrown when more columns are encountered than there is headers.
7+
*/
8+
public class ColumnsExceededHeadersException extends ParserException {
9+
10+
/**
11+
* @param values number of values encountered
12+
* @param headers the number of headers on the first row.
13+
* @param index the line in the file.
14+
*/
15+
public ColumnsExceededHeadersException(int values, int headers, int index) {
16+
super(String.format("Encountered too many values (%d) on row %d, expected to match headers (%d).",
17+
values, index, headers));
18+
}
19+
}

0 commit comments

Comments
 (0)