diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index c45d2d671b9..4016c99bcc3 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -167,16 +167,7 @@ class NettyTransportSettings(config: Config) { case _ => getBoolean("tcp-reuse-addr") } - val ByteBufAllocator: ByteBufAllocator = getString("bytebuf-allocator-type") match { - case "pooled" => PooledByteBufAllocator.DEFAULT - case "unpooled" => UnpooledByteBufAllocator.DEFAULT - case "unpooled-heap" => new UnpooledByteBufAllocator(false) - case "adaptive" => new AdaptiveByteBufAllocator() - case "adaptive-heap" => new AdaptiveByteBufAllocator(false) - case other => throw new IllegalArgumentException( - "Unknown 'bytebuf-allocator-type' [" + other + "]," + - " supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.") - } + val ByteBufAllocator: ByteBufAllocator = NettyTransport.deriveByteBufAllocator(getString("bytebuf-allocator-type")) val Hostname: String = getString("hostname") match { case "" => InetAddress.getLocalHost.getHostAddress @@ -336,6 +327,17 @@ private[transport] object NettyTransport { systemName: String, hostName: Option[String]): Option[Address] = addressFromSocketAddress(addr, schemeIdentifier, systemName, hostName, port = None) + + def deriveByteBufAllocator(allocatorType: String): ByteBufAllocator = allocatorType match { + case "pooled" => PooledByteBufAllocator.DEFAULT + case "unpooled" => UnpooledByteBufAllocator.DEFAULT + case "unpooled-heap" => new UnpooledByteBufAllocator(false) + case "adaptive" => new AdaptiveByteBufAllocator() + case "adaptive-heap" => new AdaptiveByteBufAllocator(false) + case other => throw new IllegalArgumentException( + "Unknown 'bytebuf-allocator-type' [" + other + "]," + + " supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.") + } } @deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0") diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala index 5f44b956521..58a6c222b7d 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala @@ -24,7 +24,7 @@ import org.scalatest.wordspec.AnyWordSpec import org.apache.pekko import pekko.actor.{ ActorSystem, Address } -import pekko.remote.classic.transport.netty.NettyTransportSpec._ +import pekko.remote.transport.NettyTransportSpec._ import pekko.testkit.SocketUtil trait BindCanonicalAddressBehaviors { diff --git a/remote/src/test/scala/org/apache/pekko/remote/classic/transport/netty/NettyTransportSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala similarity index 85% rename from remote/src/test/scala/org/apache/pekko/remote/classic/transport/netty/NettyTransportSpec.scala rename to remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala index 3f84936b8d0..436994de582 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/classic/transport/netty/NettyTransportSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala @@ -11,23 +11,25 @@ * Copyright (C) 2018-2022 Lightbend Inc. */ -package org.apache.pekko.remote.classic.transport.netty - -import java.net.{ InetAddress, InetSocketAddress } -import java.nio.channels.ServerSocketChannel - -import scala.concurrent.Await -import scala.concurrent.duration.Duration +package org.apache.pekko.remote.transport import com.typesafe.config.ConfigFactory -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec +import io.netty.buffer.{ AdaptiveByteBufAllocator, PooledByteBufAllocator, UnpooledByteBufAllocator } import org.apache.pekko import pekko.actor.{ ActorSystem, Address, ExtendedActorSystem } import pekko.remote.BoundAddressesExtension +import pekko.remote.transport.netty.NettyTransport.deriveByteBufAllocator import pekko.testkit.SocketUtil +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import java.net.{ InetAddress, InetSocketAddress } +import java.nio.channels.ServerSocketChannel +import scala.concurrent.Await +import scala.concurrent.duration.Duration + object NettyTransportSpec { val commonConfig = ConfigFactory.parseString(""" pekko.actor.provider = remote @@ -132,6 +134,30 @@ class NettyTransportSpec extends AnyWordSpec with Matchers with BindBehavior { Await.result(sys.terminate(), Duration.Inf) } + + "be able to specify byte buffer allocator" in { + deriveByteBufAllocator("pooled") should ===(PooledByteBufAllocator.DEFAULT) + deriveByteBufAllocator("unpooled") should ===(UnpooledByteBufAllocator.DEFAULT) + + { + val allocator = deriveByteBufAllocator("unpooled-heap") + allocator shouldBe a[UnpooledByteBufAllocator] + allocator.toString.contains("directByDefault: false") should ===(true) + } + + { + val allocator = deriveByteBufAllocator("adaptive") + allocator shouldBe a[AdaptiveByteBufAllocator] + allocator.toString.contains("directByDefault: true") should ===(true) + } + + { + val allocator = deriveByteBufAllocator("adaptive-heap") + allocator shouldBe a[AdaptiveByteBufAllocator] + allocator.toString.contains("directByDefault: false") should ===(true) + } + + } } }