292 lines
10 KiB
Python
292 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Project Manager for License Plate OCR
|
|
Handles project creation, organization, and analysis workflow.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import shutil
|
|
import argparse
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
class ProjectManager:
|
|
def __init__(self, base_dir="projects"):
|
|
self.base_dir = Path(base_dir)
|
|
self.base_dir.mkdir(exist_ok=True)
|
|
|
|
def get_next_project_id(self):
|
|
"""Get the next available project ID."""
|
|
existing_projects = []
|
|
for project_dir in self.base_dir.iterdir():
|
|
if project_dir.is_dir() and project_dir.name.isdigit():
|
|
existing_projects.append(int(project_dir.name))
|
|
|
|
return max(existing_projects, default=0) + 1
|
|
|
|
def create_project(self, description=""):
|
|
"""Create a new project with auto-incremented ID and folder structure."""
|
|
project_id = self.get_next_project_id()
|
|
project_dir = self.base_dir / f"{project_id:03d}"
|
|
|
|
# Create folder structure
|
|
folders = ['raw', 'annotations', 'debug', 'output']
|
|
for folder in folders:
|
|
(project_dir / folder).mkdir(parents=True, exist_ok=True)
|
|
|
|
# Create project metadata
|
|
metadata = {
|
|
'project_id': f"{project_id:03d}",
|
|
'description': description,
|
|
'created': datetime.now().isoformat(),
|
|
'status': 'created',
|
|
'image_count': 0,
|
|
'annotations_count': 0,
|
|
'analysis_count': 0
|
|
}
|
|
|
|
with open(project_dir / 'project.json', 'w') as f:
|
|
json.dump(metadata, f, indent=2)
|
|
|
|
print(f"✓ Created project {project_id:03d}")
|
|
print(f" Location: {project_dir}")
|
|
print(f" Folders: {', '.join(folders)}")
|
|
print(f"\nNext steps:")
|
|
print(f" 1. Copy your images to: {project_dir}/raw/")
|
|
print(f" 2. Run: python project_manager.py annotate {project_id:03d}")
|
|
print(f" 3. Run: python project_manager.py analyze {project_id:03d}")
|
|
|
|
return project_dir, project_id
|
|
|
|
def add_images(self, project_id, image_paths):
|
|
"""Add images to a project's raw folder."""
|
|
project_dir = self.base_dir / f"{project_id:03d}"
|
|
raw_dir = project_dir / 'raw'
|
|
|
|
if not project_dir.exists():
|
|
raise ValueError(f"Project {project_id:03d} does not exist")
|
|
|
|
added_images = []
|
|
for image_path in image_paths:
|
|
if os.path.exists(image_path):
|
|
filename = os.path.basename(image_path)
|
|
dest_path = raw_dir / filename
|
|
shutil.copy2(image_path, dest_path)
|
|
added_images.append(filename)
|
|
print(f" ✓ Added {filename}")
|
|
else:
|
|
print(f" ✗ Image not found: {image_path}")
|
|
|
|
# Update metadata
|
|
self._update_metadata(project_id, {'image_count': len(list(raw_dir.glob('*')))})
|
|
|
|
return added_images
|
|
|
|
def list_projects(self):
|
|
"""List all projects."""
|
|
projects = []
|
|
for project_dir in self.base_dir.iterdir():
|
|
if project_dir.is_dir() and (project_dir / 'project.json').exists():
|
|
with open(project_dir / 'project.json', 'r') as f:
|
|
metadata = json.load(f)
|
|
projects.append(metadata)
|
|
|
|
if not projects:
|
|
print("No projects found.")
|
|
return
|
|
|
|
print("Projects:")
|
|
for project in sorted(projects, key=lambda x: x['project_id']):
|
|
print(f" {project['project_id']}: {project['description']} ({project['status']})")
|
|
print(f" Images: {project['image_count']}, Annotations: {project['annotations_count']}")
|
|
|
|
def get_project_info(self, project_id):
|
|
"""Get detailed information about a project."""
|
|
project_dir = self.base_dir / f"{project_id:03d}"
|
|
metadata_file = project_dir / 'project.json'
|
|
|
|
if not metadata_file.exists():
|
|
raise ValueError(f"Project {project_id:03d} does not exist")
|
|
|
|
with open(metadata_file, 'r') as f:
|
|
metadata = json.load(f)
|
|
|
|
# Get file counts
|
|
raw_files = list((project_dir / 'raw').glob('*'))
|
|
annotation_files = list((project_dir / 'annotations').glob('*.json'))
|
|
output_files = list((project_dir / 'output').glob('*'))
|
|
debug_files = list((project_dir / 'debug').glob('*'))
|
|
|
|
print(f"Project {project_id:03d}: {metadata['description']}")
|
|
print(f" Created: {metadata['created']}")
|
|
print(f" Status: {metadata['status']}")
|
|
print(f" Raw images: {len(raw_files)}")
|
|
print(f" Annotations: {len(annotation_files)}")
|
|
print(f" Output files: {len(output_files)}")
|
|
print(f" Debug files: {len(debug_files)}")
|
|
|
|
if raw_files:
|
|
print(" Raw images:")
|
|
for img in raw_files:
|
|
print(f" - {img.name}")
|
|
|
|
return metadata
|
|
|
|
def run_annotate(self, project_id):
|
|
"""Run the annotation tool for a project."""
|
|
project_dir = self.base_dir / f"{project_id:03d}"
|
|
|
|
if not project_dir.exists():
|
|
print(f"Project {project_id:03d} does not exist.")
|
|
return False
|
|
|
|
raw_dir = project_dir / 'raw'
|
|
image_files = list(raw_dir.glob('*'))
|
|
image_files = [f for f in image_files if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp']]
|
|
|
|
if not image_files:
|
|
print(f"No images found in {raw_dir}")
|
|
print("Please copy your images to the raw folder first.")
|
|
return False
|
|
|
|
print(f"Found {len(image_files)} images in project {project_id:03d}")
|
|
print("Starting annotation tool...")
|
|
|
|
import subprocess
|
|
import sys
|
|
|
|
try:
|
|
result = subprocess.run([sys.executable, 'annotate_project.py', '--project-id', str(project_id)],
|
|
check=True)
|
|
print("✓ Annotation completed")
|
|
return True
|
|
except subprocess.CalledProcessError:
|
|
print("✗ Annotation failed")
|
|
return False
|
|
|
|
def run_analysis(self, project_id):
|
|
"""Run comprehensive analysis (multi-image + super resolution)."""
|
|
project_dir = self.base_dir / f"{project_id:03d}"
|
|
|
|
if not project_dir.exists():
|
|
print(f"Project {project_id:03d} does not exist.")
|
|
return False
|
|
|
|
# Check for images
|
|
raw_dir = project_dir / 'raw'
|
|
image_files = list(raw_dir.glob('*'))
|
|
image_files = [f for f in image_files if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp']]
|
|
|
|
if not image_files:
|
|
print(f"No images found in {raw_dir}")
|
|
print("Please copy your images to the raw folder first.")
|
|
return False
|
|
|
|
# Check for annotations
|
|
annotations_dir = project_dir / 'annotations'
|
|
annotation_files = list(annotations_dir.glob('*.json'))
|
|
|
|
if not annotation_files:
|
|
print(f"No annotations found for project {project_id:03d}")
|
|
print("Annotations are required for comprehensive analysis.")
|
|
response = input("Would you like to run annotation now? (y/n): ").lower().strip()
|
|
|
|
if response in ['y', 'yes']:
|
|
if not self.run_annotate(project_id):
|
|
return False
|
|
else:
|
|
print("Analysis cancelled. Please annotate first:")
|
|
print(f" python project_manager.py annotate {project_id:03d}")
|
|
return False
|
|
|
|
print(f"Running comprehensive analysis on project {project_id:03d}...")
|
|
|
|
import subprocess
|
|
import sys
|
|
|
|
try:
|
|
# Run integrated analysis
|
|
result = subprocess.run([sys.executable, 'integrated_analysis.py', '--project-id', str(project_id)],
|
|
check=True)
|
|
print("✓ Analysis completed")
|
|
|
|
# Update metadata
|
|
self._update_metadata(project_id, {
|
|
'status': 'analyzed',
|
|
'analysis_count': self._get_analysis_count(project_id)
|
|
})
|
|
|
|
return True
|
|
except subprocess.CalledProcessError:
|
|
print("✗ Analysis failed")
|
|
return False
|
|
|
|
def _get_analysis_count(self, project_id):
|
|
"""Count analysis files in output directory."""
|
|
project_dir = self.base_dir / f"{project_id:03d}"
|
|
output_dir = project_dir / 'output'
|
|
return len(list(output_dir.glob('*.json'))) + len(list(output_dir.glob('*.txt')))
|
|
|
|
def _update_metadata(self, project_id, updates):
|
|
"""Update project metadata."""
|
|
project_dir = self.base_dir / f"{project_id:03d}"
|
|
metadata_file = project_dir / 'project.json'
|
|
|
|
with open(metadata_file, 'r') as f:
|
|
metadata = json.load(f)
|
|
|
|
metadata.update(updates)
|
|
metadata['last_updated'] = datetime.now().isoformat()
|
|
|
|
with open(metadata_file, 'w') as f:
|
|
json.dump(metadata, f, indent=2)
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='License Plate OCR Project Manager')
|
|
subparsers = parser.add_subparsers(dest='command', help='Available commands')
|
|
|
|
# Create command
|
|
create_parser = subparsers.add_parser('create', help='Create a new project')
|
|
create_parser.add_argument('description', help='Project description')
|
|
|
|
# Annotate command
|
|
annotate_parser = subparsers.add_parser('annotate', help='Run annotation tool')
|
|
annotate_parser.add_argument('project_id', type=int, help='Project ID')
|
|
|
|
# Analyze command
|
|
analyze_parser = subparsers.add_parser('analyze', help='Run comprehensive analysis')
|
|
analyze_parser.add_argument('project_id', type=int, help='Project ID')
|
|
|
|
# List command
|
|
list_parser = subparsers.add_parser('list', help='List all projects')
|
|
|
|
# Info command
|
|
info_parser = subparsers.add_parser('info', help='Show project details')
|
|
info_parser.add_argument('project_id', type=int, help='Project ID')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if not args.command:
|
|
parser.print_help()
|
|
return
|
|
|
|
pm = ProjectManager()
|
|
|
|
if args.command == 'create':
|
|
project_dir, project_id = pm.create_project(args.description)
|
|
|
|
elif args.command == 'annotate':
|
|
pm.run_annotate(args.project_id)
|
|
|
|
elif args.command == 'analyze':
|
|
pm.run_analysis(args.project_id)
|
|
|
|
elif args.command == 'list':
|
|
pm.list_projects()
|
|
|
|
elif args.command == 'info':
|
|
pm.get_project_info(args.project_id)
|
|
|
|
if __name__ == '__main__':
|
|
main() |