Skip to content

Commit

Permalink
Merge pull request #33 from traveltime-dev/formatting
Browse files Browse the repository at this point in the history
Formatting
  • Loading branch information
mjanuszkiewicz-tt authored Aug 29, 2024
2 parents 326b8ad + b9be652 commit 519ec5c
Show file tree
Hide file tree
Showing 155 changed files with 14,455 additions and 12,714 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.traveltime.plugin.elasticsearch;


import com.traveltime.plugin.elasticsearch.query.TraveltimeFetchPhase;
import com.traveltime.plugin.elasticsearch.query.TraveltimeQueryBuilder;
import com.traveltime.plugin.elasticsearch.query.TraveltimeQueryParser;
import com.traveltime.plugin.elasticsearch.util.Util;
import com.traveltime.sdk.dto.requests.proto.Country;
import com.traveltime.sdk.dto.requests.proto.RequestType;
import com.traveltime.sdk.dto.requests.proto.Transportation;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -25,60 +30,108 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class TraveltimePlugin extends Plugin implements SearchPlugin {
public static final Setting<String> APP_ID = Setting.simpleString("traveltime.app.id", Setting.Property.NodeScope);
public static final Setting<String> API_KEY = Setting.simpleString("traveltime.api.key", Setting.Property.NodeScope, Setting.Property.Filtered);
public static final Setting<Optional<Transportation.Modes>> DEFAULT_MODE = new Setting<>("traveltime.default.mode", s -> "", Util::findModeByName, Setting.Property.NodeScope);
public static final Setting<Optional<Country>> DEFAULT_COUNTRY = new Setting<>("traveltime.default.country", s -> "", Util::findCountryByName, Setting.Property.NodeScope);
public static final Setting<Optional<RequestType>> DEFAULT_REQUEST_TYPE = new Setting<>("traveltime.default.request_type", s -> RequestType.ONE_TO_MANY.name(), Util::findRequestTypeByName, Setting.Property.NodeScope);

public static final Setting<URI> API_URI = new Setting<>("traveltime.api.uri", s -> "https://proto.api.traveltimeapp.com/api/v2/", URI::create, Setting.Property.NodeScope);
public static final Setting<String> APP_ID =
Setting.simpleString("traveltime.app.id", Setting.Property.NodeScope);
public static final Setting<String> API_KEY =
Setting.simpleString(
"traveltime.api.key", Setting.Property.NodeScope, Setting.Property.Filtered);
public static final Setting<Optional<Transportation.Modes>> DEFAULT_MODE =
new Setting<>(
"traveltime.default.mode", s -> "", Util::findModeByName, Setting.Property.NodeScope);
public static final Setting<Optional<Country>> DEFAULT_COUNTRY =
new Setting<>(
"traveltime.default.country",
s -> "",
Util::findCountryByName,
Setting.Property.NodeScope);
public static final Setting<Optional<RequestType>> DEFAULT_REQUEST_TYPE =
new Setting<>(
"traveltime.default.request_type",
s -> RequestType.ONE_TO_MANY.name(),
Util::findRequestTypeByName,
Setting.Property.NodeScope);

private static final Setting<Integer> CACHE_CLEANUP_INTERVAL = Setting.intSetting("traveltime.cache.cleanup.interval", 120, 0, Setting.Property.NodeScope);
private static final Setting<Integer> CACHE_EXPIRY = Setting.intSetting("traveltime.cache.expiry", 60, 0, Setting.Property.NodeScope);
private static final Setting<Integer> CACHE_SIZE = Setting.intSetting("traveltime.cache.size", 50, 0, Setting.Property.NodeScope);
public static final Setting<URI> API_URI =
new Setting<>(
"traveltime.api.uri",
s -> "https://proto.api.traveltimeapp.com/api/v2/",
URI::create,
Setting.Property.NodeScope);

private void cleanUpAndReschedule(ThreadPool threadPool, TimeValue cleanupSeconds) {
TraveltimeCache.INSTANCE.cleanUp();
TraveltimeCache.DISTANCE.cleanUp();
threadPool.scheduleUnlessShuttingDown(cleanupSeconds, "generic", () -> cleanUpAndReschedule(threadPool, cleanupSeconds));
}
private static final Setting<Integer> CACHE_CLEANUP_INTERVAL =
Setting.intSetting("traveltime.cache.cleanup.interval", 120, 0, Setting.Property.NodeScope);
private static final Setting<Integer> CACHE_EXPIRY =
Setting.intSetting("traveltime.cache.expiry", 60, 0, Setting.Property.NodeScope);
private static final Setting<Integer> CACHE_SIZE =
Setting.intSetting("traveltime.cache.size", 50, 0, Setting.Property.NodeScope);

@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry, Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<RepositoriesService> repositoriesServiceSupplier) {
TimeValue cleanupSeconds = TimeValue.timeValueSeconds(CACHE_CLEANUP_INTERVAL.get(environment.settings()));
Duration cacheExpiry = Duration.ofSeconds(CACHE_EXPIRY.get(environment.settings()));
Integer cacheSize = CACHE_SIZE.get(environment.settings());
private void cleanUpAndReschedule(ThreadPool threadPool, TimeValue cleanupSeconds) {
TraveltimeCache.INSTANCE.cleanUp();
TraveltimeCache.DISTANCE.cleanUp();
threadPool.scheduleUnlessShuttingDown(
cleanupSeconds, "generic", () -> cleanUpAndReschedule(threadPool, cleanupSeconds));
}

TraveltimeCache.INSTANCE.setUp(cacheSize, cacheExpiry);
TraveltimeCache.DISTANCE.setUp(cacheSize, cacheExpiry);
cleanUpAndReschedule(threadPool, cleanupSeconds);
@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
TimeValue cleanupSeconds =
TimeValue.timeValueSeconds(CACHE_CLEANUP_INTERVAL.get(environment.settings()));
Duration cacheExpiry = Duration.ofSeconds(CACHE_EXPIRY.get(environment.settings()));
Integer cacheSize = CACHE_SIZE.get(environment.settings());

return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry, indexNameExpressionResolver, repositoriesServiceSupplier);
TraveltimeCache.INSTANCE.setUp(cacheSize, cacheExpiry);
TraveltimeCache.DISTANCE.setUp(cacheSize, cacheExpiry);
cleanUpAndReschedule(threadPool, cleanupSeconds);

}
return super.createComponents(
client,
clusterService,
threadPool,
resourceWatcherService,
scriptService,
xContentRegistry,
environment,
nodeEnvironment,
namedWriteableRegistry,
indexNameExpressionResolver,
repositoriesServiceSupplier);
}

@Override
public List<Setting<?>> getSettings() {
return List.of(APP_ID, API_KEY, DEFAULT_MODE, DEFAULT_COUNTRY, DEFAULT_REQUEST_TYPE, API_URI, CACHE_CLEANUP_INTERVAL, CACHE_EXPIRY, CACHE_SIZE);
}
@Override
public List<Setting<?>> getSettings() {
return List.of(
APP_ID,
API_KEY,
DEFAULT_MODE,
DEFAULT_COUNTRY,
DEFAULT_REQUEST_TYPE,
API_URI,
CACHE_CLEANUP_INTERVAL,
CACHE_EXPIRY,
CACHE_SIZE);
}

@Override
public List<QuerySpec<?>> getQueries() {
return List.of(
new QuerySpec<>(TraveltimeQueryParser.NAME, TraveltimeQueryBuilder::new, new TraveltimeQueryParser())
);
}
@Override
public List<QuerySpec<?>> getQueries() {
return List.of(
new QuerySpec<>(
TraveltimeQueryParser.NAME, TraveltimeQueryBuilder::new, new TraveltimeQueryParser()));
}

@Override
public List<FetchSubPhase> getFetchSubPhases(FetchPhaseConstructionContext context) {
return List.of(new TraveltimeFetchPhase());
}
@Override
public List<FetchSubPhase> getFetchSubPhases(FetchPhaseConstructionContext context) {
return List.of(new TraveltimeFetchPhase());
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.traveltime.plugin.elasticsearch.query;

import com.traveltime.plugin.elasticsearch.TraveltimeCache;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import lombok.val;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Query;
Expand All @@ -12,69 +15,71 @@
import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
import org.elasticsearch.search.fetch.subphase.FieldFetcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class TraveltimeFetchPhase implements FetchSubPhase {

private static class ParamFinder extends QueryVisitor {
private final List<TraveltimeSearchQuery> paramList = new ArrayList<>();
private static class ParamFinder extends QueryVisitor {
private final List<TraveltimeSearchQuery> paramList = new ArrayList<>();

@Override
public void visitLeaf(Query query) {
if (query instanceof TraveltimeSearchQuery) {
if (!((TraveltimeSearchQuery) query).getOutput().isEmpty()) {
paramList.add(((TraveltimeSearchQuery) query));
}
}
@Override
public void visitLeaf(Query query) {
if (query instanceof TraveltimeSearchQuery) {
if (!((TraveltimeSearchQuery) query).getOutput().isEmpty()) {
paramList.add(((TraveltimeSearchQuery) query));
}
}
}

public TraveltimeSearchQuery getQuery() {
if (paramList.size() == 1) return paramList.get(0);
else return null;
}
}
public TraveltimeSearchQuery getQuery() {
if (paramList.size() == 1) return paramList.get(0);
else return null;
}
}

@Override
public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) {
Query query = fetchContext.query();
val finder = new ParamFinder();
query.visit(finder);
TraveltimeSearchQuery traveltimeQuery = finder.getQuery();
if (traveltimeQuery == null) return null;
TraveltimeQueryParameters params = traveltimeQuery.getParams();
final String output = traveltimeQuery.getOutput();
final String distanceOutput = traveltimeQuery.getDistanceOutput();
@Override
public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) {
Query query = fetchContext.query();
val finder = new ParamFinder();
query.visit(finder);
TraveltimeSearchQuery traveltimeQuery = finder.getQuery();
if (traveltimeQuery == null) return null;
TraveltimeQueryParameters params = traveltimeQuery.getParams();
final String output = traveltimeQuery.getOutput();
final String distanceOutput = traveltimeQuery.getDistanceOutput();

FieldFetcher fieldFetcher = FieldFetcher.create(fetchContext.mapperService(), fetchContext.searchLookup(), List.of(new FieldAndFormat(params.getField(), null)));
FieldFetcher fieldFetcher =
FieldFetcher.create(
fetchContext.mapperService(),
fetchContext.searchLookup(),
List.of(new FieldAndFormat(params.getField(), null)));

return new FetchSubPhaseProcessor() {
return new FetchSubPhaseProcessor() {

@Override
public void setNextReader(LeafReaderContext readerContext) {
fieldFetcher.setNextReader(readerContext);
}
@Override
public void setNextReader(LeafReaderContext readerContext) {
fieldFetcher.setNextReader(readerContext);
}

@Override
public void process(HitContext hitContext) throws IOException {
val docValues = hitContext.reader().getSortedNumericDocValues(params.getField());
docValues.advance(hitContext.docId());
val point = docValues.nextValue();
if(!output.isEmpty()) {
Integer tt = TraveltimeCache.INSTANCE.get(params, point);
if (tt >= 0) {
hitContext.hit().setDocumentField(output, new DocumentField(output, List.of(tt)));
}
}
@Override
public void process(HitContext hitContext) throws IOException {
val docValues = hitContext.reader().getSortedNumericDocValues(params.getField());
docValues.advance(hitContext.docId());
val point = docValues.nextValue();
if (!output.isEmpty()) {
Integer tt = TraveltimeCache.INSTANCE.get(params, point);
if (tt >= 0) {
hitContext.hit().setDocumentField(output, new DocumentField(output, List.of(tt)));
}
}

if(!distanceOutput.isEmpty()) {
Integer td = TraveltimeCache.DISTANCE.get(params, point);
if (td >= 0) {
hitContext.hit().setDocumentField(distanceOutput, new DocumentField(distanceOutput, List.of(td)));
}
}
}
};
}
if (!distanceOutput.isEmpty()) {
Integer td = TraveltimeCache.DISTANCE.get(params, point);
if (td >= 0) {
hitContext
.hit()
.setDocumentField(distanceOutput, new DocumentField(distanceOutput, List.of(td)));
}
}
}
};
}
}
Loading

0 comments on commit 519ec5c

Please sign in to comment.