Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP compatibility. #8

Open
wants to merge 61 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
716f059
break from the for-attempts loop when we complete a succesfull proxy-…
Apr 24, 2012
1702dc4
Fixing HTTP compatibility - headers[Connection] = "close\r"
Jun 15, 2012
194cd9e
Introducing new header Host -> LoadBalanceTo
Jun 21, 2012
81112d0
Merge branch 'master' of git://github.com/wonko/mantrid into LoadBala…
Jul 5, 2012
c62af25
Added configurable load balancing algorithms.
Dec 5, 2012
68d71e9
Merge remote-tracking branch 'remotes/origin/LoadBalanceTo_and_attemt…
Dec 5, 2012
97103b4
Removed a nonsense comment.
Dec 6, 2012
27f171d
Reverted my previous "cleanup" changes.
Dec 6, 2012
490310e
First cut at health-checking. Needed to fix least-connections balancing.
Dec 6, 2012
71f70ab
Fixed the problem with save_loop dying for good.
Dec 6, 2012
4a47cbc
Defer marking the backend as free until transfers finished.
Dec 6, 2012
f082cd4
Does not really improve anything. Revert "Defer marking the backend a…
odcinek Dec 6, 2012
412f1f9
Randomize backends in the least_connections algorithm.
Dec 7, 2012
c91dc86
Prevent the management loop from failing due to exceptions.
Dec 7, 2012
08ba7c3
Made least_connections and healthcheck the defaults.
Dec 10, 2012
f49ebd9
Adding support for the old state format loading.
Dec 11, 2012
927d15d
Changed log level of healthcheck messages.
Dec 11, 2012
3194808
improves logging when no backends found
mateuszzawisza Oct 8, 2013
b3029b8
Start healthchecks once connection attempt fails
karol-nowak Jun 5, 2014
7cb11bc
Change the default number of connection attempts to 2
karol-nowak Jun 5, 2014
2ae34aa
Revert "Change the default number of connection attempts to 2"
karol-nowak Jun 5, 2014
3a3b4d5
Bump version.
karol-nowak Jun 5, 2014
70c166d
CHANGELOG update to test some pip installation silyness.
karol-nowak Jun 5, 2014
2608e01
Start healthchecks once connection attempt fails
karol-nowak Jun 5, 2014
8d2884a
Bump version.
karol-nowak Jun 5, 2014
d342726
CHANGELOG update to test some pip installation silyness.
karol-nowak Jun 5, 2014
c6843c3
sets eventlet version to always be 0.14.0
mateuszzawisza Jul 7, 2014
9405711
Merge branch 'enforce_eventlet_version' into sandbox
mateuszzawisza Jul 7, 2014
0da6505
Add connection and request timeout handling
karol-nowak Jul 10, 2014
1af3ac2
Merge branch 'timeouts' into sandbox
karol-nowak Jul 10, 2014
327ed5a
Version bump
karol-nowak Jul 10, 2014
d1dfb20
Merge branch 'timeouts' into sandbox
karol-nowak Jul 10, 2014
61e33e0
Better logging, fixed problem with missing import
karol-nowak Jul 10, 2014
fd0ac55
Retry connection (not request!) if it fails
karol-nowak Jul 10, 2014
26a6188
Merge branch 'timeouts' into sandbox
karol-nowak Jul 10, 2014
fa189c4
Add support for X-Loadbalance-To header
nagas Jan 15, 2015
08c695f
Merge branch 'X-Loadbalance-To-header' into sandbox
nagas Feb 6, 2015
7d71773
Merge pull request #4 from futuresimple/X-Loadbalance-To-header
nagas Feb 15, 2015
785dbb8
Version 1.0.6
zytek Feb 16, 2015
c2e03c8
Merge branch 'master' into sandbox
zytek Feb 16, 2015
63d7426
Check whether there are healthy backend available
lsiudut Feb 19, 2015
fd31d03
Changed to HTTP/503, as nginx doesn't support custom code
lsiudut Feb 19, 2015
219f2db
Revert "Changed to HTTP/503, as nginx doesn't support custom code"
lsiudut Feb 19, 2015
dfefe00
Logging cleanups
lsiudut Feb 19, 2015
8bdf999
Merge branch 'check-backend-availability' into sandbox
lsiudut Feb 19, 2015
b549f95
Merge pull request #5 from lsiudut/check-backend-availability
lsiudut Feb 20, 2015
a2f143a
Added support for aliases
lsiudut Feb 26, 2015
cef7db5
Rename not-used parameter to avoid overwriting
lsiudut Feb 26, 2015
91198f3
Log X-Request-Id if present
lsiudut Mar 3, 2015
3ee6703
Changed format for Bogdan request
lsiudut Mar 4, 2015
e3464ea
Merge remote-tracking branch 'origin/lucks/improved-logging'
nagas Mar 10, 2015
d77f4d8
Get rid of stack traces in logs
nagas Mar 10, 2015
d164080
Normalize mantrid logs
nagas Mar 10, 2015
8feb2e9
No need for severity string to be of fixed size
nagas Mar 10, 2015
97180b8
Fix logs formatting that include request_id string
nagas Mar 10, 2015
d632238
Fix typo
nagas Mar 10, 2015
ab7f0ac
Bump version
nagas Mar 10, 2015
2f4c65b
Merge branch 'normalize-logs'
nagas Mar 11, 2015
a303a23
Return 594 status code on backend timeout.
emate Mar 26, 2015
af7d170
Add closing connection after sending status
emate Mar 30, 2015
79d3c53
Merge pull request #10 from futuresimple/594-status-code
emate Apr 2, 2015
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
1.0.4
-----

