Skip to content

Commit

Permalink
Fix ingestor don't delete deleted forum posts
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed Jun 21, 2024
1 parent 73812eb commit 3711a85
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
8 changes: 7 additions & 1 deletion modules/ingestor/src/main/scala/ingestor.forum.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object ForumIngestor:
val lastEventTimestamp = events.flatten(_.clusterTime.flatMap(_.asInstant)).maxOption
val (toDelete, toIndex) = events.partition(_.isDelete)
storeBulk(toIndex.flatten(_.fullDocument))
*> deleteMany(toDelete.flatten(_.fullDocument))
*> deleteMany(toDelete)
*> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now()))

def run(since: Instant, until: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit] =
Expand Down Expand Up @@ -89,11 +89,17 @@ object ForumIngestor:
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index forum posts: ${events.map(_._id).mkString(", ")}")

@scala.annotation.targetName("deleteManyWithDocs")
private def deleteMany(events: List[Document]): IO[Unit] =
info"Received ${events.size} forum posts to delete" *>
IO.whenA(events.nonEmpty):
deleteMany(events.flatMap(_._id).map(Id.apply))

@scala.annotation.targetName("deleteManyWithChanges")
private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] =
info"Received ${events.size} forum posts to delete" *>
deleteMany(events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty)

@scala.annotation.targetName("deleteManyWithIds")
private def deleteMany(ids: List[Id]): IO[Unit] =
IO.whenA(ids.nonEmpty):
Expand Down
2 changes: 1 addition & 1 deletion modules/ingestor/src/main/scala/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import java.time.Instant
type MongoCollection = GenericMongoCollection[IO, Document, [A] =>> fs2.Stream[IO, A]]

extension [A](change: ChangeStreamDocument[A])
def id: Option[String] = change.documentKey.flatMap(_.getString("_id"))
def docId: Option[String] = change.documentKey.flatMap(_.getString("_id"))

given [A: Schema]: Indexable[A] = (a: A) => writeToString(a)
given Indexable[Source] =
Expand Down

0 comments on commit 3711a85

Please sign in to comment.