#!/usr/bin/env python3
"""
SWE-bench Failure Analysis Tool

Analyze failed instances to identify patterns, categorize failures,
and understand differences between vanilla and OMC runs.

Usage:
    python analyze_failures.py --results results/vanilla/ --predictions predictions.json
    python analyze_failures.py --vanilla results/vanilla/ --omc results/omc/ --compare
"""

import argparse
import json
import logging
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Common failure pattern definitions
FAILURE_PATTERNS = {
    "syntax_error": [
        r"SyntaxError",
        r"IndentationError",
        r"TabError",
    ],
    "import_error": [
        r"ImportError",
        r"ModuleNotFoundError",
        r"No module named",
    ],
    "type_error": [
        r"TypeError",
        r"expected .+ got .+",
    ],
    "attribute_error": [
        r"AttributeError",
        r"has no attribute",
    ],
    "assertion_error": [
        r"AssertionError",
        r"assert .+ failed",
    ],
    "test_failure": [
        r"FAILED",
        r"test.*failed",
        r"failures=\d+",
    ],
    "timeout": [
        r"timeout",
        r"timed out",
        r"TimeoutError",
    ],
    "empty_patch": [
        r"empty patch",
        r"no changes",
        r"patch is empty",
    ],
    "apply_failure": [
        r"patch.*failed",
        r"could not apply",
        r"git apply.*failed",
        r"hunks? FAILED",
    ],
    "runtime_error": [
        r"RuntimeError",
        r"Exception",
        r"Error:",
    ],
    "value_error": [
        r"ValueError",
        r"invalid .+ value",
    ],
    "key_error": [
        r"KeyError",
        r"not found in",
    ],
}

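# Illustrative sanity check (module name and text below are made up): a traceback
# line such as "ModuleNotFoundError: No module named 'widgets'" matches the
# "import_error" patterns via re.search, e.g.
#   >>> any(re.search(p, "ModuleNotFoundError: No module named 'widgets'", re.IGNORECASE)
#   ...     for p in FAILURE_PATTERNS["import_error"])
#   True
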
def load_results(results_dir: Path) -> dict[str, Any]:
    """Load evaluation results."""
    results = {"instances": {}}

    summary_file = results_dir / "summary.json"
    if summary_file.exists():
        with open(summary_file) as f:
            results = json.load(f)

    # Also load from logs if available
    logs_dir = results_dir / "logs"
    if logs_dir.exists():
        for log_file in logs_dir.glob("*.log"):
            instance_id = log_file.stem
            if instance_id not in results.get("instances", {}):
                results.setdefault("instances", {})[instance_id] = {}

            results["instances"][instance_id]["log_content"] = log_file.read_text()

    return results

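# Assumed (not enforced) summary.json layout, for reference:
#   {"total": 50, "instances": {"<instance_id>": {"status": "passed", ...}}}
# Per-instance logs found under <results_dir>/logs/<instance_id>.log are merged in
# as "log_content" for later pattern matching.
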
def load_predictions(predictions_file: Path) -> dict[str, Any]:
    """Load predictions with metadata."""
    with open(predictions_file) as f:
        predictions = json.load(f)

    if isinstance(predictions, list):
        predictions = {p["instance_id"]: p for p in predictions}

    return predictions

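# Accepts either prediction-file shape: a dict keyed by instance_id, or a list of
# records such as (illustrative)
#   [{"instance_id": "astropy__astropy-12907", "model_patch": "diff --git ..."}]
# which is normalized into a dict keyed by "instance_id".
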
def categorize_failure(
    instance_id: str,
    instance_data: dict[str, Any],
    prediction_data: dict[str, Any] | None = None
) -> dict[str, Any]:
    """
    Categorize a single failure instance.

    Returns:
        Dictionary with:
        - category: Primary failure category
        - subcategories: Additional categories
        - error_message: Extracted error message
        - confidence: Confidence in categorization
    """
    result = {
        "instance_id": instance_id,
        "category": "unknown",
        "subcategories": [],
        "error_message": None,
        "confidence": 0.0,
        "details": {}
    }

    # Get content to analyze
    log_content = instance_data.get("log_content", "")
    error_message = instance_data.get("error_message", "")
    patch = ""

    if prediction_data:
        patch = prediction_data.get("model_patch", prediction_data.get("patch", ""))
        result["details"]["patch_length"] = len(patch)
        result["details"]["patch_lines"] = patch.count("\n") + 1 if patch else 0

    content_to_analyze = f"{log_content}\n{error_message}"

    # Check for empty patch first
    if prediction_data and not patch.strip():
        result["category"] = "empty_patch"
        result["confidence"] = 1.0
        result["error_message"] = "No patch generated"
        return result

    # Match against failure patterns
    matched_categories = []

    for category, patterns in FAILURE_PATTERNS.items():
        for pattern in patterns:
            if re.search(pattern, content_to_analyze, re.IGNORECASE):
                matched_categories.append(category)
                break

    if matched_categories:
        result["category"] = matched_categories[0]
        result["subcategories"] = matched_categories[1:]
        result["confidence"] = 0.8 if len(matched_categories) == 1 else 0.6

    # Extract specific error message
    error_patterns = [
        r"(Error: .+?)(?:\n|$)",
        r"(Exception: .+?)(?:\n|$)",
        r"(FAILED .+?)(?:\n|$)",
        r"(AssertionError: .+?)(?:\n|$)",
    ]

    for pattern in error_patterns:
        match = re.search(pattern, content_to_analyze)
        if match:
            result["error_message"] = match.group(1).strip()[:200]
            break

    if not result["error_message"] and error_message:
        result["error_message"] = error_message[:200]

    return result

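# Illustrative return value for a log whose only error line is
# "ImportError: No module named 'widgets'" (module name invented): the primary
# category is the first matching FAILURE_PATTERNS key, and the broad r"Error:"
# pattern also matches, so the result would look like
#   {"instance_id": "...", "category": "import_error", "subcategories": ["runtime_error"],
#    "error_message": "Error: No module named 'widgets'", "confidence": 0.6, "details": {...}}
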
def analyze_failures(
    results: dict[str, Any],
    predictions: dict[str, Any] | None = None
) -> dict[str, Any]:
    """
    Analyze all failures in a results set.

    Returns:
        Comprehensive failure analysis including:
        - category_counts: Count by failure category
        - failures: List of categorized failures
        - patterns: Common failure patterns
        - recommendations: Suggested improvements
    """
    analysis = {
        "timestamp": datetime.now().isoformat(),
        "total_instances": results.get("total", len(results.get("instances", {}))),
        "total_failures": 0,
        "category_counts": Counter(),
        "failures": [],
        "patterns": {},
        "recommendations": []
    }

    # Analyze each failed instance
    for instance_id, instance_data in results.get("instances", {}).items():
        status = instance_data.get("status", "unknown")

        if status in ("passed",):
            continue

        analysis["total_failures"] += 1

        pred_data = predictions.get(instance_id) if predictions else None
        failure_info = categorize_failure(instance_id, instance_data, pred_data)

        analysis["category_counts"][failure_info["category"]] += 1
        analysis["failures"].append(failure_info)

    # Convert Counter to dict for JSON
    analysis["category_counts"] = dict(analysis["category_counts"])

    # Identify patterns
    analysis["patterns"] = identify_patterns(analysis["failures"])

    # Generate recommendations
    analysis["recommendations"] = generate_recommendations(analysis)

    return analysis

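# Illustrative analysis shape (all counts invented):
#   {"timestamp": "...", "total_instances": 50, "total_failures": 12,
#    "category_counts": {"test_failure": 5, "empty_patch": 3, "apply_failure": 2, "unknown": 2},
#    "failures": [...], "patterns": {...}, "recommendations": [...]}
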
def identify_patterns(failures: list[dict[str, Any]]) -> dict[str, Any]:
    """Identify common patterns across failures."""
    patterns = {
        "by_repo": defaultdict(list),
        "by_error_type": defaultdict(list),
        "common_errors": [],
    }

    error_messages = []

    for failure in failures:
        instance_id = failure["instance_id"]

        # Group by repository
        if "__" in instance_id:
            repo = instance_id.split("__")[0]
            patterns["by_repo"][repo].append(instance_id)

        # Group by error type
        patterns["by_error_type"][failure["category"]].append(instance_id)

        # Collect error messages for pattern detection
        if failure.get("error_message"):
            error_messages.append(failure["error_message"])

    # Find most common error message fragments
    if error_messages:
        # Simple n-gram analysis for common phrases
        word_counts = Counter()
        for msg in error_messages:
            words = msg.lower().split()
            for i in range(len(words) - 2):
                phrase = " ".join(words[i:i+3])
                word_counts[phrase] += 1

        patterns["common_errors"] = [
            {"phrase": phrase, "count": count}
            for phrase, count in word_counts.most_common(10)
            if count > 1
        ]

    # Convert defaultdicts
    patterns["by_repo"] = dict(patterns["by_repo"])
    patterns["by_error_type"] = dict(patterns["by_error_type"])

    return patterns

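# Illustrative trigram counting (messages invented): "Error: x is not defined" and
# "Error: y is not defined" both yield the 3-word phrase "is not defined", so it
# would appear in common_errors with count 2; phrases seen only once are dropped.
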
def generate_recommendations(analysis: dict[str, Any]) -> list[dict[str, str]]:
    """Generate recommendations based on failure analysis."""
    recommendations = []
    category_counts = analysis["category_counts"]
    total = analysis["total_failures"]

    if total == 0:
        return [{"type": "success", "message": "No failures to analyze!"}]

    # Recommendations based on category distribution
    if category_counts.get("empty_patch", 0) > total * 0.1:
        recommendations.append({
            "type": "critical",
            "category": "empty_patch",
            "message": f"{category_counts['empty_patch']} instances ({category_counts['empty_patch']/total*100:.1f}%) "
                       "produced empty patches. Consider improving prompt engineering or adding retry logic."
        })

    if category_counts.get("apply_failure", 0) > total * 0.1:
        recommendations.append({
            "type": "critical",
            "category": "apply_failure",
            "message": f"{category_counts['apply_failure']} instances had patch application failures. "
                       "Patches may have incorrect context or line numbers."
        })

    if category_counts.get("syntax_error", 0) > total * 0.05:
        recommendations.append({
            "type": "high",
            "category": "syntax_error",
            "message": f"{category_counts['syntax_error']} instances had syntax errors. "
                       "Consider adding syntax validation before submission."
        })

    if category_counts.get("test_failure", 0) > total * 0.2:
        recommendations.append({
            "type": "medium",
            "category": "test_failure",
            "message": f"{category_counts['test_failure']} instances failed tests. "
                       "The patches may be functionally incorrect or incomplete."
        })

    if category_counts.get("timeout", 0) > total * 0.05:
        recommendations.append({
            "type": "medium",
            "category": "timeout",
            "message": f"{category_counts['timeout']} instances timed out. "
                       "Consider increasing timeout or optimizing patch execution."
        })

    # Repo-specific recommendations
    patterns = analysis.get("patterns", {})
    by_repo = patterns.get("by_repo", {})

    for repo, failures in sorted(by_repo.items(), key=lambda x: -len(x[1]))[:3]:
        if len(failures) >= 3:
            recommendations.append({
                "type": "info",
                "category": "repo_pattern",
                "message": f"Repository '{repo}' has {len(failures)} failures. "
                           "May indicate specific challenges with this codebase."
            })

    return recommendations

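# Threshold summary (as implemented above): empty_patch and apply_failure trigger a
# "critical" note above 10% of failures, syntax_error ("high") and timeout ("medium")
# above 5%, test_failure ("medium") above 20%; the three repositories with the most
# failures each get an informational note when they have at least 3 failures.
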
def compare_failures(
    vanilla_analysis: dict[str, Any],
    omc_analysis: dict[str, Any]
) -> dict[str, Any]:
    """Compare failure patterns between vanilla and OMC."""
    comparison = {
        "timestamp": datetime.now().isoformat(),
        "vanilla_failures": vanilla_analysis["total_failures"],
        "omc_failures": omc_analysis["total_failures"],
        "category_comparison": {},
        "unique_to_vanilla": [],
        "unique_to_omc": [],
        "common_failures": [],
        "insights": []
    }

    # Category comparison
    all_categories = set(vanilla_analysis["category_counts"].keys()) | \
                     set(omc_analysis["category_counts"].keys())

    for category in all_categories:
        vanilla_count = vanilla_analysis["category_counts"].get(category, 0)
        omc_count = omc_analysis["category_counts"].get(category, 0)

        comparison["category_comparison"][category] = {
            "vanilla": vanilla_count,
            "omc": omc_count,
            "delta": omc_count - vanilla_count
        }

    # Instance comparison
    vanilla_failed = {f["instance_id"] for f in vanilla_analysis["failures"]}
    omc_failed = {f["instance_id"] for f in omc_analysis["failures"]}

    comparison["unique_to_vanilla"] = list(vanilla_failed - omc_failed)
    comparison["unique_to_omc"] = list(omc_failed - vanilla_failed)
    comparison["common_failures"] = list(vanilla_failed & omc_failed)

    # Generate insights
    insights = []

    if len(comparison["unique_to_vanilla"]) > len(comparison["unique_to_omc"]):
        insights.append({
            "type": "positive",
            "message": f"OMC fixed {len(comparison['unique_to_vanilla'])} failures that vanilla couldn't solve."
        })
    elif len(comparison["unique_to_omc"]) > len(comparison["unique_to_vanilla"]):
        insights.append({
            "type": "negative",
            "message": f"OMC introduced {len(comparison['unique_to_omc'])} new failures compared to vanilla."
        })

    # Check for category improvements
    for category, counts in comparison["category_comparison"].items():
        if counts["delta"] < -2:
            insights.append({
                "type": "positive",
                "message": f"OMC reduced '{category}' failures by {abs(counts['delta'])}."
            })
        elif counts["delta"] > 2:
            insights.append({
                "type": "negative",
                "message": f"OMC increased '{category}' failures by {counts['delta']}."
            })

    comparison["insights"] = insights

    return comparison

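# Illustrative category_comparison entry (counts invented):
#   "test_failure": {"vanilla": 7, "omc": 4, "delta": -3}
# A delta below -2 produces a positive insight, above +2 a negative one.
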
def generate_failure_report(
    analysis: dict[str, Any],
    comparison: dict[str, Any] | None = None
) -> str:
    """Generate a detailed failure analysis report."""
    lines = [
        "# SWE-bench Failure Analysis Report",
        "",
        f"**Generated:** {analysis['timestamp']}",
        "",
        "## Summary",
        "",
        f"- **Total Instances:** {analysis['total_instances']}",
        f"- **Total Failures:** {analysis['total_failures']}",
        f"- **Failure Rate:** {analysis['total_failures']/max(analysis['total_instances'],1)*100:.1f}%",
        "",
        "## Failure Categories",
        "",
        "| Category | Count | Percentage |",
        "|----------|-------|------------|",
    ]

    total = max(analysis["total_failures"], 1)
    for category, count in sorted(
        analysis["category_counts"].items(),
        key=lambda x: -x[1]
    ):
        pct = count / total * 100
        lines.append(f"| {category} | {count} | {pct:.1f}% |")

    lines.extend([
        "",
        "## Recommendations",
        "",
    ])

    for rec in analysis["recommendations"]:
        priority = {"critical": "!!!", "high": "!!", "medium": "!", "info": "i"}.get(rec["type"], "-")
        lines.append(f"- [{priority}] {rec['message']}")

    # Repository breakdown
    if analysis.get("patterns", {}).get("by_repo"):
        lines.extend([
            "",
            "## Failures by Repository",
            "",
            "| Repository | Failures |",
            "|------------|----------|",
        ])

        for repo, failures in sorted(
            analysis["patterns"]["by_repo"].items(),
            key=lambda x: -len(x[1])
        )[:10]:
            lines.append(f"| {repo} | {len(failures)} |")

    # Comparison section
    if comparison:
        lines.extend([
            "",
            "## Vanilla vs OMC Comparison",
            "",
            f"- **Vanilla Failures:** {comparison['vanilla_failures']}",
            f"- **OMC Failures:** {comparison['omc_failures']}",
            f"- **Fixed by OMC:** {len(comparison['unique_to_vanilla'])}",
            f"- **New in OMC:** {len(comparison['unique_to_omc'])}",
            f"- **Common Failures:** {len(comparison['common_failures'])}",
            "",
            "### Category Changes",
            "",
            "| Category | Vanilla | OMC | Delta |",
            "|----------|---------|-----|-------|",
        ])

        for category, counts in sorted(
            comparison["category_comparison"].items(),
            key=lambda x: x[1]["delta"]
        ):
            delta_str = f"{counts['delta']:+d}" if counts['delta'] != 0 else "0"
            lines.append(f"| {category} | {counts['vanilla']} | {counts['omc']} | {delta_str} |")

        if comparison.get("insights"):
            lines.extend([
                "",
                "### Insights",
                "",
            ])
            for insight in comparison["insights"]:
                icon = {"positive": "+", "negative": "-", "neutral": "="}.get(insight["type"], "*")
                lines.append(f"- [{icon}] {insight['message']}")

    # Sample failures
    if analysis["failures"]:
        lines.extend([
            "",
            "## Sample Failures",
            "",
        ])

        for failure in analysis["failures"][:10]:
            lines.append(f"### {failure['instance_id']}")
            lines.append(f"- **Category:** {failure['category']}")
            if failure.get("error_message"):
                lines.append(f"- **Error:** `{failure['error_message']}`")
            if failure.get("details"):
                for k, v in failure["details"].items():
                    lines.append(f"- **{k}:** {v}")
            lines.append("")

    lines.extend([
        "",
        "---",
        "",
        "*Report generated by analyze_failures.py*"
    ])

    return "\n".join(lines)

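# The report is Markdown with sections: Summary, Failure Categories, Recommendations,
# an optional Failures by Repository table, an optional Vanilla vs OMC Comparison
# (when a comparison dict is passed), and up to 10 Sample Failures.
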
def main():
    parser = argparse.ArgumentParser(
        description="Analyze SWE-bench failure patterns",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Analyze single run
  python analyze_failures.py --results results/vanilla/

  # With predictions for more context
  python analyze_failures.py --results results/omc/ --predictions predictions.json

  # Compare vanilla vs OMC failures
  python analyze_failures.py --vanilla results/vanilla/ --omc results/omc/ --compare
"""
    )

    parser.add_argument(
        "--results",
        type=Path,
        help="Path to results directory for single analysis"
    )

    parser.add_argument(
        "--predictions",
        type=Path,
        help="Path to predictions JSON for additional context"
    )

    parser.add_argument(
        "--vanilla",
        type=Path,
        help="Path to vanilla results for comparison"
    )

    parser.add_argument(
        "--omc",
        type=Path,
        help="Path to OMC results for comparison"
    )

    parser.add_argument(
        "--compare",
        action="store_true",
        help="Compare vanilla vs OMC (requires --vanilla and --omc)"
    )

    parser.add_argument(
        "--output", "-o",
        type=Path,
        default=Path("analysis"),
        help="Output directory for analysis reports (default: analysis/)"
    )

    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Validate arguments
    if args.compare:
        if not args.vanilla or not args.omc:
            parser.error("--compare requires both --vanilla and --omc")
    elif not args.results:
        parser.error("Either --results or (--vanilla, --omc, --compare) required")

    args.output.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    if args.compare:
        # Comparison mode
        logger.info(f"Loading vanilla results from {args.vanilla}")
        vanilla_results = load_results(args.vanilla)
        vanilla_predictions = None

        logger.info(f"Loading OMC results from {args.omc}")
        omc_results = load_results(args.omc)
        omc_predictions = None

        # Try to load predictions
        for pred_path in [args.vanilla / "predictions.json", args.vanilla.parent / "vanilla_predictions.json"]:
            if pred_path.exists():
                vanilla_predictions = load_predictions(pred_path)
                break

        for pred_path in [args.omc / "predictions.json", args.omc.parent / "omc_predictions.json"]:
            if pred_path.exists():
                omc_predictions = load_predictions(pred_path)
                break

        logger.info("Analyzing failures...")
        vanilla_analysis = analyze_failures(vanilla_results, vanilla_predictions)
        omc_analysis = analyze_failures(omc_results, omc_predictions)

        logger.info("Comparing failures...")
        comparison = compare_failures(vanilla_analysis, omc_analysis)

        # Save outputs
        json_file = args.output / f"comparison_analysis_{timestamp}.json"
        with open(json_file, "w") as f:
            json.dump({
                "vanilla": vanilla_analysis,
                "omc": omc_analysis,
                "comparison": comparison
            }, f, indent=2)

        report = generate_failure_report(omc_analysis, comparison)
        md_file = args.output / f"comparison_analysis_{timestamp}.md"
        md_file.write_text(report)

        print("\n" + "=" * 60)
        print("FAILURE COMPARISON COMPLETE")
        print("=" * 60)
        print(f"Vanilla Failures: {vanilla_analysis['total_failures']}")
        print(f"OMC Failures: {omc_analysis['total_failures']}")
        print(f"Fixed by OMC: {len(comparison['unique_to_vanilla'])}")
        print(f"New in OMC: {len(comparison['unique_to_omc'])}")
        print(f"\nResults saved to: {args.output}")
        print("=" * 60)

    else:
        # Single analysis mode
        logger.info(f"Loading results from {args.results}")
        results = load_results(args.results)

        predictions = None
        if args.predictions and args.predictions.exists():
            predictions = load_predictions(args.predictions)

        logger.info("Analyzing failures...")
        analysis = analyze_failures(results, predictions)

        # Save outputs
        json_file = args.output / f"failure_analysis_{timestamp}.json"
        with open(json_file, "w") as f:
            json.dump(analysis, f, indent=2)

        report = generate_failure_report(analysis)
        md_file = args.output / f"failure_analysis_{timestamp}.md"
        md_file.write_text(report)

        print("\n" + "=" * 60)
        print("FAILURE ANALYSIS COMPLETE")
        print("=" * 60)
        print(f"Total Instances: {analysis['total_instances']}")
        print(f"Total Failures: {analysis['total_failures']}")
        print("\nTop Categories:")
        for cat, count in sorted(analysis["category_counts"].items(), key=lambda x: -x[1])[:5]:
            print(f"  {cat}: {count}")
        print(f"\nResults saved to: {args.output}")
        print("=" * 60)

    return 0

if __name__ == "__main__":
    sys.exit(main())