From 4361adfa9d16547bc48cfad8299d2496125df125 Mon Sep 17 00:00:00 2001 From: Bob Gregory Date: Mon, 17 Sep 2018 11:03:18 +0100 Subject: [PATCH] Feature/configurable discovery (#67) This commit adds a `selector` parameter to the connect function. The selector is a function that chooses a node from the list returned by gossip. By default we `select_random` but we also provide functions for `prefer_master` and `prefer_replica` --- CHANGES.md | 8 +++++ README.rst | 8 ++--- docs/api.rst | 3 ++ photonpump/connection.py | 38 ++++++++++++++++++++-- photonpump/discovery.py | 68 ++++++++++++++++++++++++++++++++-------- setup.py | 2 +- test/discovery_test.py | 38 ++++++++++++++++++++-- 7 files changed, 141 insertions(+), 24 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a7d7a01..a88a7cd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +## [0.6-alpha-2] - 2018-09-17 +Discovery now supports "selectors" to control how we pick a node from gossip + +## [0.6-alpha-1] - 2018-09-14 +Added support for catch-up subscriptions. + ## [0.5] - 2018-04-27 ### Breaking changes - Dropped the ConnectionContextManager class. @@ -16,6 +22,8 @@ - `published_event` reversed order of type and stream +[0.6.0-alpha-2]: https://github.com/madecom/photon-pump/compare/v0.6.0-alpha-1...v0.6.0-alpha-2 +[0.6.0-alpha-1]: https://github.com/madecom/photon-pump/compare/v0.5.0...v0.6.0-alpha-1 [0.5]: https://github.com/madecom/photon-pump/compare/v0.4.0...v0.5.0 [0.4]: https://github.com/madecom/photon-pump/compare/v0.3.0...v0.4.0 [0.3]: https://github.com/madecom/photon-pump/compare/v0.2.5...v0.3 diff --git a/README.rst b/README.rst index f8398ed..20316d0 100644 --- a/README.rst +++ b/README.rst @@ -20,7 +20,7 @@ Photon pump is available on the `cheese shop`_. :: You will need to install lib-protobuf 3.2.0 or above. -Documentation is available on `Read the docs`_. :: +Documentation is available on `Read the docs`_. Basic Usage ----------- @@ -207,12 +207,12 @@ Sometimes we want to watch a stream continuously and be notified when a new even A persistent subscription stores its state on the server. When your application restarts, you can connect to the subscription again and continue where you left off. Multiple clients can connect to the same persistent subscription to support competing consumer scenarios. To support these features, persistent subscriptions have to run against the master node of an Eventstore cluster. -Firstly, we need to create the subscription. +Firstly, we need to :meth:`create the subscription `. >>> async def create_subscription(subscription_name, stream_name, conn): >>> await conn.create_subscription(subscription_name, stream_name) -Once we have a subscription, we can connect to it to begin receiving events. A persistent subscription exposes an `events` property, which acts like an asynchronous iterator. +Once we have a subscription, we can :meth:`connect to it ` to begin receiving events. A persistent subscription exposes an `events` property, which acts like an asynchronous iterator. >>> async def read_events_from_subscription(subscription_name, stream_name, conn): >>> subscription = await conn.connect_subscription(subscription_name, stream_name) @@ -238,7 +238,7 @@ Volatile subsciptions do not support event acknowledgement. High-Availability Scenarios ~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Eventstore supports an HA-cluster deployment topology. In this scenario, Eventstore runs a master node and multiple slaves. Some operations, particularly subscriptions and projections, are handled only by the master node. To connect to an HA-cluster and automatically find the master node, photonpump supports cluster discovery. +Eventstore supports an HA-cluster deployment topology. In this scenario, Eventstore runs a master node and multiple slaves. Some operations, particularly persistent subscriptions and projections, are handled only by the master node. To connect to an HA-cluster and automatically find the master node, photonpump supports cluster discovery. The cluster discovery interrogates eventstore gossip to find the active master. You can provide the IP of a maching in the cluster, or a DNS name that resolves to some members of the cluster, and photonpump will discover the others. diff --git a/docs/api.rst b/docs/api.rst index 7582c52..68a2d5d 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -10,3 +10,6 @@ Photonpump API Reference .. automodule:: photonpump.messages :members: + +.. automodule:: photonpump.discovery + :members: diff --git a/photonpump/connection.py b/photonpump/connection.py index 1417cb4..67faf6b 100644 --- a/photonpump/connection.py +++ b/photonpump/connection.py @@ -8,7 +8,7 @@ from . import conversations as convo from . import messages as msg -from .discovery import DiscoveryRetryPolicy, NodeService, get_discoverer +from .discovery import DiscoveryRetryPolicy, NodeService, get_discoverer, select_random HEADER_LENGTH = 1 + 1 + 16 SIZE_UINT_32 = 4 @@ -972,7 +972,6 @@ async def stop(self): self.dispatch_loop, self.heartbeat_loop, loop=self.loop, - return_exceptions=True, ) self.transport.close() @@ -990,6 +989,7 @@ def connect( password=None, loop=None, name=None, + selector=select_random, ) -> Client: """ Create a new client. @@ -1015,6 +1015,36 @@ def connect( >>> async with connect(discovery_host="eventstore.test") as c: >>> await c.ping() + The discovery host returns gossip data about the cluster. We use the + gossip to select a node at random from the avaialble cluster members. + + If you're using + :meth:`persistent subscriptions ` + you will always want to connect to the master node of the cluster. + The selector parameter is a function that chooses an available node from + the gossip result. To select the master node, use the + :func:`photonpump.discovery.prefer_master` function. This function will return + the master node if there is a live master, and a random replica otherwise. + All requests to the server can be made with the require_master flag which + will raise an error if the current node is not a master. + + >>> async with connect( + >>> discovery_host="eventstore.test", + >>> selector=discovery.prefer_master, + >>> ) as c: + >>> await c.ping(require_master=True) + + Conversely, you might want to avoid connecting to the master node for reasons + of scalability. For this you can use the + :func:`photonpump.discovery.prefer_replica` function. + + >>> async with connect( + >>> discovery_host="eventstore.test", + >>> selector=discovery.prefer_replica, + >>> ) as c: + >>> await c.ping() + + For some operations, you may need to authenticate your requests by providing a username and password to the client. @@ -1040,9 +1070,11 @@ def connect( username: The username to use when communicating with eventstore. password: The password to use when communicating with eventstore. loop:An Asyncio event loop. + selector: An optional function that selects one element from a list of + :class:`photonpump.disovery.DiscoveredNode` elements. """ - discovery = get_discoverer(host, port, discovery_host, discovery_port) + discovery = get_discoverer(host, port, discovery_host, discovery_port, selector) dispatcher = MessageDispatcher(name=name, loop=loop) connector = Connector(discovery, dispatcher, name=name) diff --git a/photonpump/discovery.py b/photonpump/discovery.py index 02cd9fa..118fbab 100644 --- a/photonpump/discovery.py +++ b/photonpump/discovery.py @@ -4,7 +4,7 @@ import socket from enum import IntEnum from operator import attrgetter -from typing import Iterable, List, NamedTuple, Optional +from typing import Callable, Iterable, List, NamedTuple, Optional import aiodns import aiohttp @@ -26,7 +26,7 @@ class NodeState(IntEnum): Shutdown = 10 -INELIGIBLE_STATE = [NodeState.Manager, NodeState.ShuttingDown, NodeState.Shutdown] +ELIGIBLE_STATE = [NodeState.Clone, NodeState.Slave, NodeState.Master] class NodeService(NamedTuple): @@ -47,6 +47,9 @@ class DiscoveredNode(NamedTuple): external_http: NodeService +Selector = Callable[[List[DiscoveredNode]], Optional[DiscoveredNode]] + + def first(elems: Iterable): LOG.info(elems) @@ -54,17 +57,50 @@ def first(elems: Iterable): return elem -def select(gossip: List[DiscoveredNode]) -> Optional[DiscoveredNode]: +def prefer_master(nodes: List[DiscoveredNode]) -> Optional[DiscoveredNode]: + """ + Select the master if available, otherwise fall back to a replica. + """ + return max(nodes, key=attrgetter("state")) + + +def prefer_replica(nodes: List[DiscoveredNode]) -> Optional[DiscoveredNode]: + """ + Select a random replica if any are available or fall back to the master. + """ + masters = [node for node in nodes if node.state == NodeState.Master] + replicas = [node for node in nodes if node.state != NodeState.Master] + + if replicas: + return random.choice(replicas) + else: + # if you have more than one master then you're on your own, bud. + + return masters[0] + + +def select_random(nodes: List[DiscoveredNode]) -> Optional[DiscoveredNode]: + """ + Return a random node. + """ + return random.choice(nodes) + + +def select( + gossip: List[DiscoveredNode], selector: Optional[Selector] = None +) -> Optional[DiscoveredNode]: eligible_nodes = [ - node for node in gossip if node.is_alive and node.state not in INELIGIBLE_STATE + node for node in gossip if node.is_alive and node.state in ELIGIBLE_STATE ] - LOG.debug("Selecting node from gossip members: %r" % eligible_nodes) + LOG.debug("Selecting node from gossip members: %r", eligible_nodes) if not eligible_nodes: return None - return max(eligible_nodes, key=attrgetter("state")) + selector = selector or prefer_master + + return selector(eligible_nodes) def read_gossip(data): @@ -73,7 +109,7 @@ def read_gossip(data): return [] - LOG.debug(f"Received gossip for { len(data['members']) } nodes") + LOG.debug("Received gossip for {%s} nodes", len(data["members"])) return [ DiscoveredNode( @@ -142,7 +178,7 @@ async def reset_to_dns(self): random.shuffle(result) if result: - LOG.debug(f"Found { len(result) } hosts for name {self.name}") + LOG.debug("Found %s hosts for name %s", len(result), self.name) current_attempt = 0 self.seeds = [ NodeService(address=node.host, port=self.port, secure_port=None) @@ -182,7 +218,7 @@ async def fetch_new_gossip(session, seed): if not seed: return [] - LOG.debug(f"Fetching gossip from http://{seed.address}:{seed.port}/gossip") + LOG.debug("Fetching gossip from http://%s:%s/gossip", seed.address, seed.port) try: resp = await session.get(f"http://{seed.address}:{seed.port}/gossip") data = await resp.json() @@ -190,7 +226,7 @@ async def fetch_new_gossip(session, seed): return read_gossip(data) except aiohttp.ClientError: LOG.exception( - f"Failed loading gossip from http://{seed.address}:{seed.port}/gossip" + "Failed loading gossip from http://%s:%s/gossip", seed.address, seed.port ) return None @@ -209,6 +245,7 @@ async def discover(self): if self.failed: raise DiscoveryFailed() LOG.debug("SingleNodeDiscovery returning node %s", self.node) + return self.node @@ -245,12 +282,13 @@ def record_failure(self, node): class ClusterDiscovery: - def __init__(self, seed_finder, http_session, retry_policy): + def __init__(self, seed_finder, http_session, retry_policy, selector): self.session = http_session self.seeds = seed_finder self.last_gossip = [] self.best_node = None self.retry_policy = retry_policy + self.selector = selector def close(self): self.session.close() @@ -263,7 +301,7 @@ def record_gossip(self, node, gossip): for member in gossip: self.seeds.add_node(member.external_http) - self.best_node = select(gossip) + self.best_node = select(gossip, self.selector) self.retry_policy.record_success(node) async def get_gossip(self): @@ -343,7 +381,9 @@ def record_failure(self, node): self.stats.record_failure(node) -def get_discoverer(host, port, discovery_host, discovery_port): +def get_discoverer( + host, port, discovery_host, discovery_port, selector: Optional[Selector] = None +): if discovery_host is None: LOG.info("Using single-node discoverer") @@ -358,6 +398,7 @@ def get_discoverer(host, port, discovery_host, discovery_port): StaticSeedFinder([NodeService(discovery_host, discovery_port, None)]), session, DiscoveryRetryPolicy(), + selector, ) except socket.error: LOG.info("Using cluster node discovery with DNS") @@ -367,4 +408,5 @@ def get_discoverer(host, port, discovery_host, discovery_port): DnsSeedFinder(discovery_host, resolver, discovery_port), session, DiscoveryRetryPolicy(), + selector, ) diff --git a/setup.py b/setup.py index 5f5bff3..a79df8c 100644 --- a/setup.py +++ b/setup.py @@ -33,7 +33,7 @@ def run_tests(self): setup( name="photon-pump", - version="0.5.0", + version="0.6.0-alpha-2", url="http://github.com/madedotcom/photon-pump/", license="MIT", author="Bob Gregory", diff --git a/test/discovery_test.py b/test/discovery_test.py index 07e7160..4ce9939 100644 --- a/test/discovery_test.py +++ b/test/discovery_test.py @@ -18,6 +18,8 @@ get_discoverer, read_gossip, select, + prefer_master, + prefer_replica, ) from . import data @@ -214,7 +216,7 @@ async def wait(self, seed): session = aiohttp.ClientSession() with aioresponses() as mock: successful_discoverer = ClusterDiscovery( - StaticSeedFinder([seed]), session, retry + StaticSeedFinder([seed]), session, retry, None ) mock.get("http://1.2.3.4:2113/gossip", status=500) @@ -254,7 +256,7 @@ async def wait(self, seed): gossip = data.make_gossip("2.3.4.5") with aioresponses() as mock: successful_discoverer = ClusterDiscovery( - StaticSeedFinder([seed]), aiohttp.ClientSession(), retry + StaticSeedFinder([seed]), aiohttp.ClientSession(), retry, None ) mock.get("http://1.2.3.4:2113/gossip", status=500) @@ -302,8 +304,38 @@ def mark_failed(self, node): node = NodeService("2.3.4.5", 1234, None) finder = spy_seed_finder() - discoverer = ClusterDiscovery(finder, None, None) + discoverer = ClusterDiscovery(finder, None, None, None) discoverer.mark_failed(node) assert finder == [node] + + +@pytest.mark.asyncio +async def test_prefer_replica(): + """ + If we ask the discoverer to prefer_replica it should return a replica node + before returning a master. + """ + + discoverer = get_discoverer(None, None, "10.0.0.1", 2113, prefer_replica) + gossip = data.make_gossip("10.0.0.1", "10.0.0.2") + with aioresponses() as mock: + mock.get("http://10.0.0.1:2113/gossip", payload=gossip) + + assert await discoverer.discover() == NodeService("10.0.0.2", 1113, None) + + +@pytest.mark.asyncio +async def test_prefer_master(): + """ + If we ask the discoverer to prefer_master it should return a master node + before returning a replica. + """ + + discoverer = get_discoverer(None, None, "10.0.0.1", 2113, prefer_master) + gossip = data.make_gossip("10.0.0.1", "10.0.0.2") + with aioresponses() as mock: + mock.get("http://10.0.0.1:2113/gossip", payload=gossip) + + assert await discoverer.discover() == NodeService("10.0.0.1", 1113, None)