Skip to content

Commit

Permalink
Merge pull request #379 from lichess-org/elastic4s/http4s-client
Browse files Browse the repository at this point in the history
Elastic4s/http4s client
  • Loading branch information
lenguyenthanh authored Nov 30, 2024
2 parents 71556d2 + 02b5c7e commit 06469af
Show file tree
Hide file tree
Showing 13 changed files with 63 additions and 35 deletions.
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ lazy val elastic = project
libraryDependencies ++= Seq(
catsCore,
catsEffect,
elastic4sJavaClient,
http4sClient,
elastic4sCatsEffect,
elastic4sHttp4sClient,
otel4sCore
)
)
Expand Down Expand Up @@ -88,12 +89,14 @@ lazy val ingestor = project
declineCatsEffect,
ducktape,
cirisCore,
cirisHtt4s,
smithy4sCore,
smithy4sJson,
jsoniterCore,
jsoniterMacro,
circe,
http4sServer,
http4sEmberClient,
mongo4catsCore,
mongo4catsCirce,
log4Cats,
Expand Down
9 changes: 6 additions & 3 deletions modules/app/src/main/scala/app.config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import cats.syntax.all.*
import ciris.*
import ciris.http4s.*
import com.comcast.ip4s.*
import org.http4s.Uri
import org.http4s.implicits.*

object AppConfig:

Expand Down Expand Up @@ -38,8 +40,9 @@ object HttpServerConfig:
private def enableDocs = env("HTTP_ENABLE_DOCS").or(prop("http.enable.docs")).as[Boolean].default(false)
def config = (host, port, logger, shutdownTimeout, enableDocs).parMapN(HttpServerConfig.apply)

case class ElasticConfig(uri: String)
case class ElasticConfig(uri: Uri)

object ElasticConfig:
private def uri = env("ELASTIC_URI").or(prop("elastic.uri")).as[String].default("http://127.0.0.1:9200")
def config = uri.map(ElasticConfig.apply)
private def uri =
env("ELASTIC_URI").or(prop("elastic.uri")).as[Uri].default(uri"http://127.0.0.1:9200")
def config = uri.map(ElasticConfig.apply)
10 changes: 8 additions & 2 deletions modules/app/src/main/scala/app.resources.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ package lila.search
package app

import cats.effect.*
import cats.effect.unsafe.IORuntime
import org.http4s.ember.client.EmberClientBuilder
import org.typelevel.otel4s.metrics.Meter

class AppResources(val esClient: ESClient[IO])

object AppResources:

def instance(conf: AppConfig)(using Meter[IO]): Resource[IO, AppResources] =
ESClient(conf.elastic.uri).map(AppResources.apply)
def instance(conf: AppConfig)(using Meter[IO], IORuntime): Resource[IO, AppResources] =
EmberClientBuilder
.default[IO]
.build
.evalMap(ESClient(_, conf.elastic.uri))
.map(AppResources.apply)
2 changes: 2 additions & 0 deletions modules/app/src/main/scala/app.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lila.search
package app

import cats.effect.*
import cats.effect.unsafe.IORuntime
import cats.syntax.all.*
import org.typelevel.log4cats.slf4j.Slf4jFactory
import org.typelevel.log4cats.{ Logger, LoggerFactory }
Expand All @@ -16,6 +17,7 @@ object App extends IOApp.Simple:

given LoggerFactory[IO] = Slf4jFactory.create[IO]
given Logger[IO] = LoggerFactory[IO].getLogger
given IORuntime = runtime

override def run: IO[Unit] = app.useForever

Expand Down
3 changes: 2 additions & 1 deletion modules/e2e/src/test/scala/CompatSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.sksamuel.elastic4s.Indexable
import lila.search.app.{ App, AppConfig, AppResources, ElasticConfig, HttpServerConfig }
import lila.search.client.{ SearchClient, SearchError }
import lila.search.spec.{ CountOutput, Query, SearchOutput }
import org.http4s.implicits.*
import org.typelevel.log4cats.noop.{ NoOpFactory, NoOpLogger }
import org.typelevel.log4cats.{ Logger, LoggerFactory }
import org.typelevel.otel4s.metrics.Meter
Expand Down Expand Up @@ -62,7 +63,7 @@ object CompatSuite extends weaver.IOSuite:

