diff --git a/acto/engine.py b/acto/engine.py index 5ad5021346..2c8ade72f9 100644 --- a/acto/engine.py +++ b/acto/engine.py @@ -286,7 +286,7 @@ def run( self.input_model.set_worker_id(self.worker_id) apiclient = None - self.input_model.set_mode(mode) + # self.input_model.set_mode(mode) if mode != InputModel.NORMAL: self.workdir = os.path.join(self.workdir, mode) os.makedirs(self.base_workdir, exist_ok=True) @@ -1088,6 +1088,7 @@ def run( if "normal" in modes: threads = [] + self.input_model.set_mode(InputModel.NORMAL) for runner in runners: t = threading.Thread( target=runner.run, args=[errors, InputModel.NORMAL] @@ -1102,6 +1103,7 @@ def run( if "overspecified" in modes: threads = [] + self.input_model.set_mode(InputModel.OVERSPECIFIED) for runner in runners: t = threading.Thread( target=runner.run, args=([errors, InputModel.OVERSPECIFIED]) @@ -1116,6 +1118,7 @@ def run( if "copiedover" in modes: threads = [] + self.input_model.set_mode(InputModel.COPIED_OVER) for runner in runners: t = threading.Thread( target=runner.run, args=([errors, InputModel.COPIED_OVER]) @@ -1130,6 +1133,7 @@ def run( if InputModel.ADDITIONAL_SEMANTIC in modes: threads = [] + self.input_model.set_mode(InputModel.ADDITIONAL_SEMANTIC) for runner in runners: t = threading.Thread( target=runner.run, diff --git a/acto/input/input.py b/acto/input/input.py index f7271f28f0..5760f34f43 100644 --- a/acto/input/input.py +++ b/acto/input/input.py @@ -22,7 +22,7 @@ from acto.utils import get_thread_logger from .testcase import TestCase -from .testplan import DeterministicTestPlan, TestGroup, TestPlan +from .testplan import DeterministicTestPlan, SharedTestPlan, TestGroup, TestPlan from .value_with_schema import attach_schema_to_value @@ -49,6 +49,8 @@ class InputMetadata(pydantic.BaseModel): # The number of test cases to form a group CHUNK_SIZE = 10 +# The number of groups to fetch when empty to form a group +FETCH_SIZE = 1 class InputModel(abc.ABC): """An abstract class for input model""" @@ -269,15 +271,14 @@ def set_worker_id(self, worker_id: int): # Thread local variables self.thread_vars.id = worker_id # so that we can run the test case itself right after the setup - self.thread_vars.normal_test_plan = DeterministicTestPlan() + + # thread_vars.test_plan is the local queue, fetch from global queue when empty + self.thread_vars.test_plan = DeterministicTestPlan() self.thread_vars.semantic_test_plan = TestPlan( self.root_schema.to_tree() ) - for group in self.normal_test_plan_partitioned[worker_id]: - self.thread_vars.normal_test_plan.add_testcase_group( - TestGroup(group) - ) + def generate_test_plan( self, @@ -293,9 +294,7 @@ def generate_test_plan( normal_testcases = {} - test_cases = get_testcases( - self.get_schema_by_path(self.mount), self.full_matched_schemas - ) + test_cases = get_testcases(self.root_schema, self.full_matched_schemas) num_test_cases = 0 num_run_test_cases = 0 @@ -374,25 +373,21 @@ def split_into_subgroups( return subgroups normal_subgroups = split_into_subgroups(normal_test_plan_items) - - # Initialize the three test plans, and assign test cases to them - # according to the number of workers - for i in range(self.num_workers): - self.normal_test_plan_partitioned.append([]) - - for i in range(0, len(normal_subgroups)): - self.normal_test_plan_partitioned[i % self.num_workers].append( - normal_subgroups[i] + + # global job queue + self.normal_test_plan = SharedTestPlan() + self.semantic_test_plan = TestPlan( + self.root_schema.to_tree() + ) + for group in normal_subgroups: + self.normal_test_plan.add_testcase_group( + TestGroup(group) ) - # appending empty lists to avoid no test cases distributed to certain - # work nodes - assert self.num_workers == len(self.normal_test_plan_partitioned) - return { "normal_testcases": normal_testcases, } - + def next_test( self, ) -> Optional[List[Tuple[TestGroup, tuple[str, TestCase]]]]: @@ -406,39 +401,51 @@ def next_test( """ logger = get_thread_logger(with_prefix=True) - logger.info("Progress [%d] cases left", len(self.thread_vars.test_plan)) + logger.info("Global queue [%d] cases left", len(self.test_plan)) + logger.info("Local queue [%d] cases left", len(self.thread_vars.test_plan)) selected_group: TestGroup = self.thread_vars.test_plan.next_group() - if selected_group is None: - return None - elif len(selected_group) == 0: + if selected_group is None or len(selected_group) == 0: + + for i in range(FETCH_SIZE): + new_group = self.test_plan.next_group() + if new_group is None: + break + self.thread_vars.test_plan.add_testcase_group(new_group) + return None - else: + + else: testcase = selected_group.get_next_testcase() return [(selected_group, testcase)] + + def set_mode(self, mode: str): if mode == InputModel.NORMAL: - self.thread_vars.test_plan = self.thread_vars.normal_test_plan + self.test_plan = self.normal_test_plan elif mode == "OVERSPECIFIED": - self.thread_vars.test_plan = ( - self.thread_vars.overspecified_test_plan + self.test_plan = ( + self.overspecified_test_plan.next_group() ) elif mode == "COPIED_OVER": - self.thread_vars.test_plan = self.thread_vars.copiedover_test_plan + self.test_plan = self.copiedover_test_plan elif mode == InputModel.SEMANTIC: - self.thread_vars.test_plan = self.thread_vars.semantic_test_plan + self.test_plan = self.semantic_test_plan elif mode == InputModel.ADDITIONAL_SEMANTIC: - self.thread_vars.test_plan = ( - self.thread_vars.additional_semantic_test_plan + self.test_plan = ( + self.additional_semantic_test_plan ) else: raise ValueError(mode) + + def is_empty(self): - """if test plan is empty""" - return len(self.thread_vars.test_plan) == 0 + """if test plan is empty, both global queue and local queue""" + return len(self.test_plan) == 0 and len(self.thread_vars.test_plan) == 0 + def get_seed_input(self) -> dict: """Get the raw value of the seed input""" @@ -528,3 +535,4 @@ def apply_default_value(self, default_value_result: dict): "Setting default value for %s to %s", path, decoded_value ) self.get_schema_by_path(path).set_default(value) + diff --git a/acto/input/testplan.py b/acto/input/testplan.py index 53e4b80f17..e11a3a1925 100644 --- a/acto/input/testplan.py +++ b/acto/input/testplan.py @@ -1,5 +1,7 @@ import json +import queue import random +import threading from typing import List, Tuple from acto.schema.base import TreeNode @@ -251,8 +253,8 @@ def finish_testcase(self): def __len__(self): return len(self.tests) - - + + class DeterministicTestPlan(TestPlan): def __init__(self): @@ -275,4 +277,38 @@ def add_testcase_group(self, groups: TestGroup): self.groups.append(groups) def __len__(self): - return sum([len(i) for i in self.groups]) \ No newline at end of file + return sum([len(i) for i in self.groups]) + + +class SharedTestPlan(TestPlan): + + def __init__(self): + self.groups = queue.Queue() + self.length = 0 + pass + + def next_group(self): + if self.groups.empty(): + return None + + head = self.groups.get() + self.groups.task_done() + + if len(head) == 0: + return None + else: + self.length -= len(head) + return head + + def add_testcase_groups(self, groups: List[TestGroup]): + for group in groups: + self.groups.put(group) + self.length += len(group) + + + def add_testcase_group(self, groups: TestGroup): + self.groups.put(groups) + self.length += len(groups) + + def __len__(self): + return self.length