Skip to content

Commit

Permalink
Merge pull request #4 from omc/dan/fix_node_request
Browse files Browse the repository at this point in the history
Fix cluster wide invalidation
  • Loading branch information
dansimpson authored Oct 30, 2020
2 parents 9bbfe70 + 8f4b5ff commit 74f1525
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 19 deletions.
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
group = io.bonsai
publishedPluginVersion=1.0.1
pluginVersion=1.0.1
publishedPluginVersion=1.0.2
pluginVersion=1.0.2
esVersion=7.7.1
pluginName=stored-synonyms
pluginClassname=io.bonsai.plugins.synonyms.StoredSynonymsPlugin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,23 @@ public void store(StoredSynonyms set, ActionListener<IndexResponse> listener) {
set,
ActionListener.wrap(
(indexResponse) -> {
invalidateSynonymSet(set.getName());
listener.onResponse(indexResponse);
invalidateSynonymSet(
set.getName(),
ActionListener.wrap(
(invalidateResponse) -> {
log.info(
"Refreshed synonym cache for {} on {} nodes {} failures",
set.getName(),
invalidateResponse.getNodes().size(),
invalidateResponse.failures().size());

if (invalidateResponse.hasFailures()) {
invalidateResponse.failures().forEach(log::warn);
}

listener.onResponse(indexResponse);
},
listener::onFailure));
},
listener::onFailure));
}
Expand All @@ -136,8 +151,17 @@ public void delete(String collectionName, ActionListener<DeleteResponse> listene
collectionName,
ActionListener.wrap(
(deleteResponse) -> {
invalidateSynonymSet(collectionName);
listener.onResponse(deleteResponse);
invalidateSynonymSet(
collectionName,
ActionListener.wrap(
(invalidateResponse) -> {
log.info(
"Expired synonym cache for {} on {} nodes",
collectionName,
invalidateResponse.getNodes().size());
listener.onResponse(deleteResponse);
},
listener::onFailure));
},
listener::onFailure));
}
Expand Down Expand Up @@ -171,10 +195,12 @@ public void get(String collectionName, ActionListener<StoredSynonyms> listener)
listener::onFailure));
}

private void invalidateSynonymSet(String name) {
InvalidateResponse response =
InvalidateAction.INSTANCE.newRequestBuilder(client).setTimeout("10s").setName(name).get();
log.info("Invalidated {} on {} nodes", name, response.getNodes().size());
private void invalidateSynonymSet(String name, ActionListener<InvalidateResponse> callback) {
InvalidateAction.INSTANCE
.newRequestBuilder(client)
.setTimeout("10s")
.setName(name)
.execute(callback);
}

protected void reload(String name) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public NodeRequest() {}

public NodeRequest(StreamInput in) throws IOException {
super(in);
request = new InvalidateRequest(in);
}

public NodeRequest(final InvalidateRequest request) {
Expand Down
8 changes: 0 additions & 8 deletions src/test/java/io/bonsai/plugins/synonyms/AnalysisTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ public void testSearchAnalysis() throws IOException, InterruptedException {
Assert.assertEquals(200, curlResponse.getHttpStatusCode());
}

// cluster.refresh();
// Thread.sleep(100);

System.err.println("Pre assert search");

response =
cluster.search(
"myindex",
Expand All @@ -85,9 +80,6 @@ public void testSearchAnalysis() throws IOException, InterruptedException {
Assert.assertEquals(200, curlResponse.getHttpStatusCode());
}

// cluster.refresh();
// Thread.sleep(100);

response =
cluster.search(
"myindex",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.After;
import org.junit.Assert;
Expand All @@ -15,11 +16,31 @@ public class BaseClusterTest implements TestSupport {

@Before
public void setUp() {
String clusterName = "es-cl-run-" + System.currentTimeMillis();
cluster = new ElasticsearchClusterRunner();

cluster.onBuild(
new ElasticsearchClusterRunner.Builder() {
@Override
public void build(final int number, final Builder settingsBuilder) {
settingsBuilder.put("http.cors.enabled", true);
settingsBuilder.put("http.cors.allow-origin", "*");
settingsBuilder.putList(
"discovery.seed_hosts", "127.0.0.1:9300", "127.0.0.1:9301", "127.0.0.1:9302");
settingsBuilder.putList("cluster.initial_master_nodes", "127.0.0.1:9300");
}
});

cluster.build(
ElasticsearchClusterRunner.newConfigs()
.numOfNode(1) // Create a test node, default number of node is 3.
.clusterName(clusterName)
.useLogger()
.disableESLogger()
.useLogger()
.numOfNode(3)
.pluginTypes("io.bonsai.plugins.synonyms.StoredSynonymsPlugin"));

cluster.ensureYellow();
}

@After
Expand Down
16 changes: 16 additions & 0 deletions src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Logger name="org.elasticsearch" level="warn">
<AppenderRef ref="Console"/>
</Logger>
<Root level="DEBUG">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>

0 comments on commit 74f1525

Please sign in to comment.