diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 4d02b9560c9..92724249177 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -24,6 +24,13 @@ New Features * finagle-mysql: Added support for LONG_BLOB data type. ``PHAB_ID=D1152247`` +Bug Fixes +~~~~~~~~~~ + +* finagle-memcached: Fixed support for running memcached tests with external memcached. Added README with + instructions under finagle/finagle-memcached. ``PHAB_ID=D1120240`` + + Breaking API Changes ~~~~~~~~~~~~~~~~~~~~ diff --git a/finagle-memcached/README b/finagle-memcached/README new file mode 100644 index 00000000000..e3ab4a0fd42 --- /dev/null +++ b/finagle-memcached/README @@ -0,0 +1,15 @@ +To run the memcached tests against a real memcached server: + +1. Ensure you have a Memcached installation. If not, you can install it with: + + $ brew install memcached + +2. Take note of the path where memcached is now installed: + + $ which memcached + +3. Run the memcached tests with the jvm flag EXTERNAL_MEMCACHED_PATH=. For example, if you + are using bazel: + + $ ./bazel test --test_arg=--jvm_flags="-DEXTERNAL_MEMCACHED_PATH=" \ + finagle/finagle-memcached/src/test/scala:scala diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedOldClientTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedOldClientTest.scala index 2a319a46295..4644321c3b2 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedOldClientTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedOldClientTest.scala @@ -4,7 +4,8 @@ import com.twitter.conversions.DurationOps._ import com.twitter.finagle._ import com.twitter.finagle.liveness.FailureAccrualFactory import com.twitter.finagle.memcached.Client -import com.twitter.finagle.param.{Stats, Timer} +import com.twitter.finagle.param.Stats +import com.twitter.finagle.param.Timer import com.twitter.finagle.partitioning.param import com.twitter.finagle.partitioning.param.EjectFailedHost import com.twitter.finagle.stats.InMemoryStatsReceiver diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedPartitioningClientTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedPartitioningClientTest.scala index f61b33e3de0..c26378833d2 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedPartitioningClientTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedPartitioningClientTest.scala @@ -90,48 +90,50 @@ class MemcachedPartitioningClientTest extends MemcachedTest { client.close() } - test("traces fanout requests") { - // we use an eventually block to retry the request if we didn't get partitioned to different shards - eventually { - val tracer = new BufferingTracer() - - // the servers created in MemcachedTest have inconsistent addresses, which means sharding - // will be inconsistent across runs. To combat this, we'll start our own servers and rerun the - // tests if we partition to the same shard. - val serverOpts = - for (_ <- 1 to NumServers) - yield TestMemcachedServer.start( - Some(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))) - val servers: Seq[TestMemcachedServer] = serverOpts.flatten - - val client = Memcached.client - .configured(param.KeyHasher(KeyHasher.FNV1_32)) - .connectionsPerEndpoint(1) - .withTracer(tracer) - .newRichClient(Name.bound(servers.map { s => Address(s.address) }: _*), clientName) - - awaitResult(client.set("foo", Buf.Utf8("bar"))) - awaitResult(client.set("baz", Buf.Utf8("boing"))) - awaitResult( - client.gets(Seq("foo", "baz")) - ).flatMap { - case (key, (Buf.Utf8(value1), Buf.Utf8(value2))) => - Map((key, (value1, value2))) - } - - client.close() - servers.foreach(_.stop()) - - val gets: Seq[TraceId] = tracer.iterator.toList collect { - case Record(id, _, Annotation.Rpc("Gets"), _) => id - } - - // Moving the MemcachedTracingFilter means that partitioned requests should result in two gets spans - assert(gets.length == 2) - // However the FanoutProxy should ensure that the requests are stored in peers, not the same tid. - gets.tail.foreach { get => - assert(get._parentId == gets.head._parentId) - assert(get.spanId != gets.head.spanId) + if (!Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { + test("traces fanout requests") { + // we use an eventually block to retry the request if we didn't get partitioned to different shards + eventually { + val tracer = new BufferingTracer() + + // the servers created in MemcachedTest have inconsistent addresses, which means sharding + // will be inconsistent across runs. To combat this, we'll start our own servers and rerun the + // tests if we partition to the same shard. + val serverOpts = + for (_ <- 1 to NumServers) + yield TestMemcachedServer.start( + Some(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))) + val servers: Seq[TestMemcachedServer] = serverOpts.flatten + + val client = Memcached.client + .configured(param.KeyHasher(KeyHasher.FNV1_32)) + .connectionsPerEndpoint(1) + .withTracer(tracer) + .newRichClient(Name.bound(servers.map { s => Address(s.address) }: _*), clientName) + + awaitResult(client.set("foo", Buf.Utf8("bar"))) + awaitResult(client.set("baz", Buf.Utf8("boing"))) + awaitResult( + client.gets(Seq("foo", "baz")) + ).flatMap { + case (key, (Buf.Utf8(value1), Buf.Utf8(value2))) => + Map((key, (value1, value2))) + } + + client.close() + servers.foreach(_.stop()) + + val gets: Seq[TraceId] = tracer.iterator.toList collect { + case Record(id, _, Annotation.Rpc("Gets"), _) => id + } + + // Moving the MemcachedTracingFilter means that partitioned requests should result in two gets spans + assert(gets.length == 2) + // However the FanoutProxy should ensure that the requests are stored in peers, not the same tid. + gets.tail.foreach { get => + assert(get._parentId == gets.head._parentId) + assert(get.spanId != gets.head.spanId) + } } } } diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedTest.scala index 24700561aa5..c0d172ff291 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedTest.scala @@ -41,6 +41,7 @@ abstract class MemcachedTest protected[this] val Timeout: Duration = 15.seconds protected[this] var servers: Seq[TestMemcachedServer] = Seq.empty protected[this] var client: Client = _ + protected[this] var singleServerClient: Client = _ protected[this] val clientName = "test_client" protected[this] val redistributesKey: Seq[String] @@ -57,6 +58,9 @@ abstract class MemcachedTest val dest = Name.bound(servers.map { s => Address(s.address) }: _*) client = createClient(dest, clientName) } + + singleServerClient = + createClient(Name.bound(Seq(Address(servers.head.address)): _*), clientName) } after { @@ -136,18 +140,18 @@ abstract class MemcachedTest ) } - if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) { + if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { test("gets") { - // create a client that connects to only one server so we can predict CAS tokens - awaitResult(client.set("foos", Buf.Utf8("xyz"))) // CAS: 1 - awaitResult(client.set("bazs", Buf.Utf8("xyz"))) // CAS: 2 - awaitResult(client.set("bazs", Buf.Utf8("zyx"))) // CAS: 3 - awaitResult(client.set("bars", Buf.Utf8("xyz"))) // CAS: 4 - awaitResult(client.set("bars", Buf.Utf8("zyx"))) // CAS: 5 - awaitResult(client.set("bars", Buf.Utf8("yxz"))) // CAS: 6 + // use client that connects to only one server so we can predict CAS tokens + awaitResult(singleServerClient.set("foos", Buf.Utf8("xyz"))) + awaitResult(singleServerClient.set("bazs", Buf.Utf8("xyz"))) + awaitResult(singleServerClient.set("bazs", Buf.Utf8("zyx"))) + awaitResult(singleServerClient.set("bars", Buf.Utf8("xyz"))) + awaitResult(singleServerClient.set("bars", Buf.Utf8("zyx"))) + awaitResult(singleServerClient.set("bars", Buf.Utf8("yxz"))) val result = awaitResult( - client.gets(Seq("foos", "bazs", "bars", "somethingelse")) + singleServerClient.gets(Seq("foos", "bazs", "bars", "somethingelse")) ).map { case (key, (Buf.Utf8(value), Buf.Utf8(casUnique))) => (key, (value, casUnique)) @@ -155,47 +159,54 @@ abstract class MemcachedTest // the "cas unique" values are predictable from a fresh memcached val expected = Map( - "foos" -> (("xyz", "1")), - "bazs" -> (("zyx", "3")), - "bars" -> (("yxz", "6")) + "foos" -> (("xyz", "2")), + "bazs" -> (("zyx", "4")), + "bars" -> (("yxz", "7")) ) assert(result == expected) } } - if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) { + if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { test("getsWithFlag") { - awaitResult(client.set("foos1", Buf.Utf8("xyz"))) - awaitResult(client.set("bazs1", Buf.Utf8("xyz"))) - awaitResult(client.set("bazs1", Buf.Utf8("zyx"))) - val result = awaitResult(client.getsWithFlag(Seq("foos1", "bazs1", "somethingelse"))) - .map { - case (key, (Buf.Utf8(value), Buf.Utf8(flag), Buf.Utf8(casUnique))) => - (key, (value, flag, casUnique)) - } + // use client that connects to only one server so we can predict CAS tokens + awaitResult(singleServerClient.set("foos1", Buf.Utf8("xyz"))) + awaitResult(singleServerClient.set("bazs1", Buf.Utf8("xyz"))) + awaitResult(singleServerClient.set("bazs1", Buf.Utf8("zyx"))) + val result = + awaitResult(singleServerClient.getsWithFlag(Seq("foos1", "bazs1", "somethingelse"))) + .map { + case (key, (Buf.Utf8(value), Buf.Utf8(flag), Buf.Utf8(casUnique))) => + (key, (value, flag, casUnique)) + } // the "cas unique" values are predictable from a fresh memcached assert( result == Map( - "foos1" -> (("xyz", "0", "1")), - "bazs1" -> (("zyx", "0", "2")) + "foos1" -> (("xyz", "0", "2")), + "bazs1" -> (("zyx", "0", "4")) ) ) } } - if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) { + if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { test("cas") { - awaitResult(client.set("x", Buf.Utf8("y"))) - val Some((value, casUnique)) = awaitResult(client.gets("x")) + // use client that connects to only one server so we can predict CAS tokens + awaitResult(singleServerClient.set("x", Buf.Utf8("y"))) // Next CAS: 2 + val Some((value, casUnique)) = awaitResult(singleServerClient.gets("x")) assert(value == Buf.Utf8("y")) - assert(casUnique == Buf.Utf8("1")) + assert(casUnique == Buf.Utf8("2")) - assert(!awaitResult(client.checkAndSet("x", Buf.Utf8("z"), Buf.Utf8("2")).map(_.replaced))) assert( - awaitResult(client.checkAndSet("x", Buf.Utf8("z"), casUnique).map(_.replaced)).booleanValue + !awaitResult( + singleServerClient.checkAndSet("x", Buf.Utf8("z"), Buf.Utf8("1")).map(_.replaced))) + assert( + awaitResult( + singleServerClient + .checkAndSet("x", Buf.Utf8("z"), casUnique).map(_.replaced)).booleanValue ) - val res = awaitResult(client.get("x")) + val res = awaitResult(singleServerClient.get("x")) assert(res.isDefined) assert(res.get == Buf.Utf8("z")) } @@ -224,7 +235,7 @@ abstract class MemcachedTest assert(awaitResult(client.decr("foo", l)) == Some(0L)) } - if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) { + if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { test("stats") { // We can't use a partitioned client to get stats, because we don't hash to a server based on // a key. Instead, we create a ConnectedClient, which is connected to one server. @@ -483,39 +494,49 @@ abstract class MemcachedTest assertRead(newClient, keys2) } - test("partial success") { - val keys = writeKeys(client, 1000, 20) - assertRead(client, keys) - - val initialResult = awaitResult { client.getResult(keys) } - assert(initialResult.failures.isEmpty) - assert(initialResult.misses.isEmpty) - assert(initialResult.values.size == keys.size) - - // now kill one server - servers.head.stop() - - // test partial success with getResult() - val getResult = awaitResult { client.getResult(keys) } - // assert the failures are set to the exception received from the failing partition - assert(getResult.failures.nonEmpty) - getResult.failures.foreach { - case (_, e) => - assert(e.isInstanceOf[Exception]) - } - // there should be no misses as all keys are known - assert(getResult.misses.isEmpty) - - // assert that the values are what we expect them to be. We are not checking for exact - // number of failures and successes here because we don't know how many keys will fall into - // the failed partition. The accuracy of the responses are tested in other tests anyways. - assert(getResult.values.nonEmpty) - assert(getResult.values.size < keys.size) - getResult.values.foreach { - case (keyStr, valueBuf) => - val Buf.Utf8(valStr) = valueBuf - assert(valStr == s"$keyStr$ValueSuffix") - } + // This works with our internal memcached because when the server is shutdown, we get an immediate + // "connection refused" when trying to send a request. With external memcached, the connection + // establishment instead hangs. To make this test pass with external memcached, we could add + // `withSession.acquisitionTimeout` to the client, but this makes the test a) slow and b) can + // make the other tests flakey, so don't bother. + if (!Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { + test("partial success") { + val keys = writeKeys(client, 1000, 20) + assertRead(client, keys) + + val initialResult = awaitResult { + client.getResult(keys) + } + assert(initialResult.failures.isEmpty) + assert(initialResult.misses.isEmpty) + assert(initialResult.values.size == keys.size) + // now kill one server + servers.head.stop() + + // test partial success with getResult() + val getResult = awaitResult { + client.getResult(keys) + } + // assert the failures are set to the exception received from the failing partition + assert(getResult.failures.nonEmpty) + getResult.failures.foreach { + case (_, e) => + assert(e.isInstanceOf[Exception]) + } + // there should be no misses as all keys are known + assert(getResult.misses.isEmpty) + + // assert that the values are what we expect them to be. We are not checking for exact + // number of failures and successes here because we don't know how many keys will fall into + // the failed partition. The accuracy of the responses are tested in other tests anyways. + assert(getResult.values.nonEmpty) + assert(getResult.values.size < keys.size) + getResult.values.foreach { + case (keyStr, valueBuf) => + val Buf.Utf8(valStr) = valueBuf + assert(valStr == s"$keyStr$ValueSuffix") + } + } } } diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ProxyTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ProxyTest.scala index 60ccc00b5fa..8553f937c2d 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ProxyTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ProxyTest.scala @@ -4,11 +4,15 @@ import com.twitter.conversions.DurationOps._ import com.twitter.finagle._ import com.twitter.finagle.memcached.Client import com.twitter.finagle.memcached.integration.external.TestMemcachedServer -import com.twitter.finagle.memcached.protocol.{Command, Response} +import com.twitter.finagle.memcached.protocol.Command +import com.twitter.finagle.memcached.protocol.Response import com.twitter.io.Buf -import com.twitter.util.{Await, Awaitable} -import java.net.{InetAddress, InetSocketAddress} +import com.twitter.util.Await +import com.twitter.util.Awaitable +import java.net.InetAddress +import java.net.InetSocketAddress import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.Eventually.eventually import org.scalatest.funsuite.AnyFunSuite class ProxyTest extends AnyFunSuite with BeforeAndAfter { @@ -85,7 +89,7 @@ class ProxyTest extends AnyFunSuite with BeforeAndAfter { awaitResult(externalClient.close()) } - if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) { + if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { test("stats is supported") { awaitResult(externalClient.delete("foo")) assert(awaitResult(externalClient.get("foo")) == None) @@ -100,7 +104,7 @@ class ProxyTest extends AnyFunSuite with BeforeAndAfter { } } - if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) { + if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { test("stats (cachedump) is supported") { awaitResult(externalClient.delete("foo")) assert(awaitResult(externalClient.get("foo")) == None) @@ -109,11 +113,15 @@ class ProxyTest extends AnyFunSuite with BeforeAndAfter { assert(slabs != null) assert(!slabs.isEmpty) val n = slabs.head.split(" ")(1).split(":")(0).toInt - val stats = awaitResult(externalClient.stats(Some("cachedump " + n + " 100"))) - assert(stats != null) - assert(!stats.isEmpty) - stats.foreach { stat => assert(stat.startsWith("ITEM")) } - assert(stats.find { stat => stat.contains("foo") }.isDefined) + + eventually { + val stats = awaitResult(externalClient.stats(Some("cachedump " + n + " 100"))) + assert(stats != null) + assert(!stats.isEmpty) + stats.foreach { stat => assert(stat.startsWith("ITEM")) } + assert(stats.find { stat => stat.contains("foo") }.isDefined) + } + awaitResult(externalClient.close()) } } diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/SimpleClientTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/SimpleClientTest.scala index 7fcc2e8b6bd..8105a4b9918 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/SimpleClientTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/SimpleClientTest.scala @@ -109,8 +109,9 @@ class SimpleClientTest extends AnyFunSuite with BeforeAndAfter { ) } - if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) { + if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { test("gets") { + assert(awaitResult(client.gets("foos")).isEmpty) awaitResult(client.set("foos", Buf.Utf8("xyz"))) awaitResult(client.set("bazs", Buf.Utf8("xyz"))) awaitResult(client.set("bazs", Buf.Utf8("zyx"))) @@ -125,23 +126,23 @@ class SimpleClientTest extends AnyFunSuite with BeforeAndAfter { "foos" -> ( ( "xyz", - "1" + "2" ) ), // the "cas unique" values are predictable from a fresh memcached - "bazs" -> (("zyx", "3")) + "bazs" -> (("zyx", "4")) ) ) } } - if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) { + if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { test("cas") { awaitResult(client.set("x", Buf.Utf8("y"))) val Some((value, casUnique)) = awaitResult(client.gets("x")) assert(value == Buf.Utf8("y")) - assert(casUnique == Buf.Utf8("1")) + assert(casUnique == Buf.Utf8("2")) - assert(!awaitResult(client.checkAndSet("x", Buf.Utf8("z"), Buf.Utf8("2")).map(_.replaced))) + assert(!awaitResult(client.checkAndSet("x", Buf.Utf8("z"), Buf.Utf8("1")).map(_.replaced))) assert( awaitResult(client.checkAndSet("x", Buf.Utf8("z"), casUnique).map(_.replaced)).booleanValue ) @@ -176,7 +177,7 @@ class SimpleClientTest extends AnyFunSuite with BeforeAndAfter { assert(awaitResult(client.decr("foo", l)) == Some(0L)) } - if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) { + if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { test("stats") { val stats = awaitResult(client.stats()) assert(stats != null) diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/external/ExternalMemcached.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/external/ExternalMemcached.scala index 9d2e1bf1c47..0084bf76c2a 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/external/ExternalMemcached.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/external/ExternalMemcached.scala @@ -1,8 +1,13 @@ package com.twitter.finagle.memcached.integration.external import com.twitter.conversions.DurationOps._ -import com.twitter.util.{Duration, RandomSocket, Stopwatch} -import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket} +import com.twitter.util.Duration +import com.twitter.util.RandomSocket +import com.twitter.util.Stopwatch +import java.net.BindException +import java.net.InetAddress +import java.net.InetSocketAddress +import java.net.ServerSocket import scala.jdk.CollectionConverters import scala.collection._ import scala.collection.immutable.Stream @@ -12,9 +17,12 @@ object TestMemcachedServer { def start(): Option[TestMemcachedServer] = start(None) def start(address: Option[InetSocketAddress]): Option[TestMemcachedServer] = { - if (!Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) - InternalMemcached.start(address) - else ExternalMemcached.start(address) + Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")) match { + case Some(externalMemcachedPath) => + ExternalMemcached.start(address, externalMemcachedPath) + case None => + InternalMemcached.start(address) + } } } @@ -68,13 +76,13 @@ private[memcached] object ExternalMemcached { self => address } - // Use overloads instead of default args to support java integration tests - - def start(): Option[TestMemcachedServer] = start(None) - - def start(address: Option[InetSocketAddress]): Option[TestMemcachedServer] = { + def start( + address: Option[InetSocketAddress], + externalMemcachedPath: String + ): Option[TestMemcachedServer] = { def exec(address: InetSocketAddress): Process = { - val cmd = List("memcached", "-l", address.getHostName, "-p", address.getPort.toString) + val cmd = + List(externalMemcachedPath, "-l", address.getHostName, "-p", address.getPort.toString) val builder = new ProcessBuilder(cmd: _*) builder.start() } @@ -124,7 +132,7 @@ private[memcached] object ExternalMemcached { self => result = true } catch { case ex: BindException => - result = (ex.getMessage != "Address already in use") + result = !ex.getMessage.contains("Address already in use") } finally { if (ss != null) ss.close()