diff --git a/docker/build.sh b/docker/build.sh index d0b1f46bfe..4afe95fe2d 100755 --- a/docker/build.sh +++ b/docker/build.sh @@ -27,6 +27,7 @@ IMAGES=( gcr.io/clusterfuzz-images/oss-fuzz/worker gcr.io/clusterfuzz-images/ci gcr.io/clusterfuzz-images/utask-main-scheduler + gcr.io/clusterfuzz-images/tworker gcr.io/clusterfuzz-images/fuchsia ) diff --git a/docker/tworker/Dockerfile b/docker/tworker/Dockerfile new file mode 100644 index 0000000000..0c9c008cf3 --- /dev/null +++ b/docker/tworker/Dockerfile @@ -0,0 +1,17 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +FROM gcr.io/clusterfuzz-images/base + +# Worker that only reads from queues, preprocesses and postprocesses. +ENV TWORKER=1 diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index bc0a269c7c..378ba94c96 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -83,6 +83,7 @@ POSTPROCESS_QUEUE = 'postprocess' UTASK_MAINS_QUEUE = 'utask_main' +PREPROCESS_QUEUE = 'preprocess' # See https://github.com/google/clusterfuzz/issues/3347 for usage SUBQUEUE_IDENTIFIER = ':' @@ -281,8 +282,7 @@ def is_done_collecting_messages(): def get_postprocess_task(): """Gets a postprocess task if one exists.""" # This should only be run on non-preemptible bots. - if not (task_utils.is_remotely_executing_utasks() or - task_utils.get_opted_in_tasks()): + if not task_utils.is_remotely_executing_utasks(): return None # Postprocess is platform-agnostic, so we run all such tasks on our # most generic and plentiful bots only. In other words, we avoid @@ -304,9 +304,29 @@ def allow_all_tasks(): return not environment.get_value('PREEMPTIBLE') +def get_preprocess_task(): + pubsub_puller = PubSubPuller(PREPROCESS_QUEUE) + messages = pubsub_puller.get_messages(max_messages=1) + if not messages: + return None + task = get_task_from_message(messages[0]) + if task: + logs.info('Pulled from preprocess queue.') + return task + + +def tworker_get_task(): + assert environment.is_tworker() + task = get_postprocess_task() + if task: + return task + + return get_preprocess_task() + + def get_task(): - """Returns an ordinary (non-postprocess, non-utask_main) task that is pulled - from a ClusterFuzz task queue.""" + """Returns an ordinary (non-utask_main) task that is pulled from a ClusterFuzz + task queue.""" task = get_command_override() if task: return task @@ -319,6 +339,7 @@ def get_task(): task = get_postprocess_task() if task: return task + # Check the high-end jobs queue for bots with multiplier greater than 1. thread_multiplier = environment.get_value('THREAD_MULTIPLIER') if thread_multiplier and thread_multiplier > 1: @@ -368,8 +389,7 @@ def __init__(self, eta=None, is_command_override=False, high_end=False, - extra_info=None, - is_from_queue=False): + extra_info=None): self.command = command self.argument = argument self.job = job @@ -378,16 +398,6 @@ def __init__(self, self.high_end = high_end self.extra_info = extra_info - # is_from_queue is a temporary hack to keep track of which fuzz tasks came - # from the queue. Previously all fuzz tasks were picked by the bot when - # there was nothing on the queue. With the rearchitecture, we want fuzz - # tasks that were put on the queue by the schedule_fuzz cron job to be - # executed on batch. is_from_queue is used to do this. - # TODO(b/378684001): This code is very ugly, get rid of it when no more - # fuzz tasks are executed on the bots themselves (i.e. when the rearch - # is complete). - self.is_from_queue = is_from_queue - def __repr__(self): return f'Task: {self.command} {self.argument} {self.job}' @@ -428,13 +438,11 @@ def lease(self): class PubSubTask(Task): """A Pub/Sub task.""" - def __init__(self, pubsub_message, is_from_queue=False): + def __init__(self, pubsub_message): self._pubsub_message = pubsub_message super().__init__( - self.attribute('command'), - self.attribute('argument'), - self.attribute('job'), - is_from_queue=is_from_queue) + self.attribute('command'), self.attribute('argument'), + self.attribute('job')) self.extra_info = { key: value @@ -540,7 +548,7 @@ def initialize_task(message) -> PubSubTask: """Creates a task from |messages|.""" if message.attributes.get('eventType') != 'OBJECT_FINALIZE': - return PubSubTask(message, is_from_queue=True) + return PubSubTask(message) # Handle postprocess task. # The GCS API for pub/sub notifications uses the data field unlike @@ -549,7 +557,7 @@ def initialize_task(message) -> PubSubTask: name = data['name'] bucket = data['bucket'] output_url_argument = storage.get_cloud_storage_file_path(bucket, name) - return PostprocessPubSubTask(output_url_argument, message, is_from_queue=True) + return PostprocessPubSubTask(output_url_argument, message) class PostprocessPubSubTask(PubSubTask): @@ -558,21 +566,14 @@ class PostprocessPubSubTask(PubSubTask): def __init__(self, output_url_argument, pubsub_message, - is_command_override=False, - is_from_queue=False): + is_command_override=False): command = 'postprocess' job_type = 'none' eta = None high_end = False grandparent_class = super(PubSubTask, self) - grandparent_class.__init__( - command, - output_url_argument, - job_type, - eta, - is_command_override, - high_end, - is_from_queue=is_from_queue) + grandparent_class.__init__(command, output_url_argument, job_type, eta, + is_command_override, high_end) self._pubsub_message = pubsub_message diff --git a/src/clusterfuzz/_internal/base/tasks/task_utils.py b/src/clusterfuzz/_internal/base/tasks/task_utils.py index 9248dad52d..bc5b36f4a8 100644 --- a/src/clusterfuzz/_internal/base/tasks/task_utils.py +++ b/src/clusterfuzz/_internal/base/tasks/task_utils.py @@ -15,7 +15,6 @@ any other module in tasks to prevent circular imports and issues with appengine.""" -from clusterfuzz._internal.config import local_config from clusterfuzz._internal.system import environment @@ -26,25 +25,12 @@ def get_command_from_module(full_module_name: str) -> str: return module_name[:-len('_task')] -def is_remotely_executing_utasks(task=None) -> bool: +def is_remotely_executing_utasks() -> bool: """Returns True if the utask_main portions of utasks are being remotely executed on Google cloud batch.""" - if bool(environment.is_production() and - environment.get_value('REMOTE_UTASK_EXECUTION')): - return True - if task is None: - return False - return bool(is_task_opted_into_uworker_execution(task)) - - -def get_opted_in_tasks(): - return local_config.ProjectConfig().get('uworker_tasks', []) - - -def is_task_opted_into_uworker_execution(task: str) -> bool: - # TODO(metzman): Remove this after OSS-Fuzz and Chrome are at parity. - uworker_tasks = get_opted_in_tasks() - return task in uworker_tasks + # TODO(metzman): REMOTE_UTASK_EXECUTION should be a config not an env var. + return (environment.is_production() and + environment.get_value('REMOTE_UTASK_EXECUTION')) class UworkerMsgParseError(RuntimeError): diff --git a/src/clusterfuzz/_internal/bot/tasks/commands.py b/src/clusterfuzz/_internal/bot/tasks/commands.py index 01d69c5c86..8c887dec1e 100644 --- a/src/clusterfuzz/_internal/bot/tasks/commands.py +++ b/src/clusterfuzz/_internal/bot/tasks/commands.py @@ -190,11 +190,25 @@ def start_web_server_if_needed(): logs.error('Failed to start web server, skipping.') +def get_command_object(task_name): + """Returns the command object that execute can be called on.""" + task = COMMAND_MAP.get(task_name) + if not environment.is_tworker(): + return task + + if isinstance(task, task_types.TrustedTask): + # We don't need to execute this remotely. + return task + + # Force remote execution. + return task_types.UTask(task_name) + + def run_command(task_name, task_argument, job_name, uworker_env): """Runs the command.""" - task = COMMAND_MAP.get(task_name) + task = get_command_object(task_name) if not task: - logs.error("Unknown command '%s'" % task_name) + logs.error(f'Unknown command "{task_name}"') return None # If applicable, ensure this is the only instance of the task running. @@ -253,10 +267,8 @@ def process_command(task): logs.error('Empty task received.') return None - # TODO(b/378684001): Remove is_from_queue kludge. return process_command_impl(task.command, task.argument, task.job, - task.high_end, task.is_command_override, - task.is_from_queue) + task.high_end, task.is_command_override) def _get_task_id(task_name, task_argument, job_name): @@ -267,13 +279,12 @@ def _get_task_id(task_name, task_argument, job_name): # TODO(mbarbella): Rewrite this function to avoid nesting issues. @set_task_payload def process_command_impl(task_name, task_argument, job_name, high_end, - is_command_override, is_from_queue): + is_command_override): """Implementation of process_command.""" uworker_env = None environment.set_value('TASK_NAME', task_name) environment.set_value('TASK_ARGUMENT', task_argument) environment.set_value('JOB_NAME', job_name) - environment.set_value('IS_FROM_QUEUE', is_from_queue) if task_name in {'uworker_main', 'postprocess'}: # We want the id of the task we are processing, not "uworker_main", or # "postprocess". @@ -455,5 +466,3 @@ def process_command_impl(task_name, task_argument, job_name, high_end, cleanup_task_state() if 'CF_TASK_ID' in os.environ: del os.environ['CF_TASK_ID'] - if 'IS_FROM_QUEUE' in os.environ: - del os.environ['IS_FROM_QUEUE'] diff --git a/src/clusterfuzz/_internal/bot/tasks/task_types.py b/src/clusterfuzz/_internal/bot/tasks/task_types.py index 648baf9c23..a338ef0ac3 100644 --- a/src/clusterfuzz/_internal/bot/tasks/task_types.py +++ b/src/clusterfuzz/_internal/bot/tasks/task_types.py @@ -46,6 +46,7 @@ class TrustedTask(BaseTask): def execute(self, task_argument, job_type, uworker_env): # Simple tasks can just use the environment they don't need the uworker env. del uworker_env + assert not environment.is_tworker() self.module.execute_task(task_argument, job_type) @@ -58,6 +59,8 @@ def execute(self, task_argument, job_type, uworker_env): raise NotImplementedError('Child class must implement.') def execute_locally(self, task_argument, job_type, uworker_env): + """Executes the utask locally (on this machine, not on batch).""" + assert not environment.is_tworker() uworker_input = utasks.tworker_preprocess_no_io(self.module, task_argument, job_type, uworker_env) if uworker_input is None: @@ -119,7 +122,7 @@ class UTask(BaseUTask): @staticmethod def is_execution_remote(command=None): - return task_utils.is_remotely_executing_utasks(command) + return task_utils.is_remotely_executing_utasks() def execute(self, task_argument, job_type, uworker_env): """Executes a utask.""" @@ -156,19 +159,6 @@ def preprocess(self, task_argument, job_type, uworker_env): return download_url -# TODO(b/378684001): Remove this, it's needed for testing but is otherwise a bad -# design. -class UTaskMostlyLocalExecutor(UTask): - - @staticmethod - def is_execution_remote(command=None): - del command - if environment.get_value('IS_FROM_QUEUE'): - logs.info('IS FROM QUEUE') - return True - return False - - class PostprocessTask(BaseTask): """Represents postprocessing of an untrusted task.""" @@ -211,7 +201,7 @@ def execute(self, task_argument, job_type, uworker_env): 'analyze': UTask, 'blame': TrustedTask, 'corpus_pruning': UTask, - 'fuzz': UTaskMostlyLocalExecutor, + 'fuzz': UTaskLocalExecutor, 'impact': TrustedTask, 'minimize': UTask, 'progression': UTask, diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index d5116e197e..6d8269357a 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -20,7 +20,6 @@ from googleapiclient import discovery -from clusterfuzz._internal.base import concurrency from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils from clusterfuzz._internal.config import local_config @@ -167,14 +166,12 @@ def get_fuzz_tasks(self) -> Dict[str, tasks.Task]: choices = random.choices( fuzz_task_candidates, weights=weights, k=num_instances) - queues_to_tasks = collections.defaultdict(list) - for fuzz_task_candidate in choices: - queue_tasks = queues_to_tasks[fuzz_task_candidate.queue] - - task = tasks.Task('fuzz', fuzz_task_candidate.fuzzer, - fuzz_task_candidate.job) - queue_tasks.append(task) - return queues_to_tasks + fuzz_tasks = [ + tasks.Task('fuzz', fuzz_task_candidate.fuzzer, fuzz_task_candidate.job) + for fuzz_task_candidate in choices + ] + # TODO(metzman): Remove the queue stuff if it's uneeded for Chrome. + return fuzz_tasks def get_fuzz_tasks(available_cpus: int) -> [tasks.Task]: @@ -210,10 +207,8 @@ def schedule_fuzz_tasks() -> bool: logs.error('No fuzz tasks found to schedule.') return False - # TODO(b/378684001): Change this to using one queue when oss-fuzz's untrusted - # worker model is deleted. - with concurrency.make_pool() as pool: - list(pool.map(bulk_add, fuzz_tasks.items())) + logs.info(f'Adding {fuzz_tasks} to preprocess queue.') + tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True) logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.') end = time.time() @@ -222,11 +217,5 @@ def schedule_fuzz_tasks() -> bool: return True -def bulk_add(queue_and_tasks): - queue, task_list = queue_and_tasks - logs.info(f'Adding {task_list} to {queue}.') - tasks.bulk_add_tasks(task_list, queue=queue, eta_now=True) - - def main(): return schedule_fuzz_tasks() diff --git a/src/clusterfuzz/_internal/system/environment.py b/src/clusterfuzz/_internal/system/environment.py index e889749f20..abaae14a2e 100644 --- a/src/clusterfuzz/_internal/system/environment.py +++ b/src/clusterfuzz/_internal/system/environment.py @@ -1177,3 +1177,7 @@ def can_testcase_run_on_platform(testcase_platform_id, current_platform_id): current_platform_id) return False + + +def is_tworker(): + return get_value('TWORKER', False) diff --git a/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py b/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py index 6769fb75b6..2a99dc9dfb 100644 --- a/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py @@ -52,13 +52,11 @@ def setUp(self): 'clusterfuzz._internal.base.persistent_cache.get_value', 'clusterfuzz._internal.base.persistent_cache.set_value', 'clusterfuzz._internal.base.utils.utcnow', - 'clusterfuzz._internal.base.tasks.task_utils.get_opted_in_tasks', 'time.sleep', ]) self.mock.get_value.return_value = None self.mock.sleep.return_value = None - self.mock.get_opted_in_tasks.return_value = False data_types.Job(name='job').put() client = pubsub.PubSubClient() diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index ae74ec4ddd..a849875c0b 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -69,16 +69,12 @@ def test_get_fuzz_tasks(self): num_cpus = 10 scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_cpus) - results = scheduler.get_fuzz_tasks() + tasks = scheduler.get_fuzz_tasks() comparable_results = [] - for tasks in results.values(): - comparable_tasks = [] - for task in tasks: - comparable_tasks.append((task.command, task.argument, task.job)) - comparable_results.append(comparable_tasks) - - expected_results = [[('fuzz', 'libFuzzer', 'myjob')] * 5] - self.assertListEqual(list(results.keys()), ['jobs-linux']) + for task in tasks: + comparable_results.append((task.command, task.argument, task.job)) + + expected_results = [('fuzz', 'libFuzzer', 'myjob')] * 5 self.assertListEqual(comparable_results, expected_results) diff --git a/src/clusterfuzz/_internal/tests/core/base/tasks/task_utils_test.py b/src/clusterfuzz/_internal/tests/core/base/tasks/task_utils_test.py index 8c914203b9..31a37c8efd 100644 --- a/src/clusterfuzz/_internal/tests/core/base/tasks/task_utils_test.py +++ b/src/clusterfuzz/_internal/tests/core/base/tasks/task_utils_test.py @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. """Tests for task_utils.""" -import os import unittest from clusterfuzz._internal.base.tasks import task_utils from clusterfuzz._internal.bot.tasks import commands -from clusterfuzz._internal.tests.test_libs import helpers class GetCommandFromModuleTest(unittest.TestCase): @@ -35,19 +33,3 @@ def test_get_command_from_module(self): task_utils.get_command_from_module('postprocess') with self.assertRaises(ValueError): task_utils.get_command_from_module('uworker_main') - - -class IsTaskOptedIntoUworkerExecution(unittest.TestCase): - """Tests that is_task_opted_into_uworker_execution only returns True for the - tasks we are testing in oss-fuzz.""" - - def setUp(self): - helpers.patch_environ(self) - - def test_opt_in(self): - os.environ['JOB_NAME'] = 'libfuzzer_asan_skia' - self.assertTrue(task_utils.is_task_opted_into_uworker_execution('analyze')) - - def test_no_opt_in(self): - os.environ['JOB_NAME'] = 'libfuzzer_asan_skia' - self.assertFalse(task_utils.is_task_opted_into_uworker_execution('fuzz')) diff --git a/src/python/bot/startup/run_bot.py b/src/python/bot/startup/run_bot.py index 886ee44120..8c674fba2c 100644 --- a/src/python/bot/startup/run_bot.py +++ b/src/python/bot/startup/run_bot.py @@ -29,7 +29,7 @@ from clusterfuzz._internal.base import dates from clusterfuzz._internal.base import errors -from clusterfuzz._internal.base import tasks as taskslib +from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import untrusted from clusterfuzz._internal.base import utils from clusterfuzz._internal.base.tasks import task_utils @@ -89,7 +89,7 @@ def schedule_utask_mains(): from clusterfuzz._internal.google_cloud_utils import batch logs.info('Attempting to combine batch tasks.') - utask_mains = taskslib.get_utask_mains() + utask_mains = tasks.get_utask_mains() if not utask_mains: logs.info('No utask mains.') return @@ -135,7 +135,11 @@ def task_loop(): schedule_utask_mains() continue - task = taskslib.get_task() + if environment.is_tworker(): + task = tasks.tworker_get_task() + else: + task = tasks.get_task() + if not task: continue