Leakage #1503

Open · wants to merge 6 commits into base: develop

8 changes: 4 additions & 4 deletions proxy/core/base/tcp_server.py
@@ -104,7 +104,7 @@ class BaseTcpServerHandler(Work[T]):
     is ready to accept new data before flushing data to it.

     Most importantly, BaseTcpServerHandler ensures that pending buffers
-    to the client are flushed before connection is closed.
+    to the client are flushed before the connection with the client is closed.

     Implementations must provide::

@@ -170,9 +170,9 @@ async def handle_events(
     async def handle_writables(self, writables: Writables) -> bool:
         teardown = False
         if self.work.connection.fileno() in writables and self.work.has_buffer():
-            logger.debug(
-                'Flushing buffer to client {0}'.format(self.work.address),
-            )
+            # logger.debug(
+            #     'Flushing buffer to client {0}'.format(self.work.address),
+            # )
             self.work.flush(self.flags.max_sendbuf_size)
             if self.must_flush_before_shutdown is True and \
                     not self.work.has_buffer():
57 changes: 44 additions & 13 deletions proxy/core/connection/connection.py
@@ -14,11 +14,13 @@

 from .types import tcpConnectionTypes
 from ...common.types import TcpOrTlsSocket
+from ...common.leakage import Leakage
 from ...common.constants import DEFAULT_BUFFER_SIZE, DEFAULT_MAX_SEND_SIZE


 logger = logging.getLogger(__name__)

+EMPTY_MV = memoryview(b'')

 class TcpConnectionUninitializedException(Exception):
     pass
@@ -34,12 +36,19 @@ class TcpConnection(ABC):
     a socket connection object.
     """

-    def __init__(self, tag: int) -> None:
+    def __init__(
+        self,
+        tag: int,
+        flush_bps: int = 512,
+        recv_bps: int = 512,
+    ) -> None:
         self.tag: str = 'server' if tag == tcpConnectionTypes.SERVER else 'client'
         self.buffer: List[memoryview] = []
         self.closed: bool = False
         self._reusable: bool = False
         self._num_buffer = 0
+        self._flush_leakage = Leakage(rate=flush_bps) if flush_bps > 0 else None
+        self._recv_leakage = Leakage(rate=recv_bps) if recv_bps > 0 else None

     @property
     @abstractmethod
@@ -55,14 +64,20 @@ def recv(
         self, buffer_size: int = DEFAULT_BUFFER_SIZE,
     ) -> Optional[memoryview]:
         """Users must handle socket.error exceptions"""
+        if self._recv_leakage is not None:
+            allowed_bytes = self._recv_leakage.consume(buffer_size)
+            if allowed_bytes == 0:
+                return EMPTY_MV
+            buffer_size = min(buffer_size, allowed_bytes)
         data: bytes = self.connection.recv(buffer_size)
-        if len(data) == 0:
+        size = len(data)
+        unused = buffer_size - size
+        if self._recv_leakage is not None and unused > 0:
+            self._recv_leakage.release(unused)
+        if size == 0:
             return None
-        logger.debug(
-            'received %d bytes from %s' %
-            (len(data), self.tag),
-        )
-        # logger.info(data)
+        logger.debug('received %d bytes from %s' % (size, self.tag))
+        logger.info(data)
         return memoryview(data)

     def close(self) -> bool:
@@ -75,6 +90,8 @@ def has_buffer(self) -> bool:
         return self._num_buffer != 0

     def queue(self, mv: memoryview) -> None:
+        if len(mv) == 0:
+            return
         self.buffer.append(mv)
         self._num_buffer += 1

@@ -86,18 +103,32 @@ def flush(self, max_send_size: Optional[int] = None) -> int:
         # TODO: Assemble multiple packets if total
         # size remains below max send size.
         max_send_size = max_send_size or DEFAULT_MAX_SEND_SIZE
-        try:
-            sent: int = self.send(mv[:max_send_size])
-        except BlockingIOError:
-            logger.warning('BlockingIOError when trying send to {0}'.format(self.tag))
-            return 0
+        allowed_bytes = (
+            self._flush_leakage.consume(min(len(mv), max_send_size))
+            if self._flush_leakage is not None
+            else max_send_size
+        )
+        sent: int = 0
+        if allowed_bytes > 0:
+            try:
+                sent = self.send(mv[:allowed_bytes])
+            except BlockingIOError:
+                logger.warning(
+                    'BlockingIOError when trying send to {0}'.format(self.tag),
+                )
+                del mv
+                return 0
+            finally:
+                unused = allowed_bytes - sent
+                if self._flush_leakage is not None and unused > 0:
+                    self._flush_leakage.release(unused)
         if sent == len(mv):
             self.buffer.pop(0)
             self._num_buffer -= 1
         else:
             self.buffer[0] = mv[sent:]
         logger.debug('flushed %d bytes to %s' % (sent, self.tag))
-        # logger.info(mv[:sent].tobytes())
+        logger.info(mv[:sent].tobytes())
         del mv
         return sent

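Reviewer note on the throttling above: proxy/common/leakage.py is not part of this diff, so the contract behind the new consume()/release() calls in recv() and flush() is only visible indirectly. Below is a minimal sketch of that contract as read from this page; the method names, the byte-granting behaviour, and the bucket starting full (per the consume(150) == 100 assertion in tests/common/test_leakage.py further down) come from the diff, while the refill arithmetic and the one-second burst cap are assumptions.

import time


class Leakage:
    """Sketch of the leaky-bucket limiter assumed by TcpConnection above."""

    def __init__(self, rate: int) -> None:
        # rate is bytes per second; the bucket is assumed to start full.
        self.rate = rate
        self.tokens = rate
        self.last_refill = time.monotonic()

    def _refill(self) -> None:
        # Assumed refill: credit `rate` tokens per elapsed second, capped at
        # one second's worth of burst.
        now = time.monotonic()
        refill = int((now - self.last_refill) * self.rate)
        if refill > 0:
            self.tokens = min(self.rate, self.tokens + refill)
            self.last_refill = now

    def consume(self, amount: int) -> int:
        # Grant as much of `amount` as the bucket currently allows, possibly 0,
        # which is why recv() above can return EMPTY_MV without reading.
        self._refill()
        allowed = min(amount, self.tokens)
        self.tokens -= allowed
        return allowed

    def release(self, amount: int) -> None:
        # Hand back granted-but-unused bytes, as recv()/flush() do when the
        # socket moves fewer bytes than the bucket allowed.
        self.tokens = min(self.rate, self.tokens + amount)

With the new defaults, a plain TcpConnection(tag) throttles both directions to roughly flush_bps = recv_bps = 512 bytes per second; passing 0 for either keyword disables the corresponding bucket, since the constructor only builds a Leakage instance when the rate is positive.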
3 changes: 2 additions & 1 deletion proxy/core/work/fd/fd.py
@@ -41,14 +41,15 @@ def work(self, *args: Any) -> None:
             publisher_id=self.__class__.__qualname__,
         )
         try:
+            logger.debug('Initializing work#{0}'.format(fileno))
             self.works[fileno].initialize()
             self._total += 1
         except Exception as e:
             logger.exception(  # pragma: no cover
                 'Exception occurred during initialization',
                 exc_info=e,
             )
-            self._cleanup(fileno)
+            self._cleanup(fileno, 'error')

     @property
     @abstractmethod
19 changes: 11 additions & 8 deletions proxy/core/work/threadless.py
@@ -318,16 +318,18 @@ def _cleanup_inactive(self) -> None:
             if self.works[work_id].is_inactive():
                 inactive_works.append(work_id)
         for work_id in inactive_works:
-            self._cleanup(work_id)
+            self._cleanup(work_id, 'inactive')

     # TODO: HttpProtocolHandler.shutdown can call flush which may block
-    def _cleanup(self, work_id: int) -> None:
+    def _cleanup(self, work_id: int, reason: str) -> None:
         if work_id in self.registered_events_by_work_ids:
             assert self.selector
             for fileno in self.registered_events_by_work_ids[work_id]:
                 logger.debug(
-                    'fd#{0} unregistered by work#{1}'.format(
-                        fileno, work_id,
+                    'fd#{0} unregistered by work#{1}, reason: {2}'.format(
+                        fileno,
+                        work_id,
+                        reason,
                     ),
                 )
                 self.selector.unregister(fileno)
@@ -384,7 +386,7 @@ async def _run_once(self) -> bool:
             return False
         # Invoke Threadless.handle_events
         self.unfinished.update(self._create_tasks(work_by_ids))
-        # logger.debug('Executing {0} works'.format(len(self.unfinished)))
+        # logger.debug("Executing {0} works".format(len(self.unfinished)))
         # Cleanup finished tasks
         for task in await self._wait_for_tasks():
             # Checking for result can raise exception e.g.
@@ -398,11 +400,12 @@ async def _run_once(self) -> bool:
                 teardown = True
             finally:
                 if teardown:
-                    self._cleanup(work_id)
+                    self._cleanup(work_id, 'teardown')
                     # self.cleanup(int(task.get_name()))
         # logger.debug(
-        #     'Done executing works, {0} pending, {1} registered'.format(
-        #         len(self.unfinished), len(self.registered_events_by_work_ids),
+        #     "Done executing works, {0} pending, {1} registered".format(
+        #         len(self.unfinished),
+        #         len(self.registered_events_by_work_ids),
         #     ),
         # )
         return False
4 changes: 2 additions & 2 deletions proxy/http/handler.py
@@ -190,7 +190,7 @@ def handle_data(self, data: memoryview) -> Optional[bool]:

     async def handle_writables(self, writables: Writables) -> bool:
         if self.work.connection.fileno() in writables and self.work.has_buffer():
-            logger.debug('Client is write ready, flushing...')
+            # logger.debug('Client is write ready, flushing...')
             self.last_activity = time.time()
             # TODO(abhinavsingh): This hook could just reside within server recv block
             # instead of invoking when flushed to client.
@@ -219,7 +219,7 @@ async def handle_writables(self, writables: Writables) -> bool:

     async def handle_readables(self, readables: Readables) -> bool:
         if self.work.connection.fileno() in readables:
-            logger.debug('Client is read ready, receiving...')
+            # logger.debug('Client is read ready, receiving...')
             self.last_activity = time.time()
             try:
                 teardown = await super().handle_readables(readables)
12 changes: 12 additions & 0 deletions proxy/http/server/reverse.py
@@ -85,6 +85,9 @@ def handle_request(self, request: HttpParser) -> None:
                         random.choice(route[1]),
                     )
                     needs_upstream = True
+                    logger.debug(
+                        'Starting connection to upstream {0}'.format(self.choice),
+                    )
                     break
             # Dynamic routes
             elif isinstance(route, str):
@@ -95,14 +98,23 @@ def handle_request(self, request: HttpParser) -> None:
                         self.choice = choice
                         needs_upstream = True
                         self._upstream_proxy_pass = str(self.choice)
+                        logger.debug(
+                            'Starting connection to upstream {0}'.format(choice),
+                        )
                     elif isinstance(choice, memoryview):
                         self.client.queue(choice)
                         self._upstream_proxy_pass = '{0} bytes'.format(len(choice))
+                        logger.debug('Sending raw response to client')
                     else:
                         self.upstream = choice
                         self._upstream_proxy_pass = '{0}:{1}'.format(
                             *self.upstream.addr,
                         )
+                        logger.debug(
+                            'Using existing connection to upstream {0}'.format(
+                                self.upstream.addr,
+                            ),
+                        )
                     break
             else:
                 raise ValueError('Invalid route')
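For context on the new debug logs above: the dynamic-route branch accepts three shapes of `choice` from a route plugin, and the log line differs per shape. A hypothetical callback sketching those outcomes follows; only the return shapes are taken from the branches in handle_request, while the function name, signature, and the Url import path are assumptions for illustration.

from proxy.http.url import Url  # assumed import path


def choose_upstream(path: bytes, pooled_connection=None):
    if path.startswith(b'/health'):
        # memoryview: queued straight back to the client as a raw response
        return memoryview(b'HTTP/1.1 200 OK\r\ncontent-length: 2\r\n\r\nOK')
    if pooled_connection is not None:
        # an already-connected upstream object: reused, and its .addr is logged
        return pooled_connection
    # a Url: needs_upstream is set and a fresh upstream connection is started
    return Url.from_bytes(b'http://127.0.0.1:8899/')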
3 changes: 1 addition & 2 deletions tests/common/test_leakage.py
@@ -22,8 +22,7 @@ def test_initial_consume_no_tokens(self) -> None:
         rate = 100  # bytes per second
         bucket = Leakage(rate)
         self.assertEqual(
-            bucket.consume(150),
-            100,
+            bucket.consume(150), 100,
         )  # No tokens yet, so expect 0 bytes to be sent

     def test_consume_with_refill(self) -> None:
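The connection changes above also rely on Leakage.release() to return granted-but-unused bytes, which neither test shown here exercises. A sketch of such a test, assuming release() credits tokens back immediately rather than waiting for a refill (only the method name comes from this diff):

    def test_release_returns_unused_tokens(self) -> None:
        rate = 100  # bytes per second
        bucket = Leakage(rate)
        # Drain the bucket, then hand back 40 bytes, mirroring what
        # TcpConnection.recv()/flush() do when a call comes up short.
        self.assertEqual(bucket.consume(100), 100)
        bucket.release(40)
        # A refill may add a token or two between calls, so assert a floor.
        self.assertGreaterEqual(bucket.consume(100), 40)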
14 changes: 14 additions & 0 deletions tests/http/parser/test_http_parser.py
@@ -908,3 +908,17 @@ def test_cannot_parse_sip_protocol(self) -> None:
                 b'\r\n',
             ),
         )
+
+    def test_byte_by_byte(self) -> None:
+        response = HttpParser(httpParserTypes.RESPONSE_PARSER)
+        request = [
+            # pylint: disable=line-too-long
+            b'HTTP/1.1 200 OK\r\naccess-control-allow-credentials: true\r\naccess-control-allow-origin: *\r\ncontent-type: application/json; charset=utf-8\r\ndate: Thu, 14 Nov 2024 10:24:11 GMT\r\ncontent-length: 550\r\nserver: Fly/a40a59d0 (2024-11-12)\r\nvia: 1.1 fly.io\r\nfly-request-id: 01JCN37CEK4TB4DRWZDFFQYSD9-bom\r\n\r\n{\n "args": {},\n "headers": {\n "Accept": [\n "*/*"\n ],\n "Host": [\n "httpbingo.org"\n ],\n "User-Agent": [\n "curl/8.6.0"\n ],\n "Via": [\n "1.1 fly.io"\n ],\n "X-Forwarded-For": [\n "183.82.162.68, 66.241.125.232"\n ],\n "X-Forwarded-Port": [\n "443"\n ],\n "X-Forwarded-Proto": [\n "https"\n ],\n "X-Forwarded-Ssl',
+            b'": [\n "on"\n ],\n "X-Request-Start": [\n "t=1731579851219982"\n ]\n },\n "method": "GET",\n "origin": "183.82.162.68",\n "url": "https://httpbingo.org/get"\n}\n',
+        ]
+        response.parse(memoryview(request[0]))
+        self.assertEqual(response.state, httpParserStates.RCVING_BODY)
+        self.assertEqual(response.code, b'200')
+        for byte in (bytes([b]) for b in request[1]):
+            response.parse(memoryview(byte))
+        self.assertEqual(response.state, httpParserStates.COMPLETE)