Files
LPR-OCR/multi_image_analysis.py
AtHeartEngineer 85ac8e08d2 init for sharing
2025-07-30 13:44:13 -04:00

652 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Multi-Image License Plate Analysis
Analyzes multiple images of the same vehicle to cross-validate license plate candidates.
"""
import cv2
import numpy as np
import pytesseract
from PIL import Image
import os
import json
import re
import argparse
import time
from pathlib import Path
from collections import defaultdict, Counter
from datetime import datetime
class MultiImageAnalyzer:
    """Cross-validate license plate OCR candidates over several images.

    The analyzer works on a project directory.  Input images are read from
    its ``raw`` sub-directory, optional detection parameters are read from
    ``debug/detection_parameters.json`` and the reports are written to
    ``output``.
    """

    # Lower-case file suffixes accepted as input images.
    IMAGE_SUFFIXES = ('.jpg', '.jpeg', '.png', '.bmp')

    # Edge detectors applied to every preprocessed variant, in processing order.
    EDGE_METHOD_NAMES = ('canny_mid', 'sobel')

    # Keys a detection-parameter file must provide to be usable.
    PARAMETER_KEYS = (
        'min_width', 'max_width',
        'min_height', 'max_height',
        'min_aspect_ratio', 'max_aspect_ratio',
        'min_area', 'max_area',
    )

    def __init__(self, project_dir):
        """Prepare the project directories and load the detection parameters.

        Args:
            project_dir: Path (``str`` or ``Path``) of the project directory.
                The ``debug`` and ``output`` sub-directories are created when
                missing, including any missing parent directories.
        """
        self.project_dir = Path(project_dir)
        self.raw_dir = self.project_dir / 'raw'
        self.debug_dir = self.project_dir / 'debug'
        self.output_dir = self.project_dir / 'output'
        # parents=True so a not-yet-existing project directory does not make
        # construction fail with FileNotFoundError.
        self.debug_dir.mkdir(parents=True, exist_ok=True)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # Load project parameters if available
        self.params = self.load_detection_parameters()
        # Time and work budgets
        self.max_ocr_time_per_region = 2.0  # seconds
        self.max_candidates_per_image = 50
        self.max_total_ocr_operations = 1000

    @staticmethod
    def _default_parameters():
        """Return the aggressive default size/shape filter parameters."""
        return {
            'min_width': 30,
            'max_width': 1200,
            'min_height': 15,
            'max_height': 600,
            'min_aspect_ratio': 0.8,
            'max_aspect_ratio': 12.0,
            'min_area': 450,
            'max_area': 720000,
        }

    def load_detection_parameters(self):
        """Load detection parameters or use aggressive defaults.

        Returns:
            dict: The contour filter limits (see ``PARAMETER_KEYS``).

            * No parameter file: the aggressive defaults.
            * Unreadable, malformed or incomplete file: a warning is printed
              and the aggressive defaults are returned.
            * Aspect-ratio range narrower than 1.0: a widened ("lenient")
              copy of the file's parameters.
            * Otherwise: the parameters exactly as stored in the file.
        """
        params_file = self.project_dir / 'debug' / 'detection_parameters.json'
        if not params_file.exists():
            print("⚠ Using aggressive default parameters")
            return self._default_parameters()

        try:
            with open(params_file, 'r', encoding='utf-8') as f:
                params = json.load(f)
        except (OSError, ValueError) as exc:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            print(f"⚠ Could not read detection parameters ({exc})")
            print("⚠ Using aggressive default parameters")
            return self._default_parameters()

        if not isinstance(params, dict):
            missing = list(self.PARAMETER_KEYS)
        else:
            missing = [key for key in self.PARAMETER_KEYS if key not in params]
        if missing:
            print(f"⚠ Detection parameters incomplete (missing: {', '.join(missing)})")
            print("⚠ Using aggressive default parameters")
            return self._default_parameters()

        # Check if parameters are too restrictive for edge detection
        aspect_range = params['max_aspect_ratio'] - params['min_aspect_ratio']
        if aspect_range >= 1.0:
            print("✓ Using project-specific detection parameters")
            return params

        print(f"⚠ Annotation parameters too restrictive (aspect range: {aspect_range:.2f})")
        print(" Using more lenient parameters for edge detection")
        return {
            'min_width': max(50, params['min_width'] // 2),
            'max_width': min(1500, params['max_width'] * 2),
            'min_height': max(25, params['min_height'] // 2),
            'max_height': min(800, params['max_height'] * 2),
            'min_aspect_ratio': max(1.0, params['min_aspect_ratio'] - 1.0),
            'max_aspect_ratio': min(8.0, params['max_aspect_ratio'] + 2.0),
            'min_area': max(1000, params['min_area'] // 4),
            'max_area': min(1000000, params['max_area'] * 3),
        }

    def score_maryland_likelihood(self, text):
        """Score how likely a candidate is to be a Maryland license plate.

        The score is the sum of a length score, a character-mix score and a
        layout bonus, minus 5 for each of the letters Q, Z, X, V, J present.

        Args:
            text: Candidate string (expected upper-case alphanumerics).

        Returns:
            int: Non-negative score; 0 for empty or one-character input.
        """
        if not text or len(text) < 2:
            return 0

        # Length scoring - 6-7 characters score highest
        length_points = {7: 40, 6: 35, 5: 20, 4: 15, 8: 10}
        score = length_points.get(len(text), 5)

        # Character composition
        has_letter = any(c.isalpha() for c in text)
        has_number = any(c.isdigit() for c in text)
        if has_letter and has_number:
            score += 30
        elif has_letter:
            score += 15
        elif has_number:
            score += 10

        # Layout bonus for shapes like 1ABC234 or ABC123
        if len(text) >= 6:
            if text[0].isdigit() and text[1:4].isalpha() and text[4:].isdigit():
                score += 25
            elif text[:3].isalpha() and text[3:].isdigit():
                score += 20

        # Penalize unlikely characters (once per distinct letter present)
        score -= 5 * sum(1 for char in 'QZXVJ' if char in text)
        return max(0, score)

    @staticmethod
    def _scale_to_uint8(magnitude):
        """Scale a non-negative array linearly so its maximum becomes 255.

        An all-zero (featureless) or empty input yields an all-zero ``uint8``
        array instead of dividing by zero.
        """
        peak = float(np.max(magnitude)) if magnitude.size else 0.0
        if peak <= 0:
            return np.zeros(magnitude.shape, dtype=np.uint8)
        return np.uint8(magnitude * 255 / peak)

    def _preprocess_variants(self, gray):
        """Return ``(name, image)`` pairs of the grayscale preprocessing variants."""
        print(" Applying preprocessing methods...")
        print(" - Original")
        variants = [('original', gray)]

        print(" - Enhanced contrast (CLAHE)")
        clahe = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8, 8))
        variants.append(('enhanced', clahe.apply(gray)))

        print(" - Histogram equalization")
        variants.append(('hist_eq', cv2.equalizeHist(gray)))

        print(f" ✓ Preprocessing complete ({len(variants)} methods)")
        return variants

    def _edge_maps(self, processed_img):
        """Return ``(name, edge_image)`` pairs in ``EDGE_METHOD_NAMES`` order."""
        print(" - Canny (mid threshold)")
        canny = cv2.Canny(processed_img, 50, 150)

        print(" - Sobel edge detection")
        sobelx = cv2.Sobel(processed_img, cv2.CV_64F, 1, 0, ksize=3)
        sobely = cv2.Sobel(processed_img, cv2.CV_64F, 0, 1, ksize=3)
        sobel = self._scale_to_uint8(np.sqrt(sobelx ** 2 + sobely ** 2))

        return [('canny_mid', canny), ('sobel', sobel)]

    @staticmethod
    def _best_per_text(candidates):
        """Keep, for each distinct text, the first candidate with the highest confidence."""
        best = {}
        for candidate in candidates:
            current = best.get(candidate['text'])
            if current is None or candidate['confidence'] > current['confidence']:
                best[candidate['text']] = candidate
        return best

    def process_single_image_aggressive(self, image_path):
        """Process a single image with aggressive detection to find all possible candidates.

        Every preprocessing variant is combined with every edge detector; the
        regions found are run through OCR and the recognised strings scored.

        Args:
            image_path: Path of the image file to analyze.

        Returns:
            list[dict]: One entry per distinct text, sorted by descending
            ``confidence``.  Each entry has the keys ``text``, ``confidence``,
            ``method``, ``bbox`` and ``source_image``.  Empty when the image
            cannot be loaded.
        """
        image_name = Path(image_path).name
        print(f"\nProcessing: {image_name}")
        image = cv2.imread(str(image_path))
        if image is None:
            print(f"Could not load image: {image_path}")
            return []

        print(" Loading image...")
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        print(f" Image loaded: {gray.shape[1]}x{gray.shape[0]}")

        preprocessed = self._preprocess_variants(gray)
        all_candidates = []
        total_combinations = len(preprocessed) * len(self.EDGE_METHOD_NAMES)
        total_ocr_operations = 0
        print(f" Processing {total_combinations} preprocessing + edge detection combinations...")

        for preprocess_name, processed_img in preprocessed:
            print(f" Processing {preprocess_name}...")
            edge_methods = self._edge_maps(processed_img)
            print(f" ✓ Edge detection complete for {preprocess_name}")

            # NOTE(review): the cap is split across edge methods only, so the
            # effective per-image total is this limit times the number of
            # combinations - confirm whether that is intended.
            per_method_limit = self.max_candidates_per_image // len(edge_methods)

            for edge_name, edge_img in edge_methods:
                method = f"{preprocess_name}_{edge_name}"
                print(f" Finding potential plates with {edge_name}...")
                candidates = self.find_potential_plates(edge_img, method)
                print(f" Found {len(candidates)} potential plate regions")

                # Limit candidates per method to prevent excessive processing
                if len(candidates) > per_method_limit:
                    candidates = candidates[:per_method_limit]
                    print(f" Limited to {len(candidates)} candidates for efficiency")

                # Extract text from each candidate
                for i, candidate in enumerate(candidates):
                    if total_ocr_operations >= self.max_total_ocr_operations:
                        print(f" ⚠ Reached maximum OCR operations limit ({self.max_total_ocr_operations})")
                        break

                    x, y, w, h = candidate['bbox']
                    # Extract region with padding, clipped to the image bounds
                    padding = max(5, min(w, h) // 10)
                    x1 = max(0, x - padding)
                    y1 = max(0, y - padding)
                    x2 = min(gray.shape[1], x + w + padding)
                    y2 = min(gray.shape[0], y + h + padding)
                    region = gray[y1:y2, x1:x2]
                    if region.size == 0:
                        continue

                    print(f" Extracting text from region {i+1}/{len(candidates)} ({w}x{h})...")
                    start_time = time.time()
                    ocr_results = self.extract_text_from_region_optimized(region)
                    ocr_time = time.time() - start_time
                    total_ocr_operations += 1
                    print(f" OCR found {len(ocr_results)} text candidates (took {ocr_time:.2f}s)")

                    for text in ocr_results:
                        if len(text) >= 3:  # Minimum length
                            all_candidates.append({
                                'text': text,
                                'confidence': self.score_maryland_likelihood(text),
                                'method': method,
                                'bbox': candidate['bbox'],
                                'source_image': image_name,
                            })

                # Propagate the exhausted OCR budget out of both outer loops.
                if total_ocr_operations >= self.max_total_ocr_operations:
                    break
            if total_ocr_operations >= self.max_total_ocr_operations:
                break

        print(f" ✓ All combinations processed (total OCR operations: {total_ocr_operations})")

        # Remove duplicates and sort by confidence
        print(" Removing duplicates and sorting...")
        unique_candidates = self._best_per_text(all_candidates)
        sorted_candidates = sorted(
            unique_candidates.values(), key=lambda x: x['confidence'], reverse=True)

        print(f" Found {len(sorted_candidates)} unique candidates")
        for i, candidate in enumerate(sorted_candidates[:10], 1):
            print(f" {i:2d}. {candidate['text']:8s} (conf: {candidate['confidence']:3.0f})")
        return sorted_candidates

    def find_potential_plates(self, edge_image, method_name):
        """Find potential license plate regions.

        Args:
            edge_image: Single-channel edge image passed to ``cv2.findContours``.
            method_name: Label stored with every returned region.

        Returns:
            list[dict]: Regions whose bounding box and contour area satisfy
            ``self.params``; keys ``bbox`` (x, y, w, h), ``aspect_ratio``,
            ``area`` and ``method``.
        """
        # OpenCV 4 returns (contours, hierarchy), OpenCV 3 returns
        # (image, contours, hierarchy); the contours are second-to-last in both.
        contours = cv2.findContours(
            edge_image, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)[-2]

        limits = self.params
        candidates = []
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            aspect_ratio = w / float(h)
            area = cv2.contourArea(contour)
            # Apply loose filtering
            if (limits['min_width'] <= w <= limits['max_width'] and
                    limits['min_height'] <= h <= limits['max_height'] and
                    limits['min_aspect_ratio'] <= aspect_ratio <= limits['max_aspect_ratio'] and
                    limits['min_area'] <= area <= limits['max_area']):
                candidates.append({
                    'bbox': (x, y, w, h),
                    'aspect_ratio': aspect_ratio,
                    'area': area,
                    'method': method_name,
                })
        return candidates

    def extract_text_from_region_optimized(self, region):
        """Extract text from a region using optimized OCR configurations.

        The region is OCR'd as-is, after Otsu thresholding and, for small
        regions, after CLAHE contrast enhancement, with three tesseract
        configurations each.

        Args:
            region: Grayscale image region (2-D array).

        Returns:
            list[str]: Distinct recognised strings, reduced to ``A-Z0-9``, at
            least 3 characters long, in sorted order.

        Raises:
            pytesseract.TesseractNotFoundError: If the tesseract executable is
                unavailable; no region could succeed, so it is not swallowed.
        """
        results = []
        start_time = time.time()

        # Reduced preprocessing for efficiency: original + Otsu thresholding
        preprocessed_regions = [region]
        _, otsu = cv2.threshold(region, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)
        preprocessed_regions.append(otsu)

        # Enhanced contrast for small regions
        if region.shape[1] < 100 or region.shape[0] < 25:
            clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
            preprocessed_regions.append(clahe.apply(region))

        # Reduced OCR configurations
        ocr_configs = [
            '--psm 8 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',
            '--psm 7 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',
            '--psm 13',  # Raw line
        ]

        for processed_region in preprocessed_regions:
            # Check the overall budget for this region
            if time.time() - start_time > self.max_ocr_time_per_region:
                print(" ⚠ OCR timeout for region")
                break
            for config in ocr_configs:
                try:
                    pil_img = Image.fromarray(processed_region)
                    # The timeout makes pytesseract abort a hung tesseract
                    # call (raising RuntimeError) instead of stalling the run.
                    text = pytesseract.image_to_string(
                        pil_img,
                        config=config,
                        timeout=self.max_ocr_time_per_region,
                    ).strip()
                except pytesseract.TesseractNotFoundError:
                    raise
                except Exception as exc:
                    # Best effort: one failed configuration must not abort the
                    # region, but the failure is reported rather than hidden.
                    print(f" ⚠ OCR failed for region: {exc}")
                    continue
                # Clean text
                clean_text = re.sub(r'[^A-Z0-9]', '', text.upper())
                if len(clean_text) >= 3:
                    results.append(clean_text)

        # Remove duplicates; sorted so repeated runs yield the same order
        return sorted(set(results))

    def cross_validate_candidates(self, all_image_results):
        """Cross-validate license plate candidates across multiple images.

        Args:
            all_image_results: Mapping of image name to that image's candidate
                list (dicts with ``text``, ``confidence``, ``method`` and
                ``source_image``).

        Returns:
            list[dict]: One entry per distinct text, sorted by descending
            ``final_score``.  ``final_score`` is the best confidence plus 30
            per additional source image, 10 per additional occurrence and 5
            per distinct detection method.
        """
        print(" Collecting candidates from all images...")
        all_candidates = []
        for image_results in all_image_results.values():
            all_candidates.extend(image_results)
        print(f" Total candidates collected: {len(all_candidates)}")
        if not all_candidates:
            print(" No candidates found across any images.")
            return []

        print(" Analyzing candidate statistics...")
        # Count occurrences and accumulate confidences per text
        candidate_stats = defaultdict(lambda: {
            'occurrences': 0,
            'total_confidence': 0,
            'sources': [],
            'methods': [],
            'best_confidence': 0,
        })
        for candidate in all_candidates:
            stats = candidate_stats[candidate['text']]
            stats['occurrences'] += 1
            stats['total_confidence'] += candidate['confidence']
            stats['sources'].append(candidate['source_image'])
            stats['methods'].append(candidate['method'])
            stats['best_confidence'] = max(stats['best_confidence'], candidate['confidence'])
        print(f" Unique candidates found: {len(candidate_stats)}")

        print(" Calculating final scores...")
        scored_candidates = []
        for text, stats in candidate_stats.items():
            # Sorted so the report lists sources in a reproducible order
            distinct_sources = sorted(set(stats['sources']))
            # Base score is the best confidence from any image
            base_score = stats['best_confidence']
            # Multi-image bonus (appears in multiple images)
            multi_image_bonus = (len(distinct_sources) - 1) * 30
            # Frequency bonus (appears multiple times)
            frequency_bonus = (stats['occurrences'] - 1) * 10
            # Consistency bonus (appears with different methods)
            consistency_bonus = len(set(stats['methods'])) * 5
            final_score = base_score + multi_image_bonus + frequency_bonus + consistency_bonus
            scored_candidates.append({
                'text': text,
                'final_score': final_score,
                'base_score': base_score,
                'multi_image_bonus': multi_image_bonus,
                'frequency_bonus': frequency_bonus,
                'consistency_bonus': consistency_bonus,
                'occurrences': stats['occurrences'],
                'image_count': len(distinct_sources),
                'sources': distinct_sources,
                'avg_confidence': stats['total_confidence'] / stats['occurrences'],
            })

        # Sort by final score
        scored_candidates.sort(key=lambda x: x['final_score'], reverse=True)
        print(f" ✓ Cross-validation complete: {len(scored_candidates)} scored candidates")
        return scored_candidates

    def analyze_character_patterns(self, scored_candidates):
        """Analyze character patterns across candidates to find the most likely plate.

        Candidates scoring at least 30 are grouped by length.  For each group
        with two or more candidates of length 4 or more, a consensus string is
        built by a score-weighted vote at every character position.

        Args:
            scored_candidates: Dicts with at least ``text`` and ``final_score``.

        Returns:
            list[dict]: One entry per analyzed length with ``text``,
            ``pattern_confidence`` (mean per-position agreement in percent),
            ``maryland_score``, ``total_score``, ``candidate_count`` and
            ``length``.
        """
        print("\n=== CHARACTER PATTERN ANALYSIS ===")
        # Group by length, only considering reasonable candidates
        length_groups = defaultdict(list)
        for candidate in scored_candidates:
            if candidate['final_score'] >= 30:
                length_groups[len(candidate['text'])].append(candidate)

        pattern_results = []
        for length, candidates in length_groups.items():
            # Need multiple candidates of reasonable length
            if len(candidates) < 2 or length < 4:
                continue
            print(f"\nAnalyzing {len(candidates)} candidates of length {length}:")

            # Accumulate score-weighted votes per character and position
            position_chars = defaultdict(lambda: defaultdict(float))
            for candidate in candidates:
                weight = candidate['final_score'] / 100.0  # Convert score to weight
                for pos, char in enumerate(candidate['text']):
                    position_chars[pos][char] += weight

            # Build most likely string
            most_likely = ""
            total_confidence = 0
            for pos in range(length):
                if pos not in position_chars:
                    continue
                char_scores = position_chars[pos]
                best_char, best_weight = max(char_scores.items(), key=lambda x: x[1])
                most_likely += best_char
                # Share of the vote won by the chosen character
                position_confidence = (best_weight / sum(char_scores.values())) * 100
                total_confidence += position_confidence
                print(f" Position {pos}: '{best_char}' (confidence: {position_confidence:.1f}%)")

            if most_likely:
                avg_confidence = total_confidence / length
                maryland_score = self.score_maryland_likelihood(most_likely)
                pattern_results.append({
                    'text': most_likely,
                    'pattern_confidence': avg_confidence,
                    'maryland_score': maryland_score,
                    'total_score': avg_confidence + maryland_score,
                    'candidate_count': len(candidates),
                    'length': length,
                })
                print(f" Most likely: '{most_likely}' (pattern conf: {avg_confidence:.1f}%, MD score: {maryland_score})")
        return pattern_results

    def generate_investigation_report(self, all_image_results, scored_candidates, pattern_results):
        """Generate a comprehensive investigation report.

        Writes ``investigation_report.json`` and ``investigation_summary.txt``
        (both UTF-8) into ``self.output_dir``.

        Args:
            all_image_results: Mapping of image name to its candidate list.
            scored_candidates: Output of ``cross_validate_candidates``.
            pattern_results: Output of ``analyze_character_patterns``.

        Returns:
            dict: The report that was written to the JSON file.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Count statistics
        total_images = len(all_image_results)
        total_candidates = sum(len(results) for results in all_image_results.values())
        unique_candidates = len(scored_candidates)
        high_confidence = sum(1 for c in scored_candidates if c['final_score'] >= 80)
        cross_image = sum(1 for c in scored_candidates if c['image_count'] > 1)

        report = {
            'investigation_summary': {
                'timestamp': timestamp,
                'total_images_processed': total_images,
                'total_candidates_found': total_candidates,
                'unique_candidates': unique_candidates,
                'high_confidence_candidates': high_confidence,
                'cross_image_candidates': cross_image,
            },
            'top_candidates': scored_candidates[:20],
            'pattern_analysis': pattern_results,
            # Per-image breakdown
            'image_breakdown': {
                image_name: {
                    'candidate_count': len(results),
                    'top_candidates': [r['text'] for r in results[:5]],
                }
                for image_name, results in all_image_results.items()
            },
        }

        # Save detailed JSON report
        json_file = self.output_dir / 'investigation_report.json'
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2)

        # Build the human-readable summary
        lines = [
            "=== LICENSE PLATE RECOGNITION REPORT ===\n",
            f"Generated: {timestamp}\n\n",
            "SUMMARY:\n",
            f" Images processed: {total_images}\n",
            f" Total candidates: {total_candidates}\n",
            f" Unique candidates: {unique_candidates}\n",
            f" High confidence: {high_confidence}\n",
            f" Cross-image validation: {cross_image}\n\n",
            "TOP CANDIDATES FOR DMV SEARCH:\n",
            "Rank Candidate Score Images Occurrences Sources\n",
            "---- --------- ----- ------ ----------- -------\n",
        ]
        for i, candidate in enumerate(scored_candidates[:20], 1):
            # Path.stem drops only the extension, so names containing dots
            # (e.g. "cam.1.jpg") stay distinguishable.
            sources = '+'.join(Path(s).stem for s in candidate['sources'])
            lines.append(
                f"{i:3d}. {candidate['text']:9s} {candidate['final_score']:3.0f} "
                f"{candidate['image_count']:3d} {candidate['occurrences']:6d} {sources}\n")

        if pattern_results:
            lines.append("\nPATTERN ANALYSIS RESULTS:\n")
            ranked_patterns = sorted(
                pattern_results, key=lambda x: x['total_score'], reverse=True)
            for i, pattern in enumerate(ranked_patterns, 1):
                lines.append(f" {i}. '{pattern['text']}' (total score: {pattern['total_score']:.1f})\n")
                lines.append(f" Pattern confidence: {pattern['pattern_confidence']:.1f}%\n")
                lines.append(f" Maryland score: {pattern['maryland_score']}\n")
                lines.append(f" Based on {pattern['candidate_count']} candidates\n\n")

        lines.extend([
            "RECOMMENDATIONS:\n",
            "1. Start DMV search with candidates scoring 80+\n",
            "2. Consider pattern analysis results for additional validation\n",
            "3. Pay attention to candidates appearing in multiple images\n",
            "4. Check character substitutions (O/0, I/1, S/5, etc.)\n",
        ])

        # Save human-readable summary
        summary_file = self.output_dir / 'investigation_summary.txt'
        with open(summary_file, 'w', encoding='utf-8') as f:
            f.writelines(lines)

        print("\n✓ Investigation report saved:")
        print(f" Detailed: {json_file}")
        print(f" Summary: {summary_file}")
        return report

    def analyze_project(self):
        """Analyze all images in the project for license plate candidates.

        Images in ``self.raw_dir`` are processed in sorted name order until
        done or until the overall 300-second budget is used up (checked
        between images).  The results are cross-validated, pattern-analyzed
        and written to the report files.

        Returns:
            dict | None: The generated report, or ``None`` when no image
            files were found.
        """
        # Get all images; sorted so runs are reproducible
        if self.raw_dir.is_dir():
            image_files = sorted(
                f for f in self.raw_dir.glob('*')
                if f.is_file() and f.suffix.lower() in self.IMAGE_SUFFIXES)
        else:
            image_files = []
        if not image_files:
            print(f"No image files found in {self.raw_dir}")
            return None

        print("=== MULTI-IMAGE LICENSE PLATE ANALYSIS ===")
        print(f"Processing {len(image_files)} images for cross-validation")

        # Overall timeout for the entire analysis
        max_total_time = 300  # 5 minutes
        start_time = time.time()
        separator = '=' * 60

        # Process each image
        all_image_results = {}
        for i, image_file in enumerate(image_files, 1):
            # Check overall timeout
            if time.time() - start_time > max_total_time:
                print(f"⚠ Overall analysis timeout reached ({max_total_time}s)")
                break

            print(f"\n{separator}")
            print(f"IMAGE {i}/{len(image_files)}: {image_file.name}")
            print(separator)

            image_start_time = time.time()
            try:
                results = self.process_single_image_aggressive(image_file)
            except Exception as e:
                # Boundary handler: one bad image must not abort the project.
                print(f"✗ Error processing image {image_file.name}: {e}")
                all_image_results[image_file.name] = []
            else:
                all_image_results[image_file.name] = results
                image_time = time.time() - image_start_time
                print(f"✓ Image processed in {image_time:.1f}s")

        if not all_image_results:
            print("No images were successfully processed.")
            return None

        # Cross-validate candidates
        print(f"\n{separator}")
        print("CROSS-VALIDATION PHASE")
        print(separator)
        scored_candidates = self.cross_validate_candidates(all_image_results)

        # Analyze character patterns
        print(f"\n{separator}")
        print("PATTERN ANALYSIS PHASE")
        print(separator)
        pattern_results = self.analyze_character_patterns(scored_candidates)

        # Generate report
        report = self.generate_investigation_report(
            all_image_results, scored_candidates, pattern_results)

        # Display key results
        print("\n=== KEY FINDINGS ===")
        print("Top 10 candidates for Maryland DMV search:")
        for i, candidate in enumerate(scored_candidates[:10], 1):
            multi_img = " (MULTIPLE IMAGES)" if candidate['image_count'] > 1 else ""
            print(f" {i:2d}. {candidate['text']:8s} (Score: {candidate['final_score']:3.0f}){multi_img}")

        if pattern_results:
            best_pattern = max(pattern_results, key=lambda x: x['total_score'])
            print(f"\nBest pattern analysis result: '{best_pattern['text']}' (score: {best_pattern['total_score']:.1f})")

        total_time = time.time() - start_time
        print(f"\n✓ Analysis completed in {total_time:.1f}s")
        return report
def main():
    """Command-line entry point.

    Reads the required integer ``--project-id`` option, maps it to the
    directory ``projects/<id zero-padded to three digits>`` and runs the
    multi-image analysis on it.  When that directory is absent a message is
    printed and nothing else happens.
    """
    parser = argparse.ArgumentParser(description='Multi-Image License Plate Analysis')
    parser.add_argument('--project-id', type=int, required=True, help='Project ID')
    args = parser.parse_args()

    project_dir = Path(f"projects/{args.project_id:03d}")
    if project_dir.exists():
        # The analyzer creates its debug/output folders and prints progress itself.
        MultiImageAnalyzer(project_dir).analyze_project()
    else:
        print(f"Project {args.project_id:03d} does not exist.")
# Run the command-line entry point only when this file is executed as a
# script, so importing the module has no side effects.
if __name__ == "__main__":
    main()