def testAppConfig = AppConfig(
server = HttpServerConfig(ip"0.0.0.0", port"9999", false, shutdownTimeout = 1, false),
elastic = ElasticConfig("http://0.0.0.0:9200")
elastic = ElasticConfig(uri"http://0.0.0.0:9200")
)

def fakeClient: ESClient[IO] = new:
Expand Down
2 changes: 1 addition & 1 deletion modules/e2e/src/test/scala/ElasticSearchContainer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object ElasticSearchContainer:
Resource.make(start)(cont => IO(cont.stop()))

def parseConfig(container: GenericContainer): ElasticConfig =
ElasticConfig(s"http://${container.host}:${container.mappedPort(PORT)}")
ElasticConfig(org.http4s.Uri.unsafeFromString(s"http://${container.host}:${container.mappedPort(PORT)}"))

def start: Resource[IO, ElasticConfig] =
container.map(parseConfig)
1 change: 1 addition & 0 deletions modules/e2e/src/test/scala/IntegrationSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lila.search
package app
package test

import cats.effect.unsafe.implicits.global
import cats.effect.{ IO, Resource }
import cats.syntax.all.*
import com.comcast.ip4s.*
Expand Down
31 changes: 15 additions & 16 deletions modules/elastic/src/main/scala/ESClient.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package lila.search

import cats.effect.*
import cats.effect.unsafe.IORuntime
import cats.syntax.all.*
import com.sksamuel.elastic4s.ElasticDsl.*
import com.sksamuel.elastic4s.cats.effect.instances.*
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.http4s.Http4sClient
import com.sksamuel.elastic4s.{
ElasticClient,
ElasticDsl,
ElasticProperties,
Executor,
Functor,
Index as ESIndex,
Indexable,
Response
}
import org.http4s.Uri
import org.http4s.client.Client
import org.typelevel.otel4s.metrics.{ Histogram, Meter }
import org.typelevel.otel4s.{ Attribute, AttributeKey, Attributes }

Expand Down Expand Up @@ -49,20 +51,17 @@ object ESClient:
case Resource.ExitCase.Canceled =>
static.added(errorType, "canceled")

def apply(uri: String)(using meter: Meter[IO]): Resource[IO, ESClient[IO]] =
Resource
.make(IO(ElasticClient(JavaClient(ElasticProperties(uri)))))(client => IO(client.close()))
.evalMap: esClient =>
meter
.histogram[Double]("db.client.operation.duration")
.withUnit("ms")
.create
.map(
apply(
esClient,
Attributes(Attribute("db.system", "elasticsearch"), Attribute("server.address", uri))
)
)
def apply(client: Client[IO], uri: Uri)(using Meter[IO], IORuntime): IO[ESClient[IO]] =
Meter[IO]
.histogram[Double]("db.client.operation.duration")
.withUnit("ms")
.create
.map(
apply(
ElasticClient(Http4sClient.usingIO(client, uri)),
Attributes(Attribute("db.system", "elasticsearch"), Attribute("server.address", uri.toString()))
)
)

