Skip to content

Commit

Permalink
WIP - multi-threaded indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
djtfmartin committed Jun 11, 2024
1 parent d8c526b commit b7fe503
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 46 deletions.
214 changes: 169 additions & 45 deletions matching-ws/src/main/java/life/catalogue/matching/IndexingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
Expand Down Expand Up @@ -87,6 +88,9 @@ public class IndexingService {
@Value("${clb.driver}")
String clDriver;

@Value("${indexing.threads:6}")
Integer indexingThreads;

protected final MatchingService matchingService;

private static final String REL_PATTERN_STR = "(\\d+)(?:LX?RC?|R(\\d+))";
Expand Down Expand Up @@ -233,7 +237,7 @@ public void writeCLBToFile(@NotNull final String datasetKeyInput) throws Excepti
() -> session.getMapper(IndexingMapper.class).getAllForDataset(validDatasetKey),
name -> {
try {
sbc.write(name);
sbc.write(cleanNameUsage(name));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -247,6 +251,14 @@ public void writeCLBToFile(@NotNull final String datasetKeyInput) throws Excepti
log.info("ChecklistBank export written to file {}: {}", fileName, counter.get());
}

private NameUsage cleanNameUsage(NameUsage name) {
if (name.getRank() == null) {
name.setRank(Rank.UNRANKED.toString());
}
name.setScientificName(name.getScientificName().replaceAll("[/\\\\]", "").trim());
return name;
}

@Transactional
public void indexIdentifiers(String datasetKey) throws Exception {
writeCLBToFile(datasetKey);
Expand Down Expand Up @@ -348,7 +360,6 @@ private static IndexWriter getIndexWriter(Directory dir) throws IOException {
return new IndexWriter(dir, getIndexWriterConfig());
}


/**
*
* @param tempNameUsageIndexPath
Expand All @@ -366,7 +377,7 @@ private void writeJoinIndex(String tempNameUsageIndexPath, String joinIndexPath,
Directory ancillaryDirectory = FSDirectory.open(indexDirectory);

// create the join index
Long[] counters = createJoinIndex(matchingService, tempDirectory, ancillaryDirectory, acceptedOnly, true);
Long[] counters = createJoinIndex(matchingService, tempDirectory, ancillaryDirectory, acceptedOnly, true, indexingThreads);

// load export metadata
ObjectMapper mapper = new ObjectMapper();
Expand Down Expand Up @@ -396,7 +407,7 @@ public static Long[] createJoinIndex(MatchingService matchingService,
Directory tempUsageIndexDirectory,
Directory outputDirectory,
boolean acceptedOnly,
boolean closeDirectoryOnExit)
boolean closeDirectoryOnExit, int indexingThreads)
throws IOException {

IndexWriterConfig config = getIndexWriterConfig();
Expand All @@ -412,49 +423,41 @@ public static Long[] createJoinIndex(MatchingService matchingService,
AtomicLong counter = new AtomicLong(0);
AtomicLong matchedCounter = new AtomicLong(0);

ExecutorService exec = new ThreadPoolExecutor(indexingThreads, indexingThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(indexingThreads * 2, true), new ThreadPoolExecutor.CallerRunsPolicy());

List<Document> batch = new ArrayList<>();
// Write document data
for (ScoreDoc hit : hits) {

counter.incrementAndGet();
Document doc = searcher.storedFields().document(hit.doc);
Map<String, String> hierarchy = loadHierarchy(searcher, doc.get(FIELD_ID));

String status = doc.get(FIELD_STATUS);
if (status != null &&
acceptedOnly &&
!status.equals(TaxonomicStatus.ACCEPTED.name())) {
// skip synonyms, otherwise we would index them twice
continue;
}
String scientificName = doc.get(FIELD_SCIENTIFIC_NAME);
Classification classification = new Classification();
classification.setKingdom(hierarchy.getOrDefault(Rank.KINGDOM.name(), ""));
classification.setPhylum(hierarchy.getOrDefault(Rank.PHYLUM.name(), ""));
classification.setClazz(hierarchy.getOrDefault(Rank.CLASS.name(), ""));
classification.setOrder(hierarchy.getOrDefault(Rank.ORDER.name(), ""));
classification.setFamily(hierarchy.getOrDefault(Rank.FAMILY.name(), ""));
classification.setGenus(hierarchy.getOrDefault(Rank.GENUS.name(), ""));
classification.setSpecies(hierarchy.getOrDefault(Rank.SPECIES.name(), ""));

if (counter.get() % 100000 == 0) {
log.info("Indexed: {} taxa", counter.get());
batch.add(doc);

if (batch.size() >= 100000) {
log.info("Starting batch: {} taxa", counter.get());
List<Document> finalBatch = batch;
exec.submit(new JoinIndexTask(matchingService, searcher, joinIndexWriter, finalBatch, acceptedOnly, matchedCounter));
batch = new ArrayList<>();
}
}

// match to main dataset
try {
NameUsageMatch nameUsageMatch = matchingService.match(scientificName, classification, true);
if (nameUsageMatch.getUsage() != null) {
doc.add(new StringField(FIELD_JOIN_ID,
nameUsageMatch.getAcceptedUsage() != null ? nameUsageMatch.getAcceptedUsage().getKey() :
nameUsageMatch.getUsage().getKey(), Field.Store.YES));
joinIndexWriter.addDocument(doc);
matchedCounter.incrementAndGet();
} else {
log.debug("No match for {}", scientificName);
}
}catch (Exception e) {
log.error("Problem matching name from ancillary index " + scientificName, e.getMessage(), e);
//final batch
exec.submit(new JoinIndexTask(matchingService, searcher, joinIndexWriter, batch, acceptedOnly, matchedCounter));

log.info("Finished reading CSV file. Indexing remaining taxa...");

exec.shutdown();
try {
if (!exec.awaitTermination(5, TimeUnit.MINUTES)) {
log.error("Forcing shut down of executor service, pending tasks will be lost! {}", exec);
exec.shutdownNow();
}
} catch (InterruptedException var4) {
log.error("Forcing shut down of executor service, pending tasks will be lost! {}", exec);
exec.shutdownNow();
Thread.currentThread().interrupt();
}

// close temp
Expand All @@ -473,6 +476,72 @@ public static Long[] createJoinIndex(MatchingService matchingService,
return new Long[]{counter.get(), matchedCounter.get()};
}

static class JoinIndexTask implements Runnable {
private final IndexWriter writer;
private final List<Document> docs;
private final IndexSearcher searcher;
private final boolean acceptedOnly;
private final MatchingService matchingService;
private final AtomicLong matchedCounter;

public JoinIndexTask(MatchingService matchingService, IndexSearcher searcher, IndexWriter writer, List<Document> docs,
boolean acceptedOnly, AtomicLong matchedCounter) {
this.searcher = searcher;
this.writer = writer;
this.docs = docs;
this.acceptedOnly = acceptedOnly;
this.matchingService = matchingService;
this.matchedCounter = matchedCounter;
}

@Override
public void run() {
try {
for (Document doc : docs) {

Map<String, String> hierarchy = loadHierarchy(searcher, doc.get(FIELD_ID));

String status = doc.get(FIELD_STATUS);
if (status != null &&
acceptedOnly &&
!status.equals(TaxonomicStatus.ACCEPTED.name())) {
// skip synonyms, otherwise we would index them twice
continue;
}
String scientificName = doc.get(FIELD_SCIENTIFIC_NAME);
Classification classification = new Classification();
classification.setKingdom(hierarchy.getOrDefault(Rank.KINGDOM.name(), ""));
classification.setPhylum(hierarchy.getOrDefault(Rank.PHYLUM.name(), ""));
classification.setClazz(hierarchy.getOrDefault(Rank.CLASS.name(), ""));
classification.setOrder(hierarchy.getOrDefault(Rank.ORDER.name(), ""));
classification.setFamily(hierarchy.getOrDefault(Rank.FAMILY.name(), ""));
classification.setGenus(hierarchy.getOrDefault(Rank.GENUS.name(), ""));
classification.setSpecies(hierarchy.getOrDefault(Rank.SPECIES.name(), ""));

// match to main dataset
try {
NameUsageMatch nameUsageMatch = matchingService.match(scientificName, classification, true);
if (nameUsageMatch.getUsage() != null) {
doc.add(new StringField(FIELD_JOIN_ID,
nameUsageMatch.getAcceptedUsage() != null ? nameUsageMatch.getAcceptedUsage().getKey() :
nameUsageMatch.getUsage().getKey(), Field.Store.YES));
writer.addDocument(doc);
matchedCounter.incrementAndGet();
} else {
log.debug("No match for {}", scientificName);
}
} catch (Exception e) {
log.error("Problem matching name from ancillary index " + scientificName, e.getMessage(), e);
}
}
writer.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}


private static Optional<Document> getById(IndexSearcher searcher, String id) {
Query query = new TermQuery(new Term(FIELD_ID, id));
try {
Expand Down Expand Up @@ -523,33 +592,58 @@ private void indexFile(String exportPath, String indexPath) throws Exception {
final String filePath = exportPath + "/index.csv";
final String metadataPath = exportPath + "/metadata.json";

ExecutorService exec = new ThreadPoolExecutor(indexingThreads, indexingThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(indexingThreads * 2, true), new ThreadPoolExecutor.CallerRunsPolicy());

try (Reader reader = new FileReader(filePath);
IndexWriter indexWriter = new IndexWriter(directory, config)) {

CsvToBean<NameUsage> csvReader = new CsvToBeanBuilder(reader)
.withType(NameUsage.class)
.withSeparator('$')
.withMultilineLimit(1)
.withIgnoreLeadingWhiteSpace(true)
.withIgnoreEmptyLine(true)
.build();

Iterator<NameUsage> iterator = csvReader.iterator();

List<NameUsage> batch = new ArrayList<>();
while (iterator.hasNext()) {
if (counter.get() % 100000 == 0) {
log.info("Indexed: {} taxa", counter.get());
}
NameUsage nameUsage = iterator.next();
Document doc = toDoc(nameUsage);
indexWriter.addDocument(doc);
counter.incrementAndGet();
batch.add(nameUsage);
if (batch.size() >= 100000) {
List<NameUsage> finalBatch = batch;
exec.submit(new IndexingTask(indexWriter, finalBatch));
batch = new ArrayList<>();
}
}

//final batch
exec.submit(new IndexingTask(indexWriter, batch));

log.info("Finished reading CSV file. Indexing remaining taxa...");

exec.shutdown();
try {
if (!exec.awaitTermination(5, TimeUnit.MINUTES)) {
log.error("Forcing shut down of executor service, pending tasks will be lost! {}", exec);
exec.shutdownNow();
}
} catch (InterruptedException var4) {
log.error("Forcing shut down of executor service, pending tasks will be lost! {}", exec);
exec.shutdownNow();
Thread.currentThread().interrupt();
}

log.info("Final index commit");
indexWriter.commit();
log.info("Optimising index....");
indexWriter.forceMerge(1);
log.info("Optimisation complete.");
}

// write metadata file in JSON format
log.info("Taxa indexed: {}", counter.get());

Expand All @@ -563,6 +657,36 @@ private void indexFile(String exportPath, String indexPath) throws Exception {
mapper.writeValue(new File(indexPath + "/metadata.json"), metadata);
}

class YourThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
return new Thread(r, "NameUsage-Indexing-taskThread");
}
}

static class IndexingTask implements Runnable {
private final IndexWriter writer;
private final List<NameUsage> nameUsages;

public IndexingTask(IndexWriter writer, List<NameUsage> nameUsages) {
this.writer = writer;
this.nameUsages = nameUsages;
}

@Override
public void run() {
try {
for (NameUsage nameUsage : nameUsages) {
Document doc = toDoc(nameUsage);
writer.addDocument(doc);
}
writer.flush();
nameUsages.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}

@Transactional
public void runDatasetIndexing(final Integer datasetKey) throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ protected static boolean isSimpleBinomial(String name) {

private static void warnIfMissing(String name, @Nullable String epithet, String part) {
if (exists(epithet) && name != null && !name.toLowerCase().contains(epithet.toLowerCase())) {
LOG.warn("ScientificName >{}< missing {}: {}", name, part, epithet);
LOG.debug("ScientificName >{}< missing {}: {}", name, part, epithet);
}
}

Expand Down

0 comments on commit b7fe503

Please sign in to comment.