From 183b128e7f2d8f63c181dd4f2e9e3df832deb6ef Mon Sep 17 00:00:00 2001 From: Roberto Tyley Date: Tue, 31 Jan 2023 15:56:27 +0000 Subject: [PATCH] Fix double-execution of scan thunk https://github.com/guardian/prout/pull/90 introduced additional logging, providing a `logAround()` method that timed the execution of `Future`s. However, it contained a bug! The `thunk` was passed as a 'by-name' parameter (see https://docs.scala-lang.org/tour/by-name-parameters.html) so that it wouldn't start executing until we were ready to start timing it, which is reasonable. 'by-name' parameters are evaluated *every* time they are used though, and the `logAround()` method evaluated it _twice_. So the thunk was executed twice, concurrently, when it was just supposed to be executed once. You can see in the logs below that the 3 pieces of code timed with `logAround()` were executed twice: ``` Jan 31 15:50:48 prout-bot app/web.1 [info] controllers.Api - githubHook repo=guardian/frontend githubDeliveryGuid=Some(0789db88-a17f-11ed-9d54-6d035aa39ad1) xRequestId=Some(78c70f90-622e-4bb0-8d53-c70b0727b4b0) Jan 31 15:50:51 prout-bot app/web.1 [info] lib.RepoUtil - Updating Git repo with fetch... https://github.com/guardian/frontend.git Jan 31 15:50:51 prout-bot app/web.1 [info] lib.RepoUtil - Updating Git repo with fetch... https://github.com/guardian/frontend.git Jan 31 15:50:51 prout-bot app/web.1 [info] c.m.s.GitHub - guardian/frontend - Git Repo ref count: Success(393) Jan 31 15:50:51 prout-bot app/web.1 [info] c.m.s.GitHub - guardian/frontend - 'fetch repo hooks' 196 ms : success=true Jan 31 15:50:51 prout-bot app/web.1 [info] c.m.s.GitHub - guardian/frontend - Git Repo ref count: Success(393) Jan 31 15:50:51 prout-bot app/web.1 [info] c.m.s.GitHub - guardian/frontend - 'fetch git repo' 341 ms : success=true Jan 31 15:50:57 prout-bot app/web.1 [info] c.m.s.GitHub - guardian/frontend - PRs merged to master size=25 Jan 31 15:50:57 prout-bot app/web.1 [info] c.m.s.GitHub - guardian/frontend - Merged Pull Requests fetched: Success(List(25871, 25869, 25868, 25865, 25862, 25861, 25860, 25859, 25857, 25856, 25851, 25850, 25849, 25848, 25846, 25845, 25844, 25842, 25841, 25838, 25837, 25836, 25834, 25792, 25749)) Jan 31 15:50:57 prout-bot app/web.1 [info] c.m.s.GitHub - guardian/frontend - 'fetch PRs' 6160 ms : success=true Jan 31 15:50:58 prout-bot app/web.1 [info] c.m.s.GitHub - guardian/frontend - PRs merged to master size=25 Jan 31 15:50:58 prout-bot app/web.1 [info] c.m.s.GitHub - guardian/frontend - Merged Pull Requests fetched: Success(List(25871, 25869, 25868, 25865, 25862, 25861, 25860, 25859, 25857, 25856, 25851, 25850, 25849, 25848, 25846, 25845, 25844, 25842, 25841, 25838, 25837, 25836, 25834, 25792, 25749)) Jan 31 15:50:58 prout-bot app/web.1 [info] l.RepoLevelDetails - Need to look at guardian/frontend, branch:main commit AnyObjectId[2bddf3a5f95129cf745eb7843b71ce9f8782eeca] ``` Fetching repo PRs and hooks through GitHub API calls can be duplicated without much issue (apart from perhaps doubling API quota consumed), but fetching the git repo itself (cloning/fetching) happens on a fixed folder on the filesystem, and having simultaneous threads trying to write to that folder would often lead to exceptions, trying to lock those files - here are two examples: ``` Jan 30 12:00:01 prout-bot app/web.1 [info] c.m.s.GitHub - guardian/members-data-api - Git Repo ref count: Failure(org.eclipse.jgit.api.errors.TransportException: lock error: /tmp/bot/working-dir/guardian/members-data-api/repo.git/shallow) ``` ``` Jan 30 12:44:10 prout-bot app/web.1 Caused by: org.eclipse.jgit.errors.LockFailedException: Cannot lock /tmp/bot/working-dir/guardian/prout/repo.git/config. Ensure that no other process has an open file handle on the lock file /tmp/bot/working-dir/guardian/prout/repo.git/config.lock, then you may delete the lock file and retry. Jan 30 12:44:10 prout-bot app/web.1 at org.eclipse.jgit.storage.file.FileBasedConfig.save(FileBasedConfig.java:185) Jan 30 12:44:10 prout-bot app/web.1 at org.eclipse.jgit.api.CloneCommand.fetch(CloneCommand.java:303) Jan 30 12:44:10 prout-bot app/web.1 at org.eclipse.jgit.api.CloneCommand.call(CloneCommand.java:191) Jan 30 12:44:10 prout-bot app/web.1 at org.eclipse.jgit.api.CloneCommand.call(CloneCommand.java:1) Jan 30 12:44:10 prout-bot app/web.1 at lib.RepoUtil$.invoke$1(RepoUtil.scala:39) Jan 30 12:44:10 prout-bot app/web.1 at lib.RepoUtil$.getUpToDateRepo$1(RepoUtil.scala:61) Jan 30 12:44:10 prout-bot app/web.1 at lib.RepoUtil$.getGitRepo(RepoUtil.scala:68) Jan 30 12:44:10 prout-bot app/web.1 at lib.RepoSnapshot$Factory.$anonfun$fetchLatestCopyOfGitRepo$1(RepoSnapshot.scala:121) ``` Sentry does a reasonable job of showing that these errors only started with PR #90 (looking at the 'First Seen' of 'Jan 26, 5:57 PM'): https://sentry.io/organizations/the-guardian/issues/3899449647/?project=49913&query=is%3Aunresolved&referrer=issue-stream --- app/lib/Dogpile.scala | 3 ++- app/lib/Droid.scala | 4 ++-- app/lib/RepoSnapshot.scala | 5 +++-- conf/logback.xml | 2 +- test/lib/DogpileSpec.scala | 39 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 47 insertions(+), 6 deletions(-) create mode 100644 test/lib/DogpileSpec.scala diff --git a/app/lib/Dogpile.scala b/app/lib/Dogpile.scala index 23bcd0e..68a0e05 100644 --- a/app/lib/Dogpile.scala +++ b/app/lib/Dogpile.scala @@ -46,7 +46,8 @@ class Dogpile[R](thing: => Future[R]) { * * @return a future for a run which has been initiated at or after this call */ - def doAtLeastOneMore(): Future[R] = stateRef.updateAndGet { previousState => + def doAtLeastOneMore(): Future[R] = stateRef.updateAndGet { // TODO updateAndGet shouldn't handle side-effects + previousState => if (previousState.scanFuture.isCompleted) ScanRun(thing) else { previousState match { case ScanQueued(_) => previousState diff --git a/app/lib/Droid.scala b/app/lib/Droid.scala index 62c2e8f..39c5865 100644 --- a/app/lib/Droid.scala +++ b/app/lib/Droid.scala @@ -23,14 +23,14 @@ class Droid( val logger = Logger(getClass) def scan(repoId: RepoId): Future[Seq[PullRequestCheckpointsStateChangeSummary]] = { - logger.info(s"Asked to audit $repoId") + logger.info(s"Asked to audit ${repoId.fullName}") for { repoSnapshot <- repoSnapshotFactory.snapshot(repoId) pullRequestUpdates <- processMergedPullRequestsOn(repoSnapshot) activeSnapshots <- repoSnapshot.activeSnapshotsF } yield { - logger.info(s"$repoId has ${activeSnapshots.size} active snapshots : ${activeSnapshots.map(s => s.checkpoint.name -> s.commitIdTry.map(_.map(_.shortName).getOrElse("None"))).toMap}") + logger.info(s"${repoId.fullName} has ${activeSnapshots.size} active snapshots : ${activeSnapshots.map(s => s.checkpoint.name -> s.commitIdTry.map(_.map(_.shortName).getOrElse("None"))).toMap}") pullRequestUpdates } } diff --git a/app/lib/RepoSnapshot.scala b/app/lib/RepoSnapshot.scala index 978e379..6a181f6 100644 --- a/app/lib/RepoSnapshot.scala +++ b/app/lib/RepoSnapshot.scala @@ -59,11 +59,12 @@ object RepoSnapshot { def log(message: String)(implicit repo: Repo): Unit = logger.info(s"${repo.full_name} - $message") def logAround[T](desc: String)(thunk: => Future[T])(implicit repo: Repo): Future[T] = { val start = System.currentTimeMillis() - thunk.onComplete { attempt => + val fut = thunk // evaluate thunk, evaluate only once! + fut.onComplete { attempt => val elapsedMs = System.currentTimeMillis() - start log(s"'$desc' $elapsedMs ms : success=${attempt.isSuccess}") } - thunk + fut } def isMergedToMain(pr: PullRequest)(implicit repo: Repo): Boolean = diff --git a/conf/logback.xml b/conf/logback.xml index 3a47bd7..611c9e1 100644 --- a/conf/logback.xml +++ b/conf/logback.xml @@ -11,7 +11,7 @@ - %coloredLevel %logger{15} - %message%n%xException{10} + %coloredLevel %logger{15} in %thread - %message%n%xException{10} diff --git a/test/lib/DogpileSpec.scala b/test/lib/DogpileSpec.scala new file mode 100644 index 0000000..f5d38a0 --- /dev/null +++ b/test/lib/DogpileSpec.scala @@ -0,0 +1,39 @@ +package lib + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.util.concurrent.Executors +import java.util.concurrent.atomic.{AtomicLong, LongAdder} +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} + +class DogpileSpec extends AnyFlatSpec with Matchers { + it should "not concurrently execute the side-effecting function" in { + + val executionCount = new LongAdder() + val runningCounter = new AtomicLong(0) + val clashCounter = new LongAdder() + + val dogpile = new Dogpile[String]({ + val numRunning = runningCounter.incrementAndGet() + executionCount.increment() + if (numRunning > 1) { + clashCounter.increment() + } + Thread.sleep(10) + runningCounter.decrementAndGet() + Future.successful("OK") + }) + + implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4)) + + val numExecutions = 20 + val allF = Future.traverse(1 to numExecutions)(_ => dogpile.doAtLeastOneMore()) + Await.ready(allF, 15.seconds) + + executionCount.longValue() shouldBe numExecutions + runningCounter.get() shouldBe 0 + clashCounter.longValue() shouldBe 0 + } +}