-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sourcery Starbot ⭐ refactored ClaasRostock/stgem #1
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,21 +19,19 @@ def collect_replica_files(path, prefix): | |
begin with the given prefix.""" | ||
|
||
if not os.path.exists(path): | ||
raise Exception("No path '{}'.".format(path)) | ||
raise Exception(f"No path '{path}'.") | ||
|
||
results = [] | ||
for dir, subdirs, files in os.walk(path): | ||
for file in files: | ||
if file.startswith(prefix): | ||
results.append(os.path.join(dir, file)) | ||
|
||
results.extend( | ||
os.path.join(dir, file) | ||
for file in files | ||
if file.startswith(prefix) | ||
) | ||
Comment on lines
-22
to
+30
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
return results | ||
|
||
def load_results(files, load_sut_output=True): | ||
results = [] | ||
for file in files: | ||
results.append(STGEMResult.restore_from_file(file)) | ||
|
||
results = [STGEMResult.restore_from_file(file) for file in files] | ||
Comment on lines
-33
to
+34
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
# This reduces memory usage if these values are not needed. | ||
if not load_sut_output: | ||
for result in results: | ||
|
@@ -48,7 +46,9 @@ def loadExperiments(path, benchmarks, prefixes): | |
for prefix in prefixes[benchmark]: | ||
files = collect_replica_files(os.path.join(path, benchmark), prefix) | ||
if len(files) == 0: | ||
raise Exception("Empty experiment for prefix '{}' for benchmark '{}'.".format(prefix, benchmark)) | ||
raise Exception( | ||
f"Empty experiment for prefix '{prefix}' for benchmark '{benchmark}'." | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
experiments[benchmark][prefix] = load_results(files) | ||
|
||
return experiments | ||
|
@@ -57,10 +57,10 @@ def falsification_rate(experiment): | |
if len(experiment) == 0: | ||
return None | ||
|
||
c = 0 | ||
for result in experiment: | ||
c += 1 if any(step.success for step in result.step_results) else 0 | ||
|
||
c = sum( | ||
1 if any(step.success for step in result.step_results) else 0 | ||
for result in experiment | ||
) | ||
Comment on lines
-60
to
+63
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
return c/len(experiment) | ||
|
||
def times(replica): | ||
|
@@ -102,17 +102,11 @@ def mean_min_along(results, length=None): | |
A.append(B) | ||
|
||
A = np.array(A) | ||
C = np.mean(A, axis=0) | ||
|
||
return C | ||
return np.mean(A, axis=0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
def first_falsification(replica): | ||
_, _, Y = replica.test_repository.get() | ||
for i in range(len(Y)): | ||
if min(Y[i]) <= 0.0: | ||
return i | ||
|
||
return None | ||
return next((i for i in range(len(Y)) if min(Y[i]) <= 0.0), None) | ||
Comment on lines
-111
to
+109
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
def set_boxplot_color(bp, color): | ||
plt.setp(bp["boxes"], color=color) | ||
|
@@ -158,10 +152,15 @@ def plotTest(replica, idx): | |
|
||
if replica.sut_parameters["input_type"] == "vector": | ||
input_type = "vector" | ||
elif replica.sut_parameters["input_type"] == "signal" or replica.sut_parameters["input_type"] == "piecewise constant signal": | ||
elif replica.sut_parameters["input_type"] in [ | ||
"signal", | ||
"piecewise constant signal", | ||
]: | ||
input_type = "signal" | ||
else: | ||
raise Exception("Unknown input type '{}'.".format(replica.sut_parameters["input_type"])) | ||
raise Exception( | ||
f"""Unknown input type '{replica.sut_parameters["input_type"]}'.""" | ||
) | ||
Comment on lines
-161
to
+163
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
output_type = replica.sut_parameters["output_type"] | ||
|
||
inputs = replica.sut_parameters["inputs"] | ||
|
@@ -213,32 +212,31 @@ def plotTest(replica, idx): | |
# Output. | ||
print(", ".join(outputs)) | ||
print(Y[idx].outputs) | ||
elif output_type == "signal": | ||
fig, ax = plt.subplots(1, len(outputs), figsize=(10*len(outputs), 10)) | ||
|
||
# Input. | ||
print(", ".join(inputs)) | ||
print(X[idx].input_denormalized) | ||
|
||
# Output. | ||
for i, var in enumerate(outputs): | ||
o = ax[i] if len(outputs) > 1 else ax | ||
o.set_title(var) | ||
o.set_xlim((0, simulation_time)) | ||
o.set_ylim(output_range[i]) | ||
x = Z[idx].output_timestamps | ||
y = Z[idx].outputs[i] | ||
o.plot(x, y) | ||
else: | ||
if output_type == "signal": | ||
fig, ax = plt.subplots(1, len(outputs), figsize=(10*len(outputs), 10)) | ||
|
||
# Input. | ||
print(", ".join(inputs)) | ||
print(X[idx].input_denormalized) | ||
|
||
# Output. | ||
for i, var in enumerate(outputs): | ||
o = ax[i] if len(outputs) > 1 else ax | ||
o.set_title(var) | ||
o.set_xlim((0, simulation_time)) | ||
o.set_ylim(output_range[i]) | ||
x = Z[idx].output_timestamps | ||
y = Z[idx].outputs[i] | ||
o.plot(x, y) | ||
else: | ||
# Input. | ||
print(", ".join(inputs)) | ||
print(X[idx].input_denormalized) | ||
print() | ||
# Input. | ||
print(", ".join(inputs)) | ||
print(X[idx].input_denormalized) | ||
print() | ||
|
||
# Output. | ||
print(", ".join(outputs)) | ||
print(Y[idx].outputs) | ||
# Output. | ||
print(", ".join(outputs)) | ||
print(Y[idx].outputs) | ||
|
||
def animateResult(replica): | ||
inputs = replica.sut_parameters["inputs"] | ||
|
@@ -329,7 +327,7 @@ def visualize3DTestSuite(experiment, idx): | |
angle = 25 | ||
result = experiment[idx] | ||
|
||
if not result.sut_parameters["input_type"] == "vector": | ||
if result.sut_parameters["input_type"] != "vector": | ||
Comment on lines
-332
to
+330
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
This removes the following comments ( why? ):
|
||
raise Exception("Test suite visualization available only for vector input SUTs.") | ||
|
||
X, _, Y = result.test_repository.get() | ||
|
@@ -371,7 +369,7 @@ def visualize3DTestSuite(experiment, idx): | |
# of the robustness values in both classes. | ||
interval_false = [1, -1] # Min & Max robustness values | ||
interval_persist = [1, -1] | ||
c = list() # List for tracking input indexes that fail the test | ||
c = [] | ||
|
||
for i in range(len(X)): | ||
if (Y[i] <= falsify_pct): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -230,7 +230,9 @@ def generator_loss_on_batch(model, batch_size): | |
# TODO: This needs to be updated to use the latest performance records. What does this do? | ||
A = model.perf.histories["generator_loss"][-1][0] | ||
B = model.perf.histories["generator_loss"][-1][-1] | ||
print("training loss: {} -> {}".format(A, B)) | ||
print("noise batch loss: {}".format(generator_loss_on_batch(model, batch_size))) | ||
print("discriminator batch loss: {}".format(discriminator_loss_on_batch(model, batch_size))) | ||
print(f"training loss: {A} -> {B}") | ||
print(f"noise batch loss: {generator_loss_on_batch(model, batch_size)}") | ||
print( | ||
f"discriminator batch loss: {discriminator_loss_on_batch(model, batch_size)}" | ||
) | ||
Comment on lines
-233
to
+237
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lines
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,9 +72,11 @@ def build_specification(selected_specification, afc_mode="normal"): | |
if selected_specification == "AFC27": | ||
E = 0.1 # Used in Ernst et al. | ||
#E = 0.05 # Used in ARCH-COMP 2021. | ||
rise = "(THROTTLE < 8.8) and (eventually[0,{}](THROTTLE > 40.0))".format(E) | ||
fall = "(THROTTLE > 40.0) and (eventually[0,{}](THROTTLE < 8.8))".format(E) | ||
specification = "always[11,50](({} or {}) -> always[1,5](|MU| < 0.008))".format(rise, fall) | ||
rise = f"(THROTTLE < 8.8) and (eventually[0,{E}](THROTTLE > 40.0))" | ||
fall = f"(THROTTLE > 40.0) and (eventually[0,{E}](THROTTLE < 8.8))" | ||
specification = ( | ||
f"always[11,50](({rise} or {fall}) -> always[1,5](|MU| < 0.008))" | ||
) | ||
Comment on lines
-75
to
+79
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
specifications = [specification] | ||
strict_horizon_check = False | ||
|
@@ -85,7 +87,7 @@ def build_specification(selected_specification, afc_mode="normal"): | |
specifications = [specification] | ||
strict_horizon_check = True | ||
else: | ||
raise Exception("Unknown specification '{}'.".format(selected_specification)) | ||
raise Exception(f"Unknown specification '{selected_specification}'.") | ||
|
||
return sut_parameters, specifications, strict_horizon_check | ||
|
||
|
@@ -102,15 +104,13 @@ def step_factory(): | |
step_1 = Search(mode=mode, | ||
budget_threshold={"executions": 75}, | ||
algorithm=Random(model_factory=(lambda: Uniform())) | ||
) | ||
) | ||
step_2 = Search(mode=mode, | ||
budget_threshold={"executions": 300}, | ||
algorithm=OGAN(model_factory=(lambda: OGAN_Model(ogan_model_parameters["convolution"])), parameters=ogan_parameters) | ||
#algorithm=WOGAN(model_factory=(lambda: WOGAN_Model()) | ||
) | ||
#steps = [step_1] | ||
steps = [step_1, step_2] | ||
return steps | ||
return [step_1, step_2] | ||
Comment on lines
-105
to
+113
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
This removes the following comments ( why? ):
|
||
|
||
def get_step_factory(): | ||
return step_factory | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -159,16 +159,14 @@ def step_factory(): | |
step_1 = Search(mode=mode, | ||
budget_threshold={"executions": 75}, | ||
algorithm=Random(model_factory=(lambda: Uniform())) | ||
) | ||
) | ||
step_2 = Search(mode=mode, | ||
budget_threshold={"executions": 300}, | ||
algorithm=OGAN(model_factory=(lambda: OGAN_Model(ogan_model_parameters["convolution"])), parameters=ogan_parameters), | ||
#algorithm=WOGAN(model_factory=(lambda: WOGAN_Model())), | ||
results_include_models=False | ||
) | ||
#steps = [step_1] | ||
steps = [step_1, step_2] | ||
return steps | ||
return [step_1, step_2] | ||
Comment on lines
-162
to
+169
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
This removes the following comments ( why? ):
|
||
|
||
def get_step_factory(): | ||
return step_factory | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -128,16 +128,14 @@ def step_factory(): | |
step_1 = Search(mode=mode, | ||
budget_threshold={"executions": 75}, | ||
algorithm=Random(model_factory=(lambda: Uniform())) | ||
) | ||
) | ||
step_2 = Search(mode=mode, | ||
budget_threshold={"executions": 300}, | ||
algorithm=OGAN(model_factory=(lambda: OGAN_Model(ogan_model_parameters["convolution"])), parameters=ogan_parameters), | ||
#algorithm=WOGAN(model_factory=(lambda: WOGAN_Model())), | ||
results_include_models=False | ||
) | ||
#steps = [step_1] | ||
steps = [step_1, step_2] | ||
return steps | ||
return [step_1, step_2] | ||
Comment on lines
-131
to
+138
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
This removes the following comments ( why? ):
|
||
|
||
def get_step_factory(): | ||
return step_factory | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,15 +48,17 @@ | |
""" | ||
|
||
|
||
for benchmark in data: | ||
module = importlib.import_module("{}.benchmark".format(benchmark.lower())) | ||
for benchmark, value in data.items(): | ||
module = importlib.import_module(f"{benchmark.lower()}.benchmark") | ||
build_specification = module.build_specification | ||
for specification, ((idx, robustness, precision), test) in data[benchmark].items(): | ||
for specification, ((idx, robustness, precision), test) in value.items(): | ||
sut_parameters, specifications, strict_horizon_check = build_specification(specification) | ||
if "type" in sut_parameters and sut_parameters["type"] == "simulink": | ||
sut = Matlab_Simulink(sut_parameters) | ||
else: | ||
sut = Matlab(sut_parameters) | ||
sut = ( | ||
Matlab_Simulink(sut_parameters) | ||
if "type" in sut_parameters | ||
and sut_parameters["type"] == "simulink" | ||
else Matlab(sut_parameters) | ||
) | ||
Comment on lines
-51
to
+61
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lines
|
||
sut.setup() | ||
objectives = [FalsifySTL(specification=s, scale=False, strict_horizon_check=strict_horizon_check) for s in specifications] | ||
for objective in objectives: | ||
|
@@ -67,7 +69,9 @@ | |
output = [objective(sut_input, sut_output) for objective in objectives] | ||
|
||
if abs(output[idx] - robustness) >= 10**(-precision): | ||
raise SystemExit("Incorrect output robustness {} for benchmark '{}' and specification '{}'. Expected {}.".format(output[idx], benchmark, specification, robustness)) | ||
raise SystemExit( | ||
f"Incorrect output robustness {output[idx]} for benchmark '{benchmark}' and specification '{specification}'. Expected {robustness}." | ||
) | ||
|
||
print("All correct!") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,14 +22,21 @@ | |
def main(selected_benchmark, selected_specification, mode, n, init_seed, identifier): | ||
N = n | ||
|
||
if not selected_specification in benchmark_specifications[selected_benchmark]: | ||
raise Exception("No specification '{}' for benchmark {}.".format(selected_specification, selected_benchmark)) | ||
|
||
output_file = "{}.npy.gz".format(identifier) | ||
if ( | ||
selected_specification | ||
not in benchmark_specifications[selected_benchmark] | ||
): | ||
raise Exception( | ||
f"No specification '{selected_specification}' for benchmark {selected_benchmark}." | ||
) | ||
|
||
output_file = f"{identifier}.npy.gz" | ||
if os.path.exists(output_file): | ||
raise Exception("Output file '{}' already exists.".format(output_file)) | ||
raise Exception(f"Output file '{output_file}' already exists.") | ||
|
||
benchmark_module = importlib.import_module("{}.benchmark".format(selected_benchmark.lower())) | ||
benchmark_module = importlib.import_module( | ||
f"{selected_benchmark.lower()}.benchmark" | ||
) | ||
Comment on lines
-25
to
+39
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
sut_parameters, specifications, strict_horizon_check = benchmark_module.build_specification(selected_specification, mode) | ||
|
||
|
@@ -38,9 +45,10 @@ def main(selected_benchmark, selected_specification, mode, n, init_seed, identif | |
else: | ||
sut = Matlab(sut_parameters) | ||
|
||
ranges = {} | ||
for n in range(len(sut_parameters["input_range"])): | ||
ranges[sut_parameters["inputs"][n]] = sut_parameters["input_range"][n] | ||
ranges = { | ||
sut_parameters["inputs"][n]: sut_parameters["input_range"][n] | ||
for n in range(len(sut_parameters["input_range"])) | ||
} | ||
for n in range(len(sut_parameters["output_range"])): | ||
ranges[sut_parameters["outputs"][n]] = sut_parameters["output_range"][n] | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,12 +54,10 @@ def get_u_ref(self, t, x_f16): | |
|
||
Nz, ps, Ny_r, throttle = self.get_u_ref(t, x_f16) | ||
|
||
assert Nz <= self.ctrlLimits.NzMax, "autopilot commanded too low Nz ({})".format(Nz) | ||
assert Nz >= self.ctrlLimits.NzMin, "autopilot commanded too high Nz ({})".format(Nz) | ||
assert Nz <= self.ctrlLimits.NzMax, f"autopilot commanded too low Nz ({Nz})" | ||
assert Nz >= self.ctrlLimits.NzMin, f"autopilot commanded too high Nz ({Nz})" | ||
|
||
u_ref = np.array([Nz, ps, Ny_r, throttle], dtype=float) | ||
|
||
return u_ref | ||
return np.array([Nz, ps, Ny_r, throttle], dtype=float) | ||
Comment on lines
-57
to
+60
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
def get_num_integrators(self): | ||
'get the number of integrators in the autopilot' | ||
|
@@ -100,9 +98,9 @@ def advance_discrete_state(self, t, x_f16): | |
eps_phi = deg2rad(5) # Max roll angle magnitude before pulling g's | ||
eps_p = deg2rad(1) # Max roll rate magnitude before pulling g's | ||
path_goal = deg2rad(0) # Final desired path angle | ||
man_start = 2 # maneuver starts after 2 seconds | ||
|
||
if self.state == GcasAutopilot.STATE_START: | ||
Comment on lines
-103
to
105
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
man_start = 2 # maneuver starts after 2 seconds | ||
|
||
if t >= man_start: | ||
self.state = GcasAutopilot.STATE_ROLL | ||
rv = True | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function
_normalize_docstring_lines
refactored with the following changes:use-fstring-for-concatenation
)