LPR-OCR/integrated_analysis.py

#!/usr/bin/env python3
"""
Integrated License Plate Analysis
Combines multi-image analysis and super resolution analysis into a comprehensive workflow.
Generates per-image candidates, combined frequency analysis, super resolution images, and final results.
"""
import argparse
import json
import subprocess
import sys
from collections import defaultdict
from datetime import datetime
from pathlib import Path
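# NOTE: This module is an orchestrator: it shells out to three sibling scripts
# (annotate_project.py, multi_image_analysis.py, super_resolution_analysis.py),
# which are assumed to live in the current working directory and to accept a
# --project-id argument. It performs no image processing of its own.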
class IntegratedAnalyzer:
def __init__(self, project_dir):
self.project_dir = Path(project_dir)
self.raw_dir = self.project_dir / 'raw'
self.annotations_dir = self.project_dir / 'annotations'
self.debug_dir = self.project_dir / 'debug'
self.output_dir = self.project_dir / 'output'
# Ensure directories exist
self.debug_dir.mkdir(exist_ok=True)
self.output_dir.mkdir(exist_ok=True)
print(f"✓ Integrated Analyzer initialized for project {self.project_dir.name}")
def run_annotation_analysis(self):
"""Run annotation analysis to generate detection parameters."""
print("\n" + "="*60)
print("PHASE 0: ANNOTATION ANALYSIS & PARAMETER GENERATION")
print("="*60)
try:
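            # The child script takes the bare integer ID; int() strips the
            # zero-padding from the directory name (e.g. projects/003 -> 3).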
result = subprocess.run([
sys.executable, 'annotate_project.py',
'--project-id', str(int(self.project_dir.name)),
'--analyze'
], capture_output=True, text=True, check=True)
print("✓ Annotation analysis completed")
# Check if detection parameters were generated
params_file = self.project_dir / 'debug' / 'detection_parameters.json'
if params_file.exists():
with open(params_file, 'r') as f:
params = json.load(f)
print(f"✓ Detection parameters generated:")
print(f" Width range: {params['min_width']} - {params['max_width']}")
print(f" Height range: {params['min_height']} - {params['max_height']}")
print(f" Aspect ratio: {params['min_aspect_ratio']:.2f} - {params['max_aspect_ratio']:.2f}")
return params
else:
print("⚠ Detection parameters not found")
return None
except subprocess.CalledProcessError as e:
print(f"✗ Annotation analysis failed: {e}")
if e.stderr:
print(f"Error: {e.stderr}")
return None
def run_multi_image_analysis(self):
"""Run multi-image analysis component."""
print("\n" + "="*60)
print("PHASE 1: MULTI-IMAGE ANALYSIS")
print("="*60)
try:
result = subprocess.run([
sys.executable, 'multi_image_analysis.py',
'--project-id', str(int(self.project_dir.name))
], capture_output=True, text=True, check=True)
print("✓ Multi-image analysis completed")
# Check if investigation report was generated
investigation_report = self.output_dir / 'investigation_report.json'
if investigation_report.exists():
with open(investigation_report, 'r') as f:
multi_image_results = json.load(f)
return multi_image_results
else:
print("⚠ Investigation report not found")
return None
except subprocess.CalledProcessError as e:
print(f"✗ Multi-image analysis failed: {e}")
if e.stderr:
print(f"Error: {e.stderr}")
return None
def run_super_resolution_analysis(self):
"""Run super resolution analysis component."""
print("\n" + "="*60)
print("PHASE 2: SUPER RESOLUTION ANALYSIS")
print("="*60)
try:
result = subprocess.run([
sys.executable, 'super_resolution_analysis.py',
'--project-id', str(int(self.project_dir.name))
], capture_output=True, text=True, check=True)
print("✓ Super resolution analysis completed")
# Check if super resolution report was generated
super_res_report = self.output_dir / 'super_resolution_report.json'
if super_res_report.exists():
with open(super_res_report, 'r') as f:
super_res_results = json.load(f)
return super_res_results
else:
print("⚠ Super resolution report not found")
return None
except subprocess.CalledProcessError as e:
print(f"✗ Super resolution analysis failed: {e}")
if e.stderr:
print(f"Error: {e.stderr}")
return None
def combine_and_rank_results(self, multi_image_results, super_res_results):
"""Combine results from both analyses and create final ranking."""
print("\n" + "="*60)
print("PHASE 3: RESULT INTEGRATION & RANKING")
print("="*60)
combined_candidates = {}
# Process multi-image results
if multi_image_results and 'top_candidates' in multi_image_results:
print(f"Processing {len(multi_image_results['top_candidates'])} multi-image candidates...")
for candidate in multi_image_results['top_candidates']:
text = candidate['text']
if text not in combined_candidates:
combined_candidates[text] = {
'text': text,
'multi_image_score': candidate['final_score'],
'super_res_score': 0,
'combined_score': 0,
'multi_image_data': candidate,
'super_res_data': None,
'sources': []
}
# Add source information
if 'sources' in candidate:
combined_candidates[text]['sources'].extend(candidate['sources'])
# Process super resolution results
if super_res_results and 'top_candidates' in super_res_results:
print(f"Processing {len(super_res_results['top_candidates'])} super resolution candidates...")
for candidate in super_res_results['top_candidates']:
text = candidate['text']
if text not in combined_candidates:
combined_candidates[text] = {
'text': text,
'multi_image_score': 0,
'super_res_score': candidate['confidence'],
'combined_score': 0,
'multi_image_data': None,
'super_res_data': candidate,
'sources': []
}
else:
combined_candidates[text]['super_res_score'] = candidate['confidence']
combined_candidates[text]['super_res_data'] = candidate
# Add source information
if 'source_image' in candidate:
combined_candidates[text]['sources'].append(candidate['source_image'])
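        # Scoring scheme, worked example: a candidate found by both methods with
        # multi_image_score=70 and super_res_score=55 across 3 distinct source
        # images scores max(70, 55) + 25 (cross-validation) + (3-1)*10 = 115.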
# Calculate combined scores
for text, data in combined_candidates.items():
# Base score is the higher of the two methods
base_score = max(data['multi_image_score'], data['super_res_score'])
# Cross-validation bonus if both methods found it
cross_validation_bonus = 0
if data['multi_image_score'] > 0 and data['super_res_score'] > 0:
cross_validation_bonus = 25
print(f" Cross-validated: '{text}' (multi: {data['multi_image_score']:.1f}, super: {data['super_res_score']:.1f})")
            # Multi-source bonus, clamped so candidates with no recorded
            # sources are not penalized
            unique_sources = len(set(data['sources']))
            multi_source_bonus = max(0, (unique_sources - 1) * 10)
data['combined_score'] = base_score + cross_validation_bonus + multi_source_bonus
data['cross_validation_bonus'] = cross_validation_bonus
data['multi_source_bonus'] = multi_source_bonus
data['unique_sources'] = unique_sources
# Sort by combined score
ranked_results = sorted(combined_candidates.values(), key=lambda x: x['combined_score'], reverse=True)
print(f"\n✓ Combined {len(combined_candidates)} unique candidates")
cross_validated = len([c for c in ranked_results if c['cross_validation_bonus'] > 0])
print(f"{cross_validated} candidates cross-validated between methods")
return ranked_results
def generate_per_image_breakdown(self, multi_image_results):
"""Generate per-image candidate breakdown."""
per_image_breakdown = {}
if multi_image_results and 'image_breakdown' in multi_image_results:
for image_name, data in multi_image_results['image_breakdown'].items():
per_image_breakdown[image_name] = {
'candidate_count': data['candidate_count'],
'top_candidates': data['top_candidates']
}
return per_image_breakdown
def generate_frequency_analysis(self, ranked_results):
"""Generate character frequency analysis for final ranking."""
print("\n" + "="*60)
print("PHASE 4: CHARACTER FREQUENCY ANALYSIS")
print("="*60)
# Group high-confidence candidates by length
high_confidence = [c for c in ranked_results if c['combined_score'] >= 60]
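        # Candidates scoring below 60 are excluded so that low-confidence OCR
        # noise does not dilute the positional vote below.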
if not high_confidence:
print("No high-confidence candidates for frequency analysis")
return {}
length_groups = defaultdict(list)
for candidate in high_confidence:
text = candidate['text']
if len(text) >= 4: # Only analyze reasonable length plates
length_groups[len(text)].append(candidate)
frequency_results = {}
for length, candidates in length_groups.items():
if len(candidates) >= 2: # Need at least 2 candidates
print(f"\nAnalyzing {len(candidates)} candidates of length {length}:")
# Count character frequency at each position, weighted by combined score
position_chars = defaultdict(lambda: defaultdict(float))
for candidate in candidates:
text = candidate['text']
weight = candidate['combined_score'] / 100.0
for pos, char in enumerate(text):
position_chars[pos][char] += weight
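                # Worked example: with candidates 'ABC1234' (score 90, weight
                # 0.9) and 'A8C1234' (score 70, weight 0.7), position 1 tallies
                # B=0.9 vs 8=0.7, so 'B' wins with 0.9/1.6 ≈ 56% confidence.
                # (Plate values here are illustrative, not project data.)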
# Build most likely string
most_likely = ""
total_confidence = 0
position_details = []
for pos in range(length):
if pos in position_chars:
char_scores = position_chars[pos]
best_char = max(char_scores.items(), key=lambda x: x[1])
most_likely += best_char[0]
# Calculate position confidence
total_weight = sum(char_scores.values())
position_confidence = (best_char[1] / total_weight) * 100
total_confidence += position_confidence
# Show alternatives
                        alternatives = sorted(char_scores.items(), key=lambda x: x[1], reverse=True)[1:3]
                        alt_str = ", ".join(f"{char}({score:.1f})" for char, score in alternatives) or "none"
position_details.append({
'position': pos,
'character': best_char[0],
'confidence': position_confidence,
'alternatives': alternatives
})
print(f" Position {pos}: '{best_char[0]}' ({position_confidence:.1f}%) - Alt: {alt_str}")
if most_likely:
avg_confidence = total_confidence / length
maryland_score = self.score_maryland_likelihood(most_likely)
frequency_results[length] = {
'most_likely_plate': most_likely,
'average_confidence': avg_confidence,
'maryland_score': maryland_score,
'total_score': avg_confidence + maryland_score,
'candidate_count': len(candidates),
'position_details': position_details
}
print(f" → Most likely: '{most_likely}' (avg conf: {avg_confidence:.1f}%, MD score: {maryland_score})")
return frequency_results
def score_maryland_likelihood(self, text):
"""Score how likely a candidate is to be a Maryland license plate."""
if not text or len(text) < 2:
return 0
score = 0
# Length scoring
if len(text) == 7:
score += 40
elif len(text) == 6:
score += 35
elif len(text) == 5:
score += 20
else:
score += 10
# Character composition
has_letter = any(c.isalpha() for c in text)
has_number = any(c.isdigit() for c in text)
if has_letter and has_number:
score += 30
elif has_letter or has_number:
score += 15
# Maryland patterns
if len(text) == 7:
if text[:3].isalpha() and text[3:].isdigit():
score += 50
elif text[0].isdigit() and text[1:4].isalpha() and text[4:].isdigit():
score += 40
elif len(text) == 6:
if text[:3].isalpha() and text[3:].isdigit():
score += 40
elif text[:2].isalpha() and text[2:].isdigit():
score += 30
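        # Worked example: 'ABC1234' scores 40 (length 7) + 30 (letters and
        # digits) + 50 (3-letters-then-4-digits pattern) = 120.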
return max(0, score)
def generate_comprehensive_report(self, ranked_results, per_image_breakdown, frequency_results, multi_image_results, super_res_results):
"""Generate the final comprehensive investigation report."""
print("\n" + "="*60)
print("PHASE 5: COMPREHENSIVE REPORT GENERATION")
print("="*60)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Calculate statistics
total_candidates = len(ranked_results)
high_confidence = len([c for c in ranked_results if c['combined_score'] >= 80])
cross_validated = len([c for c in ranked_results if c['cross_validation_bonus'] > 0])
# Create comprehensive report
comprehensive_report = {
'analysis_type': 'integrated_comprehensive',
'timestamp': timestamp,
'project_id': self.project_dir.name,
'summary': {
'total_unique_candidates': total_candidates,
'high_confidence_candidates': high_confidence,
'cross_validated_candidates': cross_validated,
'images_processed': len(per_image_breakdown),
'frequency_analysis_lengths': list(frequency_results.keys())
},
'ranked_candidates': ranked_results[:30],
'per_image_breakdown': per_image_breakdown,
'frequency_analysis': frequency_results,
'methodology': {
'multi_image_analysis': multi_image_results is not None,
'super_resolution_analysis': super_res_results is not None,
'cross_validation': True,
'character_frequency_analysis': True
}
}
# Save JSON report
json_file = self.output_dir / 'comprehensive_analysis.json'
with open(json_file, 'w') as f:
json.dump(comprehensive_report, f, indent=2)
# Generate human-readable summary
summary_file = self.output_dir / 'analysis_results.txt'
with open(summary_file, 'w') as f:
f.write("=" * 70 + "\n")
f.write("LICENSE PLATE ANALYSIS RESULTS\n")
f.write("=" * 70 + "\n")
f.write(f"Generated: {timestamp}\n")
f.write(f"Project: {self.project_dir.name}\n\n")
f.write("SUMMARY:\n")
f.write(f" Total unique candidates identified: {total_candidates}\n")
f.write(f" High confidence candidates (80+): {high_confidence}\n")
f.write(f" Cross-validated candidates: {cross_validated}\n")
f.write(f" Images processed: {len(per_image_breakdown)}\n\n")
f.write("ANALYSIS METHODS:\n")
f.write(" Multi-image cross-validation\n")
f.write(" Super resolution enhancement\n")
f.write(" Character frequency analysis\n")
f.write(" Maryland license plate pattern matching\n\n")
# Per-image breakdown
f.write("PER-IMAGE CANDIDATE BREAKDOWN:\n")
f.write("-" * 40 + "\n")
for image_name, data in per_image_breakdown.items():
f.write(f"{image_name}: {data['candidate_count']} candidates\n")
for i, candidate in enumerate(data['top_candidates'][:5], 1):
f.write(f" {i}. {candidate}\n")
f.write("\n")
# Frequency analysis results
if frequency_results:
f.write("CHARACTER FREQUENCY ANALYSIS:\n")
f.write("-" * 40 + "\n")
for length, result in sorted(frequency_results.items()):
f.write(f"Length {length}: '{result['most_likely_plate']}'\n")
f.write(f" Confidence: {result['average_confidence']:.1f}%\n")
f.write(f" Maryland Score: {result['maryland_score']}\n")
f.write(f" Based on {result['candidate_count']} candidates\n\n")
# Top candidates
f.write("RANKED CANDIDATES:\n")
f.write("-" * 50 + "\n")
f.write("Rank Candidate Combined Multi Super Cross Sources\n")
f.write("---- --------- -------- ----- ----- ----- -------\n")
for i, candidate in enumerate(ranked_results[:20], 1):
sources_str = str(candidate['unique_sources'])
cross_mark = "" if candidate['cross_validation_bonus'] > 0 else " "
f.write(f"{i:3d}. {candidate['text']:9s} {candidate['combined_score']:6.1f} {candidate['multi_image_score']:4.1f} {candidate['super_res_score']:4.1f} {cross_mark:1s} {sources_str:4s}\n")
f.write(f"\nSCORE LEGEND:\n")
f.write(" Combined: Final ranking score\n")
f.write(" Multi: Multi-image analysis score\n")
f.write(" Super: Super resolution analysis score\n")
f.write(" Cross: ✓ = Cross-validated between methods\n")
f.write(" Sources: Number of unique source images\n")
print(f"✓ Analysis report generated:")
print(f" Detailed JSON: {json_file}")
print(f" Results summary: {summary_file}")
return comprehensive_report
def analyze_project(self):
"""Run the complete integrated analysis workflow."""
print("=" * 70)
print("LICENSE PLATE ANALYSIS")
print("=" * 70)
print(f"Project: {self.project_dir.name}")
# Phase 0: Annotation analysis
self.run_annotation_analysis()
# Phase 1: Multi-image analysis
multi_image_results = self.run_multi_image_analysis()
# Phase 2: Super resolution analysis
super_res_results = self.run_super_resolution_analysis()
# Check if we have any results
if not multi_image_results and not super_res_results:
print("\n✗ No analysis results generated. Check for errors above.")
return None
# Phase 3: Combine and rank results
ranked_results = self.combine_and_rank_results(multi_image_results, super_res_results)
# Phase 4: Generate per-image breakdown
per_image_breakdown = self.generate_per_image_breakdown(multi_image_results)
# Phase 5: Generate frequency analysis
frequency_results = self.generate_frequency_analysis(ranked_results)
# Phase 6: Generate comprehensive report
comprehensive_report = self.generate_comprehensive_report(
ranked_results, per_image_breakdown, frequency_results,
multi_image_results, super_res_results
)
# Display final summary
print("\n" + "=" * 70)
print("ANALYSIS COMPLETE")
print("=" * 70)
if ranked_results:
print(f"Top 10 candidates:")
for i, candidate in enumerate(ranked_results[:10], 1):
cross_mark = " (CROSS-VALIDATED)" if candidate['cross_validation_bonus'] > 0 else ""
print(f" {i:2d}. {candidate['text']:8s} (Score: {candidate['combined_score']:5.1f}){cross_mark}")
if frequency_results:
print(f"\nFrequency analysis results:")
for length, result in sorted(frequency_results.items(), key=lambda x: x[1]['total_score'], reverse=True):
print(f" Length {length}: '{result['most_likely_plate']}' (score: {result['total_score']:.1f})")
print(f"\nResults saved to: {self.output_dir}")
return comprehensive_report
def main():
parser = argparse.ArgumentParser(description='Integrated License Plate Analysis')
parser.add_argument('--project-id', type=int, required=True, help='Project ID')
args = parser.parse_args()
project_dir = Path(f"projects/{args.project_id:03d}")
if not project_dir.exists():
print(f"Project {args.project_id:03d} does not exist.")
return
analyzer = IntegratedAnalyzer(project_dir)
analyzer.analyze_project()
if __name__ == '__main__':
main()
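# Example invocation (assumes the projects/ layout created by the companion
# scripts):
#   python integrated_analysis.py --project-id 3
# reads from projects/003/ and writes comprehensive_analysis.json plus
# analysis_results.txt to projects/003/output/.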