def apply[F[_]: MonadCancelThrow: Functor: Executor](client: ElasticClient, baseAttributes: Attributes)(
metric: Histogram[F, Double]
Expand Down
10 changes: 7 additions & 3 deletions modules/ingestor/src/main/scala/app.config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ package ingestor
import cats.effect.IO
import cats.syntax.all.*
import ciris.*
import ciris.http4s.*
import org.http4s.Uri
import org.http4s.implicits.*

import java.time.Instant
import scala.concurrent.duration.*
Expand Down Expand Up @@ -39,11 +42,12 @@ object MongoConfig:

def config = (uri, name, studyUri, studyDatabase).parMapN(MongoConfig.apply)

case class ElasticConfig(uri: String)
case class ElasticConfig(uri: Uri)

object ElasticConfig:
private def uri = env("ELASTIC_URI").or(prop("elastic.uri")).as[String].default("http://127.0.0.1:9200")
def config = uri.map(ElasticConfig.apply)
private def uri =
env("ELASTIC_URI").or(prop("elastic.uri")).as[Uri].default(uri"http://127.0.0.1:9200")
def config = uri.map(ElasticConfig.apply)

case class IngestorConfig(
forum: IngestorConfig.Forum,
Expand Down
17 changes: 11 additions & 6 deletions modules/ingestor/src/main/scala/app.resources.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package lila.search
package ingestor

import cats.effect.unsafe.IORuntime
import cats.effect.{ IO, Resource }
import cats.syntax.all.*
import com.mongodb.ReadPreference
import mongo4cats.client.MongoClient
import mongo4cats.database.MongoDatabase
import org.http4s.ember.client.EmberClientBuilder
import org.typelevel.log4cats.Logger
import org.typelevel.otel4s.metrics.Meter

Expand All @@ -19,7 +21,7 @@ class AppResources(

object AppResources:

def instance(conf: AppConfig)(using Logger[IO], Meter[IO]): Resource[IO, AppResources] =
def instance(conf: AppConfig)(using Logger[IO], Meter[IO], IORuntime): Resource[IO, AppResources] =
(
makeMongoClient(conf.mongo),
makeStudyMongoClient(conf.mongo),
Expand All @@ -28,20 +30,23 @@ object AppResources:
KVStore.apply().toResource
).parMapN(AppResources.apply)

def makeElasticClient(conf: ElasticConfig)(using Meter[IO]) =
ESClient(conf.uri)
private def makeElasticClient(conf: ElasticConfig)(using Meter[IO], IORuntime): Resource[IO, ESClient[IO]] =
EmberClientBuilder
.default[IO]
.build
.evalMap(ESClient(_, conf.uri))

def makeMongoClient(conf: MongoConfig) =
private def makeMongoClient(conf: MongoConfig) =
MongoClient
.fromConnectionString[IO](conf.uri)
.evalMap(_.getDatabase(conf.name).map(_.withReadPreference(ReadPreference.secondary())))

def makeStudyMongoClient(conf: MongoConfig) =
private def makeStudyMongoClient(conf: MongoConfig) =
MongoClient
.fromConnectionString[IO](conf.studyUri)
.evalMap(_.getDatabase(conf.studyName))

def makeStudyOplogClient(conf: MongoConfig) =
private def makeStudyOplogClient(conf: MongoConfig) =
MongoClient
.fromConnectionString[IO](conf.studyUri)
.evalMap(_.getDatabase("local"))
2 changes: 2 additions & 0 deletions modules/ingestor/src/main/scala/app.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lila.search
package ingestor

import cats.effect.*
import cats.effect.unsafe.IORuntime
import org.typelevel.log4cats.slf4j.Slf4jFactory
import org.typelevel.log4cats.{ Logger, LoggerFactory }
import org.typelevel.otel4s.experimental.metrics.*
Expand All @@ -13,6 +14,7 @@ object App extends IOApp.Simple:

given LoggerFactory[IO] = Slf4jFactory.create[IO]
given Logger[IO] = LoggerFactory[IO].getLogger
given IORuntime = runtime

override def run: IO[Unit] = app.useForever

Expand Down
2 changes: 2 additions & 0 deletions modules/ingestor/src/main/scala/cli.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ingestor

import cats.data.Validated
import cats.effect.*
import cats.effect.unsafe.IORuntime
import cats.syntax.all.*
import com.monovore.decline.*
import com.monovore.decline.effect.*
Expand All @@ -23,6 +24,7 @@ object cli
given LoggerFactory[IO] = Slf4jFactory.create[IO]
given Logger[IO] = LoggerFactory[IO].getLogger
given Meter[IO] = Meter.noop[IO]
given IORuntime = runtime

override def main: Opts[IO[ExitCode]] =
opts.parse.map: opts =>
Expand Down
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ object Dependencies {
val chess = "16.2.4"
val ciris = "3.6.0"
val decline = "2.4.1"
val elastic4s = "8.15.3"
val elastic4s = "8.16.0"
val fs2 = "3.11.0"
val http4s = "0.23.29"
val iron = "2.5.0"
Expand Down Expand Up @@ -49,7 +49,7 @@ object Dependencies {

val playWS = "com.typesafe.play" %% "play-ahc-ws-standalone" % "2.2.10"

val elastic4sJavaClient = "nl.gn0s1s" %% "elastic4s-client-esjava" % V.elastic4s
val elastic4sHttp4sClient = "nl.gn0s1s" %% "elastic4s-client-http4s" % V.elastic4s
val elastic4sCatsEffect = "nl.gn0s1s" %% "elastic4s-effect-cats" % V.elastic4s

val mongo4catsCore = "io.github.kirill5k" %% "mongo4cats-core" % V.mongo4cats
Expand Down

0 comments on commit 06469af

Please sign in to comment.