#!/usr/bin/env python3
"""
Integrated License Plate Analysis

Combines multi-image analysis and super-resolution analysis into one
comprehensive workflow: per-image candidate generation, combined character
frequency analysis, super-resolution enhancement, and a final ranked report.
"""
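
# Usage sketch (assuming this script is saved as integrated_analysis.py, next
# to the helper scripts it invokes):
#   python integrated_analysis.py --project-id 3
# Project directories are zero-padded to three digits (projects/003); raw/ and
# annotations/ are expected to exist, while debug/ and output/ are created.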

import argparse
import json
import subprocess
import sys
from collections import defaultdict
from datetime import datetime
from pathlib import Path


class IntegratedAnalyzer:
    """Orchestrates the end-to-end license plate analysis for one project."""

    def __init__(self, project_dir):
        self.project_dir = Path(project_dir)
        self.raw_dir = self.project_dir / 'raw'
        self.annotations_dir = self.project_dir / 'annotations'
        self.debug_dir = self.project_dir / 'debug'
        self.output_dir = self.project_dir / 'output'

        # Ensure the output directories exist
        self.debug_dir.mkdir(exist_ok=True)
        self.output_dir.mkdir(exist_ok=True)

        print(f"✓ Integrated Analyzer initialized for project {self.project_dir.name}")

    def run_annotation_analysis(self):
        """Run annotation analysis to generate detection parameters."""
        print("\n" + "=" * 60)
        print("PHASE 0: ANNOTATION ANALYSIS & PARAMETER GENERATION")
        print("=" * 60)
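
        # annotate_project.py is assumed to live in the working directory this
        # script is launched from; with --analyze it is expected to write
        # debug/detection_parameters.json under the project tree.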
        try:
            subprocess.run([
                sys.executable, 'annotate_project.py',
                '--project-id', str(int(self.project_dir.name)),
                '--analyze'
            ], capture_output=True, text=True, check=True)

            print("✓ Annotation analysis completed")

            # Check if detection parameters were generated
            params_file = self.debug_dir / 'detection_parameters.json'
            if params_file.exists():
                with open(params_file, 'r') as f:
                    params = json.load(f)
                print("✓ Detection parameters generated:")
                print(f"  Width range: {params['min_width']} - {params['max_width']}")
                print(f"  Height range: {params['min_height']} - {params['max_height']}")
                print(f"  Aspect ratio: {params['min_aspect_ratio']:.2f} - {params['max_aspect_ratio']:.2f}")
                return params
            else:
                print("⚠ Detection parameters not found")
                return None

        except subprocess.CalledProcessError as e:
            print(f"✗ Annotation analysis failed: {e}")
            if e.stderr:
                print(f"Error: {e.stderr}")
            return None

    def run_multi_image_analysis(self):
        """Run the multi-image analysis component."""
        print("\n" + "=" * 60)
        print("PHASE 1: MULTI-IMAGE ANALYSIS")
        print("=" * 60)
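
        # multi_image_analysis.py is assumed to write
        # output/investigation_report.json into the project's output directory.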
        try:
            subprocess.run([
                sys.executable, 'multi_image_analysis.py',
                '--project-id', str(int(self.project_dir.name))
            ], capture_output=True, text=True, check=True)

            print("✓ Multi-image analysis completed")

            # Check if the investigation report was generated
            investigation_report = self.output_dir / 'investigation_report.json'
            if investigation_report.exists():
                with open(investigation_report, 'r') as f:
                    return json.load(f)
            else:
                print("⚠ Investigation report not found")
                return None

        except subprocess.CalledProcessError as e:
            print(f"✗ Multi-image analysis failed: {e}")
            if e.stderr:
                print(f"Error: {e.stderr}")
            return None

    def run_super_resolution_analysis(self):
        """Run the super-resolution analysis component."""
        print("\n" + "=" * 60)
        print("PHASE 2: SUPER RESOLUTION ANALYSIS")
        print("=" * 60)
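
        # super_resolution_analysis.py is assumed to write
        # output/super_resolution_report.json into the project's output directory.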
        try:
            subprocess.run([
                sys.executable, 'super_resolution_analysis.py',
                '--project-id', str(int(self.project_dir.name))
            ], capture_output=True, text=True, check=True)

            print("✓ Super resolution analysis completed")

            # Check if the super resolution report was generated
            super_res_report = self.output_dir / 'super_resolution_report.json'
            if super_res_report.exists():
                with open(super_res_report, 'r') as f:
                    return json.load(f)
            else:
                print("⚠ Super resolution report not found")
                return None

        except subprocess.CalledProcessError as e:
            print(f"✗ Super resolution analysis failed: {e}")
            if e.stderr:
                print(f"Error: {e.stderr}")
            return None

    def combine_and_rank_results(self, multi_image_results, super_res_results):
        """Combine results from both analyses and create the final ranking."""
        print("\n" + "=" * 60)
        print("PHASE 3: RESULT INTEGRATION & RANKING")
        print("=" * 60)

        combined_candidates = {}

        # Process multi-image results
        if multi_image_results and 'top_candidates' in multi_image_results:
            print(f"Processing {len(multi_image_results['top_candidates'])} multi-image candidates...")

            for candidate in multi_image_results['top_candidates']:
                text = candidate['text']
                if text not in combined_candidates:
                    combined_candidates[text] = {
                        'text': text,
                        'multi_image_score': candidate['final_score'],
                        'super_res_score': 0,
                        'combined_score': 0,
                        'multi_image_data': candidate,
                        'super_res_data': None,
                        'sources': []
                    }

                # Add source information
                if 'sources' in candidate:
                    combined_candidates[text]['sources'].extend(candidate['sources'])

        # Process super resolution results
        if super_res_results and 'top_candidates' in super_res_results:
            print(f"Processing {len(super_res_results['top_candidates'])} super resolution candidates...")

            for candidate in super_res_results['top_candidates']:
                text = candidate['text']
                if text not in combined_candidates:
                    combined_candidates[text] = {
                        'text': text,
                        'multi_image_score': 0,
                        'super_res_score': candidate['confidence'],
                        'combined_score': 0,
                        'multi_image_data': None,
                        'super_res_data': candidate,
                        'sources': []
                    }
                else:
                    combined_candidates[text]['super_res_score'] = candidate['confidence']
                    combined_candidates[text]['super_res_data'] = candidate

                # Add source information
                if 'source_image' in candidate:
                    combined_candidates[text]['sources'].append(candidate['source_image'])
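
        # Scoring model applied below: the base score is the better of a
        # candidate's two method scores; agreement between methods adds a flat
        # +25, and each additional distinct source image adds +10. For example,
        # a plate seen by both methods (multi 70.0, super 55.0) across 3 images
        # scores 70 + 25 + 20 = 115.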
        # Calculate combined scores
        for text, data in combined_candidates.items():
            # Base score is the higher of the two methods
            base_score = max(data['multi_image_score'], data['super_res_score'])

            # Cross-validation bonus if both methods found it
            cross_validation_bonus = 0
            if data['multi_image_score'] > 0 and data['super_res_score'] > 0:
                cross_validation_bonus = 25
                print(f"  Cross-validated: '{text}' (multi: {data['multi_image_score']:.1f}, super: {data['super_res_score']:.1f})")

            # Multi-source bonus (clamped so a candidate with no recorded
            # sources is not penalized)
            unique_sources = len(set(data['sources']))
            multi_source_bonus = max(0, unique_sources - 1) * 10

            data['combined_score'] = base_score + cross_validation_bonus + multi_source_bonus
            data['cross_validation_bonus'] = cross_validation_bonus
            data['multi_source_bonus'] = multi_source_bonus
            data['unique_sources'] = unique_sources

        # Sort by combined score
        ranked_results = sorted(combined_candidates.values(), key=lambda x: x['combined_score'], reverse=True)

        print(f"\n✓ Combined {len(combined_candidates)} unique candidates")
        cross_validated = len([c for c in ranked_results if c['cross_validation_bonus'] > 0])
        print(f"✓ {cross_validated} candidates cross-validated between methods")

        return ranked_results

    def generate_per_image_breakdown(self, multi_image_results):
        """Generate the per-image candidate breakdown."""
        per_image_breakdown = {}

        if multi_image_results and 'image_breakdown' in multi_image_results:
            for image_name, data in multi_image_results['image_breakdown'].items():
                per_image_breakdown[image_name] = {
                    'candidate_count': data['candidate_count'],
                    'top_candidates': data['top_candidates']
                }

        return per_image_breakdown

    def generate_frequency_analysis(self, ranked_results):
        """Generate character frequency analysis for the final ranking."""
        print("\n" + "=" * 60)
        print("PHASE 4: CHARACTER FREQUENCY ANALYSIS")
        print("=" * 60)

        # Group high-confidence candidates by length
        high_confidence = [c for c in ranked_results if c['combined_score'] >= 60]

        if not high_confidence:
            print("No high-confidence candidates for frequency analysis")
            return {}

        length_groups = defaultdict(list)
        for candidate in high_confidence:
            text = candidate['text']
            if len(text) >= 4:  # Only analyze plates of plausible length
                length_groups[len(text)].append(candidate)

        frequency_results = {}

        for length, candidates in length_groups.items():
            if len(candidates) >= 2:  # Need at least 2 candidates to vote
                print(f"\nAnalyzing {len(candidates)} candidates of length {length}:")
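
                # Worked example of the weighted vote below: given 'ABC1234'
                # (weight 0.9), 'A8C1234' (0.6), and 'ABC1Z34' (0.7), position 1
                # tallies B = 1.6 vs 8 = 0.6, so 'B' wins with 1.6 / 2.2 ≈ 72.7%
                # position confidence.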
                # Count character frequency at each position, weighted by combined score
                position_chars = defaultdict(lambda: defaultdict(float))

                for candidate in candidates:
                    text = candidate['text']
                    weight = candidate['combined_score'] / 100.0

                    for pos, char in enumerate(text):
                        position_chars[pos][char] += weight

                # Build the most likely string, position by position
                most_likely = ""
                total_confidence = 0
                position_details = []

                for pos in range(length):
                    if pos in position_chars:
                        char_scores = position_chars[pos]
                        best_char = max(char_scores.items(), key=lambda x: x[1])
                        most_likely += best_char[0]

                        # Calculate position confidence
                        total_weight = sum(char_scores.values())
                        position_confidence = (best_char[1] / total_weight) * 100
                        total_confidence += position_confidence

                        # Show the runner-up alternatives
                        alternatives = sorted(char_scores.items(), key=lambda x: x[1], reverse=True)[1:3]
                        alt_str = ", ".join(f"{char}({score:.1f})" for char, score in alternatives)

                        position_details.append({
                            'position': pos,
                            'character': best_char[0],
                            'confidence': position_confidence,
                            'alternatives': alternatives
                        })

                        print(f"  Position {pos}: '{best_char[0]}' ({position_confidence:.1f}%) - Alt: {alt_str}")

                if most_likely:
                    avg_confidence = total_confidence / length
                    maryland_score = self.score_maryland_likelihood(most_likely)

                    frequency_results[length] = {
                        'most_likely_plate': most_likely,
                        'average_confidence': avg_confidence,
                        'maryland_score': maryland_score,
                        'total_score': avg_confidence + maryland_score,
                        'candidate_count': len(candidates),
                        'position_details': position_details
                    }

                    print(f"  → Most likely: '{most_likely}' (avg conf: {avg_confidence:.1f}%, MD score: {maryland_score})")
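
        # total_score (average position confidence + Maryland pattern score) is
        # the key analyze_project later sorts by when displaying these results.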
        return frequency_results

    def score_maryland_likelihood(self, text):
        """Score how likely a candidate is to be a Maryland license plate."""
        if not text or len(text) < 2:
            return 0

        score = 0
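
        # Scoring is additive: length (up to +40), mixed letters and digits
        # (+30), and a Maryland-style layout (up to +50). For example,
        # 'ABC1234' scores 40 + 30 + 50 = 120, while 'AB1234' scores
        # 35 + 30 + 30 = 95.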
        # Length scoring
        if len(text) == 7:
            score += 40
        elif len(text) == 6:
            score += 35
        elif len(text) == 5:
            score += 20
        else:
            score += 10

        # Character composition
        has_letter = any(c.isalpha() for c in text)
        has_number = any(c.isdigit() for c in text)

        if has_letter and has_number:
            score += 30
        elif has_letter or has_number:
            score += 15

        # Maryland patterns (e.g. ABC1234, 1ABC234, ABC123, AB1234)
        if len(text) == 7:
            if text[:3].isalpha() and text[3:].isdigit():
                score += 50
            elif text[0].isdigit() and text[1:4].isalpha() and text[4:].isdigit():
                score += 40
        elif len(text) == 6:
            if text[:3].isalpha() and text[3:].isdigit():
                score += 40
            elif text[:2].isalpha() and text[2:].isdigit():
                score += 30

        return max(0, score)

    def generate_comprehensive_report(self, ranked_results, per_image_breakdown, frequency_results, multi_image_results, super_res_results):
        """Generate the final comprehensive investigation report."""
        print("\n" + "=" * 60)
        print("PHASE 5: COMPREHENSIVE REPORT GENERATION")
        print("=" * 60)

        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Calculate statistics
        total_candidates = len(ranked_results)
        high_confidence = len([c for c in ranked_results if c['combined_score'] >= 80])
        cross_validated = len([c for c in ranked_results if c['cross_validation_bonus'] > 0])

        # Create the comprehensive report
        comprehensive_report = {
            'analysis_type': 'integrated_comprehensive',
            'timestamp': timestamp,
            'project_id': self.project_dir.name,
            'summary': {
                'total_unique_candidates': total_candidates,
                'high_confidence_candidates': high_confidence,
                'cross_validated_candidates': cross_validated,
                'images_processed': len(per_image_breakdown),
                'frequency_analysis_lengths': list(frequency_results.keys())
            },
            'ranked_candidates': ranked_results[:30],
            'per_image_breakdown': per_image_breakdown,
            'frequency_analysis': frequency_results,
            'methodology': {
                'multi_image_analysis': multi_image_results is not None,
                'super_resolution_analysis': super_res_results is not None,
                'cross_validation': True,
                'character_frequency_analysis': True
            }
        }

        # Save the JSON report
        json_file = self.output_dir / 'comprehensive_analysis.json'
        with open(json_file, 'w') as f:
            json.dump(comprehensive_report, f, indent=2)

        # Generate a human-readable summary
        summary_file = self.output_dir / 'analysis_results.txt'
        with open(summary_file, 'w') as f:
            f.write("=" * 70 + "\n")
            f.write("LICENSE PLATE ANALYSIS RESULTS\n")
            f.write("=" * 70 + "\n")
            f.write(f"Generated: {timestamp}\n")
            f.write(f"Project: {self.project_dir.name}\n\n")

            f.write("SUMMARY:\n")
            f.write(f"  Total unique candidates identified: {total_candidates}\n")
            f.write(f"  High confidence candidates (80+): {high_confidence}\n")
            f.write(f"  Cross-validated candidates: {cross_validated}\n")
            f.write(f"  Images processed: {len(per_image_breakdown)}\n\n")

            f.write("ANALYSIS METHODS:\n")
            f.write("  Multi-image cross-validation\n")
            f.write("  Super resolution enhancement\n")
            f.write("  Character frequency analysis\n")
            f.write("  Maryland license plate pattern matching\n\n")

            # Per-image breakdown
            f.write("PER-IMAGE CANDIDATE BREAKDOWN:\n")
            f.write("-" * 40 + "\n")
            for image_name, data in per_image_breakdown.items():
                f.write(f"{image_name}: {data['candidate_count']} candidates\n")
                for i, candidate in enumerate(data['top_candidates'][:5], 1):
                    f.write(f"  {i}. {candidate}\n")
                f.write("\n")

            # Frequency analysis results
            if frequency_results:
                f.write("CHARACTER FREQUENCY ANALYSIS:\n")
                f.write("-" * 40 + "\n")
                for length, result in sorted(frequency_results.items()):
                    f.write(f"Length {length}: '{result['most_likely_plate']}'\n")
                    f.write(f"  Confidence: {result['average_confidence']:.1f}%\n")
                    f.write(f"  Maryland Score: {result['maryland_score']}\n")
                    f.write(f"  Based on {result['candidate_count']} candidates\n\n")

            # Top candidates (field widths match the header columns)
            f.write("RANKED CANDIDATES:\n")
            f.write("-" * 50 + "\n")
            f.write("Rank Candidate Combined Multi Super Cross Sources\n")
            f.write("---- --------- -------- ----- ----- ----- -------\n")

            for i, candidate in enumerate(ranked_results[:20], 1):
                sources_str = str(candidate['unique_sources'])
                cross_mark = "✓" if candidate['cross_validation_bonus'] > 0 else " "
                f.write(f"{i:3d}. {candidate['text']:9s} {candidate['combined_score']:8.1f} {candidate['multi_image_score']:5.1f} {candidate['super_res_score']:5.1f} {cross_mark:^5s} {sources_str:7s}\n")

            f.write("\nSCORE LEGEND:\n")
            f.write("  Combined: Final ranking score\n")
            f.write("  Multi: Multi-image analysis score\n")
            f.write("  Super: Super resolution analysis score\n")
            f.write("  Cross: ✓ = Cross-validated between methods\n")
            f.write("  Sources: Number of unique source images\n")

        print("✓ Analysis report generated:")
        print(f"  Detailed JSON: {json_file}")
        print(f"  Results summary: {summary_file}")

        return comprehensive_report

    def analyze_project(self):
        """Run the complete integrated analysis workflow."""
        print("=" * 70)
        print("LICENSE PLATE ANALYSIS")
        print("=" * 70)
        print(f"Project: {self.project_dir.name}")

        # Phase 0: Annotation analysis
        self.run_annotation_analysis()

        # Phase 1: Multi-image analysis
        multi_image_results = self.run_multi_image_analysis()

        # Phase 2: Super resolution analysis
        super_res_results = self.run_super_resolution_analysis()

        # Check if we have any results
        if not multi_image_results and not super_res_results:
            print("\n✗ No analysis results generated. Check for errors above.")
            return None

        # Phase 3: Combine and rank results
        ranked_results = self.combine_and_rank_results(multi_image_results, super_res_results)

        # Per-image breakdown (no separate phase banner)
        per_image_breakdown = self.generate_per_image_breakdown(multi_image_results)

        # Phase 4: Character frequency analysis
        frequency_results = self.generate_frequency_analysis(ranked_results)

        # Phase 5: Comprehensive report
        comprehensive_report = self.generate_comprehensive_report(
            ranked_results, per_image_breakdown, frequency_results,
            multi_image_results, super_res_results
        )

        # Display the final summary
        print("\n" + "=" * 70)
        print("ANALYSIS COMPLETE")
        print("=" * 70)

        if ranked_results:
            print("Top 10 candidates:")
            for i, candidate in enumerate(ranked_results[:10], 1):
                cross_mark = " (CROSS-VALIDATED)" if candidate['cross_validation_bonus'] > 0 else ""
                print(f"  {i:2d}. {candidate['text']:8s} (Score: {candidate['combined_score']:5.1f}){cross_mark}")

        if frequency_results:
            print("\nFrequency analysis results:")
            for length, result in sorted(frequency_results.items(), key=lambda x: x[1]['total_score'], reverse=True):
                print(f"  Length {length}: '{result['most_likely_plate']}' (score: {result['total_score']:.1f})")

        print(f"\nResults saved to: {self.output_dir}")

        return comprehensive_report


def main():
    parser = argparse.ArgumentParser(description='Integrated License Plate Analysis')
    parser.add_argument('--project-id', type=int, required=True, help='Project ID')

    args = parser.parse_args()

    # Project directories are zero-padded to three digits, e.g. projects/003
    project_dir = Path(f"projects/{args.project_id:03d}")

    if not project_dir.exists():
        print(f"Project {args.project_id:03d} does not exist.")
        return

    analyzer = IntegratedAnalyzer(project_dir)
    analyzer.analyze_project()


if __name__ == '__main__':
    main()