diff --git a/remote/src/main/mima-filters/1.1.x.backwards.excludes/quarantine.backwards.excludes b/remote/src/main/mima-filters/1.1.x.backwards.excludes/quarantine.backwards.excludes new file mode 100644 index 00000000000..bcf94007380 --- /dev/null +++ b/remote/src/main/mima-filters/1.1.x.backwards.excludes/quarantine.backwards.excludes @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# changes made due to issues with downing during harmless quarantine +# https://github.com/apache/pekko/issues/578 +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.unapply") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.artery.AssociationState$QuarantinedTimestamp$") diff --git a/remote/src/main/resources/reference.conf b/remote/src/main/resources/reference.conf index 9049be1ef11..4c4edff858e 100644 --- a/remote/src/main/resources/reference.conf +++ b/remote/src/main/resources/reference.conf @@ -850,6 +850,11 @@ pekko { # limit there will be extra performance and scalability cost. log-frame-size-exceeding = off + # If set to "on", InboundQuarantineCheck will propagate harmless quarantine events. + # This is the legacy behavior. Users who see these harmless quarantine events lead + # to problems can set this to "off" to suppress them (https://github.com/apache/pekko/pull/1555). + propagate-harmless-quarantine-events = on + advanced { # Maximum serialized message size, including header data. diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala index 85c54c74bcd..8b257b27e4e 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala @@ -105,6 +105,13 @@ private[pekko] final class ArterySettings private (config: Config) { */ val Version: Byte = ArteryTransport.HighestVersion + /** + * If set to true, harmless quarantine events are propagated in InboundQuarantineCheck. + * Background is in https://github.com/apache/pekko/pull/1555 + */ + val PropagateHarmlessQuarantineEvents: Boolean = + getBoolean("propagate-harmless-quarantine-events") + object Advanced { val config: Config = getConfig("advanced") import config._ diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala index 44103d42eda..76e618b758a 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala @@ -108,9 +108,9 @@ private[remote] object AssociationState { quarantined = ImmutableLongMap.empty[QuarantinedTimestamp], new AtomicReference(UniqueRemoteAddressValue(None, Nil))) - final case class QuarantinedTimestamp(nanoTime: Long) { + final case class QuarantinedTimestamp(nanoTime: Long, harmless: Boolean = false) { override def toString: String = - s"Quarantined ${TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - nanoTime)} seconds ago" + s"Quarantined ${TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - nanoTime)} seconds ago (harmless=$harmless)" } private final case class UniqueRemoteAddressValue( @@ -159,6 +159,13 @@ private[remote] final class AssociationState private ( def isQuarantined(uid: Long): Boolean = quarantined.contains(uid) + def quarantinedButHarmless(uid: Long): Boolean = { + quarantined.get(uid) match { + case OptionVal.Some(qt) => qt.harmless + case _ => false + } + } + @tailrec def completeUniqueRemoteAddress(peer: UniqueAddress): Unit = { val current = _uniqueRemoteAddress.get() if (current.uniqueRemoteAddress.isEmpty) { @@ -196,14 +203,14 @@ private[remote] final class AssociationState private ( quarantined, new AtomicReference(UniqueRemoteAddressValue(Some(remoteAddress), Nil))) - def newQuarantined(): AssociationState = + def newQuarantined(harmless: Boolean = false): AssociationState = uniqueRemoteAddress() match { case Some(a) => new AssociationState( incarnation, lastUsedTimestamp = new AtomicLong(System.nanoTime()), controlIdleKillSwitch, - quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime())), + quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime(), harmless)), _uniqueRemoteAddress) case None => this } diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala index 6bf999338ff..b97fef185cf 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala @@ -538,7 +538,7 @@ private[remote] class Association( current.uniqueRemoteAddress() match { case Some(peer) if peer.uid == u => if (!current.isQuarantined(u)) { - val newState = current.newQuarantined() + val newState = current.newQuarantined(harmless) if (swapState(current, newState)) { // quarantine state change was performed if (harmless) { diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala index 7e845457124..c6d20941497 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala @@ -45,17 +45,27 @@ private[remote] class InboundQuarantineCheck(inboundContext: InboundContext) env.association match { case OptionVal.Some(association) => if (association.associationState.isQuarantined(env.originUid)) { - if (log.isDebugEnabled) - log.debug( - "Dropping message [{}] from [{}#{}] because the system is quarantined", + if (!inboundContext.settings.PropagateHarmlessQuarantineEvents + && association.associationState.quarantinedButHarmless(env.originUid)) { + log.info( + "Message [{}] from [{}#{}] was dropped. " + + "The system is quarantined but the UID is known to be harmless.", Logging.messageClassName(env.message), association.remoteAddress, env.originUid) - // avoid starting outbound stream for heartbeats - if (!env.message.isInstanceOf[Quarantined] && !isHeartbeat(env.message)) - inboundContext.sendControl( - association.remoteAddress, - Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid))) + } else { + if (log.isDebugEnabled) + log.debug( + "Dropping message [{}] from [{}#{}] because the system is quarantined", + Logging.messageClassName(env.message), + association.remoteAddress, + env.originUid) + // avoid starting outbound stream for heartbeats + if (!env.message.isInstanceOf[Quarantined] && !isHeartbeat(env.message)) + inboundContext.sendControl( + association.remoteAddress, + Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid))) + } pull(in) } else push(out, env) diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/HarmlessQuarantineSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/HarmlessQuarantineSpec.scala new file mode 100644 index 00000000000..eae4dd97a97 --- /dev/null +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/HarmlessQuarantineSpec.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2009-2022 Lightbend Inc. + */ + +package org.apache.pekko.remote.artery + +import scala.concurrent.Future +import scala.concurrent.Promise + +import org.scalatest.concurrent.Eventually +import org.scalatest.time.Span + +import org.apache.pekko +import pekko.actor.ActorRef +import pekko.actor.ActorSystem +import pekko.actor.Address +import pekko.actor.RootActorPath +import pekko.remote.RARP +import pekko.remote.UniqueAddress +import pekko.testkit.ImplicitSender +import pekko.testkit.TestActors +import pekko.testkit.TestProbe + +class HarmlessQuarantineSpec extends ArteryMultiNodeSpec(""" + pekko.loglevel=INFO + pekko.remote.artery.propagate-harmless-quarantine-events = off + pekko.remote.artery.advanced { + stop-idle-outbound-after = 1 s + connection-timeout = 2 s + remove-quarantined-association-after = 1 s + compression { + actor-refs.advertisement-interval = 5 seconds + } + } + """) with ImplicitSender with Eventually { + + override implicit val patience: PatienceConfig = { + import pekko.testkit.TestDuration + PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated * 2, Span(200, org.scalatest.time.Millis)) + } + + private def futureUniqueRemoteAddress(association: Association): Future[UniqueAddress] = { + val p = Promise[UniqueAddress]() + association.associationState.addUniqueRemoteAddressListener(a => p.success(a)) + p.future + } + + "Harmless Quarantine Events" should { + + "eliminate quarantined association when not used - echo test" in withAssociation { + (remoteSystem, remoteAddress, _, localArtery, localProbe) => + // event to watch out for, indicator of the issue + remoteSystem.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent]) + + val remoteEcho = remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue + + val localAddress = RARP(system).provider.getDefaultAddress + + val localEchoRef = + remoteSystem.actorSelection(RootActorPath(localAddress) / localProbe.ref.path.elements).resolveOne( + remainingOrDefault).futureValue + remoteEcho.tell("ping", localEchoRef) + localProbe.expectMsg("ping") + + val association = localArtery.association(remoteAddress) + val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid + localArtery.quarantine(remoteAddress, Some(remoteUid), "Test") + association.associationState.isQuarantined(remoteUid) shouldBe true + association.associationState.quarantinedButHarmless(remoteUid) shouldBe false + + remoteEcho.tell("ping", localEchoRef) // trigger sending message from remote to local, which will trigger local to wrongfully notify remote that it is quarantined + eventually { + expectMsgType[ThisActorSystemQuarantinedEvent] // this is what remote emits when it learns it is quarantined by local + } + } + + "eliminate quarantined association when not used - echo test (harmless=true)" in withAssociation { + (remoteSystem, remoteAddress, _, localArtery, localProbe) => + // event to watch out for, indicator of the issue + remoteSystem.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent]) + + val remoteEcho = remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue + + val localAddress = RARP(system).provider.getDefaultAddress + + val localEchoRef = + remoteSystem.actorSelection(RootActorPath(localAddress) / localProbe.ref.path.elements).resolveOne( + remainingOrDefault).futureValue + remoteEcho.tell("ping", localEchoRef) + localProbe.expectMsg("ping") + + val association = localArtery.association(remoteAddress) + val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid + localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest", harmless = true) + association.associationState.isQuarantined(remoteUid) shouldBe true + association.associationState.quarantinedButHarmless(remoteUid) shouldBe true + + remoteEcho.tell("ping", localEchoRef) // trigger sending message from remote to local, which will trigger local to wrongfully notify remote that it is quarantined + eventually { + expectNoMessage() + } + } + + /** + * Test setup fixture: + * 1. A 'remote' ActorSystem is created to spawn an Echo actor, + * 2. A TestProbe is spawned locally to initiate communication with the Echo actor + * 3. Details (remoteAddress, remoteEcho, localArtery, localProbe) are supplied to the test + */ + def withAssociation(test: (ActorSystem, Address, ActorRef, ArteryTransport, TestProbe) => Any): Unit = { + val remoteSystem = newRemoteSystem() + try { + remoteSystem.actorOf(TestActors.echoActorProps, "echo") + val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress + + def remoteEcho = system.actorSelection(RootActorPath(remoteAddress) / "user" / "echo") + + val echoRef = remoteEcho.resolveOne(remainingOrDefault).futureValue + val localProbe = new TestProbe(localSystem) + + echoRef.tell("ping", localProbe.ref) + localProbe.expectMsg("ping") + + val artery = RARP(system).provider.transport.asInstanceOf[ArteryTransport] + + test(remoteSystem, remoteAddress, echoRef, artery, localProbe) + + } finally { + shutdown(remoteSystem) + } + } + } +} diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala index 39beddb4a23..8846c40f447 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala @@ -31,13 +31,15 @@ import pekko.testkit.ImplicitSender import pekko.testkit.TestActors import pekko.testkit.TestProbe -class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s""" +class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(""" pekko.loglevel=INFO - pekko.remote.artery.advanced.stop-idle-outbound-after = 1 s - pekko.remote.artery.advanced.connection-timeout = 2 s - pekko.remote.artery.advanced.remove-quarantined-association-after = 1 s - pekko.remote.artery.advanced.compression { - actor-refs.advertisement-interval = 5 seconds + pekko.remote.artery.advanced { + stop-idle-outbound-after = 1 s + connection-timeout = 2 s + remove-quarantined-association-after = 1 s + compression { + actor-refs.advertisement-interval = 5 seconds + } } """) with ImplicitSender with Eventually { @@ -116,6 +118,8 @@ class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s""" val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid localArtery.quarantine(remoteAddress, Some(remoteUid), "Test") + association.associationState.isQuarantined(remoteUid) shouldBe true + association.associationState.quarantinedButHarmless(remoteUid) shouldBe false eventually { assertStreamActive(association, Association.ControlQueueIndex, expected = false) @@ -128,6 +132,80 @@ class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s""" } } + "eliminate quarantined association when not used (harmless=true)" in withAssociation { + (_, remoteAddress, _, localArtery, _) => + val association = localArtery.association(remoteAddress) + val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid + + localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest", harmless = true) + association.associationState.isQuarantined(remoteUid) shouldBe true + association.associationState.quarantinedButHarmless(remoteUid) shouldBe true + + eventually { + assertStreamActive(association, Association.ControlQueueIndex, expected = false) + assertStreamActive(association, Association.OrdinaryQueueIndex, expected = false) + } + + // the outbound streams are inactive and association quarantined, then it's completely removed + eventually { + localArtery.remoteAddresses should not contain remoteAddress + } + } + + "eliminate quarantined association when not used - echo test" in withAssociation { + (remoteSystem, remoteAddress, _, localArtery, localProbe) => + // event to watch out for, indicator of the issue + remoteSystem.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent]) + + val remoteEcho = remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue + + val localAddress = RARP(system).provider.getDefaultAddress + + val localEchoRef = + remoteSystem.actorSelection(RootActorPath(localAddress) / localProbe.ref.path.elements).resolveOne( + remainingOrDefault).futureValue + remoteEcho.tell("ping", localEchoRef) + localProbe.expectMsg("ping") + + val association = localArtery.association(remoteAddress) + val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid + localArtery.quarantine(remoteAddress, Some(remoteUid), "Test") + association.associationState.isQuarantined(remoteUid) shouldBe true + association.associationState.quarantinedButHarmless(remoteUid) shouldBe false + + remoteEcho.tell("ping", localEchoRef) // trigger sending message from remote to local, which will trigger local to wrongfully notify remote that it is quarantined + eventually { + expectMsgType[ThisActorSystemQuarantinedEvent] // this is what remote emits when it learns it is quarantined by local + } + } + + "eliminate quarantined association when not used - echo test (harmless=true)" in withAssociation { + (remoteSystem, remoteAddress, _, localArtery, localProbe) => + // event to watch out for, indicator of the issue + remoteSystem.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent]) + + val remoteEcho = remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue + + val localAddress = RARP(system).provider.getDefaultAddress + + val localEchoRef = + remoteSystem.actorSelection(RootActorPath(localAddress) / localProbe.ref.path.elements).resolveOne( + remainingOrDefault).futureValue + remoteEcho.tell("ping", localEchoRef) + localProbe.expectMsg("ping") + + val association = localArtery.association(remoteAddress) + val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid + localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest", harmless = true) + association.associationState.isQuarantined(remoteUid) shouldBe true + association.associationState.quarantinedButHarmless(remoteUid) shouldBe true + + remoteEcho.tell("ping", localEchoRef) // trigger sending message from remote to local, which will trigger local to wrongfully notify remote that it is quarantined + eventually { + expectMsgType[ThisActorSystemQuarantinedEvent] // this is what remote emits when it learns it is quarantined by local + } + } + "remove inbound compression after quarantine" in withAssociation { (_, remoteAddress, _, localArtery, _) => val association = localArtery.association(remoteAddress) val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid