Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS Elasticache autodiscovery support #572

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
205 changes: 205 additions & 0 deletions pymemcache/client/ext/aws_ec_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import logging
import operator
import socket
import re
import time

from pymemcache import MemcacheUnknownCommandError
from pymemcache.client import Client
from pymemcache.client.base import normalize_server_spec
from pymemcache.client.hash import HashClient
from pymemcache.client.rendezvous import RendezvousHash


logger = logging.getLogger(__name__)

_RE_AWS_ENDPOINT = re.compile(
r"^(?:(?:[\w\d-]{0,61}[\w\d]\.)+[\w]{1,6}|\[(?:[\d]{1,3}\.){3}[\d]{1,3}\])\:\d{1,5}$"
)


class AWSElastiCacheHashClient(HashClient):
"""
This class is a subclass of HashClient and represents a client for interacting with an AWS ElastiCache cluster
using a hash-based algorithm for key distribution.

*Connection *

Supports version 1.4.14 or higher

Example:
>>> client = AWSElastiCacheServerlessClient('cluster.abcxyz.cfg.use1.cache.amazonaws.com')
"""

def __init__(
self,
cfg_node: object,
hasher: object = RendezvousHash,
serde: object = None,
serializer: object = None,
deserializer: object = None,
connect_timeout: object = None,
timeout: object = None,
no_delay: object = False,
socket_module: object = socket,
socket_keepalive: object = None,
key_prefix: object = b"",
max_pool_size: object = None,
pool_idle_timeout: object = 0,
lock_generator: object = None,
retry_attempts: object = 2,
retry_timeout: object = 1,
dead_timeout: object = 60,
use_pooling: object = False,
ignore_exc: object = False,
allow_unicode_keys: object = False,
default_noreply: object = True,
encoding: object = "ascii",
tls_context: object = None,
use_vpc: object = True,
) -> object:
"""
Constructor.

Args:
cfg_node: formatted string containing endpoint and port of the
ElastiCache cluster endpoint. Ex.:
`test-cluster.2os1zk.cfg.use1.cache.amazonaws.com:11211`
serde: optional serializer object, see notes in the class docs.
serializer: deprecated serialization function
deserializer: deprecated deserialization function
connect_timeout: optional float, seconds to wait for a connection to
the memcached server. Defaults to "forever" (uses the underlying
default socket timeout, which can be very long).
timeout: optional float, seconds to wait for send or recv calls on
the socket connected to memcached. Defaults to "forever" (uses the
underlying default socket timeout, which can be very long).
no_delay: optional bool, set the TCP_NODELAY flag, which may help
with performance in some cases. Defaults to False.
ignore_exc: optional bool, True to cause the "get", "gets",
"get_many" and "gets_many" calls to treat any errors as cache
misses. Defaults to False.
socket_module: socket module to use, e.g. gevent.socket. Defaults to
the standard library's socket module.
socket_keepalive: Activate the socket keepalive feature by passing
a KeepaliveOpts structure in this parameter. Disabled by default
(None). This feature is only supported on Linux platforms.
key_prefix: Prefix of key. You can use this as namespace. Defaults
to b''.
default_noreply: bool, the default value for 'noreply' as passed to
store commands (except from cas, incr, and decr, which default to
False).
allow_unicode_keys: bool, support unicode (utf8) keys
encoding: optional str, controls data encoding (defaults to 'ascii').
use_vpc: optional bool, if set False (defaults to True), the client
will use FQDN to connect to nodes instead of IP addresses. See
AWS Docs for extra info
https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/ClientConfig.DNS.html

Notes:
The constructor does not make a connection to memcached. The first
call to a method on the object will do that.
"""
if not (_RE_AWS_ENDPOINT.fullmatch(cfg_node) and isinstance(cfg_node, str)):
raise ValueError("Invalid AWS ElastiCache endpoint value '%s'" % cfg_node)

self._cfg_node = cfg_node
self.clients = {}
self.retry_attempts = retry_attempts
self.retry_timeout = retry_timeout
self.dead_timeout = dead_timeout
self.use_pooling = use_pooling
self.key_prefix = key_prefix
self.ignore_exc = ignore_exc
self.allow_unicode_keys = allow_unicode_keys
self._failed_clients = {}
self._dead_clients = {}
self._last_dead_check_time = time.time()

self.hasher = hasher()

self.default_kwargs = {
"connect_timeout": connect_timeout,
"timeout": timeout,
"no_delay": no_delay,
"socket_module": socket_module,
"socket_keepalive": socket_keepalive,
"key_prefix": key_prefix,
"serde": serde,
"serializer": serializer,
"deserializer": deserializer,
"allow_unicode_keys": allow_unicode_keys,
"default_noreply": default_noreply,
"encoding": encoding,
"tls_context": tls_context,
}

if use_pooling is True:
self.default_kwargs.update(
{
"max_pool_size": max_pool_size,
"pool_idle_timeout": pool_idle_timeout,
"lock_generator": lock_generator,
}
)

# server config returns as `[fqdn, ip, port]` if it's VPC installation you need to use ip
self._use_vpc = int(use_vpc)

self.reconfigure_nodes()

self.encoding = encoding
self.tls_context = tls_context

def reconfigure_nodes(self):
"""
Reconfigures the nodes in the server cluster based on the provided configuration node.

May useful on error handling during cluster scale down or scale up
"""
old_clients = self.clients.copy()
self.clients.clear()

for server in self._get_nodes_list():
self.add_server(normalize_server_spec(server))

for client in old_clients.values():
client.close()

def _get_nodes_list(self) -> list[tuple[str, int]]:
"""
Get the list of nodes from the cluster configuration.

Returns:
A list of tuples containing the address and port of each node in the cluster.
Each tuple has the format (address: str, port: int).
"""
addr, port = self._cfg_node.rsplit(":", maxsplit=1)
client = Client((addr, port), **self.default_kwargs)

# https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/AutoDiscovery.AddingToYourClientLibrary.html
try:
*_, config_line = client.raw_command(
b"config get cluster",
end_tokens=b"\n\r\nEND\r\n",
).splitlines()
except MemcacheUnknownCommandError:
logger.exception(
"Can't retrieve cluster configuration from '%s:%s' "
"Seems like it is ElastiCache Serverless or even isn't ElastiCache at all.",
client.server,
)
finally:
client.close()

servers = [
(server[self._use_vpc], server[2])
for server in map(
operator.methodcaller("split", "|"),
config_line.decode().split(" "),
)
]

logger.debug("Got the next nodes from cluster config: %s", servers)

return servers
84 changes: 84 additions & 0 deletions pymemcache/test/test_ext_aws_ec_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import pytest

from unittest.mock import MagicMock, patch

from pymemcache import Client
from pymemcache.client.ext.aws_ec_client import AWSElastiCacheHashClient

from .test_client import MockSocketModule, MockMemcacheClient


@pytest.mark.unit()
@pytest.mark.parametrize(
"connection_sting",
["cluster.abcxyz.cfg.use1.cache.amazonaws.com:11211", "1.1.1.1:11211"],
)
def test_init_valid_node_endpoint(connection_sting, monkeypatch):
with patch.object(
AWSElastiCacheHashClient, "reconfigure_nodes", new=MagicMock()
) as mock:
client = AWSElastiCacheHashClient(
connection_sting, socket_module=MockSocketModule()
)

assert client._cfg_node == connection_sting
assert mock.called


@pytest.mark.unit()
@pytest.mark.parametrize(
"connection_sting",
[
"cluster.abcxyz.cfg.use1.cache.amazonaws.com:abc",
"cluster.abcxyz.cfg.use1.cache.amazonaws.com",
"cluster.abcxyz.cfg.use1.cache.amazonaws.com:123123",
"1.1..1:11211",
],
)
def test_init_invalid_node_endpoint(connection_sting, monkeypatch):
with patch.object(
AWSElastiCacheHashClient, "reconfigure_nodes", new=MagicMock()
) as mock:
with pytest.raises(ValueError):
AWSElastiCacheHashClient(connection_sting, socket_module=MockSocketModule())


@pytest.mark.parametrize(
"server_configuration",
[
(True, ["10.0.0.1:11211", "10.0.0.2:11211"]),
(
False,
[
"cluster.abcxyz.0001.use1.cache.amazonaws.com:11211",
"cluster.abcxyz.0002.use1.cache.amazonaws.com:11211",
],
),
],
)
@pytest.mark.unit()
def test_get_cluster_config_command(server_configuration, monkeypatch):
use_vpc, configuration_list = server_configuration

raw_command = MagicMock(
return_value=b"CONFIG cluster 0 139\r\n"
b"4\n"
b"cluster.abcxyz.0001.use1.cache.amazonaws.com|10.0.0.1|11211 "
b"cluster.abcxyz.0002.use1.cache.amazonaws.com|10.0.0.2|11211"
)

with monkeypatch.context() as ctx:
ctx.setattr(Client, "raw_command", raw_command)
ctx.setattr(AWSElastiCacheHashClient, "client_class", MockMemcacheClient)

client = AWSElastiCacheHashClient(
"cluster.abcxyz.cfg.use1.cache.amazonaws.com:11211",
socket_module=MockSocketModule(),
use_vpc=use_vpc,
)

for name, client in client.clients.items():
assert isinstance(client, MockMemcacheClient)
assert name in configuration_list

assert raw_command.called
1 change: 1 addition & 0 deletions pymemcache/test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(
allow_unicode_keys=False,
encoding="ascii",
tls_context=None,
**kwargs,
):
self._contents = {}

Expand Down