Skip to content

Commit 958b168

Browse files
committed
add workers arg to CLI
fix test add workers arg to CLI fix test try again add sleep
1 parent 7a911f3 commit 958b168

File tree

2 files changed

+26
-3
lines changed

2 files changed

+26
-3
lines changed

arq/cli.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging.config
33
import os
44
import sys
5+
from multiprocessing import Process
56
from signal import Signals
67
from typing import TYPE_CHECKING, cast
78

@@ -20,6 +21,7 @@
2021
watch_help = 'Watch a directory and reload the worker upon changes.'
2122
verbose_help = 'Enable verbose output.'
2223
logdict_help = "Import path for a dictionary in logdict form, to configure Arq's own logging."
24+
workers_help = 'Number of worker processes to spawn'
2325

2426

2527
@click.command('arq')
@@ -28,9 +30,12 @@
2830
@click.option('--burst/--no-burst', default=None, help=burst_help)
2931
@click.option('--check', is_flag=True, help=health_check_help)
3032
@click.option('--watch', type=click.Path(exists=True, dir_okay=True, file_okay=False), help=watch_help)
33+
@click.option('-w', '--workers', type=int, default=1, help=workers_help)
3134
@click.option('-v', '--verbose', is_flag=True, help=verbose_help)
3235
@click.option('--custom-log-dict', type=str, help=logdict_help)
33-
def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose: bool, custom_log_dict: str) -> None:
36+
def cli(
37+
*, worker_settings: str, burst: bool, check: bool, watch: str, workers: int, verbose: bool, custom_log_dict: str
38+
) -> None:
3439
"""
3540
Job queues in python with asyncio and redis.
3641
@@ -49,8 +54,15 @@ def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose:
4954
else:
5055
kwargs = {} if burst is None else {'burst': burst}
5156
if watch:
52-
asyncio.run(watch_reload(watch, worker_settings_))
57+
coroutine = watch_reload(watch, worker_settings_)
58+
if workers > 1:
59+
for _ in range(workers - 1):
60+
Process(target=asyncio.run, args=(coroutine,)).start()
61+
asyncio.run(coroutine)
5362
else:
63+
if workers > 1:
64+
for _ in range(workers - 1):
65+
Process(target=run_worker, args=(worker_settings_,), kwargs=kwargs).start()
5466
run_worker(worker_settings_, **kwargs)
5567

5668

tests/test_cli.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import time
2+
13
import pytest
24
from click.testing import CliRunner
35

@@ -51,7 +53,16 @@ def test_run_watch(mocker, cancel_remaining_task):
5153
runner = CliRunner()
5254
result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--watch', 'tests'])
5355
assert result.exit_code == 0
54-
assert '1 files changes, reloading arq worker...'
56+
assert 'files changed, reloading arq worker...' in result.output
57+
58+
59+
@pytest.mark.timeout(10) # may take a while to get to the point we can test
60+
def test_multiple_workers():
61+
runner = CliRunner()
62+
result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--workers', '4'])
63+
while 'clients_connected=4' not in result.output:
64+
time.sleep(1)
65+
assert True
5566

5667

5768
custom_log_dict = {

0 commit comments

Comments
 (0)