From f9c89450722ad038f55c3073abb18f2304247a5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?A=C3=A9cio=20Santos?= Date: Thu, 7 Jan 2021 12:13:27 -0500 Subject: [PATCH] Refactor Elasticsearch repository and make it wait until the server ready --- CHANGELOG.md | 1 + .../ElasticSearchRestTargetRepository.java | 45 +---------- .../ElasticSearchClientFactory.java | 74 ++++++++++++++----- .../elasticsearch/ElasticSearchConfig.java | 10 +++ config/config_bipartite/docker-compose.yml | 2 +- config/config_docker/docker-compose.yml | 2 +- config/config_docker_tor/docker-compose.yml | 2 +- config/config_kafka/docker-compose.yml | 2 +- config/config_nginx/docker-compose.yml | 2 +- config/config_server/docker-compose.yml | 10 +-- 10 files changed, 81 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0bb5b71e6..a042aa442 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - Reorganized gradle module directory structure - Rename root package to 'achecrawler' - Use multi-stage build to reduce Docker image size +- Refactor Elasticsearch repository and make it wait until the server ready ## Version 0.12.0 (2020-01-18) diff --git a/ache/src/main/java/achecrawler/target/repository/ElasticSearchRestTargetRepository.java b/ache/src/main/java/achecrawler/target/repository/ElasticSearchRestTargetRepository.java index 3f38c406e..9659b6d8d 100644 --- a/ache/src/main/java/achecrawler/target/repository/ElasticSearchRestTargetRepository.java +++ b/ache/src/main/java/achecrawler/target/repository/ElasticSearchRestTargetRepository.java @@ -2,24 +2,16 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.net.MalformedURLException; -import java.net.URL; import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.List; import java.util.Map; -import org.apache.http.HttpHost; -import org.apache.http.client.config.RequestConfig; import org.apache.http.entity.AbstractHttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,11 +22,12 @@ import achecrawler.target.model.Page; import achecrawler.target.model.TargetModelElasticSearch; +import achecrawler.target.repository.elasticsearch.ElasticSearchClientFactory; import achecrawler.target.repository.elasticsearch.ElasticSearchConfig; import achecrawler.util.CloseableIterator; public class ElasticSearchRestTargetRepository implements TargetRepository { - + private static final Map EMPTY_MAP = Collections.emptyMap(); private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestTargetRepository.class); private static final ObjectMapper mapper = new ObjectMapper(); @@ -50,7 +43,7 @@ public class ElasticSearchRestTargetRepository implements TargetRepository { public ElasticSearchRestTargetRepository(ElasticSearchConfig config) { this.indexName = config.getIndexName(); this.typeName = config.getTypeName(); - this.client = createRestClient(config); + this.client = ElasticSearchClientFactory.createClient(config); this.createIndexMapping(indexName); } @@ -186,38 +179,6 @@ private String serializeAsJson(Object model) { return targetAsJson; } - public RestClient createRestClient(ElasticSearchConfig config) { - - List esHosts = config.getRestApiHosts(); - List hosts = new ArrayList<>(); - for (String host : esHosts) { - try { - URL url = new URL(host); - hosts.add(new HttpHost(url.getHost(), url.getPort())); - } catch (MalformedURLException e) { - throw new RuntimeException("Failed to initialize Elasticsearch REST client. " - + "Invalid host: " + host, e); - } - } - - HttpHost[] httpHostsArray = (HttpHost[]) hosts.toArray(new HttpHost[hosts.size()]); - - client = RestClient.builder(httpHostsArray) - .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() { - @Override - public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) { - return requestConfigBuilder - .setConnectTimeout(config.getRestConnectTimeout()) - .setSocketTimeout(config.getRestSocketTimeout()); - } - }) - .setMaxRetryTimeoutMillis(config.getRestMaxRetryTimeoutMillis()) - .build(); - - logger.info("Initialized Elasticsearch REST client for hosts: "+Arrays.toString(httpHostsArray)); - return client; - } - @Override public void close() { try { diff --git a/ache/src/main/java/achecrawler/target/repository/elasticsearch/ElasticSearchClientFactory.java b/ache/src/main/java/achecrawler/target/repository/elasticsearch/ElasticSearchClientFactory.java index edb6e44dc..9211b418d 100644 --- a/ache/src/main/java/achecrawler/target/repository/elasticsearch/ElasticSearchClientFactory.java +++ b/ache/src/main/java/achecrawler/target/repository/elasticsearch/ElasticSearchClientFactory.java @@ -1,14 +1,17 @@ package achecrawler.target.repository.elasticsearch; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; + import org.apache.http.HttpHost; -import org.apache.http.client.config.RequestConfig.Builder; +import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback; +import org.elasticsearch.client.RestClientBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,21 +23,58 @@ public static RestClient createClient(ElasticSearchConfig config) { HttpHost[] httpHosts = parseHostAddresses(config.getRestApiHosts()); - RestClient client = RestClient.builder(httpHosts) - .setRequestConfigCallback(new RequestConfigCallback() { - @Override - public Builder customizeRequestConfig( - Builder requestConfigBuilder) { - return requestConfigBuilder - .setConnectTimeout(config.getRestConnectTimeout()) - .setSocketTimeout(config.getRestSocketTimeout()); - } - }) - .setMaxRetryTimeoutMillis(config.getRestMaxRetryTimeoutMillis()) - .build(); - - logger.info("Initialized Elasticsearch REST client for: " + Arrays.toString(httpHosts)); - return client; + final RestClientBuilder builder = RestClient.builder(httpHosts) + .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder + .setConnectTimeout(config.getRestConnectTimeout()) + .setSocketTimeout(config.getRestSocketTimeout()) + ) + .setMaxRetryTimeoutMillis(config.getRestMaxRetryTimeoutMillis()); + + final int connectionTimeout = config.getRestClientInitialConnectionTimeout(); + final long start = System.currentTimeMillis(); + int attempts = 0; + RestClient client = null; + while(true) { + try { + attempts++; + if (client == null) { + client = builder.build(); + logger.info("Initialized Elasticsearch REST client for hosts: " + Arrays.toString(httpHosts)); + } + if (client != null) { + checkRestApi(client); + } + return client; + } catch (Exception e) { + long elapsed = System.currentTimeMillis() - start; + if (elapsed > connectionTimeout) { + String msg = String.format("Failed to connect to Elasticsearch server after %d retries", attempts); + throw new IllegalStateException(msg, e); + } + try { + TimeUnit.MILLISECONDS.sleep(200); + } catch (InterruptedException ex) { + throw new IllegalStateException("Interrupted while trying to connect to Elasticsearch server.", ex); + } + if (attempts % 10 == 0) { + logger.info("Failed to connect to Elasticsearch server (failed attempts: {}). Retrying...", attempts); + } + } + } + } + + private static void checkRestApi(RestClient client) { + String rootEndpoint = "/"; + try { + Response response = client.performRequest("GET", rootEndpoint); + final int statusCode = response.getStatusLine().getStatusCode(); + logger.info(response.getEntity().toString()); + if(statusCode != 200) { + throw new IllegalStateException("Cluster returned non-OK status code: " + statusCode); + } + } catch (IOException e) { + throw new IllegalStateException("Failed to issue request to Elasticsearch REST API.", e); + } } private static HttpHost[] parseHostAddresses(List esHosts) { diff --git a/ache/src/main/java/achecrawler/target/repository/elasticsearch/ElasticSearchConfig.java b/ache/src/main/java/achecrawler/target/repository/elasticsearch/ElasticSearchConfig.java index a50a8491f..59667c713 100644 --- a/ache/src/main/java/achecrawler/target/repository/elasticsearch/ElasticSearchConfig.java +++ b/ache/src/main/java/achecrawler/target/repository/elasticsearch/ElasticSearchConfig.java @@ -21,6 +21,9 @@ public class ElasticSearchConfig { @JsonProperty("target_storage.data_format.elasticsearch.rest.max_retry_timeout_millis") private int restMaxRetryTimeoutMillis = 60000; + @JsonProperty("target_storage.data_format.elasticsearch.rest.initial_connection_timeout") + private int restClientInitialConnectionTimeout = 30000; + // // Index and type parameters // @@ -68,4 +71,11 @@ public void setTypeName(String typeName) { this.typeName = typeName; } + public int getRestClientInitialConnectionTimeout() { + return restClientInitialConnectionTimeout; + } + + public void setRestClientInitialConnectionTimeout(int restClientInitialConnectionTimeout) { + this.restClientInitialConnectionTimeout = restClientInitialConnectionTimeout; + } } diff --git a/config/config_bipartite/docker-compose.yml b/config/config_bipartite/docker-compose.yml index 2a0856e14..f69f1f248 100644 --- a/config/config_bipartite/docker-compose.yml +++ b/config/config_bipartite/docker-compose.yml @@ -2,7 +2,7 @@ version: '2' services: ache: image: vidanyu/ache - entrypoint: sh -c 'sleep 10 && /ache/bin/ache startCrawl -c /config/ -s /config/seeds.txt -o /data -e crawl-data' + command: startCrawl -c /config/ -s /config/seeds.txt -o /data -e crawl-data ports: - "8080:8080" volumes: diff --git a/config/config_docker/docker-compose.yml b/config/config_docker/docker-compose.yml index 1778446b7..2b419ae66 100644 --- a/config/config_docker/docker-compose.yml +++ b/config/config_docker/docker-compose.yml @@ -2,7 +2,7 @@ version: '2' services: ache: image: vidanyu/ache - entrypoint: sh -c 'sleep 10 && /ache/bin/ache startCrawl -c /config/ -s /config/docker.seeds -o /data -e crawl-data' + command: startCrawl -c /config/ -s /config/docker.seeds -o /data -e crawl-data ports: - "8080:8080" volumes: diff --git a/config/config_docker_tor/docker-compose.yml b/config/config_docker_tor/docker-compose.yml index 9a82e068a..8395a144f 100644 --- a/config/config_docker_tor/docker-compose.yml +++ b/config/config_docker_tor/docker-compose.yml @@ -2,7 +2,7 @@ version: '2' services: ache: image: vidanyu/ache - entrypoint: sh -c 'sleep 10 && /ache/bin/ache startCrawl -c /config/ -s /config/tor.seeds -o /data -e tor' + command: startCrawl -c /config/ -s /config/tor.seeds -o /data -e tor ports: - "8080:8080" volumes: diff --git a/config/config_kafka/docker-compose.yml b/config/config_kafka/docker-compose.yml index 7f31584f0..898722682 100644 --- a/config/config_kafka/docker-compose.yml +++ b/config/config_kafka/docker-compose.yml @@ -2,7 +2,7 @@ version: '2' services: ache: image: vidanyu/ache - entrypoint: sh -c '/ache/bin/ache startCrawl -c /config/ -s /config/seeds.txt -o /data' + command: startCrawl -c /config/ -s /config/seeds.txt -o /data ports: - "8080:8080" volumes: diff --git a/config/config_nginx/docker-compose.yml b/config/config_nginx/docker-compose.yml index d09036bd9..e14bc4620 100644 --- a/config/config_nginx/docker-compose.yml +++ b/config/config_nginx/docker-compose.yml @@ -2,7 +2,7 @@ version: '2' services: ache: image: vidanyu/ache - entrypoint: sh -c 'sleep 10 && /ache/bin/ache startServer -c /config/ -d /data' + command: startServer -c /config/ -d /data ports: - "8080:8080" volumes: diff --git a/config/config_server/docker-compose.yml b/config/config_server/docker-compose.yml index 7b004b4c8..81890167b 100644 --- a/config/config_server/docker-compose.yml +++ b/config/config_server/docker-compose.yml @@ -2,14 +2,14 @@ version: '2' services: ache: image: vidanyu/ache - entrypoint: sh -c 'sleep 10 && /ache/bin/ache startServer -c /config -d /data' + command: startServer -c /config -d /data ports: - - "8080:8080" + - "8080:8080" volumes: - - ./data-ache/:/data - - ./:/config + - ./data-ache/:/data + - ./:/config links: - - elasticsearch + - elasticsearch elasticsearch: image: elasticsearch:2.4.5 environment: