From a9beaf19158513682cb67961a0ba23aa81901e5d Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Thu, 14 Nov 2024 17:24:51 +0530 Subject: [PATCH 1/4] leakage --- proxy/core/base/tcp_server.py | 8 ++-- proxy/core/connection/connection.py | 65 +++++++++++++++++++++------ proxy/core/connection/leak.py | 44 ++++++++++++++++++ proxy/core/work/fd/fd.py | 3 +- proxy/core/work/threadless.py | 19 ++++---- proxy/http/client.py | 4 +- proxy/http/handler.py | 4 +- proxy/http/server/reverse.py | 12 +++++ tests/core/test_leakage.py | 59 ++++++++++++++++++++++++ tests/http/parser/test_http_parser.py | 14 ++++++ 10 files changed, 201 insertions(+), 31 deletions(-) create mode 100644 proxy/core/connection/leak.py create mode 100644 tests/core/test_leakage.py diff --git a/proxy/core/base/tcp_server.py b/proxy/core/base/tcp_server.py index 4d32949d55..3fa9eda2e0 100644 --- a/proxy/core/base/tcp_server.py +++ b/proxy/core/base/tcp_server.py @@ -104,7 +104,7 @@ class BaseTcpServerHandler(Work[T]): is ready to accept new data before flushing data to it. Most importantly, BaseTcpServerHandler ensures that pending buffers - to the client are flushed before connection is closed. + to the client are flushed before connection is closed with the client. Implementations must provide:: @@ -170,9 +170,9 @@ async def handle_events( async def handle_writables(self, writables: Writables) -> bool: teardown = False if self.work.connection.fileno() in writables and self.work.has_buffer(): - logger.debug( - 'Flushing buffer to client {0}'.format(self.work.address), - ) + # logger.debug( + # 'Flushing buffer to client {0}'.format(self.work.address), + # ) self.work.flush(self.flags.max_sendbuf_size) if self.must_flush_before_shutdown is True and \ not self.work.has_buffer(): diff --git a/proxy/core/connection/connection.py b/proxy/core/connection/connection.py index 2200f93e6c..5c30d547c9 100644 --- a/proxy/core/connection/connection.py +++ b/proxy/core/connection/connection.py @@ -12,6 +12,7 @@ from abc import ABC, abstractmethod from typing import List, Union, Optional +from .leak import Leakage from .types import tcpConnectionTypes from ...common.types import TcpOrTlsSocket from ...common.constants import DEFAULT_BUFFER_SIZE, DEFAULT_MAX_SEND_SIZE @@ -19,6 +20,7 @@ logger = logging.getLogger(__name__) +EMPTY_MV = memoryview(b"") class TcpConnectionUninitializedException(Exception): pass @@ -34,12 +36,23 @@ class TcpConnection(ABC): a socket connection object. """ - def __init__(self, tag: int) -> None: + def __init__( + self, + tag: int, + flush_bw_in_bps: int = 512, + recv_bw_in_bps: int = 512, + ) -> None: self.tag: str = 'server' if tag == tcpConnectionTypes.SERVER else 'client' self.buffer: List[memoryview] = [] self.closed: bool = False self._reusable: bool = False self._num_buffer = 0 + self._flush_leakage = ( + Leakage(rate=flush_bw_in_bps) if flush_bw_in_bps > 0 else None + ) + self._recv_leakage = ( + Leakage(rate=recv_bw_in_bps) if recv_bw_in_bps > 0 else None + ) @property @abstractmethod @@ -55,14 +68,19 @@ def recv( self, buffer_size: int = DEFAULT_BUFFER_SIZE, ) -> Optional[memoryview]: """Users must handle socket.error exceptions""" + if self._recv_leakage is not None: + allowed_bytes = self._recv_leakage.consume(buffer_size) + if allowed_bytes == 0: + return EMPTY_MV + buffer_size = min(buffer_size, allowed_bytes) data: bytes = self.connection.recv(buffer_size) - if len(data) == 0: + size = len(data) + if self._recv_leakage is not None: + self._recv_leakage.putback(buffer_size - size) + if size == 0: return None - logger.debug( - 'received %d bytes from %s' % - (len(data), self.tag), - ) - # logger.info(data) + logger.debug("received %d bytes from %s" % (size, self.tag)) + logger.info(data) return memoryview(data) def close(self) -> bool: @@ -75,6 +93,8 @@ def has_buffer(self) -> bool: return self._num_buffer != 0 def queue(self, mv: memoryview) -> None: + if len(mv) == 0: + return self.buffer.append(mv) self._num_buffer += 1 @@ -83,21 +103,38 @@ def flush(self, max_send_size: Optional[int] = None) -> int: if not self.has_buffer(): return 0 mv = self.buffer[0] + print(self.buffer) + print(mv.tobytes()) # TODO: Assemble multiple packets if total # size remains below max send size. max_send_size = max_send_size or DEFAULT_MAX_SEND_SIZE - try: - sent: int = self.send(mv[:max_send_size]) - except BlockingIOError: - logger.warning('BlockingIOError when trying send to {0}'.format(self.tag)) - return 0 + allowed_bytes = ( + self._flush_leakage.consume(min(len(mv), max_send_size)) + if self._flush_leakage is not None + else max_send_size + ) + sent: int = 0 + if allowed_bytes > 0: + try: + sent = self.send(mv[:allowed_bytes]) + if self._flush_leakage is not None: + self._flush_leakage.putback(allowed_bytes - sent) + except BlockingIOError: + logger.warning( + "BlockingIOError when trying send to {0}".format(self.tag) + ) + del mv + return 0 + # if sent == 0: + # return 0 if sent == len(mv): self.buffer.pop(0) self._num_buffer -= 1 else: self.buffer[0] = mv[sent:] - logger.debug('flushed %d bytes to %s' % (sent, self.tag)) - # logger.info(mv[:sent].tobytes()) + # if sent > 0: + logger.debug("flushed %d bytes to %s" % (sent, self.tag)) + logger.info(mv[:sent].tobytes()) del mv return sent diff --git a/proxy/core/connection/leak.py b/proxy/core/connection/leak.py new file mode 100644 index 0000000000..f74ec3d809 --- /dev/null +++ b/proxy/core/connection/leak.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +""" + proxy.py + ~~~~~~~~ + ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on + Network monitoring, controls & Application development, testing, debugging. + + :copyright: (c) 2013-present by Abhinav Singh and contributors. + :license: BSD, see LICENSE for more details. +""" +import time + + +class Leakage: + def __init__(self, rate: int): + """Initialize the leaky bucket with a specified leak rate in bytes per second.""" + self.rate = ( + rate # Maximum number of tokens the bucket can hold (bytes per second) + ) + self.tokens = rate # Initially start with a full bucket + self.last_check = time.time() # Record the current time + + def _refill(self): + """Refill tokens based on the elapsed time since the last check.""" + now = time.time() + elapsed = now - self.last_check + # Add tokens proportional to elapsed time, up to the rate + self.tokens += int(elapsed * self.rate) + # Cap tokens at the maximum rate to enforce the rate limit + self.tokens = min(self.tokens, self.rate) + self.last_check = now # Update the last check time + + def putback(self, tokens) -> None: + self.tokens += tokens + + def consume(self, amount: int) -> int: + """Attempt to consume the amount from the bucket. + + Returns the amount allowed to be sent, up to the available tokens (rate). + """ + self._refill() # Refill the tokens before consumption + allowed = min(amount, self.tokens) # Allow up to the available tokens + self.tokens -= allowed # Subtract the allowed amount from the available tokens + return allowed # Return the number of bytes allowed to be consumed diff --git a/proxy/core/work/fd/fd.py b/proxy/core/work/fd/fd.py index a13024769c..854426f48a 100644 --- a/proxy/core/work/fd/fd.py +++ b/proxy/core/work/fd/fd.py @@ -41,6 +41,7 @@ def work(self, *args: Any) -> None: publisher_id=self.__class__.__qualname__, ) try: + logger.debug("Initializing work#{0}".format(fileno)) self.works[fileno].initialize() self._total += 1 except Exception as e: @@ -48,7 +49,7 @@ def work(self, *args: Any) -> None: 'Exception occurred during initialization', exc_info=e, ) - self._cleanup(fileno) + self._cleanup(fileno, "error") @property @abstractmethod diff --git a/proxy/core/work/threadless.py b/proxy/core/work/threadless.py index e8f7339def..9544692bdd 100644 --- a/proxy/core/work/threadless.py +++ b/proxy/core/work/threadless.py @@ -301,16 +301,18 @@ def _cleanup_inactive(self) -> None: if self.works[work_id].is_inactive(): inactive_works.append(work_id) for work_id in inactive_works: - self._cleanup(work_id) + self._cleanup(work_id, "inactive") # TODO: HttpProtocolHandler.shutdown can call flush which may block - def _cleanup(self, work_id: int) -> None: + def _cleanup(self, work_id: int, reason: str) -> None: if work_id in self.registered_events_by_work_ids: assert self.selector for fileno in self.registered_events_by_work_ids[work_id]: logger.debug( - 'fd#{0} unregistered by work#{1}'.format( - fileno, work_id, + "fd#{0} unregistered by work#{1}, reason: {2}".format( + fileno, + work_id, + reason, ), ) self.selector.unregister(fileno) @@ -360,7 +362,7 @@ async def _run_once(self) -> bool: return False # Invoke Threadless.handle_events self.unfinished.update(self._create_tasks(work_by_ids)) - # logger.debug('Executing {0} works'.format(len(self.unfinished))) + # logger.debug("Executing {0} works".format(len(self.unfinished))) # Cleanup finished tasks for task in await self._wait_for_tasks(): # Checking for result can raise exception e.g. @@ -374,11 +376,12 @@ async def _run_once(self) -> bool: teardown = True finally: if teardown: - self._cleanup(work_id) + self._cleanup(work_id, "teardown") # self.cleanup(int(task.get_name())) # logger.debug( - # 'Done executing works, {0} pending, {1} registered'.format( - # len(self.unfinished), len(self.registered_events_by_work_ids), + # "Done executing works, {0} pending, {1} registered".format( + # len(self.unfinished), + # len(self.registered_events_by_work_ids), # ), # ) return False diff --git a/proxy/http/client.py b/proxy/http/client.py index 28dbb97cce..c4ba7b8a59 100644 --- a/proxy/http/client.py +++ b/proxy/http/client.py @@ -32,8 +32,8 @@ def client( conn_close: bool = True, scheme: bytes = HTTPS_PROTO, timeout: float = DEFAULT_TIMEOUT, - content_type: bytes = b'application/x-www-form-urlencoded', - verify: bool = True, + content_type: bytes = b"application/x-www-form-urlencoded", + verify: bool = False, ) -> Optional[HttpParser]: """HTTP Client""" request = build_http_request( diff --git a/proxy/http/handler.py b/proxy/http/handler.py index 5120d5b32c..0b69087dc2 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -190,7 +190,7 @@ def handle_data(self, data: memoryview) -> Optional[bool]: async def handle_writables(self, writables: Writables) -> bool: if self.work.connection.fileno() in writables and self.work.has_buffer(): - logger.debug('Client is write ready, flushing...') + # logger.debug('Client is write ready, flushing...') self.last_activity = time.time() # TODO(abhinavsingh): This hook could just reside within server recv block # instead of invoking when flushed to client. @@ -219,7 +219,7 @@ async def handle_writables(self, writables: Writables) -> bool: async def handle_readables(self, readables: Readables) -> bool: if self.work.connection.fileno() in readables: - logger.debug('Client is read ready, receiving...') + # logger.debug('Client is read ready, receiving...') self.last_activity = time.time() try: teardown = await super().handle_readables(readables) diff --git a/proxy/http/server/reverse.py b/proxy/http/server/reverse.py index d1d5e631da..c007b06ee1 100644 --- a/proxy/http/server/reverse.py +++ b/proxy/http/server/reverse.py @@ -85,6 +85,9 @@ def handle_request(self, request: HttpParser) -> None: random.choice(route[1]), ) needs_upstream = True + logger.debug( + "Starting connection to upstream {0}".format(self.choice) + ) break # Dynamic routes elif isinstance(route, str): @@ -95,14 +98,23 @@ def handle_request(self, request: HttpParser) -> None: self.choice = choice needs_upstream = True self._upstream_proxy_pass = str(self.choice) + logger.debug( + "Starting connection to upstream {0}".format(choice) + ) elif isinstance(choice, memoryview): self.client.queue(choice) self._upstream_proxy_pass = '{0} bytes'.format(len(choice)) + logger.debug("Sending raw response to client") else: self.upstream = choice self._upstream_proxy_pass = '{0}:{1}'.format( *self.upstream.addr, ) + logger.debug( + "Using existing connection to upstream {0}".format( + self.upstream.addr + ) + ) break else: raise ValueError('Invalid route') diff --git a/tests/core/test_leakage.py b/tests/core/test_leakage.py new file mode 100644 index 0000000000..cbec6e4f3c --- /dev/null +++ b/tests/core/test_leakage.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +""" + proxy.py + ~~~~~~~~ + ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on + Network monitoring, controls & Application development, testing, debugging. + + :copyright: (c) 2013-present by Abhinav Singh and contributors. + :license: BSD, see LICENSE for more details. +""" +import time + +import unittest + +from proxy.core.connection.leak import Leakage + + +class TestTcpConnectionLeakage(unittest.TestCase): + def test_initial_consume_no_tokens(self): + # Test consuming with no tokens available initially + rate = 100 # bytes per second + bucket = Leakage(rate) + self.assertEqual( + bucket.consume(150), 100 + ) # No tokens yet, so expect 0 bytes to be sent + + def test_consume_with_refill(self): + # Test consuming with refill after waiting + rate = 100 # bytes per second + bucket = Leakage(rate) + time.sleep(1) # Wait for a second to allow refill + self.assertEqual(bucket.consume(50), 50) # 50 bytes should be available + + def test_consume_above_leak_rate(self): + # Test attempting to consume more than the leak rate after a refill + rate = 100 # bytes per second + bucket = Leakage(rate) + time.sleep(1) # Wait for a second to allow refill + self.assertEqual(bucket.consume(150), 100) # Only 100 bytes should be allowed + + def test_repeated_consume_with_partial_refill(self): + # Test repeated consumption with partial refill + rate = 100 # bytes per second + bucket = Leakage(rate) + + time.sleep(1) # Allow tokens to accumulate + bucket.consume(80) # Consume 80 bytes, should leave 20 + time.sleep(0.5) # Wait half a second to refill by 50 bytes + + self.assertEqual(bucket.consume(50), 50) # 50 bytes should be available now + + def test_negative_token_guard(self): + # Ensure tokens do not go negative + rate = 100 # bytes per second + bucket = Leakage(rate) + time.sleep(1) # Allow tokens to accumulate + bucket.consume(150) # Consume all available tokens + self.assertEqual(bucket.consume(10), 0) # Should return 0 as no tokens are left + self.assertEqual(bucket.tokens, 0) # Tokens should not be negative diff --git a/tests/http/parser/test_http_parser.py b/tests/http/parser/test_http_parser.py index 427553c310..c3eecf9d0a 100644 --- a/tests/http/parser/test_http_parser.py +++ b/tests/http/parser/test_http_parser.py @@ -908,3 +908,17 @@ def test_cannot_parse_sip_protocol(self) -> None: b'\r\n', ), ) + + def test_byte_by_byte(self) -> None: + response = HttpParser(httpParserTypes.RESPONSE_PARSER) + request = [ + # pylint: disable=line-too-long + b'HTTP/1.1 200 OK\r\naccess-control-allow-credentials: true\r\naccess-control-allow-origin: *\r\ncontent-type: application/json; charset=utf-8\r\ndate: Thu, 14 Nov 2024 10:24:11 GMT\r\ncontent-length: 550\r\nserver: Fly/a40a59d0 (2024-11-12)\r\nvia: 1.1 fly.io\r\nfly-request-id: 01JCN37CEK4TB4DRWZDFFQYSD9-bom\r\n\r\n{\n "args": {},\n "headers": {\n "Accept": [\n "*/*"\n ],\n "Host": [\n "httpbingo.org"\n ],\n "User-Agent": [\n "curl/8.6.0"\n ],\n "Via": [\n "1.1 fly.io"\n ],\n "X-Forwarded-For": [\n "183.82.162.68, 66.241.125.232"\n ],\n "X-Forwarded-Port": [\n "443"\n ],\n "X-Forwarded-Proto": [\n "https"\n ],\n "X-Forwarded-Ssl', + b'": [\n "on"\n ],\n "X-Request-Start": [\n "t=1731579851219982"\n ]\n },\n "method": "GET",\n "origin": "183.82.162.68",\n "url": "https://httpbingo.org/get"\n}\n', + ] + response.parse(memoryview(request[0])) + self.assertEqual(response.state, httpParserStates.RCVING_BODY) + self.assertEqual(response.code, b"200") + for byte in (bytes([b]) for b in request[1]): + response.parse(memoryview(byte)) + self.assertEqual(response.state, httpParserStates.COMPLETE) From 076ffde8342ed3ed17c89caf01bfdd09634b04a2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 14 Nov 2024 11:56:34 +0000 Subject: [PATCH 2/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- proxy/core/connection/connection.py | 8 ++++---- proxy/core/work/fd/fd.py | 4 ++-- proxy/core/work/threadless.py | 6 +++--- proxy/http/client.py | 2 +- proxy/http/server/reverse.py | 12 ++++++------ tests/core/test_leakage.py | 2 +- tests/http/parser/test_http_parser.py | 2 +- 7 files changed, 18 insertions(+), 18 deletions(-) diff --git a/proxy/core/connection/connection.py b/proxy/core/connection/connection.py index 5c30d547c9..f14b8753e7 100644 --- a/proxy/core/connection/connection.py +++ b/proxy/core/connection/connection.py @@ -20,7 +20,7 @@ logger = logging.getLogger(__name__) -EMPTY_MV = memoryview(b"") +EMPTY_MV = memoryview(b'') class TcpConnectionUninitializedException(Exception): pass @@ -79,7 +79,7 @@ def recv( self._recv_leakage.putback(buffer_size - size) if size == 0: return None - logger.debug("received %d bytes from %s" % (size, self.tag)) + logger.debug('received %d bytes from %s' % (size, self.tag)) logger.info(data) return memoryview(data) @@ -121,7 +121,7 @@ def flush(self, max_send_size: Optional[int] = None) -> int: self._flush_leakage.putback(allowed_bytes - sent) except BlockingIOError: logger.warning( - "BlockingIOError when trying send to {0}".format(self.tag) + 'BlockingIOError when trying send to {0}'.format(self.tag), ) del mv return 0 @@ -133,7 +133,7 @@ def flush(self, max_send_size: Optional[int] = None) -> int: else: self.buffer[0] = mv[sent:] # if sent > 0: - logger.debug("flushed %d bytes to %s" % (sent, self.tag)) + logger.debug('flushed %d bytes to %s' % (sent, self.tag)) logger.info(mv[:sent].tobytes()) del mv return sent diff --git a/proxy/core/work/fd/fd.py b/proxy/core/work/fd/fd.py index 854426f48a..e17a33ae7a 100644 --- a/proxy/core/work/fd/fd.py +++ b/proxy/core/work/fd/fd.py @@ -41,7 +41,7 @@ def work(self, *args: Any) -> None: publisher_id=self.__class__.__qualname__, ) try: - logger.debug("Initializing work#{0}".format(fileno)) + logger.debug('Initializing work#{0}'.format(fileno)) self.works[fileno].initialize() self._total += 1 except Exception as e: @@ -49,7 +49,7 @@ def work(self, *args: Any) -> None: 'Exception occurred during initialization', exc_info=e, ) - self._cleanup(fileno, "error") + self._cleanup(fileno, 'error') @property @abstractmethod diff --git a/proxy/core/work/threadless.py b/proxy/core/work/threadless.py index 9544692bdd..93d44be444 100644 --- a/proxy/core/work/threadless.py +++ b/proxy/core/work/threadless.py @@ -301,7 +301,7 @@ def _cleanup_inactive(self) -> None: if self.works[work_id].is_inactive(): inactive_works.append(work_id) for work_id in inactive_works: - self._cleanup(work_id, "inactive") + self._cleanup(work_id, 'inactive') # TODO: HttpProtocolHandler.shutdown can call flush which may block def _cleanup(self, work_id: int, reason: str) -> None: @@ -309,7 +309,7 @@ def _cleanup(self, work_id: int, reason: str) -> None: assert self.selector for fileno in self.registered_events_by_work_ids[work_id]: logger.debug( - "fd#{0} unregistered by work#{1}, reason: {2}".format( + 'fd#{0} unregistered by work#{1}, reason: {2}'.format( fileno, work_id, reason, @@ -376,7 +376,7 @@ async def _run_once(self) -> bool: teardown = True finally: if teardown: - self._cleanup(work_id, "teardown") + self._cleanup(work_id, 'teardown') # self.cleanup(int(task.get_name())) # logger.debug( # "Done executing works, {0} pending, {1} registered".format( diff --git a/proxy/http/client.py b/proxy/http/client.py index c4ba7b8a59..2df366edfc 100644 --- a/proxy/http/client.py +++ b/proxy/http/client.py @@ -32,7 +32,7 @@ def client( conn_close: bool = True, scheme: bytes = HTTPS_PROTO, timeout: float = DEFAULT_TIMEOUT, - content_type: bytes = b"application/x-www-form-urlencoded", + content_type: bytes = b'application/x-www-form-urlencoded', verify: bool = False, ) -> Optional[HttpParser]: """HTTP Client""" diff --git a/proxy/http/server/reverse.py b/proxy/http/server/reverse.py index c007b06ee1..34bf569825 100644 --- a/proxy/http/server/reverse.py +++ b/proxy/http/server/reverse.py @@ -86,7 +86,7 @@ def handle_request(self, request: HttpParser) -> None: ) needs_upstream = True logger.debug( - "Starting connection to upstream {0}".format(self.choice) + 'Starting connection to upstream {0}'.format(self.choice), ) break # Dynamic routes @@ -99,21 +99,21 @@ def handle_request(self, request: HttpParser) -> None: needs_upstream = True self._upstream_proxy_pass = str(self.choice) logger.debug( - "Starting connection to upstream {0}".format(choice) + 'Starting connection to upstream {0}'.format(choice), ) elif isinstance(choice, memoryview): self.client.queue(choice) self._upstream_proxy_pass = '{0} bytes'.format(len(choice)) - logger.debug("Sending raw response to client") + logger.debug('Sending raw response to client') else: self.upstream = choice self._upstream_proxy_pass = '{0}:{1}'.format( *self.upstream.addr, ) logger.debug( - "Using existing connection to upstream {0}".format( - self.upstream.addr - ) + 'Using existing connection to upstream {0}'.format( + self.upstream.addr, + ), ) break else: diff --git a/tests/core/test_leakage.py b/tests/core/test_leakage.py index cbec6e4f3c..897a09691f 100644 --- a/tests/core/test_leakage.py +++ b/tests/core/test_leakage.py @@ -21,7 +21,7 @@ def test_initial_consume_no_tokens(self): rate = 100 # bytes per second bucket = Leakage(rate) self.assertEqual( - bucket.consume(150), 100 + bucket.consume(150), 100, ) # No tokens yet, so expect 0 bytes to be sent def test_consume_with_refill(self): diff --git a/tests/http/parser/test_http_parser.py b/tests/http/parser/test_http_parser.py index c3eecf9d0a..a5254c86d7 100644 --- a/tests/http/parser/test_http_parser.py +++ b/tests/http/parser/test_http_parser.py @@ -918,7 +918,7 @@ def test_byte_by_byte(self) -> None: ] response.parse(memoryview(request[0])) self.assertEqual(response.state, httpParserStates.RCVING_BODY) - self.assertEqual(response.code, b"200") + self.assertEqual(response.code, b'200') for byte in (bytes([b]) for b in request[1]): response.parse(memoryview(byte)) self.assertEqual(response.state, httpParserStates.COMPLETE) From 0bd493897bdc067c911b04d662ad78898e5e4466 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Fri, 22 Nov 2024 11:22:12 +0530 Subject: [PATCH 3/4] Fix lint issues --- proxy/core/connection/connection.py | 14 +++---- proxy/core/connection/leak.py | 44 -------------------- proxy/core/work/fd/fd.py | 4 +- proxy/core/work/threadless.py | 6 +-- proxy/http/client.py | 2 +- proxy/http/server/reverse.py | 12 +++--- tests/core/test_leakage.py | 59 --------------------------- tests/http/parser/test_http_parser.py | 2 +- 8 files changed, 20 insertions(+), 123 deletions(-) delete mode 100644 proxy/core/connection/leak.py delete mode 100644 tests/core/test_leakage.py diff --git a/proxy/core/connection/connection.py b/proxy/core/connection/connection.py index 5c30d547c9..99ffb367b4 100644 --- a/proxy/core/connection/connection.py +++ b/proxy/core/connection/connection.py @@ -12,15 +12,15 @@ from abc import ABC, abstractmethod from typing import List, Union, Optional -from .leak import Leakage from .types import tcpConnectionTypes from ...common.types import TcpOrTlsSocket +from ...common.leakage import Leakage from ...common.constants import DEFAULT_BUFFER_SIZE, DEFAULT_MAX_SEND_SIZE logger = logging.getLogger(__name__) -EMPTY_MV = memoryview(b"") +EMPTY_MV = memoryview(b'') class TcpConnectionUninitializedException(Exception): pass @@ -76,10 +76,10 @@ def recv( data: bytes = self.connection.recv(buffer_size) size = len(data) if self._recv_leakage is not None: - self._recv_leakage.putback(buffer_size - size) + self._recv_leakage.release(buffer_size - size) if size == 0: return None - logger.debug("received %d bytes from %s" % (size, self.tag)) + logger.debug('received %d bytes from %s' % (size, self.tag)) logger.info(data) return memoryview(data) @@ -118,10 +118,10 @@ def flush(self, max_send_size: Optional[int] = None) -> int: try: sent = self.send(mv[:allowed_bytes]) if self._flush_leakage is not None: - self._flush_leakage.putback(allowed_bytes - sent) + self._flush_leakage.release(allowed_bytes - sent) except BlockingIOError: logger.warning( - "BlockingIOError when trying send to {0}".format(self.tag) + 'BlockingIOError when trying send to {0}'.format(self.tag), ) del mv return 0 @@ -133,7 +133,7 @@ def flush(self, max_send_size: Optional[int] = None) -> int: else: self.buffer[0] = mv[sent:] # if sent > 0: - logger.debug("flushed %d bytes to %s" % (sent, self.tag)) + logger.debug('flushed %d bytes to %s' % (sent, self.tag)) logger.info(mv[:sent].tobytes()) del mv return sent diff --git a/proxy/core/connection/leak.py b/proxy/core/connection/leak.py deleted file mode 100644 index f74ec3d809..0000000000 --- a/proxy/core/connection/leak.py +++ /dev/null @@ -1,44 +0,0 @@ -# -*- coding: utf-8 -*- -""" - proxy.py - ~~~~~~~~ - ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on - Network monitoring, controls & Application development, testing, debugging. - - :copyright: (c) 2013-present by Abhinav Singh and contributors. - :license: BSD, see LICENSE for more details. -""" -import time - - -class Leakage: - def __init__(self, rate: int): - """Initialize the leaky bucket with a specified leak rate in bytes per second.""" - self.rate = ( - rate # Maximum number of tokens the bucket can hold (bytes per second) - ) - self.tokens = rate # Initially start with a full bucket - self.last_check = time.time() # Record the current time - - def _refill(self): - """Refill tokens based on the elapsed time since the last check.""" - now = time.time() - elapsed = now - self.last_check - # Add tokens proportional to elapsed time, up to the rate - self.tokens += int(elapsed * self.rate) - # Cap tokens at the maximum rate to enforce the rate limit - self.tokens = min(self.tokens, self.rate) - self.last_check = now # Update the last check time - - def putback(self, tokens) -> None: - self.tokens += tokens - - def consume(self, amount: int) -> int: - """Attempt to consume the amount from the bucket. - - Returns the amount allowed to be sent, up to the available tokens (rate). - """ - self._refill() # Refill the tokens before consumption - allowed = min(amount, self.tokens) # Allow up to the available tokens - self.tokens -= allowed # Subtract the allowed amount from the available tokens - return allowed # Return the number of bytes allowed to be consumed diff --git a/proxy/core/work/fd/fd.py b/proxy/core/work/fd/fd.py index 854426f48a..e17a33ae7a 100644 --- a/proxy/core/work/fd/fd.py +++ b/proxy/core/work/fd/fd.py @@ -41,7 +41,7 @@ def work(self, *args: Any) -> None: publisher_id=self.__class__.__qualname__, ) try: - logger.debug("Initializing work#{0}".format(fileno)) + logger.debug('Initializing work#{0}'.format(fileno)) self.works[fileno].initialize() self._total += 1 except Exception as e: @@ -49,7 +49,7 @@ def work(self, *args: Any) -> None: 'Exception occurred during initialization', exc_info=e, ) - self._cleanup(fileno, "error") + self._cleanup(fileno, 'error') @property @abstractmethod diff --git a/proxy/core/work/threadless.py b/proxy/core/work/threadless.py index ed21d20e8c..3587cb9d2f 100644 --- a/proxy/core/work/threadless.py +++ b/proxy/core/work/threadless.py @@ -318,7 +318,7 @@ def _cleanup_inactive(self) -> None: if self.works[work_id].is_inactive(): inactive_works.append(work_id) for work_id in inactive_works: - self._cleanup(work_id, "inactive") + self._cleanup(work_id, 'inactive') # TODO: HttpProtocolHandler.shutdown can call flush which may block def _cleanup(self, work_id: int, reason: str) -> None: @@ -326,7 +326,7 @@ def _cleanup(self, work_id: int, reason: str) -> None: assert self.selector for fileno in self.registered_events_by_work_ids[work_id]: logger.debug( - "fd#{0} unregistered by work#{1}, reason: {2}".format( + 'fd#{0} unregistered by work#{1}, reason: {2}'.format( fileno, work_id, reason, @@ -400,7 +400,7 @@ async def _run_once(self) -> bool: teardown = True finally: if teardown: - self._cleanup(work_id, "teardown") + self._cleanup(work_id, 'teardown') # self.cleanup(int(task.get_name())) # logger.debug( # "Done executing works, {0} pending, {1} registered".format( diff --git a/proxy/http/client.py b/proxy/http/client.py index c4ba7b8a59..2df366edfc 100644 --- a/proxy/http/client.py +++ b/proxy/http/client.py @@ -32,7 +32,7 @@ def client( conn_close: bool = True, scheme: bytes = HTTPS_PROTO, timeout: float = DEFAULT_TIMEOUT, - content_type: bytes = b"application/x-www-form-urlencoded", + content_type: bytes = b'application/x-www-form-urlencoded', verify: bool = False, ) -> Optional[HttpParser]: """HTTP Client""" diff --git a/proxy/http/server/reverse.py b/proxy/http/server/reverse.py index c007b06ee1..34bf569825 100644 --- a/proxy/http/server/reverse.py +++ b/proxy/http/server/reverse.py @@ -86,7 +86,7 @@ def handle_request(self, request: HttpParser) -> None: ) needs_upstream = True logger.debug( - "Starting connection to upstream {0}".format(self.choice) + 'Starting connection to upstream {0}'.format(self.choice), ) break # Dynamic routes @@ -99,21 +99,21 @@ def handle_request(self, request: HttpParser) -> None: needs_upstream = True self._upstream_proxy_pass = str(self.choice) logger.debug( - "Starting connection to upstream {0}".format(choice) + 'Starting connection to upstream {0}'.format(choice), ) elif isinstance(choice, memoryview): self.client.queue(choice) self._upstream_proxy_pass = '{0} bytes'.format(len(choice)) - logger.debug("Sending raw response to client") + logger.debug('Sending raw response to client') else: self.upstream = choice self._upstream_proxy_pass = '{0}:{1}'.format( *self.upstream.addr, ) logger.debug( - "Using existing connection to upstream {0}".format( - self.upstream.addr - ) + 'Using existing connection to upstream {0}'.format( + self.upstream.addr, + ), ) break else: diff --git a/tests/core/test_leakage.py b/tests/core/test_leakage.py deleted file mode 100644 index cbec6e4f3c..0000000000 --- a/tests/core/test_leakage.py +++ /dev/null @@ -1,59 +0,0 @@ -# -*- coding: utf-8 -*- -""" - proxy.py - ~~~~~~~~ - ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on - Network monitoring, controls & Application development, testing, debugging. - - :copyright: (c) 2013-present by Abhinav Singh and contributors. - :license: BSD, see LICENSE for more details. -""" -import time - -import unittest - -from proxy.core.connection.leak import Leakage - - -class TestTcpConnectionLeakage(unittest.TestCase): - def test_initial_consume_no_tokens(self): - # Test consuming with no tokens available initially - rate = 100 # bytes per second - bucket = Leakage(rate) - self.assertEqual( - bucket.consume(150), 100 - ) # No tokens yet, so expect 0 bytes to be sent - - def test_consume_with_refill(self): - # Test consuming with refill after waiting - rate = 100 # bytes per second - bucket = Leakage(rate) - time.sleep(1) # Wait for a second to allow refill - self.assertEqual(bucket.consume(50), 50) # 50 bytes should be available - - def test_consume_above_leak_rate(self): - # Test attempting to consume more than the leak rate after a refill - rate = 100 # bytes per second - bucket = Leakage(rate) - time.sleep(1) # Wait for a second to allow refill - self.assertEqual(bucket.consume(150), 100) # Only 100 bytes should be allowed - - def test_repeated_consume_with_partial_refill(self): - # Test repeated consumption with partial refill - rate = 100 # bytes per second - bucket = Leakage(rate) - - time.sleep(1) # Allow tokens to accumulate - bucket.consume(80) # Consume 80 bytes, should leave 20 - time.sleep(0.5) # Wait half a second to refill by 50 bytes - - self.assertEqual(bucket.consume(50), 50) # 50 bytes should be available now - - def test_negative_token_guard(self): - # Ensure tokens do not go negative - rate = 100 # bytes per second - bucket = Leakage(rate) - time.sleep(1) # Allow tokens to accumulate - bucket.consume(150) # Consume all available tokens - self.assertEqual(bucket.consume(10), 0) # Should return 0 as no tokens are left - self.assertEqual(bucket.tokens, 0) # Tokens should not be negative diff --git a/tests/http/parser/test_http_parser.py b/tests/http/parser/test_http_parser.py index c3eecf9d0a..a5254c86d7 100644 --- a/tests/http/parser/test_http_parser.py +++ b/tests/http/parser/test_http_parser.py @@ -918,7 +918,7 @@ def test_byte_by_byte(self) -> None: ] response.parse(memoryview(request[0])) self.assertEqual(response.state, httpParserStates.RCVING_BODY) - self.assertEqual(response.code, b"200") + self.assertEqual(response.code, b'200') for byte in (bytes([b]) for b in request[1]): response.parse(memoryview(byte)) self.assertEqual(response.state, httpParserStates.COMPLETE) From dce6afdfa7513a81714e28e16d13b4051ffb8c76 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Fri, 22 Nov 2024 12:47:34 +0530 Subject: [PATCH 4/4] cleanup --- proxy/core/connection/connection.py | 28 +++++++++++----------------- proxy/http/client.py | 2 +- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/proxy/core/connection/connection.py b/proxy/core/connection/connection.py index 99ffb367b4..9fb53009d0 100644 --- a/proxy/core/connection/connection.py +++ b/proxy/core/connection/connection.py @@ -39,20 +39,16 @@ class TcpConnection(ABC): def __init__( self, tag: int, - flush_bw_in_bps: int = 512, - recv_bw_in_bps: int = 512, + flush_bps: int = 512, + recv_bps: int = 512, ) -> None: self.tag: str = 'server' if tag == tcpConnectionTypes.SERVER else 'client' self.buffer: List[memoryview] = [] self.closed: bool = False self._reusable: bool = False self._num_buffer = 0 - self._flush_leakage = ( - Leakage(rate=flush_bw_in_bps) if flush_bw_in_bps > 0 else None - ) - self._recv_leakage = ( - Leakage(rate=recv_bw_in_bps) if recv_bw_in_bps > 0 else None - ) + self._flush_leakage = Leakage(rate=flush_bps) if flush_bps > 0 else None + self._recv_leakage = Leakage(rate=recv_bps) if recv_bps > 0 else None @property @abstractmethod @@ -75,8 +71,9 @@ def recv( buffer_size = min(buffer_size, allowed_bytes) data: bytes = self.connection.recv(buffer_size) size = len(data) - if self._recv_leakage is not None: - self._recv_leakage.release(buffer_size - size) + unused = buffer_size - size + if self._recv_leakage is not None and unused > 0: + self._recv_leakage.release(unused) if size == 0: return None logger.debug('received %d bytes from %s' % (size, self.tag)) @@ -103,8 +100,6 @@ def flush(self, max_send_size: Optional[int] = None) -> int: if not self.has_buffer(): return 0 mv = self.buffer[0] - print(self.buffer) - print(mv.tobytes()) # TODO: Assemble multiple packets if total # size remains below max send size. max_send_size = max_send_size or DEFAULT_MAX_SEND_SIZE @@ -117,22 +112,21 @@ def flush(self, max_send_size: Optional[int] = None) -> int: if allowed_bytes > 0: try: sent = self.send(mv[:allowed_bytes]) - if self._flush_leakage is not None: - self._flush_leakage.release(allowed_bytes - sent) except BlockingIOError: logger.warning( 'BlockingIOError when trying send to {0}'.format(self.tag), ) del mv return 0 - # if sent == 0: - # return 0 + finally: + unused = allowed_bytes - sent + if self._flush_leakage is not None and unused > 0: + self._flush_leakage.release(unused) if sent == len(mv): self.buffer.pop(0) self._num_buffer -= 1 else: self.buffer[0] = mv[sent:] - # if sent > 0: logger.debug('flushed %d bytes to %s' % (sent, self.tag)) logger.info(mv[:sent].tobytes()) del mv diff --git a/proxy/http/client.py b/proxy/http/client.py index 2df366edfc..28dbb97cce 100644 --- a/proxy/http/client.py +++ b/proxy/http/client.py @@ -33,7 +33,7 @@ def client( scheme: bytes = HTTPS_PROTO, timeout: float = DEFAULT_TIMEOUT, content_type: bytes = b'application/x-www-form-urlencoded', - verify: bool = False, + verify: bool = True, ) -> Optional[HttpParser]: """HTTP Client""" request = build_http_request(