diff --git a/app/lib/Dogpile.scala b/app/lib/Dogpile.scala index 23bcd0e..68a0e05 100644 --- a/app/lib/Dogpile.scala +++ b/app/lib/Dogpile.scala @@ -46,7 +46,8 @@ class Dogpile[R](thing: => Future[R]) { * * @return a future for a run which has been initiated at or after this call */ - def doAtLeastOneMore(): Future[R] = stateRef.updateAndGet { previousState => + def doAtLeastOneMore(): Future[R] = stateRef.updateAndGet { // TODO updateAndGet shouldn't handle side-effects + previousState => if (previousState.scanFuture.isCompleted) ScanRun(thing) else { previousState match { case ScanQueued(_) => previousState diff --git a/app/lib/Droid.scala b/app/lib/Droid.scala index 62c2e8f..39c5865 100644 --- a/app/lib/Droid.scala +++ b/app/lib/Droid.scala @@ -23,14 +23,14 @@ class Droid( val logger = Logger(getClass) def scan(repoId: RepoId): Future[Seq[PullRequestCheckpointsStateChangeSummary]] = { - logger.info(s"Asked to audit $repoId") + logger.info(s"Asked to audit ${repoId.fullName}") for { repoSnapshot <- repoSnapshotFactory.snapshot(repoId) pullRequestUpdates <- processMergedPullRequestsOn(repoSnapshot) activeSnapshots <- repoSnapshot.activeSnapshotsF } yield { - logger.info(s"$repoId has ${activeSnapshots.size} active snapshots : ${activeSnapshots.map(s => s.checkpoint.name -> s.commitIdTry.map(_.map(_.shortName).getOrElse("None"))).toMap}") + logger.info(s"${repoId.fullName} has ${activeSnapshots.size} active snapshots : ${activeSnapshots.map(s => s.checkpoint.name -> s.commitIdTry.map(_.map(_.shortName).getOrElse("None"))).toMap}") pullRequestUpdates } } diff --git a/app/lib/RepoSnapshot.scala b/app/lib/RepoSnapshot.scala index 978e379..6a181f6 100644 --- a/app/lib/RepoSnapshot.scala +++ b/app/lib/RepoSnapshot.scala @@ -59,11 +59,12 @@ object RepoSnapshot { def log(message: String)(implicit repo: Repo): Unit = logger.info(s"${repo.full_name} - $message") def logAround[T](desc: String)(thunk: => Future[T])(implicit repo: Repo): Future[T] = { val start = System.currentTimeMillis() - thunk.onComplete { attempt => + val fut = thunk // evaluate thunk, evaluate only once! + fut.onComplete { attempt => val elapsedMs = System.currentTimeMillis() - start log(s"'$desc' $elapsedMs ms : success=${attempt.isSuccess}") } - thunk + fut } def isMergedToMain(pr: PullRequest)(implicit repo: Repo): Boolean = diff --git a/conf/logback.xml b/conf/logback.xml index 3a47bd7..611c9e1 100644 --- a/conf/logback.xml +++ b/conf/logback.xml @@ -11,7 +11,7 @@ - %coloredLevel %logger{15} - %message%n%xException{10} + %coloredLevel %logger{15} in %thread - %message%n%xException{10} diff --git a/test/lib/DogpileSpec.scala b/test/lib/DogpileSpec.scala new file mode 100644 index 0000000..f5d38a0 --- /dev/null +++ b/test/lib/DogpileSpec.scala @@ -0,0 +1,39 @@ +package lib + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.util.concurrent.Executors +import java.util.concurrent.atomic.{AtomicLong, LongAdder} +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} + +class DogpileSpec extends AnyFlatSpec with Matchers { + it should "not concurrently execute the side-effecting function" in { + + val executionCount = new LongAdder() + val runningCounter = new AtomicLong(0) + val clashCounter = new LongAdder() + + val dogpile = new Dogpile[String]({ + val numRunning = runningCounter.incrementAndGet() + executionCount.increment() + if (numRunning > 1) { + clashCounter.increment() + } + Thread.sleep(10) + runningCounter.decrementAndGet() + Future.successful("OK") + }) + + implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4)) + + val numExecutions = 20 + val allF = Future.traverse(1 to numExecutions)(_ => dogpile.doAtLeastOneMore()) + Await.ready(allF, 15.seconds) + + executionCount.longValue() shouldBe numExecutions + runningCounter.get() shouldBe 0 + clashCounter.longValue() shouldBe 0 + } +}