Skip to content

Commit

Permalink
[Improve] fix doris source duplicate splitid (#414)
Browse files Browse the repository at this point in the history
* fix doris source duplicate splitid

* code style
  • Loading branch information
JNSimba committed Jul 3, 2024
1 parent ba21aab commit 67c9eb1
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,12 @@ public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> createEnumerat
List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>();
List<PartitionDefinition> partitions =
RestService.findPartitions(options, readOptions, LOG);
partitions.forEach(m -> dorisSourceSplits.add(new DorisSourceSplit(m)));
for (int index = 0; index < partitions.size(); index++) {
PartitionDefinition partitionDef = partitions.get(index);
String splitId = partitionDef.getBeAddress() + "_" + index;
dorisSourceSplits.add(new DorisSourceSplit(splitId, partitionDef));
}
DorisSplitAssigner splitAssigner = new SimpleSplitAssigner(dorisSourceSplits);

return new DorisSourceEnumerator(context, splitAssigner);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

Expand All @@ -29,6 +31,7 @@
/** The {@code SimpleSplitAssigner} hands out splits in a random order. */
public class SimpleSplitAssigner implements DorisSplitAssigner {

private static final Logger LOG = LoggerFactory.getLogger(SimpleSplitAssigner.class);
private final ArrayList<DorisSourceSplit> splits;

public SimpleSplitAssigner(Collection<DorisSourceSplit> splits) {
Expand All @@ -43,6 +46,7 @@ public Optional<DorisSourceSplit> getNext(@Nullable String hostname) {

@Override
public void addSplits(Collection<DorisSourceSplit> splits) {
LOG.info("Adding splits: {}", splits);
splits.addAll(splits);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String hostname) {

@Override
public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) {
LOG.debug("Doris Source Enumerator adds splits back: {}", splits);
LOG.info("Doris Source Enumerator adds splits back: {}", splits);
splitAssigner.addSplits(splits);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

/** A {@link SourceSplit} that represents a {@link PartitionDefinition}. */
public class DorisSourceSplit implements SourceSplit {

private String id;
private final PartitionDefinition partitionDefinition;

/**
Expand All @@ -36,13 +36,14 @@ public class DorisSourceSplit implements SourceSplit {
*/
@Nullable transient byte[] serializedFormCache;

public DorisSourceSplit(PartitionDefinition partitionDefinition) {
public DorisSourceSplit(String id, PartitionDefinition partitionDefinition) {
this.id = id;
this.partitionDefinition = partitionDefinition;
}

@Override
public String splitId() {
return partitionDefinition.getBeAddress();
return id;
}

public PartitionDefinition getPartitionDefinition() {
Expand All @@ -52,9 +53,10 @@ public PartitionDefinition getPartitionDefinition() {
@Override
public String toString() {
return String.format(
"DorisSourceSplit: %s.%s,be=%s,tablets=%s",
"DorisSourceSplit: %s.%s,id=%s,be=%s,tablets=%s",
partitionDefinition.getDatabase(),
partitionDefinition.getTable(),
id,
partitionDefinition.getBeAddress(),
partitionDefinition.getTabletIds());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class DorisSourceSplitSerializer implements SimpleVersionedSerializer<Dor
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

private static final int VERSION = 1;
private static final int VERSION = 2;

private static void writeLongArray(DataOutputView out, Long[] values) throws IOException {
out.writeInt(values.length);
Expand Down Expand Up @@ -71,6 +71,7 @@ public byte[] serialize(DorisSourceSplit split) throws IOException {
}

final DataOutputSerializer out = SERIALIZER_CACHE.get();

PartitionDefinition partDef = split.getPartitionDefinition();
out.writeUTF(partDef.getDatabase());
out.writeUTF(partDef.getTable());
Expand All @@ -81,6 +82,8 @@ public byte[] serialize(DorisSourceSplit split) throws IOException {
out.writeInt(queryPlanBytes.length);
out.write(queryPlanBytes);

out.writeUTF(split.splitId());

final byte[] result = out.getCopyOfBuffer();
out.clear();

Expand All @@ -93,13 +96,16 @@ public byte[] serialize(DorisSourceSplit split) throws IOException {

@Override
public DorisSourceSplit deserialize(int version, byte[] serialized) throws IOException {
if (version == 1) {
return deserialize(serialized);
switch (version) {
case 1:
case 2:
return deserializeSplit(version, serialized);
default:
throw new IOException("Unknown version: " + version);
}
throw new IOException("Unknown version: " + version);
}

private DorisSourceSplit deserialize(byte[] serialized) throws IOException {
private DorisSourceSplit deserializeSplit(int version, byte[] serialized) throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
final String database = in.readUTF();
final String table = in.readUTF();
Expand All @@ -112,8 +118,14 @@ private DorisSourceSplit deserialize(byte[] serialized) throws IOException {
final byte[] bytes = new byte[len];
in.read(bytes);
final String queryPlan = new String(bytes, StandardCharsets.UTF_8);

// read split id
String splitId = "splitId";
if (version >= 2) {
splitId = in.readUTF();
}
PartitionDefinition partDef =
new PartitionDefinition(database, table, beAddress, tabletIds, queryPlan);
return new DorisSourceSplit(partDef);
return new DorisSourceSplit(splitId, partDef);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ private static void assertCheckpointsEqual(

@Test
public void serializeSplit() throws Exception {
final DorisSourceSplit split = new DorisSourceSplit(OptionUtils.buildPartitionDef());
final DorisSourceSplit split =
new DorisSourceSplit("splitId", OptionUtils.buildPartitionDef());
PendingSplitsCheckpoint checkpoint = new PendingSplitsCheckpoint(Arrays.asList(split));

final PendingSplitsCheckpointSerializer splitSerializer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private static DorisSourceReader createReader(TestingReaderContext context) {
}

private static DorisSourceSplit createTestDorisSplit() throws IOException {
return new DorisSourceSplit(OptionUtils.buildPartitionDef());
return new DorisSourceSplit("splitId", OptionUtils.buildPartitionDef());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public class DorisSourceSplitSerializerTest {

@Test
public void serializeSplit() throws Exception {
final DorisSourceSplit split = new DorisSourceSplit(OptionUtils.buildPartitionDef());
final DorisSourceSplit split =
new DorisSourceSplit("splitId", OptionUtils.buildPartitionDef());

DorisSourceSplit deSerialized = serializeAndDeserializeSplit(split);
assertEquals(split, deSerialized);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public void testSplit() {
new PartitionDefinition("db", "tbl", "be", new HashSet<>(), "queryplan1");
PartitionDefinition pd2 =
new PartitionDefinition("db", "tbl", "be", new HashSet<>(), "queryplan1");
DorisSourceSplit split1 = new DorisSourceSplit(pd1);
DorisSourceSplit split2 = new DorisSourceSplit(pd2);
DorisSourceSplit split1 = new DorisSourceSplit("be_1", pd1);
DorisSourceSplit split2 = new DorisSourceSplit("be_2", pd2);
Assert.assertEquals(split1, split2);
}
}

0 comments on commit 67c9eb1

Please sign in to comment.