Skip to content

Commit

Permalink
update PipeTableModeTsFileBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
luoluoyuyu committed Dec 17, 2024
1 parent 5b8820e commit 9dd29ac
Showing 1 changed file with 44 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -73,7 +72,10 @@ public List<Pair<String, File>> convertTabletToTSFileWithDBInfo() throws IOExcep
}
List<Pair<String, File>> pairList = new ArrayList<>();
for (Map.Entry<String, List<Tablet>> entry : dataBase2TabletList.entrySet()) {
pairList.addAll(writeTableModelTabletsToTsFiles(entry.getValue(), entry.getKey()));
final LinkedHashSet<LinkedList<Pair<Tablet, List<Pair<IDeviceID, Integer>>>>> linkedHashSet =
new LinkedHashSet<>();
pairList.addAll(
writeTableModelTabletsToTsFiles(entry.getValue(), entry.getKey(), linkedHashSet));
}
return pairList;
}
Expand All @@ -95,44 +97,56 @@ public synchronized void close() {
dataBase2TabletList.clear();
}

private List<Pair<String, File>> writeTableModelTabletsToTsFiles(
final List<Tablet> tabletList, final String dataBase) throws IOException {
private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>>
List<Pair<String, File>> writeTableModelTabletsToTsFiles(
final List<Tablet> tabletList,
final String dataBase,
LinkedHashSet<LinkedList<T>> linkedHashSet)
throws IOException {

final Map<String, List<Tablet>> tableName2Tablets = new HashMap<>();
final Map<String, List<T>> tableName2Tablets = new HashMap<>();

// Group the tablets by table name
for (final Tablet tablet : tabletList) {
tableName2Tablets.computeIfAbsent(tablet.getTableName(), k -> new ArrayList<>()).add(tablet);
tableName2Tablets
.computeIfAbsent(tablet.getTableName(), k -> new ArrayList<>())
.add((T) new Pair<>(tablet, WriteUtils.splitTabletByDevice(tablet)));
}

// Replace ArrayList with LinkedList to improve performance
final LinkedHashSet<LinkedList<Pair<Tablet, List<Pair<IDeviceID, Integer>>>>> table2Tablets =
new LinkedHashSet<>();
final LinkedHashSet<LinkedList<T>> table2Tablets = new LinkedHashSet<>();

// Sort each table's tablets by their first device ID, then by their first timestamp
for (final List<T> tablets : tableName2Tablets.values()) {
tablets.sort(
(o1, o2) -> {
final IDeviceID deviceID = o1.right.get(0).left;
final int result;
if ((result = deviceID.compareTo(o2.right.get(0).left)) == 0) {
return Long.compare(o1.left.timestamps[0], o2.left.timestamps[0]);
}
return result;
});
}

// Sort the tables by table name
tableName2Tablets.entrySet().stream()
.sorted(Map.Entry.comparingByKey(Comparator.naturalOrder()))
.forEach(
entry -> {
LinkedList<Pair<Tablet, List<Pair<IDeviceID, Integer>>>> list = new LinkedList<>();
for (final Tablet tablet : entry.getValue()) {
writerPairToList(list, new Pair<>(tablet, WriteUtils.splitTabletByDevice(tablet)));
}
table2Tablets.add(list);
});
.forEach(entry -> linkedHashSet.add(new LinkedList<>(entry.getValue())));

// Help GC
tableName2Tablets.clear();

final List<Pair<String, File>> sealedFiles = new ArrayList<>();

// Try making the tsfile size as large as possible
while (!table2Tablets.isEmpty()) {
while (!linkedHashSet.isEmpty()) {
if (Objects.isNull(fileWriter)) {
createFileWriter();
}

try {
tryBestToWriteTabletsIntoOneFile(table2Tablets);
tryBestToWriteTabletsIntoOneFile(linkedHashSet);
} catch (final Exception e) {
LOGGER.warn(
"Batch id = {}: Failed to write tablets into tsfile, because {}",
Expand Down Expand Up @@ -186,21 +200,20 @@ private List<Pair<String, File>> writeTableModelTabletsToTsFiles(
return sealedFiles;
}

private void tryBestToWriteTabletsIntoOneFile(
final LinkedHashSet<LinkedList<Pair<Tablet, List<Pair<IDeviceID, Integer>>>>>
device2TabletsLinkedList)
throws IOException {
final Iterator<LinkedList<Pair<Tablet, List<Pair<IDeviceID, Integer>>>>> iterator =
device2TabletsLinkedList.iterator();
private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>>
void tryBestToWriteTabletsIntoOneFile(
final LinkedHashSet<LinkedList<T>> device2TabletsLinkedList) throws IOException {
final Iterator<LinkedList<T>> iterator = device2TabletsLinkedList.iterator();

while (iterator.hasNext()) {
final LinkedList<Pair<Tablet, List<Pair<IDeviceID, Integer>>>> tablets = iterator.next();
final LinkedList<T> tablets = iterator.next();

final List<Pair<Tablet, List<Pair<IDeviceID, Integer>>>> tabletsToWrite = new ArrayList<>();
final List<T> tabletsToWrite = new ArrayList<>();
final Map<IDeviceID, Long> deviceLastTimestampMap = new HashMap<>();
while (!tablets.isEmpty()) {
final Pair<Tablet, List<Pair<IDeviceID, Integer>>> pair = tablets.peekFirst();
if (timestampsAreNonOverlapping(pair, deviceLastTimestampMap)) {
final T pair = tablets.peekFirst();
if (timestampsAreNonOverlapping(
(Pair<Tablet, List<Pair<IDeviceID, Integer>>>) pair, deviceLastTimestampMap)) {
tabletsToWrite.add(pair);
tablets.pollFirst();
continue;
Expand Down Expand Up @@ -240,9 +253,9 @@ private void tryBestToWriteTabletsIntoOneFile(
* @return If false, the tablet overlaps with the previous tablet; if true, there is no time
* overlap.
*/
private boolean timestampsAreNonOverlapping(
final Pair<Tablet, List<Pair<IDeviceID, Integer>>> tabletPair,
final Map<IDeviceID, Long> deviceLastTimestampMap) {
private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>>
boolean timestampsAreNonOverlapping(
final T tabletPair, final Map<IDeviceID, Long> deviceLastTimestampMap) {
int currentTimestampIndex = 0;
for (Pair<IDeviceID, Integer> deviceTimestampIndexPair : tabletPair.right) {
final Long lastDeviceTimestamp = deviceLastTimestampMap.get(deviceTimestampIndexPair.left);
Expand All @@ -257,77 +270,4 @@ private boolean timestampsAreNonOverlapping(

return true;
}

/**
 * Inserts {@code pair} into {@code list}.
 *
 * <p>The list is walked from its head and every existing element is compared against {@code
 * pair} with {@code compareDeviceID(existing, pair)}. As soon as an element yields a non-zero
 * result while the element before it yielded {@code 0}, {@code pair} is inserted directly after
 * that element and the method returns. The first element can never trigger the insertion,
 * because the "previous result" starts out as {@link Integer#MAX_VALUE}. If no position
 * qualifies (including when the list is empty), {@code pair} is appended at the tail.
 *
 * @param list the list to insert into; modified in place
 * @param pair the tablet together with its per-device split information
 */
private void writerPairToList(
    final LinkedList<Pair<Tablet, List<Pair<IDeviceID, Integer>>>> list,
    final Pair<Tablet, List<Pair<IDeviceID, Integer>>> pair) {
  int previousComparison = Integer.MAX_VALUE;
  for (final ListIterator<Pair<Tablet, List<Pair<IDeviceID, Integer>>>> cursor =
          list.listIterator();
      cursor.hasNext(); ) {
    final int comparison = compareDeviceID(cursor.next(), pair);
    if (previousComparison == 0 && comparison != 0) {
      // ListIterator#add places the element right after the one just returned by next().
      cursor.add(pair);
      return;
    }
    previousComparison = comparison;
  }
  // Empty list, or no insertion point found: append at the end.
  list.add(pair);
}

/**
 * Counts the device IDs shared by both tablets for which the first tablet's timestamp is
 * strictly smaller than the second tablet's timestamp.
 *
 * <p>Both device lists are walked in parallel, always advancing the side whose current device ID
 * compares lower (both sides when the IDs are equal). This presumably relies on each list being
 * ordered by device ID; verify against {@code WriteUtils.splitTabletByDevice}. For every device,
 * the timestamp that is read sits at the index stored in the {@code right} value of the
 * previously visited entry of the same list ({@code 0} for the first entry), which presumably is
 * the first row of the current device.
 *
 * @param firstDeviceList the first tablet and its (device ID, index) list
 * @param secondDeviceList the second tablet and its (device ID, index) list
 * @return the number of shared device IDs whose timestamp in {@code firstDeviceList} is less
 *     than the one in {@code secondDeviceList}; {@code 0} if there is no such device ID
 */
private int compareDeviceID(
    final Pair<Tablet, List<Pair<IDeviceID, Integer>>> firstDeviceList,
    final Pair<Tablet, List<Pair<IDeviceID, Integer>>> secondDeviceList) {
  final List<Pair<IDeviceID, Integer>> firstDevices = firstDeviceList.right;
  final List<Pair<IDeviceID, Integer>> secondDevices = secondDeviceList.right;

  int earlierInFirstCount = 0;
  int firstCursor = 0;
  int secondCursor = 0;
  int firstTimeIndex = 0;
  int secondTimeIndex = 0;

  while (firstCursor < firstDevices.size() && secondCursor < secondDevices.size()) {
    final Pair<IDeviceID, Integer> firstEntry = firstDevices.get(firstCursor);
    final Pair<IDeviceID, Integer> secondEntry = secondDevices.get(secondCursor);
    final int order = firstEntry.left.compareTo(secondEntry.left);

    // Timestamps are only inspected when both sides refer to the same device.
    if (order == 0
        && firstDeviceList.left.timestamps[firstTimeIndex]
            < secondDeviceList.left.timestamps[secondTimeIndex]) {
      earlierInFirstCount++;
    }

    // Advance the side holding the lower device ID; advance both on a match.
    if (order <= 0) {
      firstTimeIndex = firstEntry.right;
      firstCursor++;
    }
    if (order >= 0) {
      secondTimeIndex = secondEntry.right;
      secondCursor++;
    }
  }
  return earlierInFirstCount;
}
}

0 comments on commit 9dd29ac

Please sign in to comment.