Skip to content

Commit

Permalink
[Feature]Support ES query index parameters (#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangshenghang authored Jan 7, 2025
1 parent cf1f6c9 commit c09a158
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ public List<String> getTables(
String database,
Map<String, String> option) {
databaseCheck(database);

try (EsRestClient client =
EsRestClient.createInstance(ConfigFactory.parseMap(requestParams))) {
return client.listIndex();
return client.listIndex(option.get("filterName"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.datasource.plugin.elasticsearch.ElasticSearchOptionRule;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
Expand Down Expand Up @@ -252,27 +253,7 @@ public void close() {
}

public List<String> listIndex() {
String endpoint = "/_cat/indices?format=json";
Request request = new Request("GET", endpoint);
try {
Response response = restClient.performRequest(request);
if (response == null) {
throw new ResponseException("GET " + endpoint + " response null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
return JsonUtils.toList(entity, Map.class).stream()
.map(map -> map.get("index").toString())
.collect(Collectors.toList());
} else {
throw new ResponseException(
String.format(
"GET %s response status code=%d",
endpoint, response.getStatusLine().getStatusCode()));
}
} catch (IOException ex) {
throw new ResponseException(ex);
}
return this.listIndex(null);
}

public void dropIndex(String tableName) {
Expand Down Expand Up @@ -365,4 +346,41 @@ private static Map<String, String> getFieldTypeMappingFromProperties(JsonNode pr
}
return mapping;
}

public List<String> listIndex(String filterName) {
String endpoint = "/_cat/indices?format=json";
Request request = new Request("GET", endpoint);
try {
Response response = restClient.performRequest(request);
if (response == null) {
throw new ResponseException("GET " + endpoint + " response null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
List<String> indices =
JsonUtils.toList(entity, Map.class).stream()
.map(map -> map.get("index").toString())
.collect(Collectors.toList());

if (StringUtils.isNotEmpty(filterName)) {
indices =
indices.stream()
.filter(
index ->
index.toLowerCase()
.contains(filterName.toLowerCase()))
.collect(Collectors.toList());
}

return indices;
} else {
throw new ResponseException(
String.format(
"GET %s response status code=%d",
endpoint, response.getStatusLine().getStatusCode()));
}
} catch (IOException ex) {
throw new ResponseException(ex);
}
}
}

0 comments on commit c09a158

Please sign in to comment.