Skip to content

Commit

Permalink
move out split logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas-HuWei committed Jul 11, 2024
1 parent 109550f commit d155414
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.milvus.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
Expand Down Expand Up @@ -94,14 +95,20 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
MilvusSourceSplit split = pendingSplits.poll();
if (null != split) {
handleEveryRowInternal(split, output);
} else if (noMoreSplit && pendingSplits.isEmpty()) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded mivlus source");
context.signalNoMoreElement();
} else {
Thread.sleep(1000L);
if (!noMoreSplit) {
log.info("Milvus source wait split!");
}
}
}
if (noMoreSplit
&& pendingSplits.isEmpty()
&& Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded milvus source");
context.signalNoMoreElement();
}
Thread.sleep(1000L);
}

private void handleEveryRowInternal(MilvusSourceSplit split, Collector<SeaTunnelRow> output) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,6 @@ private void initSourceData() {
"Failed to create index on vector field! Error: " + ret.getMessage());
}

// Specify an index type on the varchar field.
ret =
milvusClient.createIndex(
CreateIndexParam.newBuilder()
.withCollectionName(COLLECTION_NAME)
.withFieldName(TITLE_FIELD)
.withIndexType(IndexType.TRIE)
.build());
if (ret.getStatus() != R.Status.Success.getCode()) {
throw new RuntimeException(
"Failed to create index on varchar field! Error: " + ret.getMessage());
}

// Call loadCollection() to enable automatically loading data into memory for searching
milvusClient.loadCollection(
LoadCollectionParam.newBuilder().withCollectionName(COLLECTION_NAME).build());
Expand Down

0 comments on commit d155414

Please sign in to comment.