Add generic tworker. #4418

Merged · 6 commits · Nov 21, 2024
1 change: 1 addition & 0 deletions docker/build.sh
@@ -27,6 +27,7 @@ IMAGES=(
gcr.io/clusterfuzz-images/oss-fuzz/worker
gcr.io/clusterfuzz-images/ci
gcr.io/clusterfuzz-images/utask-main-scheduler
gcr.io/clusterfuzz-images/tworker
gcr.io/clusterfuzz-images/fuchsia
)

17 changes: 17 additions & 0 deletions docker/tworker/Dockerfile
@@ -0,0 +1,17 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM gcr.io/clusterfuzz-images/base

# Worker that only reads from queues, preprocesses and postprocesses.
ENV TWORKER=1
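
Loosely, a bot built from this image would behave like the sketch below, inferred from the tworker_get_task() and get_command_object() changes further down in this PR; the polling loop and the tworker_loop entry point are illustrative assumptions, not code from this change.

# Illustrative sketch only, not ClusterFuzz code: what a TWORKER=1 bot is
# expected to do, based on tworker_get_task() and get_command_object() below.
from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.bot.tasks import commands
from clusterfuzz._internal.system import environment


def tworker_loop():
  assert environment.is_tworker()  # Set by ENV TWORKER=1 in this image.
  while True:
    # Pull only postprocess and preprocess work; utask_main runs remotely.
    task = tasks.tworker_get_task()
    if task is None:
      continue
    commands.process_command(task)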
67 changes: 34 additions & 33 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
@@ -83,6 +83,7 @@

POSTPROCESS_QUEUE = 'postprocess'
UTASK_MAINS_QUEUE = 'utask_main'
PREPROCESS_QUEUE = 'preprocess'

# See https://github.com/google/clusterfuzz/issues/3347 for usage
SUBQUEUE_IDENTIFIER = ':'
@@ -281,8 +282,7 @@ def is_done_collecting_messages():
def get_postprocess_task():
"""Gets a postprocess task if one exists."""
# This should only be run on non-preemptible bots.
if not (task_utils.is_remotely_executing_utasks() or
task_utils.get_opted_in_tasks()):
if not task_utils.is_remotely_executing_utasks():
return None
# Postprocess is platform-agnostic, so we run all such tasks on our
# most generic and plentiful bots only. In other words, we avoid
@@ -304,9 +304,29 @@ def allow_all_tasks():
return not environment.get_value('PREEMPTIBLE')


def get_preprocess_task():
pubsub_puller = PubSubPuller(PREPROCESS_QUEUE)
messages = pubsub_puller.get_messages(max_messages=1)
if not messages:
return None
task = get_task_from_message(messages[0])
if task:
logs.info('Pulled from preprocess queue.')
return task


def tworker_get_task():
assert environment.is_tworker()
task = get_postprocess_task()
if task:
return task

return get_preprocess_task()


def get_task():
"""Returns an ordinary (non-postprocess, non-utask_main) task that is pulled
from a ClusterFuzz task queue."""
"""Returns an ordinary (non-utask_main) task that is pulled from a ClusterFuzz
task queue."""
task = get_command_override()
if task:
return task
Expand All @@ -319,6 +339,7 @@ def get_task():
task = get_postprocess_task()
if task:
return task

# Check the high-end jobs queue for bots with multiplier greater than 1.
thread_multiplier = environment.get_value('THREAD_MULTIPLIER')
if thread_multiplier and thread_multiplier > 1:
@@ -368,8 +389,7 @@ def __init__(self,
eta=None,
is_command_override=False,
high_end=False,
extra_info=None,
is_from_queue=False):
extra_info=None):
self.command = command
self.argument = argument
self.job = job
Expand All @@ -378,16 +398,6 @@ def __init__(self,
self.high_end = high_end
self.extra_info = extra_info

# is_from_queue is a temporary hack to keep track of which fuzz tasks came
# from the queue. Previously all fuzz tasks were picked by the bot when
# there was nothing on the queue. With the rearchitecture, we want fuzz
# tasks that were put on the queue by the schedule_fuzz cron job to be
# executed on batch. is_from_queue is used to do this.
# TODO(b/378684001): This code is very ugly, get rid of it when no more
# fuzz tasks are executed on the bots themselves (i.e. when the rearch
# is complete).
self.is_from_queue = is_from_queue
Review comment (Collaborator):
Does this mean that, once this gets deployed, everything is coming from batch?

Reply (Collaborator, Author):
No. It means that I don't need the hacky is_from_queue method to schedule fuzz tasks on batch anymore, because instead we are pushing them to the preprocess queue.
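
A minimal sketch of the flow the author describes, pieced together from the schedule_fuzz.py and tasks/__init__.py changes in this PR; the fuzzer and job names are hypothetical placeholders.

# Sketch only: fuzz tasks now reach batch via the preprocess queue instead of
# the is_from_queue flag. Fuzzer/job names are placeholders.
from clusterfuzz._internal.base import tasks

# Cron side (schedule_fuzz): push fuzz tasks straight onto the preprocess queue.
fuzz_tasks = [tasks.Task('fuzz', 'libFuzzer_example', 'example_job')]
tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)

# Tworker side: the task comes back off the preprocess/postprocess queues;
# preprocessing runs on the tworker and utask_main is executed remotely.
task = tasks.tworker_get_task()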


def __repr__(self):
return f'Task: {self.command} {self.argument} {self.job}'

@@ -428,13 +438,11 @@ def lease(self):
class PubSubTask(Task):
"""A Pub/Sub task."""

def __init__(self, pubsub_message, is_from_queue=False):
def __init__(self, pubsub_message):
self._pubsub_message = pubsub_message
super().__init__(
self.attribute('command'),
self.attribute('argument'),
self.attribute('job'),
is_from_queue=is_from_queue)
self.attribute('command'), self.attribute('argument'),
self.attribute('job'))

self.extra_info = {
key: value
@@ -540,7 +548,7 @@ def initialize_task(message) -> PubSubTask:
"""Creates a task from |messages|."""

if message.attributes.get('eventType') != 'OBJECT_FINALIZE':
return PubSubTask(message, is_from_queue=True)
return PubSubTask(message)

# Handle postprocess task.
# The GCS API for pub/sub notifications uses the data field unlike
@@ -549,7 +557,7 @@ def initialize_task(message) -> PubSubTask:
name = data['name']
bucket = data['bucket']
output_url_argument = storage.get_cloud_storage_file_path(bucket, name)
return PostprocessPubSubTask(output_url_argument, message, is_from_queue=True)
return PostprocessPubSubTask(output_url_argument, message)


class PostprocessPubSubTask(PubSubTask):
@@ -558,21 +566,14 @@ class PostprocessPubSubTask(PubSubTask):
def __init__(self,
output_url_argument,
pubsub_message,
is_command_override=False,
is_from_queue=False):
is_command_override=False):
command = 'postprocess'
job_type = 'none'
eta = None
high_end = False
grandparent_class = super(PubSubTask, self)
grandparent_class.__init__(
command,
output_url_argument,
job_type,
eta,
is_command_override,
high_end,
is_from_queue=is_from_queue)
grandparent_class.__init__(command, output_url_argument, job_type, eta,
is_command_override, high_end)
self._pubsub_message = pubsub_message


22 changes: 4 additions & 18 deletions src/clusterfuzz/_internal/base/tasks/task_utils.py
@@ -15,7 +15,6 @@
any other module in tasks to prevent circular imports and issues with
appengine."""

from clusterfuzz._internal.config import local_config
from clusterfuzz._internal.system import environment


@@ -26,25 +25,12 @@ def get_command_from_module(full_module_name: str) -> str:
return module_name[:-len('_task')]


def is_remotely_executing_utasks(task=None) -> bool:
def is_remotely_executing_utasks() -> bool:
"""Returns True if the utask_main portions of utasks are being remotely
executed on Google cloud batch."""
if bool(environment.is_production() and
environment.get_value('REMOTE_UTASK_EXECUTION')):
return True
if task is None:
return False
return bool(is_task_opted_into_uworker_execution(task))


def get_opted_in_tasks():
return local_config.ProjectConfig().get('uworker_tasks', [])


def is_task_opted_into_uworker_execution(task: str) -> bool:
# TODO(metzman): Remove this after OSS-Fuzz and Chrome are at parity.
uworker_tasks = get_opted_in_tasks()
return task in uworker_tasks
# TODO(metzman): REMOTE_UTASK_EXECUTION should be a config not an env var.
return (environment.is_production() and
environment.get_value('REMOTE_UTASK_EXECUTION'))


class UworkerMsgParseError(RuntimeError):
27 changes: 18 additions & 9 deletions src/clusterfuzz/_internal/bot/tasks/commands.py
@@ -190,11 +190,25 @@ def start_web_server_if_needed():
logs.error('Failed to start web server, skipping.')


def get_command_object(task_name):
"""Returns the command object that execute can be called on."""
task = COMMAND_MAP.get(task_name)
if not environment.is_tworker():
return task

if isinstance(task, task_types.TrustedTask):
# We don't need to execute this remotely.
return task

# Force remote execution.
return task_types.UTask(task_name)


def run_command(task_name, task_argument, job_name, uworker_env):
"""Runs the command."""
task = COMMAND_MAP.get(task_name)
task = get_command_object(task_name)
if not task:
logs.error("Unknown command '%s'" % task_name)
logs.error(f'Unknown command "{task_name}"')
return None

# If applicable, ensure this is the only instance of the task running.
@@ -253,10 +267,8 @@ def process_command(task):
logs.error('Empty task received.')
return None

# TODO(b/378684001): Remove is_from_queue kludge.
return process_command_impl(task.command, task.argument, task.job,
task.high_end, task.is_command_override,
task.is_from_queue)
task.high_end, task.is_command_override)


def _get_task_id(task_name, task_argument, job_name):
@@ -267,13 +279,12 @@ def _get_task_id(task_name, task_argument, job_name):
# TODO(mbarbella): Rewrite this function to avoid nesting issues.
@set_task_payload
def process_command_impl(task_name, task_argument, job_name, high_end,
is_command_override, is_from_queue):
is_command_override):
"""Implementation of process_command."""
uworker_env = None
environment.set_value('TASK_NAME', task_name)
environment.set_value('TASK_ARGUMENT', task_argument)
environment.set_value('JOB_NAME', job_name)
environment.set_value('IS_FROM_QUEUE', is_from_queue)
if task_name in {'uworker_main', 'postprocess'}:
# We want the id of the task we are processing, not "uworker_main", or
# "postprocess".
@@ -455,5 +466,3 @@ def process_command_impl(task_name, task_argument, job_name, high_end,
cleanup_task_state()
if 'CF_TASK_ID' in os.environ:
del os.environ['CF_TASK_ID']
if 'IS_FROM_QUEUE' in os.environ:
del os.environ['IS_FROM_QUEUE']
20 changes: 5 additions & 15 deletions src/clusterfuzz/_internal/bot/tasks/task_types.py
@@ -46,6 +46,7 @@ class TrustedTask(BaseTask):
def execute(self, task_argument, job_type, uworker_env):
# Simple tasks can just use the environment they don't need the uworker env.
del uworker_env
assert not environment.is_tworker()
self.module.execute_task(task_argument, job_type)


@@ -58,6 +59,8 @@ def execute(self, task_argument, job_type, uworker_env):
raise NotImplementedError('Child class must implement.')

def execute_locally(self, task_argument, job_type, uworker_env):
"""Executes the utask locally (on this machine, not on batch)."""
assert not environment.is_tworker()
uworker_input = utasks.tworker_preprocess_no_io(self.module, task_argument,
job_type, uworker_env)
if uworker_input is None:
@@ -119,7 +122,7 @@ class UTask(BaseUTask):

@staticmethod
def is_execution_remote(command=None):
return task_utils.is_remotely_executing_utasks(command)
return task_utils.is_remotely_executing_utasks()

def execute(self, task_argument, job_type, uworker_env):
"""Executes a utask."""
@@ -156,19 +159,6 @@ def preprocess(self, task_argument, job_type, uworker_env):
return download_url


# TODO(b/378684001): Remove this, it's needed for testing but is otherwise a bad
# design.
class UTaskMostlyLocalExecutor(UTask):

@staticmethod
def is_execution_remote(command=None):
del command
if environment.get_value('IS_FROM_QUEUE'):
logs.info('IS FROM QUEUE')
return True
return False


class PostprocessTask(BaseTask):
"""Represents postprocessing of an untrusted task."""

Expand Down Expand Up @@ -211,7 +201,7 @@ def execute(self, task_argument, job_type, uworker_env):
'analyze': UTask,
'blame': TrustedTask,
'corpus_pruning': UTask,
'fuzz': UTaskMostlyLocalExecutor,
'fuzz': UTaskLocalExecutor,
'impact': TrustedTask,
'minimize': UTask,
'progression': UTask,
27 changes: 8 additions & 19 deletions src/clusterfuzz/_internal/cron/schedule_fuzz.py
@@ -20,7 +20,6 @@

from googleapiclient import discovery

from clusterfuzz._internal.base import concurrency
from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.config import local_config
@@ -167,14 +166,12 @@ def get_fuzz_tasks(self) -> Dict[str, tasks.Task]:

choices = random.choices(
fuzz_task_candidates, weights=weights, k=num_instances)
queues_to_tasks = collections.defaultdict(list)
for fuzz_task_candidate in choices:
queue_tasks = queues_to_tasks[fuzz_task_candidate.queue]

task = tasks.Task('fuzz', fuzz_task_candidate.fuzzer,
fuzz_task_candidate.job)
queue_tasks.append(task)
return queues_to_tasks
fuzz_tasks = [
tasks.Task('fuzz', fuzz_task_candidate.fuzzer, fuzz_task_candidate.job)
for fuzz_task_candidate in choices
]
  # TODO(metzman): Remove the queue stuff if it's unneeded for Chrome.
return fuzz_tasks


def get_fuzz_tasks(available_cpus: int) -> [tasks.Task]:
@@ -210,10 +207,8 @@ def schedule_fuzz_tasks() -> bool:
logs.error('No fuzz tasks found to schedule.')
return False

# TODO(b/378684001): Change this to using one queue when oss-fuzz's untrusted
# worker model is deleted.
with concurrency.make_pool() as pool:
list(pool.map(bulk_add, fuzz_tasks.items()))
logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')

end = time.time()
@@ -222,11 +217,5 @@
return True


def bulk_add(queue_and_tasks):
queue, task_list = queue_and_tasks
logs.info(f'Adding {task_list} to {queue}.')
tasks.bulk_add_tasks(task_list, queue=queue, eta_now=True)


def main():
return schedule_fuzz_tasks()
4 changes: 4 additions & 0 deletions src/clusterfuzz/_internal/system/environment.py
@@ -1177,3 +1177,7 @@ def can_testcase_run_on_platform(testcase_platform_id, current_platform_id):
current_platform_id)

return False


def is_tworker():
return get_value('TWORKER', False)
Review comment (Collaborator):
Should this be True? This is a bit confusing; reading environment.py, I don't know what to expect from ast.literal_eval when it gets '1' as input:

return ast.literal_eval(value_string)

Reply (Collaborator, Author):
No. This is the default value, which I want to be False if the env var isn't defined.

Reply (Collaborator, Author):
I thought I replied to this earlier, but no, I want the default to be False.
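
To make the behavior under discussion concrete, here is a simplified stand-in for environment.get_value(), assuming (per the line quoted above) that a set variable is parsed with ast.literal_eval and the second argument is only returned when the variable is unset. This is an illustration, not the real helper.

# Simplified stand-in for environment.get_value(), for illustration only.
import ast
import os


def get_value(name, default=None):
  value_string = os.environ.get(name)
  if value_string is None:
    return default  # The second argument only matters when the var is unset.
  return ast.literal_eval(value_string)


os.environ.pop('TWORKER', None)
print(get_value('TWORKER', False))  # False: TWORKER not defined, not a tworker.
os.environ['TWORKER'] = '1'
print(get_value('TWORKER', False))  # 1 (truthy): is_tworker() returns a true value.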
