Skip to content

Commit

Permalink
use elastic4s client
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenliang123 committed Oct 24, 2023
1 parent 99b3dd3 commit e1a4653
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 66 deletions.
8 changes: 8 additions & 0 deletions dev/dependencyList
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ commons-lang/2.6//commons-lang-2.6.jar
commons-lang3/3.13.0//commons-lang3-3.13.0.jar
commons-logging/1.1.3//commons-logging-1.1.3.jar
derby/10.14.2.0//derby-10.14.2.0.jar
elasticsearch-java/8.10.4//elasticsearch-java-8.10.4.jar
elasticsearch-rest-client/8.10.4//elasticsearch-rest-client-8.10.4.jar
error_prone_annotations/2.14.0//error_prone_annotations-2.14.0.jar
failsafe/2.4.4//failsafe-2.4.4.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
Expand Down Expand Up @@ -61,7 +63,9 @@ hive-storage-api/2.7.0//hive-storage-api-2.7.0.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
hk2-locator/2.6.1//hk2-locator-2.6.1.jar
hk2-utils/2.6.1//hk2-utils-2.6.1.jar
httpasyncclient/4.1.5//httpasyncclient-4.1.5.jar
httpclient/4.5.14//httpclient-4.5.14.jar
httpcore-nio/4.4.13//httpcore-nio-4.4.13.jar
httpcore/4.4.16//httpcore-4.4.16.jar
httpmime/4.5.14//httpmime-4.5.14.jar
j2objc-annotations/1.3//j2objc-annotations-1.3.jar
Expand All @@ -77,6 +81,7 @@ jackson-module-jaxb-annotations/2.15.0//jackson-module-jaxb-annotations-2.15.0.j
jackson-module-scala_2.12/2.15.0//jackson-module-scala_2.12-2.15.0.jar
jakarta.annotation-api/1.3.5//jakarta.annotation-api-1.3.5.jar
jakarta.inject/2.6.1//jakarta.inject-2.6.1.jar
jakarta.json-api/2.0.1//jakarta.json-api-2.0.1.jar
jakarta.servlet-api/4.0.4//jakarta.servlet-api-4.0.4.jar
jakarta.validation-api/2.0.2//jakarta.validation-api-2.0.2.jar
jakarta.ws.rs-api/2.1.6//jakarta.ws.rs-api-2.1.6.jar
Expand Down Expand Up @@ -165,8 +170,11 @@ netty-transport/4.1.93.Final//netty-transport-4.1.93.Final.jar
okhttp-urlconnection/3.14.9//okhttp-urlconnection-3.14.9.jar
okhttp/3.12.12//okhttp-3.12.12.jar
okio/1.15.0//okio-1.15.0.jar
opentelemetry-api/1.29.0//opentelemetry-api-1.29.0.jar
opentelemetry-context/1.29.0//opentelemetry-context-1.29.0.jar
osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
paranamer/2.8//paranamer-2.8.jar
parsson/1.0.0//parsson-1.0.0.jar
perfmark-api/0.25.0//perfmark-api-0.25.0.jar
proto-google-common-protos/2.9.0//proto-google-common-protos-2.9.0.jar
protobuf-java-util/3.21.7//protobuf-java-util-3.21.7.jar
Expand Down
4 changes: 2 additions & 2 deletions kyuubi-events/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
</dependency>

<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<groupId>com.sksamuel.elastic4s</groupId>
<artifactId>elastic4s-client-esjava_${scala.binary.version}</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,20 @@

package org.apache.kyuubi.events.handler

// scalastyle:off
import co.elastic.clients.elasticsearch._types.mapping.{DynamicMapping, TypeMapping}
import co.elastic.clients.elasticsearch.ElasticsearchClient
// scalastyle:on
import co.elastic.clients.elasticsearch.core.IndexRequest
import co.elastic.clients.elasticsearch.indices.{CreateIndexRequest, ExistsRequest}
import co.elastic.clients.json.jackson.JacksonJsonpMapper
import co.elastic.clients.transport.rest_client.RestClientTransport
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.ElasticApi.indexInto
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.requests.common.RefreshPolicy
import com.sksamuel.elastic4s.requests.mappings.MappingDefinition
import com.sksamuel.elastic4s.requests.mappings.dynamictemplate.DynamicMapping
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.util.Preconditions.checkArgument
import org.apache.http.{Header, HttpHost}
import org.apache.http.message.BasicHeader
import org.elasticsearch.client.RestClient
import org.apache.http.HttpHost

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{SERVER_EVENT_ELASTICSEARCH_API_KEY, SERVER_EVENT_ELASTICSEARCH_INDEX, SERVER_EVENT_ELASTICSEARCH_INDEX_AUTOCREATE_ENABLED}
import org.apache.kyuubi.config.KyuubiConf.{SERVER_EVENT_ELASTICSEARCH_INDEX, SERVER_EVENT_ELASTICSEARCH_INDEX_AUTOCREATE_ENABLED}
import org.apache.kyuubi.events.KyuubiEvent

