Commit: Bug fixes for process runner

KashunCheng committed Aug 4, 2023
1 parent f8a82f2 commit 039832c
Showing 16 changed files with 115 additions and 101 deletions.
18 changes: 2 additions & 16 deletions acto/__main__.py
@@ -9,8 +9,6 @@
import time
import random

from acto.config import actoConfig

from acto.lib.monkey_patch_loader import load_monkey_patch
from acto.lib.operator_config import OperatorConfig

@@ -95,19 +93,8 @@
logging.getLogger("kubernetes").setLevel(logging.ERROR)
logging.getLogger("sh").setLevel(logging.ERROR)

if actoConfig.parallel.executor == 'ray':
import ansible_runner
import ray

ansible_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'scripts', 'ansible')
ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory, playbook=os.path.join(ansible_dir, 'acto_ray.yaml'))
head_result = ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
playbook=os.path.join(ansible_dir, 'ray_head.yaml'))
ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
playbook=os.path.join(ansible_dir, 'ray_worker.yaml'))
if head_result.stats['changed'] != {}:
time.sleep(5)
ray.init(address='auto')
import acto.ray_acto as ray
ray.start_service()


from acto import common
@@ -170,7 +157,6 @@
elif not args.learn:
acto.run(modes=['normal'])
normal_finish_time = datetime.now()
acto.teardown()
logger.info('Acto normal run finished in %s', normal_finish_time - start_time)
logger.info('Start post processing steps')

2 changes: 1 addition & 1 deletion acto/checker/impl/kubectl_cli.py
@@ -72,4 +72,4 @@ def _binary_check(self, snapshot: Snapshot, prev_snapshot: Snapshot) -> OracleRe

logger.log(logging.CRITICAL if actoConfig.strict else logging.ERROR,
f'stderr is not empty, but invalid_input_message mark it as valid: {stderr}')
return KubectlCliResult()
return KubectlCliResult(f'Invalid input, field path: None, error {stderr}')
1 change: 1 addition & 0 deletions acto/config.py
@@ -11,6 +11,7 @@ def load_config(path=os.path.join(os.path.dirname(os.path.dirname(os.path.abspat
global actoConfig
config = yaml.safe_load(open(path))
actoConfig = Config.parse_obj(config)
return actoConfig


load_config()
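
Returning the parsed config from `load_config` lets call sites that load a non-default config file bind the result directly, instead of having to re-read the module-level `actoConfig` global afterwards. A minimal sketch of the calling pattern this enables (the filename mirrors the one used in `acto/post_diff_test.py` below):

```python
import acto.config as acto_config

# Bind the freshly parsed config explicitly rather than re-importing the
# module-level global after load_config() has mutated it.
actoConfig = acto_config.load_config('config_post_diff_test.yaml')
```
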
6 changes: 4 additions & 2 deletions acto/deploy.py
@@ -1,10 +1,13 @@
import logging
from enum import auto, unique
from functools import wraps, partial
from typing import TypeVar, Callable

import yaml
from strenum import StrEnum

from acto.monkey_patch import monkey_patch

import acto.utils as utils
from acto.checker.checker_set import CheckerSet
from acto.checker.impl.health import HealthChecker
@@ -150,15 +153,14 @@ def deploy(self, runner: Runner) -> str:
the namespace from the provided object "rabbitmq-system" does not
match the namespace "acto-namespace". You must pass '--namespace=rabbitmq-system' to perform this operation.
"""
logger = get_thread_logger(with_prefix=True)
print_event('Deploying operator...')

kubectl_client = runner.kubectl_client

namespace = utils.get_yaml_existing_namespace(self.crd_yaml_files) or CONST.ACTO_NAMESPACE
ret = utils.create_namespace(kubectl_client.api_client, namespace)
if ret is None:
logger.critical('Failed to create namespace')
logging.critical('Failed to create namespace')
# use server side apply to avoid last-applied-configuration
if self.init_yaml_files:
kubectl_client.apply(self.init_yaml_files, server_side=None)
43 changes: 18 additions & 25 deletions acto/engine_new.py
@@ -34,6 +34,11 @@
from ssa.analysis import analyze


def task(runner: Runner, data: Tuple[Callable[[Runner, Trial, dict],Snapshot], CheckerSet, int, TrialInputIteratorLike]) -> Trial:
collector, checkers, num_of_mutations, iterator = data
trial = Trial(iterator, checkers, num_mutation=num_of_mutations)
return runner.run.remote(trial, collector)

class Acto:
def __init__(self,
workdir_path: str,
@@ -149,7 +154,7 @@ def __init__(self,
for oracle_modules in operator_config.custom_oracles:
module = importlib.import_module(oracle_modules)
for name, obj in inspect.getmembers(module):
if inspect.isclass(obj) and issubclass(obj, Checker) and obj != Checker:
if inspect.isclass(obj) and issubclass(obj, Checker) and not inspect.isabstract(obj):
checker_generators.append(obj)

self.checkers = CheckerSet(self.context, self.input_model, checker_generators)
@@ -207,24 +212,19 @@ def collect_learn_context(namespace_discovered: str, runner: Runner, trial: Tria
json.dump(self.context, context_fout, cls=ContextEncoder, indent=4, sort_keys=True)

def run_trials(self, iterators: Sequence[TrialInputIteratorLike], _collector: Callable[[Runner, Trial, dict], Snapshot] = None):
def task(runner: Runner, iterator: TrialInputIteratorLike) -> Trial:
if _collector is None:
collector = with_context(CollectorContext(
namespace=self.context['namespace'],
crd_meta_info=self.context['crd'],
collect_coverage=self.collect_coverage,
), snapshot_collector)
else:
collector = _collector
# Inject the deploy step into the collector, to deploy the operator before running the test
collector = self.deploy.chain_with(drop_first_parameter(collector))
assert isinstance(self.input_model.get_root_schema(), ObjectSchema)

trial = Trial(iterator, self.checkers, num_mutation=self.num_of_mutations)
return runner.run.remote(trial, collector)

if _collector is None:
collector = with_context(CollectorContext(
namespace=self.context['namespace'],
crd_meta_info=self.context['crd'],
collect_coverage=self.collect_coverage,
), snapshot_collector)
else:
collector = _collector
# Inject the deploy step into the collector, to deploy the operator before running the test
collector = self.deploy.chain_with(drop_first_parameter(collector))
assert isinstance(self.input_model.get_root_schema(), ObjectSchema)
for it in iterators:
self.runners.submit(task, it)
self.runners.submit(task, (collector, self.checkers, self.num_of_mutations, it))

while self.runners.has_next():
# As long as we have remaining test cases
@@ -268,13 +268,6 @@ def run_test_plan(test_case_list: List[Tuple[List[str], TestCase]]):

logger.info('All tests finished')

def teardown(self):
while True:
runner = self.runners.pop_idle()
if not runner:
break
runner.teardown_cluster.remote()


def do_static_analysis(analysis: AnalysisConfig):
with tempfile.TemporaryDirectory() as project_src:
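
Hoisting `task` out of `run_trials` to module scope, with its state passed as an explicit tuple, is what makes the process executor viable: the standard `pickle` used by `multiprocessing` can serialize module-level functions but not closures, and the old nested `task` captured `self` and `_collector` from the enclosing method. A minimal standalone sketch of the constraint, assuming a plain `concurrent.futures` pool (names here are illustrative, not Acto's):

```python
from concurrent.futures import ProcessPoolExecutor

def module_level_task(data):
    # Picklable: defined at module scope, all state arrives as an explicit tuple
    base, exponent = data
    return base ** exponent

# A closure, by contrast, cannot be shipped to worker processes with plain pickle:
# def make_task(exponent):
#     def task(base):              # captures `exponent` from the enclosing scope
#         return base ** exponent
#     return task                  # submitting this raises a pickling error

if __name__ == '__main__':
    with ProcessPoolExecutor() as pool:
        print(list(pool.map(module_level_task, [(2, 3), (3, 2)])))  # [8, 9]
```
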
13 changes: 13 additions & 0 deletions acto/lib/monkey_patch_loader.py
@@ -3,6 +3,16 @@
from acto.config import actoConfig
from acto.lib.operator_config import OperatorConfig

def patch_process_pool():
import sys
import dill
# pickle cannot serialize closures, so we use dill instead
# https://stackoverflow.com/questions/19984152/what-can-multiprocessing-and-dill-do-together
dill.Pickler.dumps, dill.Pickler.loads = dill.dumps, dill.loads
from multiprocessing import reduction
reduction.ForkingPickler = dill.Pickler
reduction.dump = dill.dump
assert 'multiprocessing.connection' not in sys.modules

def load_monkey_patch(config: OperatorConfig):
monkey_patch_load_path = os.path.expanduser('~/.acto_monkey_patch.rc')
@@ -12,6 +22,9 @@ def load_monkey_patch(config: OperatorConfig):
else:
open(monkey_patch_load_path, 'w').write('')

if actoConfig.parallel.executor == 'process':
patch_process_pool()

if actoConfig.parallel.executor == 'ray':
import ansible_runner

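
The `patch_process_pool` hunk above swaps `multiprocessing`'s pickler for `dill`, which can serialize closures and other local objects that the standard `pickle` rejects; the trailing `assert` guards that `multiprocessing.connection` has not been imported yet, because that module binds `reduction.ForkingPickler` at import time and would otherwise miss the patch. A minimal standalone sketch of the difference `dill` makes (not Acto code):

```python
import pickle
import dill

def make_adder(n):
    def add(x):  # a closure over n, defined at function-local scope
        return x + n
    return add

add5 = make_adder(5)

try:
    pickle.dumps(add5)
except (AttributeError, pickle.PicklingError) as err:
    print(f'pickle rejects local functions: {err}')

restored = dill.loads(dill.dumps(add5))  # dill serializes the closure intact
assert restored(7) == 12
```
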
23 changes: 5 additions & 18 deletions acto/post_diff_test.py
@@ -11,15 +11,13 @@

if __name__ == '__main__':

from acto.config import actoConfig
import acto.config as acto_config

acto_config.load_config(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
actoConfig = acto_config.load_config(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'config_post_diff_test.yaml'))

from acto.lib.monkey_patch_loader import load_monkey_patch
from acto.lib.operator_config import OperatorConfig
from acto.post_process import PostDiffTest

# for debugging, set random seed to 0
random.seed(0)
@@ -53,23 +51,13 @@
format='%(asctime)s %(levelname)-7s, %(name)s, %(filename)-9s:%(lineno)d, %(message)s')
logging.getLogger("kubernetes").setLevel(logging.ERROR)
logging.getLogger("sh").setLevel(logging.ERROR)
if actoConfig.parallel.executor == 'ray':
import ansible_runner
import ray

ansible_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'scripts', 'ansible')
ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
playbook=os.path.join(ansible_dir, 'acto_ray.yaml'))
head_result = ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
playbook=os.path.join(ansible_dir, 'ray_head.yaml'))
ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
playbook=os.path.join(ansible_dir, 'ray_worker.yaml'))
if head_result.stats['changed'] != {}:
time.sleep(5)
ray.init(address='auto')
import acto.ray_acto as ray
ray.start_service()


def main():
from acto.post_process import PostDiffTest

post_diff_test_dir = os.path.join(args.workdir_path, 'post_diff_test')
trials = {}
trial_paths = glob.glob(os.path.join(args.workdir_path, '**', 'trial.pkl'))
@@ -81,7 +69,6 @@ def main():
if not args.checkonly:
p.post_process(post_diff_test_dir)
p.check(post_diff_test_dir)
p.teardown()


main()
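
Moving `from acto.post_process import PostDiffTest` from the top of the script into `main()` defers that import until after the config has been loaded and the monkey patches applied, so the module is initialized against the patched environment. A small standalone sketch of the deferral pattern (module and attribute names are invented for illustration):

```python
import sys
import types

def patch_environment():
    # Stand-in for Acto's config loading / monkey patching; it must run before
    # the deferred import below so the imported module sees the patched state.
    flags = types.ModuleType('feature_flags')
    flags.parallel_executor = 'process'
    sys.modules['feature_flags'] = flags

def main():
    import feature_flags  # deferred: resolved only after patch_environment()
    print(feature_flags.parallel_executor)

if __name__ == '__main__':
    patch_environment()
    main()
```
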
31 changes: 15 additions & 16 deletions acto/post_process/post_diff_test.py
@@ -18,6 +18,8 @@
from deepdiff.operator import BaseOperator
from pandas import DataFrame

from acto.monkey_patch import monkey_patch

from acto.checker.checker import OracleResult, OracleControlFlow
from acto.checker.impl import recovery
from acto.checker.impl.recovery import RecoveryResult
@@ -191,18 +193,18 @@ def history(self) -> List[Tuple[dict, dict]]:
return self.__history


def post_process_task(runner: 'PostDiffRunner', data: Tuple[dict, str]) -> Trial:
system_input, system_input_hash = data
def post_process_task(runner: 'PostDiffRunner', data: Tuple[dict, str, dict, Deploy]) -> Trial:
system_input, system_input_hash, context, deploy = data
collector = with_context(CollectorContext(
namespace=runner.context['namespace'],
crd_meta_info=runner.context['crd'],
namespace=context['namespace'],
crd_meta_info=context['crd'],
), snapshot_collector)

# Inject the deploy step into the collector, to deploy the operator before running the test
collector = runner.deploy.chain_with(drop_first_parameter(collector))
collector = deploy.chain_with(drop_first_parameter(collector))

trial = Trial(TrialSingleInputIterator(system_input, system_input_hash), None, num_mutation=10)
return runner.run.remote(trial, collector)
return ray.get(runner.run.remote(trial, collector))


def post_diff_compare_task(runner: 'PostDiffRunner', data: Tuple[Snapshot, DataFrame, bool]) -> OracleResult:
@@ -211,15 +213,18 @@ def post_diff_compare_task(runner: 'PostDiffRunner', data: Tuple[Snapshot, DataF


@ray.remote(scheduling_strategy="SPREAD", num_cpus=1, resources={"disk": 10})
class PostDiffRunner(Runner):
class PostDiffRunner:
def __init__(self, context: dict, deploy: Deploy, diff_ignore_fields: List[str], engine_class: Type[Engine],
engine_version: str, num_nodes: int, preload_images: List[str] = None,
preload_images_store: Callable[[str], str] = None):
super().__init__(engine_class, engine_version, num_nodes, preload_images, preload_images_store)
self.trial_runner = Runner.remote(engine_class, engine_version, num_nodes, preload_images, preload_images_store)
self.context = context
self.deploy = deploy
self.diff_ignore_fields = diff_ignore_fields

def run(self, trial: Trial, snapshot_collector: Callable[['Runner', Trial, dict], Snapshot])-> Trial:
return self.trial_runner.run.remote(trial, snapshot_collector)

def check_trial(self, diff_snapshot: Snapshot, originals: DataFrame, run_check_indeterministic: bool = False):
group_errs = []
diff_system_input = diff_snapshot.input
@@ -239,7 +244,7 @@ def check_trial(self, diff_snapshot: Snapshot, originals: DataFrame, run_check_i

errored = False
if run_check_indeterministic:
additional_trial = post_process_task(self, (diff_system_input, digest))
additional_trial = post_process_task(self, (diff_system_input, digest, self.context, self.deploy))
try:
additional_snapshot, _ = unpack_history_iterator_or_raise(additional_trial.history_iterator())
except RuntimeError as e:
@@ -341,7 +346,7 @@ def post_process(self, workdir: str):
trial_name = group.iloc[0]['trial']
if os.path.exists(os.path.join(workdir, trial_name, f'difftest-{digest}.pkl')):
continue
self._runners.submit(post_process_task, (group.iloc[0]['input'], group.iloc[0]['input_digest']))
self._runners.submit(post_process_task, (group.iloc[0]['input'], group.iloc[0]['input_digest'], self._context, self._deploy))

while self._runners.has_next():
# As long as there are still runners running, or we have remaining test cases
@@ -388,12 +393,6 @@ def check_by_snapshots(self, snapshots: Iterable[Snapshot], run_check_indetermin
results.extend(result)
return results

def teardown(self):
while True:
runner = self._runners.pop_idle()
if not runner:
break
runner.teardown_cluster.remote()


if __name__ == '__main__':
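
Note the structural change in this file: `PostDiffRunner` no longer subclasses `Runner` but holds a `Runner.remote(...)` actor handle and delegates to it, and `post_process_task` now wraps the remote call in `ray.get(...)` so it returns a materialized `Trial` instead of an object reference. Composing actor handles avoids the pitfalls of inheriting from a `@ray.remote`-decorated class, since the decorator returns an actor-class wrapper rather than the original class. A minimal sketch of the composition pattern, with illustrative names:

```python
import ray

ray.init()

@ray.remote
class Worker:
    def run(self, x):
        return x * 2

@ray.remote
class Coordinator:
    def __init__(self):
        # Composition: hold a handle to another actor instead of subclassing it
        self.worker = Worker.remote()

    def run(self, x):
        # Block on the inner actor so callers receive a value, not a nested ref
        return ray.get(self.worker.run.remote(x))

coordinator = Coordinator.remote()
assert ray.get(coordinator.run.remote(21)) == 42
```
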
22 changes: 22 additions & 0 deletions acto/ray_acto/__init__.py
@@ -1 +1,23 @@
import os
import time

from .ray import remote, get
from acto.config import actoConfig


def start_service():
if actoConfig.parallel.executor == 'ray':
import ansible_runner
import ray

ansible_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
'scripts', 'ansible')
ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
playbook=os.path.join(ansible_dir, 'acto_ray.yaml'))
head_result = ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
playbook=os.path.join(ansible_dir, 'ray_head.yaml'))
ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
playbook=os.path.join(ansible_dir, 'ray_worker.yaml'))
if head_result.stats['changed'] != {}:
time.sleep(5)
ray.init(address='auto')
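
`start_service` consolidates the Ansible-driven Ray bootstrap previously duplicated in `acto/__main__.py` and `acto/post_diff_test.py`. The `head_result.stats['changed'] != {}` check makes the bootstrap roughly idempotent: `ansible_runner` reports an empty `changed` dict when every task was already in its desired state, so the five-second settle delay is paid only when the head node actually changed, after which `ray.init(address='auto')` attaches to the running cluster. A small sketch of reading `ansible_runner` results (inventory and playbook paths are placeholders):

```python
import ansible_runner

result = ansible_runner.run(inventory='hosts.ini', playbook='site.yaml')
print(result.status)            # e.g. 'successful'
print(result.stats['changed'])  # {} when no host required any change
```
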
12 changes: 11 additions & 1 deletion acto/reproduce.py
@@ -29,7 +29,14 @@
args = parser.parse_args()

from acto.lib.monkey_patch_loader import load_monkey_patch
load_monkey_patch(args.config)
from acto.lib.operator_config import OperatorConfig
with open(args.config, 'r') as config_file:
config = OperatorConfig(**json.load(config_file))
load_monkey_patch(config)

import acto.ray_acto as ray

ray.start_service()

from acto.engine_new import Acto
from acto.input import TestCase
@@ -56,6 +63,9 @@ def flush(self):
def revert(self):
pass

def redo(self):
pass

def swap_iterator(self, _: Iterator[Tuple[List[str], 'TestCase']]) -> Iterator[Tuple[List[str], 'TestCase']]:
return iter(())

13 changes: 0 additions & 13 deletions acto/utils/process_with_except.py

This file was deleted.
