Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle mixed akka/pekko protocol names #765

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.cluster

import com.typesafe.config.{ Config, ConfigValue, ConfigValueFactory, ConfigValueType }

import scala.annotation.nowarn

private[cluster] object ConfigUtil {

@nowarn("msg=deprecated")
def addAkkaConfig(cfg: Config, akkaVersion: String): Config = {
import scala.collection.JavaConverters._
val innerSet = cfg.entrySet().asScala
.filter(e => e.getKey.startsWith("pekko.") && e.getValue.valueType() != ConfigValueType.OBJECT)
.map { entry =>
entry.getKey.replace("pekko", "akka") -> adjustPackageNameIfNecessary(entry.getValue)
}
var newConfig = cfg
innerSet.foreach { case (key, value) =>
newConfig = newConfig.withValue(key, value)
}
newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion))
}

private def adjustPackageNameIfNecessary(cv: ConfigValue): ConfigValue = {
if (cv.valueType() == ConfigValueType.STRING) {
val str = cv.unwrapped().toString
if (str.startsWith("org.apache.pekko")) {
ConfigValueFactory.fromAnyRef(str.replace("org.apache.pekko", "akka"))
} else {
cv
}
} else {
cv
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon
"Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " +
"This node will be shutdown!"

private lazy val needsAkkaConfig: Boolean = {
context.system.settings.config
.getStringList("pekko.remote.accept-protocol-names")
.contains("akka")
}

private lazy val akkaVersion: String = {
val cfg = context.system.settings.config
if (cfg.hasPath("akka.version")) {
cfg.getString("akka.version")
} else {
cfg.getString("pekko.cluster.akka.version")
}
}

private def stopOrBecome(behavior: Option[Actor.Receive]): Unit =
behavior match {
case Some(done) => context.become(done) // JoinSeedNodeProcess
Expand All @@ -65,8 +80,12 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon
val configToValidate =
JoinConfigCompatChecker.filterWithKeys(requiredNonSensitiveKeys, context.system.settings.config)

val adjustedConfig = if (needsAkkaConfig)
ConfigUtil.addAkkaConfig(configToValidate, akkaVersion)
else configToValidate

seedNodes.foreach { a =>
context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate)
context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(adjustedConfig)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ abstract class RemotingFeaturesSpec(val multiNodeConfig: RemotingFeaturesConfig)
remotePath,
Nobody,
None,
None)
None,
Set("pekko", "akka"))

rar.start()
rar
Expand Down
14 changes: 14 additions & 0 deletions remote/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,20 @@ pekko {
# is 'off'. Set this to 'off' to suppress these.
warn-unsafe-watch-outside-cluster = on

# When receiving requests from other remote actors, what are the valid
# prefix's to check against. Useful for when dealing with rolling cluster
# migrations with compatible systems such as Lightbend's Akka.
accept-protocol-names = ["pekko", "akka"]
pjfanning marked this conversation as resolved.
Show resolved Hide resolved

# The protocol name to use when sending requests to other remote actors.
# Useful when dealing with rolling migration, i.e. temporarily change
# the protocol name to match another compatible actor implementation
# such as Lightbend's "akka" (whilst making sure accept-protocol-names
# contains "akka") so that you can gracefully migrate all nodes to Apache
# Pekko and then change the protocol-name back to "pekko" once all
# nodes have been are running on Apache Pekko
protocol-name = "pekko"

# Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf
# [Hayashibara et al]) used for remote death watch.
# The default PhiAccrualFailureDetector will trigger if there are no heartbeats within
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ object BoundAddressesExtension extends ExtensionId[BoundAddressesExtension] with

class BoundAddressesExtension(val system: ExtendedActorSystem) extends Extension {

private val remoteSettings: RemoteSettings = new RemoteSettings(system.settings.config)

/**
* Returns a mapping from a protocol to a set of bound addresses.
*/
def boundAddresses: Map[String, Set[Address]] = system.provider.asInstanceOf[RemoteActorRefProvider].transport match {
case artery: ArteryTransport => Map(ArteryTransport.ProtocolName -> Set(artery.bindAddress.address))
case artery: ArteryTransport => Map(remoteSettings.ProtocolName -> Set(artery.bindAddress.address))
case remoting: Remoting => remoting.boundAddresses
case other => throw new IllegalStateException(s"Unexpected transport type: ${other.getClass}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,8 @@ private[pekko] class RemoteActorRefProvider(
val rpath =
(RootActorPath(address) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements)
.withUid(path.uid)
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d),
remoteSettings.AcceptProtocolNames)
} else {
warnIfNotRemoteActorRef(path)
local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
Expand All @@ -488,7 +489,8 @@ private[pekko] class RemoteActorRefProvider(
RootActorPath(address),
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
} catch {
case NonFatal(e) =>
log.error(e, "No root guardian at [{}]", address)
Expand All @@ -513,7 +515,8 @@ private[pekko] class RemoteActorRefProvider(
RootActorPath(address) / elems,
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
} catch {
case NonFatal(e) =>
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
Expand Down Expand Up @@ -555,7 +558,8 @@ private[pekko] class RemoteActorRefProvider(
rootPath,
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
} catch {
case NonFatal(e) =>
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
Expand All @@ -578,7 +582,8 @@ private[pekko] class RemoteActorRefProvider(
path,
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
} catch {
case NonFatal(e) =>
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
Expand Down Expand Up @@ -672,18 +677,26 @@ private[pekko] class RemoteActorRef private[pekko] (
val path: ActorPath,
val getParent: InternalActorRef,
props: Option[Props],
deploy: Option[Deploy])
deploy: Option[Deploy],
val acceptProtocolNames: Set[String])
extends InternalActorRef
with RemoteRef {

if (path.address.hasLocalScope)
throw new IllegalArgumentException(s"Unexpected local address in RemoteActorRef [$this]")

remote match {
case t: ArteryTransport =>
// detect mistakes such as using "pekko.tcp" with Artery
if (path.address.protocol != t.localAddress.address.protocol)
throw new IllegalArgumentException(s"Wrong protocol of [$path], expected [${t.localAddress.address.protocol}]")
case _: ArteryTransport =>
// detect mistakes such as using "pekko.tcp" with Artery, also handles pekko.remote.accept-protocol-names
if (!acceptProtocolNames.contains(path.address.protocol)) {
val expectedString = if (acceptProtocolNames.size == 1)
"expected"
else
"expected one of"

throw new IllegalArgumentException(
s"Wrong protocol of [$path], $expectedString [${acceptProtocolNames.mkString}]")
}
case _ =>
}
@volatile private[remote] var cachedAssociation: artery.Association = null
Expand All @@ -697,7 +710,8 @@ private[pekko] class RemoteActorRef private[pekko] (
s.headOption match {
case None => this
case Some("..") => getParent.getChild(name)
case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None)
case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None,
acceptProtocolNames = acceptProtocolNames)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ final class RemoteSettings(val config: Config) {
@deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0")
val Adapters: Map[String, String] = configToMap(getConfig("pekko.remote.classic.adapters"))

val ProtocolName: String = getString("pekko.remote.protocol-name")

val AcceptProtocolNames: Set[String] =
immutableSeq(getStringList("pekko.remote.accept-protocol-names")).toSet.requiring(_.nonEmpty,
"accept-protocol-names must be non empty")

private def transportNames: immutable.Seq[String] =
immutableSeq(getStringList("pekko.remote.classic.enabled-transports"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,12 +393,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
val (port, boundPort) = bindInboundStreams()

_localAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port),
Address(provider.remoteSettings.ProtocolName, system.name, settings.Canonical.Hostname, port),
AddressUidExtension(system).longAddressUid)
_addresses = Set(_localAddress.address)

_bindAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort),
Address(provider.remoteSettings.ProtocolName, system.name, settings.Bind.Hostname, boundPort),
AddressUidExtension(system).longAddressUid)

flightRecorder.transportUniqueAddressSet(_localAddress)
Expand Down Expand Up @@ -954,8 +954,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
*/
private[remote] object ArteryTransport {

val ProtocolName = "pekko"

// Note that the used version of the header format for outbound messages is defined in
// `ArterySettings.Version` because that may depend on configuration settings.
// This is the highest supported version on receiving (decoding) side.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ private[remote] class PekkoProtocolSettings(config: Config) {
.getMillisDuration("pekko.remote.classic.handshake-timeout")
.requiring(_ > Duration.Zero, "handshake-timeout must be > 0")
}

val PekkoScheme: String = new RemoteSettings(config).ProtocolName
}

@nowarn("msg=deprecated")
private[remote] object PekkoProtocolTransport { // Couldn't these go into the Remoting Extension/ RemoteSettings instead?
val PekkoScheme: String = "pekko"
val PekkoOverhead: Int = 0 // Don't know yet
val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0)

Expand Down Expand Up @@ -122,7 +123,7 @@ private[remote] class PekkoProtocolTransport(
private val codec: PekkoPduCodec)
extends ActorTransportAdapter(wrappedTransport, system) {

override val addedSchemeIdentifier: String = PekkoScheme
override val addedSchemeIdentifier: String = new RemoteSettings(system.settings.config).ProtocolName

override def managementCommand(cmd: Any): Future[Boolean] = wrappedTransport.managementCommand(cmd)

Expand Down Expand Up @@ -229,8 +230,9 @@ private[remote] class PekkoProtocolHandle(
_wrappedHandle: AssociationHandle,
val handshakeInfo: HandshakeInfo,
private val stateActor: ActorRef,
private val codec: PekkoPduCodec)
extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, PekkoScheme) {
private val codec: PekkoPduCodec,
override val addedSchemeIdentifier: String)
extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, addedSchemeIdentifier) {

override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload))

Expand Down Expand Up @@ -713,7 +715,8 @@ private[remote] class ProtocolStateActor(
wrappedHandle,
handshakeInfo,
self,
codec))
codec,
settings.PekkoScheme))
readHandlerPromise.future
}

Expand All @@ -733,7 +736,8 @@ private[remote] class ProtocolStateActor(
wrappedHandle,
handshakeInfo,
self,
codec)))
codec,
settings.PekkoScheme)))
readHandlerPromise.future
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ pekko.actor.warn-about-java-serializer-usage = off
extinctPath,
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = Set("pekko", "akka"))

val probe = TestProbe()
probe.watch(extinctRef)
Expand Down