/**
Expand All @@ -43,41 +40,33 @@ class ElasticSearchLoggingEventHandler(
indexId: String,
hosts: Seq[String],
kyuubiConf: KyuubiConf) extends EventHandler[KyuubiEvent] with Logging {
private lazy val esRestClient: RestClient = {
val serverUrls = hosts.map(url => HttpHost.create(url))
val authorHeader = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_API_KEY)
.map(apiKey => new BasicHeader("Authorization", s"ApiKey $apiKey"))
val headers = Seq[Header](authorHeader.orNull).filter(p => p != null).toArray
RestClient.builder(serverUrls: _*)
.setDefaultHeaders(headers)
.build
}

private lazy val esTransport: RestClientTransport =
new RestClientTransport(esRestClient, new JacksonJsonpMapper())

private lazy val esClient: ElasticsearchClient = new ElasticsearchClient(esTransport)

private val isAutoCreateIndex: Boolean =
kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_INDEX_AUTOCREATE_ENABLED)

private val client: ElasticClient = {
val props = ElasticProperties(HttpHost.create(hosts.mkString(",")).toString)
ElasticClient(JavaClient(props))
}

private def checkIndexExisted: String => Boolean = (indexId: String) => {
checkArgument(
StringUtils.isNotBlank(indexId),
"index name must be configured for ElasticSearchEventHandler, please ensure %s is set",
SERVER_EVENT_ELASTICSEARCH_INDEX.key)
val req = new ExistsRequest.Builder().index(indexId).build()
esClient.indices().exists(req).value()
// check
client.execute {
indexExists(indexId)
}.await.result.isExists
}

private def checkAndEnsureIndex(indexId: String): Unit = {
if (!checkIndexExisted(indexId) && isAutoCreateIndex) {
val typeMapping = new TypeMapping.Builder().dynamic(DynamicMapping.True).build()
val createIndexReq = new CreateIndexRequest.Builder().index(indexId)
.mappings(typeMapping)
.build()
esClient.indices().create(createIndexReq)

// create index
client.execute {
createIndex(indexId)
.mapping(MappingDefinition(dynamic = Some(DynamicMapping.Dynamic)))
}.await
if (!checkIndexExisted(indexId)) {
throwExceptionIndexNotfound(indexId)
}
Expand All @@ -89,19 +78,24 @@ class ElasticSearchLoggingEventHandler(
checkAndEnsureIndex(indexId)

override def apply(event: KyuubiEvent): Unit = {

try {
val indexReq = new IndexRequest.Builder[KyuubiEvent]
.index(indexId).document(event)
.build()
esClient.index(indexReq)
// add document to index
client.execute {
indexInto(indexId)
// .doc(event.toJson)
.fields("a" -> "b")
.refresh(RefreshPolicy.Immediate)
}.await
} catch {
case e: Exception =>
val a = e
error("Failed to send event in ElasticSearchEventHandler", e)
}
}

override def close(): Unit = {
esTransport.close()
client.close()
}

private def throwExceptionIndexNotfound(indexId: String) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@

package org.apache.kyuubi.events.handler

import co.elastic.clients.elasticsearch.ElasticsearchClient
import co.elastic.clients.elasticsearch.core.SearchRequest
import co.elastic.clients.json.jackson.JacksonJsonpMapper
import co.elastic.clients.transport.rest_client.RestClientTransport
import com.dimafeng.testcontainers.ElasticsearchContainer
import com.dimafeng.testcontainers.scalatest.TestContainerForAll
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http.JavaClient

import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
Expand All @@ -50,36 +47,32 @@ abstract class ServerElasticSearchLoggingEventHandlerSuite extends WithKyuubiSer
}

override def beforeAll(): Unit = withContainers { esContainer =>
conf.set(SERVER_EVENT_ELASTICSEARCH_SERVER_URLS.key, esContainer.httpHostAddress)
conf.set(SERVER_EVENT_ELASTICSEARCH_SERVER_URLS.key, s"http://${esContainer.httpHostAddress}")
super.beforeAll()
}

test("check server events sent to ElasticSearch index") {
withContainers { elasticSearchContainer =>
val (esClient, esTransport) = getClient(elasticSearchContainer.httpHostAddress)
val client = getClient(elasticSearchContainer.httpHostAddress)
try {
val req = new SearchRequest.Builder().index(destIndex).build()
val docs = esClient.search(req, classOf[String])
val doc = docs.hits.hits().get(0)
val docSource = doc.source()
assert(doc != null)
assertResult("abc")(docSource)
val hits = client.execute {
search(destIndex).matchAllQuery()
}.await.result.hits.hits
assert(hits.length > 0)
val a = 1
} finally {
esTransport.close()
client.close()
}
}
}

def getClient(esServerUrl: String): (ElasticsearchClient, RestClientTransport) = {
val esRestClient: RestClient = RestClient.builder(HttpHost.create(esServerUrl)).build()
val esTransport: RestClientTransport =
new RestClientTransport(esRestClient, new JacksonJsonpMapper())
val esClient: ElasticsearchClient = new ElasticsearchClient(esTransport)
(esClient, esTransport)
private def getClient(hosts: String): ElasticClient = {
val props = ElasticProperties(s"http://$hosts")
ElasticClient(JavaClient(props))
}
}

class ServerElasticSearchLoggingEventHandlerSuite8
extends ServerElasticSearchLoggingEventHandlerSuite {
override val imageTag = "8.10.4"
override val imageTag = "7.17.14"
}
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@
<commons-lang.version>2.6</commons-lang.version>
<commons-lang3.version>3.13.0</commons-lang3.version>
<etcd.version>0.7.3</etcd.version>
<elasticsearch.version>8.10.4</elasticsearch.version>
<elasticsearch.version>7.17.4</elasticsearch.version>
<delta.artifact>delta-core</delta.artifact>
<delta.version>2.4.0</delta.version>
<failsafe.verion>2.4.4</failsafe.verion>
Expand Down Expand Up @@ -1277,8 +1277,8 @@
</dependency>

<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<groupId>com.sksamuel.elastic4s</groupId>
<artifactId>elastic4s-client-esjava_${scala.binary.version}</artifactId>
<version>${elasticsearch.version}</version>
<optional>true</optional>
</dependency>
Expand Down

0 comments on commit e1a4653

Please sign in to comment.