Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix double-execution of repo-scanning functions #92

Merged
merged 1 commit into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion app/lib/Dogpile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class Dogpile[R](thing: => Future[R]) {
*
* @return a future for a run which has been initiated at or after this call
*/
def doAtLeastOneMore(): Future[R] = stateRef.updateAndGet { previousState =>
def doAtLeastOneMore(): Future[R] = stateRef.updateAndGet { // TODO updateAndGet shouldn't handle side-effects
Copy link
Member Author

@rtyley rtyley Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is concerning to me, but doesn't seem to be currently causing severe problems: the docs for AtomicReference. updateAndGet() say:

The function should be side-effect-free, since it may be re-applied when attempted updates fail due to contention among threads.

I only realised this while trying to hunt down the cause for double-executions just now, and thing is that the function we're using here is not side-effect-free - it's very very side-effecty, labelling repos, creating GitHub comments, etc.

This code should be updated in another PR to be truly safe, and run the side-effecting code with the desired semantics, which are closer to 'throttle-last'.

previousState =>
if (previousState.scanFuture.isCompleted) ScanRun(thing) else {
previousState match {
case ScanQueued(_) => previousState
Expand Down
4 changes: 2 additions & 2 deletions app/lib/Droid.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ class Droid(
val logger = Logger(getClass)

def scan(repoId: RepoId): Future[Seq[PullRequestCheckpointsStateChangeSummary]] = {
logger.info(s"Asked to audit $repoId")
logger.info(s"Asked to audit ${repoId.fullName}")

for {
repoSnapshot <- repoSnapshotFactory.snapshot(repoId)
pullRequestUpdates <- processMergedPullRequestsOn(repoSnapshot)
activeSnapshots <- repoSnapshot.activeSnapshotsF
} yield {
logger.info(s"$repoId has ${activeSnapshots.size} active snapshots : ${activeSnapshots.map(s => s.checkpoint.name -> s.commitIdTry.map(_.map(_.shortName).getOrElse("None"))).toMap}")
logger.info(s"${repoId.fullName} has ${activeSnapshots.size} active snapshots : ${activeSnapshots.map(s => s.checkpoint.name -> s.commitIdTry.map(_.map(_.shortName).getOrElse("None"))).toMap}")
pullRequestUpdates
}
}
Expand Down
5 changes: 3 additions & 2 deletions app/lib/RepoSnapshot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ object RepoSnapshot {
def log(message: String)(implicit repo: Repo): Unit = logger.info(s"${repo.full_name} - $message")
def logAround[T](desc: String)(thunk: => Future[T])(implicit repo: Repo): Future[T] = {
val start = System.currentTimeMillis()
thunk.onComplete { attempt =>
val fut = thunk // evaluate thunk, evaluate only once!
fut.onComplete { attempt =>
val elapsedMs = System.currentTimeMillis() - start
log(s"'$desc' $elapsedMs ms : success=${attempt.isSuccess}")
}
thunk
fut
Comment on lines -62 to +67
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key part of the fix - we just evaluate thunk once, rather than twice!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏

}

def isMergedToMain(pr: PullRequest)(implicit repo: Repo): Boolean =
Expand Down
2 changes: 1 addition & 1 deletion conf/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%coloredLevel %logger{15} - %message%n%xException{10}</pattern>
<pattern>%coloredLevel %logger{15} in %thread - %message%n%xException{10}</pattern>
</encoder>
</appender>

Expand Down
39 changes: 39 additions & 0 deletions test/lib/DogpileSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package lib

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.util.concurrent.Executors
import java.util.concurrent.atomic.{AtomicLong, LongAdder}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

class DogpileSpec extends AnyFlatSpec with Matchers {
it should "not concurrently execute the side-effecting function" in {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this test while trying to debug this issue. It never reproduced the problem - because the problem wasn't in Dogpile - and due to the unpredictable nature of concurrency problems, the test wouldn't have been guaranteed to spot one if there was one - but I guess it's a reasonable statement of intent.


val executionCount = new LongAdder()
val runningCounter = new AtomicLong(0)
val clashCounter = new LongAdder()

val dogpile = new Dogpile[String]({
val numRunning = runningCounter.incrementAndGet()
executionCount.increment()
if (numRunning > 1) {
clashCounter.increment()
}
Thread.sleep(10)
runningCounter.decrementAndGet()
Future.successful("OK")
})

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

val numExecutions = 20
val allF = Future.traverse(1 to numExecutions)(_ => dogpile.doAtLeastOneMore())
Await.ready(allF, 15.seconds)

executionCount.intValue() should be <= numExecutions
Copy link
Member Author

@rtyley rtyley Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, in this test, what I would hope is that the execution count is more like 2, rather than 20, but due to the behaviour of AtomicReference.updateAndGet() we're currently getting the full 20.

runningCounter.get() shouldBe 0
clashCounter.longValue() shouldBe 0
}
}