* Don't start healtchecks for every backend unless down

1.0.3
-----

Expand Down
2 changes: 1 addition & 1 deletion mantrid/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.0.3"
__version__ = "1.0.8"
111 changes: 93 additions & 18 deletions mantrid/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@
"""

import errno
import logging
import operator
import os
import random

import eventlet
from eventlet.green import socket
from eventlet.timeout import Timeout
from httplib import responses
from .socketmeld import SocketMelder

from mantrid.backend import Backend
from mantrid.socketmeld import SocketMelder

class NoHealthyBackends(Exception):
"Poll of usable backends is empty"
pass

class Action(object):
"Base action. Doesn't do anything."
Expand Down Expand Up @@ -125,38 +134,87 @@ def handle(self, sock, read_data, path, headers):
class Proxy(Action):
"Proxies them through to a server. What loadbalancers do."

attempts = 1
attempts = 2
delay = 1
default_healthcheck = True
default_algorithm = "least_connections"
connection_timeout_seconds = 2

def __init__(self, balancer, host, matched_host, backends, attempts=None, delay=None):
def __init__(self, balancer, host, matched_host, backends, attempts=None, delay=None, algorithm=default_algorithm, healthcheck=default_healthcheck):
super(Proxy, self).__init__(balancer, host, matched_host)
self.host = host
self.backends = backends
self.algorithm = algorithm
self.healthcheck = healthcheck
self.select_backend = self.random if algorithm == 'random' else self.least_connections
assert self.backends
if attempts is not None:
self.attempts = int(attempts)
if delay is not None:
self.delay = float(delay)

def valid_backends(self):
return [b for b in self.backends if not b.blacklisted or not self.healthcheck]

def random(self):
return random.choice(self.valid_backends())

def least_connections(self):
backends = self.valid_backends()

try:
min_connections = min(b.connections for b in backends)
except ValueError:
raise NoHealthyBackends()

# this is possibly a little bit safer than always returning the first backend
return random.choice([b for b in backends if b.connections == min_connections])

def handle(self, sock, read_data, path, headers):
"Sends back a static error page."
for i in range(self.attempts):
request_id = headers.get("X-Request-Id", "-")
for attempt in range(self.attempts):
if attempt > 0:
logging.warn("[%s] Retrying connection for host %s", request_id, self.host)

backend = self.select_backend()
try:
server_sock = eventlet.connect(
tuple(random.choice(self.backends)),
)
timeout = Timeout(self.connection_timeout_seconds)
try:
server_sock = eventlet.connect((backend.host, backend.port))
finally:
timeout.cancel()

backend.add_connection()
break
except socket.error:
logging.error("[%s] Proxy socket error on connect() to %s of %s", request_id, backend, self.host)
self.blacklist(backend)
eventlet.sleep(self.delay)
continue
# Function to help track data usage
def send_onwards(data):
server_sock.sendall(data)
return len(data)
try:
size = send_onwards(read_data)
size += SocketMelder(sock, server_sock).run()
except socket.error, e:
if e.errno != errno.EPIPE:
raise
except:
logging.warn("[%s] Proxy timeout on connect() to %s of %s", request_id, backend, self.host)
self.blacklist(backend)
eventlet.sleep(self.delay)
continue

# Function to help track data usage
def send_onwards(data):
server_sock.sendall(data)
return len(data)

try:
size = send_onwards(read_data)
size += SocketMelder(sock, server_sock, backend, self.host).run()
except socket.error, e:
if e.errno != errno.EPIPE:
raise
finally:
backend.drop_connection()

def blacklist(self, backend):
if self.healthcheck and not backend.blacklisted:
logging.warn("Blacklisting backend %s of %s", backend, self.host)
backend.blacklisted = True


class Spin(Action):
Expand Down Expand Up @@ -187,3 +245,20 @@ def handle(self, sock, read_data, path, headers):
# OK, nothing happened, so give up.
action = Static(self.balancer, self.host, self.matched_host, type="timeout")
return action.handle(sock, read_data, path, headers)

class Alias(Action):
"""
Alias for another backend
"""
def __init__(self, balancer, host, matched_host, hostname, **_kwargs):
self.host = host
self.balancer = balancer
self.matched_host = matched_host
self.hostname = hostname

action, kwargs, allow_subs = self.balancer.hosts[self.hostname]
action_class = self.balancer.action_mapping[action]
self.aliased = action_class(balancer = self.balancer, host = self.host, matched_host = self.matched_host, **kwargs)

def handle(self, **kwargs):
return self.aliased.handle(**kwargs)
80 changes: 80 additions & 0 deletions mantrid/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import eventlet
import logging

from eventlet.green import socket
from eventlet.timeout import Timeout

class Backend(object):

healthcheck_delay_seconds = 1
healthcheck_timeout_seconds = 1

def __init__(self, address_tuple):
self.address_tuple = address_tuple
self.active_connections = 0
self._blacklisted = False
self.retired = False

@property
def blacklisted(self):
return self._blacklisted

@blacklisted.setter
def blacklisted(self, value):
if value:
self.start_health_check()
self._blacklisted = value

@property
def address(self):
return self.address_tuple

def add_connection(self):
self.active_connections += 1

def drop_connection(self):
self.active_connections -= 1

@property
def connections(self):
return self.active_connections

@property
def host(self):
return self.address_tuple[0]

@property
def port(self):
return self.address_tuple[1]

def __repr__(self):
return "Backend((%s, %s))" % (self.host, self.port)

def start_health_check(self):
eventlet.spawn(self._health_check_loop)

def _health_check_loop(self):
while True:
if self.retired or not self.blacklisted:
reason = "removing backend" if self.retired else "available"
logging.warn("Stopping health-checking of %s: %s", self, reason)
break

self._check_health()
eventlet.sleep(self.healthcheck_delay_seconds)

def _check_health(self):
logging.debug("Checking health of %s", self)
try:
timeout = Timeout(self.healthcheck_timeout_seconds)
try:
socket = eventlet.connect((self.host, self.port))
finally:
timeout.cancel()

logging.debug("%s is alive, making sure it is not blacklisted", self)
self.blacklisted = False
socket.close()
except:
logging.debug("%s seems dead, will check again later", self)

31 changes: 26 additions & 5 deletions mantrid/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import sys
from .client import MantridClient

from mantrid.actions import Proxy
from mantrid.backend import Backend
from mantrid.client import MantridClient


class MantridCli(object):
Expand Down Expand Up @@ -47,11 +50,13 @@ def action_list(self):
print format % ("HOST", "ACTION", "SUBDOMS")
for host, details in sorted(self.client.get_all().items()):
if details[0] in ("proxy", "mirror"):
action = "%s<%s>" % (
action = "%s[algorithm=%s,healthcheck=%s]<%s>" % (
details[0],
details[1].get('algorithm', Proxy.default_algorithm),
details[1].get('healthcheck', Proxy.default_healthcheck),
",".join(
"%s:%s" % (host, port)
for host, port in details[1]['backends']
"%s:%s" % (backend.host, backend.port)
for backend in details[1]['backends']
)
)
elif details[0] == "static":
Expand All @@ -69,6 +74,11 @@ def action_list(self):
details[0],
details[1]['code'],
)
elif details[0] == "alias":
action = "%s<%s>" % (
details[0],
details[1]['hostname'],
)
else:
action = details[0]
print format % (host, action, details[2])
Expand Down Expand Up @@ -102,6 +112,12 @@ def action_set(self, hostname=None, action=None, subdoms=None, *args):
if action in ("proxy, mirror") and "backends" not in options:
sys.stderr.write("The %s action requires a backends option.\n" % action)
sys.exit(1)
if action == "alias" and "hostname" not in options:
sys.stderr.write("The %s action requires hostname option.\n" % action)
sys.exit(1)
if "healthcheck" in options and options["healthcheck"].lower() not in ("true", "false"):
sys.stderr.write("The healthcheck option must be one of (true, false)")
sys.exit(1)
if action == "static" and "type" not in options:
sys.stderr.write("The %s action requires a type option.\n" % action)
sys.exit(1)
Expand All @@ -114,9 +130,11 @@ def action_set(self, hostname=None, action=None, subdoms=None, *args):
# Expand some options from text to datastructure
if "backends" in options:
options['backends'] = [
(lambda x: (x[0], int(x[1])))(bit.split(":", 1))
Backend((lambda x: (x[0], int(x[1])))(bit.split(":", 1)))
for bit in options['backends'].split(",")
]
if "healthcheck" in options:
options['healthcheck'] = (options['healthcheck'].lower() == "true")
# Set!
self.client.set(
hostname,
Expand All @@ -141,3 +159,6 @@ def action_stats(self, hostname=None):
details.get("bytes_received", 0),
details.get("bytes_sent", 0),
)

if __name__ == "__main__":
MantridCli.main()
6 changes: 3 additions & 3 deletions mantrid/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
httplib2 = eventlet.import_patched("httplib2")
except ImportError:
import httplib2
import json

import mantrid.json

class MantridClient(object):
"""
Expand All @@ -20,10 +20,10 @@ def _request(self, path, method, body=None):
resp, content = h.request(
self.base_url + path,
method,
body = json.dumps(body),
body = mantrid.json.dumps(body),
)
if resp['status'] == "200":
return json.loads(content)
return mantrid.json.loads(content)
else:
raise IOError(
"Got %s reponse from server (%s)" % (
Expand Down
50 changes: 50 additions & 0 deletions mantrid/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""JSON helper that defaults to secure (loads, dumps) methods and
supports custom mantrid data types.
"""

from __future__ import absolute_import

import copy
import json

import mantrid.backend


class MantridEncoder(json.JSONEncoder):
"""Custom serialization for mantrid types."""
def default(self, obj):
if isinstance(obj, mantrid.backend.Backend):
return {'__backend__': (obj.host, obj.port)}
return json.JSONEncoder.default(self, obj)

def load_mantrid(dct):
"""Custom deserialization for mantrid types."""
if '__backend__' in dct:
return mantrid.backend.Backend(dct['__backend__'])
return dct


def dumps(*args, **kwargs):
"""Securely dump objects to JSON, supporting custom mantrid types."""
new_kwargs = copy.copy(kwargs)
new_kwargs['cls'] = MantridEncoder
return json.dumps(*args, **new_kwargs)

def dump(*args, **kwargs):
"""Securely dump objects to JSON, supporting custom mantrid types."""
new_kwargs = copy.copy(kwargs)
new_kwargs['cls'] = MantridEncoder
return json.dump(*args, **new_kwargs)

def loads(*args, **kwargs):
"""Securely load objects from JSON, supporting custom mantrid types."""
new_kwargs = copy.copy(kwargs)
new_kwargs['object_hook'] = load_mantrid
return json.loads(*args, **new_kwargs)

def load(*args, **kwargs):
"""Securely load objects from JSON, supporting custom mantrid types."""
new_kwargs = copy.copy(kwargs)
new_kwargs['object_hook'] = load_mantrid
return json.load(*args, **new_kwargs)

Loading