Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repeaters, v.2: Sleep 0.2s if all locked #35686

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion corehq/motech/repeaters/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
CHECK_REPEATERS_INTERVAL = timedelta(minutes=5)
CHECK_REPEATERS_PARTITION_COUNT = settings.CHECK_REPEATERS_PARTITION_COUNT
CHECK_REPEATERS_KEY = 'check-repeaters-key'
PROCESS_REPEATERS_INTERVAL = timedelta(minutes=5)
PROCESS_REPEATERS_KEY = 'process-repeaters-key'
PROCESS_REPEATERS_INTERVAL = timedelta(minutes=1)
ENDPOINT_TIMER = 'endpoint_timer'
# Number of attempts to send to an online endpoint before cancelling the payload
MAX_ATTEMPTS = 3
Expand Down
82 changes: 55 additions & 27 deletions corehq/motech/repeaters/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@

"""
import random
import time
import uuid
from datetime import datetime, timedelta
from inspect import cleandoc
Expand Down Expand Up @@ -110,7 +111,7 @@
MAX_RETRY_WAIT,
PROCESS_REPEATERS_INTERVAL,
RATE_LIMITER_DELAY_RANGE,
State,
State, PROCESS_REPEATERS_KEY,
)
from .models import (
Repeater,
Expand Down Expand Up @@ -305,19 +306,60 @@ def process_repeaters():
Processes repeaters, instead of processing repeat records
independently the way that ``check_repeaters()`` does.
"""
process_repeaters_lock = get_redis_lock(
PROCESS_REPEATERS_KEY,
timeout=None, # Iterating repeaters forever is fine
name=PROCESS_REPEATERS_KEY,
)
if not process_repeaters_lock.acquire(blocking=False):
return

metrics_counter('commcare.repeaters.process_repeaters.start')
for domain, repeater_id in iter_ready_repeater_ids():
try:
for repeater_id, lock_token in iter_repeater_id_tokens():
process_repeater(repeater_id, lock_token)
finally:
process_repeaters_lock.release()
metrics_counter('commcare.repeaters.process_repeaters.complete')


def iter_repeater_id_tokens():
"""
    Iterate (repeater_id, lock_token) pairs until all ready repeat
    records have been processed.
"""
while True:
metrics_counter('commcare.repeaters.process_repeaters.iter_once')
acquired_list = []
for repeater_id in iter_filtered_repeater_ids():
lock = RepeaterLock(repeater_id)
if acquired := lock.acquire():
yield repeater_id, lock.token
acquired_list.append(acquired)
if not acquired_list:
# No repeaters are ready, enabled, and not rate-limited
return
if not any(acquired_list):
# All repeaters are still processing. Sleep to allow at
# least one repeater to finish.
kaapstorm marked this conversation as resolved.
Show resolved Hide resolved
metrics_counter('commcare.repeaters.process_repeaters.all_locked')
time.sleep(0.2)


def iter_filtered_repeater_ids():
"""
    Filter ready repeater IDs, skipping repeaters whose domain does not
    have data forwarding enabled and repeaters that are rate-limited.
"""
for domain, repeater_id in iter_ready_domain_repeater_ids():
if not domain_can_forward_now(domain):
continue
if rate_limit_repeater(domain, repeater_id):
continue
lock = RepeaterLock(repeater_id)
if lock.acquire():
repeater = Repeater.objects.get(domain=domain, id=repeater_id)
process_repeater(repeater, lock.token)
yield repeater_id


def iter_ready_repeater_ids():
def iter_ready_domain_repeater_ids():
"""
Yields domain-repeater_id tuples in a round-robin fashion.

Expand Down Expand Up @@ -358,7 +400,7 @@ def get_repeater_ids_by_domain():
}


def process_repeater(repeater, lock_token):
def process_repeater(repeater_id, lock_token):
"""
Initiates a Celery chord to process a repeater.
"""
Expand All @@ -370,14 +412,10 @@ def get_task_signature(repeat_record):
}[repeat_record.state]
return task_.s(repeat_record.id, repeat_record.domain)

# Fetch an extra row to determine whether there are more repeat
# records to send after this batch
repeat_records = repeater.repeat_records_ready[:repeater.num_workers + 1]
more = len(repeat_records) > repeater.num_workers

repeat_records = repeat_records[:repeater.num_workers]
repeater = Repeater.objects.get(id=repeater_id)
repeat_records = repeater.repeat_records_ready[:repeater.num_workers]
header_tasks = [get_task_signature(rr) for rr in repeat_records]
callback = update_repeater.s(repeater.repeater_id, lock_token, more)
callback = update_repeater.s(repeater_id, lock_token)
chord(header_tasks, callback)()


Expand Down Expand Up @@ -467,15 +505,10 @@ def _get_wait_duration_seconds(repeat_record):


@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE)
def update_repeater(repeat_record_states, repeater_id, lock_token, more):
def update_repeater(repeat_record_states, repeater_id, lock_token):
"""
Determines whether the repeater should back off, based on the
results of ``process_ready_repeat_record()``.

If ``more`` is ``True``, there are more repeat records ready to be
sent. Initiates another Celery chord to continue processing the
repeater. If ``more`` is ``False``, releases the lock for the
repeater.
"""
repeater = Repeater.objects.get(id=repeater_id)
try:
Expand All @@ -492,15 +525,10 @@ def update_repeater(repeat_record_states, repeater_id, lock_token, more):
'commcare.repeaters.process_repeaters.repeater_backoff',
tags={'domain': repeater.domain},
)
more = False
repeater.set_backoff()
finally:
lock = RepeaterLock(repeater_id, lock_token)
if more:
lock.reacquire()
process_repeater(repeater, lock_token)
else:
lock.release()
lock.release()


class RepeaterLock:
Expand Down
Loading
Loading