Skip to content

Commit

Permalink
Don't exceed the maximum number of Pub/Sub messages allowed per publish request.
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanmetzman committed Nov 22, 2024
1 parent 9dfcae6 commit 82846a6
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 22 deletions.
8 changes: 6 additions & 2 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import contextlib
import datetime
import itertools
import json
import random
import threading
Expand Down Expand Up @@ -48,6 +49,8 @@
MAX_LEASED_TASKS_LIMIT = 1000
MAX_TASKS_LIMIT = 100000

MAX_PUBSUB_MESSAGES_PER_REQ = 1000

# Various variables for task leasing and completion times (in seconds).
TASK_COMPLETION_BUFFER = 90 * 60
TASK_CREATION_WAIT_INTERVAL = 2 * 60
Expand Down Expand Up @@ -653,8 +656,9 @@ def bulk_add_tasks(tasks, queue=None, eta_now=False):

pubsub_client = pubsub.PubSubClient()
pubsub_messages = [task.to_pubsub_message() for task in tasks]
pubsub_client.publish(
pubsub.topic_name(utils.get_application_id(), queue), pubsub_messages)
topic_name = pubsub.topic_name(utils.get_application_id(), queue)
for batch in utils.batched(pubsub_messages, MAX_PUBSUB_MESSAGES_PER_REQ):
pubsub_client.publish(topic_name, batch)


def add_task(command,
Expand Down
19 changes: 19 additions & 0 deletions src/clusterfuzz/_internal/base/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1020,3 +1020,22 @@ def cpu_count():

return environment.get_value('CPU_COUNT_OVERRIDE',
multiprocessing.cpu_count())


def batched(iterator, batch_size):
  """Yields successive lists of up to `batch_size` items from `iterator`.

  Backport of itertools.batched, which was added in Python 3.12.

  Args:
    iterator: Any iterable whose items should be grouped into batches.
    batch_size: Maximum number of items per yielded batch. Must be >= 1.

  Yields:
    Lists of items, each of length `batch_size` except possibly the last,
    which may be shorter. Nothing is yielded for an empty input.

  Raises:
    ValueError: If `batch_size` is less than 1.
  """
  # TODO(metzman): Replace this with itertools.batched (Python 3.12+).
  if batch_size < 1:
    # The old `assert batch_size > -1` allowed 0, which would silently
    # accumulate every item into one giant final batch — defeating the
    # purpose of batching. Fail loudly instead, matching itertools.batched.
    raise ValueError('batch_size must be at least one')
  batch = []
  for item in iterator:
    batch.append(item)
    if len(batch) == batch_size:
      yield batch
      batch = []

  # Flush the final partial batch, if any.
  if batch:
    yield batch
3 changes: 2 additions & 1 deletion src/clusterfuzz/_internal/fuzzing/corpus_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,8 @@ def get_proto_corpus(bucket_name,
# again.
if max_download_urls is not None:
urls = itertools.islice(urls, max_download_urls)
corpus_urls = dict(storage.sign_urls_for_existing_files(urls, include_delete_urls))
corpus_urls = dict(
storage.sign_urls_for_existing_files(urls, include_delete_urls))

upload_urls = storage.get_arbitrary_signed_upload_urls(
gcs_url, num_uploads=max_upload_urls)
Expand Down
21 changes: 2 additions & 19 deletions src/clusterfuzz/_internal/google_cloud_utils/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,6 @@ def create_uworker_main_batch_job(module, job_type, input_download_url):
return result[0]


def _bunched(iterator, bunch_size):
"""Implementation of itertools.py's batched that was added after Python3.7."""
# TODO(metzman): Replace this with itertools.batched.
assert bunch_size > -1
idx = 0
bunch = []
for item in iterator:
idx += 1
bunch.append(item)
if idx == bunch_size:
idx = 0
yield bunch
bunch = []

if bunch:
yield bunch


def create_uworker_main_batch_jobs(batch_tasks: List[BatchTask]):
"""Creates batch jobs."""
job_specs = collections.defaultdict(list)
Expand All @@ -135,7 +117,8 @@ def create_uworker_main_batch_jobs(batch_tasks: List[BatchTask]):

logs.info('Batching utask_mains.')
for spec, input_urls in job_specs.items():
for input_urls_portion in _bunched(input_urls, MAX_CONCURRENT_VMS_PER_JOB):
for input_urls_portion in utils.batched(input_urls,
MAX_CONCURRENT_VMS_PER_JOB):
jobs.append(_create_job(spec, input_urls_portion))

return jobs
Expand Down

0 comments on commit 82846a6

Please sign in to comment.