Skip to content

Commit

Permalink
Aggregate chapter data that is used for indexing
Browse files Browse the repository at this point in the history
This includes: name, tags and all comments.

This uses MongoDB's aggregation feature to aggregate all comments
in the study_chapter_flat collection.
  • Loading branch information
lenguyenthanh committed Jun 24, 2024
1 parent 80ef59c commit 3590e15
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 36 deletions.
8 changes: 4 additions & 4 deletions modules/ingestor/src/main/scala/app.config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object IngestorConfig:
.or(prop("ingestor.forum.start.at"))
.as[Long]
.option
def config = (batchSize, timeWindows, startAt).parMapN(Forum.apply)
def config = (batchSize, timeWindows, startAt).mapN(Forum.apply)

private object Team:
private def batchSize =
Expand All @@ -61,7 +61,7 @@ object IngestorConfig:
env("INGESTOR_TEAM_TIME_WINDOWS").or(prop("ingestor.team.time.windows")).as[Int].default(10)
private def startAt =
env("INGESTOR_TEAM_START_AT").or(prop("ingestor.team.start.at")).as[Long].option
def config = (batchSize, timeWindows, startAt).parMapN(Team.apply)
def config = (batchSize, timeWindows, startAt).mapN(Team.apply)

private object Study:
private def batchSize =
Expand All @@ -70,6 +70,6 @@ object IngestorConfig:
env("INGESTOR_STUDY_TIME_WINDOWS").or(prop("ingestor.study.time.windows")).as[Int].default(10)
private def startAt =
env("INGESTOR_STUDY_START_AT").or(prop("ingestor.study.start.at")).as[Long].option
def config = (batchSize, timeWindows, startAt).parMapN(Study.apply)
def config = (batchSize, timeWindows, startAt).mapN(Study.apply)

def config = (Forum.config, Team.config, Study.config).parMapN(IngestorConfig.apply)
def config = (Forum.config, Team.config, Study.config).mapN(IngestorConfig.apply)
10 changes: 7 additions & 3 deletions modules/ingestor/src/main/scala/ingestor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ object Ingestor:
def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig)(using
Logger[IO]
): IO[Ingestor] =
(ForumIngestor(mongo, elastic, store, config.forum), TeamIngestor(mongo, elastic, store, config.team))
.mapN: (forum, team) =>
(
ForumIngestor(mongo, elastic, store, config.forum),
TeamIngestor(mongo, elastic, store, config.team),
StudyIngestor(mongo, elastic, store, config.study)
)
.mapN: (forum, team, study) =>
new Ingestor:
def run() =
forum.watch.merge(team.watch).compile.drain
forum.watch.merge(team.watch).merge(study.watch).compile.drain
104 changes: 78 additions & 26 deletions modules/ingestor/src/main/scala/ingestor.study.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import cats.syntax.all.*
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.client.model.changestream.OperationType.*
import lila.search.spec.StudySource
import mongo4cats.bson.Document
import mongo4cats.bson.{ BsonValue, Document }
import mongo4cats.database.MongoDatabase
import mongo4cats.models.collection.ChangeStreamDocument
import mongo4cats.operations.{ Aggregate, Filter, Projection }
import mongo4cats.operations.{ Accumulator, Aggregate, Filter, Projection }
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.syntax.*

Expand Down Expand Up @@ -59,20 +59,23 @@ object StudyIngestor:
changeStream(last)
.filterNot(_.isEmpty)
.evalMap: events =>
val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant)
// val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant)
val (toDelete, toIndex) = events.partition(_.isDelete)
storeBulk(toIndex.flatten(_.fullDocument))
*> elastic.deleteMany(index, toDelete)
*> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now))
val studyIds = events.flatten(_.docId).distinct
chapterDataByIds(studyIds) >>= IO.println
// storeBulk(toIndex.flatten(_.fullDocument))
// *> elastic.deleteMany(index, toDelete)
// *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now))

private def storeBulk(docs: List[Document]): IO[Unit] =
val sources = docs.toSources
def storeBulk(docs: List[Document]): IO[Unit] =
info"Received ${docs.size} studies to index" *>
elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} studies"
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index studies: ${docs.map(_.id).mkString(", ")}")
docs.toSources.flatMap: sources =>
elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} studies"
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index studies: ${docs.map(_.id).mkString(", ")}")
.whenA(docs.nonEmpty)

private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
store.put(index.value, time)
*> info"Stored last indexed time ${time.getEpochSecond} for $index"

Expand All @@ -90,15 +93,16 @@ object StudyIngestor:
.fullDocument(FullDocument.UPDATE_LOOKUP) // this is required for update event
.boundedStream(config.batchSize)
.drop(skip)
.evalTap(x => debug"Study change stream event: $x")
// .evalTap(x => debug"Study change stream event: $x")
.groupWithin(config.batchSize, config.timeWindows.second)
.map(_.toList)

  // TODO: only need to index the latest study
// We should do that in changeStream by using some fs2 operators
extension (docs: List[Document])
private def toSources: IO[List[StudySourceWithId]] =
val studyIds = docs.flatMap(_.id).distinct
chapterNamesByIds(studyIds).flatMap: chapterNames =>
chapterDataByIds(studyIds).flatMap: chapterNames =>
docs
.traverse(_.toSource(chapterNames))
.map(_.flatten)
Expand All @@ -116,29 +120,77 @@ object StudyIngestor:

