diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala new file mode 100644 index 00000000000..6c2bd2254e2 --- /dev/null +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.cluster + +import com.typesafe.config.{ Config, ConfigValue, ConfigValueFactory, ConfigValueType } + +import scala.annotation.nowarn + +private[cluster] object ConfigUtil { + + @nowarn("msg=deprecated") + def addAkkaConfig(cfg: Config, akkaVersion: String): Config = { + import scala.collection.JavaConverters._ + val innerSet = cfg.entrySet().asScala + .filter(e => e.getKey.startsWith("pekko.") && e.getValue.valueType() != ConfigValueType.OBJECT) + .map { entry => + entry.getKey.replace("pekko", "akka") -> adjustPackageNameIfNecessary(entry.getValue) + } + var newConfig = cfg + innerSet.foreach { case (key, value) => + newConfig = newConfig.withValue(key, value) + } + newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion)) + } + + private def adjustPackageNameIfNecessary(cv: ConfigValue): ConfigValue = { + if (cv.valueType() == ConfigValueType.STRING) { + val str = cv.unwrapped().toString + if (str.startsWith("org.apache.pekko")) { + ConfigValueFactory.fromAnyRef(str.replace("org.apache.pekko", "akka")) + } else { + cv + } + } else { + cv + } + } + +} diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala index ab090e3615a..fb9eb722c22 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala @@ -47,6 +47,21 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon "Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " + "This node will be shutdown!" + private lazy val needsAkkaConfig: Boolean = { + context.system.settings.config + .getStringList("pekko.remote.accept-protocol-names") + .contains("akka") + } + + private lazy val akkaVersion: String = { + val cfg = context.system.settings.config + if (cfg.hasPath("akka.version")) { + cfg.getString("akka.version") + } else { + cfg.getString("pekko.cluster.akka.version") + } + } + private def stopOrBecome(behavior: Option[Actor.Receive]): Unit = behavior match { case Some(done) => context.become(done) // JoinSeedNodeProcess @@ -65,8 +80,12 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon val configToValidate = JoinConfigCompatChecker.filterWithKeys(requiredNonSensitiveKeys, context.system.settings.config) + val adjustedConfig = if (needsAkkaConfig) + ConfigUtil.addAkkaConfig(configToValidate, akkaVersion) + else configToValidate + seedNodes.foreach { a => - context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate) + context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(adjustedConfig) } } diff --git a/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteFeaturesSpec.scala b/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteFeaturesSpec.scala index 58e97b16811..8b7c1b9262e 100644 --- a/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteFeaturesSpec.scala +++ b/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteFeaturesSpec.scala @@ -260,7 +260,8 @@ abstract class RemotingFeaturesSpec(val multiNodeConfig: RemotingFeaturesConfig) remotePath, Nobody, None, - None) + None, + Set("pekko", "akka")) rar.start() rar diff --git a/remote/src/main/resources/reference.conf b/remote/src/main/resources/reference.conf index 2efc5c85e77..70fe09dda4d 100644 --- a/remote/src/main/resources/reference.conf +++ b/remote/src/main/resources/reference.conf @@ -184,6 +184,20 @@ pekko { # is 'off'. Set this to 'off' to suppress these. warn-unsafe-watch-outside-cluster = on + # When receiving requests from other remote actors, what are the valid + # prefix's to check against. Useful for when dealing with rolling cluster + # migrations with compatible systems such as Lightbend's Akka. + accept-protocol-names = ["pekko", "akka"] + + # The protocol name to use when sending requests to other remote actors. + # Useful when dealing with rolling migration, i.e. temporarily change + # the protocol name to match another compatible actor implementation + # such as Lightbend's "akka" (whilst making sure accept-protocol-names + # contains "akka") so that you can gracefully migrate all nodes to Apache + # Pekko and then change the protocol-name back to "pekko" once all + # nodes have been are running on Apache Pekko + protocol-name = "pekko" + # Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf # [Hayashibara et al]) used for remote death watch. # The default PhiAccrualFailureDetector will trigger if there are no heartbeats within diff --git a/remote/src/main/scala/org/apache/pekko/remote/BoundAddressesExtension.scala b/remote/src/main/scala/org/apache/pekko/remote/BoundAddressesExtension.scala index 4139ada218e..5c5d8ac0741 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/BoundAddressesExtension.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/BoundAddressesExtension.scala @@ -38,11 +38,13 @@ object BoundAddressesExtension extends ExtensionId[BoundAddressesExtension] with class BoundAddressesExtension(val system: ExtendedActorSystem) extends Extension { + private val remoteSettings: RemoteSettings = new RemoteSettings(system.settings.config) + /** * Returns a mapping from a protocol to a set of bound addresses. */ def boundAddresses: Map[String, Set[Address]] = system.provider.asInstanceOf[RemoteActorRefProvider].transport match { - case artery: ArteryTransport => Map(ArteryTransport.ProtocolName -> Set(artery.bindAddress.address)) + case artery: ArteryTransport => Map(remoteSettings.ProtocolName -> Set(artery.bindAddress.address)) case remoting: Remoting => remoting.boundAddresses case other => throw new IllegalStateException(s"Unexpected transport type: ${other.getClass}") } diff --git a/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala b/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala index edd9cc8561d..74a3055c499 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala @@ -464,7 +464,8 @@ private[pekko] class RemoteActorRefProvider( val rpath = (RootActorPath(address) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements) .withUid(path.uid) - new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d)) + new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d), + remoteSettings.AcceptProtocolNames) } else { warnIfNotRemoteActorRef(path) local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async) @@ -488,7 +489,8 @@ private[pekko] class RemoteActorRefProvider( RootActorPath(address), Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = remoteSettings.AcceptProtocolNames) } catch { case NonFatal(e) => log.error(e, "No root guardian at [{}]", address) @@ -513,7 +515,8 @@ private[pekko] class RemoteActorRefProvider( RootActorPath(address) / elems, Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = remoteSettings.AcceptProtocolNames) } catch { case NonFatal(e) => log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) @@ -555,7 +558,8 @@ private[pekko] class RemoteActorRefProvider( rootPath, Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = remoteSettings.AcceptProtocolNames) } catch { case NonFatal(e) => log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) @@ -578,7 +582,8 @@ private[pekko] class RemoteActorRefProvider( path, Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = remoteSettings.AcceptProtocolNames) } catch { case NonFatal(e) => log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) @@ -672,7 +677,8 @@ private[pekko] class RemoteActorRef private[pekko] ( val path: ActorPath, val getParent: InternalActorRef, props: Option[Props], - deploy: Option[Deploy]) + deploy: Option[Deploy], + val acceptProtocolNames: Set[String]) extends InternalActorRef with RemoteRef { @@ -680,10 +686,17 @@ private[pekko] class RemoteActorRef private[pekko] ( throw new IllegalArgumentException(s"Unexpected local address in RemoteActorRef [$this]") remote match { - case t: ArteryTransport => - // detect mistakes such as using "pekko.tcp" with Artery - if (path.address.protocol != t.localAddress.address.protocol) - throw new IllegalArgumentException(s"Wrong protocol of [$path], expected [${t.localAddress.address.protocol}]") + case _: ArteryTransport => + // detect mistakes such as using "pekko.tcp" with Artery, also handles pekko.remote.accept-protocol-names + if (!acceptProtocolNames.contains(path.address.protocol)) { + val expectedString = if (acceptProtocolNames.size == 1) + "expected" + else + "expected one of" + + throw new IllegalArgumentException( + s"Wrong protocol of [$path], $expectedString [${acceptProtocolNames.mkString}]") + } case _ => } @volatile private[remote] var cachedAssociation: artery.Association = null @@ -697,7 +710,8 @@ private[pekko] class RemoteActorRef private[pekko] ( s.headOption match { case None => this case Some("..") => getParent.getChild(name) - case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None) + case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None, + acceptProtocolNames = acceptProtocolNames) } } diff --git a/remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala b/remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala index dd2fe5b8fdd..83472de3ed4 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala @@ -199,6 +199,12 @@ final class RemoteSettings(val config: Config) { @deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0") val Adapters: Map[String, String] = configToMap(getConfig("pekko.remote.classic.adapters")) + val ProtocolName: String = getString("pekko.remote.protocol-name") + + val AcceptProtocolNames: Set[String] = + immutableSeq(getStringList("pekko.remote.accept-protocol-names")).toSet.requiring(_.nonEmpty, + "accept-protocol-names must be non empty") + private def transportNames: immutable.Seq[String] = immutableSeq(getStringList("pekko.remote.classic.enabled-transports")) diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala index fb66fba6a47..44103d42eda 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala @@ -393,12 +393,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr val (port, boundPort) = bindInboundStreams() _localAddress = UniqueAddress( - Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port), + Address(provider.remoteSettings.ProtocolName, system.name, settings.Canonical.Hostname, port), AddressUidExtension(system).longAddressUid) _addresses = Set(_localAddress.address) _bindAddress = UniqueAddress( - Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort), + Address(provider.remoteSettings.ProtocolName, system.name, settings.Bind.Hostname, boundPort), AddressUidExtension(system).longAddressUid) flightRecorder.transportUniqueAddressSet(_localAddress) @@ -954,8 +954,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr */ private[remote] object ArteryTransport { - val ProtocolName = "pekko" - // Note that the used version of the header format for outbound messages is defined in // `ArterySettings.Version` because that may depend on configuration settings. // This is the highest supported version on receiving (decoding) side. diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/PekkoProtocolTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/PekkoProtocolTransport.scala index 902faf65780..1540ce73b39 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/PekkoProtocolTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/PekkoProtocolTransport.scala @@ -69,11 +69,12 @@ private[remote] class PekkoProtocolSettings(config: Config) { .getMillisDuration("pekko.remote.classic.handshake-timeout") .requiring(_ > Duration.Zero, "handshake-timeout must be > 0") } + + val PekkoScheme: String = new RemoteSettings(config).ProtocolName } @nowarn("msg=deprecated") private[remote] object PekkoProtocolTransport { // Couldn't these go into the Remoting Extension/ RemoteSettings instead? - val PekkoScheme: String = "pekko" val PekkoOverhead: Int = 0 // Don't know yet val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0) @@ -122,7 +123,7 @@ private[remote] class PekkoProtocolTransport( private val codec: PekkoPduCodec) extends ActorTransportAdapter(wrappedTransport, system) { - override val addedSchemeIdentifier: String = PekkoScheme + override val addedSchemeIdentifier: String = new RemoteSettings(system.settings.config).ProtocolName override def managementCommand(cmd: Any): Future[Boolean] = wrappedTransport.managementCommand(cmd) @@ -229,8 +230,9 @@ private[remote] class PekkoProtocolHandle( _wrappedHandle: AssociationHandle, val handshakeInfo: HandshakeInfo, private val stateActor: ActorRef, - private val codec: PekkoPduCodec) - extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, PekkoScheme) { + private val codec: PekkoPduCodec, + override val addedSchemeIdentifier: String) + extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, addedSchemeIdentifier) { override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload)) @@ -713,7 +715,8 @@ private[remote] class ProtocolStateActor( wrappedHandle, handshakeInfo, self, - codec)) + codec, + settings.PekkoScheme)) readHandlerPromise.future } @@ -733,7 +736,8 @@ private[remote] class ProtocolStateActor( wrappedHandle, handshakeInfo, self, - codec))) + codec, + settings.PekkoScheme))) readHandlerPromise.future } diff --git a/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteDeathWatchSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteDeathWatchSpec.scala index c710966b905..c448719f3e6 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteDeathWatchSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteDeathWatchSpec.scala @@ -134,7 +134,8 @@ pekko.actor.warn-about-java-serializer-usage = off extinctPath, Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = Set("pekko", "akka")) val probe = TestProbe() probe.watch(extinctRef)