1 change: 1 addition & 0 deletions lafleur/__init__.py
@@ -0,0 +1 @@

101 changes: 21 additions & 80 deletions lafleur/corpus_manager.py
@@ -1,12 +1,5 @@
"""
This module provides the CorpusManager and CorpusScheduler classes.

These components are responsible for all high-level management of the fuzzing
corpus, including selecting parent test cases, adding new files, generating
initial seeds, and synchronizing the fuzzer's state with the files on disk.
"""

import hashlib
import logging
import os
import random
import subprocess
@@ -19,6 +12,8 @@
from lafleur.coverage import save_coverage_state
from lafleur.utils import ExecutionResult

logger = logging.getLogger(__name__)

TMP_DIR = Path("tmp_fuzz_run")
CORPUS_DIR = Path("corpus") / "jit_interesting_tests"
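
Note on the new module-level logger: the host process now needs a handler configured somewhere for these messages to appear. A minimal sketch of one possible setup (an assumption — this PR may wire logging up elsewhere):

```python
import logging

# Route lafleur's per-module loggers (logging.getLogger(__name__)) to stderr.
# Level and format here are illustrative, not the project's actual choices.
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
```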

@@ -47,8 +42,6 @@ def _calculate_rarity_score(self, file_metadata: dict[str, Any]) -> float:

for harness_data in baseline_coverage.values():
for edge in harness_data.get("edges", []):
# The score for an edge is the inverse of its global hit count.
# We add 1 to the denominator to avoid division by zero.
global_hits = self.global_coverage.get("edges", {}).get(edge, 0)
rarity_score += 1.0 / (global_hits + 1)
return rarity_score
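
Aside: to make the inverse-hit-count idea concrete, a self-contained sketch with made-up edge counts (not data from a real run):

```python
# Hypothetical global hit counts for three edges covered by one file.
global_edge_hits = {"edge_a": 0, "edge_b": 9, "edge_c": 99}

# 1/(0+1) + 1/(9+1) + 1/(99+1) = 1.11: the never-seen edge_a dominates,
# so files holding globally rare coverage float to the top.
rarity_score = sum(1.0 / (hits + 1) for hits in global_edge_hits.values())
print(round(rarity_score, 2))  # 1.11
```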
@@ -57,40 +50,21 @@ def calculate_scores(self) -> dict[str, float]:
"""Iterate through the corpus and calculate a score for each file."""
scores = {}
for filename, metadata in self.coverage_state.get("per_file_coverage", {}).items():
# Start with a base score
score = 100.0

# --- Heuristic 1: Performance (lower is better) ---
# Penalize slow and large files.
score -= metadata.get("execution_time_ms", 100) * 0.1
score -= metadata.get("file_size_bytes", 1000) * 0.01

# --- Heuristic 2: Rarity (higher is better) ---
# Reward files that contain globally rare coverage.
rarity = self._calculate_rarity_score(metadata)
score += rarity * 50.0

# --- Heuristic 3: Fertility (higher is better) ---
# Reward parents that have produced successful children.
score += metadata.get("total_finds", 0) * 20.0
# Heavily penalize sterile parents that haven't found anything new in a long time.
if metadata.get("is_sterile", False):
score *= 0.1

# --- Heuristic 4: Depth (higher is better) ---
# Slightly reward deeper mutation chains to encourage depth exploration.
score += metadata.get("lineage_depth", 1) * 5.0

# Ensure score is non-negative
scores[filename] = max(1.0, score)

return scores
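
Worked through with invented numbers, the four heuristics combine like this (values are illustrative, not from a real corpus):

```python
# Assumed metadata for a single corpus file.
meta = {
    "execution_time_ms": 200,
    "file_size_bytes": 2000,
    "total_finds": 2,
    "lineage_depth": 3,
    "is_sterile": False,
}
rarity = 0.5  # stand-in for _calculate_rarity_score(meta)

score = 100.0
score -= meta["execution_time_ms"] * 0.1  # -20: slow files are penalized
score -= meta["file_size_bytes"] * 0.01   # -20: large files are penalized
score += rarity * 50.0                    # +25: rare coverage is rewarded
score += meta["total_finds"] * 20.0       # +40: fertile parents are rewarded
score += meta["lineage_depth"] * 5.0      # +15: deeper lineages get a nudge
if meta["is_sterile"]:
    score *= 0.1                          # sterile parents are heavily dampened
print(max(1.0, score))  # 140.0
```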


class CorpusManager:
"""
Handle all interactions with the corpus on disk and the fuzzer's state.
"""
"""Handle all interactions with the corpus on disk and the fuzzer's state."""

def __init__(
self,
@@ -99,7 +73,6 @@ def __init__(
fusil_path: str,
get_boilerplate_func: Callable[..., str],
):
"""Initialize the CorpusManager."""
self.coverage_state = coverage_state
self.run_stats = run_stats
self.fusil_path = fusil_path
@@ -122,77 +95,63 @@ def __init__(
def synchronize(
self, orchestrator_analyze_run_func: Callable, orchestrator_build_lineage_func: Callable
) -> None:
"""
Reconcile the state file with the corpus directory on disk.

Ensure the fuzzer's state is consistent with the actual files in
the corpus. Handle files that were deleted, added, or modified
since the last run.
"""
print("[*] Synchronizing corpus directory with state file...")
logger.info("[*] Synchronizing corpus directory with state file...")
if not CORPUS_DIR.exists():
CORPUS_DIR.mkdir(parents=True, exist_ok=True)

disk_files = {p.name for p in CORPUS_DIR.glob("*.py")}
state_files = set(self.coverage_state["per_file_coverage"].keys())

# 1. Prune state for files that were deleted from disk.
missing_from_disk = state_files - disk_files
if missing_from_disk:
print(
logger.warning(
f"[-] Found {len(missing_from_disk)} files in state but not on disk. Pruning state."
)
for filename in missing_from_disk:
del self.coverage_state["per_file_coverage"][filename]

# 2. Identify new or modified files to be analyzed.
files_to_analyze = self._get_files_to_analyze(disk_files, state_files)

# 3. Run analysis on all new/modified files to generate their metadata.
if files_to_analyze:
self._analyze_and_add_files(
files_to_analyze, orchestrator_analyze_run_func, orchestrator_build_lineage_func
)

# 4. Synchronize the global file counter to prevent overwrites.
current_max_id = 0
for filename in disk_files:
try:
file_id = int(Path(filename).stem)
if file_id > current_max_id:
current_max_id = file_id
except (ValueError, IndexError):
continue # Ignore non-integer filenames
continue

if current_max_id > self.corpus_file_counter:
print(
logger.info(
f"[*] Advancing file counter from {self.corpus_file_counter} to {current_max_id} to match corpus."
)
self.corpus_file_counter = current_max_id

# Re-populate known_hashes after synchronization is complete.
self.known_hashes = {
metadata.get("content_hash")
for metadata in self.coverage_state.get("per_file_coverage", {}).values()
if "content_hash" in metadata
}

# 5. Save the synchronized state.
save_coverage_state(self.coverage_state)
print("[*] Corpus synchronization complete.")
logger.info("[*] Corpus synchronization complete.")
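
The reconcile reduces to set arithmetic plus a counter advance; a compact sketch with assumed filenames:

```python
from pathlib import Path

# Assumed contents of the corpus directory and the state file.
disk_files = {"1.py", "2.py", "7.py"}
state_files = {"1.py", "2.py", "3.py"}

missing_from_disk = state_files - disk_files  # {"3.py"}: pruned from state
new_on_disk = disk_files - state_files        # {"7.py"}: queued for analysis

# Advance the file counter to the highest integer stem on disk, so the
# next corpus file becomes 8.py rather than overwriting an existing one.
max_id = max((int(Path(f).stem) for f in disk_files), default=0)
print(missing_from_disk, new_on_disk, max_id)  # {'3.py'} {'7.py'} 7
```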

def _analyze_and_add_files(
self,
files_to_analyze: set[str],
orchestrator_analyze_run_func: Callable,
orchestrator_build_lineage_func: Callable,
) -> None:
"""Analyze a set of new or modified files and add them to the corpus."""
print(f"[*] Analyzing {len(files_to_analyze)} new or modified corpus files...")
logger.info(f"[*] Analyzing {len(files_to_analyze)} new or modified corpus files...")
for filename in sorted(list(files_to_analyze)):
source_path = CORPUS_DIR / filename
log_path = TMP_DIR / f"sync_{source_path.stem}.log"
print(f" -> Analyzing {filename}...")
logger.info(f" -> Analyzing {filename}...")
try:
with open(log_path, "w") as log_file:
start_time = time.monotonic()
@@ -228,49 +187,42 @@ def _analyze_and_add_files(
mutation_seed=analysis_data["mutation_seed"],
build_lineage_func=orchestrator_build_lineage_func,
)

except Exception as e:
print(f" [!] Failed to analyze seed file {filename}: {e}", file=sys.stderr)
logger.error(f" [!] Failed to analyze seed file {filename}: {e}")

def _get_files_to_analyze(self, disk_files: set[str], state_files: set[str]) -> set[str]:
"""Identify new or modified files on disk that require analysis."""
files_to_analyze = set()
for filename in disk_files:
file_path = CORPUS_DIR / filename
if filename not in state_files:
print(f"[+] Discovered new file in corpus: {filename}")
logger.info(f"[+] Discovered new file in corpus: {filename}")
files_to_analyze.add(filename)
else:
# File exists in both, verify its hash.
try:
content = file_path.read_text()
current_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
if (
self.coverage_state["per_file_coverage"][filename].get("content_hash")
!= current_hash
):
print(f"[~] File content has changed for {filename}. Re-analyzing.")
logger.info(f"[~] File content has changed for {filename}. Re-analyzing.")
del self.coverage_state["per_file_coverage"][filename]
files_to_analyze.add(filename)
except (IOError, KeyError) as e:
print(f"[!] Error processing existing file {filename}: {e}. Re-analyzing.")
logger.error(
f"[!] Error processing existing file {filename}: {e}. Re-analyzing."
)
if filename in self.coverage_state["per_file_coverage"]:
del self.coverage_state["per_file_coverage"][filename]
files_to_analyze.add(filename)
return files_to_analyze
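
The modified-file check in miniature: re-hash the on-disk text and compare it against the hash stored in the state metadata (both values assumed here):

```python
import hashlib

content = 'print("hello")\n'  # pretend this was read from a corpus file
current_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
stored_hash = "0" * 64  # pretend this came from per_file_coverage metadata

print(current_hash != stored_hash)  # True -> the file is re-analyzed
```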

def select_parent(self) -> tuple[Path, float] | None:
"""
Select a test case from the corpus using a weighted random choice.

Return the path to the selected parent and its calculated score, or
None if the corpus is empty.
"""
corpus_files = list(self.coverage_state.get("per_file_coverage", {}).keys())
if not corpus_files:
return None

print("[+] Calculating corpus scores for parent selection...")
logger.info("[+] Calculating corpus scores for parent selection...")
scores = self.scheduler.calculate_scores()

corpus_weights = [scores.get(filename, 1.0) for filename in corpus_files]
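
The draw itself falls in the collapsed part of this hunk; presumably it is a weighted choice along these lines (a sketch under that assumption, with invented scores):

```python
import random

corpus_files = ["1.py", "2.py", "3.py"]               # assumed corpus
scores = {"1.py": 140.0, "2.py": 35.0, "3.py": 90.0}  # assumed scheduler output

corpus_weights = [scores.get(f, 1.0) for f in corpus_files]
# "1.py" wins roughly 140/265 of draws; low scorers are never fully starved.
chosen = random.choices(corpus_files, weights=corpus_weights, k=1)[0]
print(chosen)
```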
@@ -294,16 +246,11 @@ def add_new_file(
content_hash: str,
build_lineage_func: Callable,
) -> str:
"""
Add a new file to the corpus and update all related state.

Return the unique filename assigned to the new corpus file.
"""
self.corpus_file_counter += 1
new_filename = f"{self.corpus_file_counter}.py"
corpus_filepath = CORPUS_DIR / new_filename
corpus_filepath.write_text(core_code)
print(f"[+] Added minimized file to corpus: {new_filename}")
logger.info(f"[+] Added minimized file to corpus: {new_filename}")

parent_metadata = (
self.coverage_state["per_file_coverage"].get(parent_id, {}) if parent_id else {}
@@ -330,20 +277,17 @@ def add_new_file(
self.coverage_state["per_file_coverage"][new_filename] = metadata
self.known_hashes.add(content_hash)

# The manager is now responsible for saving the state it modifies.
save_coverage_state(self.coverage_state)

return new_filename

def generate_new_seed(
self, orchestrator_analyze_run_func: Callable, orchestrator_build_lineage_func: Callable
) -> None:
"""Run a single generative session to create a new seed file."""
tmp_source = TMP_DIR / "gen_run.py"
tmp_log = TMP_DIR / "gen_run.log"

python_executable = sys.executable
# Use fusil to generate a new file
command = [
"sudo",
python_executable,
@@ -363,19 +307,16 @@ def generate_new_seed(
"--jit-loop-iterations=300",
"--no-numpy",
"--modules=encodings.ascii",
# "--keep-sessions",
]
print(f"[*] Generating new seed with command: {' '.join(command)}")
logger.info(f"[*] Generating new seed with command: {' '.join(command)}")
subprocess.run(command, capture_output=True)

# Execute it to get a log
with open(tmp_log, "w") as log_file:
result = subprocess.run(
["python3", tmp_source], stdout=log_file, stderr=subprocess.STDOUT, env=ENV
)

# Analyze it for coverage
execution_time_ms = 0 # This is a placeholder, as we don't time this run
execution_time_ms = 0
analysis_data = orchestrator_analyze_run_func(
exec_result=ExecutionResult(
returncode=result.returncode,