type StudySourceWithId = (String, StudySource)
extension (doc: Document)
private def toSource(chapterNames: Map[String, List[String]]): IO[Option[StudySourceWithId]] = ???
private def toSource(chapterNames: Map[String, Chapter]): IO[Option[StudySourceWithId]] = ???

  // Helpers on raw change stream events.
  extension (event: ChangeStreamDocument[Document])
    // True when the event's operation type is DELETE.
    private def isDelete: Boolean =
      event.operationType == DELETE
    // event.fullDocument.fold(false)(x => !x.isEnabled)

  // Aggregation stages combined (in this order) by chapterDataByIds.

  // Adds a `comments` field holding `$objectToArray` applied to the document's
  // `root` field, i.e. the entries of `root` as an array of { k, v } pairs.
  val addFields = Aggregate.addFields(
    "comments" ->
      Document.empty.add("$objectToArray" -> "$root").toBsonDocument
  )

  // Emits one document per element of the `comments` array built above.
  val unwinding = Aggregate.unwind("$comments")
  // Keeps only the unwound entries whose value has a `co` field.
  val matching = Aggregate.matchBy(Filter.exists("comments.v.co"))
  // Replaces each document with a slimmed-down one carrying only `comments`
  // (taken from `comments.v.co.text`), `name`, `tags` and `_id`.
  val replaceRoot = Aggregate.replaceWith(
    Document.empty.add(
      "$mergeObjects" -> List(
        Document(
          "comments" -> BsonValue.string("$comments.v.co.text"),
          "name" -> BsonValue.string("$name"),
          "tags" -> BsonValue.string("$tags"),
          "_id" -> BsonValue.string("$_id")
        )
      )
    )
  )

val group = Aggregate.group(
"_id",
Accumulator
.push("comments", "$comments")
.combinedWith(Accumulator.first("name", "$name"))
.combinedWith(Accumulator.first("tags", "$tags"))
)
// Fetches chapter names by their study ids
  // could be streamed if the result is too large
private def chapterNamesByIds(ids: Seq[String]): IO[Map[String, List[String]]] =
chapters
.find(Filter.in(Chapter.studyId, ids))
.projection(Projection.include(List(_id, Chapter.name)))
.all
.map(_.flatMap(doc => (doc.id -> doc.getString(Chapter.name)).mapN(_ -> _)))
.map(_.groupMapReduce(_._1)(x => List(x._2))(_ ++ _))
private def chapterDataByIds(ids: Seq[String]): IO[Map[String, Chapter]] =
val filterByIds = Aggregate.matchBy(Filter.in(Chapter.studyId, ids))
val chapterAggregates = List(addFields, unwinding, matching, replaceRoot, group)
.foldLeft(filterByIds)(_.combinedWith(_))
for
_ <- IO.println(chapterAggregates)
_ <- IO.println(ids)
x <- chapters
.aggregate[Document](chapterAggregates)
.all
_ <- IO.println(x)

y = x.flatMap(doc => (doc.id, Chapter(doc)).mapN((_, _))).toMap
yield y

object Study:
val name = "name"
val likes = "likes"
val members = "members"
val ownerId = "ownerId"
val name = "name"
val likes = "likes"
val members = "members"
val ownerId = "ownerId"
val description = "description"

/** Chapter data used for indexing: its name, its tags and the text of its comments. */
case class Chapter(name: String, tags: List[String], comments: List[String])

object Chapter:
  // Field names used when querying and decoding chapter documents.
  val name    = "name"
  val studyId = "studyId"
  val tags    = "tags"

  // accumulates comments into a list
  val comments = "comments"

  /** Decodes a [[Chapter]] from a document.
    *
    * Yields `None` when the `name` string, the `tags` list or the `comments`
    * list cannot be read. `comments` is read as a list of lists and flattened;
    * tag and comment entries that are not strings are dropped.
    */
  def apply(doc: Document): Option[Chapter] =
    for
      chapterName <- doc.getString(name)
      chapterTags <- doc.getList(tags).map(_.flatMap(_.asString))
      texts       <- doc.getList(comments).map(_.flatMap(_.asList).flatten.flatMap(_.asString))
    yield Chapter(chapterName, chapterTags, texts)
7 changes: 4 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ object Dependencies {
object V {
val catsEffect = "3.5.4"
val ciris = "3.6.0"
val elastic4s = "8.13.0"
val decline = "2.4.1"
val elastic4s = "8.13.0"
val fs2 = "3.10.2"
val http4s = "0.23.27"
val iron = "2.5.0"
val mongo4cats = "0.7.7"
}

def http4s(artifact: String) = "org.http4s" %% s"http4s-$artifact" % V.http4s
Expand Down Expand Up @@ -44,8 +45,8 @@ object Dependencies {
val elastic4sJavaClient = "nl.gn0s1s" %% "elastic4s-client-esjava" % V.elastic4s
val elastic4sCatsEffect = "nl.gn0s1s" %% "elastic4s-effect-cats" % V.elastic4s

val mongo4catsCore = "io.github.kirill5k" %% "mongo4cats-core" % "0.7.7"
val mongo4catsCirce = "io.github.kirill5k" %% "mongo4cats-circe" % "0.7.7"
val mongo4catsCore = "io.github.kirill5k" %% "mongo4cats-core" % V.mongo4cats
val mongo4catsCirce = "io.github.kirill5k" %% "mongo4cats-circe" % V.mongo4cats

val log4Cats = "org.typelevel" %% "log4cats-slf4j" % "2.7.0"
val logback = "ch.qos.logback" % "logback-classic" % "1.5.6"
Expand Down

0 comments on commit 3590e15

Please sign in to comment.