init for sharing

Commit 85ac8e08d2 by AtHeartEngineer, 2025-07-30 13:44:13 -04:00
53 changed files with 4399 additions and 0 deletions

.gitignore (vendored, new file, 3 lines)

@@ -0,0 +1,3 @@
projects/001/
projects/*/debug/
projects/*/output/

README.md (new file, 141 lines)

@@ -0,0 +1,141 @@
# License Plate OCR Investigation System
A comprehensive license plate detection and OCR system for law enforcement investigations. Work is organized into per-case projects, and candidates are cross-validated across multiple images of the same vehicle for improved accuracy.
## Project Structure
Each investigation follows this structure:
```
projects/
├── 001/ # Investigation 001
│ ├── raw/ # Source images of the vehicle
│ │ ├── image1.jpg # Multiple angles/lighting
│ │ ├── image2.jpg # of the same vehicle
│ │ └── image3.jpg
│ ├── annotations/ # Human annotations (JSON)
│ │ ├── image1.json # License plate locations
│ │ └── image2.json
│ ├── debug/ # Debug images and parameters
│ │ ├── detection_parameters.json
│ │ └── [debug images]
│ ├── output/ # Analysis results
│ │ ├── investigation_report.json
│ │ ├── investigation_summary.txt
│ │ └── top_candidates.txt
│ └── project.json # Project metadata
└── 002/ # Investigation 002
└── ...
```
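For reference, the annotation files are plain JSON written by `annotate_project.py`. The field names below come from its `save_annotations` method; the values are illustrative and the `statistics` block is abridged:
```json
{
  "image_path": "projects/001/raw/image1.jpg",
  "image_shape": [1080, 1920, 3],
  "annotations": [
    {
      "top_left": [842, 610],
      "bottom_right": [1010, 695],
      "width": 168,
      "height": 85,
      "aspect_ratio": 1.98
    }
  ],
  "statistics": {
    "count": 1,
    "avg_width": 168.0,
    "avg_height": 85.0,
    "avg_aspect_ratio": 1.98
  }
}
```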
## Quick Start
### 1. Create a New Investigation
```bash
python project_manager.py create "Maryland license plate investigation - Case #2024-001"
```
This automatically creates the next project ID and folder structure.
### 2. Add Your Images
Copy your images of the vehicle (multiple angles and lighting conditions help) into the project's `raw/` folder, e.g. `projects/001/raw/`.
### 3. Annotate License Plates
```bash
python project_manager.py annotate 1
```
Draw bounding boxes around license plates in each image. In the annotation window (key bindings from `annotate_project.py`), press `s` to save, `n` for the next image, `u` to undo the last box, `r` to reset, and `q` to quit.
### 4. Run Comprehensive Analysis
```bash
python project_manager.py analyze 1
```
This runs the complete analysis pipeline:
- **Automatic annotation analysis** (generates detection parameters)
- Multi-image cross-validation
- Super resolution enhancement
- Character frequency analysis (sketched after this list)
- Combined ranking and reporting
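The frequency step can be pictured as a weighted, position-wise character vote across OCR reads of the same length. Below is a minimal sketch of the idea implemented in `integrated_analysis.py`; the function name and values are illustrative, not the actual implementation:
```python
from collections import defaultdict

def most_likely_plate(candidates):
    """Weighted position-wise character vote over same-length candidates.

    candidates: list of (text, score) pairs, e.g. several OCR reads of one plate.
    """
    votes = defaultdict(lambda: defaultdict(float))
    for text, score in candidates:
        for pos, char in enumerate(text):
            votes[pos][char] += score
    # Keep the highest-weighted character at each position.
    return "".join(max(votes[pos], key=votes[pos].get) for pos in sorted(votes))

# Three noisy reads of the same plate resolve to the consensus string:
print(most_likely_plate([("ABC1234", 80), ("A8C1234", 60), ("ABC1Z34", 55)]))  # ABC1234
```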
## Tools
### Project Manager (`project_manager.py`)
- `create "description"`: Create new project (auto-increments ID)
- `annotate <id>`: Run annotation tool (skips already annotated images)
- `annotate <id> --reannotate`: Reannotate all images (shows existing annotations)
- `analyze <id>`: Run comprehensive analysis (includes automatic annotation analysis)
- `list`: List all projects
- `info <id>`: Show project details
### Annotation Tool (`annotate_project.py`)
- Interactive tool to mark license plate regions
- Generates detection parameters from annotations
- Use `--analyze` flag to process annotations
### Detection System (`detect_project.py`)
- Comprehensive detection using multiple methods
- Saves debug images and results
- Generates ranked candidate list
### Multi-Image Analysis (`multi_image_analysis.py`)
- Cross-validates candidates across multiple images
- Character pattern analysis for higher accuracy
- Specialized for multiple images of same vehicle
### Super Resolution Analysis (`super_resolution_analysis.py`)
- Extracts annotated license plate regions
- Normalizes dimensions and applies super resolution
- Advanced enhancement techniques (EDSR, frequency domain, etc.)
- Enhanced OCR on super-resolved regions
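Each analysis tool can also be invoked directly. The `--project-id` flag below matches the argument parser in `detect_project.py` and the subprocess calls made by `integrated_analysis.py`:
```bash
python detect_project.py --project-id 1                         # all images in the project
python detect_project.py --project-id 1 --image front_view.jpg  # a single image
python multi_image_analysis.py --project-id 1
python super_resolution_analysis.py --project-id 1
```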
## Output Files
### `output/analysis_results.txt`
Technical results summary with ranked candidates and frequency analysis.
### `output/comprehensive_analysis.json`
Detailed JSON with all analysis results, including:
- Ranked candidates with scores
- Individual image results
- Cross-image analysis
### `output/super_resolution_report.json`
Super resolution analysis results (if annotations are available):
- Enhanced candidates from normalized regions
- Method breakdown and confidence scores
- Super resolution technique comparison
### `debug/detection_parameters.json`
Optimized detection parameters (if annotations were provided).
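The parameter file is a flat JSON object; the keys below come from `analyze_project_annotations` in `annotate_project.py`, with illustrative values:
```json
{
  "min_width": 120,
  "max_width": 520,
  "min_height": 45,
  "max_height": 240,
  "min_aspect_ratio": 1.85,
  "max_aspect_ratio": 2.40,
  "min_area": 6000,
  "max_area": 180000
}
```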
## Example Investigation Workflow
```bash
# Create investigation (auto-increments to next available project ID)
python project_manager.py create "Vehicle pursuit - Case #2024-001"
# Copy your images to the raw folder
# cp front_view.jpg rear_view.jpg side_view.jpg projects/001/raw/
# Annotate license plates
python project_manager.py annotate 1
# Run comprehensive analysis
python project_manager.py analyze 1
# Check final results
cat projects/001/output/analysis_results.txt
```
## Requirements
- Python 3.7+
- OpenCV
- NumPy
- Pytesseract
- PIL (Pillow)
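Assuming the usual PyPI package names (note that `pytesseract` also needs the Tesseract binary installed separately), setup looks like:
```bash
pip install opencv-python numpy pytesseract pillow
# Tesseract itself, e.g. on Debian/Ubuntu:
# sudo apt-get install tesseract-ocr
```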
## Notes
- Without annotations, the system falls back to aggressive default detection parameters (broad width, height, area, and aspect-ratio ranges)
- Annotations improve detection accuracy significantly
- Debug images are saved for troubleshooting
- Results are automatically ranked by Maryland license plate likelihood
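As a rough illustration, the two 7-character patterns that `score_maryland_likelihood` (in `detect_project.py`) rewards most can be expressed as:
```python
import re

def matches_common_md_format(text: str) -> bool:
    """True for the 7-character formats given the largest Maryland-pattern bonuses."""
    return bool(re.fullmatch(r"[A-Z]{3}[0-9]{4}", text)           # ABC1234
                or re.fullmatch(r"[0-9][A-Z]{3}[0-9]{3}", text))  # 1ABC234
```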

annotate_project.py (new file, 340 lines)

@@ -0,0 +1,340 @@
#!/usr/bin/env python3
"""
Interactive license plate annotation tool for projects.
Click and drag to select license plate regions for training the detection algorithm.
"""
import cv2
import numpy as np
import json
import os
import argparse
from pathlib import Path
class LicensePlateAnnotator:
def __init__(self, image_path, project_dir, load_existing=True):
self.image_path = image_path
self.project_dir = Path(project_dir)
self.image = cv2.imread(image_path)
if self.image is None:
    raise FileNotFoundError(f"Could not load image: {image_path}")
self.original = self.image.copy()
self.annotations = []
self.current_rect = None
self.drawing = False
self.start_point = None
self.scale_factor = 1.0
# Scale image for display if too large
h, w = self.image.shape[:2]
if w > 1200 or h > 800:
self.scale_factor = min(1200/w, 800/h)
new_w = int(w * self.scale_factor)
new_h = int(h * self.scale_factor)
self.display_image = cv2.resize(self.image, (new_w, new_h))
else:
self.display_image = self.image.copy()
self.window_name = f"Annotate License Plates - {Path(image_path).name}"
# Load existing annotations if requested
if load_existing:
self.load_existing_annotations()
def load_existing_annotations(self):
"""Load existing annotations for this image."""
image_name = Path(self.image_path).stem
annotation_file = self.project_dir / 'annotations' / f"{image_name}.json"
if annotation_file.exists():
try:
with open(annotation_file, 'r') as f:
data = json.load(f)
self.annotations = data.get('annotations', [])
print(f"Loaded {len(self.annotations)} existing annotations")
except Exception as e:
print(f"Warning: Could not load existing annotations: {e}")
self.annotations = []
def mouse_callback(self, event, x, y, flags, param):
orig_x = int(x / self.scale_factor)
orig_y = int(y / self.scale_factor)
if event == cv2.EVENT_LBUTTONDOWN:
self.drawing = True
self.start_point = (orig_x, orig_y)
elif event == cv2.EVENT_MOUSEMOVE:
if self.drawing:
temp_display = self.display_image.copy()
display_start = (int(self.start_point[0] * self.scale_factor),
int(self.start_point[1] * self.scale_factor))
cv2.rectangle(temp_display, display_start, (x, y), (0, 255, 0), 2)
cv2.imshow(self.window_name, temp_display)
elif event == cv2.EVENT_LBUTTONUP:
if self.drawing:
self.drawing = False
end_point = (orig_x, orig_y)
x1, y1 = self.start_point
x2, y2 = end_point
if abs(x2 - x1) > 10 and abs(y2 - y1) > 5:
annotation = {
'top_left': (min(x1, x2), min(y1, y2)),
'bottom_right': (max(x1, x2), max(y1, y2)),
'width': abs(x2 - x1),
'height': abs(y2 - y1),
'aspect_ratio': abs(x2 - x1) / abs(y2 - y1)
}
self.annotations.append(annotation)
print(f"Added annotation: {annotation['width']}x{annotation['height']}, AR: {annotation['aspect_ratio']:.2f}")
self.update_display()
def update_display(self):
self.display_image = cv2.resize(self.original,
(int(self.original.shape[1] * self.scale_factor),
int(self.original.shape[0] * self.scale_factor)))
for i, annotation in enumerate(self.annotations):
tl = annotation['top_left']
br = annotation['bottom_right']
display_tl = (int(tl[0] * self.scale_factor), int(tl[1] * self.scale_factor))
display_br = (int(br[0] * self.scale_factor), int(br[1] * self.scale_factor))
cv2.rectangle(self.display_image, display_tl, display_br, (0, 255, 0), 2)
cv2.putText(self.display_image, f"LP{i+1}",
(display_tl[0], display_tl[1] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.imshow(self.window_name, self.display_image)
def run(self):
cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL)
cv2.setMouseCallback(self.window_name, self.mouse_callback)
print(f"\nAnnotating: {self.image_path}")
print("Instructions:")
print("- Click and drag to select license plate regions")
print("- Press 'u' to undo last annotation")
print("- Press 's' to save annotations")
print("- Press 'n' to go to next image")
print("- Press 'q' to quit program")
print("- Press 'r' to reset all annotations")
self.update_display()
while True:
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
cv2.destroyWindow(self.window_name)
return None # Signal to quit program
elif key == ord('n'):
cv2.destroyWindow(self.window_name)
return self.annotations # Signal to go to next image
elif key == ord('u') and self.annotations:
self.annotations.pop()
print(f"Undid last annotation. {len(self.annotations)} remaining.")
self.update_display()
elif key == ord('r'):
self.annotations = []
print("Reset all annotations.")
self.update_display()
elif key == ord('s'):
self.save_annotations()
cv2.destroyWindow(self.window_name)
return self.annotations
def save_annotations(self):
if not self.annotations:
print("No annotations to save.")
return
# Save to project annotations folder
image_name = Path(self.image_path).stem
output_file = self.project_dir / 'annotations' / f"{image_name}.json"
annotation_data = {
'image_path': str(self.image_path),
'image_shape': self.original.shape,
'annotations': self.annotations,
'statistics': self.get_statistics()
}
with open(output_file, 'w') as f:
json.dump(annotation_data, f, indent=2)
print(f"Saved {len(self.annotations)} annotations to {output_file}")
def get_statistics(self):
if not self.annotations:
return {}
widths = [a['width'] for a in self.annotations]
heights = [a['height'] for a in self.annotations]
aspect_ratios = [a['aspect_ratio'] for a in self.annotations]
return {
'count': len(self.annotations),
'avg_width': np.mean(widths),
'avg_height': np.mean(heights),
'avg_aspect_ratio': np.mean(aspect_ratios),
'width_range': [min(widths), max(widths)],
'height_range': [min(heights), max(heights)],
'aspect_ratio_range': [min(aspect_ratios), max(aspect_ratios)]
}
def analyze_project_annotations(project_dir):
"""Analyze all annotations in a project and generate detection parameters."""
project_dir = Path(project_dir)
annotations_dir = project_dir / 'annotations'
if not annotations_dir.exists():
print("No annotations directory found.")
return
annotation_files = list(annotations_dir.glob('*.json'))
if not annotation_files:
print("No annotation files found.")
return
all_annotations = []
all_stats = []
print(f"\nAnalyzing {len(annotation_files)} annotation files...")
for file in annotation_files:
with open(file, 'r') as f:
data = json.load(f)
all_annotations.extend(data['annotations'])
all_stats.append(data['statistics'])
print(f" {file.name}: {data['statistics']['count']} plates")
if not all_annotations:
print("No annotations found in files.")
return
# Calculate statistics
widths = [a['width'] for a in all_annotations]
heights = [a['height'] for a in all_annotations]
aspects = [a['aspect_ratio'] for a in all_annotations]
areas = [a['width'] * a['height'] for a in all_annotations]
print(f"\n=== DETECTION PARAMETERS ===")
print(f"Based on {len(all_annotations)} manually annotated license plates:")
print(f"")
print(f"Width range: {min(widths):3.0f} - {max(widths):3.0f} pixels (avg: {np.mean(widths):.0f})")
print(f"Height range: {min(heights):3.0f} - {max(heights):3.0f} pixels (avg: {np.mean(heights):.0f})")
print(f"Aspect ratio: {min(aspects):.2f} - {max(aspects):.2f} (avg: {np.mean(aspects):.2f})")
print(f"Area range: {min(areas):4.0f} - {max(areas):4.0f} pixels²")
print(f"")
print(f"GENERATED PARAMETERS:")
print(f" min_width = {int(np.percentile(widths, 10))}")
print(f" max_width = {int(np.percentile(widths, 90)) * 2}")
print(f" min_height = {int(np.percentile(heights, 10))}")
print(f" max_height = {int(np.percentile(heights, 90)) * 2}")
print(f" min_aspect_ratio = {np.percentile(aspects, 5):.2f}")
print(f" max_aspect_ratio = {np.percentile(aspects, 95):.2f}")
print(f" min_area = {int(np.percentile(areas, 10))}")
print(f" max_area = {int(np.percentile(areas, 90)) * 3}")
# Save parameters to project
params = {
'min_width': int(np.percentile(widths, 10)),
'max_width': int(np.percentile(widths, 90)) * 2,
'min_height': int(np.percentile(heights, 10)),
'max_height': int(np.percentile(heights, 90)) * 2,
'min_aspect_ratio': np.percentile(aspects, 5),
'max_aspect_ratio': np.percentile(aspects, 95),
'min_area': int(np.percentile(areas, 10)),
'max_area': int(np.percentile(areas, 90)) * 3
}
params_file = project_dir / 'debug' / 'detection_parameters.json'
with open(params_file, 'w') as f:
json.dump(params, f, indent=2)
print(f"\nParameters saved to {params_file}")
return params
def main():
parser = argparse.ArgumentParser(description='License Plate Annotation Tool')
parser.add_argument('--project-id', type=int, required=True, help='Project ID')
parser.add_argument('--image', help='Specific image to annotate')
parser.add_argument('--analyze', action='store_true', help='Analyze all annotations')
parser.add_argument('--reannotate', action='store_true', help='Reannotate images that already have annotations')
args = parser.parse_args()
project_dir = Path(f"projects/{args.project_id:03d}")
if not project_dir.exists():
print(f"Project {args.project_id:03d} does not exist. Create it first.")
return
if args.analyze:
analyze_project_annotations(project_dir)
return
raw_dir = project_dir / 'raw'
if not raw_dir.exists():
print(f"Raw directory not found: {raw_dir}")
return
# Get list of images
image_files = list(raw_dir.glob('*'))
image_files = [f for f in image_files if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp']]
if not image_files:
print(f"No image files found in {raw_dir}")
return
if args.image:
# Annotate specific image
target_image = raw_dir / args.image
if not target_image.exists():
print(f"Image {args.image} not found in {raw_dir}")
return
image_files = [target_image]
# Filter images based on existing annotations
if not args.reannotate:
filtered_images = []
for image_file in image_files:
image_name = image_file.stem
annotation_file = project_dir / 'annotations' / f"{image_name}.json"
if not annotation_file.exists():
filtered_images.append(image_file)
else:
print(f"Skipping {image_file.name} (already annotated)")
image_files = filtered_images
if not image_files:
print("All images already annotated. Use --reannotate flag to reannotate.")
return
print(f"Found {len(image_files)} images to annotate in project {args.project_id:03d}")
for i, image_file in enumerate(image_files):
print(f"\n{'='*60}")
print(f"Annotating: {image_file.name} ({i+1}/{len(image_files)})")
print(f"{'='*60}")
# Load existing annotations if reannotating
load_existing = args.reannotate
annotator = LicensePlateAnnotator(str(image_file), project_dir, load_existing=load_existing)
annotations = annotator.run()
if annotations is None:
print("Annotation session cancelled by user.")
break
elif annotations:
annotator.save_annotations()
print(f"Completed annotation of {image_file.name}")
print(f"Found {len(annotations)} license plate regions")
else:
print(f"No annotations saved for {image_file.name}")
print(f"\nAnnotation session completed for project {args.project_id:03d}")
print("Run with --analyze flag to generate detection parameters.")
if __name__ == '__main__':
main()

detect_project.py (new file, 813 lines)

@@ -0,0 +1,813 @@
#!/usr/bin/env python3
"""
License Plate Detection for Projects
Uses project-specific parameters and saves results to project output folder.
"""
import cv2
import numpy as np
import pytesseract
from PIL import Image
import os
import json
import re
import argparse
from pathlib import Path
from collections import defaultdict, Counter
from datetime import datetime
class ProjectDetector:
def __init__(self, project_dir):
self.project_dir = Path(project_dir)
self.raw_dir = self.project_dir / 'raw'
self.debug_dir = self.project_dir / 'debug'
self.output_dir = self.project_dir / 'output'
# Ensure directories exist
self.debug_dir.mkdir(exist_ok=True)
self.output_dir.mkdir(exist_ok=True)
# Load project parameters if available
self.params = self.load_detection_parameters()
def load_detection_parameters(self):
"""Load detection parameters from project annotations or use defaults."""
params_file = self.project_dir / 'debug' / 'detection_parameters.json'
if params_file.exists():
with open(params_file, 'r') as f:
params = json.load(f)
print(f"✓ Loaded detection parameters from {params_file}")
return params
else:
# Use aggressive default parameters
params = {
'min_width': 30,
'max_width': 1200,
'min_height': 15,
'max_height': 600,
'min_aspect_ratio': 0.8,
'max_aspect_ratio': 12.0,
'min_area': 450,
'max_area': 720000
}
print("⚠ Using default aggressive parameters (no annotations found)")
return params
def score_license_plate_likelihood(self, bbox, area, solidity, extent):
"""Score how likely this region is to be a license plate."""
x, y, w, h = bbox
aspect_ratio = w / float(h)
score = 0
# Size scoring - prefer license plate-like sizes
if 50 <= w <= 600 and 20 <= h <= 200:
score += 30
elif 30 <= w <= 800 and 15 <= h <= 300:
score += 20
else:
score += 5
# Aspect ratio scoring
if 1.5 <= aspect_ratio <= 6.0:
score += 40
elif 1.0 <= aspect_ratio <= 8.0:
score += 25
else:
score += 10
# Area scoring
if 1000 <= area <= 120000:
score += 20
elif 500 <= area <= 200000:
score += 15
else:
score += 5
# Geometric quality
if solidity > 0.3:
score += 15
if extent > 0.3:
score += 15
# Bonus for rectangular shapes
if 0.7 <= extent <= 1.0 and solidity > 0.7:
score += 20
return score
def comprehensive_preprocessing(self, image):
"""Apply comprehensive preprocessing to maximize detection."""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image
methods = []
# 1. Original
methods.append(('original', gray))
# 2. Multiple contrast enhancements
clahe1 = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
enhanced1 = clahe1.apply(gray)
methods.append(('clahe_2', enhanced1))
clahe2 = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8,8))
enhanced2 = clahe2.apply(gray)
methods.append(('clahe_4', enhanced2))
# 3. Histogram equalization
hist_eq = cv2.equalizeHist(gray)
methods.append(('hist_eq', hist_eq))
# 4. Multiple bilateral filters
bilateral1 = cv2.bilateralFilter(gray, 9, 75, 75)
methods.append(('bilateral_9', bilateral1))
bilateral2 = cv2.bilateralFilter(gray, 15, 80, 80)
methods.append(('bilateral_15', bilateral2))
# 5. Gaussian blurs
gaussian1 = cv2.GaussianBlur(gray, (3, 3), 0)
methods.append(('gaussian_3', gaussian1))
gaussian2 = cv2.GaussianBlur(gray, (5, 5), 0)
methods.append(('gaussian_5', gaussian2))
# 6. Morphological operations
kernel3 = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
morph1 = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel3)
methods.append(('morph_close_3', morph1))
kernel5 = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
morph2 = cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel5)
methods.append(('morph_open_5', morph2))
# 7. Sharpening
sharpening_kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
sharpened = cv2.filter2D(gray, -1, sharpening_kernel)
methods.append(('sharpened', sharpened))
# 8. Unsharp masking
gaussian_blur = cv2.GaussianBlur(gray, (9, 9), 10.0)
unsharp = cv2.addWeighted(gray, 1.5, gaussian_blur, -0.5, 0)
methods.append(('unsharp', unsharp))
# 9. Top-hat filtering
tophat = cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, kernel5)
methods.append(('tophat', tophat))
# 10. Bottom-hat filtering
blackhat = cv2.morphologyEx(gray, cv2.MORPH_BLACKHAT, kernel5)
methods.append(('blackhat', blackhat))
return methods
def comprehensive_edge_detection(self, image):
"""Apply comprehensive edge detection methods."""
methods = []
# Multiple Canny thresholds
canny_configs = [
(20, 60), (30, 90), (40, 120), (50, 150),
(60, 180), (80, 200), (100, 250), (30, 200)
]
for low, high in canny_configs:
canny = cv2.Canny(image, low, high)
methods.append((f'canny_{low}_{high}', canny))
# Sobel edges
sobelx = cv2.Sobel(image, cv2.CV_64F, 1, 0, ksize=3)
sobely = cv2.Sobel(image, cv2.CV_64F, 0, 1, ksize=3)
sobel = np.sqrt(sobelx**2 + sobely**2)
sobel_norm = np.uint8(sobel * 255 / np.max(sobel))
methods.append(('sobel_3', sobel_norm))
# Sobel with different kernel sizes
sobelx5 = cv2.Sobel(image, cv2.CV_64F, 1, 0, ksize=5)
sobely5 = cv2.Sobel(image, cv2.CV_64F, 0, 1, ksize=5)
sobel5 = np.sqrt(sobelx5**2 + sobely5**2)
sobel5_norm = np.uint8(sobel5 * 255 / np.max(sobel5))
methods.append(('sobel_5', sobel5_norm))
# Laplacian
laplacian = cv2.Laplacian(image, cv2.CV_64F)
laplacian_norm = np.uint8(np.absolute(laplacian))
methods.append(('laplacian', laplacian_norm))
# Laplacian of Gaussian
gaussian = cv2.GaussianBlur(image, (3, 3), 0)
log = cv2.Laplacian(gaussian, cv2.CV_64F)
log_norm = np.uint8(np.absolute(log))
methods.append(('log', log_norm))
# Scharr edges
scharrx = cv2.Scharr(image, cv2.CV_64F, 1, 0)
scharry = cv2.Scharr(image, cv2.CV_64F, 0, 1)
scharr = np.sqrt(scharrx**2 + scharry**2)
scharr_norm = np.uint8(scharr * 255 / np.max(scharr))
methods.append(('scharr', scharr_norm))
return methods
def find_all_potential_plates(self, edge_image, method_name=""):
"""Find ALL potential license plate regions with very loose filtering."""
contours, _ = cv2.findContours(edge_image, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
candidates = []
for contour in contours:
# Basic measurements
x, y, w, h = cv2.boundingRect(contour)
aspect_ratio = w / float(h)
area = cv2.contourArea(contour)
# Very loose filtering
if (self.params['min_width'] <= w <= self.params['max_width'] and
self.params['min_height'] <= h <= self.params['max_height'] and
self.params['min_aspect_ratio'] <= aspect_ratio <= self.params['max_aspect_ratio'] and
self.params['min_area'] <= area <= self.params['max_area']):
# Geometric quality
hull = cv2.convexHull(contour)
hull_area = cv2.contourArea(hull)
solidity = area / hull_area if hull_area > 0 else 0
extent = area / (w * h)
# Calculate likelihood score
likelihood_score = self.score_license_plate_likelihood((x, y, w, h), area, solidity, extent)
candidates.append({
'contour': contour,
'bbox': (x, y, w, h),
'aspect_ratio': aspect_ratio,
'area': area,
'solidity': solidity,
'extent': extent,
'likelihood_score': likelihood_score,
'method': method_name
})
return candidates
def save_comprehensive_visualization(self, image, candidates, method_name, top_n=20):
"""Save visualization with many candidates."""
if len(image.shape) == 2:
vis_img = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
else:
vis_img = image.copy()
# Sort by likelihood score
candidates_sorted = sorted(candidates, key=lambda x: x['likelihood_score'], reverse=True)
colors = [
(0, 255, 0), (255, 0, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255),
(0, 255, 255), (128, 255, 0), (255, 128, 0), (128, 0, 255), (0, 128, 255),
(255, 255, 128), (255, 128, 255), (128, 255, 255), (192, 192, 0), (192, 0, 192),
(0, 192, 192), (64, 255, 64), (255, 64, 64), (64, 64, 255), (128, 128, 128)
]
for i, candidate in enumerate(candidates_sorted[:top_n]):
x, y, w, h = candidate['bbox']
color = colors[i % len(colors)]
# Draw rectangle
thickness = 3 if i < 5 else 2
cv2.rectangle(vis_img, (x, y), (x + w, y + h), color, thickness)
# Add label
label = f"#{i+1}:{candidate['likelihood_score']:.0f}"
font_scale = 0.7 if i < 5 else 0.5
cv2.putText(vis_img, label, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, 2)
# Add size info for top candidates
if i < 10:
size_label = f"{w}x{h}"
cv2.putText(vis_img, size_label, (x, y + h + 15), cv2.FONT_HERSHEY_SIMPLEX, 0.4, color, 1)
output_path = self.debug_dir / f"comprehensive_{method_name}.jpg"
cv2.imwrite(str(output_path), vis_img)
return output_path
def extract_and_ocr_region(self, image, candidate, candidate_id):
"""Extract region and try comprehensive OCR."""
x, y, w, h = candidate['bbox']
# Add padding
padding = max(5, min(w, h) // 10)
x1 = max(0, x - padding)
y1 = max(0, y - padding)
x2 = min(image.shape[1], x + w + padding)
y2 = min(image.shape[0], y + h + padding)
region = image[y1:y2, x1:x2]
if region.size == 0:
return []
# Save original region
cv2.imwrite(str(self.debug_dir / f"region_{candidate_id:02d}_original.jpg"), region)
# Multiple preprocessing approaches
preprocessed = []
# 1. Original
preprocessed.append(('original', region))
# 2. Multiple thresholding methods
_, otsu = cv2.threshold(region, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)
preprocessed.append(('otsu', otsu))
_, inv_otsu = cv2.threshold(region, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)
preprocessed.append(('inv_otsu', inv_otsu))
# 3. Adaptive thresholding with different parameters
if region.shape[0] > 10 and region.shape[1] > 10:
adaptive1 = cv2.adaptiveThreshold(region, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)
preprocessed.append(('adaptive_11_2', adaptive1))
if min(region.shape) > 20:
adaptive2 = cv2.adaptiveThreshold(region, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 15, 5)
preprocessed.append(('adaptive_15_5', adaptive2))
# 4. Morphological operations
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
if len(otsu.shape) == 2:
morph_close = cv2.morphologyEx(otsu, cv2.MORPH_CLOSE, kernel)
preprocessed.append(('morph_close', morph_close))
morph_open = cv2.morphologyEx(otsu, cv2.MORPH_OPEN, kernel)
preprocessed.append(('morph_open', morph_open))
# 5. Contrast enhancement
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
enhanced = clahe.apply(region)
preprocessed.append(('enhanced', enhanced))
# 6. Resize if too small
if w < 100 or h < 25:
scale_factor = max(100/w, 25/h, 2.0)
new_w, new_h = int(w * scale_factor), int(h * scale_factor)
resized = cv2.resize(region, (new_w, new_h), interpolation=cv2.INTER_CUBIC)
preprocessed.append(('resized', resized))
# Also resize thresholded versions
resized_otsu = cv2.resize(otsu, (new_w, new_h), interpolation=cv2.INTER_CUBIC)
preprocessed.append(('resized_otsu', resized_otsu))
# OCR configurations
ocr_configs = [
('psm6', '--psm 6 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'),
('psm7', '--psm 7 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'),
('psm8', '--psm 8 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'),
('psm10', '--psm 10 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'),
('psm13', '--psm 13'),
('default', ''),
('digits', '-c tessedit_char_whitelist=0123456789'),
('letters', '-c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ'),
]
results = []
for preprocess_name, processed_img in preprocessed:
# Save preprocessed image
cv2.imwrite(str(self.debug_dir / f"region_{candidate_id:02d}_{preprocess_name}.jpg"), processed_img)
for config_name, config in ocr_configs:
try:
pil_img = Image.fromarray(processed_img)
text = pytesseract.image_to_string(pil_img, config=config).strip()
# Clean text
clean_text = re.sub(r'[^A-Z0-9]', '', text.upper())
if len(clean_text) >= 2: # Very loose requirement
confidence = self.calculate_comprehensive_confidence(clean_text, candidate)
results.append({
'text': clean_text,
'confidence': confidence,
'preprocessing': preprocess_name,
'config': config_name,
'raw_text': text,
'candidate_score': candidate['likelihood_score']
})
except Exception as e:
continue
return results
def calculate_comprehensive_confidence(self, text, candidate):
"""Calculate confidence for any potential license plate text."""
if not text or len(text) < 2:
return 0
score = 0
# Length scoring
if 6 <= len(text) <= 8:
score += 30
elif 4 <= len(text) <= 9:
score += 20
elif 3 <= len(text) <= 10:
score += 10
else:
score += 5
# Character composition
has_letter = any(c.isalpha() for c in text)
has_number = any(c.isdigit() for c in text)
if has_letter and has_number:
score += 25
elif has_letter or has_number:
score += 15
# Maryland-specific patterns
if len(text) == 7:
if text[:3].isalpha() and text[3:].isdigit():
score += 30 # ABC1234
elif text[0].isdigit() and text[1:4].isalpha() and text[4:].isdigit():
score += 25 # 1ABC234
elif len(text) == 6:
if text[:3].isalpha() and text[3:].isdigit():
score += 25 # ABC123
elif text[:2].isalpha() and text[2:].isdigit():
score += 20 # AB1234
# Geometric bonus
score += candidate['likelihood_score'] * 0.3
# Penalize too many repeated characters
unique_chars = len(set(text))
if unique_chars < len(text) * 0.5:
score -= 15
# Bonus for reasonable character diversity
if unique_chars >= 3:
score += 10
return max(0, score)
def remove_overlapping_candidates(self, candidates, overlap_threshold=0.3):
"""Remove overlapping candidates, keeping highest scoring ones."""
if not candidates:
return []
# Sort by likelihood score
candidates.sort(key=lambda x: x['likelihood_score'], reverse=True)
unique = []
for candidate in candidates:
bbox1 = candidate['bbox']
is_duplicate = False
for existing in unique:
bbox2 = existing['bbox']
if self.calculate_overlap(bbox1, bbox2) > overlap_threshold:
is_duplicate = True
break
if not is_duplicate:
unique.append(candidate)
return unique
def calculate_overlap(self, bbox1, bbox2):
"""Calculate intersection over union."""
x1, y1, w1, h1 = bbox1
x2, y2, w2, h2 = bbox2
x_left = max(x1, x2)
y_top = max(y1, y2)
x_right = min(x1 + w1, x2 + w2)
y_bottom = min(y1 + h1, y2 + h2)
if x_right < x_left or y_bottom < y_top:
return 0.0
intersection = (x_right - x_left) * (y_bottom - y_top)
area1 = w1 * h1
area2 = w2 * h2
union = area1 + area2 - intersection
return intersection / union if union > 0 else 0.0
def process_image(self, image_path):
"""Process a single image with comprehensive detection."""
print(f"\n=== PROCESSING: {Path(image_path).name} ===")
# Load image
image = cv2.imread(str(image_path))
if image is None:
print(f"Could not load image: {image_path}")
return []
# Save original
cv2.imwrite(str(self.debug_dir / f"00_original_{Path(image_path).stem}.jpg"), image)
# Comprehensive preprocessing
preprocessed_images = self.comprehensive_preprocessing(image)
all_candidates = []
method_count = 0
print(f"Testing {len(preprocessed_images)} preprocessing methods...")
for preprocess_name, preprocessed_img in preprocessed_images:
print(f"\n Preprocessing: {preprocess_name}")
# Save preprocessed image
cv2.imwrite(str(self.debug_dir / f"01_{preprocess_name}_{Path(image_path).stem}.jpg"), preprocessed_img)
# Comprehensive edge detection
edge_methods = self.comprehensive_edge_detection(preprocessed_img)
for edge_name, edge_img in edge_methods:
method_name = f"{preprocess_name}_{edge_name}"
method_count += 1
# Save edge image
cv2.imwrite(str(self.debug_dir / f"02_{method_name}_{Path(image_path).stem}.jpg"), edge_img)
# Find candidates
candidates = self.find_all_potential_plates(edge_img, method_name)
if candidates:
print(f" {edge_name}: {len(candidates)} candidates")
# Add method info
for candidate in candidates:
candidate['full_method'] = method_name
candidate['preprocessing'] = preprocess_name
candidate['edge_detection'] = edge_name
all_candidates.extend(candidates)
# Save visualization for this method
if len(candidates) > 0:
self.save_comprehensive_visualization(image, candidates, method_name, top_n=10)
print(f"\nProcessed {method_count} total method combinations")
print(f"Found {len(all_candidates)} total candidates")
if not all_candidates:
print("No candidates found even with aggressive parameters!")
return []
# Remove overlapping candidates
unique_candidates = self.remove_overlapping_candidates(all_candidates, overlap_threshold=0.2)
print(f"After removing overlaps: {len(unique_candidates)} unique candidates")
# Sort by likelihood score
unique_candidates.sort(key=lambda x: x['likelihood_score'], reverse=True)
# Show top candidates
print(f"\nTop 20 candidates by likelihood score:")
for i, candidate in enumerate(unique_candidates[:20], 1):
x, y, w, h = candidate['bbox']
print(f" {i:2d}. {w:3d}x{h:3d} at ({x:3d},{y:3d}) - Score: {candidate['likelihood_score']:5.1f} - AR: {candidate['aspect_ratio']:.2f} - {candidate['full_method']}")
# Create overall visualization
self.save_comprehensive_visualization(image, unique_candidates, f"all_methods_{Path(image_path).stem}", top_n=30)
# Extract and OCR top candidates
print(f"\nExtracting and OCR'ing top 15 candidates...")
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
all_ocr_results = []
for i, candidate in enumerate(unique_candidates[:15]):
print(f"\nCandidate {i+1}: {candidate['bbox'][2]}x{candidate['bbox'][3]} (score: {candidate['likelihood_score']:.1f})")
ocr_results = self.extract_and_ocr_region(gray, candidate, i+1)
all_ocr_results.extend(ocr_results)
# Print OCR results for this candidate
for result in ocr_results:
if result['confidence'] > 10:
print(f" OCR: '{result['text']}' (conf: {result['confidence']:.1f}, {result['preprocessing']}+{result['config']})")
# Sort all OCR results by confidence
if all_ocr_results:
# Remove duplicates, keeping highest confidence
unique_ocr = {}
for result in all_ocr_results:
text = result['text']
if text not in unique_ocr or result['confidence'] > unique_ocr[text]['confidence']:
unique_ocr[text] = result
sorted_results = sorted(unique_ocr.values(), key=lambda x: x['confidence'], reverse=True)
print(f"\n=== ALL OCR RESULTS (Top 20) ===")
for i, result in enumerate(sorted_results[:20], 1):
print(f"{i:2d}. '{result['text']}' (confidence: {result['confidence']:.1f}) - {result['preprocessing']}+{result['config']}")
return [r['text'] for r in sorted_results if r['confidence'] > 5]
print("No valid OCR results found.")
return []
def analyze_project(self):
"""Analyze all images in the project."""
# Get all images from raw directory
image_files = list(self.raw_dir.glob('*'))
image_files = [f for f in image_files if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp']]
if not image_files:
print(f"No image files found in {self.raw_dir}")
return
print(f"=== PROJECT ANALYSIS: {len(image_files)} images ===")
all_results = {}
for image_file in image_files:
results = self.process_image(image_file)
all_results[image_file.name] = results
# Generate comprehensive analysis
self.generate_comprehensive_analysis(all_results)
return all_results
def generate_comprehensive_analysis(self, all_results):
"""Generate comprehensive analysis and save to output folder."""
print(f"\n=== GENERATING COMPREHENSIVE ANALYSIS ===")
# Collect all unique candidates with their scores
all_unique_candidates = {}
for image_file, candidates in all_results.items():
for candidate in candidates:
if candidate not in all_unique_candidates:
base_score = self.score_maryland_likelihood(candidate)
# Add multi-image bonus
appearance_count = sum(1 for cands in all_results.values() if candidate in cands)
multi_image_bonus = (appearance_count - 1) * 25
total_score = base_score + multi_image_bonus
all_unique_candidates[candidate] = {
'base_score': base_score,
'multi_image_bonus': multi_image_bonus,
'total_score': total_score,
'appearances': appearance_count,
'sources': [img for img, cands in all_results.items() if candidate in cands]
}
# Sort by total score
ranked_candidates = sorted(all_unique_candidates.items(), key=lambda x: x[1]['total_score'], reverse=True)
# Save detailed results
output_file = self.output_dir / 'comprehensive_results.json'
output_data = {
'project_id': self.project_dir.name,
'analysis_date': datetime.now().isoformat(),
'total_images': len(all_results),
'total_candidates': len(all_unique_candidates),
'ranked_candidates': [
{
'text': candidate,
'total_score': scores['total_score'],
'base_score': scores['base_score'],
'multi_image_bonus': scores['multi_image_bonus'],
'appearances': scores['appearances'],
'sources': scores['sources']
}
for candidate, scores in ranked_candidates
],
'individual_results': all_results
}
with open(output_file, 'w') as f:
json.dump(output_data, f, indent=2)
# Save top candidates for easy access
top_candidates_file = self.output_dir / 'top_candidates.txt'
with open(top_candidates_file, 'w') as f:
f.write("=== TOP LICENSE PLATE CANDIDATES ===\n\n")
f.write("Rank Candidate Total Base Multi Appears Sources\n")
f.write("---- --------- ----- ---- ----- ------- -------\n")
for i, (candidate, scores) in enumerate(ranked_candidates[:30], 1):
sources = '+'.join([s.split('.')[0] for s in scores['sources']])
f.write(f"{i:3d}. {candidate:9s} {scores['total_score']:3.0f} {scores['base_score']:3.0f} {scores['multi_image_bonus']:3.0f} {scores['appearances']:7d} {sources}\n")
f.write(f"\n=== RECOMMENDATIONS ===\n")
f.write("Start Maryland DMV search with these high-scoring candidates:\n\n")
high_score_candidates = [item for item in ranked_candidates if item[1]['total_score'] >= 50]
for i, (candidate, scores) in enumerate(high_score_candidates[:15], 1):
confidence_level = "HIGH" if scores['total_score'] >= 80 else "MEDIUM" if scores['total_score'] >= 60 else "GOOD"
multi_img = " (BOTH IMAGES)" if scores['appearances'] > 1 else ""
f.write(f" {i:2d}. {candidate:8s} (Score: {scores['total_score']:3.0f}, {confidence_level}){multi_img}\n")
print(f"✓ Analysis complete!")
print(f"✓ Detailed results saved to: {output_file}")
print(f"✓ Top candidates saved to: {top_candidates_file}")
print(f"✓ Found {len(all_unique_candidates)} unique candidates")
print(f"{len([c for c in all_unique_candidates.values() if c['total_score'] >= 50])} high-confidence candidates")
def score_maryland_likelihood(self, text):
"""Score how likely a candidate is to be a Maryland license plate."""
if not text or len(text) < 2:
return 0
score = 0
# Length scoring - Maryland plates are typically 6-7 characters
if len(text) == 7:
score += 40
elif len(text) == 6:
score += 35
elif len(text) == 5:
score += 20
elif len(text) == 4:
score += 15
elif len(text) == 8:
score += 10
else:
score += 5
# Character composition
has_letter = any(c.isalpha() for c in text)
has_number = any(c.isdigit() for c in text)
if has_letter and has_number:
score += 30
elif has_letter or has_number:
score += 15
# Maryland-specific patterns
if len(text) == 7:
if text[:3].isalpha() and text[3:].isdigit():
score += 50 # ABC1234 - most common Maryland format
elif text[0].isdigit() and text[1:4].isalpha() and text[4:].isdigit():
score += 40 # 1ABC234 - also common
elif len(text) == 6:
if text[:3].isalpha() and text[3:].isdigit():
score += 40 # ABC123
elif text[:2].isalpha() and text[2:].isdigit():
score += 30 # AB1234
# Penalize very short results or all same character
if len(text) <= 2:
score -= 20
if len(set(text)) == 1: # All same character
score -= 30
# Bonus for realistic character diversity
unique_chars = len(set(text))
if unique_chars >= 4:
score += 15
elif unique_chars >= 3:
score += 10
# Penalize common OCR errors/noise
noise_patterns = ['SSS', 'EEE', 'AAA', 'OOO', '111', '000']
if text in noise_patterns:
score -= 40
# Penalize obviously wrong patterns
if re.match(r'^[A-Z]{1,2}$', text) or re.match(r'^[0-9]{1,2}$', text):
score -= 20
return max(0, score)
def main():
parser = argparse.ArgumentParser(description='License Plate Detection for Projects')
parser.add_argument('--project-id', type=int, required=True, help='Project ID')
parser.add_argument('--image', help='Specific image to process')
args = parser.parse_args()
project_dir = Path(f"projects/{args.project_id:03d}")
if not project_dir.exists():
print(f"Project {args.project_id:03d} does not exist. Create it first.")
return
detector = ProjectDetector(project_dir)
if args.image:
# Process specific image
image_path = project_dir / 'raw' / args.image
if not image_path.exists():
print(f"Image {args.image} not found in project {args.project_id:03d}")
return
results = detector.process_image(image_path)
print(f"\nResults for {args.image}: {len(results)} candidates")
for i, result in enumerate(results[:10], 1):
print(f" {i}. {result}")
else:
# Analyze entire project
detector.analyze_project()
if __name__ == '__main__':
main()

integrated_analysis.py (new file, 524 lines)

@@ -0,0 +1,524 @@
#!/usr/bin/env python3
"""
Integrated License Plate Analysis
Combines multi-image analysis and super resolution analysis into a comprehensive workflow.
Generates per-image candidates, combined frequency analysis, super resolution images, and final results.
"""
import cv2
import numpy as np
import pytesseract
from PIL import Image, ImageEnhance
import os
import json
import re
import argparse
from pathlib import Path
from collections import defaultdict, Counter
from datetime import datetime
import subprocess
import sys
class IntegratedAnalyzer:
def __init__(self, project_dir):
self.project_dir = Path(project_dir)
self.raw_dir = self.project_dir / 'raw'
self.annotations_dir = self.project_dir / 'annotations'
self.debug_dir = self.project_dir / 'debug'
self.output_dir = self.project_dir / 'output'
# Ensure directories exist
self.debug_dir.mkdir(exist_ok=True)
self.output_dir.mkdir(exist_ok=True)
print(f"✓ Integrated Analyzer initialized for project {self.project_dir.name}")
def run_annotation_analysis(self):
"""Run annotation analysis to generate detection parameters."""
print("\n" + "="*60)
print("PHASE 0: ANNOTATION ANALYSIS & PARAMETER GENERATION")
print("="*60)
try:
result = subprocess.run([
sys.executable, 'annotate_project.py',
'--project-id', str(int(self.project_dir.name)),
'--analyze'
], capture_output=True, text=True, check=True)
print("✓ Annotation analysis completed")
# Check if detection parameters were generated
params_file = self.project_dir / 'debug' / 'detection_parameters.json'
if params_file.exists():
with open(params_file, 'r') as f:
params = json.load(f)
print(f"✓ Detection parameters generated:")
print(f" Width range: {params['min_width']} - {params['max_width']}")
print(f" Height range: {params['min_height']} - {params['max_height']}")
print(f" Aspect ratio: {params['min_aspect_ratio']:.2f} - {params['max_aspect_ratio']:.2f}")
return params
else:
print("⚠ Detection parameters not found")
return None
except subprocess.CalledProcessError as e:
print(f"✗ Annotation analysis failed: {e}")
if e.stderr:
print(f"Error: {e.stderr}")
return None
def run_multi_image_analysis(self):
"""Run multi-image analysis component."""
print("\n" + "="*60)
print("PHASE 1: MULTI-IMAGE ANALYSIS")
print("="*60)
try:
result = subprocess.run([
sys.executable, 'multi_image_analysis.py',
'--project-id', str(int(self.project_dir.name))
], capture_output=True, text=True, check=True)
print("✓ Multi-image analysis completed")
# Check if investigation report was generated
investigation_report = self.output_dir / 'investigation_report.json'
if investigation_report.exists():
with open(investigation_report, 'r') as f:
multi_image_results = json.load(f)
return multi_image_results
else:
print("⚠ Investigation report not found")
return None
except subprocess.CalledProcessError as e:
print(f"✗ Multi-image analysis failed: {e}")
if e.stderr:
print(f"Error: {e.stderr}")
return None
def run_super_resolution_analysis(self):
"""Run super resolution analysis component."""
print("\n" + "="*60)
print("PHASE 2: SUPER RESOLUTION ANALYSIS")
print("="*60)
try:
result = subprocess.run([
sys.executable, 'super_resolution_analysis.py',
'--project-id', str(int(self.project_dir.name))
], capture_output=True, text=True, check=True)
print("✓ Super resolution analysis completed")
# Check if super resolution report was generated
super_res_report = self.output_dir / 'super_resolution_report.json'
if super_res_report.exists():
with open(super_res_report, 'r') as f:
super_res_results = json.load(f)
return super_res_results
else:
print("⚠ Super resolution report not found")
return None
except subprocess.CalledProcessError as e:
print(f"✗ Super resolution analysis failed: {e}")
if e.stderr:
print(f"Error: {e.stderr}")
return None
def combine_and_rank_results(self, multi_image_results, super_res_results):
"""Combine results from both analyses and create final ranking."""
print("\n" + "="*60)
print("PHASE 3: RESULT INTEGRATION & RANKING")
print("="*60)
combined_candidates = {}
# Process multi-image results
if multi_image_results and 'top_candidates' in multi_image_results:
print(f"Processing {len(multi_image_results['top_candidates'])} multi-image candidates...")
for candidate in multi_image_results['top_candidates']:
text = candidate['text']
if text not in combined_candidates:
combined_candidates[text] = {
'text': text,
'multi_image_score': candidate['final_score'],
'super_res_score': 0,
'combined_score': 0,
'multi_image_data': candidate,
'super_res_data': None,
'sources': []
}
# Add source information
if 'sources' in candidate:
combined_candidates[text]['sources'].extend(candidate['sources'])
# Process super resolution results
if super_res_results and 'top_candidates' in super_res_results:
print(f"Processing {len(super_res_results['top_candidates'])} super resolution candidates...")
for candidate in super_res_results['top_candidates']:
text = candidate['text']
if text not in combined_candidates:
combined_candidates[text] = {
'text': text,
'multi_image_score': 0,
'super_res_score': candidate['confidence'],
'combined_score': 0,
'multi_image_data': None,
'super_res_data': candidate,
'sources': []
}
else:
combined_candidates[text]['super_res_score'] = candidate['confidence']
combined_candidates[text]['super_res_data'] = candidate
# Add source information
if 'source_image' in candidate:
combined_candidates[text]['sources'].append(candidate['source_image'])
# Calculate combined scores
for text, data in combined_candidates.items():
# Base score is the higher of the two methods
base_score = max(data['multi_image_score'], data['super_res_score'])
# Cross-validation bonus if both methods found it
cross_validation_bonus = 0
if data['multi_image_score'] > 0 and data['super_res_score'] > 0:
cross_validation_bonus = 25
print(f" Cross-validated: '{text}' (multi: {data['multi_image_score']:.1f}, super: {data['super_res_score']:.1f})")
# Multi-source bonus
unique_sources = len(set(data['sources']))
multi_source_bonus = (unique_sources - 1) * 10
data['combined_score'] = base_score + cross_validation_bonus + multi_source_bonus
data['cross_validation_bonus'] = cross_validation_bonus
data['multi_source_bonus'] = multi_source_bonus
data['unique_sources'] = unique_sources
# Sort by combined score
ranked_results = sorted(combined_candidates.values(), key=lambda x: x['combined_score'], reverse=True)
print(f"\n✓ Combined {len(combined_candidates)} unique candidates")
cross_validated = len([c for c in ranked_results if c['cross_validation_bonus'] > 0])
print(f"{cross_validated} candidates cross-validated between methods")
return ranked_results
def generate_per_image_breakdown(self, multi_image_results):
"""Generate per-image candidate breakdown."""
per_image_breakdown = {}
if multi_image_results and 'image_breakdown' in multi_image_results:
for image_name, data in multi_image_results['image_breakdown'].items():
per_image_breakdown[image_name] = {
'candidate_count': data['candidate_count'],
'top_candidates': data['top_candidates']
}
return per_image_breakdown
def generate_frequency_analysis(self, ranked_results):
"""Generate character frequency analysis for final ranking."""
print("\n" + "="*60)
print("PHASE 4: CHARACTER FREQUENCY ANALYSIS")
print("="*60)
# Group high-confidence candidates by length
high_confidence = [c for c in ranked_results if c['combined_score'] >= 60]
if not high_confidence:
print("No high-confidence candidates for frequency analysis")
return {}
length_groups = defaultdict(list)
for candidate in high_confidence:
text = candidate['text']
if len(text) >= 4: # Only analyze reasonable length plates
length_groups[len(text)].append(candidate)
frequency_results = {}
for length, candidates in length_groups.items():
if len(candidates) >= 2: # Need at least 2 candidates
print(f"\nAnalyzing {len(candidates)} candidates of length {length}:")
# Count character frequency at each position, weighted by combined score
position_chars = defaultdict(lambda: defaultdict(float))
for candidate in candidates:
text = candidate['text']
weight = candidate['combined_score'] / 100.0
for pos, char in enumerate(text):
position_chars[pos][char] += weight
# Build most likely string
most_likely = ""
total_confidence = 0
position_details = []
for pos in range(length):
if pos in position_chars:
char_scores = position_chars[pos]
best_char = max(char_scores.items(), key=lambda x: x[1])
most_likely += best_char[0]
# Calculate position confidence
total_weight = sum(char_scores.values())
position_confidence = (best_char[1] / total_weight) * 100
total_confidence += position_confidence
# Show alternatives
alternatives = sorted(char_scores.items(), key=lambda x: x[1], reverse=True)[1:3]
alt_str = ", ".join([f"{char}({score:.1f})" for char, score in alternatives])
position_details.append({
'position': pos,
'character': best_char[0],
'confidence': position_confidence,
'alternatives': alternatives
})
print(f" Position {pos}: '{best_char[0]}' ({position_confidence:.1f}%) - Alt: {alt_str}")
if most_likely:
avg_confidence = total_confidence / length
maryland_score = self.score_maryland_likelihood(most_likely)
frequency_results[length] = {
'most_likely_plate': most_likely,
'average_confidence': avg_confidence,
'maryland_score': maryland_score,
'total_score': avg_confidence + maryland_score,
'candidate_count': len(candidates),
'position_details': position_details
}
print(f" → Most likely: '{most_likely}' (avg conf: {avg_confidence:.1f}%, MD score: {maryland_score})")
return frequency_results
def score_maryland_likelihood(self, text):
"""Score how likely a candidate is to be a Maryland license plate."""
if not text or len(text) < 2:
return 0
score = 0
# Length scoring
if len(text) == 7:
score += 40
elif len(text) == 6:
score += 35
elif len(text) == 5:
score += 20
else:
score += 10
# Character composition
has_letter = any(c.isalpha() for c in text)
has_number = any(c.isdigit() for c in text)
if has_letter and has_number:
score += 30
elif has_letter or has_number:
score += 15
# Maryland patterns
if len(text) == 7:
if text[:3].isalpha() and text[3:].isdigit():
score += 50
elif text[0].isdigit() and text[1:4].isalpha() and text[4:].isdigit():
score += 40
elif len(text) == 6:
if text[:3].isalpha() and text[3:].isdigit():
score += 40
elif text[:2].isalpha() and text[2:].isdigit():
score += 30
return max(0, score)
def generate_comprehensive_report(self, ranked_results, per_image_breakdown, frequency_results, multi_image_results, super_res_results):
"""Generate the final comprehensive investigation report."""
print("\n" + "="*60)
print("PHASE 5: COMPREHENSIVE REPORT GENERATION")
print("="*60)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Calculate statistics
total_candidates = len(ranked_results)
high_confidence = len([c for c in ranked_results if c['combined_score'] >= 80])
cross_validated = len([c for c in ranked_results if c['cross_validation_bonus'] > 0])
# Create comprehensive report
comprehensive_report = {
'analysis_type': 'integrated_comprehensive',
'timestamp': timestamp,
'project_id': self.project_dir.name,
'summary': {
'total_unique_candidates': total_candidates,
'high_confidence_candidates': high_confidence,
'cross_validated_candidates': cross_validated,
'images_processed': len(per_image_breakdown),
'frequency_analysis_lengths': list(frequency_results.keys())
},
'ranked_candidates': ranked_results[:30],
'per_image_breakdown': per_image_breakdown,
'frequency_analysis': frequency_results,
'methodology': {
'multi_image_analysis': multi_image_results is not None,
'super_resolution_analysis': super_res_results is not None,
'cross_validation': True,
'character_frequency_analysis': True
}
}
# Save JSON report
json_file = self.output_dir / 'comprehensive_analysis.json'
with open(json_file, 'w') as f:
json.dump(comprehensive_report, f, indent=2)
# Generate human-readable summary
summary_file = self.output_dir / 'analysis_results.txt'
with open(summary_file, 'w') as f:
f.write("=" * 70 + "\n")
f.write("LICENSE PLATE ANALYSIS RESULTS\n")
f.write("=" * 70 + "\n")
f.write(f"Generated: {timestamp}\n")
f.write(f"Project: {self.project_dir.name}\n\n")
f.write("SUMMARY:\n")
f.write(f" Total unique candidates identified: {total_candidates}\n")
f.write(f" High confidence candidates (80+): {high_confidence}\n")
f.write(f" Cross-validated candidates: {cross_validated}\n")
f.write(f" Images processed: {len(per_image_breakdown)}\n\n")
f.write("ANALYSIS METHODS:\n")
f.write(" Multi-image cross-validation\n")
f.write(" Super resolution enhancement\n")
f.write(" Character frequency analysis\n")
f.write(" Maryland license plate pattern matching\n\n")
# Per-image breakdown
f.write("PER-IMAGE CANDIDATE BREAKDOWN:\n")
f.write("-" * 40 + "\n")
for image_name, data in per_image_breakdown.items():
f.write(f"{image_name}: {data['candidate_count']} candidates\n")
for i, candidate in enumerate(data['top_candidates'][:5], 1):
f.write(f" {i}. {candidate}\n")
f.write("\n")
# Frequency analysis results
if frequency_results:
f.write("CHARACTER FREQUENCY ANALYSIS:\n")
f.write("-" * 40 + "\n")
for length, result in sorted(frequency_results.items()):
f.write(f"Length {length}: '{result['most_likely_plate']}'\n")
f.write(f" Confidence: {result['average_confidence']:.1f}%\n")
f.write(f" Maryland Score: {result['maryland_score']}\n")
f.write(f" Based on {result['candidate_count']} candidates\n\n")
# Top candidates
f.write("RANKED CANDIDATES:\n")
f.write("-" * 50 + "\n")
f.write("Rank Candidate Combined Multi Super Cross Sources\n")
f.write("---- --------- -------- ----- ----- ----- -------\n")
for i, candidate in enumerate(ranked_results[:20], 1):
sources_str = str(candidate['unique_sources'])
cross_mark = "" if candidate['cross_validation_bonus'] > 0 else " "
f.write(f"{i:3d}. {candidate['text']:9s} {candidate['combined_score']:6.1f} {candidate['multi_image_score']:4.1f} {candidate['super_res_score']:4.1f} {cross_mark:1s} {sources_str:4s}\n")
f.write(f"\nSCORE LEGEND:\n")
f.write(" Combined: Final ranking score\n")
f.write(" Multi: Multi-image analysis score\n")
f.write(" Super: Super resolution analysis score\n")
f.write(" Cross: ✓ = Cross-validated between methods\n")
f.write(" Sources: Number of unique source images\n")
print(f"✓ Analysis report generated:")
print(f" Detailed JSON: {json_file}")
print(f" Results summary: {summary_file}")
return comprehensive_report
def analyze_project(self):
"""Run the complete integrated analysis workflow."""
print("=" * 70)
print("LICENSE PLATE ANALYSIS")
print("=" * 70)
print(f"Project: {self.project_dir.name}")
# Phase 0: Annotation analysis
self.run_annotation_analysis()
# Phase 1: Multi-image analysis
multi_image_results = self.run_multi_image_analysis()
# Phase 2: Super resolution analysis
super_res_results = self.run_super_resolution_analysis()
# Check if we have any results
if not multi_image_results and not super_res_results:
print("\n✗ No analysis results generated. Check for errors above.")
return None
# Phase 3: Combine and rank results
ranked_results = self.combine_and_rank_results(multi_image_results, super_res_results)
# Phase 4: Generate per-image breakdown
per_image_breakdown = self.generate_per_image_breakdown(multi_image_results)
# Phase 5: Generate frequency analysis
frequency_results = self.generate_frequency_analysis(ranked_results)
# Phase 6: Generate comprehensive report
comprehensive_report = self.generate_comprehensive_report(
ranked_results, per_image_breakdown, frequency_results,
multi_image_results, super_res_results
)
# Display final summary
print("\n" + "=" * 70)
print("ANALYSIS COMPLETE")
print("=" * 70)
if ranked_results:
print(f"Top 10 candidates:")
for i, candidate in enumerate(ranked_results[:10], 1):
cross_mark = " (CROSS-VALIDATED)" if candidate['cross_validation_bonus'] > 0 else ""
print(f" {i:2d}. {candidate['text']:8s} (Score: {candidate['combined_score']:5.1f}){cross_mark}")
if frequency_results:
print(f"\nFrequency analysis results:")
for length, result in sorted(frequency_results.items(), key=lambda x: x[1]['total_score'], reverse=True):
print(f" Length {length}: '{result['most_likely_plate']}' (score: {result['total_score']:.1f})")
print(f"\nResults saved to: {self.output_dir}")
return comprehensive_report
def main():
parser = argparse.ArgumentParser(description='Integrated License Plate Analysis')
parser.add_argument('--project-id', type=int, required=True, help='Project ID')
args = parser.parse_args()
project_dir = Path(f"projects/{args.project_id:03d}")
if not project_dir.exists():
print(f"Project {args.project_id:03d} does not exist.")
return
analyzer = IntegratedAnalyzer(project_dir)
analyzer.analyze_project()
if __name__ == '__main__':
main()

multi_image_analysis.py (new file, 652 lines)

@@ -0,0 +1,652 @@
#!/usr/bin/env python3
"""
Multi-Image License Plate Analysis
Analyzes multiple images of the same vehicle to cross-validate license plate candidates.
"""
import cv2
import numpy as np
import pytesseract
from PIL import Image
import os
import json
import re
import argparse
import time
from pathlib import Path
from collections import defaultdict, Counter
from datetime import datetime
class MultiImageAnalyzer:
def __init__(self, project_dir):
self.project_dir = Path(project_dir)
self.raw_dir = self.project_dir / 'raw'
self.debug_dir = self.project_dir / 'debug'
self.output_dir = self.project_dir / 'output'
# Ensure directories exist
self.debug_dir.mkdir(exist_ok=True)
self.output_dir.mkdir(exist_ok=True)
# Load project parameters if available
self.params = self.load_detection_parameters()
# Add timeout and optimization settings
self.max_ocr_time_per_region = 2.0 # seconds
self.max_candidates_per_image = 50
self.max_total_ocr_operations = 1000
def load_detection_parameters(self):
"""Load detection parameters or use aggressive defaults."""
params_file = self.project_dir / 'debug' / 'detection_parameters.json'
if params_file.exists():
with open(params_file, 'r') as f:
params = json.load(f)
# Check if parameters are too restrictive for edge detection
aspect_range = params['max_aspect_ratio'] - params['min_aspect_ratio']
if aspect_range < 1.0: # Very narrow aspect ratio range
print(f"⚠ Annotation parameters too restrictive (aspect range: {aspect_range:.2f})")
print(f" Using more lenient parameters for edge detection")
# Use more lenient parameters for edge detection
lenient_params = {
'min_width': max(50, params['min_width'] // 2),
'max_width': min(1500, params['max_width'] * 2),
'min_height': max(25, params['min_height'] // 2),
'max_height': min(800, params['max_height'] * 2),
'min_aspect_ratio': max(1.0, params['min_aspect_ratio'] - 1.0),
'max_aspect_ratio': min(8.0, params['max_aspect_ratio'] + 2.0),
'min_area': max(1000, params['min_area'] // 4),
'max_area': min(1000000, params['max_area'] * 3)
}
return lenient_params
else:
print(f"✓ Using project-specific detection parameters")
return params
else:
# Use aggressive default parameters for maximum detection
params = {
'min_width': 30,
'max_width': 1200,
'min_height': 15,
'max_height': 600,
'min_aspect_ratio': 0.8,
'max_aspect_ratio': 12.0,
'min_area': 450,
'max_area': 720000
}
print("⚠ Using aggressive default parameters")
return params
def score_maryland_likelihood(self, text):
"""Score how likely a candidate is to be a Maryland license plate."""
if not text or len(text) < 2:
return 0
score = 0
# Length scoring - Maryland plates are typically 6-7 characters
if len(text) == 7:
score += 40
elif len(text) == 6:
score += 35
elif len(text) == 5:
score += 20
elif len(text) == 4:
score += 15
elif len(text) == 8:
score += 10
else:
score += 5
# Character composition
has_letter = any(c.isalpha() for c in text)
has_number = any(c.isdigit() for c in text)
if has_letter and has_number:
score += 30
elif has_letter:
score += 15
elif has_number:
score += 10
# Common Maryland patterns
if len(text) >= 6:
# Check for common patterns like 1ABC234 or ABC123
if (text[0].isdigit() and text[1:4].isalpha() and text[4:].isdigit()):
score += 25
elif (text[:3].isalpha() and text[3:].isdigit()):
score += 20
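# Worked example: '1ABC234' scores 40 (length 7) + 30 (letters and digits)
# + 25 (digit-letters-digits pattern) = 95, with no unlikely-letter penalty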
# Penalize unlikely characters
unlikely_chars = ['Q', 'Z', 'X', 'V', 'J']
for char in unlikely_chars:
if char in text:
score -= 5
return max(0, score)
def process_single_image_aggressive(self, image_path):
"""Process a single image with aggressive detection to find all possible candidates."""
print(f"\nProcessing: {Path(image_path).name}")
image = cv2.imread(str(image_path))
if image is None:
print(f"Could not load image: {image_path}")
return []
print(f" Loading image...")
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
print(f" Image loaded: {gray.shape[1]}x{gray.shape[0]}")
# Reduced preprocessing methods for efficiency
print(f" Applying preprocessing methods...")
preprocessed = []
# Original
print(f" - Original")
preprocessed.append(('original', gray))
# Enhanced contrast (most effective)
print(f" - Enhanced contrast (CLAHE)")
clahe = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8,8))
enhanced = clahe.apply(gray)
preprocessed.append(('enhanced', enhanced))
# Histogram equalization
print(f" - Histogram equalization")
hist_eq = cv2.equalizeHist(gray)
preprocessed.append(('hist_eq', hist_eq))
print(f" ✓ Preprocessing complete ({len(preprocessed)} methods)")
all_candidates = []
total_combinations = len(preprocessed) * 2 # Reduced to 2 edge detection methods
current_combination = 0
total_ocr_operations = 0
print(f" Processing {total_combinations} preprocessing + edge detection combinations...")
for preprocess_name, processed_img in preprocessed:
print(f" Processing {preprocess_name}...")
# Reduced edge detection methods
edge_methods = []
# Canny with mid threshold (most effective)
print(f" - Canny (mid threshold)")
edge_methods.append(('canny_mid', cv2.Canny(processed_img, 50, 150)))
current_combination += 1
# Sobel (alternative method)
print(f" - Sobel edge detection")
sobelx = cv2.Sobel(processed_img, cv2.CV_64F, 1, 0, ksize=3)
sobely = cv2.Sobel(processed_img, cv2.CV_64F, 0, 1, ksize=3)
sobel = np.sqrt(sobelx**2 + sobely**2)
sobel_max = np.max(sobel)
# Guard against division by zero on featureless regions
sobel_norm = np.uint8(sobel * 255 / sobel_max) if sobel_max > 0 else np.zeros_like(processed_img)
edge_methods.append(('sobel', sobel_norm))
current_combination += 1
print(f" ✓ Edge detection complete for {preprocess_name}")
for edge_name, edge_img in edge_methods:
print(f" Finding potential plates with {edge_name}...")
candidates = self.find_potential_plates(edge_img, f"{preprocess_name}_{edge_name}")
print(f" Found {len(candidates)} potential plate regions")
# Limit candidates per method to prevent excessive processing
if len(candidates) > self.max_candidates_per_image // len(edge_methods):
candidates = candidates[:self.max_candidates_per_image // len(edge_methods)]
print(f" Limited to {len(candidates)} candidates for efficiency")
# Extract text from each candidate
for i, candidate in enumerate(candidates):
if total_ocr_operations >= self.max_total_ocr_operations:
print(f" ⚠ Reached maximum OCR operations limit ({self.max_total_ocr_operations})")
break
x, y, w, h = candidate['bbox']
# Extract region with padding (at least 5 px, roughly 10% of the smaller side)
padding = max(5, min(w, h) // 10)
x1 = max(0, x - padding)
y1 = max(0, y - padding)
x2 = min(gray.shape[1], x + w + padding)
y2 = min(gray.shape[0], y + h + padding)
region = gray[y1:y2, x1:x2]
if region.size > 0:
print(f" Extracting text from region {i+1}/{len(candidates)} ({w}x{h})...")
start_time = time.time()
ocr_results = self.extract_text_from_region_optimized(region)
ocr_time = time.time() - start_time
total_ocr_operations += 1
print(f" OCR found {len(ocr_results)} text candidates (took {ocr_time:.2f}s)")
for text in ocr_results:
if len(text) >= 3: # Minimum length
all_candidates.append({
'text': text,
'confidence': self.score_maryland_likelihood(text),
'method': f"{preprocess_name}_{edge_name}",
'bbox': candidate['bbox'],
'source_image': Path(image_path).name
})
if total_ocr_operations >= self.max_total_ocr_operations:
break
if total_ocr_operations >= self.max_total_ocr_operations:
break
print(f" ✓ All combinations processed (total OCR operations: {total_ocr_operations})")
# Remove duplicates and sort by confidence
print(f" Removing duplicates and sorting...")
unique_candidates = {}
for candidate in all_candidates:
text = candidate['text']
if text not in unique_candidates or candidate['confidence'] > unique_candidates[text]['confidence']:
unique_candidates[text] = candidate
sorted_candidates = sorted(unique_candidates.values(), key=lambda x: x['confidence'], reverse=True)
print(f" Found {len(sorted_candidates)} unique candidates")
for i, candidate in enumerate(sorted_candidates[:10], 1):
print(f" {i:2d}. {candidate['text']:8s} (conf: {candidate['confidence']:3.0f})")
return sorted_candidates
def find_potential_plates(self, edge_image, method_name):
"""Find potential license plate regions."""
contours, _ = cv2.findContours(edge_image, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
candidates = []
for contour in contours:
x, y, w, h = cv2.boundingRect(contour)
aspect_ratio = w / float(h)
area = cv2.contourArea(contour)
# Apply loose filtering
if (self.params['min_width'] <= w <= self.params['max_width'] and
self.params['min_height'] <= h <= self.params['max_height'] and
self.params['min_aspect_ratio'] <= aspect_ratio <= self.params['max_aspect_ratio'] and
self.params['min_area'] <= area <= self.params['max_area']):
candidates.append({
'bbox': (x, y, w, h),
'aspect_ratio': aspect_ratio,
'area': area,
'method': method_name
})
return candidates
def extract_text_from_region_optimized(self, region):
"""Extract text from a region using optimized OCR configurations."""
results = []
start_time = time.time()
# Reduced preprocessing for efficiency
preprocessed_regions = []
# Original
preprocessed_regions.append(region)
# Otsu thresholding (most effective)
_, otsu = cv2.threshold(region, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)
preprocessed_regions.append(otsu)
# Enhanced contrast for small regions
if region.shape[1] < 100 or region.shape[0] < 25:
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
enhanced = clahe.apply(region)
preprocessed_regions.append(enhanced)
# Reduced OCR configurations (PSM 8: single word; PSM 7: single text line;
# PSM 13: raw line, bypassing layout analysis)
ocr_configs = [
'--psm 8 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',
'--psm 7 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',
'--psm 13' # Raw line
]
for processed_region in preprocessed_regions:
# Check timeout
if time.time() - start_time > self.max_ocr_time_per_region:
print(f" ⚠ OCR timeout for region")
break
for config in ocr_configs:
try:
pil_img = Image.fromarray(processed_region)
text = pytesseract.image_to_string(pil_img, config=config).strip()
# Clean text
clean_text = re.sub(r'[^A-Z0-9]', '', text.upper())
if len(clean_text) >= 3:
results.append(clean_text)
except Exception:
continue
return list(set(results)) # Remove duplicates
def cross_validate_candidates(self, all_image_results):
"""Cross-validate license plate candidates across multiple images."""
print(f" Collecting candidates from all images...")
# Collect all candidates
all_candidates = []
for image_results in all_image_results.values():
all_candidates.extend(image_results)
print(f" Total candidates collected: {len(all_candidates)}")
if not all_candidates:
print(" No candidates found across any images.")
return []
print(f" Analyzing candidate statistics...")
# Count occurrences and calculate scores
candidate_stats = defaultdict(lambda: {
'occurrences': 0,
'total_confidence': 0,
'sources': [],
'methods': [],
'best_confidence': 0
})
for candidate in all_candidates:
text = candidate['text']
stats = candidate_stats[text]
stats['occurrences'] += 1
stats['total_confidence'] += candidate['confidence']
stats['sources'].append(candidate['source_image'])
stats['methods'].append(candidate['method'])
stats['best_confidence'] = max(stats['best_confidence'], candidate['confidence'])
print(f" Unique candidates found: {len(candidate_stats)}")
print(f" Calculating final scores...")
# Calculate final scores
scored_candidates = []
for text, stats in candidate_stats.items():
# Base score is the best confidence from any image
base_score = stats['best_confidence']
# Multi-image bonus (appears in multiple images)
multi_image_bonus = (len(set(stats['sources'])) - 1) * 30
# Frequency bonus (appears multiple times)
frequency_bonus = (stats['occurrences'] - 1) * 10
# Consistency bonus (appears with different methods)
method_diversity = len(set(stats['methods']))
consistency_bonus = method_diversity * 5
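# Example: best confidence 70, seen 5 times across 3 images with 2 distinct
# methods scores 70 + 2*30 + 4*10 + 2*5 = 180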
final_score = base_score + multi_image_bonus + frequency_bonus + consistency_bonus
scored_candidates.append({
'text': text,
'final_score': final_score,
'base_score': base_score,
'multi_image_bonus': multi_image_bonus,
'frequency_bonus': frequency_bonus,
'consistency_bonus': consistency_bonus,
'occurrences': stats['occurrences'],
'image_count': len(set(stats['sources'])),
'sources': list(set(stats['sources'])),
'avg_confidence': stats['total_confidence'] / stats['occurrences']
})
# Sort by final score
scored_candidates.sort(key=lambda x: x['final_score'], reverse=True)
print(f" ✓ Cross-validation complete: {len(scored_candidates)} scored candidates")
return scored_candidates
def analyze_character_patterns(self, scored_candidates):
"""Analyze character patterns across candidates to find the most likely plate."""
print(f"\n=== CHARACTER PATTERN ANALYSIS ===")
# Group by length
length_groups = defaultdict(list)
for candidate in scored_candidates:
if candidate['final_score'] >= 30: # Only consider reasonable candidates
length_groups[len(candidate['text'])].append(candidate)
pattern_results = []
for length, candidates in length_groups.items():
if len(candidates) >= 2 and length >= 4: # Need multiple candidates of reasonable length
print(f"\nAnalyzing {len(candidates)} candidates of length {length}:")
# Count character frequency at each position
position_chars = defaultdict(lambda: defaultdict(float))
for candidate in candidates:
text = candidate['text']
weight = candidate['final_score'] / 100.0 # Convert score to weight
for pos, char in enumerate(text):
position_chars[pos][char] += weight
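# Example: 'ABC123' (score 90) and 'A8C123' (score 60) vote at position 1
# with weights 0.9 vs 0.6, so 'B' wins with 60% position confidence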
# Build most likely string
most_likely = ""
total_confidence = 0
for pos in range(length):
if pos in position_chars:
char_scores = position_chars[pos]
best_char = max(char_scores.items(), key=lambda x: x[1])
most_likely += best_char[0]
# Calculate position confidence
total_weight = sum(char_scores.values())
position_confidence = (best_char[1] / total_weight) * 100
total_confidence += position_confidence
print(f" Position {pos}: '{best_char[0]}' (confidence: {position_confidence:.1f}%)")
if most_likely:
avg_confidence = total_confidence / length
maryland_score = self.score_maryland_likelihood(most_likely)
pattern_results.append({
'text': most_likely,
'pattern_confidence': avg_confidence,
'maryland_score': maryland_score,
'total_score': avg_confidence + maryland_score,
'candidate_count': len(candidates),
'length': length
})
print(f" Most likely: '{most_likely}' (pattern conf: {avg_confidence:.1f}%, MD score: {maryland_score})")
return pattern_results
def generate_investigation_report(self, all_image_results, scored_candidates, pattern_results):
"""Generate a comprehensive investigation report."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Count statistics
total_images = len(all_image_results)
total_candidates = sum(len(results) for results in all_image_results.values())
unique_candidates = len(scored_candidates)
high_confidence = len([c for c in scored_candidates if c['final_score'] >= 80])
cross_image = len([c for c in scored_candidates if c['image_count'] > 1])
report = {
'investigation_summary': {
'timestamp': timestamp,
'total_images_processed': total_images,
'total_candidates_found': total_candidates,
'unique_candidates': unique_candidates,
'high_confidence_candidates': high_confidence,
'cross_image_candidates': cross_image
},
'top_candidates': scored_candidates[:20],
'pattern_analysis': pattern_results,
'image_breakdown': {}
}
# Add per-image breakdown
for image_name, results in all_image_results.items():
report['image_breakdown'][image_name] = {
'candidate_count': len(results),
'top_candidates': [r['text'] for r in results[:5]]
}
# Save detailed JSON report
json_file = self.output_dir / 'investigation_report.json'
with open(json_file, 'w') as f:
json.dump(report, f, indent=2)
# Save human-readable summary
summary_file = self.output_dir / 'investigation_summary.txt'
with open(summary_file, 'w') as f:
f.write("=== LICENSE PLATE RECOGNITION REPORT ===\n")
f.write(f"Generated: {timestamp}\n\n")
f.write("SUMMARY:\n")
f.write(f" Images processed: {total_images}\n")
f.write(f" Total candidates: {total_candidates}\n")
f.write(f" Unique candidates: {unique_candidates}\n")
f.write(f" High confidence: {high_confidence}\n")
f.write(f" Cross-image validation: {cross_image}\n\n")
f.write("TOP CANDIDATES FOR DMV SEARCH:\n")
f.write("Rank Candidate Score Images Occurrences Sources\n")
f.write("---- --------- ----- ------ ----------- -------\n")
for i, candidate in enumerate(scored_candidates[:20], 1):
sources = '+'.join([s.split('.')[0] for s in candidate['sources']])
f.write(f"{i:3d}. {candidate['text']:9s} {candidate['final_score']:3.0f} {candidate['image_count']:3d} {candidate['occurrences']:6d} {sources}\n")
if pattern_results:
f.write(f"\nPATTERN ANALYSIS RESULTS:\n")
for i, pattern in enumerate(sorted(pattern_results, key=lambda x: x['total_score'], reverse=True), 1):
f.write(f" {i}. '{pattern['text']}' (total score: {pattern['total_score']:.1f})\n")
f.write(f" Pattern confidence: {pattern['pattern_confidence']:.1f}%\n")
f.write(f" Maryland score: {pattern['maryland_score']}\n")
f.write(f" Based on {pattern['candidate_count']} candidates\n\n")
f.write("RECOMMENDATIONS:\n")
f.write("1. Start DMV search with candidates scoring 80+\n")
f.write("2. Consider pattern analysis results for additional validation\n")
f.write("3. Pay attention to candidates appearing in multiple images\n")
f.write("4. Check character substitutions (O/0, I/1, S/5, etc.)\n")
print(f"\n✓ Investigation report saved:")
print(f" Detailed: {json_file}")
print(f" Summary: {summary_file}")
return report
def analyze_project(self):
"""Analyze all images in the project for license plate candidates."""
# Get all images
image_files = list(self.raw_dir.glob('*'))
image_files = [f for f in image_files if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp']]
if not image_files:
print(f"No image files found in {self.raw_dir}")
return
print(f"=== MULTI-IMAGE LICENSE PLATE ANALYSIS ===")
print(f"Processing {len(image_files)} images for cross-validation")
# Add overall timeout for the entire analysis
max_total_time = 300 # 5 minutes
start_time = time.time()
# Process each image
all_image_results = {}
for i, image_file in enumerate(image_files, 1):
# Check overall timeout
if time.time() - start_time > max_total_time:
print(f"⚠ Overall analysis timeout reached ({max_total_time}s)")
break
print(f"\n{'='*60}")
print(f"IMAGE {i}/{len(image_files)}: {image_file.name}")
print(f"{'='*60}")
# Add per-image timeout
image_start_time = time.time()
max_image_time = 60 # 1 minute per image
try:
results = self.process_single_image_aggressive(image_file)
all_image_results[image_file.name] = results
image_time = time.time() - image_start_time
print(f"✓ Image processed in {image_time:.1f}s")
except Exception as e:
print(f"✗ Error processing image {image_file.name}: {e}")
all_image_results[image_file.name] = []
continue
if not all_image_results:
print("No images were successfully processed.")
return
# Cross-validate candidates
print(f"\n{'='*60}")
print(f"CROSS-VALIDATION PHASE")
print(f"{'='*60}")
scored_candidates = self.cross_validate_candidates(all_image_results)
# Analyze character patterns
print(f"\n{'='*60}")
print(f"PATTERN ANALYSIS PHASE")
print(f"{'='*60}")
pattern_results = self.analyze_character_patterns(scored_candidates)
# Generate report
report = self.generate_investigation_report(all_image_results, scored_candidates, pattern_results)
# Display key results
print(f"\n=== KEY FINDINGS ===")
print(f"Top 10 candidates for Maryland DMV search:")
for i, candidate in enumerate(scored_candidates[:10], 1):
multi_img = " (MULTIPLE IMAGES)" if candidate['image_count'] > 1 else ""
print(f" {i:2d}. {candidate['text']:8s} (Score: {candidate['final_score']:3.0f}){multi_img}")
if pattern_results:
best_pattern = max(pattern_results, key=lambda x: x['total_score'])
print(f"\nBest pattern analysis result: '{best_pattern['text']}' (score: {best_pattern['total_score']:.1f})")
total_time = time.time() - start_time
print(f"\n✓ Analysis completed in {total_time:.1f}s")
return report
def main():
parser = argparse.ArgumentParser(description='Multi-Image License Plate Analysis')
parser.add_argument('--project-id', type=int, required=True, help='Project ID')
args = parser.parse_args()
project_dir = Path(f"projects/{args.project_id:03d}")
if not project_dir.exists():
print(f"Project {args.project_id:03d} does not exist.")
return
analyzer = MultiImageAnalyzer(project_dir)
analyzer.analyze_project()
if __name__ == '__main__':
main()

292
project_manager.py Normal file
View File

@@ -0,0 +1,292 @@
#!/usr/bin/env python3
"""
Project Manager for License Plate OCR
Handles project creation, organization, and analysis workflow.
"""
import os
import json
import shutil
import argparse
from pathlib import Path
from datetime import datetime
class ProjectManager:
def __init__(self, base_dir="projects"):
self.base_dir = Path(base_dir)
self.base_dir.mkdir(exist_ok=True)
def get_next_project_id(self):
"""Get the next available project ID."""
existing_projects = []
for project_dir in self.base_dir.iterdir():
if project_dir.is_dir() and project_dir.name.isdigit():
existing_projects.append(int(project_dir.name))
return max(existing_projects, default=0) + 1
def create_project(self, description=""):
"""Create a new project with auto-incremented ID and folder structure."""
project_id = self.get_next_project_id()
project_dir = self.base_dir / f"{project_id:03d}"
# Create folder structure
folders = ['raw', 'annotations', 'debug', 'output']
for folder in folders:
(project_dir / folder).mkdir(parents=True, exist_ok=True)
# Create project metadata
metadata = {
'project_id': f"{project_id:03d}",
'description': description,
'created': datetime.now().isoformat(),
'status': 'created',
'image_count': 0,
'annotations_count': 0,
'analysis_count': 0
}
with open(project_dir / 'project.json', 'w') as f:
json.dump(metadata, f, indent=2)
print(f"✓ Created project {project_id:03d}")
print(f" Location: {project_dir}")
print(f" Folders: {', '.join(folders)}")
print(f"\nNext steps:")
print(f" 1. Copy your images to: {project_dir}/raw/")
print(f" 2. Run: python project_manager.py annotate {project_id:03d}")
print(f" 3. Run: python project_manager.py analyze {project_id:03d}")
return project_dir, project_id
def add_images(self, project_id, image_paths):
"""Add images to a project's raw folder."""
project_dir = self.base_dir / f"{project_id:03d}"
raw_dir = project_dir / 'raw'
if not project_dir.exists():
raise ValueError(f"Project {project_id:03d} does not exist")
added_images = []
for image_path in image_paths:
if os.path.exists(image_path):
filename = os.path.basename(image_path)
dest_path = raw_dir / filename
shutil.copy2(image_path, dest_path)
added_images.append(filename)
print(f" ✓ Added {filename}")
else:
print(f" ✗ Image not found: {image_path}")
# Update metadata
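# (image_count counts every file in raw/, not only recognized image types)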
self._update_metadata(project_id, {'image_count': len(list(raw_dir.glob('*')))})
return added_images
def list_projects(self):
"""List all projects."""
projects = []
for project_dir in self.base_dir.iterdir():
if project_dir.is_dir() and (project_dir / 'project.json').exists():
with open(project_dir / 'project.json', 'r') as f:
metadata = json.load(f)
projects.append(metadata)
if not projects:
print("No projects found.")
return
print("Projects:")
for project in sorted(projects, key=lambda x: x['project_id']):
print(f" {project['project_id']}: {project['description']} ({project['status']})")
print(f" Images: {project['image_count']}, Annotations: {project['annotations_count']}")
def get_project_info(self, project_id):
"""Get detailed information about a project."""
project_dir = self.base_dir / f"{project_id:03d}"
metadata_file = project_dir / 'project.json'
if not metadata_file.exists():
raise ValueError(f"Project {project_id:03d} does not exist")
with open(metadata_file, 'r') as f:
metadata = json.load(f)
# Get file counts
raw_files = list((project_dir / 'raw').glob('*'))
annotation_files = list((project_dir / 'annotations').glob('*.json'))
output_files = list((project_dir / 'output').glob('*'))
debug_files = list((project_dir / 'debug').glob('*'))
print(f"Project {project_id:03d}: {metadata['description']}")
print(f" Created: {metadata['created']}")
print(f" Status: {metadata['status']}")
print(f" Raw images: {len(raw_files)}")
print(f" Annotations: {len(annotation_files)}")
print(f" Output files: {len(output_files)}")
print(f" Debug files: {len(debug_files)}")
if raw_files:
print(" Raw images:")
for img in raw_files:
print(f" - {img.name}")
return metadata
def run_annotate(self, project_id):
"""Run the annotation tool for a project."""
project_dir = self.base_dir / f"{project_id:03d}"
if not project_dir.exists():
print(f"Project {project_id:03d} does not exist.")
return False
raw_dir = project_dir / 'raw'
image_files = list(raw_dir.glob('*'))
image_files = [f for f in image_files if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp']]
if not image_files:
print(f"No images found in {raw_dir}")
print("Please copy your images to the raw folder first.")
return False
print(f"Found {len(image_files)} images in project {project_id:03d}")
print("Starting annotation tool...")
import subprocess
import sys
try:
result = subprocess.run([sys.executable, 'annotate_project.py', '--project-id', str(project_id)],
check=True)
print("✓ Annotation completed")
return True
except subprocess.CalledProcessError:
print("✗ Annotation failed")
return False
def run_analysis(self, project_id):
"""Run comprehensive analysis (multi-image + super resolution)."""
project_dir = self.base_dir / f"{project_id:03d}"
if not project_dir.exists():
print(f"Project {project_id:03d} does not exist.")
return False
# Check for images
raw_dir = project_dir / 'raw'
image_files = list(raw_dir.glob('*'))
image_files = [f for f in image_files if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp']]
if not image_files:
print(f"No images found in {raw_dir}")
print("Please copy your images to the raw folder first.")
return False
# Check for annotations
annotations_dir = project_dir / 'annotations'
annotation_files = list(annotations_dir.glob('*.json'))
if not annotation_files:
print(f"No annotations found for project {project_id:03d}")
print("Annotations are required for comprehensive analysis.")
response = input("Would you like to run annotation now? (y/n): ").lower().strip()
if response in ['y', 'yes']:
if not self.run_annotate(project_id):
return False
else:
print("Analysis cancelled. Please annotate first:")
print(f" python project_manager.py annotate {project_id:03d}")
return False
print(f"Running comprehensive analysis on project {project_id:03d}...")
import subprocess
import sys
try:
# Run integrated analysis
result = subprocess.run([sys.executable, 'integrated_analysis.py', '--project-id', str(project_id)],
check=True)
print("✓ Analysis completed")
# Update metadata
self._update_metadata(project_id, {
'status': 'analyzed',
'analysis_count': self._get_analysis_count(project_id)
})
return True
except subprocess.CalledProcessError:
print("✗ Analysis failed")
return False
def _get_analysis_count(self, project_id):
"""Count analysis files in output directory."""
project_dir = self.base_dir / f"{project_id:03d}"
output_dir = project_dir / 'output'
return len(list(output_dir.glob('*.json'))) + len(list(output_dir.glob('*.txt')))
def _update_metadata(self, project_id, updates):
"""Update project metadata."""
project_dir = self.base_dir / f"{project_id:03d}"
metadata_file = project_dir / 'project.json'
with open(metadata_file, 'r') as f:
metadata = json.load(f)
metadata.update(updates)
metadata['last_updated'] = datetime.now().isoformat()
with open(metadata_file, 'w') as f:
json.dump(metadata, f, indent=2)
def main():
parser = argparse.ArgumentParser(description='License Plate OCR Project Manager')
subparsers = parser.add_subparsers(dest='command', help='Available commands')
# Create command
create_parser = subparsers.add_parser('create', help='Create a new project')
create_parser.add_argument('description', help='Project description')
# Annotate command
annotate_parser = subparsers.add_parser('annotate', help='Run annotation tool')
annotate_parser.add_argument('project_id', type=int, help='Project ID')
# Analyze command
analyze_parser = subparsers.add_parser('analyze', help='Run comprehensive analysis')
analyze_parser.add_argument('project_id', type=int, help='Project ID')
# List command
list_parser = subparsers.add_parser('list', help='List all projects')
# Info command
info_parser = subparsers.add_parser('info', help='Show project details')
info_parser.add_argument('project_id', type=int, help='Project ID')
args = parser.parse_args()
if not args.command:
parser.print_help()
return
pm = ProjectManager()
if args.command == 'create':
project_dir, project_id = pm.create_project(args.description)
elif args.command == 'annotate':
pm.run_annotate(args.project_id)
elif args.command == 'analyze':
pm.run_analysis(args.project_id)
elif args.command == 'list':
pm.list_projects()
elif args.command == 'info':
pm.get_project_info(args.project_id)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h10m46s029.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1385,
507
],
"bottom_right": [
1436,
544
],
"width": 51,
"height": 37,
"aspect_ratio": 1.3783783783783783
}
],
"statistics": {
"count": 1,
"avg_width": 51.0,
"avg_height": 37.0,
"avg_aspect_ratio": 1.3783783783783783,
"width_range": [
51,
51
],
"height_range": [
37,
37
],
"aspect_ratio_range": [
1.3783783783783783,
1.3783783783783783
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h11m52s619.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1393,
512
],
"bottom_right": [
1443,
540
],
"width": 50,
"height": 28,
"aspect_ratio": 1.7857142857142858
}
],
"statistics": {
"count": 1,
"avg_width": 50.0,
"avg_height": 28.0,
"avg_aspect_ratio": 1.7857142857142858,
"width_range": [
50,
50
],
"height_range": [
28,
28
],
"aspect_ratio_range": [
1.7857142857142858,
1.7857142857142858
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h11m55s915.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1401,
512
],
"bottom_right": [
1448,
542
],
"width": 47,
"height": 30,
"aspect_ratio": 1.5666666666666667
}
],
"statistics": {
"count": 1,
"avg_width": 47.0,
"avg_height": 30.0,
"avg_aspect_ratio": 1.5666666666666667,
"width_range": [
47,
47
],
"height_range": [
30,
30
],
"aspect_ratio_range": [
1.5666666666666667,
1.5666666666666667
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h11m58s481.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1412,
515
],
"bottom_right": [
1457,
545
],
"width": 45,
"height": 30,
"aspect_ratio": 1.5
}
],
"statistics": {
"count": 1,
"avg_width": 45.0,
"avg_height": 30.0,
"avg_aspect_ratio": 1.5,
"width_range": [
45,
45
],
"height_range": [
30,
30
],
"aspect_ratio_range": [
1.5,
1.5
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m00s797.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1417,
513
],
"bottom_right": [
1468,
550
],
"width": 51,
"height": 37,
"aspect_ratio": 1.3783783783783783
}
],
"statistics": {
"count": 1,
"avg_width": 51.0,
"avg_height": 37.0,
"avg_aspect_ratio": 1.3783783783783783,
"width_range": [
51,
51
],
"height_range": [
37,
37
],
"aspect_ratio_range": [
1.3783783783783783,
1.3783783783783783
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m02s950.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1425,
523
],
"bottom_right": [
1472,
550
],
"width": 47,
"height": 27,
"aspect_ratio": 1.7407407407407407
}
],
"statistics": {
"count": 1,
"avg_width": 47.0,
"avg_height": 27.0,
"avg_aspect_ratio": 1.7407407407407407,
"width_range": [
47,
47
],
"height_range": [
27,
27
],
"aspect_ratio_range": [
1.7407407407407407,
1.7407407407407407
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m05s236.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1433,
513
],
"bottom_right": [
1481,
556
],
"width": 48,
"height": 43,
"aspect_ratio": 1.1162790697674418
}
],
"statistics": {
"count": 1,
"avg_width": 48.0,
"avg_height": 43.0,
"avg_aspect_ratio": 1.1162790697674418,
"width_range": [
48,
48
],
"height_range": [
43,
43
],
"aspect_ratio_range": [
1.1162790697674418,
1.1162790697674418
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m07s176.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1446,
529
],
"bottom_right": [
1489,
556
],
"width": 43,
"height": 27,
"aspect_ratio": 1.5925925925925926
}
],
"statistics": {
"count": 1,
"avg_width": 43.0,
"avg_height": 27.0,
"avg_aspect_ratio": 1.5925925925925926,
"width_range": [
43,
43
],
"height_range": [
27,
27
],
"aspect_ratio_range": [
1.5925925925925926,
1.5925925925925926
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m09s720.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1448,
529
],
"bottom_right": [
1502,
561
],
"width": 54,
"height": 32,
"aspect_ratio": 1.6875
}
],
"statistics": {
"count": 1,
"avg_width": 54.0,
"avg_height": 32.0,
"avg_aspect_ratio": 1.6875,
"width_range": [
54,
54
],
"height_range": [
32,
32
],
"aspect_ratio_range": [
1.6875,
1.6875
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m12s869.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1460,
526
],
"bottom_right": [
1512,
569
],
"width": 52,
"height": 43,
"aspect_ratio": 1.2093023255813953
}
],
"statistics": {
"count": 1,
"avg_width": 52.0,
"avg_height": 43.0,
"avg_aspect_ratio": 1.2093023255813953,
"width_range": [
52,
52
],
"height_range": [
43,
43
],
"aspect_ratio_range": [
1.2093023255813953,
1.2093023255813953
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m18s762.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1465,
536
],
"bottom_right": [
1521,
569
],
"width": 56,
"height": 33,
"aspect_ratio": 1.696969696969697
}
],
"statistics": {
"count": 1,
"avg_width": 56.0,
"avg_height": 33.0,
"avg_aspect_ratio": 1.696969696969697,
"width_range": [
56,
56
],
"height_range": [
33,
33
],
"aspect_ratio_range": [
1.696969696969697,
1.696969696969697
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m20s943.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1480,
540
],
"bottom_right": [
1528,
572
],
"width": 48,
"height": 32,
"aspect_ratio": 1.5
}
],
"statistics": {
"count": 1,
"avg_width": 48.0,
"avg_height": 32.0,
"avg_aspect_ratio": 1.5,
"width_range": [
48,
48
],
"height_range": [
32,
32
],
"aspect_ratio_range": [
1.5,
1.5
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m23s929.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1489,
547
],
"bottom_right": [
1542,
577
],
"width": 53,
"height": 30,
"aspect_ratio": 1.7666666666666666
}
],
"statistics": {
"count": 1,
"avg_width": 53.0,
"avg_height": 30.0,
"avg_aspect_ratio": 1.7666666666666666,
"width_range": [
53,
53
],
"height_range": [
30,
30
],
"aspect_ratio_range": [
1.7666666666666666,
1.7666666666666666
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m25s943.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1504,
550
],
"bottom_right": [
1550,
588
],
"width": 46,
"height": 38,
"aspect_ratio": 1.2105263157894737
}
],
"statistics": {
"count": 1,
"avg_width": 46.0,
"avg_height": 38.0,
"avg_aspect_ratio": 1.2105263157894737,
"width_range": [
46,
46
],
"height_range": [
38,
38
],
"aspect_ratio_range": [
1.2105263157894737,
1.2105263157894737
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m28s281.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1510,
552
],
"bottom_right": [
1564,
587
],
"width": 54,
"height": 35,
"aspect_ratio": 1.542857142857143
}
],
"statistics": {
"count": 1,
"avg_width": 54.0,
"avg_height": 35.0,
"avg_aspect_ratio": 1.542857142857143,
"width_range": [
54,
54
],
"height_range": [
35,
35
],
"aspect_ratio_range": [
1.542857142857143,
1.542857142857143
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m30s395.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1523,
560
],
"bottom_right": [
1569,
587
],
"width": 46,
"height": 27,
"aspect_ratio": 1.7037037037037037
}
],
"statistics": {
"count": 1,
"avg_width": 46.0,
"avg_height": 27.0,
"avg_aspect_ratio": 1.7037037037037037,
"width_range": [
46,
46
],
"height_range": [
27,
27
],
"aspect_ratio_range": [
1.7037037037037037,
1.7037037037037037
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m33s377.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1537,
561
],
"bottom_right": [
1585,
595
],
"width": 48,
"height": 34,
"aspect_ratio": 1.411764705882353
}
],
"statistics": {
"count": 1,
"avg_width": 48.0,
"avg_height": 34.0,
"avg_aspect_ratio": 1.411764705882353,
"width_range": [
48,
48
],
"height_range": [
34,
34
],
"aspect_ratio_range": [
1.411764705882353,
1.411764705882353
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m39s991.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1635,
590
],
"bottom_right": [
1691,
632
],
"width": 56,
"height": 42,
"aspect_ratio": 1.3333333333333333
}
],
"statistics": {
"count": 1,
"avg_width": 56.0,
"avg_height": 42.0,
"avg_aspect_ratio": 1.3333333333333333,
"width_range": [
56,
56
],
"height_range": [
42,
42
],
"aspect_ratio_range": [
1.3333333333333333,
1.3333333333333333
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m43s634.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1654,
600
],
"bottom_right": [
1702,
640
],
"width": 48,
"height": 40,
"aspect_ratio": 1.2
}
],
"statistics": {
"count": 1,
"avg_width": 48.0,
"avg_height": 40.0,
"avg_aspect_ratio": 1.2,
"width_range": [
48,
48
],
"height_range": [
40,
40
],
"aspect_ratio_range": [
1.2,
1.2
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m45s849.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1670,
604
],
"bottom_right": [
1718,
641
],
"width": 48,
"height": 37,
"aspect_ratio": 1.2972972972972974
}
],
"statistics": {
"count": 1,
"avg_width": 48.0,
"avg_height": 37.0,
"avg_aspect_ratio": 1.2972972972972974,
"width_range": [
48,
48
],
"height_range": [
37,
37
],
"aspect_ratio_range": [
1.2972972972972974,
1.2972972972972974
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m47s773.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1684,
612
],
"bottom_right": [
1734,
649
],
"width": 50,
"height": 37,
"aspect_ratio": 1.3513513513513513
}
],
"statistics": {
"count": 1,
"avg_width": 50.0,
"avg_height": 37.0,
"avg_aspect_ratio": 1.3513513513513513,
"width_range": [
50,
50
],
"height_range": [
37,
37
],
"aspect_ratio_range": [
1.3513513513513513,
1.3513513513513513
]
}
}

View File

@@ -0,0 +1,41 @@
{
"image_path": "projects/002/raw/vlcsnap-2025-07-29-12h12m49s661.png",
"image_shape": [
1080,
1920,
3
],
"annotations": [
{
"top_left": [
1697,
617
],
"bottom_right": [
1750,
654
],
"width": 53,
"height": 37,
"aspect_ratio": 1.4324324324324325
}
],
"statistics": {
"count": 1,
"avg_width": 53.0,
"avg_height": 37.0,
"avg_aspect_ratio": 1.4324324324324325,
"width_range": [
53,
53
],
"height_range": [
37,
37
],
"aspect_ratio_range": [
1.4324324324324325,
1.4324324324324325
]
}
}

10
projects/002/project.json Normal file
View File

@@ -0,0 +1,10 @@
{
"project_id": "002",
"description": "Will's sideswipe",
"created": "2025-07-29T12:09:04.994531",
"status": "analyzed",
"image_count": 0,
"annotations_count": 0,
"analysis_count": 6,
"last_updated": "2025-07-30T13:43:10.745298"
}

Binary files not shown: 22 images, 1.7–1.8 MiB each.

722
super_resolution_analysis.py Normal file
View File

@@ -0,0 +1,722 @@
#!/usr/bin/env python3
"""
Super Resolution License Plate Analysis
Extracts annotated license plate regions, aligns multiple images of the same plate,
applies multi-frame super resolution, and performs enhanced OCR.
"""
import cv2
import numpy as np
import pytesseract
from PIL import Image, ImageEnhance
import os
import json
import re
import argparse
from pathlib import Path
from collections import defaultdict, Counter
from datetime import datetime
import math
class SuperResolutionAnalyzer:
def __init__(self, project_dir):
self.project_dir = Path(project_dir)
self.raw_dir = self.project_dir / 'raw'
self.annotations_dir = self.project_dir / 'annotations'
self.debug_dir = self.project_dir / 'debug'
self.output_dir = self.project_dir / 'output'
# Ensure directories exist
self.debug_dir.mkdir(exist_ok=True)
self.output_dir.mkdir(exist_ok=True)
# Standard license plate dimensions for normalization
# Using a wide 4:1 canvas at high resolution (wider than the nominal 2:1
# aspect ratio of US plates, leaving margin for padded, angled crops)
self.target_width = 400
self.target_height = 100
print(f"✓ Multi-Frame Super Resolution Analyzer initialized")
print(f" Target dimensions: {self.target_width}x{self.target_height}")
def load_annotations(self):
"""Load all annotation files from the project."""
annotation_files = list(self.annotations_dir.glob('*.json'))
if not annotation_files:
print("No annotation files found. Please annotate license plates first.")
return {}
annotations = {}
total_plates = 0
for file in annotation_files:
with open(file, 'r') as f:
data = json.load(f)
image_name = Path(data['image_path']).name
annotations[image_name] = data['annotations']
total_plates += len(data['annotations'])
print(f" Loaded {len(data['annotations'])} annotations from {file.name}")
print(f"✓ Loaded {total_plates} license plate annotations from {len(annotation_files)} images")
return annotations
def extract_and_normalize_plates(self, annotations):
"""Extract all license plate regions and normalize them to the same dimensions."""
normalized_plates = []
print(f"\nExtracting and normalizing license plate regions:")
for image_name, image_annotations in annotations.items():
image_path = self.raw_dir / image_name
if not image_path.exists():
print(f"⚠ Image not found: {image_path}")
continue
image = cv2.imread(str(image_path))
if image is None:
print(f"⚠ Could not load image: {image_path}")
continue
print(f"\n Processing {image_name}:")
for i, annotation in enumerate(image_annotations):
tl = annotation['top_left']
br = annotation['bottom_right']
# Extract region with padding
x1, y1 = tl
x2, y2 = br
# Add 5% padding
w, h = x2 - x1, y2 - y1
padding_x = int(w * 0.05)
padding_y = int(h * 0.05)
x1 = max(0, x1 - padding_x)
y1 = max(0, y1 - padding_y)
x2 = min(image.shape[1], x2 + padding_x)
y2 = min(image.shape[0], y2 + padding_y)
region = image[y1:y2, x1:x2]
if region.size > 0:
# Convert to grayscale
if len(region.shape) == 3:
gray_region = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY)
else:
gray_region = region
# Normalize to target dimensions
normalized = self.normalize_plate_region(gray_region)
plate_info = {
'normalized_plate': normalized,
'original_region': gray_region,
'source_image': image_name,
'plate_id': f"{image_name.split('.')[0]}_plate_{i+1}",
'original_bbox': (x1, y1, x2, y2),
'original_size': (x2-x1, y2-y1)
}
normalized_plates.append(plate_info)
# Save original and normalized regions for debugging
cv2.imwrite(str(self.debug_dir / f"original_{plate_info['plate_id']}.jpg"), gray_region)
cv2.imwrite(str(self.debug_dir / f"normalized_{plate_info['plate_id']}.jpg"), normalized)
print(f" Plate {i+1}: {x2-x1}x{y2-y1}{self.target_width}x{self.target_height}")
print(f"\n✓ Extracted and normalized {len(normalized_plates)} license plate regions")
return normalized_plates
def normalize_plate_region(self, region):
"""Normalize a license plate region to target dimensions while preserving aspect ratio."""
h, w = region.shape[:2]
# Calculate scaling to fit within target dimensions
scale_w = self.target_width / w
scale_h = self.target_height / h
scale = min(scale_w, scale_h)
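# e.g. a 50x30 crop: scale = min(400/50, 100/30) ≈ 3.33, so it is resized
# to roughly 166x100 and then centred on the 400x100 canvas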
# Resize maintaining aspect ratio
new_w = int(w * scale)
new_h = int(h * scale)
resized = cv2.resize(region, (new_w, new_h), interpolation=cv2.INTER_CUBIC)
# Create normalized image with padding
normalized = np.zeros((self.target_height, self.target_width), dtype=np.uint8)
# Center the resized image
start_x = (self.target_width - new_w) // 2
start_y = (self.target_height - new_h) // 2
normalized[start_y:start_y+new_h, start_x:start_x+new_w] = resized
return normalized
def align_plates(self, plates):
"""Align multiple normalized license plate images using feature matching."""
if len(plates) < 2:
print(" ⚠ Need at least 2 plates for alignment")
return plates
print(f"\n Aligning {len(plates)} license plate images:")
# Use the first plate as reference
reference = plates[0]['normalized_plate']
aligned_plates = [plates[0]] # Reference doesn't need alignment
# Create SIFT detector for feature matching
sift = cv2.SIFT_create()
# Find keypoints and descriptors for reference
kp_ref, desc_ref = sift.detectAndCompute(reference, None)
if desc_ref is None:
print(" ⚠ No features found in reference image")
return plates
# Align each subsequent plate to the reference
for i, plate in enumerate(plates[1:], 1):
current = plate['normalized_plate']
# Find keypoints and descriptors
kp_curr, desc_curr = sift.detectAndCompute(current, None)
if desc_curr is None:
print(f" ⚠ No features found in plate {i+1}, using original")
aligned_plates.append(plate)
continue
# Match features
matcher = cv2.BFMatcher()
matches = matcher.knnMatch(desc_ref, desc_curr, k=2)
# Filter good matches using Lowe's ratio test
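# (a match survives only if it is clearly better than the second-best candidate)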
good_matches = []
for match_pair in matches:
if len(match_pair) == 2:
m, n = match_pair
if m.distance < 0.75 * n.distance:
good_matches.append(m)
if len(good_matches) < 10:
print(f" ⚠ Not enough good matches for plate {i+1} ({len(good_matches)}), using original")
aligned_plates.append(plate)
continue
# Extract matched points
src_pts = np.float32([kp_curr[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
dst_pts = np.float32([kp_ref[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
# Find homography
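# (RANSAC with a 5-pixel reprojection threshold discards outlier matches)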
homography, mask = cv2.findHomography(src_pts, dst_pts,
cv2.RANSAC, 5.0)
if homography is not None:
# Warp the current plate to align with reference
aligned = cv2.warpPerspective(current, homography,
(self.target_width, self.target_height))
# Update plate info
aligned_plate = plate.copy()
aligned_plate['normalized_plate'] = aligned
aligned_plate['alignment_transform'] = homography
aligned_plates.append(aligned_plate)
# Save aligned image for debugging
cv2.imwrite(str(self.debug_dir / f"aligned_{plate['plate_id']}.jpg"), aligned)
print(f" ✓ Aligned plate {i+1} using {len(good_matches)} matches")
else:
print(f" ⚠ Failed to compute homography for plate {i+1}, using original")
aligned_plates.append(plate)
return aligned_plates
def apply_multi_frame_super_resolution(self, aligned_plates):
"""Apply multi-frame super resolution to aligned license plate images."""
if len(aligned_plates) < 2:
print(" ⚠ Need at least 2 aligned plates for multi-frame super resolution")
if len(aligned_plates) == 1:
return self.apply_single_frame_enhancement(aligned_plates[0]['normalized_plate'])
return None
print(f"\n Applying multi-frame super resolution to {len(aligned_plates)} aligned plates:")
# Extract normalized plates
plates = [plate['normalized_plate'] for plate in aligned_plates]
# Method 1: Simple averaging (baseline)
print(" Method 1: Multi-frame averaging")
averaged = self.multi_frame_average(plates)
cv2.imwrite(str(self.debug_dir / "super_res_averaged.jpg"), averaged)
# Method 2: Weighted averaging with quality assessment
print(" Method 2: Quality-weighted averaging")
weighted_avg = self.quality_weighted_average(plates)
cv2.imwrite(str(self.debug_dir / "super_res_weighted.jpg"), weighted_avg)
# Method 3: Median blending (reduces noise)
print(" Method 3: Median blending")
median_blend = self.median_blend(plates)
cv2.imwrite(str(self.debug_dir / "super_res_median.jpg"), median_blend)
# Method 4: Maximum clarity fusion
print(" Method 4: Maximum clarity fusion")
clarity_fusion = self.clarity_fusion(plates)
cv2.imwrite(str(self.debug_dir / "super_res_clarity.jpg"), clarity_fusion)
# Method 5: Enhanced multi-frame with edge preservation
print(" Method 5: Edge-preserving fusion")
edge_preserving = self.edge_preserving_fusion(plates)
cv2.imwrite(str(self.debug_dir / "super_res_edge_preserving.jpg"), edge_preserving)
super_resolution_results = {
'averaged': averaged,
'weighted_average': weighted_avg,
'median_blend': median_blend,
'clarity_fusion': clarity_fusion,
'edge_preserving': edge_preserving,
'source_plates': len(plates)
}
print(f" ✓ Generated 5 super resolution variants")
return super_resolution_results
def multi_frame_average(self, plates):
"""Simple averaging of multiple aligned frames."""
if not plates:
return None
# Convert to float for averaging
sum_image = np.zeros_like(plates[0], dtype=np.float64)
for plate in plates:
sum_image += plate.astype(np.float64)
averaged = (sum_image / len(plates)).astype(np.uint8)
return averaged
def quality_weighted_average(self, plates):
"""Weighted averaging based on image quality metrics."""
if not plates:
return None
# Calculate quality weights for each plate
weights = []
for plate in plates:
# Use variance as quality metric (higher variance = more detail)
variance = cv2.Laplacian(plate, cv2.CV_64F).var()
# Use gradient magnitude as sharpness metric
grad_x = cv2.Sobel(plate, cv2.CV_64F, 1, 0, ksize=3)
grad_y = cv2.Sobel(plate, cv2.CV_64F, 0, 1, ksize=3)
sharpness = np.mean(np.sqrt(grad_x**2 + grad_y**2))
# Combine metrics
quality = variance * 0.7 + sharpness * 0.3
weights.append(quality)
# Normalize weights
total_weight = sum(weights)
if total_weight == 0:
return self.multi_frame_average(plates)
weights = [w / total_weight for w in weights]
# Weighted average
weighted_sum = np.zeros_like(plates[0], dtype=np.float64)
for plate, weight in zip(plates, weights):
weighted_sum += plate.astype(np.float64) * weight
return weighted_sum.astype(np.uint8)
def median_blend(self, plates):
"""Median blending to reduce noise."""
if not plates:
return None
# Stack plates along new axis
stacked = np.stack(plates, axis=2)
# Take median along the frame axis
median = np.median(stacked, axis=2).astype(np.uint8)
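# A noisy outlier in a single frame cannot dominate the result, unlike averaging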
return median
def clarity_fusion(self, plates):
"""Fuse images by selecting the clearest pixels from each frame."""
if not plates:
return None
# Calculate local clarity for each plate
clarity_maps = []
for plate in plates:
# Use Laplacian variance as clarity metric
laplacian = cv2.Laplacian(plate, cv2.CV_64F)
clarity = cv2.GaussianBlur(laplacian**2, (5, 5), 0)
clarity_maps.append(clarity)
# For each pixel, take the value from the clearest frame (vectorized
# argmax over the frame axis instead of a slow per-pixel Python loop)
clarity_stack = np.stack(clarity_maps, axis=0)
plate_stack = np.stack(plates, axis=0)
best_frame = np.argmax(clarity_stack, axis=0)
result = np.take_along_axis(plate_stack, best_frame[None, ...], axis=0)[0]
return result
def edge_preserving_fusion(self, plates):
"""Advanced fusion with edge preservation."""
if not plates:
return None
# Start with weighted average as base
base = self.quality_weighted_average(plates)
# Enhance edges by combining edge information from all frames
edge_enhanced = base.copy().astype(np.float64)
for plate in plates:
# Extract edges
edges = cv2.Canny(plate, 50, 150)
# Create edge enhancement
kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
sharpened = cv2.filter2D(plate, -1, kernel)
# Blend edges into result
edge_mask = edges.astype(np.float64) / 255.0
edge_enhanced = edge_enhanced * (1 - edge_mask * 0.3) + sharpened.astype(np.float64) * (edge_mask * 0.3)
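# At edge pixels this mixes in 30% of the sharpened frame on each iteration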
# Apply final enhancement
enhanced = cv2.convertScaleAbs(edge_enhanced)
# CLAHE for final contrast enhancement
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
final = clahe.apply(enhanced)
return final
def apply_single_frame_enhancement(self, plate):
"""Apply enhancement to a single plate when multi-frame SR is not possible."""
print(" Applying single-frame enhancement")
# Upscale using cubic interpolation
upscaled = cv2.resize(plate, (self.target_width*2, self.target_height*2),
interpolation=cv2.INTER_CUBIC)
# Apply sharpening
kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
sharpened = cv2.filter2D(upscaled, -1, kernel)
# CLAHE enhancement
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
enhanced = clahe.apply(sharpened)
# Resize back to target dimensions
final = cv2.resize(enhanced, (self.target_width, self.target_height),
interpolation=cv2.INTER_CUBIC)
cv2.imwrite(str(self.debug_dir / "single_frame_enhanced.jpg"), final)
return {
'single_enhanced': final,
'source_plates': 1
}
def enhanced_ocr_analysis(self, super_res_results):
"""Perform OCR analysis on super resolution results."""
print(f"\n Performing OCR on super resolution results:")
all_results = []
if super_res_results is None:
print(" ⚠ No super resolution results to analyze")
return all_results
# Process each super resolution method
for method_name, sr_image in super_res_results.items():
if method_name == 'source_plates':
continue
print(f" Analyzing {method_name}...")
# Multiple preprocessing variants
variants = self.create_ocr_variants(sr_image)
# OCR configurations
ocr_configs = [
('psm8', '--psm 8 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'),
('psm7', '--psm 7 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'),
('psm6', '--psm 6 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'),
('psm13', '--psm 13'),
('lstm', '--oem 1 --psm 8 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'),
]
for variant_name, variant_image in variants:
for config_name, config in ocr_configs:
try:
# Convert to PIL
pil_image = Image.fromarray(variant_image)
# Apply PIL enhancements
enhancer = ImageEnhance.Contrast(pil_image)
pil_enhanced = enhancer.enhance(1.5)
# OCR
text = pytesseract.image_to_string(pil_enhanced, config=config).strip()
clean_text = re.sub(r'[^A-Z0-9]', '', text.upper())
if len(clean_text) >= 3:
confidence = self.score_super_res_result(clean_text, method_name,
super_res_results['source_plates'])
result = {
'text': clean_text,
'confidence': confidence,
'sr_method': method_name,
'preprocessing': variant_name,
'ocr_config': config_name,
'source_frames': super_res_results['source_plates'],
'raw_text': text
}
all_results.append(result)
except Exception:
continue
return all_results
def create_ocr_variants(self, image):
"""Create multiple preprocessing variants for OCR."""
variants = []
# Original
variants.append(('original', image))
# Thresholding
_, otsu = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)
variants.append(('otsu', otsu))
_, inv_otsu = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)
variants.append(('inv_otsu', inv_otsu))
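# The inverted variant helps when characters are lighter than the plate background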
# Adaptive thresholding
if image.shape[0] > 10 and image.shape[1] > 10:
adaptive = cv2.adaptiveThreshold(image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2)
variants.append(('adaptive', adaptive))
# Morphological operations
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
dilated = cv2.dilate(image, kernel, iterations=1)
variants.append(('dilated', dilated))
return variants
def score_super_res_result(self, text, sr_method, num_frames):
"""Score OCR results from super resolution analysis."""
if not text or len(text) < 3:
return 0
# Base Maryland pattern score
base_score = self.score_maryland_likelihood(text)
# Super resolution method bonuses
sr_bonuses = {
'averaged': 10,
'weighted_average': 15,
'median_blend': 12,
'clarity_fusion': 18,
'edge_preserving': 20,
'single_enhanced': 5
}
sr_bonus = sr_bonuses.get(sr_method, 0)
# Multi-frame bonus (more frames = better potential quality)
frame_bonus = min(num_frames * 5, 25) # Cap at 25
# Quality bonuses
quality_bonus = 0
if len(text) >= 6:
quality_bonus += 10
if len(set(text)) >= 4:
quality_bonus += 10
total_score = base_score + sr_bonus + frame_bonus + quality_bonus
return max(0, total_score)
def score_maryland_likelihood(self, text):
"""Score Maryland license plate likelihood."""
if not text or len(text) < 2:
return 0
score = 0
# Length scoring
if len(text) == 7:
score += 40
elif len(text) == 6:
score += 35
elif len(text) == 5:
score += 20
else:
score += 10
# Character composition
has_letter = any(c.isalpha() for c in text)
has_number = any(c.isdigit() for c in text)
if has_letter and has_number:
score += 30
elif has_letter or has_number:
score += 15
        # Pattern bonuses for plausible Maryland-style formats:
        # 7 chars: LLLNNNN (strongest) or NLLLNNN; 6 chars: LLLNNN.
if len(text) == 7:
if text[:3].isalpha() and text[3:].isdigit():
score += 50
elif text[0].isdigit() and text[1:4].isalpha() and text[4:].isdigit():
score += 40
elif len(text) == 6:
if text[:3].isalpha() and text[3:].isdigit():
score += 40
return max(0, score)
def generate_report(self, all_results, aligned_plates):
"""Generate super resolution analysis report."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Process results
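        # Deduplicate: keep only the highest-confidence result per plate text.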
unique_candidates = {}
for result in all_results:
text = result['text']
if text not in unique_candidates or result['confidence'] > unique_candidates[text]['confidence']:
unique_candidates[text] = result
sorted_candidates = sorted(unique_candidates.values(), key=lambda x: x['confidence'], reverse=True)
# Generate report
report = {
'analysis_type': 'multi_frame_super_resolution',
'timestamp': timestamp,
'summary': {
'input_plates': len(aligned_plates),
'total_ocr_results': len(all_results),
'unique_candidates': len(unique_candidates),
'high_confidence': len([c for c in sorted_candidates if c['confidence'] >= 80])
},
'top_candidates': sorted_candidates[:20],
'methodology': {
'alignment': 'SIFT feature matching with homography',
'super_resolution_methods': ['averaged', 'weighted_average', 'median_blend',
'clarity_fusion', 'edge_preserving'],
'target_dimensions': f"{self.target_width}x{self.target_height}"
}
}
# Save report
json_file = self.output_dir / 'super_resolution_report.json'
with open(json_file, 'w') as f:
json.dump(report, f, indent=2)
# Human readable summary
summary_file = self.output_dir / 'super_resolution_summary.txt'
with open(summary_file, 'w') as f:
f.write("=== MULTI-FRAME SUPER RESOLUTION ANALYSIS ===\n")
f.write(f"Generated: {timestamp}\n\n")
f.write("SUMMARY:\n")
f.write(f" Input license plates: {len(aligned_plates)}\n")
f.write(f" Total OCR results: {len(all_results)}\n")
f.write(f" Unique candidates: {len(unique_candidates)}\n")
f.write(f" High confidence (80+): {len([c for c in sorted_candidates if c['confidence'] >= 80])}\n\n")
f.write("TOP CANDIDATES:\n")
f.write("Rank Candidate Confidence Method Frames\n")
f.write("---- --------- ---------- ---------------- ------\n")
for i, candidate in enumerate(sorted_candidates[:15], 1):
f.write(f"{i:3d}. {candidate['text']:9s} {candidate['confidence']:8.1f} {candidate['sr_method']:15s} {candidate['source_frames']:4d}\n")
print(f"\n✓ Super resolution analysis complete!")
print(f" Report: {json_file}")
print(f" Summary: {summary_file}")
return report
def analyze_project(self):
"""Run complete multi-frame super resolution analysis."""
print("=== MULTI-FRAME SUPER RESOLUTION ANALYSIS ===")
# Load annotations
annotations = self.load_annotations()
if not annotations:
return None
# Extract and normalize all license plate regions
normalized_plates = self.extract_and_normalize_plates(annotations)
        if not normalized_plates:
print("No license plate regions found.")
return None
# Align plates for multi-frame super resolution
aligned_plates = self.align_plates(normalized_plates)
# Apply multi-frame super resolution
super_res_results = self.apply_multi_frame_super_resolution(aligned_plates)
# Enhanced OCR analysis
all_results = self.enhanced_ocr_analysis(super_res_results)
# Generate report
report = self.generate_report(all_results, aligned_plates)
# Display results
unique_candidates = {}
for result in all_results:
text = result['text']
if text not in unique_candidates or result['confidence'] > unique_candidates[text]['confidence']:
unique_candidates[text] = result
sorted_candidates = sorted(unique_candidates.values(), key=lambda x: x['confidence'], reverse=True)
print(f"\nTop 10 multi-frame super resolution candidates:")
for i, candidate in enumerate(sorted_candidates[:10], 1):
print(f" {i:2d}. {candidate['text']:8s} (Conf: {candidate['confidence']:5.1f}) - {candidate['sr_method']}")
return report
def main():
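    # CLI entry point, e.g.: python super_resolution_analysis.py --project-id 1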
parser = argparse.ArgumentParser(description='Multi-Frame Super Resolution License Plate Analysis')
parser.add_argument('--project-id', type=int, required=True, help='Project ID')
args = parser.parse_args()
project_dir = Path(f"projects/{args.project_id:03d}")
if not project_dir.exists():
print(f"Project {args.project_id:03d} does not exist.")
return
# Check if annotations exist
annotations_dir = project_dir / 'annotations'
if not annotations_dir.exists() or not list(annotations_dir.glob('*.json')):
print(f"No annotations found for project {args.project_id:03d}.")
print("Please run annotation first.")
return
analyzer = SuperResolutionAnalyzer(project_dir)
analyzer.analyze_project()
if __name__ == '__main__':
main()