#!/usr/bin/env python3
"""
SWE-bench Failure Analysis Tool

Analyze failed instances to identify patterns, categorize failures,
and understand differences between vanilla and OMC runs.

Usage:
    python analyze_failures.py --results results/vanilla/ --predictions predictions.json
    python analyze_failures.py --vanilla results/vanilla/ --omc results/omc/ --compare
"""

import argparse
import json
import logging
import re
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Common failure pattern definitions
FAILURE_PATTERNS = {
    "syntax_error": [
        r"SyntaxError",
        r"IndentationError",
        r"TabError",
    ],
    "import_error": [
        r"ImportError",
        r"ModuleNotFoundError",
        r"No module named",
    ],
    "type_error": [
        r"TypeError",
        r"expected .+ got .+",
    ],
    "attribute_error": [
        r"AttributeError",
        r"has no attribute",
    ],
    "assertion_error": [
        r"AssertionError",
        r"assert .+ failed",
    ],
    "test_failure": [
        r"FAILED",
        r"test.*failed",
        r"failures=\d+",
    ],
    "timeout": [
        r"timeout",
        r"timed out",
        r"TimeoutError",
    ],
    "empty_patch": [
        r"empty patch",
        r"no changes",
        r"patch is empty",
    ],
    "apply_failure": [
        r"patch.*failed",
        r"could not apply",
        r"git apply.*failed",
        r"hunks? FAILED",
    ],
    "runtime_error": [
        r"RuntimeError",
        r"Exception",
        r"Error:",
    ],
    "value_error": [
        r"ValueError",
        r"invalid .+ value",
    ],
    "key_error": [
        r"KeyError",
        r"not found in",
    ],
}
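
# Illustrative only (hypothetical log lines, not taken from a real run): in
# categorize_failure() the first matching entry in FAILURE_PATTERNS, in
# definition order, becomes the primary category; later matches become
# subcategories. Matching is case-insensitive.
#   "ModuleNotFoundError: No module named 'requests'"  -> import_error
#   "AssertionError: assert response.ok"               -> assertion_error
#   "TimeoutError: evaluation timed out after 300s"    -> timeout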


def load_results(results_dir: Path) -> dict[str, Any]:
    """Load evaluation results."""
    results = {"instances": {}}
    summary_file = results_dir / "summary.json"
    if summary_file.exists():
        with open(summary_file) as f:
            results = json.load(f)
    # Also load from logs if available
    logs_dir = results_dir / "logs"
    if logs_dir.exists():
        for log_file in logs_dir.glob("*.log"):
            instance_id = log_file.stem
            if instance_id not in results.get("instances", {}):
                results.setdefault("instances", {})[instance_id] = {}
            results["instances"][instance_id]["log_content"] = log_file.read_text()
    return results


def load_predictions(predictions_file: Path) -> dict[str, Any]:
    """Load predictions with metadata."""
    with open(predictions_file) as f:
        predictions = json.load(f)
    if isinstance(predictions, list):
        predictions = {p["instance_id"]: p for p in predictions}
    return predictions
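
# Both common prediction layouts are accepted; a list is re-keyed by
# "instance_id". Illustrative shape (hypothetical values):
#   [{"instance_id": "example__repo-1234", "model_patch": "diff --git ..."}]
# becomes
#   {"example__repo-1234": {"instance_id": "example__repo-1234", "model_patch": "diff --git ..."}}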


def categorize_failure(
    instance_id: str,
    instance_data: dict[str, Any],
    prediction_data: dict[str, Any] | None = None
) -> dict[str, Any]:
    """
    Categorize a single failure instance.

    Returns:
        Dictionary with:
        - category: Primary failure category
        - subcategories: Additional categories
        - error_message: Extracted error message
        - confidence: Confidence in categorization
    """
    result = {
        "instance_id": instance_id,
        "category": "unknown",
        "subcategories": [],
        "error_message": None,
        "confidence": 0.0,
        "details": {}
    }
    # Get content to analyze
    log_content = instance_data.get("log_content", "")
    error_message = instance_data.get("error_message", "")
    patch = ""
    if prediction_data:
        patch = prediction_data.get("model_patch", prediction_data.get("patch", ""))
        result["details"]["patch_length"] = len(patch)
        result["details"]["patch_lines"] = patch.count("\n") + 1 if patch else 0
    content_to_analyze = f"{log_content}\n{error_message}"
    # Check for empty patch first
    if prediction_data and not patch.strip():
        result["category"] = "empty_patch"
        result["confidence"] = 1.0
        result["error_message"] = "No patch generated"
        return result
    # Match against failure patterns
    matched_categories = []
    for category, patterns in FAILURE_PATTERNS.items():
        for pattern in patterns:
            if re.search(pattern, content_to_analyze, re.IGNORECASE):
                matched_categories.append(category)
                break
    if matched_categories:
        result["category"] = matched_categories[0]
        result["subcategories"] = matched_categories[1:]
        result["confidence"] = 0.8 if len(matched_categories) == 1 else 0.6
    # Extract specific error message
    error_patterns = [
        r"(Error: .+?)(?:\n|$)",
        r"(Exception: .+?)(?:\n|$)",
        r"(FAILED .+?)(?:\n|$)",
        r"(AssertionError: .+?)(?:\n|$)",
    ]
    for pattern in error_patterns:
        match = re.search(pattern, content_to_analyze)
        if match:
            result["error_message"] = match.group(1).strip()[:200]
            break
    if not result["error_message"] and error_message:
        result["error_message"] = error_message[:200]
    return result
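
# A sketch of the structure categorize_failure() returns (values are
# illustrative, not taken from a real run):
#   {
#       "instance_id": "example__repo-1234",
#       "category": "import_error",
#       "subcategories": ["runtime_error"],
#       "error_message": "Error: No module named 'foo'",
#       "confidence": 0.6,
#       "details": {"patch_length": 842, "patch_lines": 25},
#   }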


def analyze_failures(
    results: dict[str, Any],
    predictions: dict[str, Any] | None = None
) -> dict[str, Any]:
    """
    Analyze all failures in a results set.

    Returns:
        Comprehensive failure analysis including:
        - category_counts: Count by failure category
        - failures: List of categorized failures
        - patterns: Common failure patterns
        - recommendations: Suggested improvements
    """
    analysis = {
        "timestamp": datetime.now().isoformat(),
        "total_instances": results.get("total", len(results.get("instances", {}))),
        "total_failures": 0,
        "category_counts": Counter(),
        "failures": [],
        "patterns": {},
        "recommendations": []
    }
    # Analyze each failed instance
    for instance_id, instance_data in results.get("instances", {}).items():
        status = instance_data.get("status", "unknown")
        if status in ("passed",):
            continue
        analysis["total_failures"] += 1
        pred_data = predictions.get(instance_id) if predictions else None
        failure_info = categorize_failure(instance_id, instance_data, pred_data)
        analysis["category_counts"][failure_info["category"]] += 1
        analysis["failures"].append(failure_info)
    # Convert Counter to dict for JSON
    analysis["category_counts"] = dict(analysis["category_counts"])
    # Identify patterns
    analysis["patterns"] = identify_patterns(analysis["failures"])
    # Generate recommendations
    analysis["recommendations"] = generate_recommendations(analysis)
    return analysis


def identify_patterns(failures: list[dict[str, Any]]) -> dict[str, Any]:
    """Identify common patterns across failures."""
    patterns = {
        "by_repo": defaultdict(list),
        "by_error_type": defaultdict(list),
        "common_errors": [],
    }
    error_messages = []
    for failure in failures:
        instance_id = failure["instance_id"]
        # Group by repository
        if "__" in instance_id:
            repo = instance_id.split("__")[0]
            patterns["by_repo"][repo].append(instance_id)
        # Group by error type
        patterns["by_error_type"][failure["category"]].append(instance_id)
        # Collect error messages for pattern detection
        if failure.get("error_message"):
            error_messages.append(failure["error_message"])
    # Find most common error message fragments
    if error_messages:
        # Simple n-gram analysis for common phrases
        word_counts = Counter()
        for msg in error_messages:
            words = msg.lower().split()
            for i in range(len(words) - 2):
                phrase = " ".join(words[i:i+3])
                word_counts[phrase] += 1
        patterns["common_errors"] = [
            {"phrase": phrase, "count": count}
            for phrase, count in word_counts.most_common(10)
            if count > 1
        ]
    # Convert defaultdicts
    patterns["by_repo"] = dict(patterns["by_repo"])
    patterns["by_error_type"] = dict(patterns["by_error_type"])
    return patterns
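
# The "common_errors" fragments are simple word trigrams. For example, the
# message "no module named requests" yields two phrases,
#   "no module named" and "module named requests",
# and a phrase is only reported when it occurs in more than one failure.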


def generate_recommendations(analysis: dict[str, Any]) -> list[dict[str, str]]:
    """Generate recommendations based on failure analysis."""
    recommendations = []
    category_counts = analysis["category_counts"]
    total = analysis["total_failures"]
    if total == 0:
        return [{"type": "success", "message": "No failures to analyze!"}]
    # Recommendations based on category distribution
    if category_counts.get("empty_patch", 0) > total * 0.1:
        recommendations.append({
            "type": "critical",
            "category": "empty_patch",
            "message": f"{category_counts['empty_patch']} instances ({category_counts['empty_patch']/total*100:.1f}%) "
                       "produced empty patches. Consider improving prompt engineering or adding retry logic."
        })
    if category_counts.get("apply_failure", 0) > total * 0.1:
        recommendations.append({
            "type": "critical",
            "category": "apply_failure",
            "message": f"{category_counts['apply_failure']} instances had patch application failures. "
                       "Patches may have incorrect context or line numbers."
        })
    if category_counts.get("syntax_error", 0) > total * 0.05:
        recommendations.append({
            "type": "high",
            "category": "syntax_error",
            "message": f"{category_counts['syntax_error']} instances had syntax errors. "
                       "Consider adding syntax validation before submission."
        })
    if category_counts.get("test_failure", 0) > total * 0.2:
        recommendations.append({
            "type": "medium",
            "category": "test_failure",
            "message": f"{category_counts['test_failure']} instances failed tests. "
                       "The patches may be functionally incorrect or incomplete."
        })
    if category_counts.get("timeout", 0) > total * 0.05:
        recommendations.append({
            "type": "medium",
            "category": "timeout",
            "message": f"{category_counts['timeout']} instances timed out. "
                       "Consider increasing timeout or optimizing patch execution."
        })
    # Repo-specific recommendations
    patterns = analysis.get("patterns", {})
    by_repo = patterns.get("by_repo", {})
    for repo, failures in sorted(by_repo.items(), key=lambda x: -len(x[1]))[:3]:
        if len(failures) >= 3:
            recommendations.append({
                "type": "info",
                "category": "repo_pattern",
                "message": f"Repository '{repo}' has {len(failures)} failures. "
                           "May indicate specific challenges with this codebase."
            })
    return recommendations
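
# Thresholds are relative to the total failure count. Illustrative arithmetic:
# with 50 total failures, the empty_patch and apply_failure rules fire above
# 5 instances (10%), syntax_error and timeout above 2.5 (i.e. 3 or more), and
# test_failure above 10 instances (20%).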


def compare_failures(
    vanilla_analysis: dict[str, Any],
    omc_analysis: dict[str, Any]
) -> dict[str, Any]:
    """Compare failure patterns between vanilla and OMC."""
    comparison = {
        "timestamp": datetime.now().isoformat(),
        "vanilla_failures": vanilla_analysis["total_failures"],
        "omc_failures": omc_analysis["total_failures"],
        "category_comparison": {},
        "unique_to_vanilla": [],
        "unique_to_omc": [],
        "common_failures": [],
        "insights": []
    }
    # Category comparison
    all_categories = set(vanilla_analysis["category_counts"].keys()) | \
        set(omc_analysis["category_counts"].keys())
    for category in all_categories:
        vanilla_count = vanilla_analysis["category_counts"].get(category, 0)
        omc_count = omc_analysis["category_counts"].get(category, 0)
        comparison["category_comparison"][category] = {
            "vanilla": vanilla_count,
            "omc": omc_count,
            "delta": omc_count - vanilla_count
        }
    # Instance comparison
    vanilla_failed = {f["instance_id"] for f in vanilla_analysis["failures"]}
    omc_failed = {f["instance_id"] for f in omc_analysis["failures"]}
    comparison["unique_to_vanilla"] = list(vanilla_failed - omc_failed)
    comparison["unique_to_omc"] = list(omc_failed - vanilla_failed)
    comparison["common_failures"] = list(vanilla_failed & omc_failed)
    # Generate insights
    insights = []
    if len(comparison["unique_to_vanilla"]) > len(comparison["unique_to_omc"]):
        insights.append({
            "type": "positive",
            "message": f"OMC fixed {len(comparison['unique_to_vanilla'])} failures that vanilla couldn't solve."
        })
    elif len(comparison["unique_to_omc"]) > len(comparison["unique_to_vanilla"]):
        insights.append({
            "type": "negative",
            "message": f"OMC introduced {len(comparison['unique_to_omc'])} new failures compared to vanilla."
        })
    # Check for category improvements
    for category, counts in comparison["category_comparison"].items():
        if counts["delta"] < -2:
            insights.append({
                "type": "positive",
                "message": f"OMC reduced '{category}' failures by {abs(counts['delta'])}."
            })
        elif counts["delta"] > 2:
            insights.append({
                "type": "negative",
                "message": f"OMC increased '{category}' failures by {counts['delta']}."
            })
    comparison["insights"] = insights
    return comparison
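
# Delta convention: delta = omc_count - vanilla_count per category, so a
# negative delta means OMC produced fewer failures of that kind. Hypothetical
# example: vanilla=7, omc=3 -> delta=-4, which clears the -2 threshold and is
# reported as a "positive" insight.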


def generate_failure_report(
    analysis: dict[str, Any],
    comparison: dict[str, Any] | None = None
) -> str:
    """Generate a detailed failure analysis report."""
    lines = [
        "# SWE-bench Failure Analysis Report",
        "",
        f"**Generated:** {analysis['timestamp']}",
        "",
        "## Summary",
        "",
        f"- **Total Instances:** {analysis['total_instances']}",
        f"- **Total Failures:** {analysis['total_failures']}",
        f"- **Failure Rate:** {analysis['total_failures']/max(analysis['total_instances'],1)*100:.1f}%",
        "",
        "## Failure Categories",
        "",
        "| Category | Count | Percentage |",
        "|----------|-------|------------|",
    ]
    total = max(analysis["total_failures"], 1)
    for category, count in sorted(
        analysis["category_counts"].items(),
        key=lambda x: -x[1]
    ):
        pct = count / total * 100
        lines.append(f"| {category} | {count} | {pct:.1f}% |")
    lines.extend([
        "",
        "## Recommendations",
        "",
    ])
    for rec in analysis["recommendations"]:
        priority = {"critical": "!!!", "high": "!!", "medium": "!", "info": "i"}.get(rec["type"], "-")
        lines.append(f"- [{priority}] {rec['message']}")
    # Repository breakdown
    if analysis.get("patterns", {}).get("by_repo"):
        lines.extend([
            "",
            "## Failures by Repository",
            "",
            "| Repository | Failures |",
            "|------------|----------|",
        ])
        for repo, failures in sorted(
            analysis["patterns"]["by_repo"].items(),
            key=lambda x: -len(x[1])
        )[:10]:
            lines.append(f"| {repo} | {len(failures)} |")
    # Comparison section
    if comparison:
        lines.extend([
            "",
            "## Vanilla vs OMC Comparison",
            "",
            f"- **Vanilla Failures:** {comparison['vanilla_failures']}",
            f"- **OMC Failures:** {comparison['omc_failures']}",
            f"- **Fixed by OMC:** {len(comparison['unique_to_vanilla'])}",
            f"- **New in OMC:** {len(comparison['unique_to_omc'])}",
            f"- **Common Failures:** {len(comparison['common_failures'])}",
            "",
            "### Category Changes",
            "",
            "| Category | Vanilla | OMC | Delta |",
            "|----------|---------|-----|-------|",
        ])
        for category, counts in sorted(
            comparison["category_comparison"].items(),
            key=lambda x: x[1]["delta"]
        ):
            delta_str = f"{counts['delta']:+d}" if counts['delta'] != 0 else "0"
            lines.append(f"| {category} | {counts['vanilla']} | {counts['omc']} | {delta_str} |")
        if comparison.get("insights"):
            lines.extend([
                "",
                "### Insights",
                "",
            ])
            for insight in comparison["insights"]:
                icon = {"positive": "+", "negative": "-", "neutral": "="}.get(insight["type"], "*")
                lines.append(f"- [{icon}] {insight['message']}")
    # Sample failures
    if analysis["failures"]:
        lines.extend([
            "",
            "## Sample Failures",
            "",
        ])
        for failure in analysis["failures"][:10]:
            lines.append(f"### {failure['instance_id']}")
            lines.append(f"- **Category:** {failure['category']}")
            if failure.get("error_message"):
                lines.append(f"- **Error:** `{failure['error_message']}`")
            if failure.get("details"):
                for k, v in failure["details"].items():
                    lines.append(f"- **{k}:** {v}")
            lines.append("")
    lines.extend([
        "",
        "---",
        "",
        "*Report generated by analyze_failures.py*"
    ])
    return "\n".join(lines)


def main():
    parser = argparse.ArgumentParser(
        description="Analyze SWE-bench failure patterns",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Analyze single run
  python analyze_failures.py --results results/vanilla/

  # With predictions for more context
  python analyze_failures.py --results results/omc/ --predictions predictions.json

  # Compare vanilla vs OMC failures
  python analyze_failures.py --vanilla results/vanilla/ --omc results/omc/ --compare
"""
    )
    parser.add_argument(
        "--results",
        type=Path,
        help="Path to results directory for single analysis"
    )
    parser.add_argument(
        "--predictions",
        type=Path,
        help="Path to predictions JSON for additional context"
    )
    parser.add_argument(
        "--vanilla",
        type=Path,
        help="Path to vanilla results for comparison"
    )
    parser.add_argument(
        "--omc",
        type=Path,
        help="Path to OMC results for comparison"
    )
    parser.add_argument(
        "--compare",
        action="store_true",
        help="Compare vanilla vs OMC (requires --vanilla and --omc)"
    )
    parser.add_argument(
        "--output", "-o",
        type=Path,
        default=Path("analysis"),
        help="Output directory for analysis reports (default: analysis/)"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Validate arguments
    if args.compare:
        if not args.vanilla or not args.omc:
            parser.error("--compare requires both --vanilla and --omc")
    elif not args.results:
        parser.error("Either --results or (--vanilla, --omc, --compare) required")

    args.output.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    if args.compare:
        # Comparison mode
        logger.info(f"Loading vanilla results from {args.vanilla}")
        vanilla_results = load_results(args.vanilla)
        vanilla_predictions = None

        logger.info(f"Loading OMC results from {args.omc}")
        omc_results = load_results(args.omc)
        omc_predictions = None

        # Try to load predictions
        for pred_path in [args.vanilla / "predictions.json", args.vanilla.parent / "vanilla_predictions.json"]:
            if pred_path.exists():
                vanilla_predictions = load_predictions(pred_path)
                break
        for pred_path in [args.omc / "predictions.json", args.omc.parent / "omc_predictions.json"]:
            if pred_path.exists():
                omc_predictions = load_predictions(pred_path)
                break

        logger.info("Analyzing failures...")
        vanilla_analysis = analyze_failures(vanilla_results, vanilla_predictions)
        omc_analysis = analyze_failures(omc_results, omc_predictions)

        logger.info("Comparing failures...")
        comparison = compare_failures(vanilla_analysis, omc_analysis)

        # Save outputs
        json_file = args.output / f"comparison_analysis_{timestamp}.json"
        with open(json_file, "w") as f:
            json.dump({
                "vanilla": vanilla_analysis,
                "omc": omc_analysis,
                "comparison": comparison
            }, f, indent=2)

        report = generate_failure_report(omc_analysis, comparison)
        md_file = args.output / f"comparison_analysis_{timestamp}.md"
        md_file.write_text(report)

        print("\n" + "=" * 60)
        print("FAILURE COMPARISON COMPLETE")
        print("=" * 60)
        print(f"Vanilla Failures: {vanilla_analysis['total_failures']}")
        print(f"OMC Failures: {omc_analysis['total_failures']}")
        print(f"Fixed by OMC: {len(comparison['unique_to_vanilla'])}")
        print(f"New in OMC: {len(comparison['unique_to_omc'])}")
        print(f"\nResults saved to: {args.output}")
        print("=" * 60)
    else:
        # Single analysis mode
        logger.info(f"Loading results from {args.results}")
        results = load_results(args.results)
        predictions = None
        if args.predictions and args.predictions.exists():
            predictions = load_predictions(args.predictions)

        logger.info("Analyzing failures...")
        analysis = analyze_failures(results, predictions)

        # Save outputs
        json_file = args.output / f"failure_analysis_{timestamp}.json"
        with open(json_file, "w") as f:
            json.dump(analysis, f, indent=2)

        report = generate_failure_report(analysis)
        md_file = args.output / f"failure_analysis_{timestamp}.md"
        md_file.write_text(report)

        print("\n" + "=" * 60)
        print("FAILURE ANALYSIS COMPLETE")
        print("=" * 60)
        print(f"Total Instances: {analysis['total_instances']}")
        print(f"Total Failures: {analysis['total_failures']}")
        print("\nTop Categories:")
        for cat, count in sorted(analysis["category_counts"].items(), key=lambda x: -x[1])[:5]:
            print(f"  {cat}: {count}")
        print(f"\nResults saved to: {args.output}")
        print("=" * 60)

    return 0


if __name__ == "__main__":
    exit(main())