Skip to content

Commit

Permalink
Only fetch interested fields and clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed Jun 21, 2024
1 parent 5211336 commit 32378e6
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions modules/ingestor/src/main/scala/ingestor.team.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import lila.search.spec.TeamSource
import mongo4cats.bson.Document
import mongo4cats.database.MongoDatabase
import mongo4cats.models.collection.ChangeStreamDocument
import mongo4cats.operations.{ Aggregate, Filter }
import mongo4cats.operations.{ Aggregate, Filter, Projection }
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.syntax.*

Expand All @@ -26,16 +26,14 @@ object TeamIngestor:

private val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue)
private val eventFilter = Filter.in("operationType", interestedOperations)
// private val eventProjection = Projection.include(
// List(
// "documentKey._id",
// "fullDocument.name",
// "fullDocument.description",
// "fullDocument.nbMembers",
// "fullDocument.createdBy",
// )
// )
private val aggregate = Aggregate.matchBy(eventFilter) // .combinedWith(Aggregate.project(eventProjection))

private val interestedFields = List("_id", F.name, F.description, F.nbMembers, F.name, F.enabled)

private val interestedEventFields =
List("operationType", "clusterTime", "documentKey._id") ++ interestedFields.map("fullDocument." + _)
private val eventProjection = Projection.include(interestedEventFields)

private val aggregate = Aggregate.matchBy(eventFilter).combinedWith(Aggregate.project(eventProjection))

def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)(
using Logger[IO]
Expand Down Expand Up @@ -92,8 +90,7 @@ object TeamIngestor:
.batchSize(config.batchSize)
.fullDocument(FullDocument.UPDATE_LOOKUP) // this is required for update event
.boundedStream(config.batchSize)
.evalTap(IO.println)
.evalTap(x => IO.println(x.fullDocument))
.evalTap(x => debug"Team change stream event: $x")
.groupWithin(config.batchSize, config.timeWindows.second)
.map(_.toList)

Expand All @@ -110,7 +107,7 @@ object TeamIngestor:
).mapN(TeamSource.apply)

private def isEnabled =
doc.getBoolean("enabled").getOrElse(true)
doc.getBoolean(F.enabled).getOrElse(true)

extension (event: ChangeStreamDocument[Document])
private def isDelete: Boolean =
Expand All @@ -121,3 +118,4 @@ object TeamIngestor:
val name = "name"
val description = "description"
val nbMembers = "nbMembers"
val enabled = "enabled"

0 comments on commit 32378e6

Please sign in to comment.