Skip to content

Commit

Permalink
Add --listener-pool-klass, --acceptor-pool-klass, --threadless-pool-k…
Browse files Browse the repository at this point in the history
…lass
  • Loading branch information
alexey-pelykh committed Mar 21, 2023
1 parent f3d19ff commit fa80db5
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/changelog-fragments.d/1324.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `--listener-pool-klass`, `--acceptor-pool-klass`, `--threadless-pool-klass`
3 changes: 3 additions & 0 deletions proxy/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ def _env_threadless_compliant() -> bool:
'{response_bytes} bytes - {connection_time_ms}ms'
DEFAULT_REVERSE_PROXY_ACCESS_LOG_FORMAT = '{client_ip}:{client_port} - ' + \
'{request_method} {request_path} -> {upstream_proxy_pass} - {connection_time_ms}ms'
DEFAULT_LISTENER_POOL_KLASS = 'proxy.core.listener.pool.ListenerPool'
DEFAULT_ACCEPTOR_POOL_KLASS = 'proxy.core.acceptor.pool.AcceptorPool'
DEFAULT_NUM_ACCEPTORS = 0
DEFAULT_NUM_WORKERS = 0
DEFAULT_OPEN_FILE_LIMIT = 1024
Expand All @@ -127,6 +129,7 @@ def _env_threadless_compliant() -> bool:
DEFAULT_STATIC_SERVER_DIR = os.path.join(PROXY_PY_DIR, "public")
DEFAULT_MIN_COMPRESSION_LENGTH = 20 # In bytes
DEFAULT_THREADLESS = _env_threadless_compliant()
DEFAULT_THREADLESS_POOL_KLASS = 'proxy.core.work.pool.ThreadlessPool'
DEFAULT_LOCAL_EXECUTOR = True
DEFAULT_TIMEOUT = 10.0
DEFAULT_VERSION = False
Expand Down
21 changes: 21 additions & 0 deletions proxy/common/flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ def initialize(
if isinstance(work_klass, str) \
else work_klass

# Load acceptor_pool_klass
acceptor_pool_klass = opts.get('acceptor_pool_klass', args.acceptor_pool_klass)
acceptor_pool_klass = Plugins.importer(bytes_(acceptor_pool_klass))[0] \
if isinstance(acceptor_pool_klass, str) \
else acceptor_pool_klass

# Load listener_pool_klass
listener_pool_klass = opts.get('listener_pool_klass', args.listener_pool_klass)
listener_pool_klass = Plugins.importer(bytes_(listener_pool_klass))[0] \
if isinstance(listener_pool_klass, str) \
else listener_pool_klass

# Load threadless_pool_klass
threadless_pool_klass = opts.get('threadless_pool_klass', args.threadless_pool_klass)
threadless_pool_klass = Plugins.importer(bytes_(threadless_pool_klass))[0] \
if isinstance(threadless_pool_klass, str) \
else threadless_pool_klass

# TODO: Plugin flag initialization logic must be moved within plugins.
#
# Generate auth_code required for basic authentication if enabled
Expand Down Expand Up @@ -201,6 +219,8 @@ def initialize(
# def option(t: object, key: str, default: Any) -> Any:
# return cast(t, opts.get(key, default))
args.work_klass = work_klass
args.acceptor_pool_klass = acceptor_pool_klass
args.listener_pool_klass = listener_pool_klass
args.plugins = plugins
args.auth_code = cast(
Optional[bytes],
Expand Down Expand Up @@ -376,6 +396,7 @@ def initialize(
# evaluates to False.
args.threadless = cast(bool, opts.get('threadless', args.threadless))
args.threadless = is_threadless(args.threadless, args.threaded)
args.threadless_pool_klass = threadless_pool_klass

args.pid_file = cast(
Optional[str], opts.get(
Expand Down
12 changes: 11 additions & 1 deletion proxy/core/acceptor/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
from .acceptor import Acceptor
from ..listener import ListenerPool
from ...common.flag import flags
from ...common.constants import DEFAULT_NUM_ACCEPTORS
from ...common.constants import (
DEFAULT_NUM_ACCEPTORS, DEFAULT_ACCEPTOR_POOL_KLASS,
)


if TYPE_CHECKING: # pragma: no cover
Expand All @@ -33,6 +35,14 @@
logger = logging.getLogger(__name__)


flags.add_argument(
'--acceptor-pool-klass',
type=str,
default=DEFAULT_ACCEPTOR_POOL_KLASS,
help='Default: ' + DEFAULT_ACCEPTOR_POOL_KLASS +
'. Acceptor pool klass.',
)

flags.add_argument(
'--num-acceptors',
type=int,
Expand Down
11 changes: 11 additions & 0 deletions proxy/core/listener/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,23 @@

from .tcp import TcpSocketListener
from .unix import UnixSocketListener
from ...common.flag import flags
from ...common.constants import DEFAULT_LISTENER_POOL_KLASS


if TYPE_CHECKING: # pragma: no cover
from .base import BaseListener


flags.add_argument(
'--listener-pool-klass',
type=str,
default=DEFAULT_LISTENER_POOL_KLASS,
help='Default: ' + DEFAULT_LISTENER_POOL_KLASS +
'. Listener pool klass.',
)


class ListenerPool:
"""Provides abstraction around starting multiple listeners
based upon flags."""
Expand Down
12 changes: 11 additions & 1 deletion proxy/core/work/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
from multiprocessing import connection

from ...common.flag import flags
from ...common.constants import DEFAULT_THREADLESS, DEFAULT_NUM_WORKERS
from ...common.constants import (
DEFAULT_THREADLESS, DEFAULT_NUM_WORKERS, DEFAULT_THREADLESS_POOL_KLASS,
)


if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -54,6 +56,14 @@
help='Defaults to number of CPU cores.',
)

flags.add_argument(
'--threadless-pool-klass',
type=str,
default=DEFAULT_THREADLESS_POOL_KLASS,
help='Default: ' + DEFAULT_THREADLESS_POOL_KLASS +
'. Threadless pool klass.',
)


class ThreadlessPool:
"""Manages lifecycle of threadless pool and delegates work to them
Expand Down
33 changes: 21 additions & 12 deletions proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@ def setup(self) -> None:
self._write_pid_file()
# We setup listeners first because of flags.port override
# in case of ephemeral port being used
self.listeners = ListenerPool(flags=self.flags)
self.listeners = cast(
'ListenerPool',
self.flags.listener_pool_klass(flags=self.flags),
)
self.listeners.setup()
# Override flags.port to match the actual port
# we are listening upon. This is necessary to preserve
Expand Down Expand Up @@ -234,20 +237,26 @@ def setup(self) -> None:
# Setup remote executors only if
# --local-executor mode isn't enabled.
if self.remote_executors_enabled:
self.executors = ThreadlessPool(
flags=self.flags,
event_queue=event_queue,
executor_klass=RemoteFdExecutor,
self.executors = cast(
'ThreadlessPool',
self.flags.threadless_pool_klass(
flags=self.flags,
event_queue=event_queue,
executor_klass=RemoteFdExecutor,
),
)
self.executors.setup()
# Setup acceptors
self.acceptors = AcceptorPool(
flags=self.flags,
listeners=self.listeners,
executor_queues=self.executors.work_queues if self.executors else [],
executor_pids=self.executors.work_pids if self.executors else [],
executor_locks=self.executors.work_locks if self.executors else [],
event_queue=event_queue,
self.acceptors = cast(
'AcceptorPool',
self.flags.acceptor_pool_klass(
flags=self.flags,
listeners=self.listeners,
executor_queues=self.executors.work_queues if self.executors else [],
executor_pids=self.executors.work_pids if self.executors else [],
executor_locks=self.executors.work_locks if self.executors else [],
event_queue=event_queue,
),
)
self.acceptors.setup()
# Start SSH tunnel acceptor if enabled
Expand Down

0 comments on commit fa80db5

Please sign in to comment.