Skip to content
This repository has been archived by the owner on Jun 5, 2023. It is now read-only.

Commit

Permalink
Merge pull request #122 from lcwill/worker-pids-fix
Browse files Browse the repository at this point in the history
worker_pids() does not return correct set of pyres worker pids
  • Loading branch information
binarymatt committed Jan 22, 2013
2 parents 104df5b + e119d7e commit d936b45
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pyres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def reserve(self, queues):
return Job.reserve(queues, self)

def __str__(self):
return "PyRes Client connected to %s" % self.redis.server
return "PyRes Client connected to %s" % self.dsn

def workers(self):
from pyres.worker import Worker
Expand Down
14 changes: 9 additions & 5 deletions pyres/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def unregister_worker(self):

def prune_dead_workers(self):
all_workers = Worker.all(self.resq)
known_workers = self.worker_pids()
known_workers = Worker.worker_pids()
for worker in all_workers:
host, pid, queues = worker.id.split(':')
if host != self.hostname:
Expand Down Expand Up @@ -329,12 +329,16 @@ def state(self):
return 'working'
return 'idle'

def worker_pids(self):
@classmethod
def worker_pids(cls):
"""Returns an array of all pids (as strings) of the workers on
this machine. Used when pruning dead workers."""
return map(lambda l: l.strip().split(' ')[0],
commands.getoutput("ps -A -o pid,command | \
grep pyres_worker").split("\n"))
cmd = "ps -A -o pid,command | grep pyres_worker | grep -v grep"
output = commands.getoutput(cmd)
if output:
return map(lambda l: l.strip().split(' ')[0], output.split("\n"))
else:
return []

@classmethod
def run(cls, queues, server="localhost:6379", interval=None, timeout=None):
Expand Down
31 changes: 31 additions & 0 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,5 +296,36 @@ def test_retries_give_up_eventually(self):
assert None == worker.process()
assert worker.get_failed() == 1

def test_worker_pids(self):
# spawn worker processes and get pids
pids = []
pids.append(self.spawn_worker(['basic']))
pids.append(self.spawn_worker(['basic']))
time.sleep(1)
worker_pids = Worker.worker_pids()

# send kill signal to workers and wait for them to exit
import signal
for pid in pids:
os.kill(pid, signal.SIGQUIT)
os.waitpid(pid, 0)

# ensure worker_pids() returned the correct pids
for pid in pids:
assert str(pid) in worker_pids

# ensure the workers are no longer returned by worker_pids()
worker_pids = Worker.worker_pids()
for pid in pids:
assert str(pid) not in worker_pids

def spawn_worker(self, queues):
pid = os.fork()
if not pid:
Worker.run(queues, interval=1)
os._exit(0)
else:
return pid

def set_current_time(self, time):
ResQ._current_time = staticmethod(lambda: time)

0 comments on commit d936b45

Please sign in to comment.