From b7fe5036a4278c4d35f231a55ce723a68fcc6c73 Mon Sep 17 00:00:00 2001 From: Dave Martin Date: Tue, 11 Jun 2024 21:38:02 +0100 Subject: [PATCH] WIP - multi-threaded indexing https://github.com/CatalogueOfLife/backend/issues/1321 https://github.com/gbif/pipelines/issues/217 --- .../catalogue/matching/IndexingService.java | 214 ++++++++++++++---- .../life/catalogue/matching/NameNRank.java | 2 +- 2 files changed, 170 insertions(+), 46 deletions(-) diff --git a/matching-ws/src/main/java/life/catalogue/matching/IndexingService.java b/matching-ws/src/main/java/life/catalogue/matching/IndexingService.java index 7b3abeeda..2e096c9a1 100644 --- a/matching-ws/src/main/java/life/catalogue/matching/IndexingService.java +++ b/matching-ws/src/main/java/life/catalogue/matching/IndexingService.java @@ -6,6 +6,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -87,6 +88,9 @@ public class IndexingService { @Value("${clb.driver}") String clDriver; + @Value("${indexing.threads:6}") + Integer indexingThreads; + protected final MatchingService matchingService; private static final String REL_PATTERN_STR = "(\\d+)(?:LX?RC?|R(\\d+))"; @@ -233,7 +237,7 @@ public void writeCLBToFile(@NotNull final String datasetKeyInput) throws Excepti () -> session.getMapper(IndexingMapper.class).getAllForDataset(validDatasetKey), name -> { try { - sbc.write(name); + sbc.write(cleanNameUsage(name)); } catch (Exception e) { throw new RuntimeException(e); } @@ -247,6 +251,14 @@ public void writeCLBToFile(@NotNull final String datasetKeyInput) throws Excepti log.info("ChecklistBank export written to file {}: {}", fileName, counter.get()); } + private NameUsage cleanNameUsage(NameUsage name) { + if (name.getRank() == null) { + name.setRank(Rank.UNRANKED.toString()); + } + name.setScientificName(name.getScientificName().replaceAll("[/\\\\]", "").trim()); + return name; + } + @Transactional public void indexIdentifiers(String datasetKey) throws Exception { writeCLBToFile(datasetKey); @@ -348,7 +360,6 @@ private static IndexWriter getIndexWriter(Directory dir) throws IOException { return new IndexWriter(dir, getIndexWriterConfig()); } - /** * * @param tempNameUsageIndexPath @@ -366,7 +377,7 @@ private void writeJoinIndex(String tempNameUsageIndexPath, String joinIndexPath, Directory ancillaryDirectory = FSDirectory.open(indexDirectory); // create the join index - Long[] counters = createJoinIndex(matchingService, tempDirectory, ancillaryDirectory, acceptedOnly, true); + Long[] counters = createJoinIndex(matchingService, tempDirectory, ancillaryDirectory, acceptedOnly, true, indexingThreads); // load export metadata ObjectMapper mapper = new ObjectMapper(); @@ -396,7 +407,7 @@ public static Long[] createJoinIndex(MatchingService matchingService, Directory tempUsageIndexDirectory, Directory outputDirectory, boolean acceptedOnly, - boolean closeDirectoryOnExit) + boolean closeDirectoryOnExit, int indexingThreads) throws IOException { IndexWriterConfig config = getIndexWriterConfig(); @@ -412,49 +423,41 @@ public static Long[] createJoinIndex(MatchingService matchingService, AtomicLong counter = new AtomicLong(0); AtomicLong matchedCounter = new AtomicLong(0); + ExecutorService exec = new ThreadPoolExecutor(indexingThreads, indexingThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(indexingThreads * 2, true), new ThreadPoolExecutor.CallerRunsPolicy()); + + List batch = new ArrayList<>(); // Write document data for (ScoreDoc hit : hits) { counter.incrementAndGet(); Document doc = searcher.storedFields().document(hit.doc); - Map hierarchy = loadHierarchy(searcher, doc.get(FIELD_ID)); - - String status = doc.get(FIELD_STATUS); - if (status != null && - acceptedOnly && - !status.equals(TaxonomicStatus.ACCEPTED.name())) { - // skip synonyms, otherwise we would index them twice - continue; - } - String scientificName = doc.get(FIELD_SCIENTIFIC_NAME); - Classification classification = new Classification(); - classification.setKingdom(hierarchy.getOrDefault(Rank.KINGDOM.name(), "")); - classification.setPhylum(hierarchy.getOrDefault(Rank.PHYLUM.name(), "")); - classification.setClazz(hierarchy.getOrDefault(Rank.CLASS.name(), "")); - classification.setOrder(hierarchy.getOrDefault(Rank.ORDER.name(), "")); - classification.setFamily(hierarchy.getOrDefault(Rank.FAMILY.name(), "")); - classification.setGenus(hierarchy.getOrDefault(Rank.GENUS.name(), "")); - classification.setSpecies(hierarchy.getOrDefault(Rank.SPECIES.name(), "")); - - if (counter.get() % 100000 == 0) { - log.info("Indexed: {} taxa", counter.get()); + batch.add(doc); + + if (batch.size() >= 100000) { + log.info("Starting batch: {} taxa", counter.get()); + List finalBatch = batch; + exec.submit(new JoinIndexTask(matchingService, searcher, joinIndexWriter, finalBatch, acceptedOnly, matchedCounter)); + batch = new ArrayList<>(); } + } - // match to main dataset - try { - NameUsageMatch nameUsageMatch = matchingService.match(scientificName, classification, true); - if (nameUsageMatch.getUsage() != null) { - doc.add(new StringField(FIELD_JOIN_ID, - nameUsageMatch.getAcceptedUsage() != null ? nameUsageMatch.getAcceptedUsage().getKey() : - nameUsageMatch.getUsage().getKey(), Field.Store.YES)); - joinIndexWriter.addDocument(doc); - matchedCounter.incrementAndGet(); - } else { - log.debug("No match for {}", scientificName); - } - }catch (Exception e) { - log.error("Problem matching name from ancillary index " + scientificName, e.getMessage(), e); + //final batch + exec.submit(new JoinIndexTask(matchingService, searcher, joinIndexWriter, batch, acceptedOnly, matchedCounter)); + + log.info("Finished reading CSV file. Indexing remaining taxa..."); + + exec.shutdown(); + try { + if (!exec.awaitTermination(5, TimeUnit.MINUTES)) { + log.error("Forcing shut down of executor service, pending tasks will be lost! {}", exec); + exec.shutdownNow(); } + } catch (InterruptedException var4) { + log.error("Forcing shut down of executor service, pending tasks will be lost! {}", exec); + exec.shutdownNow(); + Thread.currentThread().interrupt(); } // close temp @@ -473,6 +476,72 @@ public static Long[] createJoinIndex(MatchingService matchingService, return new Long[]{counter.get(), matchedCounter.get()}; } + static class JoinIndexTask implements Runnable { + private final IndexWriter writer; + private final List docs; + private final IndexSearcher searcher; + private final boolean acceptedOnly; + private final MatchingService matchingService; + private final AtomicLong matchedCounter; + + public JoinIndexTask(MatchingService matchingService, IndexSearcher searcher, IndexWriter writer, List docs, + boolean acceptedOnly, AtomicLong matchedCounter) { + this.searcher = searcher; + this.writer = writer; + this.docs = docs; + this.acceptedOnly = acceptedOnly; + this.matchingService = matchingService; + this.matchedCounter = matchedCounter; + } + + @Override + public void run() { + try { + for (Document doc : docs) { + + Map hierarchy = loadHierarchy(searcher, doc.get(FIELD_ID)); + + String status = doc.get(FIELD_STATUS); + if (status != null && + acceptedOnly && + !status.equals(TaxonomicStatus.ACCEPTED.name())) { + // skip synonyms, otherwise we would index them twice + continue; + } + String scientificName = doc.get(FIELD_SCIENTIFIC_NAME); + Classification classification = new Classification(); + classification.setKingdom(hierarchy.getOrDefault(Rank.KINGDOM.name(), "")); + classification.setPhylum(hierarchy.getOrDefault(Rank.PHYLUM.name(), "")); + classification.setClazz(hierarchy.getOrDefault(Rank.CLASS.name(), "")); + classification.setOrder(hierarchy.getOrDefault(Rank.ORDER.name(), "")); + classification.setFamily(hierarchy.getOrDefault(Rank.FAMILY.name(), "")); + classification.setGenus(hierarchy.getOrDefault(Rank.GENUS.name(), "")); + classification.setSpecies(hierarchy.getOrDefault(Rank.SPECIES.name(), "")); + + // match to main dataset + try { + NameUsageMatch nameUsageMatch = matchingService.match(scientificName, classification, true); + if (nameUsageMatch.getUsage() != null) { + doc.add(new StringField(FIELD_JOIN_ID, + nameUsageMatch.getAcceptedUsage() != null ? nameUsageMatch.getAcceptedUsage().getKey() : + nameUsageMatch.getUsage().getKey(), Field.Store.YES)); + writer.addDocument(doc); + matchedCounter.incrementAndGet(); + } else { + log.debug("No match for {}", scientificName); + } + } catch (Exception e) { + log.error("Problem matching name from ancillary index " + scientificName, e.getMessage(), e); + } + } + writer.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + private static Optional getById(IndexSearcher searcher, String id) { Query query = new TermQuery(new Term(FIELD_ID, id)); try { @@ -523,33 +592,58 @@ private void indexFile(String exportPath, String indexPath) throws Exception { final String filePath = exportPath + "/index.csv"; final String metadataPath = exportPath + "/metadata.json"; + ExecutorService exec = new ThreadPoolExecutor(indexingThreads, indexingThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(indexingThreads * 2, true), new ThreadPoolExecutor.CallerRunsPolicy()); + try (Reader reader = new FileReader(filePath); IndexWriter indexWriter = new IndexWriter(directory, config)) { CsvToBean csvReader = new CsvToBeanBuilder(reader) .withType(NameUsage.class) .withSeparator('$') + .withMultilineLimit(1) .withIgnoreLeadingWhiteSpace(true) .withIgnoreEmptyLine(true) .build(); Iterator iterator = csvReader.iterator(); - + List batch = new ArrayList<>(); while (iterator.hasNext()) { - if (counter.get() % 100000 == 0) { - log.info("Indexed: {} taxa", counter.get()); - } NameUsage nameUsage = iterator.next(); - Document doc = toDoc(nameUsage); - indexWriter.addDocument(doc); counter.incrementAndGet(); + batch.add(nameUsage); + if (batch.size() >= 100000) { + List finalBatch = batch; + exec.submit(new IndexingTask(indexWriter, finalBatch)); + batch = new ArrayList<>(); + } } + + //final batch + exec.submit(new IndexingTask(indexWriter, batch)); + + log.info("Finished reading CSV file. Indexing remaining taxa..."); + + exec.shutdown(); + try { + if (!exec.awaitTermination(5, TimeUnit.MINUTES)) { + log.error("Forcing shut down of executor service, pending tasks will be lost! {}", exec); + exec.shutdownNow(); + } + } catch (InterruptedException var4) { + log.error("Forcing shut down of executor service, pending tasks will be lost! {}", exec); + exec.shutdownNow(); + Thread.currentThread().interrupt(); + } + log.info("Final index commit"); indexWriter.commit(); log.info("Optimising index...."); indexWriter.forceMerge(1); log.info("Optimisation complete."); } + // write metadata file in JSON format log.info("Taxa indexed: {}", counter.get()); @@ -563,6 +657,36 @@ private void indexFile(String exportPath, String indexPath) throws Exception { mapper.writeValue(new File(indexPath + "/metadata.json"), metadata); } + class YourThreadFactory implements ThreadFactory { + public Thread newThread(Runnable r) { + return new Thread(r, "NameUsage-Indexing-taskThread"); + } + } + + static class IndexingTask implements Runnable { + private final IndexWriter writer; + private final List nameUsages; + + public IndexingTask(IndexWriter writer, List nameUsages) { + this.writer = writer; + this.nameUsages = nameUsages; + } + + @Override + public void run() { + try { + for (NameUsage nameUsage : nameUsages) { + Document doc = toDoc(nameUsage); + writer.addDocument(doc); + } + writer.flush(); + nameUsages.clear(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + @Transactional public void runDatasetIndexing(final Integer datasetKey) throws Exception { diff --git a/matching-ws/src/main/java/life/catalogue/matching/NameNRank.java b/matching-ws/src/main/java/life/catalogue/matching/NameNRank.java index 26ce91366..db25cd424 100644 --- a/matching-ws/src/main/java/life/catalogue/matching/NameNRank.java +++ b/matching-ws/src/main/java/life/catalogue/matching/NameNRank.java @@ -204,7 +204,7 @@ protected static boolean isSimpleBinomial(String name) { private static void warnIfMissing(String name, @Nullable String epithet, String part) { if (exists(epithet) && name != null && !name.toLowerCase().contains(epithet.toLowerCase())) { - LOG.warn("ScientificName >{}< missing {}: {}", name, part, epithet); + LOG.debug("ScientificName >{}< missing {}: {}", name, part, epithet); } }