Skip to content

Commit b767220

Browse files
committed Sep 5, 2024
Drop KvinTupleInternal and add KvinRecord-based writer.
1 parent 39eb60a commit b767220

File tree

8 files changed

+387
-216
lines changed

8 files changed

+387
-216
lines changed
 

‎bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java

+17-37
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,7 @@
11
package io.github.linkedfactory.core.kvin.parquet;
22

3+
import io.github.linkedfactory.core.kvin.parquet.records.KvinRecord;
4+
import net.enilink.commons.iterator.IExtendedIterator;
35
import net.enilink.commons.util.Pair;
46
import org.apache.commons.io.FileUtils;
57
import org.apache.hadoop.conf.Configuration;
@@ -153,13 +155,6 @@ private ParquetReader<IdMapping> getParquetMappingReader(HadoopInputFile file) t
153155
.build();
154156
}
155157

156-
private ParquetReader<KvinTupleInternal> getParquetDataReader(HadoopInputFile file) throws IOException {
157-
return AvroParquetReader.<KvinTupleInternal>builder(file)
158-
.withDataModel(reflectData)
159-
.useStatsFilter()
160-
.build();
161-
}
162-
163158
private void compactDataFiles(File weekFolder) throws IOException {
164159
Lock readLock = kvinParquet.readLock();
165160
try {
@@ -182,49 +177,34 @@ private void compactDataFiles(File weekFolder) throws IOException {
182177
Paths.get(archiveLocation).relativize(weekFolder.toPath()));
183178

184179
Path compactionFile = new Path(targetFolder.toAbsolutePath().toString(), "data__1.parquet");
185-
ParquetWriter<KvinTupleInternal> compactionFileWriter = getParquetDataWriter(compactionFile);
180+
ParquetWriter<KvinRecord> compactionFileWriter = getKvinRecordWriter(compactionFile);
186181

187-
PriorityQueue<Pair<KvinTupleInternal, ParquetReader<KvinTupleInternal>>> nextTuples =
182+
PriorityQueue<Pair<KvinRecord, IExtendedIterator<KvinRecord>>> nextRecords =
188183
new PriorityQueue<>(Comparator.comparing(Pair::getFirst));
189184
for (java.nio.file.Path dataFile : dataFiles) {
190-
ParquetReader<KvinTupleInternal> reader = getParquetDataReader(
191-
HadoopInputFile.fromPath(new Path(dataFile.toString()), new Configuration()));
192-
KvinTupleInternal tuple = reader.read();
193-
if (tuple != null) {
194-
nextTuples.add(new Pair<>(tuple, reader));
185+
IExtendedIterator<KvinRecord> it = createKvinRecordReader(new Path(dataFile.toString()), null);
186+
if (it.hasNext()) {
187+
nextRecords.add(new Pair<>(it.next(), it));
195188
} else {
196-
try {
197-
reader.close();
198-
} catch (IOException e) {
199-
}
189+
it.close();
200190
}
201191
}
202192

203-
KvinTupleInternal prevTuple = null;
204-
while (!nextTuples.isEmpty()) {
205-
var pair = nextTuples.poll();
206-
if (prevTuple == null || prevTuple.compareTo(pair.getFirst()) != 0) {
193+
KvinRecord prevRecord = null;
194+
while (!nextRecords.isEmpty()) {
195+
var pair = nextRecords.poll();
196+
if (prevRecord == null || prevRecord.compareTo(pair.getFirst()) != 0) {
207197
var tuple = pair.getFirst();
208-
// update the first flag
209-
if (prevTuple == null || !Arrays.equals(prevTuple.id, tuple.id)) {
210-
tuple.setFirst(true);
211-
} else {
212-
tuple.setFirst(null);
213-
}
214198
compactionFileWriter.write(tuple);
215-
prevTuple = tuple;
216-
} else if (prevTuple != null) {
199+
prevRecord = tuple;
200+
} else if (prevRecord != null) {
217201
// omit tuple as it is duplicate in terms of id, time, and seqNr
218202
}
219203

220-
KvinTupleInternal tuple = pair.getSecond().read();
221-
if (tuple != null) {
222-
nextTuples.add(new Pair<>(tuple, pair.getSecond()));
204+
if (pair.getSecond().hasNext()) {
205+
nextRecords.add(new Pair<>(pair.getSecond().next(), pair.getSecond()));
223206
} else {
224-
try {
225-
pair.getSecond().close();
226-
} catch (IOException e) {
227-
}
207+
pair.getSecond().close();
228208
}
229209
}
230210

‎bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java

+21-34
Original file line number | Diff line number | Diff line change
@@ -275,9 +275,8 @@ private synchronized void putInternal(Iterable<KvinTuple> tuples) throws IOExcep
275275

276276
WriterState writerState = null;
277277
String prevKey = null;
278-
KvinTupleInternal prevTuple = null;
279278
for (KvinTuple tuple : tuples) {
280-
KvinTupleInternal internalTuple = new KvinTupleInternal();
279+
KvinRecord record = new KvinRecord();
281280

282281
Calendar tupleDate = getDate(tuple.time);
283282
int year = tupleDate.get(Calendar.YEAR);
@@ -293,39 +292,32 @@ private synchronized void putInternal(Iterable<KvinTuple> tuples) throws IOExcep
293292
.resolve(weekFolderName)
294293
.resolve("data__1.parquet");
295294
Files.createDirectories(file.getParent());
296-
writerState = new WriterState(file, getParquetDataWriter(new Path(file.toString())),
295+
writerState = new WriterState(file, getKvinRecordWriter(new Path(file.toString())),
297296
year, week);
298297
writers.put(key, writerState);
299298
}
300299
prevKey = key;
301300
}
302301

303302
// writing mappings and values
304-
internalTuple.setId(generateId(tuple, writeContext,
305-
itemMappingWriter, propertyMappingWriter, contextMappingWriter));
306-
internalTuple.setTime(tuple.time);
307-
internalTuple.setSeqNr(tuple.seqNr);
308-
309-
internalTuple.setValueInt(tuple.value instanceof Integer ? (int) tuple.value : null);
310-
internalTuple.setValueLong(tuple.value instanceof Long ? (long) tuple.value : null);
311-
internalTuple.setValueFloat(tuple.value instanceof Float ? (float) tuple.value : null);
312-
internalTuple.setValueDouble(tuple.value instanceof Double ? (double) tuple.value : null);
313-
internalTuple.setValueString(tuple.value instanceof String ? (String) tuple.value : null);
314-
internalTuple.setValueBool(tuple.value instanceof Boolean ? (Boolean) tuple.value ? 1 : 0 : null);
315-
if (tuple.value instanceof Record || tuple.value instanceof URI || tuple.value instanceof BigInteger ||
316-
tuple.value instanceof BigDecimal || tuple.value instanceof Short || tuple.value instanceof Object[]) {
317-
internalTuple.setValueObject(encodeRecord(tuple.value));
318-
} else {
319-
internalTuple.setValueObject(null);
320-
}
321-
// set first flag
322-
if (prevTuple == null || !Arrays.equals(prevTuple.id, internalTuple.id)) {
323-
internalTuple.setFirst(true);
303+
long[] id = generateIds(tuple, writeContext,
304+
itemMappingWriter, propertyMappingWriter, contextMappingWriter);
305+
record.itemId = id[0];
306+
record.contextId = id[1];
307+
record.propertyId = id[2];
308+
record.time = tuple.time;
309+
record.seqNr = tuple.seqNr;
310+
311+
Object value = tuple.value;
312+
if (value instanceof Record || value instanceof URI || value instanceof BigInteger ||
313+
value instanceof BigDecimal || value instanceof Short || value instanceof Object[]) {
314+
value = ByteBuffer.wrap(encodeRecord(value));
324315
}
325-
writerState.writer.write(internalTuple);
316+
record.value = value;
317+
318+
writerState.writer.write(record);
326319
writerState.minMax[0] = Math.min(writerState.minMax[0], writeContext.lastItemId);
327320
writerState.minMax[1] = Math.max(writerState.minMax[1], writeContext.lastItemId);
328-
prevTuple = internalTuple;
329321
}
330322

331323
for (WriterState state : writers.values()) {
@@ -558,7 +550,7 @@ private Calendar getDate(long timestamp) {
558550
return calendar;
559551
}
560552

561-
private byte[] generateId(KvinTuple tuple,
553+
private long[] generateIds(KvinTuple tuple,
562554
WriteContext writeContext,
563555
ParquetWriter itemMappingWriter,
564556
ParquetWriter propertyMappingWriter,
@@ -620,12 +612,7 @@ private byte[] generateId(KvinTuple tuple,
620612
}
621613
return newId;
622614
});
623-
624-
ByteBuffer idBuffer = ByteBuffer.allocate(Long.BYTES * 3);
625-
idBuffer.putLong(itemId);
626-
idBuffer.putLong(contextId);
627-
idBuffer.putLong(propertyId);
628-
return idBuffer.array();
615+
return new long[] {itemId, contextId, propertyId};
629616
}
630617

631618
private long getId(URI entity, IdType idType) {
@@ -1445,12 +1432,12 @@ static class InputFileInfo {
14451432

14461433
static class WriterState {
14471434
java.nio.file.Path file;
1448-
ParquetWriter<KvinTupleInternal> writer;
1435+
ParquetWriter<KvinRecord> writer;
14491436
int year;
14501437
int week;
14511438
long[] minMax = {Long.MAX_VALUE, Long.MIN_VALUE};
14521439

1453-
WriterState(java.nio.file.Path file, ParquetWriter<KvinTupleInternal> writer, int year, int week) {
1440+
WriterState(java.nio.file.Path file, ParquetWriter<KvinRecord> writer, int year, int week) {
14541441
this.file = file;
14551442
this.writer = writer;
14561443
this.year = year;

‎bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinTupleInternal.java

-125
This file was deleted.

0 commit comments

Comments (0)
Please sign in to comment.