Skip to content

Commit

Permalink
Support custom Lucene analyzer with arguments and custom query parser
Browse files Browse the repository at this point in the history
  • Loading branch information
jackluo923 committed Apr 24, 2024
1 parent 099a86c commit 5ad8b05
Show file tree
Hide file tree
Showing 14 changed files with 495 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
package org.apache.pinot.segment.local.realtime.impl.invertedindex;

import java.io.File;
import java.lang.reflect.Constructor;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.queryparser.classic.QueryParserBase;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherManager;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex {
private final LuceneTextIndexCreator _indexCreator;
private SearcherManager _searcherManager;
private Analyzer _analyzer;
private Constructor<QueryParserBase> _queryParserClassConstructor;
private final String _column;
private final String _segmentName;
private boolean _enablePrefixSuffixMatchingInPhraseQueries = false;
Expand Down Expand Up @@ -82,6 +84,8 @@ public RealtimeLuceneTextIndex(String column, File segmentIndexDir, String segme
IndexWriter indexWriter = _indexCreator.getIndexWriter();
_searcherManager = new SearcherManager(indexWriter, false, false, null);
_analyzer = _indexCreator.getIndexWriter().getConfig().getAnalyzer();
_queryParserClassConstructor =
getQueryParserWithStringAndAnalyzerTypeConstructor(config.getLuceneQueryParserClass());
_enablePrefixSuffixMatchingInPhraseQueries = config.isEnablePrefixSuffixMatchingInPhraseQueries();
} catch (Exception e) {
LOGGER.error("Failed to instantiate realtime Lucene index reader for column {}, exception {}", column,
Expand Down Expand Up @@ -122,12 +126,19 @@ public MutableRoaringBitmap getDocIds(String searchQuery) {
Callable<MutableRoaringBitmap> searchCallable = () -> {
IndexSearcher indexSearcher = null;
try {
QueryParser parser = new QueryParser(_column, _analyzer);
// Lucene Query Parser can be stateful, we will instantiate a new instance for each query
QueryParserBase parser = _queryParserClassConstructor.newInstance(_column, _analyzer);
if (_enablePrefixSuffixMatchingInPhraseQueries) {
// TODO: this code path is semi-broken as the default QueryParser does not always utilizes the analyzer
// passed into the constructor for wildcards in phrases in favor of using a custom Lucene query parser
// https://github.com/elastic/elasticsearch/issues/22540
parser.setAllowLeadingWildcard(true);
}
Query query = parser.parse(searchQuery);
if (_enablePrefixSuffixMatchingInPhraseQueries) {
// TODO: this code path is semi-broken as the default QueryParser does not always utilizes the analyzer
// passed into the constructor for wildcards in phrases in favor of using a custom Lucene query parser
// https://github.com/elastic/elasticsearch/issues/22540
query = LuceneTextIndexUtils.convertToMultiTermSpanQuery(query);
}
indexSearcher = _searcherManager.acquire();
Expand Down Expand Up @@ -181,6 +192,27 @@ private MutableRoaringBitmap getPinotDocIds(IndexSearcher indexSearcher, Mutable
return actualDocIDs;
}

private Constructor<QueryParserBase> getQueryParserWithStringAndAnalyzerTypeConstructor(String queryParserClassName)
throws ReflectiveOperationException {
// Fail-fast if the query parser if specified class is not QueryParseBase class
final Class<?> queryParserClass = Class.forName(queryParserClassName);
if (!QueryParserBase.class.isAssignableFrom(queryParserClass)) {
throw new ReflectiveOperationException("The specified lucene query parser class " + queryParserClassName
+ " is not assignable from " + QueryParserBase.class.getCanonicalName());
}
// Fail-fast if the query parser does not have the required constructor used by this class
try {
queryParserClass.getConstructor(String.class, Analyzer.class);
} catch (NoSuchMethodException ex) {
throw new ReflectiveOperationException("The specified lucene query parser class " + queryParserClassName
+ " is not assignable from does not have the required constructor method with parameter type "
+ "[String.class, Analyzer.class]"
);
}

return (Constructor<QueryParserBase>) queryParserClass.getConstructor(String.class, Analyzer.class);
}

@Override
public void commit() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.commons.io.FileUtils;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.CharArraySet;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
Expand Down Expand Up @@ -118,14 +117,7 @@ public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean commi
// to V3 if segmentVersion is set to V3 in SegmentGeneratorConfig.
_indexFile = getV1TextIndexFile(segmentIndexDir);

Analyzer luceneAnalyzer;
if (luceneAnalyzerClass.isEmpty() || luceneAnalyzerClass.equals(StandardAnalyzer.class.getName())) {
luceneAnalyzer = TextIndexUtils.getStandardAnalyzerWithCustomizedStopWords(config.getStopWordsInclude(),
config.getStopWordsExclude());
} else {
luceneAnalyzer = TextIndexUtils.getAnalyzerFromClassName(luceneAnalyzerClass);
}

Analyzer luceneAnalyzer = TextIndexUtils.getAnalyzer(config);
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(luceneAnalyzer);
indexWriterConfig.setRAMBufferSizeMB(config.getLuceneMaxBufferSizeMB());
indexWriterConfig.setCommitOnClose(commit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
Expand Down Expand Up @@ -90,10 +89,7 @@ public LuceneTextIndexReader(String column, File indexDir, int numDocs, TextInde
// TODO: consider using a threshold of num docs per segment to decide between building
// mapping file upfront on segment load v/s on-the-fly during query processing
_docIdTranslator = new DocIdTranslator(indexDir, _column, numDocs, _indexSearcher);
String luceneAnalyzerClass = config.getLuceneAnalyzerClass();
_analyzer = luceneAnalyzerClass.equals(StandardAnalyzer.class.getName())
? TextIndexUtils.getStandardAnalyzerWithCustomizedStopWords(config.getStopWordsInclude(),
config.getStopWordsExclude()) : TextIndexUtils.getAnalyzerFromClassName(luceneAnalyzerClass);
_analyzer = TextIndexUtils.getAnalyzer(config);
LOGGER.info("Successfully read lucene index for {} from {}", _column, indexDir);
} catch (Exception e) {
LOGGER.error("Failed to instantiate Lucene text index reader for column {}, exception {}", column,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import javax.annotation.Nullable;
import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
import org.apache.pinot.segment.spi.index.TextIndexConfig;
import org.apache.pinot.segment.spi.utils.CsvParser;
import org.apache.pinot.spi.config.table.FSTType;
import org.apache.pinot.spi.config.table.FieldConfig;

Expand Down Expand Up @@ -67,6 +68,23 @@ public TextIndexConfig.AbstractBuilder withProperties(@Nullable Map<String, Stri
_luceneAnalyzerClass = textIndexProperties.get(FieldConfig.TEXT_INDEX_LUCENE_ANALYZER_CLASS);
}

// Note that we cannot depend on jackson's default behavior to automatically coerce the comma delimited args to
// List<String>. This is because the args may contain comma and other special characters such as space. Therefore,
// we use our own csv parser to parse the values directly.
if (textIndexProperties.get(FieldConfig.TEXT_INDEX_LUCENE_ANALYZER_CLASS_ARGS) != null) {
_luceneAnalyzerClassArgs = CsvParser.parse(
textIndexProperties.get(FieldConfig.TEXT_INDEX_LUCENE_ANALYZER_CLASS_ARGS), true, false);
}

if (textIndexProperties.get(FieldConfig.TEXT_INDEX_LUCENE_ANALYZER_CLASS_ARG_TYPES) != null) {
_luceneAnalyzerClassArgTypes = CsvParser.parse(
textIndexProperties.get(FieldConfig.TEXT_INDEX_LUCENE_ANALYZER_CLASS_ARG_TYPES), false, true);
}

if (textIndexProperties.get(FieldConfig.TEXT_INDEX_LUCENE_QUERY_PARSER_CLASS) != null) {
_luceneQueryParserClass = textIndexProperties.get(FieldConfig.TEXT_INDEX_LUCENE_QUERY_PARSER_CLASS);
}

for (Map.Entry<String, String> entry : textIndexProperties.entrySet()) {
if (entry.getKey().equalsIgnoreCase(FieldConfig.TEXT_FST_TYPE)) {
_fstType = FSTType.NATIVE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.segment.store;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -32,12 +33,16 @@
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.index.TextIndexConfig;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.config.table.FSTType;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class TextIndexUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(TextIndexUtils.class);
private TextIndexUtils() {
}

Expand Down Expand Up @@ -108,10 +113,141 @@ private static List<String> parseEntryAsString(@Nullable Map<String, String> col
.collect(Collectors.toList());
}

public static Analyzer getAnalyzerFromClassName(String luceneAnalyzerClass)
throws ReflectiveOperationException {
// Support instantiation with default constructor for now unless customized
return (Analyzer) Class.forName(luceneAnalyzerClass).getConstructor().newInstance();
public static Analyzer getAnalyzer(TextIndexConfig config) throws ReflectiveOperationException {
String luceneAnalyzerClassName = config.getLuceneAnalyzerClass();
List<String> luceneAnalyzerClassArgs = config.getLuceneAnalyzerClassArgs();
List<String> luceneAnalyzerClassArgsTypes = config.getLuceneAnalyzerClassArgTypes();
if (null == luceneAnalyzerClassName || luceneAnalyzerClassName.isEmpty()) {
// Default analyzer + default configs when custom defined analyzer is not specified
return TextIndexUtils.getStandardAnalyzerWithCustomizedStopWords(
config.getStopWordsInclude(), config.getStopWordsExclude());
} else {
// Custom analyzer + custom configs via reflection
if (luceneAnalyzerClassArgs.size() != luceneAnalyzerClassArgsTypes.size()) {
throw new ReflectiveOperationException("Mismatch of the number of analyzer arguments and arguments types.");
}

// Generate args type list
List<Class<?>> argClasses = new ArrayList<>();
for (String argType : luceneAnalyzerClassArgsTypes) {
argClasses.add(parseSupportedType(argType));
}

// Best effort coercion to the analyzer argument type
// Note only a subset of class types is supported, unsupported ones can be added in the future
List<Object> argValues = new ArrayList<>();
for (int i = 0; i < luceneAnalyzerClassArgs.size(); i++) {
argValues.add(parseSupportedTypes(luceneAnalyzerClassArgs.get(i), argClasses.get(i)));
}

// Initialize the custom analyzer class with custom analyzer args
Class<?> luceneAnalyzerClass = Class.forName(luceneAnalyzerClassName);
Analyzer analyzer;
if (!Analyzer.class.isAssignableFrom(luceneAnalyzerClass)) {
String exceptionMessage = "Custom analyzer must be a child of " + Analyzer.class.getCanonicalName();
LOGGER.error(exceptionMessage);
throw new ReflectiveOperationException(exceptionMessage);
}
if (luceneAnalyzerClassArgs.isEmpty()) {
// Default constructor
analyzer = (Analyzer) luceneAnalyzerClass.getConstructor().newInstance();
} else {
// Non-default constructor
analyzer = (Analyzer) luceneAnalyzerClass
.getConstructor(argClasses.toArray(new Class<?>[0])).newInstance(argValues.toArray(new Object[0]));
}
return analyzer;
}
}

/**
* Parse the Java value type specified in the type string
* @param valueTypeString FQCN of the value type class or the name of the primitive value type
* @return Class object of the value type
* @throws ClassNotFoundException when the value type is not supported
*/
public static Class<?> parseSupportedType(String valueTypeString) throws ClassNotFoundException {
try {
// Support both primitive types + class
switch (valueTypeString) {
case "java.lang.Byte.TYPE":
return Byte.TYPE;
case "java.lang.Short.TYPE":
return Short.TYPE;
case "java.lang.Integer.TYPE":
return Integer.TYPE;
case "java.lang.Long.TYPE":
return Long.TYPE;
case "java.lang.Float.TYPE":
return Float.TYPE;
case "java.lang.Double.TYPE":
return Double.TYPE;
case "java.lang.Boolean.TYPE":
return Boolean.TYPE;
case "java.lang.Character.TYPE":
return Character.TYPE;
default:
return Class.forName(valueTypeString);
}
} catch (ClassNotFoundException ex) {
LOGGER.error("Analyzer argument class type not found: " + valueTypeString);
throw ex;
}
}

/**
* Attempt to coerce string into supported value type
* @param stringValue string representation of the value
* @param clazz of the value
* @return class object of the value, auto-boxed if it is a primitive type
* @throws ReflectiveOperationException if value cannot be coerced without ambiguity or encountered unsupported type
*/
public static Object parseSupportedTypes(String stringValue, Class<?> clazz) throws ReflectiveOperationException {
try {
if (clazz.equals(String.class)) {
return stringValue;
} else if (clazz.equals(Byte.class) || clazz.equals(Byte.TYPE)) {
return Byte.parseByte(stringValue);
} else if (clazz.equals(Short.class) || clazz.equals(Short.TYPE)) {
return Short.parseShort(stringValue);
} else if (clazz.equals(Integer.class) || clazz.equals(Integer.TYPE)) {
return Integer.parseInt(stringValue);
} else if (clazz.equals(Long.class) || clazz.equals(Long.TYPE)) {
return Long.parseLong(stringValue);
} else if (clazz.equals(Float.class) || clazz.equals(Float.TYPE)) {
return Float.parseFloat(stringValue);
} else if (clazz.equals(Double.class) || clazz.equals(Double.TYPE)) {
return Double.parseDouble(stringValue);
} else if (clazz.equals(Boolean.class) || clazz.equals(Boolean.TYPE)) {
// Note we cannot use Boolean.parseBoolean here because it treats "abc" as false which
// introduces unexpected parsing results. We should validate the input by accepting only
// true|false in a case-insensitive manner, for all other values, return an exception.
String lowerCaseStringValue = stringValue.toLowerCase();
if (lowerCaseStringValue.equals("true")) {
return true;
} else if (lowerCaseStringValue.equals("false")) {
return false;
}
throw new ReflectiveOperationException();
} else if (clazz.equals(Character.class) || clazz.equals(Character.TYPE)) {
if (stringValue.length() == 1) {
return stringValue.charAt(0);
}
throw new ReflectiveOperationException();
} else {
throw new UnsupportedOperationException();
}
} catch (NumberFormatException | ReflectiveOperationException ex) {
String exceptionMessage = "Custom analyzer argument cannot be coerced from "
+ stringValue + " to " + clazz.getName() + " type";
LOGGER.error(exceptionMessage);
throw new ReflectiveOperationException(exceptionMessage);
} catch (UnsupportedOperationException ex) {
// In the future, consider adding more common serdes for common complex types used within Lucene
String exceptionMessage = "Custom analyzer argument does not support " + clazz.getName() + " type";
LOGGER.error(exceptionMessage);
throw new ReflectiveOperationException(exceptionMessage);
}
}

public static StandardAnalyzer getStandardAnalyzerWithCustomizedStopWords(@Nullable List<String> stopWordsInclude,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ public void testSegmentBuilderWithReuse(boolean columnMajorSegmentBuilder, Strin
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
TextIndexConfig textIndexConfig =
new TextIndexConfig(false, null, null, false, false, Collections.emptyList(), Collections.emptyList(), false,
500, null, false);
500, null, null, null, null, false);

RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
new RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
Expand Down
Loading

0 comments on commit 5ad8b05

Please sign in to comment.