diff --git a/modules/app/src/main/scala/app.scala b/modules/app/src/main/scala/app.scala index ce81afc..6c5e4b6 100644 --- a/modules/app/src/main/scala/app.scala +++ b/modules/app/src/main/scala/app.scala @@ -3,7 +3,7 @@ package app import cats.effect.* import cats.syntax.all.* -import org.typelevel.log4cats.slf4j.{ Slf4jFactory, Slf4jLogger } +import org.typelevel.log4cats.slf4j.Slf4jFactory import org.typelevel.log4cats.{ Logger, LoggerFactory } import org.typelevel.otel4s.experimental.metrics.* import org.typelevel.otel4s.metrics.Meter @@ -14,8 +14,8 @@ import org.typelevel.otel4s.sdk.metrics.exporter.MetricExporter object App extends IOApp.Simple: - given Logger[IO] = Slf4jLogger.getLogger[IO] given LoggerFactory[IO] = Slf4jFactory.create[IO] + given Logger[IO] = LoggerFactory[IO].getLogger override def run: IO[Unit] = app.useForever diff --git a/modules/app/src/main/scala/service.health.scala b/modules/app/src/main/scala/service.health.scala index bbde604..2956456 100644 --- a/modules/app/src/main/scala/service.health.scala +++ b/modules/app/src/main/scala/service.health.scala @@ -8,7 +8,7 @@ import org.typelevel.log4cats.{ Logger, LoggerFactory } class HealthServiceImpl(esClient: ESClient[IO])(using LoggerFactory[IO]) extends HealthService[IO]: - given logger: Logger[IO] = summon[LoggerFactory[IO]].getLogger + given logger: Logger[IO] = LoggerFactory[IO].getLogger override def healthCheck(): IO[HealthCheckOutput] = esClient.status diff --git a/modules/app/src/main/scala/service.search.scala b/modules/app/src/main/scala/service.search.scala index 0b8a5fa..135c19a 100644 --- a/modules/app/src/main/scala/service.search.scala +++ b/modules/app/src/main/scala/service.search.scala @@ -2,7 +2,6 @@ package lila.search package app import cats.effect.* -import com.sksamuel.elastic4s.Indexable import io.github.arainko.ducktape.* import lila.search.forum.Forum import lila.search.game.Game @@ -11,7 +10,6 @@ import lila.search.study.Study import lila.search.team.Team import org.typelevel.log4cats.{ Logger, LoggerFactory } import smithy4s.Timestamp -import smithy4s.schema.Schema import java.time.Instant @@ -19,7 +17,7 @@ class SearchServiceImpl(esClient: ESClient[IO])(using LoggerFactory[IO]) extends import SearchServiceImpl.given - given logger: Logger[IO] = summon[LoggerFactory[IO]].getLogger + given logger: Logger[IO] = LoggerFactory[IO].getLogger override def count(query: Query): IO[CountOutput] = esClient @@ -68,8 +66,3 @@ object SearchServiceImpl: case _: Query.Game => Index.Game case _: Query.Study => Index.Study case _: Query.Team => Index.Team - - import smithy4s.json.Json.given - import com.github.plokhotnyuk.jsoniter_scala.core.* - - given [A: Schema]: Indexable[A] = (a: A) => writeToString(a) diff --git a/modules/e2e/src/test/scala/IntegrationSuite.scala b/modules/e2e/src/test/scala/IntegrationSuite.scala index 5957f32..717a2e8 100644 --- a/modules/e2e/src/test/scala/IntegrationSuite.scala +++ b/modules/e2e/src/test/scala/IntegrationSuite.scala @@ -5,7 +5,7 @@ package test import cats.effect.{ IO, Resource } import cats.syntax.all.* import com.comcast.ip4s.* -import lila.search.ingestor.given +import lila.search.ingestor.Ingestor.given import lila.search.spec.* import org.http4s.Uri import org.typelevel.log4cats.noop.{ NoOpFactory, NoOpLogger } @@ -41,7 +41,7 @@ object IntegrationSuite extends IOSuite: def testAppConfig(elastic: ElasticConfig) = AppConfig( server = - HttpServerConfig(ip"0.0.0.0", port"9999", apiLogger = false, shutdownTimeout = 30, enableDocs = false), + 
HttpServerConfig(ip"0.0.0.0", port"9999", apiLogger = false, shutdownTimeout = 1, enableDocs = false), elastic = elastic ) diff --git a/modules/ingestor/src/main/scala/Repo.scala b/modules/ingestor/src/main/scala/Repo.scala new file mode 100644 index 0000000..a111cc3 --- /dev/null +++ b/modules/ingestor/src/main/scala/Repo.scala @@ -0,0 +1,45 @@ +package lila.search +package ingestor + +import cats.effect.IO + +import java.time.Instant + +trait Repo[A]: + def watch(since: Option[Instant]): fs2.Stream[IO, Repo.Result[A]] + def fetch(since: Instant, until: Instant): fs2.Stream[IO, Repo.Result[A]] + +object Repo: + type SourceWithId[A] = (String, A) + case class Result[A](toIndex: List[SourceWithId[A]], toDelete: List[Id], timestamp: Option[Instant]) + + import cats.effect.IO + import mongo4cats.bson.Document + import mongo4cats.collection.GenericMongoCollection + import mongo4cats.models.collection.ChangeStreamDocument + import mongo4cats.operations.Filter + import org.bson.BsonTimestamp + + import java.time.Instant + + val _id = "_id" + + type MongoCollection = GenericMongoCollection[IO, Document, [A] =>> fs2.Stream[IO, A]] + + given [A]: HasDocId[ChangeStreamDocument[A]] with + extension (change: ChangeStreamDocument[A]) + def docId: Option[String] = + change.documentKey.flatMap(_.id) + + extension (doc: Document) + def id: Option[String] = + doc.getString(_id) + + extension (instant: Instant) + inline def asBsonTimestamp: BsonTimestamp = BsonTimestamp(instant.getEpochSecond.toInt, 1) + + def range(field: String)(since: Instant, until: Option[Instant]): Filter = + inline def gtes = Filter.gte(field, since) + until.fold(gtes)(until => gtes.and(Filter.lt(field, until))) + + extension (s: String) def dollarPrefix = "$" + s diff --git a/modules/ingestor/src/main/scala/app.scala b/modules/ingestor/src/main/scala/app.scala index acf52cd..333bb75 100644 --- a/modules/ingestor/src/main/scala/app.scala +++ b/modules/ingestor/src/main/scala/app.scala @@ -2,7 +2,7 @@ package lila.search package ingestor import cats.effect.* -import org.typelevel.log4cats.slf4j.{ Slf4jFactory, Slf4jLogger } +import org.typelevel.log4cats.slf4j.Slf4jFactory import org.typelevel.log4cats.{ Logger, LoggerFactory } import org.typelevel.otel4s.experimental.metrics.* import org.typelevel.otel4s.metrics.Meter @@ -11,8 +11,8 @@ import org.typelevel.otel4s.sdk.metrics.SdkMetrics object App extends IOApp.Simple: - given Logger[IO] = Slf4jLogger.getLogger[IO] given LoggerFactory[IO] = Slf4jFactory.create[IO] + given Logger[IO] = LoggerFactory[IO].getLogger override def run: IO[Unit] = app.useForever @@ -33,7 +33,7 @@ object App extends IOApp.Simple: class IngestorApp(res: AppResources, config: AppConfig)(using Logger[IO], LoggerFactory[IO]): def run(): Resource[IO, Unit] = - Ingestor(res.lichess, res.study, res.studyLocal, res.elastic, res.store, config.ingestor) + Ingestors(res.lichess, res.study, res.studyLocal, res.store, res.elastic, config.ingestor) .flatMap(_.run()) .toResource .evalTap(_ => Logger[IO].info("Ingestor started")) diff --git a/modules/ingestor/src/main/scala/cli.scala b/modules/ingestor/src/main/scala/cli.scala index 4ecadad..271857c 100644 --- a/modules/ingestor/src/main/scala/cli.scala +++ b/modules/ingestor/src/main/scala/cli.scala @@ -7,7 +7,7 @@ import cats.syntax.all.* import com.monovore.decline.* import com.monovore.decline.effect.* import lila.search.ingestor.opts.{ IndexOpts, WatchOpts } -import org.typelevel.log4cats.slf4j.{ Slf4jFactory, Slf4jLogger } +import 
org.typelevel.log4cats.slf4j.Slf4jFactory
 import org.typelevel.log4cats.{ Logger, LoggerFactory }
 import org.typelevel.otel4s.metrics.Meter
@@ -20,36 +20,29 @@ object cli
       version = "3.0.0"
     ):
 
-  given Logger[IO] = Slf4jLogger.getLogger[IO]
   given LoggerFactory[IO] = Slf4jFactory.create[IO]
+  given Logger[IO] = LoggerFactory[IO].getLogger
   given Meter[IO] = Meter.noop[IO]
 
   override def main: Opts[IO[ExitCode]] =
     opts.parse.map: opts =>
-      makeExecutor.use(_.execute(opts).as(ExitCode.Success))
+      makeIngestor.use(_.execute(opts).as(ExitCode.Success))
 
-  def makeExecutor: Resource[IO, Executor] =
+  def makeIngestor: Resource[IO, Ingestors] =
     for
       config <- AppConfig.load.toResource
       res <- AppResources.instance(config)
-      forum <- ForumIngestor(res.lichess, res.elastic, res.store, config.ingestor.forum).toResource
-      team <- TeamIngestor(res.lichess, res.elastic, res.store, config.ingestor.team).toResource
-      study <- StudyIngestor(
+      ingestor <- Ingestors(
+        res.lichess,
         res.study,
         res.studyLocal,
-        res.elastic,
        res.store,
-        config.ingestor.study
+        res.elastic,
+        config.ingestor
      ).toResource
-      game <- GameIngestor(res.lichess, res.elastic, res.store, config.ingestor.game).toResource
-    yield Executor(forum, study, game, team)
-
-  class Executor(
-      val forum: ForumIngestor,
-      val study: StudyIngestor,
-      val game: GameIngestor,
-      val team: TeamIngestor
-  ):
+    yield ingestor
+
+  extension (ingestor: Ingestors)
 
     def execute(opts: IndexOpts | WatchOpts): IO[Unit] =
       opts match
        case opts: IndexOpts => index(opts)
@@ -58,28 +51,39 @@
 
    def index(opts: IndexOpts): IO[Unit] =
      opts.index match
        case Index.Forum =>
-          forum.run(opts.since, opts.until, opts.dry).compile.drain
+          ingestor.forum.run(opts.since, opts.until, opts.dry)
        case Index.Study =>
-          study.run(opts.since, opts.until, opts.dry).compile.drain
+          ingestor.study.run(opts.since, opts.until, opts.dry)
        case Index.Game =>
-          game.run(opts.since, opts.until, opts.dry).compile.drain
+          ingestor.game.run(opts.since, opts.until, opts.dry)
        case Index.Team =>
-          team.run(opts.since, opts.until, opts.dry).compile.drain
+          ingestor.team.run(opts.since, opts.until, opts.dry)
        case _ =>
-          forum.run(opts.since, opts.until, opts.dry).compile.drain *>
-            study.run(opts.since, opts.until, opts.dry).compile.drain *>
-            game.run(opts.since, opts.until, opts.dry).compile.drain *>
-            team.run(opts.since, opts.until, opts.dry).compile.drain
+          ingestor.forum.run(opts.since, opts.until, opts.dry) *>
+            ingestor.study.run(opts.since, opts.until, opts.dry) *>
+            ingestor.game.run(opts.since, opts.until, opts.dry) *>
+            ingestor.team.run(opts.since, opts.until, opts.dry)
 
    def watch(opts: WatchOpts): IO[Unit] =
      opts.index match
        case Index.Game =>
-          game.watch(opts.since.some, opts.dry).compile.drain
-        case _ => IO.println("We only support game watch for now")
+          ingestor.game.watch(opts.since.some, opts.dry)
+        case Index.Forum =>
+          ingestor.forum.watch(opts.since.some, opts.dry)
+        case Index.Team =>
+          ingestor.team.watch(opts.since.some, opts.dry)
+        case Index.Study =>
+          ingestor.study.watch(opts.since.some, opts.dry)
+        case _ =>
+          // watch streams never terminate, so sequencing with *> would only ever start the first one
+          ingestor.forum.watch(opts.since.some, opts.dry) &>
+            ingestor.team.watch(opts.since.some, opts.dry) &>
+            ingestor.study.watch(opts.since.some, opts.dry) &>
+            ingestor.game.watch(opts.since.some, opts.dry)
 
 object opts:
   case class IndexOpts(index: Index | Unit, since: Instant, until: Instant, dry: Boolean)
-  case class WatchOpts(index: Index, since: Instant, dry: Boolean)
+  case class WatchOpts(index: Index | Unit, since: Instant, dry: Boolean)
 
   def parse =
     Opts.subcommand("index", "index documents")(indexOpt) <+>
       Opts.subcommand("watch", "watch change events and index documents")(watchOpt)
@@ -128,12 +131,7 @@ object opts:
   )
 
   val watchOpt = (
-    Opts.option[Index](
-      long = "index",
-      help = "Target index (only `game` for now)",
-      short = "i",
-      metavar = "forum|team|study|game"
-    ),
+    singleIndexOpt orElse allIndexOpt,
     Opts
       .option[Instant](
        long = "since",
diff --git a/modules/ingestor/src/main/scala/ingestor.scala b/modules/ingestor/src/main/scala/ingestor.scala
index bc85b88..27fbd8f 100644
--- a/modules/ingestor/src/main/scala/ingestor.scala
+++ b/modules/ingestor/src/main/scala/ingestor.scala
@@ -3,33 +3,93 @@ package ingestor
 
 import cats.effect.*
 import cats.syntax.all.*
-import mongo4cats.database.MongoDatabase
-import org.typelevel.log4cats.LoggerFactory
+import com.github.plokhotnyuk.jsoniter_scala.core.*
+import com.sksamuel.elastic4s.Indexable
+import org.typelevel.log4cats.syntax.*
+import org.typelevel.log4cats.{ Logger, LoggerFactory }
+import smithy4s.json.Json.given
+import smithy4s.schema.Schema
+
+import java.time.Instant
 
 trait Ingestor:
-  def run(): IO[Unit]
+  // watch change events from database and ingest documents into elastic search
+  def watch: IO[Unit]
+  // Similar to watch but started from a given timestamp
+  def watch(since: Option[Instant], dryRun: Boolean): IO[Unit]
+  // Fetch documents in [since, until] and ingest into elastic search
+  def run(since: Instant, until: Instant, dryRun: Boolean): IO[Unit]
 
 object Ingestor:
 
-  def apply(
-      lichess: MongoDatabase[IO],
-      study: MongoDatabase[IO],
-      local: MongoDatabase[IO],
-      elastic: ESClient[IO],
+  // serialize any Schema-derived source type to a JSON string for elastic4s
+  given [A: Schema]: Indexable[A] = (a: A) => writeToString(a)
+
+  def apply[A: Schema](
+      index: Index,
+      repo: Repo[A],
       store: KVStore,
-      config: IngestorConfig
-  )(using LoggerFactory[IO]): IO[Ingestor] =
-    (
-      ForumIngestor(lichess, elastic, store, config.forum),
-      TeamIngestor(lichess, elastic, store, config.team),
-      StudyIngestor(study, local, elastic, store, config.study),
-      GameIngestor(lichess, elastic, store, config.game)
-    ).mapN: (forum, team, study, game) =>
-      new Ingestor:
-        def run() =
-          fs2
-            .Stream(forum.watch, team.watch, study.watch, game.watch)
-            .covary[IO]
-            .parJoinUnbounded
-            .compile
-            .drain
+      elastic: ESClient[IO],
+      defaultStartAt: Option[Instant]
+  )(using LoggerFactory[IO]): Ingestor = new:
+    given Logger[IO] = LoggerFactory[IO].getLogger
+
+    def watch: IO[Unit] =
+      fs2.Stream
+        .eval(startAt)
+        .flatMap(repo.watch)
+        .evalMap: result =>
+          updateElastic(result, dryRun = false) *> saveLastIndexedTimestamp(index, result.timestamp)
+        .compile
+        .drain
+
+    // unlike the parameterless watch, this variant does not persist the last indexed timestamp
+    def watch(since: Option[Instant], dryRun: Boolean): IO[Unit] =
+      repo
+        .watch(since)
+        .evalMap(updateElastic(_, dryRun))
+        .compile
+        .drain
+
+    def run(since: Instant, until: Instant, dryRun: Boolean): IO[Unit] =
+      repo
+        .fetch(since, until)
+        .evalMap(updateElastic(_, dryRun))
+        .compile
+        .drain
+
+    private def updateElastic(result: Repo.Result[A], dryRun: Boolean): IO[Unit] =
+      dryRun.fold(
+        info"Would index ${result.toIndex.size} ${index.value}s and delete ${result.toDelete.size} ${index.value}s" *>
+          result.toIndex.traverse_(x => debug"Would index $x")
+          *> result.toDelete.traverse_(x => debug"Would delete $x"),
+        storeBulk(index, result.toIndex)
+          *> deleteMany(index, result.toDelete)
+      )
+
+    private def startAt: IO[Option[Instant]] =
+      defaultStartAt
+        .fold(store.get(index.value))(_.some.pure[IO])
+        .flatTap(since => info"Starting ${index.value} ingestor from $since")
+
+    private def deleteMany(index: Index, ids: List[Id]): IO[Unit] =
+      elastic
+        .deleteMany(index, ids)
+        .flatTap(_ => Logger[IO].info(s"Deleted ${ids.size} ${index.value}s"))
+        .handleErrorWith: e =>
+          Logger[IO].error(e)(s"Failed to delete ${index.value}: ${ids.map(_.value).mkString(", ")}")
+        .whenA(ids.nonEmpty)
+
+    private def storeBulk(index: Index, sources: List[(String, A)]): IO[Unit] =
+      Logger[IO].info(s"Received ${sources.size} docs to index into ${index.value}") *>
+        elastic
+          .storeBulk(index, sources)
+          .handleErrorWith: e =>
+            Logger[IO].error(e)(s"Failed to index ${index.value}s: ${sources.map(_._1).mkString(", ")}")
+          .whenA(sources.nonEmpty)
+        *> Logger[IO].info(s"Indexed ${sources.size} ${index.value}s")
+
+    private def saveLastIndexedTimestamp(index: Index, time: Option[Instant]): IO[Unit] =
+      val savedTime = time.getOrElse(Instant.now())
+      store.put(index.value, savedTime)
+        *> Logger[IO].info(s"Stored last indexed time ${savedTime.getEpochSecond} for $index")
diff --git a/modules/ingestor/src/main/scala/ingestors.scala b/modules/ingestor/src/main/scala/ingestors.scala
new file mode 100644
index 0000000..9a36057
--- /dev/null
+++ b/modules/ingestor/src/main/scala/ingestors.scala
@@ -0,0 +1,39 @@
+package lila.search
+package ingestor
+
+import cats.effect.*
+import cats.syntax.all.*
+import mongo4cats.database.MongoDatabase
+import org.typelevel.log4cats.LoggerFactory
+
+class Ingestors(
+    val forum: Ingestor,
+    val study: Ingestor,
+    val game: Ingestor,
+    val team: Ingestor
+):
+  def run(): IO[Unit] =
+    List(forum.watch, team.watch, study.watch, game.watch).parSequence_
+
+object Ingestors:
+
+  def apply(
+      lichess: MongoDatabase[IO],
+      study: MongoDatabase[IO],
+      local: MongoDatabase[IO],
+      store: KVStore,
+      elastic: ESClient[IO],
+      config: IngestorConfig
+  )(using LoggerFactory[IO]): IO[Ingestors] =
+    (
+      ForumRepo(lichess, config.forum),
+      StudyRepo(study, local, config.study),
+      GameRepo(lichess, config.game),
+      TeamRepo(lichess, config.team)
+    ).mapN: (forums, studies, games, teams) =>
+      new Ingestors(
+        Ingestor(Index.Forum, forums, store, elastic, config.forum.startAt),
+        Ingestor(Index.Study, studies, store, elastic, config.study.startAt),
+        Ingestor(Index.Game, games, store, elastic, config.game.startAt),
+        Ingestor(Index.Team, teams, store, elastic, config.team.startAt)
+      )
diff --git a/modules/ingestor/src/main/scala/mongo.chapter.scala b/modules/ingestor/src/main/scala/mongo.chapter.scala
index 2d87eb2..85d265b 100644
--- a/modules/ingestor/src/main/scala/mongo.chapter.scala
+++ b/modules/ingestor/src/main/scala/mongo.chapter.scala
@@ -11,6 +11,8 @@ import mongo4cats.database.MongoDatabase
 import mongo4cats.operations.{ Accumulator, Aggregate, Filter }
 import org.typelevel.log4cats.Logger
 
+import Repo.*
+
 trait ChapterRepo:
   // Aggregate chapters data and convert them to StudyChapterText by their study ids
   def byStudyIds(ids: List[String]): IO[Map[String, StudyData]]
@@ -108,8 +110,6 @@ object ChapterRepo:
     def byStudyIds(ids: List[String]): IO[Map[String, StudyData]] =
       coll
         .aggregateWithCodec[StudyData](Query.aggregate(ids))
-        .stream
-        .compile
-        .toList
+        .all
         .flatTap(docs => Logger[IO].debug(s"Received $docs chapters"))
         .map(_.map(x => x._id -> x).toMap)
diff --git a/modules/ingestor/src/main/scala/ingestor.forum.scala b/modules/ingestor/src/main/scala/mongo.forum.scala
similarity index 62%
rename from modules/ingestor/src/main/scala/ingestor.forum.scala
rename to modules/ingestor/src/main/scala/mongo.forum.scala
index 1d5c73b..eb867d7 100644
--- a/modules/ingestor/src/main/scala/ingestor.forum.scala
+++ b/modules/ingestor/src/main/scala/mongo.forum.scala @@ -15,15 +15,9 @@ import org.typelevel.log4cats.{ Logger, LoggerFactory } import java.time.Instant import scala.concurrent.duration.* -trait ForumIngestor: - // watch change events from MongoDB and ingest forum posts into elastic search - def watch: fs2.Stream[IO, Unit] - // Fetch posts in [since, until] and ingest into elastic search - def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit] +import Repo.{ *, given } -object ForumIngestor: - - private val index = Index.Forum +object ForumRepo: private val interestedOperations = List(DELETE, INSERT, REPLACE, UPDATE).map(_.getValue) @@ -43,74 +37,37 @@ object ForumIngestor: private def aggregate(maxPostLength: Int) = Aggregate.matchBy(eventFilter(maxPostLength)).combinedWith(Aggregate.project(eventProjection)) - def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Forum)( - using LoggerFactory[IO] - ): IO[ForumIngestor] = - given Logger[IO] = summon[LoggerFactory[IO]].getLogger - (mongo.getCollection("f_topic"), mongo.getCollection("f_post")).mapN(apply(elastic, store, config)) + def apply(mongo: MongoDatabase[IO], config: IngestorConfig.Forum)(using + LoggerFactory[IO] + ): IO[Repo[ForumSource]] = + given Logger[IO] = LoggerFactory[IO].getLogger + (mongo.getCollection("f_topic"), mongo.getCollection("f_post")).mapN(apply(config)) - def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Forum)( + def apply(config: IngestorConfig.Forum)( topics: MongoCollection, posts: MongoCollection - )(using Logger[IO]): ForumIngestor = new: - - def watch: fs2.Stream[IO, Unit] = - fs2.Stream - .eval(startAt.flatTap(since => info"Starting forum ingestor from $since")) - .flatMap: last => - changes(last) - .evalMap: events => - val lastEventTimestamp = events.flatten(_.clusterTime.flatMap(_.asInstant)).maxOption - val (toDelete, toIndex) = events.partition(_.isDelete) - storeBulk(toIndex.flatten(_.fullDocument)) - *> elastic.deleteMany(index, toDelete) - *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now())) - - def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit] = + )(using Logger[IO]): Repo[ForumSource] = new: + + def fetch(since: Instant, until: Instant) = val filter = range(F.createdAt)(since, until.some) .or(range(F.updatedAt)(since, until.some)) .or(range(F.erasedAt)(since, until.some)) - posts - .find(filter) - .projection(postProjection) - .boundedStream(config.batchSize) - .filter(_.validText) - .chunkN(config.batchSize) - .map(_.toList) - .metered(1.second) // to avoid overloading the elasticsearch - .evalMap: docs => - val (toDelete, toIndex) = docs.partition(_.isErased) - dryRun.fold( - toIndex.traverse_(doc => debug"Would index $doc") - *> toDelete.traverse_(doc => debug"Would delete $doc"), - storeBulk(toIndex) *> elastic.deleteMany(index, toDelete) - ) - - private def storeBulk(docs: List[Document]): IO[Unit] = - info"Received ${docs.size} forum posts to index" *> - docs.toSources - .flatMap: sources => - elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts" - .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to index forum posts: ${docs.map(_.id).mkString(", ")}") - .whenA(docs.nonEmpty) - - private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = - store.put(index.value, time) - *> info"Stored last indexed time ${time.getEpochSecond} for $index" - - private def startAt: IO[Option[Instant]] = - 
config.startAt.fold(store.get(index.value))(_.some.pure[IO])
-
-  // Fetches topic names by their ids
-  private def topicByIds(ids: Seq[String]): IO[Map[String, String]] =
-    topics
-      .find(Filter.in(_id, ids))
-      .projection(Projection.include(List(_id, Topic.name)))
-      .all
-      .map(_.map(doc => (doc.id, doc.getString(Topic.name)).mapN(_ -> _)).flatten.toMap)
-
-  private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
+      fs2.Stream.eval(info"Fetching forum posts from $since to $until") *>
+        posts
+          .find(filter)
+          .projection(postProjection)
+          .boundedStream(config.batchSize)
+          .filter(_.validText)
+          .chunkN(config.batchSize)
+          .map(_.toList)
+          .metered(1.second)
+          .evalMap: events =>
+            val (toDelete, toIndex) = events.partition(_.isErased)
+            toIndex.toSources
+              .map: sources =>
+                Result(sources, toDelete.flatten(_.id.map(Id.apply)), none)
+
+    def watch(since: Option[Instant]): fs2.Stream[IO, Result[ForumSource]] =
       val builder = posts.watch(aggregate(config.maxPostLength))
       // skip the first event if we're starting from a specific timestamp
       // since the event at that timestamp is already indexed
@@ -124,11 +81,25 @@
         .groupWithin(config.batchSize, config.timeWindows.second)
         .evalTap(_.traverse_(x => debug"received $x"))
         .map(_.toList.distincByDocId)
+        .evalMap: events =>
+          val lastEventTimestamp  = events.flatten(_.clusterTime.flatMap(_.asInstant)).maxOption
+          val (toDelete, toIndex) = events.partition(_.isDelete)
+          toIndex
+            .flatten(_.fullDocument)
+            .toSources
+            .map: sources =>
+              Result(sources, toDelete.flatten(_.docId.map(Id.apply)), lastEventTimestamp)
 
-  private type SourceWithId = (String, ForumSource)
+  // Fetches topic names by their ids
+  private def topicByIds(ids: Seq[String]): IO[Map[String, String]] =
+    topics
+      .find(Filter.in(_id, ids))
+      .projection(Projection.include(List(_id, Topic.name)))
+      .all
+      .map(_.map(doc => (doc.id, doc.getString(Topic.name)).mapN(_ -> _)).flatten.toMap)
 
   extension (events: List[Document])
-    private def toSources: IO[List[SourceWithId]] =
+    private def toSources: IO[List[SourceWithId[ForumSource]]] =
       val topicIds = events.flatMap(_.topicId).distinct
       topicIds.isEmpty.fold(
        info"no topics found for posts: $events".as(Nil),
@@ -141,7 +112,7 @@
 
   extension (doc: Document)
 
-    private def toSource(topicMap: Map[String, String]): IO[Option[SourceWithId]] =
+    private def toSource(topicMap: Map[String, String]): IO[Option[SourceWithId[ForumSource]]] =
       (doc.id, doc.topicId)
        .flatMapN: (id, topicId) =>
          doc.toSource(topicMap.get(topicId), topicId).map(id -> _)
diff --git a/modules/ingestor/src/main/scala/ingestor.game.scala b/modules/ingestor/src/main/scala/mongo.game.scala
similarity index 74%
rename from modules/ingestor/src/main/scala/ingestor.game.scala
rename to modules/ingestor/src/main/scala/mongo.game.scala
index 2fb30e1..2892af5 100644
--- a/modules/ingestor/src/main/scala/ingestor.game.scala
+++ b/modules/ingestor/src/main/scala/mongo.game.scala
@@ -20,17 +20,9 @@ import org.typelevel.log4cats.{ Logger, LoggerFactory }
 import java.time.Instant
 import scala.concurrent.duration.*
 
-trait GameIngestor:
-  // watch change events from game5 collection and ingest games into elastic search
-  def watch: fs2.Stream[IO, Unit]
-  // Similar to watch but started from a given timestamp
-  def watch(since: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit]
-  // Fetch posts in [since, until] and ingest into elastic search
-  def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit]
+import Repo.{ *, given }
 
-object GameIngestor:
-
-  private val index = Index.Game
+object GameRepo:
 
   private val interestedOperations = List(UPDATE, DELETE).map(_.getValue)
   private val eventFilter          = Filter.in("operationType", interestedOperations)
@@ -70,63 +62,39 @@
   private val aggregate =
     Aggregate.matchBy(eventFilter.and(changeFilter)).combinedWith(Aggregate.project(eventProjection))
 
-  def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Game)(
-      using LoggerFactory[IO]
-  ): IO[GameIngestor] =
-    given Logger[IO] = summon[LoggerFactory[IO]].getLogger
-    mongo.getCollectionWithCodec[DbGame]("game5").map(apply(elastic, store, config))
-
-  def apply(
-      elastic: ESClient[IO],
-      store: KVStore,
-      config: IngestorConfig.Game
-  )(games: MongoCollection[IO, DbGame])(using Logger[IO]): GameIngestor = new:
+  def apply(mongo: MongoDatabase[IO], config: IngestorConfig.Game)(using
+      LoggerFactory[IO]
+  ): IO[Repo[GameSource]] =
+    given Logger[IO] = LoggerFactory[IO].getLogger
+    mongo.getCollectionWithCodec[DbGame]("game5").map(apply(config))
 
-    def watch: fs2.Stream[IO, Unit] =
-      fs2.Stream
-        .eval(startAt.flatTap(since => info"Starting game ingestor from $since"))
-        .flatMap(watch(_, dryRun = false))
+  def apply(config: IngestorConfig.Game)(games: MongoCollection[IO, DbGame])(using
+      Logger[IO]
+  ): Repo[GameSource] = new:
 
-    def watch(since: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit] =
+    def watch(since: Option[Instant]): fs2.Stream[IO, Result[GameSource]] =
       changes(since)
-        .evalMap: events =>
+        .map: events =>
          val lastEventTimestamp  = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant)
          val (toDelete, toIndex) = events.partition(_.operationType == DELETE)
-          dryRun.fold(
-            info"Would index total ${toIndex.size} games and delete ${toDelete.size} games" *>
-              toIndex.flatMap(_.fullDocument).traverse_(x => debug"Would index ${x.debug}")
-              *> toDelete.traverse_(x => debug"Would delete ${x.docId}"),
-            storeBulk(toIndex.flatten(_.fullDocument))
-              *> elastic.deleteMany(index, toDelete)
-              *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now))
+          Result(
+            toIndex.flatten(_.fullDocument.map(_.toSource)),
+            toDelete.flatten(_.docId.map(Id.apply)),
+            lastEventTimestamp
          )
 
-    def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit] =
+    def fetch(since: Instant, until: Instant): fs2.Stream[IO, Result[GameSource]] =
      val filter = range(F.createdAt)(since, until.some)
        .or(range(F.updatedAt)(since, until.some))
-      games
-        .find(filter.and(gameFilter))
-        // .projection(postProjection)
-        .boundedStream(config.batchSize)
-        .chunkN(config.batchSize)
-        .map(_.toList)
-        .metered(1.second) // to avoid overloading the elasticsearch
-        .evalMap: docs =>
-          dryRun.fold(
-            info"Would index total ${docs.size} games" *>
-              docs.traverse_(doc => debug"Would index $doc"),
-            storeBulk(docs)
-          )
-
-    private def storeBulk(docs: List[DbGame]): IO[Unit] =
-      val sources = docs.map(_.toSource)
-      info"Received ${docs.size} ${index.value}s to index" *>
-        elastic
-          .storeBulk(index, sources)
-          .handleErrorWith: e =>
-            Logger[IO].error(e)(s"Failed to index ${index.value}s: ${docs.map(_.id).mkString(", ")}")
-          .whenA(sources.nonEmpty)
-          *> info"Indexed ${sources.size} ${index.value}s"
+      fs2.Stream.eval(info"Fetching games from $since to $until") *>
+        games
+          .find(filter.and(gameFilter))
+          // .projection(postProjection)
+          .boundedStream(config.batchSize)
+          .chunkN(config.batchSize)
+          .map(_.toList)
+          
.metered(1.second) // to avoid overloading the elasticsearch + .map(ds => Result(ds.map(_.toSource), Nil, none)) private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[DbGame]]] = val builder = games.watch(aggregate) @@ -144,13 +112,6 @@ object GameIngestor: .map(_.toList.distincByDocId) .evalTap(_.traverse_(x => x.fullDocument.traverse_(x => debug"${x.debug}"))) - private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = - store.put(index.value, time) - *> info"Stored last indexed time ${time.getEpochSecond} for $index" - - private def startAt: IO[Option[Instant]] = - config.startAt.fold(store.get(index.value))(_.some.pure[IO]) - object F: val createdAt = "ca" val updatedAt = "ua" diff --git a/modules/ingestor/src/main/scala/ingestor.study.scala b/modules/ingestor/src/main/scala/mongo.study.scala similarity index 61% rename from modules/ingestor/src/main/scala/ingestor.study.scala rename to modules/ingestor/src/main/scala/mongo.study.scala index 5ed2980..abeb3e0 100644 --- a/modules/ingestor/src/main/scala/ingestor.study.scala +++ b/modules/ingestor/src/main/scala/mongo.study.scala @@ -11,14 +11,9 @@ import org.typelevel.log4cats.{ Logger, LoggerFactory } import java.time.Instant -trait StudyIngestor: - // pull changes from study MongoDB and ingest into elastic search - def watch: fs2.Stream[IO, Unit] - def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit] +import Repo.* -object StudyIngestor: - - private val index = Index.Study +object StudyRepo: private val interestedfields = List("_id", F.name, F.members, F.ownerId, F.visibility, F.topics, F.likes) @@ -28,33 +23,30 @@ object StudyIngestor: def apply( study: MongoDatabase[IO], local: MongoDatabase[IO], - elastic: ESClient[IO], - store: KVStore, config: IngestorConfig.Study - )(using LoggerFactory[IO]): IO[StudyIngestor] = - given Logger[IO] = summon[LoggerFactory[IO]].getLogger + )(using LoggerFactory[IO]): IO[Repo[StudySource]] = + given Logger[IO] = LoggerFactory[IO].getLogger (study.getCollection("study"), ChapterRepo(study), local.getCollection("oplog.rs")) - .mapN(apply(elastic, store, config)) + .mapN(apply(config)) - def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Study)( + def apply(config: IngestorConfig.Study)( studies: MongoCollection, chapters: ChapterRepo, oplogs: MongoCollection - )(using Logger[IO]): StudyIngestor = new: - def watch: fs2.Stream[IO, Unit] = - intervalStream + )(using Logger[IO]): Repo[StudySource] = new: + + def watch(since: Option[Instant]): fs2.Stream[IO, Result[StudySource]] = + intervalStream(since) .meteredStartImmediately(config.interval) - .flatMap: (since, until) => - run(since, until, dryRun = false) + .flatMap(fetch) - def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit] = - fs2.Stream.eval(info"Indexing studies from $since to $until") ++ - pullAndIndex(since, until, dryRun) ++ - fs2.Stream.eval(info"deleting studies from $since to $until") ++ - pullAndDelete(since, until, dryRun) - ++ fs2.Stream.eval(saveLastIndexedTimestamp(until)) + def fetch(since: Instant, until: Instant): fs2.Stream[IO, Result[StudySource]] = + fs2.Stream.eval(info"Fetching studies from $since to $until") *> + pullAndIndex(since, until) + .zip(pullAndDelete(since, until)) + .map((toIndex, toDelete) => Result(toIndex, toDelete, until.some)) - def pullAndIndex(since: Instant, until: Instant, dryRun: Boolean = false): fs2.Stream[IO, Unit] = + def pullAndIndex(since: Instant, until: Instant) = val filter = 
range(F.createdAt)(since, until.some) .or(range(F.updatedAt)(since, until.some)) studies @@ -64,9 +56,9 @@ object StudyIngestor: .chunkN(config.batchSize) .map(_.toList) .evalTap(_.traverse_(x => debug"received $x")) - .evalMap(storeBulk(_, dryRun)) + .evalMap(_.toSources) - def pullAndDelete(since: Instant, until: Instant, dryRun: Boolean = false): fs2.Stream[IO, Unit] = + def pullAndDelete(since: Instant, until: Instant) = val filter = Filter .gte("ts", since.asBsonTimestamp) @@ -80,40 +72,17 @@ object StudyIngestor: .chunkN(config.batchSize) .map(_.toList.flatMap(extractId)) .evalTap(xs => info"Deleting $xs") - .evalMap: - dryRun.fold( - xs => xs.traverse_(x => debug"Would delete $x"), - elastic.deleteMany(index, _) - ) - - def storeBulk(docs: List[Document], dryRun: Boolean = false): IO[Unit] = - info"Received ${docs.size} studies to index" *> - docs.toSources.flatMap: sources => - dryRun.fold( - sources.traverse_(source => debug"Would index $source"), - elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} studies" - .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to index studies: ${docs.map(_.id).mkString(", ")}") - .whenA(docs.nonEmpty) - ) - - def saveLastIndexedTimestamp(time: Instant): IO[Unit] = - store.put(index.value, time) - *> info"Stored last indexed time ${time.getEpochSecond} for $index" def extractId(doc: Document): Option[Id] = doc.getNestedAs[String](F.oplogId).map(Id.apply) - def intervalStream: fs2.Stream[IO, (Instant, Instant)] = - fs2.Stream - .eval: - config.startAt.fold(store.get(index.value))(_.some.pure[IO]) - .flatMap: startAt => - startAt.fold(fs2.Stream.empty)(since => fs2.Stream(since)) - ++ fs2.Stream - .eval(IO.realTimeInstant) - .flatMap(now => fs2.Stream.unfold(now)(s => (s, s.plusSeconds(config.interval.toSeconds)).some)) - .zipWithNext + def intervalStream(startAt: Option[Instant]): fs2.Stream[IO, (Instant, Instant)] = + (startAt.fold(fs2.Stream.empty)(since => fs2.Stream(since)) + ++ fs2.Stream + .eval(IO.realTimeInstant) + .flatMap(now => + fs2.Stream.unfold(now)(s => (s, s.plusSeconds(config.interval.toSeconds)).some) + )).zipWithNext .map((since, until) => since -> until.get) extension (docs: List[Document]) diff --git a/modules/ingestor/src/main/scala/ingestor.team.scala b/modules/ingestor/src/main/scala/mongo.team.scala similarity index 51% rename from modules/ingestor/src/main/scala/ingestor.team.scala rename to modules/ingestor/src/main/scala/mongo.team.scala index 2f8acc8..5290c31 100644 --- a/modules/ingestor/src/main/scala/ingestor.team.scala +++ b/modules/ingestor/src/main/scala/mongo.team.scala @@ -15,15 +15,9 @@ import org.typelevel.log4cats.{ Logger, LoggerFactory } import java.time.Instant import scala.concurrent.duration.* -trait TeamIngestor: - // watch change events from MongoDB and ingest team data into elastic search - def watch: fs2.Stream[IO, Unit] - // Fetch teams in [since, until] and ingest into elastic search - def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit] +import Repo.{ *, given } -object TeamIngestor: - - private val index = Index.Team +object TeamRepo: private val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue) private val eventFilter = Filter.in("operationType", interestedOperations) @@ -37,65 +31,15 @@ object TeamIngestor: private val aggregate = Aggregate.matchBy(eventFilter).combinedWith(Aggregate.project(eventProjection)) - def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)( - using 
LoggerFactory[IO] - ): IO[TeamIngestor] = - given Logger[IO] = summon[LoggerFactory[IO]].getLogger - mongo.getCollection("team").map(apply(elastic, store, config)) - - def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)(teams: MongoCollection)(using - Logger[IO] - ): TeamIngestor = new: - def watch = - fs2.Stream - .eval(startAt.flatTap(since => info"Starting team ingestor from $since")) - .flatMap: last => - changeStream(last) - .filterNot(_.isEmpty) - .evalMap: events => - val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant) - val (toDelete, toIndex) = events.partition(_.isDelete) - storeBulk(toIndex.flatten(_.fullDocument)) - *> elastic.deleteMany(index, toDelete) - *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now)) - - def run(since: Instant, until: Instant, dryRun: Boolean) = - val filter = range(F.createdAt)(since, until.some) - .or(range(F.updatedAt)(since, until.some)) - .or(range(F.erasedAt)(since, until.some)) - teams - .find(filter) - .projection(postProjection) - .boundedStream(config.batchSize) - .chunkN(config.batchSize) - .map(_.toList) - .metered(1.second) // to avoid overloading the elasticsearch - .evalMap: docs => - val (toDelete, toIndex) = docs.partition(!_.isEnabled) - dryRun.fold( - toIndex.traverse_(doc => debug"Would index $doc") - *> toDelete.traverse_(doc => debug"Would delete $doc"), - storeBulk(toIndex) *> elastic.deleteMany(index, toDelete) - ) - - private def storeBulk(docs: List[Document]): IO[Unit] = - val sources = docs.toSources - info"Received ${docs.size} teams to index" *> - elastic - .storeBulk(index, sources) - .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_.id).mkString(", ")}") - .whenA(sources.nonEmpty) - *> info"Indexed ${sources.size} teams" + def apply(mongo: MongoDatabase[IO], config: IngestorConfig.Team)(using + LoggerFactory[IO] + ): IO[Repo[TeamSource]] = + given Logger[IO] = LoggerFactory[IO].getLogger + mongo.getCollection("team").map(apply(config)) - private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = - store.put(index.value, time) - *> info"Stored last indexed time ${time.getEpochSecond} for $index" + def apply(config: IngestorConfig.Team)(teams: MongoCollection)(using Logger[IO]): Repo[TeamSource] = new: - private def startAt: IO[Option[Instant]] = - config.startAt.fold(store.get(index.value))(_.some.pure[IO]) - - private def changeStream(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] = + def watch(since: Option[Instant]) = // skip the first event if we're starting from a specific timestamp // since the event at that timestamp is already indexed val skip = since.fold(0)(_ => 1) @@ -109,6 +53,34 @@ object TeamIngestor: .evalTap(x => debug"Team change stream event: $x") .groupWithin(config.batchSize, config.timeWindows.second) .map(_.toList.distincByDocId) + .map: docs => + val lastEventTimestamp = docs.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant) + val (toDelete, toIndex) = docs.partition(_.isDelete) + Result( + toIndex.flatten(_.fullDocument).toSources, + toDelete.flatten(_.docId.map(Id.apply)), + lastEventTimestamp + ) + + def fetch(since: Instant, until: Instant) = + val filter = range(F.createdAt)(since, until.some) + .or(range(F.updatedAt)(since, until.some)) + .or(range(F.erasedAt)(since, until.some)) + fs2.Stream.eval(info"Fetching teams from $since to $until") *> + teams + .find(filter) + .projection(postProjection) + .boundedStream(config.batchSize) + 
.chunkN(config.batchSize) + .map(_.toList) + .metered(1.second) // to avoid overloading the elasticsearch + .map: docs => + val (toDelete, toIndex) = docs.partition(!_.isEnabled) + Result( + toIndex.toSources, + toDelete.flatten(_.id.map(Id.apply)), + none + ) extension (docs: List[Document]) private def toSources: List[(String, TeamSource)] = diff --git a/modules/ingestor/src/main/scala/package.scala b/modules/ingestor/src/main/scala/package.scala deleted file mode 100644 index ba2b428..0000000 --- a/modules/ingestor/src/main/scala/package.scala +++ /dev/null @@ -1,62 +0,0 @@ -package lila.search -package ingestor - -import cats.effect.IO -import cats.syntax.all.* -import com.github.plokhotnyuk.jsoniter_scala.core.* -import com.sksamuel.elastic4s.Indexable -import mongo4cats.bson.Document -import mongo4cats.collection.GenericMongoCollection -import mongo4cats.models.collection.ChangeStreamDocument -import mongo4cats.operations.Filter -import org.bson.BsonTimestamp -import org.typelevel.log4cats.Logger -import org.typelevel.log4cats.syntax.* -import smithy4s.json.Json.given -import smithy4s.schema.Schema - -import java.time.Instant - -val _id = "_id" - -type MongoCollection = GenericMongoCollection[IO, Document, [A] =>> fs2.Stream[IO, A]] - -given [A]: HasDocId[ChangeStreamDocument[A]] with - extension (change: ChangeStreamDocument[A]) - def docId: Option[String] = - change.documentKey.flatMap(_.id) - -extension (doc: Document) - private def id: Option[String] = - doc.getString(_id) - -given [A: Schema]: Indexable[A] = (a: A) => writeToString(a) - -extension (instant: Instant) - inline def asBsonTimestamp: BsonTimestamp = BsonTimestamp(instant.getEpochSecond.toInt, 1) - -def range(field: String)(since: Instant, until: Option[Instant]): Filter = - inline def gtes = Filter.gte(field, since) - until.fold(gtes)(until => gtes.and(Filter.lt(field, until))) - -extension (elastic: ESClient[IO]) - - def deleteMany_(index: Index, ids: List[Id])(using Logger[IO]): IO[Unit] = - elastic - .deleteMany(index, ids) - .flatTap(_ => Logger[IO].info(s"Deleted ${ids.size} ${index.value}s")) - .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to delete ${index.value}: ${ids.map(_.value).mkString(", ")}") - .whenA(ids.nonEmpty) - - @scala.annotation.targetName("deleteManyWithDocs") - def deleteMany(index: Index, events: List[Document])(using Logger[IO]): IO[Unit] = - info"Received ${events.size} ${index.value} to delete" *> - deleteMany_(index, events.flatMap(_.id).map(Id.apply)).whenA(events.nonEmpty) - - @scala.annotation.targetName("deleteManyWithChanges") - def deleteMany[A](index: Index, events: List[ChangeStreamDocument[A]])(using Logger[IO]): IO[Unit] = - info"Received ${events.size} ${index.value} to delete" *> - deleteMany_(index, events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty) - -extension (s: String) def dollarPrefix = "$" + s
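
A minimal sketch of the new `Repo`/`Ingestor` contract introduced above, assuming only the definitions in `modules/ingestor` from this diff; `NoteSource` and `constRepo` are hypothetical names for illustration, not part of the change. A collection now only implements `Repo[A]` (both `watch` and `fetch` emit `Repo.Result` batches), and the generic `Ingestor` owns all the Elasticsearch bookkeeping:

```scala
package lila.search
package ingestor

import cats.effect.IO
import java.time.Instant

// Hypothetical source type. Any type with a smithy4s Schema in scope works,
// since Ingestor derives its elastic4s Indexable from that Schema.
case class NoteSource(text: String)

// A Repo that emits one fixed batch: enough to drive the generic Ingestor
// (e.g. in a test) without a MongoDB change stream behind it.
def constRepo(docs: List[(String, NoteSource)]): Repo[NoteSource] = new:
  // a real repo tails a change stream starting from `since`
  def watch(since: Option[Instant]): fs2.Stream[IO, Repo.Result[NoteSource]] =
    fs2.Stream.emit(Repo.Result(toIndex = docs, toDelete = Nil, timestamp = None))
  // a real repo filters the collection by the [since, until] window
  def fetch(since: Instant, until: Instant): fs2.Stream[IO, Repo.Result[NoteSource]] =
    fs2.Stream.emit(Repo.Result(docs, Nil, None))
```

Wiring is then one line per index, e.g. `Ingestor(Index.Forum, repo, store, elastic, startAt)`, provided a `smithy4s.schema.Schema` for the source type (for the derived `Indexable`) and a `LoggerFactory[IO]` are in scope.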