# Changelog (from commit message):
# FASE 1 - PWA y Frontend: templates (base, dashboard, analytics, executive),
#   responsive CSS, app/pwa/camera/charts JS, manifest.json + service worker,
#   mobile camera ticket capture.
# FASE 2 - Analytics: analytics/ module (predictions, trends, comparisons),
#   moving-average + linear-trend prediction, /api/analytics/* endpoints,
#   Chart.js integration.
# FASE 3 - Reportes PDF: reports/pdf_generator.py, SalesReportPDF with daily
#   and executive reports, /reporte command, /api/reports/* endpoints.
# FASE 4 - Mejoras OCR: ocr/ module (processor, preprocessor, patterns),
#   AmountDetector with multiple amount patterns, adaptive preprocessing
#   pipelines, deskew rotation correction and Otsu thresholding.
# Dependencias agregadas: reportlab, matplotlib (PDF); scipy, pandas
#   (analytics); imutils, deskew, cachetools (OCR).
"""
|
|
Image preprocessing for Sales Bot OCR
|
|
Adaptive preprocessing pipelines for different image conditions
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
from typing import Tuple, Optional, List
|
|
from io import BytesIO
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Try to import image processing libraries
|
|
try:
|
|
import cv2
|
|
import numpy as np
|
|
CV2_AVAILABLE = True
|
|
except ImportError:
|
|
CV2_AVAILABLE = False
|
|
logger.warning("OpenCV not available. Image preprocessing will be limited.")
|
|
|
|
try:
|
|
from PIL import Image, ImageEnhance, ImageFilter
|
|
PIL_AVAILABLE = True
|
|
except ImportError:
|
|
PIL_AVAILABLE = False
|
|
logger.warning("PIL not available. Image preprocessing will be limited.")
|
|
|
|
try:
|
|
from deskew import determine_skew
|
|
DESKEW_AVAILABLE = True
|
|
except ImportError:
|
|
DESKEW_AVAILABLE = False
|
|
logger.warning("deskew library not available. Rotation correction disabled.")
|
|
|
|
try:
|
|
import imutils
|
|
IMUTILS_AVAILABLE = True
|
|
except ImportError:
|
|
IMUTILS_AVAILABLE = False
|
|
logger.warning("imutils not available. Some rotations may not work.")
|
|
|
|
|
|
class ImagePreprocessor:
    """
    Preprocesses ticket images for better OCR accuracy.

    Supports multiple preprocessing pipelines for different image conditions
    (standard, low-contrast, noisy, rotated, dark, light). Behaviour is
    configured via environment variables, read once at construction time:

    - ``OCR_ENABLE_DESKEW`` ('true'/'false', default 'true'): allow rotation
      detection/correction when the ``deskew`` library is available.
    - ``OCR_MAX_ROTATION_ANGLE`` (float degrees, default 15): skip rotation
      corrections larger than this angle.
    - ``OCR_USE_ADAPTIVE_PIPELINE`` ('true'/'false', default 'true'): use the
      OpenCV-based adaptive pipelines when OpenCV is available.
    """

    def __init__(self):
        self.enable_deskew = os.getenv('OCR_ENABLE_DESKEW', 'true').lower() == 'true'
        self.max_rotation = float(os.getenv('OCR_MAX_ROTATION_ANGLE', '15'))
        self.use_adaptive = os.getenv('OCR_USE_ADAPTIVE_PIPELINE', 'true').lower() == 'true'

        # Define preprocessing pipelines: each name maps to an ordered list of
        # step names, resolved to `_step_<name>` methods by `_apply_pipeline`.
        self.pipelines = {
            'standard': ['grayscale', 'contrast', 'otsu'],
            'low_contrast': ['grayscale', 'clahe', 'adaptive_threshold'],
            'noisy': ['grayscale', 'denoise', 'sharpen', 'otsu'],
            'rotated': ['deskew', 'grayscale', 'contrast', 'otsu'],
            'dark': ['grayscale', 'brighten', 'contrast', 'otsu'],
            'light': ['grayscale', 'darken', 'contrast', 'otsu'],
        }

    def preprocess(self, image_bytes: bytes) -> bytes:
        """
        Preprocess image bytes for OCR.

        Args:
            image_bytes: Raw image bytes

        Returns:
            Preprocessed image bytes
        """
        if self.use_adaptive and CV2_AVAILABLE:
            return self.preprocess_adaptive(image_bytes)
        else:
            return self.preprocess_basic(image_bytes)

    def preprocess_basic(self, image_bytes: bytes) -> bytes:
        """
        Basic preprocessing using PIL only.

        Converts to grayscale, boosts contrast and sharpens. Returns the
        input unchanged when PIL is missing or any step fails, so OCR can
        still run on the original image (best-effort contract).
        """
        if not PIL_AVAILABLE:
            return image_bytes

        try:
            # Load image
            img = Image.open(BytesIO(image_bytes))

            # Convert to grayscale
            img = img.convert('L')

            # Enhance contrast (1.5x is a mild, generally safe boost)
            enhancer = ImageEnhance.Contrast(img)
            img = enhancer.enhance(1.5)

            # Sharpen
            img = img.filter(ImageFilter.SHARPEN)

            # Save to bytes
            output = BytesIO()
            img.save(output, format='PNG')
            return output.getvalue()

        except Exception as e:
            logger.error(f"Error in basic preprocessing: {e}")
            return image_bytes

    def preprocess_adaptive(self, image_bytes: bytes) -> bytes:
        """
        Adaptive preprocessing that analyzes the image and applies the
        pipeline best suited to its conditions.

        Falls back to `preprocess_basic` when OpenCV is unavailable or when
        decoding/processing raises.
        """
        if not CV2_AVAILABLE:
            return self.preprocess_basic(image_bytes)

        try:
            # Decode image
            nparr = np.frombuffer(image_bytes, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

            if image is None:
                logger.error("Could not decode image")
                return image_bytes

            # Analyze image to determine best pipeline
            pipeline_name = self._determine_best_pipeline(image)
            logger.info(f"Using preprocessing pipeline: {pipeline_name}")

            # Apply pipeline
            processed = self._apply_pipeline(image, pipeline_name)

            # Encode back to bytes
            _, buffer = cv2.imencode('.png', processed)
            return buffer.tobytes()

        except Exception as e:
            logger.error(f"Error in adaptive preprocessing: {e}")
            return self.preprocess_basic(image_bytes)

    def _determine_best_pipeline(self, image: 'np.ndarray') -> str:
        """
        Analyzes image statistics to pick a pipeline name from
        `self.pipelines`.

        Priority: detected rotation (when deskew is enabled), then mean
        brightness (dark/light), then brightness spread (low_contrast/noisy),
        falling back to 'standard'.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Calculate image statistics
        mean_brightness = np.mean(gray)
        std_brightness = np.std(gray)

        # Check for rotation if deskew is enabled
        if self.enable_deskew and DESKEW_AVAILABLE:
            try:
                angle = determine_skew(gray)
                # determine_skew() returns None when no skew angle can be
                # detected; guard before abs() to avoid a TypeError.
                if angle is not None and 1.0 < abs(angle) <= self.max_rotation:
                    return 'rotated'
            except Exception:
                pass

        # Determine based on brightness/contrast
        if mean_brightness < 80:
            return 'dark'
        elif mean_brightness > 180:
            return 'light'
        elif std_brightness < 40:
            return 'low_contrast'
        elif std_brightness > 80:
            return 'noisy'
        else:
            return 'standard'

    def _apply_pipeline(self, image: 'np.ndarray', pipeline_name: str) -> 'np.ndarray':
        """
        Applies a preprocessing pipeline to the image.

        Unknown pipeline names fall back to 'standard'. Individual step
        failures are logged and skipped so one bad step does not abort the
        whole pipeline.
        """
        pipeline = self.pipelines.get(pipeline_name, self.pipelines['standard'])
        result = image.copy()

        for step in pipeline:
            try:
                result = getattr(self, f'_step_{step}')(result)
            except AttributeError:
                logger.warning(f"Unknown preprocessing step: {step}")
            except Exception as e:
                logger.warning(f"Error in step {step}: {e}")

        return result

    # Pipeline steps

    def _step_grayscale(self, image: 'np.ndarray') -> 'np.ndarray':
        """Convert to grayscale (no-op if already single-channel)."""
        if len(image.shape) == 3:
            return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return image

    def _step_contrast(self, image: 'np.ndarray') -> 'np.ndarray':
        """Enhance contrast using histogram equalization."""
        if len(image.shape) == 3:
            image = self._step_grayscale(image)
        return cv2.equalizeHist(image)

    def _step_otsu(self, image: 'np.ndarray') -> 'np.ndarray':
        """Apply Otsu's thresholding (binarizes with an auto-chosen level)."""
        if len(image.shape) == 3:
            image = self._step_grayscale(image)
        _, binary = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        return binary

    def _step_adaptive_threshold(self, image: 'np.ndarray') -> 'np.ndarray':
        """Apply adaptive (Gaussian-weighted, per-neighborhood) thresholding."""
        if len(image.shape) == 3:
            image = self._step_grayscale(image)
        return cv2.adaptiveThreshold(
            image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY, 11, 2
        )

    def _step_clahe(self, image: 'np.ndarray') -> 'np.ndarray':
        """Apply CLAHE (Contrast Limited Adaptive Histogram Equalization)."""
        if len(image.shape) == 3:
            image = self._step_grayscale(image)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        return clahe.apply(image)

    def _step_denoise(self, image: 'np.ndarray') -> 'np.ndarray':
        """Remove noise while preserving edges (non-local means)."""
        if len(image.shape) == 3:
            return cv2.fastNlMeansDenoisingColored(image, None, 10, 10, 7, 21)
        return cv2.fastNlMeansDenoising(image, None, 10, 7, 21)

    def _step_sharpen(self, image: 'np.ndarray') -> 'np.ndarray':
        """Sharpen the image with a 3x3 unsharp-style convolution kernel."""
        kernel = np.array([[-1, -1, -1],
                           [-1, 9, -1],
                           [-1, -1, -1]])
        return cv2.filter2D(image, -1, kernel)

    def _step_brighten(self, image: 'np.ndarray') -> 'np.ndarray':
        """Increase image brightness (linear gain + offset)."""
        return cv2.convertScaleAbs(image, alpha=1.2, beta=30)

    def _step_darken(self, image: 'np.ndarray') -> 'np.ndarray':
        """Decrease image brightness (linear gain + offset)."""
        return cv2.convertScaleAbs(image, alpha=0.8, beta=-20)

    def _step_deskew(self, image: 'np.ndarray') -> 'np.ndarray':
        """
        Detect and correct image rotation.

        Returns the image unchanged when the deskew library is missing, no
        skew is detected, the angle is insignificant (< 0.5 deg), or the
        angle exceeds the configured maximum.
        """
        if not DESKEW_AVAILABLE:
            return image

        try:
            if len(image.shape) == 3:
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            else:
                gray = image

            angle = determine_skew(gray)

            # determine_skew() returns None when it cannot detect a skew
            # angle; without this guard abs(None) raises TypeError.
            if angle is None:
                return image

            if abs(angle) > self.max_rotation:
                logger.info(f"Rotation angle {angle} exceeds max {self.max_rotation}, skipping")
                return image

            if abs(angle) < 0.5:
                return image  # No significant rotation

            logger.info(f"Correcting rotation: {angle} degrees")

            if IMUTILS_AVAILABLE:
                # imutils is already imported at module level when available;
                # rotate_bound expands the canvas so corners are not clipped.
                return imutils.rotate_bound(image, -angle)
            else:
                # Manual rotation (canvas size unchanged; edges replicated)
                (h, w) = image.shape[:2]
                center = (w // 2, h // 2)
                M = cv2.getRotationMatrix2D(center, angle, 1.0)
                return cv2.warpAffine(image, M, (w, h),
                                      flags=cv2.INTER_CUBIC,
                                      borderMode=cv2.BORDER_REPLICATE)

        except Exception as e:
            logger.error(f"Error in deskew: {e}")
            return image
|
|
|
|
|
|
def preprocess_image(image_bytes: bytes) -> bytes:
    """
    Module-level helper that runs the default preprocessing on raw image
    bytes using a freshly constructed `ImagePreprocessor`.

    Args:
        image_bytes: Encoded image data as raw bytes.

    Returns:
        The preprocessed image, re-encoded as bytes.
    """
    return ImagePreprocessor().preprocess(image_bytes)
|
|
|
|
|
|
def preprocess_for_ocr(image_bytes: bytes) -> bytes:
    """
    Backward-compatible alias that delegates to `preprocess_image`.
    """
    result = preprocess_image(image_bytes)
    return result
|