diff --git a/finagle-memcached/src/test/java/com/twitter/finagle/memcached/integration/TestClient.java b/finagle-memcached/src/test/java/com/twitter/finagle/memcached/integration/TestClient.java index 53145471690..43ac1cb059e 100644 --- a/finagle-memcached/src/test/java/com/twitter/finagle/memcached/integration/TestClient.java +++ b/finagle-memcached/src/test/java/com/twitter/finagle/memcached/integration/TestClient.java @@ -2,7 +2,6 @@ import scala.Option; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -13,8 +12,8 @@ import com.twitter.finagle.Service; import com.twitter.finagle.memcached.JavaClient; import com.twitter.finagle.memcached.JavaClientBase; -import com.twitter.finagle.memcached.integration.external.TestMemcachedServer; -import com.twitter.finagle.memcached.integration.external.TestMemcachedServer$; +import com.twitter.finagle.memcached.integration.external.MemcachedServer; +import com.twitter.finagle.memcached.integration.external.MemcachedServer$; import com.twitter.finagle.memcached.protocol.Command; import com.twitter.finagle.memcached.protocol.Response; import com.twitter.io.Bufs; @@ -23,12 +22,11 @@ import static org.junit.Assert.assertEquals; public class TestClient { - private Option server; + private MemcachedServer server; @Before public void setUp() { - server = TestMemcachedServer$.MODULE$.start(); - Assume.assumeTrue(server.isDefined()); + server = MemcachedServer$.MODULE$.start(); } /** @@ -36,7 +34,7 @@ public void setUp() { */ @Test public void testGetAndSet() throws Exception { - Address addr = Addresses.newInetAddress(server.get().address()); + Address addr = Addresses.newInetAddress(server.address()); Service service = Memcached.client() .connectionsPerEndpoint(1) diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ClientAPITest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ClientAPITest.scala new file mode 100644 index 00000000000..ee176f380c2 --- /dev/null +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ClientAPITest.scala @@ -0,0 +1,253 @@ +package com.twitter.finagle.memcached.integration + +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle.Address +import com.twitter.finagle.Name +import com.twitter.finagle.memcached.Client +import com.twitter.finagle.memcached.integration.external.ExternalMemcached +import com.twitter.finagle.memcached.integration.external.MemcachedServer +import com.twitter.finagle.memcached.protocol._ +import com.twitter.io.Buf +import com.twitter.util.Await +import com.twitter.util.Awaitable +import org.scalatest.BeforeAndAfter +import org.scalatest.funsuite.AnyFunSuite + +// These tests run against either a test Memcached server or a real MemcachedServer, +// depending on the value of `ExternalMemcached.use` +abstract class ClientAPITest extends AnyFunSuite with BeforeAndAfter { + + protected var client: Client = _ + private var testServer: MemcachedServer = _ + + protected def awaitResult[T](awaitable: Awaitable[T]): T = Await.result(awaitable, 5.seconds) + + protected def mkClient(dest: Name): Client + + before { + testServer = MemcachedServer.start() + client = mkClient(Name.bound(Seq(Address(testServer.address)): _*)) + } + + after { + testServer.stop() + } + + test("set & get") { + awaitResult(client.delete("foo")) + assert(awaitResult(client.get("foo")) == None) + awaitResult(client.set("foo", Buf.Utf8("bar"))) + assert(awaitResult(client.get("foo")).get == 
Buf.Utf8("bar")) + } + + test("set & get data containing newlines") { + awaitResult(client.delete("bob")) + assert(awaitResult(client.get("bob")) == None) + awaitResult(client.set("bob", Buf.Utf8("hello there \r\n nice to meet \r\n you"))) + assert( + awaitResult(client.get("bob")).get == Buf.Utf8("hello there \r\n nice to meet \r\n you"), + 3.seconds + ) + } + + test("get") { + awaitResult(client.set("foo", Buf.Utf8("bar"))) + awaitResult(client.set("baz", Buf.Utf8("boing"))) + val result = awaitResult(client.get(Seq("foo", "baz", "notthere"))) + .map { case (key, Buf.Utf8(value)) => (key, value) } + assert( + result == Map( + "foo" -> "bar", + "baz" -> "boing" + ) + ) + } + + test("getWithFlag") { + awaitResult(client.set("foo", Buf.Utf8("bar"))) + awaitResult(client.set("baz", Buf.Utf8("boing"))) + val result = awaitResult(client.getWithFlag(Seq("foo", "baz", "notthere"))) + .map { case (key, ((Buf.Utf8(value), Buf.Utf8(flag)))) => (key, (value, flag)) } + assert( + result == Map( + "foo" -> (("bar", "0")), + "baz" -> (("boing", "0")) + ) + ) + } + + test("append & prepend") { + awaitResult(client.set("foo", Buf.Utf8("bar"))) + awaitResult(client.append("foo", Buf.Utf8("rab"))) + val Buf.Utf8(res) = awaitResult(client.get("foo")).get + assert(res == "barrab") + awaitResult(client.prepend("foo", Buf.Utf8("rab"))) + val Buf.Utf8(res2) = awaitResult(client.get("foo")).get + assert(res2 == "rabbarrab") + } + + test("incr & decr") { + // As of memcached 1.4.8 (issue 221), empty values are no longer treated as integers + awaitResult(client.set("foo", Buf.Utf8("0"))) + assert(awaitResult(client.incr("foo")) == Some(1L)) + assert(awaitResult(client.incr("foo", 2)) == Some(3L)) + assert(awaitResult(client.decr("foo")) == Some(2L)) + + awaitResult(client.set("foo", Buf.Utf8("0"))) + assert(awaitResult(client.incr("foo")) == Some(1L)) + val l = 1L << 50 + assert(awaitResult(client.incr("foo", l)) == Some(l + 1L)) + assert(awaitResult(client.decr("foo")) == Some(l)) + assert(awaitResult(client.decr("foo", l)) == Some(0L)) + } + + test("send malformed keys") { + // test key validation trait + intercept[ClientError] { + awaitResult(client.get("fo o")) + } + intercept[ClientError] { + awaitResult(client.set("", Buf.Utf8("bar"))) + } + intercept[ClientError] { + awaitResult(client.get(" foo")) + } + intercept[ClientError] { + awaitResult(client.get("foo ")) + } + intercept[ClientError] { + awaitResult(client.get(" foo")) + } + intercept[ClientError] { + awaitResult(client.get(null: String)) + } + intercept[ClientError] { + awaitResult(client.set(null: String, Buf.Utf8("bar"))) + } + intercept[ClientError] { + awaitResult(client.set(" ", Buf.Utf8("bar"))) + } + + try { + awaitResult(client.set("\t", Buf.Utf8("bar"))) + } catch { + case _: ClientError => fail("\t is allowed") + } + + intercept[ClientError] { + awaitResult(client.set("\r", Buf.Utf8("bar"))) + } + intercept[ClientError] { + awaitResult(client.set("\n", Buf.Utf8("bar"))) + } + intercept[ClientError] { + awaitResult(client.set("\u0000", Buf.Utf8("bar"))) + } + + val veryLongKey = + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + intercept[ClientError] { + awaitResult(client.get(veryLongKey)) + } + intercept[ClientError] { + awaitResult(client.set(veryLongKey, Buf.Utf8("bar"))) + } + + // test other keyed command validation + 
intercept[ClientError] { + awaitResult(client.gets(Seq(null))) + } + intercept[ClientError] { + awaitResult(client.gets(Seq(""))) + } + intercept[ClientError] { + awaitResult(client.gets(Seq("foos", "bad key", "somethingelse"))) + } + intercept[ClientError] { + awaitResult(client.append("bad key", Buf.Utf8("rab"))) + } + intercept[ClientError] { + awaitResult(client.prepend("bad key", Buf.Utf8("rab"))) + } + intercept[ClientError] { + awaitResult(client.replace("bad key", Buf.Utf8("bar"))) + } + intercept[ClientError] { + awaitResult(client.add("bad key", Buf.Utf8("2"))) + } + intercept[ClientError] { + awaitResult(client.checkAndSet("bad key", Buf.Utf8("z"), Buf.Utf8("2"))) + } + intercept[ClientError] { + awaitResult(client.incr("bad key")) + } + intercept[ClientError] { + awaitResult(client.decr("bad key")) + } + intercept[ClientError] { + awaitResult(client.delete("bad key")) + } + } + + // Only run these if we have a real memcached server to use + if (ExternalMemcached.use()) { + test("gets") { + assert(awaitResult(client.gets("foos")).isEmpty) + awaitResult(client.set("foos", Buf.Utf8("xyz"))) + awaitResult(client.set("bazs", Buf.Utf8("xyz"))) + awaitResult(client.set("bazs", Buf.Utf8("zyx"))) + val result = awaitResult(client.gets(Seq("foos", "bazs", "somethingelse"))) + .map { + case (key, (Buf.Utf8(value), Buf.Utf8(casUnique))) => + (key, (value, casUnique)) + } + + assert( + result == Map( + "foos" -> ( + ( + "xyz", + "2" + ) + ), // the "cas unique" values are predictable from a fresh memcached + "bazs" -> (("zyx", "4")) + ) + ) + } + + test("getsWithFlag") { + awaitResult(client.set("foos1", Buf.Utf8("xyz"))) + awaitResult(client.set("bazs1", Buf.Utf8("xyz"))) + awaitResult(client.set("bazs1", Buf.Utf8("zyx"))) + val result = + awaitResult(client.getsWithFlag(Seq("foos1", "bazs1", "somethingelse"))) + .map { + case (key, (Buf.Utf8(value), Buf.Utf8(flag), Buf.Utf8(casUnique))) => + (key, (value, flag, casUnique)) + } + + // the "cas unique" values are predictable from a fresh memcached + assert( + result == Map( + "foos1" -> (("xyz", "0", "2")), + "bazs1" -> (("zyx", "0", "4")) + ) + ) + } + + test("cas") { + awaitResult(client.set("x", Buf.Utf8("y"))) + val Some((value, casUnique)) = awaitResult(client.gets("x")) + assert(value == Buf.Utf8("y")) + assert(casUnique == Buf.Utf8("2")) + + assert(!awaitResult(client.checkAndSet("x", Buf.Utf8("z"), Buf.Utf8("1")).map(_.replaced))) + assert( + awaitResult(client.checkAndSet("x", Buf.Utf8("z"), casUnique).map(_.replaced)).booleanValue + ) + val res = awaitResult(client.get("x")) + assert(res.isDefined) + assert(res.get == Buf.Utf8("z")) + } + } +} diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ClientBehavioursTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ClientBehavioursTest.scala new file mode 100644 index 00000000000..95e902ea617 --- /dev/null +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ClientBehavioursTest.scala @@ -0,0 +1,458 @@ +package com.twitter.finagle.memcached.integration + +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle._ +import com.twitter.finagle.liveness.FailureAccrualFactory +import com.twitter.finagle.memcached.Client +import com.twitter.finagle.memcached.integration.external.ExternalMemcached +import com.twitter.finagle.memcached.integration.external.MemcachedServer +import com.twitter.finagle.memcached.protocol.Command +import com.twitter.finagle.memcached.protocol.Error 
+import com.twitter.finagle.memcached.protocol.Get +import com.twitter.finagle.memcached.protocol.Quit +import com.twitter.finagle.memcached.protocol.Response +import com.twitter.finagle.memcached.protocol.Set +import com.twitter.finagle.memcached.protocol.Value +import com.twitter.finagle.memcached.protocol.Values +import com.twitter.finagle.partitioning.param +import com.twitter.finagle.service.TimeoutFilter +import com.twitter.finagle.stats.InMemoryStatsReceiver +import com.twitter.finagle.tracing.Annotation +import com.twitter.finagle.tracing.BufferingTracer +import com.twitter.finagle.tracing.NullTracer +import com.twitter.finagle.tracing.Record +import com.twitter.finagle.tracing.TraceId +import com.twitter.finagle.{param => ctfparam} +import com.twitter.hashing.KeyHasher +import com.twitter.io.Buf +import com.twitter.util._ +import com.twitter.util.registry.Entry +import com.twitter.util.registry.GlobalRegistry +import com.twitter.util.registry.SimpleRegistry +import java.net.InetAddress +import java.net.InetSocketAddress +import org.scalatest.BeforeAndAfter +import org.scalatest.Outcome +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.PatienceConfiguration +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.time.Milliseconds +import org.scalatest.time.Seconds +import org.scalatest.time.Span +import scala.util.Random +import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.atLeastOnce +import org.mockito.Mockito.spy +import org.mockito.Mockito.verify +import org.mockito.Mockito.when +import scala.collection.JavaConverters._ + +class ClientBehavioursTest + extends AnyFunSuite + with BeforeAndAfter + with Eventually + with PatienceConfiguration { + + protected def createClient(dest: Name, clientName: String): Client = { + Memcached.client.newRichClient(dest, clientName) + } + + private[this] val NumServers = 5 + private[this] val NumConnections = 4 + private[this] val Timeout: Duration = 15.seconds + private[this] var servers: Seq[MemcachedServer] = Seq.empty + private[this] var client: Client = _ + private[this] val clientName = "test_client" + + private[this] val redistributesKey: Seq[String] = + Seq("test_client", "partitioner", "redistributes") + private[this] val leavesKey: Seq[String] = Seq(clientName, "partitioner", "leaves") + private[this] val revivalsKey: Seq[String] = Seq(clientName, "partitioner", "revivals") + private[this] val ejectionsKey: Seq[String] = Seq(clientName, "partitioner", "ejections") + + before { + servers = for (_ <- 1 to NumServers) yield MemcachedServer.start() + + val dest = Name.bound(servers.map { s => Address(s.address) }: _*) + client = createClient(dest, clientName) + } + + after { + servers.foreach(_.stop()) + client.close() + } + + override def withFixture(test: NoArgTest): Outcome = { + if (servers.length == NumServers) { + test() + } else { + info("Cannot start memcached. 
Skipping test...") + cancel() + } + } + + private[this] def awaitResult[T](awaitable: Awaitable[T]): T = Await.result(awaitable, Timeout) + + test("re-hash when a bad host is ejected") { + val sr = new InMemoryStatsReceiver + val client = Memcached.client + .configured(param.KeyHasher(KeyHasher.FNV1_32)) + .configured(TimeoutFilter.Param(10000.milliseconds)) + .configured(param.EjectFailedHost(true)) + .configured(FailureAccrualFactory.Param(1, () => 10.minutes)) + .configured(ctfparam.Stats(sr)) + .newRichClient(Name.bound(servers.map { s => Address(s.address) }: _*), clientName) + + val max = 200 + // set values + awaitResult( + Future.collect( + (0 to max).map { i => client.set(s"foo$i", Buf.Utf8(s"bar$i")) } + ) + ) + + // We can't control the Distributor to make sure that for the set of servers, there is at least + // one client in the partition talking to it. Therefore, we rely on the fact that for 5 + // backends, it's very unlikely all clients will be talking to the same server, and as such, + // shutting down all backends but one will trigger cache misses. + servers.tail.foreach(_.stop()) + + // trigger ejection + for (i <- 0 to max) { + Await.ready(client.get(s"foo$i"), Timeout) + } + // wait a little longer than default to prevent test flappiness + val timeout = PatienceConfiguration.Timeout(Span(10, Seconds)) + val interval = PatienceConfiguration.Interval(Span(100, Milliseconds)) + eventually(timeout, interval) { + assert(sr.counters.getOrElse(ejectionsKey, 0L) > 2) + } + + // previously set values have cache misses + var cacheMisses = 0 + for (i <- 0 to max) { + if (awaitResult(client.get(s"foo$i")).isEmpty) cacheMisses = cacheMisses + 1 + } + assert(cacheMisses > 0) + + client.close() + } + + test("host comes back into ring after being ejected") { + class MockedMemcacheServer extends Service[Command, Response] { + def apply(command: Command): Future[Response with Product with Serializable] = command match { + case Get(_) => Future.value(Values(List(Value(Buf.Utf8("foo"), Buf.Utf8("bar"))))) + case Set(_, _, _, _) => Future.value(Error(new Exception)) + case x => Future.exception(new MatchError(x)) + } + } + + val cacheServer: ListeningServer = Memcached.serve( + new InetSocketAddress(InetAddress.getLoopbackAddress, 0), + new MockedMemcacheServer + ) + + val timer = new MockTimer + val statsReceiver = new InMemoryStatsReceiver + + val client = Memcached.client + .configured(param.KeyHasher(KeyHasher.FNV1_32)) + .configured(TimeoutFilter.Param(10000.milliseconds)) + .configured(FailureAccrualFactory.Param(1, () => 10.minutes)) + .configured(param.EjectFailedHost(true)) + .configured(ctfparam.Timer(timer)) + .configured(ctfparam.Stats(statsReceiver)) + .newRichClient( + Name.bound(Address(cacheServer.boundAddress.asInstanceOf[InetSocketAddress])), + clientName) + + Time.withCurrentTimeFrozen { timeControl => + // Send a bad request + intercept[Exception] { + awaitResult(client.set("foo", Buf.Utf8("bar"))) + } + + // Node should have been ejected + val timeout = PatienceConfiguration.Timeout(Span(1, Seconds)) + eventually(timeout) { + assert(statsReceiver.counters.get(ejectionsKey).contains(1)) + } + + // Node should have been marked dead, and still be dead after 5 minutes + timeControl.advance(5.minutes) + + // Shard should be unavailable + intercept[ShardNotAvailableException] { + awaitResult(client.get(s"foo")) + } + + timeControl.advance(5.minutes) + timer.tick() + + // 10 minutes (markDeadFor duration) have passed, so the request should go through + 
assert(statsReceiver.counters.get(revivalsKey) == Some(1)) + assert(awaitResult(client.get(s"foo")).get == Buf.Utf8("bar")) + } + client.close() + } + + test("Add and remove nodes") { + val addrs = servers.map { s => Address(s.address) } + + // Start with 3 backends + val mutableAddrs: ReadWriteVar[Addr] = new ReadWriteVar(Addr.Bound(addrs.toSet.drop(2))) + + val sr = new InMemoryStatsReceiver + + val client = Memcached.client + .configured(param.KeyHasher(KeyHasher.FNV1_32)) + .configured(TimeoutFilter.Param(10000.milliseconds)) + .configured(FailureAccrualFactory.Param(1, () => 10.minutes)) + .configured(param.EjectFailedHost(true)) + .connectionsPerEndpoint(NumConnections) + .withStatsReceiver(sr) + .newRichClient(Name.Bound.singleton(mutableAddrs), clientName) + + assert(sr.counters(redistributesKey) == 1) + assert(sr.counters(Seq("test_client", "loadbalancer", "rebuilds")) == 3) + assert(sr.counters(Seq("test_client", "loadbalancer", "updates")) == 3) + assert(sr.counters(Seq("test_client", "loadbalancer", "adds")) == NumConnections * 3) + assert(sr.counters(Seq("test_client", "loadbalancer", "removes")) == 0) + + // Add 2 nodes to the backends, for a total of 5 backends + mutableAddrs.update(Addr.Bound(addrs.toSet)) + + assert(sr.counters(redistributesKey) == 2) + // Need to rebuild each of the 5 nodes with `numConnections` + assert(sr.counters(Seq("test_client", "loadbalancer", "rebuilds")) == 5) + assert(sr.counters(Seq("test_client", "loadbalancer", "updates")) == 5) + assert(sr.counters(Seq("test_client", "loadbalancer", "adds")) == NumConnections * 5) + assert(sr.counters(Seq("test_client", "loadbalancer", "removes")) == 0) + + // Remove 1 node from the backends, for a total of 4 backends + mutableAddrs.update(Addr.Bound(addrs.toSet.drop(1))) + + assert(sr.counters(redistributesKey) == 3) + // Don't need to rebuild or update any existing nodes + assert(sr.counters(Seq("test_client", "loadbalancer", "rebuilds")) == 5) + assert(sr.counters(Seq("test_client", "loadbalancer", "updates")) == 5) + assert(sr.counters(Seq("test_client", "loadbalancer", "adds")) == NumConnections * 5) + + assert(sr.counters(leavesKey) == 1) + + // Node is removed, closing `numConnections` in the LoadBalancer + assert(sr.counters(Seq("test_client", "loadbalancer", "removes")) == NumConnections) + + // Update the backends with the same list, for a total of 4 backends + mutableAddrs.update(Addr.Bound(addrs.toSet.drop(1))) + + assert(sr.counters(redistributesKey) == 4) + // Ensure we don't do anything in the LoadBalancer because the set of nodes is the same + assert(sr.counters(Seq("test_client", "loadbalancer", "rebuilds")) == 5) + assert(sr.counters(Seq("test_client", "loadbalancer", "updates")) == 5) + assert(sr.counters(Seq("test_client", "loadbalancer", "adds")) == NumConnections * 5) + assert(sr.counters(Seq("test_client", "loadbalancer", "removes")) == NumConnections) + + client.close() + } + + test("FailureAccrualFactoryException has remote address") { + val client = Memcached.client + .configured(param.KeyHasher(KeyHasher.FNV1_32)) + .configured(TimeoutFilter.Param(10000.milliseconds)) + .configured(FailureAccrualFactory.Param(1, 10.minutes)) + .configured(param.EjectFailedHost(false)) + .connectionsPerEndpoint(1) + .newRichClient(Name.bound(Address("localhost", 1234)), clientName) + + // Trigger transition to "Dead" state + intercept[Exception] { + awaitResult(client.delete("foo")) + } + + // Client has not been ejected, so the same client gets a re-application of the connection, + // triggering 
the 'failureAccrualEx' in KetamaFailureAccrualFactory + val failureAccrualEx = intercept[HasRemoteInfo] { + awaitResult(client.delete("foo")) + } + + assert(failureAccrualEx.getMessage().contains("Endpoint is marked dead by failureAccrual")) + assert(failureAccrualEx.getMessage().contains("Downstream Address: localhost/127.0.0.1:1234")) + client.close() + } + + test("traces fanout requests") { + // we use an eventually block to retry the request if we didn't get partitioned to different shards + eventually { + val tracer = new BufferingTracer() + + // the servers created in `before` have inconsistent addresses, which means sharding + // will be inconsistent across runs. To combat this, we'll start our own servers and rerun the + // tests if we partition to the same shard. + val servers = + for (_ <- 1 to NumServers) yield MemcachedServer.start() + + val client = Memcached.client + .configured(param.KeyHasher(KeyHasher.FNV1_32)) + .connectionsPerEndpoint(1) + .withTracer(tracer) + .newRichClient(Name.bound(servers.map { s => Address(s.address) }: _*), clientName) + + awaitResult(client.set("foo", Buf.Utf8("bar"))) + awaitResult(client.set("baz", Buf.Utf8("boing"))) + awaitResult( + client.gets(Seq("foo", "baz")) + ).flatMap { + case (key, (Buf.Utf8(value1), Buf.Utf8(value2))) => + Map((key, (value1, value2))) + } + + client.close() + servers.foreach(_.stop()) + + val gets: Seq[TraceId] = tracer.iterator.toList collect { + case Record(id, _, Annotation.Rpc("Gets"), _) => id + } + + // Moving the MemcachedTracingFilter means that partitioned requests should result in two gets spans + assert(gets.length == 2) + // However the FanoutProxy should ensure that the requests are stored in peers, not the same tid. + gets.tail.foreach { get => + assert(get._parentId == gets.head._parentId) + assert(get.spanId != gets.head.spanId) + } + } + } + + test("set and get without partioning") { + val c = Memcached.client + .newLoadBalancedTwemcacheClient( + Name.bound(servers.map { s => Address(s.address) }.head), + clientName) + awaitResult(c.set("xyz", Buf.Utf8("value1"))) + assert(awaitResult(c.get("xyz")).get == Buf.Utf8("value1")) + } + + test("Push client uses Netty4PushTransporter") { + val simple = new SimpleRegistry() + val address = servers.head.address + GlobalRegistry.withRegistry(simple) { + val client = + Memcached.client.newService(Name.bound(com.twitter.finagle.Address(address)), "memcache") + client(Quit()) + val entries = simple.toSet + assert( + entries.contains( + Entry(Seq("client", "memcached", "memcache", "Transporter"), "Netty4PushTransporter") + ) + ) + } + } + + test("annotates the total number of hits and misses") { + withExpectedTraces( + c => { + awaitResult(c.set("foo", Buf.Utf8("bar"))) + awaitResult(c.set("bar", Buf.Utf8("baz"))) + // add a missing key + awaitResult(c.getResult(Seq("foo", "bar", "themissingkey"))) + }, + Seq( + Annotation.BinaryAnnotation("clnt/memcached.hits", 2), + Annotation.BinaryAnnotation("clnt/memcached.misses", 1) + ) + ) + } + + test("annotates with the endpoint") { + withExpectedTraces( + c => { + awaitResult(c.get("foo")) + }, + Seq( + Annotation.BinaryAnnotation("clnt/namer.name", s"Set(Inet(${servers.head.address},Map()))") + ) + ) + } + + // This works with our internal memcached because when the server is shutdown, we get an immediate + // "connection refused" when trying to send a request. With external memcached, the connection + // establishment instead hangs. 
To make this test pass with external memcached, we could add + // `withSession.acquisitionTimeout` to the client, but this makes the test a) slow and b) can + // make the other tests flakey, so don't bother. + if (!ExternalMemcached.use()) { + test("partial success") { + val ValueSuffix = ":" + Time.now.inSeconds + + // creating multiple random strings so that we get a uniform distribution of keys the + // ketama ring and thus the Memcached shards + val keys = 1 to 1000 map { _ => Random.alphanumeric.take(20).mkString } + val writes = keys map { key => client.set(key, Buf.Utf8(s"$key$ValueSuffix")) } + awaitResult(Future.join(writes)) + + val readValues: Map[String, Buf] = awaitResult { + client.get(keys) + } + assert(readValues.size == keys.length) + assert(readValues.keySet.toSeq.sorted == keys.sorted) + readValues.keys foreach { key => + val Buf.Utf8(readValue) = readValues(key) + assert(readValue == s"$key$ValueSuffix") + } + + val initialResult = awaitResult { + client.getResult(keys) + } + assert(initialResult.failures.isEmpty) + assert(initialResult.misses.isEmpty) + assert(initialResult.values.size == keys.size) + + // now kill one server + servers.head.stop() + + // test partial success with getResult() + val getResult = awaitResult { + client.getResult(keys) + } + // assert the failures are set to the exception received from the failing partition + assert(getResult.failures.nonEmpty) + getResult.failures.foreach { + case (_, e) => + assert(e.isInstanceOf[Exception]) + } + // there should be no misses as all keys are known + assert(getResult.misses.isEmpty) + + // assert that the values are what we expect them to be. We are not checking for exact + // number of failures and successes here because we don't know how many keys will fall into + // the failed partition. The accuracy of the responses are tested in other tests anyways. 
+ assert(getResult.values.nonEmpty) + assert(getResult.values.size < keys.size) + getResult.values.foreach { + case (keyStr, valueBuf) => + val Buf.Utf8(valStr) = valueBuf + assert(valStr == s"$keyStr$ValueSuffix") + } + } + } + + private[this] def withExpectedTraces(f: Client => Unit, expected: Seq[Annotation]): Unit = { + val tracer = spy(new NullTracer) + when(tracer.isActivelyTracing(any[TraceId])).thenReturn(true) + when(tracer.isNull).thenReturn(false) + val captor: ArgumentCaptor[Record] = ArgumentCaptor.forClass(classOf[Record]) + + // Bind to only one server so all requests go to it + val client = Memcached.client + .configured(param.KeyHasher(KeyHasher.FNV1_32)) + .connectionsPerEndpoint(1) + .withTracer(tracer) + .newRichClient(Name.bound(Seq(Address(servers.head.address)): _*), clientName) + + f(client) + verify(tracer, atLeastOnce()).record(captor.capture()) + val annotations = captor.getAllValues.asScala collect { case Record(_, _, a, _) => a } + assert(expected.filterNot(annotations.contains(_)).isEmpty) + } +} diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedCompressingClientTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ClientCompressionTest.scala similarity index 98% rename from finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedCompressingClientTest.scala rename to finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ClientCompressionTest.scala index c0087aa8280..ad2e539cdd7 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedCompressingClientTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ClientCompressionTest.scala @@ -21,7 +21,7 @@ import java.net.InetSocketAddress import org.scalatest.BeforeAndAfter import org.scalatest.funsuite.AnyFunSuite -class MemcachedCompressingClientTest extends AnyFunSuite with BeforeAndAfter { +class ClientCompressionTest extends AnyFunSuite with BeforeAndAfter { val clientName = "test_client" val Timeout: Duration = 15.seconds diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ConnectedClientAPITest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ConnectedClientAPITest.scala new file mode 100644 index 00000000000..cde8300d423 --- /dev/null +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ConnectedClientAPITest.scala @@ -0,0 +1,39 @@ +package com.twitter.finagle.memcached.integration +import com.twitter.finagle.Memcached +import com.twitter.finagle.Name +import com.twitter.finagle.memcached.Client +import com.twitter.finagle.memcached.integration.external.ExternalMemcached +import com.twitter.finagle.memcached.protocol.ClientError + +class ConnectedClientAPITest extends ClientAPITest { + + override protected def mkClient(dest: Name): Client = { + val service = Memcached.client + .connectionsPerEndpoint(1) + .newService(dest, "memcache") + Client(service) + } + + // The twemcache client doesn't check this so move out of the common tests. Nulls are not + // scala-idiomatic so don't change the twemcache client, but keep the current checking in + // the base ConnectedClient in case it's being relied on. 
+ test("Null Keys") { + intercept[ClientError] { + awaitResult(client.get(null: Seq[String])) + } + + intercept[ClientError] { + awaitResult(client.gets(null: Seq[String])) + } + } + + // The twemcache client doesn't support stats + if (ExternalMemcached.use()) { + test("stats") { + val stats = awaitResult(client.stats()) + assert(stats != null) + assert(!stats.isEmpty) + stats.foreach { stat => assert(stat.startsWith("STAT")) } + } + } +} diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedOldClientTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedOldClientTest.scala deleted file mode 100644 index 4644321c3b2..00000000000 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedOldClientTest.scala +++ /dev/null @@ -1,78 +0,0 @@ -package com.twitter.finagle.memcached.integration - -import com.twitter.conversions.DurationOps._ -import com.twitter.finagle._ -import com.twitter.finagle.liveness.FailureAccrualFactory -import com.twitter.finagle.memcached.Client -import com.twitter.finagle.param.Stats -import com.twitter.finagle.param.Timer -import com.twitter.finagle.partitioning.param -import com.twitter.finagle.partitioning.param.EjectFailedHost -import com.twitter.finagle.stats.InMemoryStatsReceiver -import com.twitter.util._ -import java.net.InetSocketAddress - -class MemcachedOldClientTest extends MemcachedTest { - - protected def createClient(dest: Name, clientName: String): Client = { - Memcached.client.newRichClient(dest, clientName) - } - - protected[this] val redistributesKey: Seq[String] = - Seq(clientName, "partitioner", "redistributes") - protected[this] val leavesKey: Seq[String] = Seq(clientName, "partitioner", "leaves") - protected[this] val revivalsKey: Seq[String] = Seq(clientName, "partitioner", "revivals") - protected[this] val ejectionsKey: Seq[String] = Seq(clientName, "partitioner", "ejections") - - test("re-hash when a bad host is ejected") { - val sr = new InMemoryStatsReceiver - val client = Memcached.client - .configured(FailureAccrualFactory.Param(1, () => 10.minutes)) - .configured(param.EjectFailedHost(true)) - .withStatsReceiver(sr) - .newTwemcacheClient(Name.bound(servers.map { s => Address(s.address) }: _*), clientName) - testRehashUponEject(client, sr) - client.close() - } - - test("host comes back into ring after being ejected") { - testRingReEntryAfterEjection((timer, cacheServer, statsReceiver) => - Memcached.client - .configured(FailureAccrualFactory.Param(1, () => 10.minutes)) - .configured(EjectFailedHost(true)) - .configured(Timer(timer)) - .configured(Stats(statsReceiver)) - .newTwemcacheClient( - Name.bound(Address(cacheServer.boundAddress.asInstanceOf[InetSocketAddress])), - clientName - )) - } - - test("Add and remove nodes") { - val addrs = servers.map { s => Address(s.address) } - - // Start with 3 backends - val mutableAddrs: ReadWriteVar[Addr] = new ReadWriteVar(Addr.Bound(addrs.toSet.drop(2))) - - val sr = new InMemoryStatsReceiver - - val client = Memcached.client - .connectionsPerEndpoint(NumConnections) - .withStatsReceiver(sr) - .newTwemcacheClient(Name.Bound.singleton(mutableAddrs), "test_client") - - testAddAndRemoveNodes(addrs, mutableAddrs, sr) - client.close() - } - - test("FailureAccrualFactoryException has remote address") { - val client = Memcached.client - .connectionsPerEndpoint(1) - // 1 failure triggers FA; make sure FA stays in "dead" state after failure - .configured(FailureAccrualFactory.Param(1, 
10.minutes)) - .withEjectFailedHost(false) - .newTwemcacheClient(Name.bound(Address("localhost", 1234)), "client") - testFailureAccrualFactoryExceptionHasRemoteAddress(client) - client.close() - } -} diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedPartitioningClientTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedPartitioningClientTest.scala deleted file mode 100644 index c26378833d2..00000000000 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedPartitioningClientTest.scala +++ /dev/null @@ -1,144 +0,0 @@ -package com.twitter.finagle.memcached.integration - -import com.twitter.conversions.DurationOps._ -import com.twitter.finagle._ -import com.twitter.finagle.liveness.FailureAccrualFactory -import com.twitter.finagle.memcached.Client -import com.twitter.finagle.memcached.integration.external.TestMemcachedServer -import com.twitter.finagle.partitioning.param -import com.twitter.finagle.service.TimeoutFilter -import com.twitter.finagle.stats.InMemoryStatsReceiver -import com.twitter.finagle.tracing.Annotation -import com.twitter.finagle.tracing.BufferingTracer -import com.twitter.finagle.tracing.Record -import com.twitter.finagle.tracing.TraceId -import com.twitter.finagle.{param => ctfparam} -import com.twitter.hashing.KeyHasher -import com.twitter.io.Buf -import com.twitter.util._ -import java.net.InetAddress -import java.net.InetSocketAddress - -class MemcachedPartitioningClientTest extends MemcachedTest { - - protected def createClient(dest: Name, clientName: String): Client = { - Memcached.client.newRichClient(dest, clientName) - } - - protected[this] val redistributesKey: Seq[String] = - Seq("test_client", "partitioner", "redistributes") - protected[this] val leavesKey: Seq[String] = Seq(clientName, "partitioner", "leaves") - protected[this] val revivalsKey: Seq[String] = Seq(clientName, "partitioner", "revivals") - protected[this] val ejectionsKey: Seq[String] = Seq(clientName, "partitioner", "ejections") - - test("re-hash when a bad host is ejected") { - val sr = new InMemoryStatsReceiver - val client = Memcached.client - .configured(param.KeyHasher(KeyHasher.FNV1_32)) - .configured(TimeoutFilter.Param(10000.milliseconds)) - .configured(param.EjectFailedHost(true)) - .configured(FailureAccrualFactory.Param(1, () => 10.minutes)) - .configured(ctfparam.Stats(sr)) - .newRichClient(Name.bound(servers.map { s => Address(s.address) }: _*), clientName) - testRehashUponEject(client, sr) - client.close() - } - - test("host comes back into ring after being ejected") { - testRingReEntryAfterEjection((timer, cacheServer, statsReceiver) => - Memcached.client - .configured(param.KeyHasher(KeyHasher.FNV1_32)) - .configured(TimeoutFilter.Param(10000.milliseconds)) - .configured(FailureAccrualFactory.Param(1, () => 10.minutes)) - .configured(param.EjectFailedHost(true)) - .configured(ctfparam.Timer(timer)) - .configured(ctfparam.Stats(statsReceiver)) - .newRichClient( - Name.bound(Address(cacheServer.boundAddress.asInstanceOf[InetSocketAddress])), - clientName)) - } - - test("Add and remove nodes") { - val addrs = servers.map { s => Address(s.address) } - - // Start with 3 backends - val mutableAddrs: ReadWriteVar[Addr] = new ReadWriteVar(Addr.Bound(addrs.toSet.drop(2))) - - val sr = new InMemoryStatsReceiver - - val client = Memcached.client - .configured(param.KeyHasher(KeyHasher.FNV1_32)) - .configured(TimeoutFilter.Param(10000.milliseconds)) - 
.configured(FailureAccrualFactory.Param(1, () => 10.minutes)) - .configured(param.EjectFailedHost(true)) - .connectionsPerEndpoint(NumConnections) - .withStatsReceiver(sr) - .newRichClient(Name.Bound.singleton(mutableAddrs), clientName) - testAddAndRemoveNodes(addrs, mutableAddrs, sr) - client.close() - } - - test("FailureAccrualFactoryException has remote address") { - val client = Memcached.client - .configured(param.KeyHasher(KeyHasher.FNV1_32)) - .configured(TimeoutFilter.Param(10000.milliseconds)) - .configured(FailureAccrualFactory.Param(1, 10.minutes)) - .configured(param.EjectFailedHost(false)) - .connectionsPerEndpoint(1) - .newRichClient(Name.bound(Address("localhost", 1234)), clientName) - testFailureAccrualFactoryExceptionHasRemoteAddress(client) - client.close() - } - - if (!Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { - test("traces fanout requests") { - // we use an eventually block to retry the request if we didn't get partitioned to different shards - eventually { - val tracer = new BufferingTracer() - - // the servers created in MemcachedTest have inconsistent addresses, which means sharding - // will be inconsistent across runs. To combat this, we'll start our own servers and rerun the - // tests if we partition to the same shard. - val serverOpts = - for (_ <- 1 to NumServers) - yield TestMemcachedServer.start( - Some(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))) - val servers: Seq[TestMemcachedServer] = serverOpts.flatten - - val client = Memcached.client - .configured(param.KeyHasher(KeyHasher.FNV1_32)) - .connectionsPerEndpoint(1) - .withTracer(tracer) - .newRichClient(Name.bound(servers.map { s => Address(s.address) }: _*), clientName) - - awaitResult(client.set("foo", Buf.Utf8("bar"))) - awaitResult(client.set("baz", Buf.Utf8("boing"))) - awaitResult( - client.gets(Seq("foo", "baz")) - ).flatMap { - case (key, (Buf.Utf8(value1), Buf.Utf8(value2))) => - Map((key, (value1, value2))) - } - - client.close() - servers.foreach(_.stop()) - - val gets: Seq[TraceId] = tracer.iterator.toList collect { - case Record(id, _, Annotation.Rpc("Gets"), _) => id - } - - // Moving the MemcachedTracingFilter means that partitioned requests should result in two gets spans - assert(gets.length == 2) - // However the FanoutProxy should ensure that the requests are stored in peers, not the same tid. 
- gets.tail.foreach { get => - assert(get._parentId == gets.head._parentId) - assert(get.spanId != gets.head.spanId) - } - } - } - } - - test("data read/write consistency between old and new clients") { - testCompatibility() - } -} diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedServerTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedServerTest.scala index d0c4d93835e..f16f8d63547 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedServerTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedServerTest.scala @@ -10,8 +10,9 @@ import com.twitter.finagle.client.StackClient import com.twitter.finagle.client.StdStackClient import com.twitter.finagle.client.Transporter import com.twitter.finagle.dispatch.SerialClientDispatcher +import com.twitter.finagle.memcached.integration.external.ExternalMemcached import com.twitter.finagle.memcached.integration.external.InternalMemcached -import com.twitter.finagle.memcached.integration.external.TestMemcachedServer +import com.twitter.finagle.memcached.integration.external.MemcachedServer import com.twitter.finagle.memcached.protocol.Add import com.twitter.finagle.memcached.protocol.Cas import com.twitter.finagle.memcached.protocol.Command @@ -41,15 +42,15 @@ import org.scalatest.funsuite.AnyFunSuite // Memcached protocol. private class MemcachedServerTest extends AnyFunSuite with BeforeAndAfter { - private[this] var realServer: TestMemcachedServer = _ - private[this] var testServer: TestMemcachedServer = _ + private[this] var realServer: MemcachedServer = _ + private[this] var testServer: MemcachedServer = _ private[this] var realServerClient: Service[Command, String] = _ private[this] var testServerClient: Service[Command, String] = _ before { - realServer = TestMemcachedServer.start().get - testServer = InternalMemcached.start(None).get + realServer = MemcachedServer.start() + testServer = InternalMemcached.start() realServerClient = StringClient .apply().newService(Name.bound(Address(realServer.address)), "client") @@ -65,7 +66,7 @@ private class MemcachedServerTest extends AnyFunSuite with BeforeAndAfter { Await.result(testServerClient.close(), 5.seconds) } - if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { + if (ExternalMemcached.use()) { test("NOT_FOUND") { assertSameResponses(Incr(Buf.Utf8("key1"), 1), "NOT_FOUND\r\n") } diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedTest.scala deleted file mode 100644 index c0d172ff291..00000000000 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedTest.scala +++ /dev/null @@ -1,542 +0,0 @@ -package com.twitter.finagle.memcached.integration - -import com.twitter.conversions.DurationOps._ -import com.twitter.finagle._ -import com.twitter.finagle.memcached.integration.external.TestMemcachedServer -import com.twitter.finagle.memcached.protocol.ClientError -import com.twitter.finagle.memcached.Client -import com.twitter.finagle.memcached.GetResult -import com.twitter.finagle.memcached.GetsResult -import com.twitter.finagle.memcached.PartitionedClient -import com.twitter.finagle.stats.InMemoryStatsReceiver -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.io.Buf -import com.twitter.util._ -import 
java.net.InetAddress -import java.net.InetSocketAddress -import org.scalatest.concurrent.Eventually -import org.scalatest.concurrent.PatienceConfiguration -import org.scalatest.time.Milliseconds -import org.scalatest.time.Seconds -import org.scalatest.time.Span -import org.scalatest.BeforeAndAfter -import org.scalatest.Outcome -import scala.util.Random -import org.scalatest.funsuite.AnyFunSuite - -abstract class MemcachedTest - extends AnyFunSuite - with BeforeAndAfter - with Eventually - with PatienceConfiguration { - - private val ValueSuffix = ":" + Time.now.inSeconds - - private def randomString(length: Int): String = { - Random.alphanumeric.take(length).mkString - } - - protected[this] val NumServers = 5 - protected[this] val NumConnections = 4 - protected[this] val Timeout: Duration = 15.seconds - protected[this] var servers: Seq[TestMemcachedServer] = Seq.empty - protected[this] var client: Client = _ - protected[this] var singleServerClient: Client = _ - protected[this] val clientName = "test_client" - - protected[this] val redistributesKey: Seq[String] - protected[this] val leavesKey: Seq[String] - protected[this] val revivalsKey: Seq[String] - protected[this] val ejectionsKey: Seq[String] - protected[this] def createClient(dest: Name, clientName: String): Client - - before { - val serversOpt = for (_ <- 1 to NumServers) yield TestMemcachedServer.start() - - if (serversOpt.forall(_.isDefined)) { - servers = serversOpt.flatten - val dest = Name.bound(servers.map { s => Address(s.address) }: _*) - client = createClient(dest, clientName) - } - - singleServerClient = - createClient(Name.bound(Seq(Address(servers.head.address)): _*), clientName) - } - - after { - servers.foreach(_.stop()) - client.close() - } - - override def withFixture(test: NoArgTest): Outcome = { - if (servers.length == NumServers) { - test() - } else { - info("Cannot start memcached. 
Skipping test...") - cancel() - } - } - - protected[this] def awaitResult[T](awaitable: Awaitable[T]): T = Await.result(awaitable, Timeout) - - test("set & get") { - awaitResult(client.delete("foo")) - assert(awaitResult(client.get("foo")) == None) - awaitResult(client.set("foo", Buf.Utf8("bar"))) - assert(awaitResult(client.get("foo")).get == Buf.Utf8("bar")) - } - - test("set and get without partioning") { - val c = Memcached.client - .newLoadBalancedTwemcacheClient( - Name.bound(servers.map { s => Address(s.address) }.head), - clientName) - awaitResult(c.set("xyz", Buf.Utf8("value1"))) - assert(awaitResult(c.get("xyz")).get == Buf.Utf8("value1")) - } - - test("set & get data containing newlines") { - awaitResult(client.delete("bob")) - assert(awaitResult(client.get("bob")) == None) - awaitResult(client.set("bob", Buf.Utf8("hello there \r\n nice to meet \r\n you"))) - assert( - awaitResult(client.get("bob")).get == Buf.Utf8("hello there \r\n nice to meet \r\n you"), - 3.seconds - ) - } - - test("empty key sequence") { - assert(awaitResult(client.get(Seq.empty)).isEmpty) - assert(awaitResult(client.gets(Seq.empty)).isEmpty) - assert(awaitResult(client.getWithFlag(Seq.empty)).isEmpty) - assert(awaitResult(client.getsWithFlag(Seq.empty)).isEmpty) - assert(awaitResult(client.getResult(Seq.empty)) == GetResult.Empty) - assert(awaitResult(client.getsResult(Seq.empty)) == GetsResult(GetResult.Empty)) - } - - test("get") { - awaitResult(client.set("foo", Buf.Utf8("bar"))) - awaitResult(client.set("baz", Buf.Utf8("boing"))) - val result = - awaitResult( - client.get(Seq("foo", "baz", "notthere")) - ).map { - case (key, Buf.Utf8(value)) => - (key, value) - } - assert(result == Map("foo" -> "bar", "baz" -> "boing")) - } - - test("getWithFlag") { - awaitResult(client.set("foo", Buf.Utf8("bar"))) - awaitResult(client.set("baz", Buf.Utf8("boing"))) - val result = awaitResult(client.getWithFlag(Seq("foo", "baz", "notthere"))) - .map { case (key, ((Buf.Utf8(value), Buf.Utf8(flag)))) => (key, (value, flag)) } - assert( - result == Map( - "foo" -> (("bar", "0")), - "baz" -> (("boing", "0")) - ) - ) - } - - if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { - test("gets") { - // use client that connects to only one server so we can predict CAS tokens - awaitResult(singleServerClient.set("foos", Buf.Utf8("xyz"))) - awaitResult(singleServerClient.set("bazs", Buf.Utf8("xyz"))) - awaitResult(singleServerClient.set("bazs", Buf.Utf8("zyx"))) - awaitResult(singleServerClient.set("bars", Buf.Utf8("xyz"))) - awaitResult(singleServerClient.set("bars", Buf.Utf8("zyx"))) - awaitResult(singleServerClient.set("bars", Buf.Utf8("yxz"))) - val result = - awaitResult( - singleServerClient.gets(Seq("foos", "bazs", "bars", "somethingelse")) - ).map { - case (key, (Buf.Utf8(value), Buf.Utf8(casUnique))) => - (key, (value, casUnique)) - } - // the "cas unique" values are predictable from a fresh memcached - val expected = - Map( - "foos" -> (("xyz", "2")), - "bazs" -> (("zyx", "4")), - "bars" -> (("yxz", "7")) - ) - assert(result == expected) - } - } - - if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { - test("getsWithFlag") { - // use client that connects to only one server so we can predict CAS tokens - awaitResult(singleServerClient.set("foos1", Buf.Utf8("xyz"))) - awaitResult(singleServerClient.set("bazs1", Buf.Utf8("xyz"))) - awaitResult(singleServerClient.set("bazs1", Buf.Utf8("zyx"))) - val result = - awaitResult(singleServerClient.getsWithFlag(Seq("foos1", "bazs1", 
"somethingelse"))) - .map { - case (key, (Buf.Utf8(value), Buf.Utf8(flag), Buf.Utf8(casUnique))) => - (key, (value, flag, casUnique)) - } - - // the "cas unique" values are predictable from a fresh memcached - assert( - result == Map( - "foos1" -> (("xyz", "0", "2")), - "bazs1" -> (("zyx", "0", "4")) - ) - ) - } - } - - if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { - test("cas") { - // use client that connects to only one server so we can predict CAS tokens - awaitResult(singleServerClient.set("x", Buf.Utf8("y"))) // Next CAS: 2 - val Some((value, casUnique)) = awaitResult(singleServerClient.gets("x")) - assert(value == Buf.Utf8("y")) - assert(casUnique == Buf.Utf8("2")) - - assert( - !awaitResult( - singleServerClient.checkAndSet("x", Buf.Utf8("z"), Buf.Utf8("1")).map(_.replaced))) - assert( - awaitResult( - singleServerClient - .checkAndSet("x", Buf.Utf8("z"), casUnique).map(_.replaced)).booleanValue - ) - val res = awaitResult(singleServerClient.get("x")) - assert(res.isDefined) - assert(res.get == Buf.Utf8("z")) - } - } - - test("append & prepend") { - awaitResult(client.set("foo", Buf.Utf8("bar"))) - awaitResult(client.append("foo", Buf.Utf8("rab"))) - assert(awaitResult(client.get("foo")).get == Buf.Utf8("barrab")) - awaitResult(client.prepend("foo", Buf.Utf8("rab"))) - assert(awaitResult(client.get("foo")).get == Buf.Utf8("rabbarrab")) - } - - test("incr & decr") { - // As of memcached 1.4.8 (issue 221), empty values are no longer treated as integers - awaitResult(client.set("foo", Buf.Utf8("0"))) - assert(awaitResult(client.incr("foo")) == Some(1L)) - assert(awaitResult(client.incr("foo", 2)) == Some(3L)) - assert(awaitResult(client.decr("foo")) == Some(2L)) - - awaitResult(client.set("foo", Buf.Utf8("0"))) - assert(awaitResult(client.incr("foo")) == Some(1L)) - val l = 1L << 50 - assert(awaitResult(client.incr("foo", l)) == Some(l + 1L)) - assert(awaitResult(client.decr("foo")) == Some(l)) - assert(awaitResult(client.decr("foo", l)) == Some(0L)) - } - - if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { - test("stats") { - // We can't use a partitioned client to get stats, because we don't hash to a server based on - // a key. Instead, we create a ConnectedClient, which is connected to one server. 
- val service = Memcached.client.newService(Name.bound(Address(servers.head.address)), "client") - - val connectedClient = Client(service) - val stats = awaitResult(connectedClient.stats()) - assert(stats != null) - assert(stats.nonEmpty) - stats.foreach { stat => assert(stat.startsWith("STAT")) } - service.close() - } - } - - test("send malformed keys") { - // test key validation trait - intercept[ClientError] { awaitResult(client.get("fo o")) } - intercept[ClientError] { awaitResult(client.set("", Buf.Utf8("bar"))) } - intercept[ClientError] { awaitResult(client.get(" foo")) } - intercept[ClientError] { awaitResult(client.get("foo ")) } - intercept[ClientError] { awaitResult(client.get(" foo")) } - val nullString: String = null - - intercept[ClientError] { awaitResult(client.get(nullString)) } - intercept[ClientError] { awaitResult(client.set(nullString, Buf.Utf8("bar"))) } - intercept[ClientError] { awaitResult(client.set(" ", Buf.Utf8("bar"))) } - - // "\t" is a valid key - assert(awaitResult(client.set("\t", Buf.Utf8("bar")).liftToTry) == Return.Unit) - intercept[ClientError] { awaitResult(client.set("\r", Buf.Utf8("bar"))) } - intercept[ClientError] { awaitResult(client.set("\n", Buf.Utf8("bar"))) } - intercept[ClientError] { awaitResult(client.set("\u0000", Buf.Utf8("bar"))) } - - val veryLongKey = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstu" + - "vwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdef" + - "ghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopq" + - "rstuvwxyz" - intercept[ClientError] { awaitResult(client.get(veryLongKey)) } - intercept[ClientError] { awaitResult(client.set(veryLongKey, Buf.Utf8("bar"))) } - - // test other keyed command validation - val nullSeq: Seq[String] = null - intercept[NullPointerException] { awaitResult(client.get(nullSeq)) } - - intercept[ClientError] { awaitResult(client.append("bad key", Buf.Utf8("rab"))) } - intercept[ClientError] { awaitResult(client.prepend("bad key", Buf.Utf8("rab"))) } - intercept[ClientError] { awaitResult(client.replace("bad key", Buf.Utf8("bar"))) } - intercept[ClientError] { awaitResult(client.add("bad key", Buf.Utf8("2"))) } - intercept[ClientError] { - awaitResult(client.checkAndSet("bad key", Buf.Utf8("z"), Buf.Utf8("2"))) - } - intercept[ClientError] { awaitResult(client.incr("bad key")) } - intercept[ClientError] { awaitResult(client.decr("bad key")) } - intercept[ClientError] { awaitResult(client.delete("bad key")) } - } - - protected[this] def testRehashUponEject(client: Client, sr: InMemoryStatsReceiver): Unit = { - val max = 200 - // set values - awaitResult( - Future.collect( - (0 to max).map { i => client.set(s"foo$i", Buf.Utf8(s"bar$i")) } - ) - ) - - // We can't control the Distributor to make sure that for the set of servers, there is at least - // one client in the partition talking to it. Therefore, we rely on the fact that for 5 - // backends, it's very unlikely all clients will be talking to the same server, and as such, - // shutting down all backends but one will trigger cache misses. 
- servers.tail.foreach(_.stop()) - - // trigger ejection - for (i <- 0 to max) { - Await.ready(client.get(s"foo$i"), Timeout) - } - // wait a little longer than default to prevent test flappiness - val timeout = PatienceConfiguration.Timeout(Span(10, Seconds)) - val interval = PatienceConfiguration.Interval(Span(100, Milliseconds)) - eventually(timeout, interval) { - assert(sr.counters.getOrElse(ejectionsKey, 0L) > 2) - } - - client match { - case partitionedClient: PartitionedClient => - val clientSet = - (0 to max).foldLeft(Set[Client]()) { - case (s, i) => - val c = partitionedClient.clientOf(s"foo$i") - s + c - } - assert(clientSet.size == 1) - case _ => - // the new client doesn't have a way to get to the equivalent functionality in - // ConsistentHashPartitioningService.partitionByKey - } - - // previously set values have cache misses - var cacheMisses = 0 - for (i <- 0 to max) { - if (awaitResult(client.get(s"foo$i")).isEmpty) cacheMisses = cacheMisses + 1 - } - assert(cacheMisses > 0) - } - - protected[this] def testRingReEntryAfterEjection( - createClient: (MockTimer, ListeningServer, StatsReceiver) => Client - ): Unit = { - import com.twitter.finagle.memcached.protocol._ - - class MockedMemcacheServer extends Service[Command, Response] { - def apply(command: Command): Future[Response with Product with Serializable] = command match { - case Get(_) => Future.value(Values(List(Value(Buf.Utf8("foo"), Buf.Utf8("bar"))))) - case Set(_, _, _, _) => Future.value(Error(new Exception)) - case x => Future.exception(new MatchError(x)) - } - } - - val cacheServer: ListeningServer = Memcached.serve( - new InetSocketAddress(InetAddress.getLoopbackAddress, 0), - new MockedMemcacheServer - ) - - val timer = new MockTimer - val statsReceiver = new InMemoryStatsReceiver - - val client = createClient(timer, cacheServer, statsReceiver) - - Time.withCurrentTimeFrozen { timeControl => - // Send a bad request - intercept[Exception] { awaitResult(client.set("foo", Buf.Utf8("bar"))) } - - // Node should have been ejected - val timeout = PatienceConfiguration.Timeout(Span(1, Seconds)) - eventually(timeout) { assert(statsReceiver.counters.get(ejectionsKey).contains(1)) } - - // Node should have been marked dead, and still be dead after 5 minutes - timeControl.advance(5.minutes) - - // Shard should be unavailable - intercept[ShardNotAvailableException] { - awaitResult(client.get(s"foo")) - } - - timeControl.advance(5.minutes) - timer.tick() - - // 10 minutes (markDeadFor duration) have passed, so the request should go through - assert(statsReceiver.counters.get(revivalsKey) == Some(1)) - assert(awaitResult(client.get(s"foo")).get == Buf.Utf8("bar")) - } - client.close() - } - - protected[this] def testAddAndRemoveNodes( - addrs: Seq[Address], - mutableAddrs: ReadWriteVar[Addr], - sr: InMemoryStatsReceiver - ): Unit = { - assert(sr.counters(redistributesKey) == 1) - assert(sr.counters(Seq("test_client", "loadbalancer", "rebuilds")) == 3) - assert(sr.counters(Seq("test_client", "loadbalancer", "updates")) == 3) - assert(sr.counters(Seq("test_client", "loadbalancer", "adds")) == NumConnections * 3) - assert(sr.counters(Seq("test_client", "loadbalancer", "removes")) == 0) - - // Add 2 nodes to the backends, for a total of 5 backends - mutableAddrs.update(Addr.Bound(addrs.toSet)) - - assert(sr.counters(redistributesKey) == 2) - // Need to rebuild each of the 5 nodes with `numConnections` - assert(sr.counters(Seq("test_client", "loadbalancer", "rebuilds")) == 5) - assert(sr.counters(Seq("test_client", 
"loadbalancer", "updates")) == 5) - assert(sr.counters(Seq("test_client", "loadbalancer", "adds")) == NumConnections * 5) - assert(sr.counters(Seq("test_client", "loadbalancer", "removes")) == 0) - - // Remove 1 node from the backends, for a total of 4 backends - mutableAddrs.update(Addr.Bound(addrs.toSet.drop(1))) - - assert(sr.counters(redistributesKey) == 3) - // Don't need to rebuild or update any existing nodes - assert(sr.counters(Seq("test_client", "loadbalancer", "rebuilds")) == 5) - assert(sr.counters(Seq("test_client", "loadbalancer", "updates")) == 5) - assert(sr.counters(Seq("test_client", "loadbalancer", "adds")) == NumConnections * 5) - - assert(sr.counters(leavesKey) == 1) - - // Node is removed, closing `numConnections` in the LoadBalancer - assert(sr.counters(Seq("test_client", "loadbalancer", "removes")) == NumConnections) - - // Update the backends with the same list, for a total of 4 backends - mutableAddrs.update(Addr.Bound(addrs.toSet.drop(1))) - - assert(sr.counters(redistributesKey) == 4) - // Ensure we don't do anything in the LoadBalancer because the set of nodes is the same - assert(sr.counters(Seq("test_client", "loadbalancer", "rebuilds")) == 5) - assert(sr.counters(Seq("test_client", "loadbalancer", "updates")) == 5) - assert(sr.counters(Seq("test_client", "loadbalancer", "adds")) == NumConnections * 5) - assert(sr.counters(Seq("test_client", "loadbalancer", "removes")) == NumConnections) - } - - protected[this] def testFailureAccrualFactoryExceptionHasRemoteAddress(client: Client): Unit = { - // Trigger transition to "Dead" state - intercept[Exception] { - awaitResult(client.delete("foo")) - } - - // Client has not been ejected, so the same client gets a re-application of the connection, - // triggering the 'failureAccrualEx' in KetamaFailureAccrualFactory - val failureAccrualEx = intercept[HasRemoteInfo] { - awaitResult(client.delete("foo")) - } - - assert(failureAccrualEx.getMessage().contains("Endpoint is marked dead by failureAccrual")) - assert(failureAccrualEx.getMessage().contains("Downstream Address: localhost/127.0.0.1:1234")) - } - - def writeKeys(client: Client, numKeys: Int, keyLength: Int): Seq[String] = { - // creating multiple random strings so that we get a uniform distribution of keys the - // ketama ring and thus the Memcached shards - val keys = 1 to numKeys map { _ => randomString(keyLength) } - val writes = keys map { key => client.set(key, Buf.Utf8(s"$key$ValueSuffix")) } - awaitResult(Future.join(writes)) - keys - } - - def assertRead(client: Client, keys: Seq[String]): Unit = { - val readValues: Map[String, Buf] = awaitResult { client.get(keys) } - assert(readValues.size == keys.length) - assert(readValues.keySet.toSeq.sorted == keys.sorted) - readValues.keys foreach { key => - val Buf.Utf8(readValue) = readValues(key) - assert(readValue == s"$key$ValueSuffix") - } - } - - /** - * Test compatibility between old and new clients for the migration phase - */ - protected[this] def testCompatibility(): Unit = { - val numKeys = 20 - val keyLength = 50 - - val newClient = client - val oldClient = { - val dest = Name.bound(servers.map { s => Address(s.address) }: _*) - Memcached.client.newRichClient(dest, clientName) - } - - // make sure old and new client can read the values written by old client - val keys1 = writeKeys(oldClient, numKeys, keyLength) - assertRead(oldClient, keys1) - assertRead(newClient, keys1) - - // make sure old and new client can read the values written by new client - val keys2 = writeKeys(newClient, numKeys, keyLength) 
- assertRead(oldClient, keys2) - assertRead(newClient, keys2) - } - - // This works with our internal memcached because when the server is shutdown, we get an immediate - // "connection refused" when trying to send a request. With external memcached, the connection - // establishment instead hangs. To make this test pass with external memcached, we could add - // `withSession.acquisitionTimeout` to the client, but this makes the test a) slow and b) can - // make the other tests flakey, so don't bother. - if (!Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { - test("partial success") { - val keys = writeKeys(client, 1000, 20) - assertRead(client, keys) - - val initialResult = awaitResult { - client.getResult(keys) - } - assert(initialResult.failures.isEmpty) - assert(initialResult.misses.isEmpty) - assert(initialResult.values.size == keys.size) - - // now kill one server - servers.head.stop() - - // test partial success with getResult() - val getResult = awaitResult { - client.getResult(keys) - } - // assert the failures are set to the exception received from the failing partition - assert(getResult.failures.nonEmpty) - getResult.failures.foreach { - case (_, e) => - assert(e.isInstanceOf[Exception]) - } - // there should be no misses as all keys are known - assert(getResult.misses.isEmpty) - - // assert that the values are what we expect them to be. We are not checking for exact - // number of failures and successes here because we don't know how many keys will fall into - // the failed partition. The accuracy of the responses are tested in other tests anyways. - assert(getResult.values.nonEmpty) - assert(getResult.values.size < keys.size) - getResult.values.foreach { - case (keyStr, valueBuf) => - val Buf.Utf8(valStr) = valueBuf - assert(valStr == s"$keyStr$ValueSuffix") - } - } - } -} diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedUtilityTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedUtilityTest.scala deleted file mode 100644 index 0fdb92f7c7f..00000000000 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedUtilityTest.scala +++ /dev/null @@ -1,122 +0,0 @@ -package com.twitter.finagle.memcached.integration - -import com.twitter.conversions.DurationOps._ -import com.twitter.finagle.{param => ctfparam, _} -import com.twitter.finagle.liveness.FailureAccrualFactory -import com.twitter.finagle.memcached.{Client, TwemcacheClient} -import com.twitter.finagle.partitioning.param -import com.twitter.finagle.stats.InMemoryStatsReceiver -import com.twitter.util.ReadWriteVar -import java.net.InetSocketAddress - -class MemcachedUtilityTest extends MemcachedTest { - - protected[this] val redistributesKey: Seq[String] = - Seq("test_client", "partitioner", "redistributes") - protected[this] val leavesKey: Seq[String] = Seq(clientName, "partitioner", "leaves") - protected[this] val revivalsKey: Seq[String] = Seq(clientName, "partitioner", "revivals") - protected[this] val ejectionsKey: Seq[String] = Seq(clientName, "partitioner", "ejections") - - protected def createClient(dest: Name, clientName: String): Client = { - newPartitioningClient(dest) - } - - private[this] def newPartitioningClient( - dest: Name, - label: String = clientName, - create: (Name, String) => Client = defaultCreate - ): Client = { - var client: Client = null - client = create(dest, label) - assert(client.isInstanceOf[TwemcacheClient]) // assert new client - client - } - - 
private[this] def defaultCreate(dest: Name, label: String): Client = { - Memcached.client.newRichClient(dest, label) - } - - test("re-hash when a bad host is ejected") { - val sr = new InMemoryStatsReceiver - val dest = Name.bound(servers.map { s => Address(s.address) }: _*) - val client = newPartitioningClient( - dest, - clientName, - (dest, clientName) => { - Memcached.client - .configured(FailureAccrualFactory.Param(1, () => 10.minutes)) - .configured(param.EjectFailedHost(true)) - .withStatsReceiver(sr) - .newRichClient(dest, clientName) - } - ) - testRehashUponEject(client, sr) - client.close() - } - - test("host comes back into ring after being ejected") { - testRingReEntryAfterEjection((timer, cacheServer, statsReceiver) => { - val dest: Name = - Name.bound(Address(cacheServer.boundAddress.asInstanceOf[InetSocketAddress])) - val client: Client = newPartitioningClient( - dest, - clientName, - (dest, clientName) => { - Memcached.client - .configured(FailureAccrualFactory.Param(1, () => 10.minutes)) - .configured(param.EjectFailedHost(true)) - .configured(ctfparam.Timer(timer)) - .configured(ctfparam.Stats(statsReceiver)) - .newRichClient(dest, clientName) - } - ) - client - }) - } - - test("Add and remove nodes") { - val addrs = servers.map { s => Address(s.address) } - - // Start with 3 backends - val mutableAddrs: ReadWriteVar[Addr] = new ReadWriteVar(Addr.Bound(addrs.toSet.drop(2))) - - val sr = new InMemoryStatsReceiver - val dest = Name.Bound.singleton(mutableAddrs) - val clientName = "test_client" - val client = newPartitioningClient( - dest, - clientName, - (dest, clientName) => { - Memcached.client - .connectionsPerEndpoint(NumConnections) - .withStatsReceiver(sr) - .newRichClient(dest, clientName) - } - ) - testAddAndRemoveNodes(addrs, mutableAddrs, sr) - client.close() - } - - test("FailureAccrualFactoryException has remote address") { - val dest = Name.bound(Address("localhost", 1234)) - val label = "client" - val client = newPartitioningClient( - dest, - clientName, - (dest, clientName) => { - Memcached.client - .connectionsPerEndpoint(1) - // 1 failure triggers FA; make sure FA stays in "dead" state after failure - .configured(FailureAccrualFactory.Param(1, 10.minutes)) - .withEjectFailedHost(false) - .newTwemcacheClient(dest, clientName) - } - ) - testFailureAccrualFactoryExceptionHasRemoteAddress(client) - client.close() - } - - test("data read/write consistency between old and new clients") { - testCompatibility() - } -} diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ProxyTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ProxyTest.scala index 8553f937c2d..39cd77e182c 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ProxyTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ProxyTest.scala @@ -3,7 +3,8 @@ package com.twitter.finagle.memcached.integration import com.twitter.conversions.DurationOps._ import com.twitter.finagle._ import com.twitter.finagle.memcached.Client -import com.twitter.finagle.memcached.integration.external.TestMemcachedServer +import com.twitter.finagle.memcached.integration.external.ExternalMemcached +import com.twitter.finagle.memcached.integration.external.MemcachedServer import com.twitter.finagle.memcached.protocol.Command import com.twitter.finagle.memcached.protocol.Response import com.twitter.io.Buf @@ -23,59 +24,45 @@ class ProxyTest extends AnyFunSuite with BeforeAndAfter { type 
MemcacheService = Service[Command, Response] - /** - * Note: This integration test requires a real Memcached server to run. - */ var externalClient: Client = null var server: ListeningServer = null var serverAddress: InetSocketAddress = null var proxyService: MemcacheService = null var proxyClient: MemcacheService = null - var testServer: Option[TestMemcachedServer] = None + var testServer: MemcachedServer = null before { - testServer = TestMemcachedServer.start() - if (testServer.isDefined) { - Thread.sleep(150) // On my box the 100ms sleep wasn't long enough - proxyClient = Memcached.client - .connectionsPerEndpoint(1) - .newService( - Name.bound(Address(testServer.get.address.asInstanceOf[InetSocketAddress])), - "memcached" - ) - - proxyService = new MemcacheService { - def apply(request: Command) = proxyClient(request) - } - - server = Memcached.server - .withLabel("memcached") - .serve(new InetSocketAddress(InetAddress.getLoopbackAddress, 0), proxyService) - - serverAddress = server.boundAddress.asInstanceOf[InetSocketAddress] - externalClient = Client( - Memcached.client - .newService("%s:%d".format(serverAddress.getHostName, serverAddress.getPort)) + testServer = MemcachedServer.start() + Thread.sleep(150) // On my box the 100ms sleep wasn't long enough + proxyClient = Memcached.client + .connectionsPerEndpoint(1) + .newService( + Name.bound(com.twitter.finagle.Address(testServer.address)), + "memcached" ) + + proxyService = new MemcacheService { + def apply(request: Command) = proxyClient(request) } + + server = Memcached.server + .withLabel("memcached") + .serve(new InetSocketAddress(InetAddress.getLoopbackAddress, 0), proxyService) + + serverAddress = server.boundAddress.asInstanceOf[InetSocketAddress] + externalClient = Client( + Memcached.client + .newService("%s:%d".format(serverAddress.getHostName, serverAddress.getPort)) + ) } after { // externalClient.close() needs to be called explicitly by each test. Otherwise // 'quit' test would call it twice. - if (testServer.isDefined) { - server.close(0.seconds) - proxyService.close() - proxyClient.close() - testServer.map(_.stop()) - } - } - - override def withFixture(test: NoArgTest) = { - if (testServer == None) { - info("Cannot start memcached. 
skipping test...") - cancel() - } else test() + server.close(0.seconds) + proxyService.close() + proxyClient.close() + testServer.stop() } test("Proxied Memcached Servers should handle a basic get/set operation") { @@ -89,7 +76,15 @@ class ProxyTest extends AnyFunSuite with BeforeAndAfter { awaitResult(externalClient.close()) } - if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { + test("quit is supported") { + awaitResult(externalClient.get("foo")) // do nothing + awaitResult(externalClient.quit()) + intercept[ServiceClosedException] { + awaitResult(externalClient.get("foo")) + } + } + + if (ExternalMemcached.use()) { test("stats is supported") { awaitResult(externalClient.delete("foo")) assert(awaitResult(externalClient.get("foo")) == None) @@ -102,9 +97,7 @@ class ProxyTest extends AnyFunSuite with BeforeAndAfter { } awaitResult(externalClient.close()) } - } - if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { test("stats (cachedump) is supported") { awaitResult(externalClient.delete("foo")) assert(awaitResult(externalClient.get("foo")) == None) @@ -125,13 +118,4 @@ class ProxyTest extends AnyFunSuite with BeforeAndAfter { awaitResult(externalClient.close()) } } - - test("quit is supported") { - awaitResult(externalClient.get("foo")) // do nothing - awaitResult(externalClient.quit()) - intercept[ServiceClosedException] { - awaitResult(externalClient.get("foo")) - } - } - } diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/SimpleClientTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/SimpleClientTest.scala deleted file mode 100644 index 8105a4b9918..00000000000 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/SimpleClientTest.scala +++ /dev/null @@ -1,273 +0,0 @@ -package com.twitter.finagle.memcached.integration - -import com.twitter.conversions.DurationOps._ -import com.twitter.finagle.Address -import com.twitter.finagle.Memcached -import com.twitter.finagle.Name -import com.twitter.finagle.param -import com.twitter.finagle.memcached.Client -import com.twitter.finagle.memcached.integration.external.TestMemcachedServer -import com.twitter.finagle.memcached.protocol._ -import com.twitter.finagle.partitioning.zk.ZkMetadata -import com.twitter.finagle.stats.SummarizingStatsReceiver -import com.twitter.finagle.tracing._ -import com.twitter.io.Buf -import com.twitter.util.registry.Entry -import com.twitter.util.registry.GlobalRegistry -import com.twitter.util.registry.SimpleRegistry -import com.twitter.util.Await -import com.twitter.util.Awaitable -import java.net.InetSocketAddress -import org.mockito.ArgumentCaptor -import org.mockito.ArgumentMatchers.any -import org.mockito.Mockito.atLeastOnce -import org.mockito.Mockito.spy -import org.mockito.Mockito.verify -import org.mockito.Mockito.when -import org.scalatest.BeforeAndAfter -import org.scalatest.Outcome -import scala.collection.JavaConverters._ -import org.scalatest.funsuite.AnyFunSuite - -class SimpleClientTest extends AnyFunSuite with BeforeAndAfter { - - /** - * Note: This integration test requires a real Memcached server to run. 
- */ - private var client: Client = null - private var testServer: Option[TestMemcachedServer] = None - - private val stats = new SummarizingStatsReceiver - - private def awaitResult[T](awaitable: Awaitable[T]): T = Await.result(awaitable, 5.seconds) - - private val baseClient: Memcached.Client = Memcached.client - - before { - testServer = TestMemcachedServer.start() - if (testServer.isDefined) - client = newClient(baseClient) - } - - after { - if (testServer.isDefined) - testServer map { _.stop() } - } - - def newClient(baseClient: Memcached.Client): Client = { - // we add zk metadata to with a shard_id 0 - val address = Address.Inet( - testServer.get.address.asInstanceOf[InetSocketAddress], - ZkMetadata.toAddrMetadata(ZkMetadata(Some(0))) - ) - - val service = baseClient - .withStatsReceiver(stats) - .connectionsPerEndpoint(1) - .newService(Name.bound(address), "memcache") - Client(service) - } - - override def withFixture(test: NoArgTest): Outcome = { - if (testServer.isDefined) test() - else { - info("Cannot start memcached. Skipping test...") - cancel() - } - } - - def withExpectedTraces(f: Client => Unit, expected: Seq[Annotation]): Unit = { - val tracer = spy(new NullTracer) - when(tracer.isActivelyTracing(any[TraceId])).thenReturn(true) - when(tracer.isNull).thenReturn(false) - val captor: ArgumentCaptor[Record] = ArgumentCaptor.forClass(classOf[Record]) - - val client = newClient(baseClient.configured(param.Tracer(tracer))) - f(client) - verify(tracer, atLeastOnce()).record(captor.capture()) - val annotations = captor.getAllValues.asScala collect { case Record(_, _, a, _) => a } - assert(expected.filterNot(annotations.contains(_)).isEmpty) - } - - test("set & get") { - awaitResult(client.delete("foo")) - assert(awaitResult(client.get("foo")) == None) - awaitResult(client.set("foo", Buf.Utf8("bar"))) - assert(awaitResult(client.get("foo")).get == Buf.Utf8("bar")) - } - - test("get") { - awaitResult(client.set("foo", Buf.Utf8("bar"))) - awaitResult(client.set("baz", Buf.Utf8("boing"))) - val result = awaitResult(client.get(Seq("foo", "baz", "notthere"))) - .map { case (key, Buf.Utf8(value)) => (key, value) } - assert( - result == Map( - "foo" -> "bar", - "baz" -> "boing" - ) - ) - } - - if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { - test("gets") { - assert(awaitResult(client.gets("foos")).isEmpty) - awaitResult(client.set("foos", Buf.Utf8("xyz"))) - awaitResult(client.set("bazs", Buf.Utf8("xyz"))) - awaitResult(client.set("bazs", Buf.Utf8("zyx"))) - val result = awaitResult(client.gets(Seq("foos", "bazs", "somethingelse"))) - .map { - case (key, (Buf.Utf8(value), Buf.Utf8(casUnique))) => - (key, (value, casUnique)) - } - - assert( - result == Map( - "foos" -> ( - ( - "xyz", - "2" - ) - ), // the "cas unique" values are predictable from a fresh memcached - "bazs" -> (("zyx", "4")) - ) - ) - } - } - - if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { - test("cas") { - awaitResult(client.set("x", Buf.Utf8("y"))) - val Some((value, casUnique)) = awaitResult(client.gets("x")) - assert(value == Buf.Utf8("y")) - assert(casUnique == Buf.Utf8("2")) - - assert(!awaitResult(client.checkAndSet("x", Buf.Utf8("z"), Buf.Utf8("1")).map(_.replaced))) - assert( - awaitResult(client.checkAndSet("x", Buf.Utf8("z"), casUnique).map(_.replaced)).booleanValue - ) - val res = awaitResult(client.get("x")) - assert(res.isDefined) - assert(res.get == Buf.Utf8("z")) - } - } - - test("append & prepend") { - awaitResult(client.set("foo", Buf.Utf8("bar"))) - 
awaitResult(client.append("foo", Buf.Utf8("rab"))) - val Buf.Utf8(res) = awaitResult(client.get("foo")).get - assert(res == "barrab") - awaitResult(client.prepend("foo", Buf.Utf8("rab"))) - val Buf.Utf8(res2) = awaitResult(client.get("foo")).get - assert(res2 == "rabbarrab") - } - - test("incr & decr") { - // As of memcached 1.4.8 (issue 221), empty values are no longer treated as integers - awaitResult(client.set("foo", Buf.Utf8("0"))) - assert(awaitResult(client.incr("foo")) == Some(1L)) - assert(awaitResult(client.incr("foo", 2)) == Some(3L)) - assert(awaitResult(client.decr("foo")) == Some(2L)) - - awaitResult(client.set("foo", Buf.Utf8("0"))) - assert(awaitResult(client.incr("foo")) == Some(1L)) - val l = 1L << 50 - assert(awaitResult(client.incr("foo", l)) == Some(l + 1L)) - assert(awaitResult(client.decr("foo")) == Some(l)) - assert(awaitResult(client.decr("foo", l)) == Some(0L)) - } - - if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) { - test("stats") { - val stats = awaitResult(client.stats()) - assert(stats != null) - assert(!stats.isEmpty) - stats.foreach { stat => assert(stat.startsWith("STAT")) } - } - } - - test("send malformed keys") { - // test key validation trait - intercept[ClientError] { awaitResult(client.get("fo o")) } - intercept[ClientError] { awaitResult(client.set("", Buf.Utf8("bar"))) } - intercept[ClientError] { awaitResult(client.get(" foo")) } - intercept[ClientError] { awaitResult(client.get("foo ")) } - intercept[ClientError] { awaitResult(client.get(" foo")) } - intercept[ClientError] { awaitResult(client.get(null: String)) } - intercept[ClientError] { awaitResult(client.set(null: String, Buf.Utf8("bar"))) } - intercept[ClientError] { awaitResult(client.set(" ", Buf.Utf8("bar"))) } - - try { awaitResult(client.set("\t", Buf.Utf8("bar"))) } - catch { - case _: ClientError => fail("\t is allowed") - } - - intercept[ClientError] { awaitResult(client.set("\r", Buf.Utf8("bar"))) } - intercept[ClientError] { awaitResult(client.set("\n", Buf.Utf8("bar"))) } - intercept[ClientError] { awaitResult(client.set("\u0000", Buf.Utf8("bar"))) } - - val veryLongKey = - "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" - intercept[ClientError] { awaitResult(client.get(veryLongKey)) } - intercept[ClientError] { awaitResult(client.set(veryLongKey, Buf.Utf8("bar"))) } - - // test other keyed command validation - intercept[ClientError] { awaitResult(client.get(null: Seq[String])) } - intercept[ClientError] { awaitResult(client.gets(null: Seq[String])) } - intercept[ClientError] { awaitResult(client.gets(Seq(null))) } - intercept[ClientError] { awaitResult(client.gets(Seq(""))) } - intercept[ClientError] { awaitResult(client.gets(Seq("foos", "bad key", "somethingelse"))) } - intercept[ClientError] { awaitResult(client.append("bad key", Buf.Utf8("rab"))) } - intercept[ClientError] { awaitResult(client.prepend("bad key", Buf.Utf8("rab"))) } - intercept[ClientError] { awaitResult(client.replace("bad key", Buf.Utf8("bar"))) } - intercept[ClientError] { awaitResult(client.add("bad key", Buf.Utf8("2"))) } - intercept[ClientError] { - awaitResult(client.checkAndSet("bad key", Buf.Utf8("z"), Buf.Utf8("2"))) - } - intercept[ClientError] { awaitResult(client.incr("bad key")) } - intercept[ClientError] { awaitResult(client.decr("bad key")) } - 
intercept[ClientError] { awaitResult(client.delete("bad key")) } - } - - test("Push client uses Netty4PushTransporter") { - val simple = new SimpleRegistry() - val address = Address(testServer.get.address.asInstanceOf[InetSocketAddress]) - GlobalRegistry.withRegistry(simple) { - val client = Memcached.client.newService(Name.bound(address), "memcache") - client(Quit()) - val entries = simple.toSet - assert( - entries.contains( - Entry(Seq("client", "memcached", "memcache", "Transporter"), "Netty4PushTransporter") - ) - ) - } - } - - test("annotates the total number of hits and misses") { - awaitResult(client.set("foo", Buf.Utf8("bar"))) - awaitResult(client.set("bar", Buf.Utf8("baz"))) - withExpectedTraces( - c => { - // add a missing key - awaitResult(c.getResult(Seq("foo", "bar", "themissingkey"))) - }, - Seq( - Annotation.BinaryAnnotation("clnt/memcached.hits", 2), - Annotation.BinaryAnnotation("clnt/memcached.misses", 1) - ) - ) - } - - test("annotates the shard id of the endpoint") { - awaitResult(client.set("foo", Buf.Utf8("bar"))) - withExpectedTraces( - c => { - awaitResult(c.get("foo")) - }, - Seq( - Annotation.BinaryAnnotation("clnt/memcached.shard_id", 0) - ) - ) - } -} diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/TwemcacheClientAPITest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/TwemcacheClientAPITest.scala new file mode 100644 index 00000000000..45ec9ee6909 --- /dev/null +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/TwemcacheClientAPITest.scala @@ -0,0 +1,24 @@ +package com.twitter.finagle.memcached.integration +import com.twitter.finagle.Memcached +import com.twitter.finagle.Name +import com.twitter.finagle.memcached.Client +import com.twitter.finagle.memcached.GetResult +import com.twitter.finagle.memcached.GetsResult + +class TwemcacheClientAPITest extends ClientAPITest { + + override protected def mkClient(dest: Name): Client = { + Memcached.client + .connectionsPerEndpoint(1) + .newRichClient(dest, "foo") + } + + test("empty key sequence") { + assert(awaitResult(client.get(Seq.empty)).isEmpty) + assert(awaitResult(client.gets(Seq.empty)).isEmpty) + assert(awaitResult(client.getWithFlag(Seq.empty)).isEmpty) + assert(awaitResult(client.getsWithFlag(Seq.empty)).isEmpty) + assert(awaitResult(client.getResult(Seq.empty)) == GetResult.Empty) + assert(awaitResult(client.getsResult(Seq.empty)) == GetsResult(GetResult.Empty)) + } +} diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/external/ExternalMemcached.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/external/ExternalMemcached.scala index 0084bf76c2a..713c6cf2af5 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/external/ExternalMemcached.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/external/ExternalMemcached.scala @@ -8,53 +8,51 @@ import java.net.BindException import java.net.InetAddress import java.net.InetSocketAddress import java.net.ServerSocket -import scala.jdk.CollectionConverters import scala.collection._ -import scala.collection.immutable.Stream -import scala.util.control.NonFatal - -object TestMemcachedServer { - def start(): Option[TestMemcachedServer] = start(None) - - def start(address: Option[InetSocketAddress]): Option[TestMemcachedServer] = { - Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")) match { - case Some(externalMemcachedPath) => - 
ExternalMemcached.start(address, externalMemcachedPath) - case None => - InternalMemcached.start(address) + +object MemcachedServer { + + def start(): MemcachedServer = { + if (ExternalMemcached.use()) { + ExternalMemcached.start() + } else { + InternalMemcached.start() } } } -trait TestMemcachedServer { +trait MemcachedServer { val address: InetSocketAddress def stop(): Unit } private[memcached] object InternalMemcached { - def start(address: Option[InetSocketAddress]): Option[TestMemcachedServer] = { - try { - val server = new InProcessMemcached( - address.getOrElse(new InetSocketAddress(InetAddress.getLoopbackAddress, 0)) - ) - Some(new TestMemcachedServer { - val address = server.start().boundAddress.asInstanceOf[InetSocketAddress] - def stop(): Unit = { server.stop(true) } - }) - } catch { - case NonFatal(_) => None + def start(): MemcachedServer = { + val server = new InProcessMemcached( + new InetSocketAddress(InetAddress.getLoopbackAddress, 0) + ) + new MemcachedServer { + val address = server.start().boundAddress.asInstanceOf[InetSocketAddress] + def stop(): Unit = { server.stop(true) } } } } -private[memcached] object ExternalMemcached { self => - class MemcachedBinaryNotFound extends Exception +private[memcached] object ExternalMemcached { private[this] var processes: List[Process] = List() private[this] val forbiddenPorts = 11000.until(11900) private[this] var takenPorts: Set[Int] = Set[Int]() // prevent us from taking a port that is anything close to a real memcached port. - private[this] def findAddress() = { + def use(): Boolean = { + externalMemcachedPath().nonEmpty + } + + private[this] def externalMemcachedPath(): Option[String] = { + Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")) + } + + private[this] def findAddress(): InetSocketAddress = { var address: Option[InetSocketAddress] = None var tries = 100 while (address == None && tries >= 0) { @@ -66,45 +64,35 @@ private[memcached] object ExternalMemcached { self => Thread.sleep(5) } } - if (address == None) sys.error("Couldn't get an address for the external memcached") - - takenPorts += address - .getOrElse( - new InetSocketAddress(InetAddress.getLoopbackAddress, 0) - ) - .getPort - address + if (address == None) throw new Exception("Couldn't get an address for the external memcached") + + takenPorts += address.get.getPort + address.get } - def start( - address: Option[InetSocketAddress], - externalMemcachedPath: String - ): Option[TestMemcachedServer] = { + def start(): MemcachedServer = { def exec(address: InetSocketAddress): Process = { val cmd = - List(externalMemcachedPath, "-l", address.getHostName, "-p", address.getPort.toString) + List(externalMemcachedPath().get, "-l", address.getHostName, "-p", address.getPort.toString) val builder = new ProcessBuilder(cmd: _*) builder.start() } - (address orElse findAddress()) flatMap { addr => - try { - val proc = exec(addr) - processes :+= proc - - if (waitForPort(addr.getPort)) - Some(new TestMemcachedServer { - val address = addr - def stop(): Unit = { - proc.destroy() - proc.waitFor() - } - }) - else - None - } catch { - case _: Throwable => None + val addr = findAddress() + val proc = exec(addr) + processes :+= proc + + if (waitForPort(addr.getPort)) { + new MemcachedServer { + val address = addr + + def stop(): Unit = { + proc.destroy() + proc.waitFor() + } } + } else { + throw new Exception("Timed out waiting for external memcached to start") } } diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/stress/InterpreterServiceTest.scala 
b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/stress/InterpreterServiceTest.scala deleted file mode 100644 index a29ebb45143..00000000000 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/stress/InterpreterServiceTest.scala +++ /dev/null @@ -1,56 +0,0 @@ -package com.twitter.finagle.memcached.stress - -import com.twitter.conversions.DurationOps._ -import com.twitter.finagle.Address -import com.twitter.finagle.Memcached -import com.twitter.finagle.Name -import com.twitter.finagle.Service -import com.twitter.finagle.memcached.integration.external.InProcessMemcached -import com.twitter.finagle.memcached.protocol._ -import com.twitter.io.Buf -import com.twitter.util.{Await, Awaitable, Time} -import java.net.{InetAddress, InetSocketAddress} -import org.scalatest.BeforeAndAfter -import org.scalatest.funsuite.AnyFunSuite - -class InterpreterServiceTest extends AnyFunSuite with BeforeAndAfter { - - val TimeOut = 15.seconds - - private def awaitResult[T](awaitable: Awaitable[T]): T = Await.result(awaitable, TimeOut) - - var server: InProcessMemcached = null - var client: Service[Command, Response] = null - - before { - server = new InProcessMemcached(new InetSocketAddress(InetAddress.getLoopbackAddress, 0)) - val address = Address(server.start().boundAddress.asInstanceOf[InetSocketAddress]) - client = Memcached.client - .connectionsPerEndpoint(1) - .newService(Name.bound(address), "memcache") - } - - after { - server.stop() - } - - test("set & get") { - val _key = "key" - val value = Buf.Utf8("value") - val zero = Buf.Utf8("0") - val start = System.currentTimeMillis - (0 until 100) map { i => - val key = _key + i - awaitResult(client(Delete(Buf.Utf8(key)))) - awaitResult(client(Set(Buf.Utf8(key), 0, Time.epoch, value))) - assert( - awaitResult(client(Get(Seq(Buf.Utf8(key))))) == Values( - Seq(Value(Buf.Utf8(key), value, None, Some(zero))) - ) - ) - } - val end = System.currentTimeMillis - // println("%d ms".format(end - start)) - } - -} diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/ConnectedClientTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/ClientCheckAndSetTest.scala similarity index 82% rename from finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/ConnectedClientTest.scala rename to finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/ClientCheckAndSetTest.scala index cb231381139..50b8522d78d 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/ConnectedClientTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/ClientCheckAndSetTest.scala @@ -13,7 +13,7 @@ import org.mockito.Mockito._ import org.scalatestplus.mockito.MockitoSugar import org.scalatest.funsuite.AnyFunSuite -class ConnectedClientTest extends AnyFunSuite with MockitoSugar { +class ClientCheckAndSetTest extends AnyFunSuite with MockitoSugar { val TimeOut = 15.seconds @@ -25,26 +25,18 @@ class ConnectedClientTest extends AnyFunSuite with MockitoSugar { val key = "key" val value = Buf.Utf8("value") - test("cas correctly responds to return states of the service") { - when(service.apply(any[Command])).thenReturn(Future.value(Stored)) - assert(awaitResult(client.checkAndSet(key, value, casUnique).map(_.replaced))) - - when(service.apply(any[Command])).thenReturn(Future.value(Exists)) - assert(!awaitResult(client.checkAndSet(key, value, casUnique).map(_.replaced))) - - when(service.apply(any[Command])).thenReturn(Future.value(NotFound)) - 
assert(!awaitResult(client.checkAndSet(key, value, casUnique).map(_.replaced))) - } - test("checkAndSet correctly responds to return states of the service") { when(service.apply(any[Command])).thenReturn(Future.value(Stored)) assert(awaitResult(client.checkAndSet(key, value, casUnique)) == CasResult.Stored) + assert(awaitResult(client.checkAndSet(key, value, casUnique)).replaced) when(service.apply(any[Command])).thenReturn(Future.value(Exists)) assert(awaitResult(client.checkAndSet(key, value, casUnique)) == CasResult.Exists) + assert(!awaitResult(client.checkAndSet(key, value, casUnique)).replaced) when(service.apply(any[Command])).thenReturn(Future.value(NotFound)) assert(awaitResult(client.checkAndSet(key, value, casUnique)) == CasResult.NotFound) + assert(!awaitResult(client.checkAndSet(key, value, casUnique)).replaced) } test("checkAndSet correctly responds to the error states of the service") { diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/util/MemcachedTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/ClientConfigurationTest.scala similarity index 87% rename from finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/util/MemcachedTest.scala rename to finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/ClientConfigurationTest.scala index 23cfa221287..bfe5f28eb80 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/util/MemcachedTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/ClientConfigurationTest.scala @@ -1,23 +1,31 @@ -package com.twitter.finagle.memcached.unit.util +package com.twitter.finagle.memcached.unit import com.twitter.conversions.DurationOps._ import com.twitter.finagle._ import com.twitter.finagle.client.Transporter import com.twitter.finagle.factory.TimeoutFactory import com.twitter.finagle.filter.NackAdmissionFilter -import com.twitter.finagle.liveness.{FailureAccrualFactory, FailureAccrualPolicy} -import com.twitter.finagle.memcached.{Client, TwemcacheClient} -import com.twitter.finagle.pool.BalancingPool +import com.twitter.finagle.liveness.FailureAccrualFactory +import com.twitter.finagle.liveness.FailureAccrualPolicy +import com.twitter.finagle.memcached.Client +import com.twitter.finagle.memcached.TwemcacheClient import com.twitter.finagle.param.Stats import com.twitter.finagle.partitioning.{param => pparam} +import com.twitter.finagle.pool.BalancingPool import com.twitter.finagle.service._ import com.twitter.finagle.stats.InMemoryStatsReceiver -import com.twitter.util.{Await, Time} -import org.scalatest.concurrent.{Eventually, IntegrationPatience} -import org.scalatestplus.mockito.MockitoSugar +import com.twitter.util.Await +import com.twitter.util.Time +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.IntegrationPatience import org.scalatest.funsuite.AnyFunSuite +import org.scalatestplus.mockito.MockitoSugar -class MemcachedTest extends AnyFunSuite with MockitoSugar with Eventually with IntegrationPatience { +class ClientConfigurationTest + extends AnyFunSuite + with MockitoSugar + with Eventually + with IntegrationPatience { protected def baseClient: Memcached.Client = Memcached.client diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/CompressingMemcachedFilterTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/CompressingMemcachedFilterTest.scala index f7159f26635..6bd1c704b84 100644 --- 
a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/CompressingMemcachedFilterTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/CompressingMemcachedFilterTest.scala @@ -42,283 +42,282 @@ class CompressingMemcachedFilterTest stats.clear() } + test("Add is correctly processed and compressed") { + + val requestFlag = 0 + val requestKey = Buf.Utf8("key") + val addRequest = + Add(key = requestKey, flags = 0, expiry = Time.Top, value = alwaysCompress) + + val factory = + CompressionProvider(Lz4, NullStatsReceiver) + + val (compressionFlags, expectedBuf) = factory.compressor(alwaysCompress) + + val expectedFlags = + CompressionScheme.flagsWithCompression(requestFlag, compressionFlags) + + val service = mock[Service[Command, Response]] + when(service.apply(any[Add])).thenAnswer { request => + val addArg = request.getArgument[Add](0) + Future.value( + Values(values = Seq( + Value( + key = addArg.key, + value = addArg.value, + flags = Some(Buf.Utf8(addArg.flags.toString)))))) + } - test("Add is correctly processed and compressed") { + awaitResult(compressingFilter(addRequest, service)) match { + case Values(values) => + val value = values.head + assert(value.key === requestKey) + assert(value.value === expectedBuf) + assert(value.flags.exists { resultFlag => + Buf.Utf8.unapply(resultFlag).map(_.toInt).contains(expectedFlags) + }) + case resp => throw new IllegalStateException(s"$resp") + } - val requestFlag = 0 - val requestKey = Buf.Utf8("key") - val addRequest = - Add(key = requestKey, flags = 0, expiry = Time.Top, value = alwaysCompress) + assert(stats.counters(Seq("lz4", "compression", "attempted")) == 1) + assert(stats.counters(Seq("lz4", "compression", "skipped")) == 0) + } - val factory = - CompressionProvider(Lz4, NullStatsReceiver) + test("Set is correctly processed and compressed") { - val (compressionFlags, expectedBuf) = factory.compressor(alwaysCompress) + val requestFlag = 0 + val requestKey = Buf.Utf8("key") + val setRequest = + Set(key = requestKey, flags = 0, expiry = Time.Top, value = alwaysCompress) - val expectedFlags = - CompressionScheme.flagsWithCompression(requestFlag, compressionFlags) + val factory = + CompressionProvider(Lz4, NullStatsReceiver) - val service = mock[Service[Command, Response]] - when(service.apply(any[Add])).thenAnswer { request => - val addArg = request.getArgument[Add](0) - Future.value( - Values(values = Seq( - Value( - key = addArg.key, - value = addArg.value, - flags = Some(Buf.Utf8(addArg.flags.toString)))))) - } + val (compressionFlags, expectedBuf) = factory.compressor(alwaysCompress) - awaitResult(compressingFilter(addRequest, service)) match { - case Values(values) => - val value = values.head - assert(value.key === requestKey) - assert(value.value === expectedBuf) - assert(value.flags.exists { resultFlag => - Buf.Utf8.unapply(resultFlag).map(_.toInt).contains(expectedFlags) - }) - case resp => throw new IllegalStateException(s"$resp") - } + val expectedFlags = + CompressionScheme.flagsWithCompression(requestFlag, compressionFlags) + + val service = mock[Service[Command, Response]] + + when(service.apply(any[Set])).thenAnswer { request => + val setArg = request.getArgument[Set](0) + Future.value( + Values(values = Seq( + Value( + key = setArg.key, + value = setArg.value, + flags = Some(Buf.Utf8(setArg.flags.toString)))))) + } - assert(stats.counters(Seq("lz4", "compression", "attempted")) == 1) - assert(stats.counters(Seq("lz4", "compression", "skipped")) == 0) + 
awaitResult(compressingFilter(setRequest, service)) match { + case Values(values) => + val value = values.head + assert(value.key === requestKey) + assert(value.value === expectedBuf) + assert(value.flags.exists { resultFlag => + Buf.Utf8.unapply(resultFlag).map(_.toInt).contains(expectedFlags) + }) + case resp => throw new IllegalStateException(s"$resp") } - test("Set is correctly processed and compressed") { + assert(stats.counters(Seq("lz4", "compression", "attempted")) == 1) + assert(!stats.counters.contains(Seq("lz4", "compression", "skipped"))) + } - val requestFlag = 0 - val requestKey = Buf.Utf8("key") - val setRequest = - Set(key = requestKey, flags = 0, expiry = Time.Top, value = alwaysCompress) + test("Check And Set is correctly processed and compressed") { + + val requestFlag = 0 + val requestKey = Buf.Utf8("key") + val casUniqueKey = Buf.Utf8("cas") + val casRequest = + Cas( + key = requestKey, + flags = 0, + expiry = Time.Top, + value = alwaysCompress, + casUnique = casUniqueKey) + + val factory = + CompressionProvider(Lz4, NullStatsReceiver) + + val (compressionFlags, expectedBuf) = factory.compressor(alwaysCompress) + + val expectedFlags = + CompressionScheme.flagsWithCompression(requestFlag, compressionFlags) + + val service = mock[Service[Command, Response]] + + when(service.apply(any[Cas])).thenAnswer { request => + val casArg = request.getArgument[Cas](0) + Future.value( + Values(values = Seq( + Value( + key = casArg.key, + value = casArg.value, + flags = Some(Buf.Utf8(casArg.flags.toString)), + casUnique = Some(casUniqueKey))))) + } - val factory = - CompressionProvider(Lz4, NullStatsReceiver) + awaitResult(compressingFilter(casRequest, service)) match { + case Values(values) => + val value = values.head + assert(value.key === requestKey) + assert(value.value === expectedBuf) + assert(value.flags.exists { resultFlag => + Buf.Utf8.unapply(resultFlag).map(_.toInt).contains(expectedFlags) + }) + assert(value.casUnique.contains(casUniqueKey)) + case resp => throw new IllegalStateException(s"$resp") + } - val (compressionFlags, expectedBuf) = factory.compressor(alwaysCompress) + assert(stats.counters(Seq("lz4", "compression", "attempted")) == 1) + assert(!stats.counters.contains(Seq("lz4", "compression", "skipped"))) + } - val expectedFlags = - CompressionScheme.flagsWithCompression(requestFlag, compressionFlags) + test("throw error for unsupported commands") { - val service = mock[Service[Command, Response]] + val requestFlag = 0 + val requestKey = Buf.Utf8("key") + val requestValue = Buf.Utf8("Some Value") - when(service.apply(any[Set])).thenAnswer { request => - val setArg = request.getArgument[Set](0) - Future.value( - Values(values = Seq( - Value( - key = setArg.key, - value = setArg.value, - flags = Some(Buf.Utf8(setArg.flags.toString)))))) - } + val storageCommands: TableFor1[StorageCommand] = Table( + "request", + Append(key = requestKey, flags = requestFlag, expiry = Time.Top, value = requestValue), + Prepend(key = requestKey, flags = requestFlag, expiry = Time.Top, value = requestValue), + Replace(key = requestKey, flags = requestFlag, expiry = Time.Top, value = requestValue), + ) - awaitResult(compressingFilter(setRequest, service)) match { - case Values(values) => - val value = values.head - assert(value.key === requestKey) - assert(value.value === expectedBuf) - assert(value.flags.exists { resultFlag => - Buf.Utf8.unapply(resultFlag).map(_.toInt).contains(expectedFlags) - }) - case resp => throw new IllegalStateException(s"$resp") - } + val service = 
mock[Service[Command, Response]] - assert(stats.counters(Seq("lz4", "compression", "attempted")) == 1) - assert(!stats.counters.contains(Seq("lz4", "compression", "skipped"))) + forAll(storageCommands) { request: StorageCommand => + val error = + intercept[UnsupportedOperationException](awaitResult(compressingFilter(request, service))) + + assert(error.getMessage == s"${request.name} is unsupported for compressing cache") } + } - test("Check And Set is correctly processed and compressed") { + test("Retrieval Command Values are correctly processed and decompressed") { - val requestFlag = 0 - val requestKey = Buf.Utf8("key") - val casUniqueKey = Buf.Utf8("cas") - val casRequest = - Cas( - key = requestKey, - flags = 0, - expiry = Time.Top, - value = alwaysCompress, - casUnique = casUniqueKey) + val requestFlag = 0 + val requestKey = Buf.Utf8("key") - val factory = - CompressionProvider(Lz4, NullStatsReceiver) + val factory = + CompressionProvider(Lz4, NullStatsReceiver) - val (compressionFlags, expectedBuf) = factory.compressor(alwaysCompress) + val (compressionFlags, compressedBuf) = factory.compressor(alwaysCompress) - val expectedFlags = - CompressionScheme.flagsWithCompression(requestFlag, compressionFlags) + val valueFlags = + CompressionScheme.flagsWithCompression(requestFlag, compressionFlags) - val service = mock[Service[Command, Response]] + val service = mock[Service[Command, Response]] - when(service.apply(any[Cas])).thenAnswer { request => - val casArg = request.getArgument[Cas](0) + val retrievalCommandTable = + Table("retrievalCommand", Get(keys = Seq(requestKey)), Gets(keys = Seq(requestKey))) + + forAll(retrievalCommandTable) { retrievalCommand => + when(service.apply(retrievalCommand)).thenReturn { Future.value( Values(values = Seq( Value( - key = casArg.key, - value = casArg.value, - flags = Some(Buf.Utf8(casArg.flags.toString)), - casUnique = Some(casUniqueKey))))) + key = requestKey, + value = compressedBuf, + flags = Some(Buf.Utf8(valueFlags.toString)), + casUnique = None)))) } - awaitResult(compressingFilter(casRequest, service)) match { + awaitResult(compressingFilter(retrievalCommand, service)) match { case Values(values) => val value = values.head assert(value.key === requestKey) - assert(value.value === expectedBuf) + assert(value.value === alwaysCompress) assert(value.flags.exists { resultFlag => - Buf.Utf8.unapply(resultFlag).map(_.toInt).contains(expectedFlags) + Buf.Utf8.unapply(resultFlag).map(_.toInt).contains(valueFlags) }) - assert(value.casUnique.contains(casUniqueKey)) case resp => throw new IllegalStateException(s"$resp") } - - assert(stats.counters(Seq("lz4", "compression", "attempted")) == 1) - assert(!stats.counters.contains(Seq("lz4", "compression", "skipped"))) } - test("throw error for unsupported commands") { - - val requestFlag = 0 - val requestKey = Buf.Utf8("key") - val requestValue = Buf.Utf8("Some Value") - - val storageCommands: TableFor1[StorageCommand] = Table( - "request", - Append(key = requestKey, flags = requestFlag, expiry = Time.Top, value = requestValue), - Prepend(key = requestKey, flags = requestFlag, expiry = Time.Top, value = requestValue), - Replace(key = requestKey, flags = requestFlag, expiry = Time.Top, value = requestValue), - ) - - val service = mock[Service[Command, Response]] - - forAll(storageCommands) { request: StorageCommand => - val error = - intercept[UnsupportedOperationException](awaitResult(compressingFilter(request, service))) - - assert(error.getMessage == s"${request.name} is unsupported for compressing 
cache") - } - } + assert(stats.counters(Seq("lz4", "decompression", "attempted")) == 2) + } - test("Retrieval Command Values are correctly processed and decompressed") { + test("Retrieval Command ValuesAndErrors are correctly processed and decompressed") { - val requestFlag = 0 - val requestKey = Buf.Utf8("key") + val requestFlag = 0 + val requestKey = Buf.Utf8("key") - val factory = - CompressionProvider(Lz4, NullStatsReceiver) + val factory = + CompressionProvider(Lz4, NullStatsReceiver) - val (compressionFlags, compressedBuf) = factory.compressor(alwaysCompress) + val (compressionFlags, compressedBuf) = factory.compressor(alwaysCompress) - val valueFlags = - CompressionScheme.flagsWithCompression(requestFlag, compressionFlags) + val valueFlags = + CompressionScheme.flagsWithCompression(requestFlag, compressionFlags) - val service = mock[Service[Command, Response]] + val service = mock[Service[Command, Response]] - val retrievalCommandTable = - Table("retrievalCommand", Get(keys = Seq(requestKey)), Gets(keys = Seq(requestKey))) + val retrievalCommandTable = + Table("retrievalCommand", Get(keys = Seq(requestKey)), Gets(keys = Seq(requestKey))) - forAll(retrievalCommandTable) { retrievalCommand => - when(service.apply(retrievalCommand)).thenReturn { - Future.value( - Values(values = Seq( + forAll(retrievalCommandTable) { retrievalCommand => + when(service.apply(retrievalCommand)).thenReturn { + Future.value( + ValuesAndErrors( + values = Seq( Value( key = requestKey, value = compressedBuf, flags = Some(Buf.Utf8(valueFlags.toString)), - casUnique = None)))) - } - - awaitResult(compressingFilter(retrievalCommand, service)) match { - case Values(values) => - val value = values.head - assert(value.key === requestKey) - assert(value.value === alwaysCompress) - assert(value.flags.exists { resultFlag => - Buf.Utf8.unapply(resultFlag).map(_.toInt).contains(valueFlags) - }) - case resp => throw new IllegalStateException(s"$resp") - } + casUnique = None)), + errors = Map.empty)) } - assert(stats.counters(Seq("lz4", "decompression", "attempted")) == 2) + awaitResult(compressingFilter(retrievalCommand, service)) match { + case ValuesAndErrors(values, _) => + val value = values.head + assert(value.key === requestKey) + assert(value.value === alwaysCompress) + assert(value.flags.exists { resultFlag => + Buf.Utf8.unapply(resultFlag).map(_.toInt).contains(valueFlags) + }) + case resp => throw new IllegalStateException(s"$resp") + } } - test("Retrieval Command ValuesAndErrors are correctly processed and decompressed") { - - val requestFlag = 0 - val requestKey = Buf.Utf8("key") - - val factory = - CompressionProvider(Lz4, NullStatsReceiver) - - val (compressionFlags, compressedBuf) = factory.compressor(alwaysCompress) - - val valueFlags = - CompressionScheme.flagsWithCompression(requestFlag, compressionFlags) - - val service = mock[Service[Command, Response]] - - val retrievalCommandTable = - Table("retrievalCommand", Get(keys = Seq(requestKey)), Gets(keys = Seq(requestKey))) - - forAll(retrievalCommandTable) { retrievalCommand => - when(service.apply(retrievalCommand)).thenReturn { - Future.value( - ValuesAndErrors( - values = Seq( - Value( - key = requestKey, - value = compressedBuf, - flags = Some(Buf.Utf8(valueFlags.toString)), - casUnique = None)), - errors = Map.empty)) - } - - awaitResult(compressingFilter(retrievalCommand, service)) match { - case ValuesAndErrors(values, _) => - val value = values.head - assert(value.key === requestKey) - assert(value.value === alwaysCompress) - 
assert(value.flags.exists { resultFlag => - Buf.Utf8.unapply(resultFlag).map(_.toInt).contains(valueFlags) - }) - case resp => throw new IllegalStateException(s"$resp") - } - } + assert(stats.counters(Seq("lz4", "decompression", "attempted")) == 2) + } - assert(stats.counters(Seq("lz4", "decompression", "attempted")) == 2) - } + test("Retrieval Command ValuesAndErrors stores errors correctly") { + // Insert data with the wrong flags, so that it fails when it tries to + // decompress the data. + val wrongFlags = 12345678 + val requestKey = Buf.Utf8("key") + + val service = mock[Service[Command, Response]] + + val retrievalCommandTable = + Table("retrievalCommand", Get(keys = Seq(requestKey)), Gets(keys = Seq(requestKey))) - test("Retrieval Command ValuesAndErrors stores errors correctly") { - // Insert data with the wrong flags, so that it fails when it tries to - // decompress the data. - val wrongFlags = 12345678 - val requestKey = Buf.Utf8("key") - - val service = mock[Service[Command, Response]] - - val retrievalCommandTable = - Table("retrievalCommand", Get(keys = Seq(requestKey)), Gets(keys = Seq(requestKey))) - - forAll(retrievalCommandTable) { retrievalCommand => - when(service.apply(retrievalCommand)).thenReturn { - Future.value( - ValuesAndErrors( - values = Seq( - Value( - key = requestKey, - value = alwaysCompress, - flags = Some(Buf.Utf8(wrongFlags.toString)), - casUnique = None)), - errors = Map.empty)) - } - - awaitResult(compressingFilter(retrievalCommand, service)) match { - case ValuesAndErrors(_, errors) => - assert(errors.size == 1) - assert(errors.contains(requestKey)) - case resp => throw new IllegalStateException(s"$resp") - } + forAll(retrievalCommandTable) { retrievalCommand => + when(service.apply(retrievalCommand)).thenReturn { + Future.value( + ValuesAndErrors( + values = Seq( + Value( + key = requestKey, + value = alwaysCompress, + flags = Some(Buf.Utf8(wrongFlags.toString)), + casUnique = None)), + errors = Map.empty)) + } + + awaitResult(compressingFilter(retrievalCommand, service)) match { + case ValuesAndErrors(_, errors) => + assert(errors.size == 1) + assert(errors.contains(requestKey)) + case resp => throw new IllegalStateException(s"$resp") } } + } } diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/GetResultTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/GetResultTest.scala index 23a5d2c44f1..469a0c85974 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/GetResultTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/GetResultTest.scala @@ -50,21 +50,15 @@ class GetResultTest extends AnyFunSuite with MockitoSugar { } test("merged of empty seq produces empty GetResult") { - val context = new Context - assert(GetResult.merged(Seq[GetResult]()) == GetResult()) } test("merged of single item produces that item") { - val context = new Context - val getResult = GetResult() assert(GetResult.merged(Seq(getResult)) == getResult) } test("merge is the same as ++") { - val context = new Context - val subResults = (1 to 10) map { i => GetResult( hits = Map("h" + i -> mock[Value]),
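The GetResultTest edits here only drop the unused Context fixtures; the behaviour under test, GetResult.merged collapsing a sequence of partial results the same way ++ does, is unchanged. A rough standalone illustration follows; the k1/v1 keys and values are placeholders, not data from this patch, and the object wrapper exists only to make the snippet runnable.

import com.twitter.finagle.memcached.GetResult
import com.twitter.finagle.memcached.protocol.Value
import com.twitter.io.Buf

object GetResultMergeExample {
  def main(args: Array[String]): Unit = {
    // Two partial results with disjoint hits (placeholder data).
    val a = GetResult(hits = Map("k1" -> Value(Buf.Utf8("k1"), Buf.Utf8("v1"), None, None)))
    val b = GetResult(hits = Map("k2" -> Value(Buf.Utf8("k2"), Buf.Utf8("v2"), None, None)))

    // merged(...) folds results together pairwise, just as ++ does.
    assert(GetResult.merged(Seq(a, b)) == a ++ b)
    assert((a ++ b).hits.keySet == Set("k1", "k2"))
  }
}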