feat: Implementar PWA, Analytics, Reportes PDF y mejoras OCR
FASE 1 - PWA y Frontend: - Crear templates/base.html, dashboard.html, analytics.html, executive.html - Crear static/css/main.css con diseño responsivo - Agregar static/js/app.js, pwa.js, camera.js, charts.js - Implementar manifest.json y service-worker.js para PWA - Soporte para captura de tickets desde cámara móvil FASE 2 - Analytics: - Crear módulo analytics/ con predictions.py, trends.py, comparisons.py - Implementar predicción básica con promedio móvil + tendencia lineal - Agregar endpoints /api/analytics/trends, predictions, comparisons - Integrar Chart.js para gráficas interactivas FASE 3 - Reportes PDF: - Crear módulo reports/ con pdf_generator.py - Implementar SalesReportPDF con generar_reporte_diario y ejecutivo - Agregar comando /reporte [diario|semanal|ejecutivo] - Agregar endpoints /api/reports/generate y /api/reports/download FASE 4 - Mejoras OCR: - Crear módulo ocr/ con processor.py, preprocessor.py, patterns.py - Implementar AmountDetector con patrones múltiples de montos - Agregar preprocesador adaptativo con pipelines para diferentes condiciones - Soporte para corrección de rotación (deskew) y threshold Otsu Dependencias agregadas: - reportlab, matplotlib (PDF) - scipy, pandas (analytics) - imutils, deskew, cachetools (OCR) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
305
sales-bot/ocr/preprocessor.py
Normal file
305
sales-bot/ocr/preprocessor.py
Normal file
@@ -0,0 +1,305 @@
|
||||
"""
Image preprocessing for Sales Bot OCR
Adaptive preprocessing pipelines for different image conditions
"""

import logging
import os
from typing import Tuple, Optional, List
from io import BytesIO

logger = logging.getLogger(__name__)

# Try to import image processing libraries
# Each optional dependency is probed once at import time; the corresponding
# *_AVAILABLE flag lets the rest of the module degrade gracefully when a
# library is missing instead of failing at import.
try:
    import cv2
    import numpy as np
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False
    logger.warning("OpenCV not available. Image preprocessing will be limited.")

# PIL is the fallback backend used by ImagePreprocessor.preprocess_basic().
try:
    from PIL import Image, ImageEnhance, ImageFilter
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False
    logger.warning("PIL not available. Image preprocessing will be limited.")

# deskew provides skew-angle estimation for rotation correction.
try:
    from deskew import determine_skew
    DESKEW_AVAILABLE = True
except ImportError:
    DESKEW_AVAILABLE = False
    logger.warning("deskew library not available. Rotation correction disabled.")

# imutils offers rotate_bound(), used to rotate without cropping corners.
try:
    import imutils
    IMUTILS_AVAILABLE = True
except ImportError:
    IMUTILS_AVAILABLE = False
    logger.warning("imutils not available. Some rotations may not work.")
||||
class ImagePreprocessor:
    """
    Preprocesses ticket images for better OCR accuracy.

    Supports multiple preprocessing pipelines for different image conditions.
    Pipeline selection is driven by simple brightness/contrast statistics plus
    an optional skew probe (see _determine_best_pipeline).
    """

    def __init__(self):
        # Runtime toggles come from the environment so deployments can tune
        # OCR behavior without code changes.
        self.enable_deskew = os.getenv('OCR_ENABLE_DESKEW', 'true').lower() == 'true'
        self.max_rotation = float(os.getenv('OCR_MAX_ROTATION_ANGLE', '15'))
        self.use_adaptive = os.getenv('OCR_USE_ADAPTIVE_PIPELINE', 'true').lower() == 'true'

        # Define preprocessing pipelines: each value is an ordered list of
        # step names that _apply_pipeline resolves to _step_<name> methods.
        self.pipelines = {
            'standard': ['grayscale', 'contrast', 'otsu'],
            'low_contrast': ['grayscale', 'clahe', 'adaptive_threshold'],
            'noisy': ['grayscale', 'denoise', 'sharpen', 'otsu'],
            'rotated': ['deskew', 'grayscale', 'contrast', 'otsu'],
            'dark': ['grayscale', 'brighten', 'contrast', 'otsu'],
            'light': ['grayscale', 'darken', 'contrast', 'otsu'],
        }

    def preprocess(self, image_bytes: bytes) -> bytes:
        """
        Preprocess image bytes for OCR.

        Args:
            image_bytes: Raw image bytes

        Returns:
            Preprocessed image bytes (PNG when a backend is available,
            otherwise the input unchanged).
        """
        if self.use_adaptive and CV2_AVAILABLE:
            return self.preprocess_adaptive(image_bytes)
        else:
            return self.preprocess_basic(image_bytes)

    def preprocess_basic(self, image_bytes: bytes) -> bytes:
        """
        Basic preprocessing using PIL only: grayscale, contrast boost,
        sharpen. Returns the input bytes unchanged on any failure or when
        PIL is not installed.
        """
        if not PIL_AVAILABLE:
            return image_bytes

        try:
            # Load image
            img = Image.open(BytesIO(image_bytes))

            # Convert to grayscale
            img = img.convert('L')

            # Enhance contrast; 1.5 is a mild boost that helps the OCR
            # engine's own binarization without blowing out highlights.
            enhancer = ImageEnhance.Contrast(img)
            img = enhancer.enhance(1.5)

            # Sharpen
            img = img.filter(ImageFilter.SHARPEN)

            # Save to bytes
            output = BytesIO()
            img.save(output, format='PNG')
            return output.getvalue()

        except Exception as e:
            logger.error(f"Error in basic preprocessing: {e}")
            return image_bytes

    def preprocess_adaptive(self, image_bytes: bytes) -> bytes:
        """
        Adaptive preprocessing: analyzes the image, selects the most
        suitable pipeline and applies it. Falls back to preprocess_basic
        when OpenCV is missing or anything goes wrong.
        """
        if not CV2_AVAILABLE:
            return self.preprocess_basic(image_bytes)

        try:
            # Decode image
            nparr = np.frombuffer(image_bytes, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

            if image is None:
                logger.error("Could not decode image")
                return image_bytes

            # Analyze image to determine best pipeline
            pipeline_name = self._determine_best_pipeline(image)
            logger.info(f"Using preprocessing pipeline: {pipeline_name}")

            # Apply pipeline
            processed = self._apply_pipeline(image, pipeline_name)

            # Encode back to bytes
            _, buffer = cv2.imencode('.png', processed)
            return buffer.tobytes()

        except Exception as e:
            logger.error(f"Error in adaptive preprocessing: {e}")
            return self.preprocess_basic(image_bytes)

    def _determine_best_pipeline(self, image: 'np.ndarray') -> str:
        """
        Analyzes image to determine the best preprocessing pipeline.

        Order of heuristics: skew (if enabled), then mean brightness for
        dark/light images, then brightness stddev for contrast/noise.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Calculate image statistics
        mean_brightness = np.mean(gray)
        std_brightness = np.std(gray)

        # Check for rotation if deskew is enabled
        if self.enable_deskew and DESKEW_AVAILABLE:
            try:
                angle = determine_skew(gray)
                # determine_skew returns None when it cannot estimate a
                # skew angle — check explicitly instead of letting the
                # TypeError fall into the broad except below.
                if angle is not None and 1.0 < abs(angle) <= self.max_rotation:
                    return 'rotated'
            except Exception:
                pass

        # Determine based on brightness/contrast (thresholds are empirical)
        if mean_brightness < 80:
            return 'dark'
        elif mean_brightness > 180:
            return 'light'
        elif std_brightness < 40:
            return 'low_contrast'
        elif std_brightness > 80:
            return 'noisy'
        else:
            return 'standard'

    def _apply_pipeline(self, image: 'np.ndarray', pipeline_name: str) -> 'np.ndarray':
        """
        Applies a preprocessing pipeline to the image.

        Unknown step names and per-step failures are logged and skipped so
        one bad step does not abort the whole pipeline.
        """
        pipeline = self.pipelines.get(pipeline_name, self.pipelines['standard'])
        result = image.copy()

        for step in pipeline:
            try:
                result = getattr(self, f'_step_{step}')(result)
            except AttributeError:
                logger.warning(f"Unknown preprocessing step: {step}")
            except Exception as e:
                logger.warning(f"Error in step {step}: {e}")

        return result

    # Pipeline steps

    def _step_grayscale(self, image: 'np.ndarray') -> 'np.ndarray':
        """Convert to grayscale (no-op if already single channel)."""
        if len(image.shape) == 3:
            return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return image

    def _step_contrast(self, image: 'np.ndarray') -> 'np.ndarray':
        """Enhance contrast using histogram equalization."""
        if len(image.shape) == 3:
            image = self._step_grayscale(image)
        return cv2.equalizeHist(image)

    def _step_otsu(self, image: 'np.ndarray') -> 'np.ndarray':
        """Apply Otsu's thresholding (threshold chosen automatically)."""
        if len(image.shape) == 3:
            image = self._step_grayscale(image)
        _, binary = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        return binary

    def _step_adaptive_threshold(self, image: 'np.ndarray') -> 'np.ndarray':
        """Apply adaptive thresholding (good for uneven lighting)."""
        if len(image.shape) == 3:
            image = self._step_grayscale(image)
        return cv2.adaptiveThreshold(
            image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY, 11, 2
        )

    def _step_clahe(self, image: 'np.ndarray') -> 'np.ndarray':
        """Apply CLAHE (Contrast Limited Adaptive Histogram Equalization)."""
        if len(image.shape) == 3:
            image = self._step_grayscale(image)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        return clahe.apply(image)

    def _step_denoise(self, image: 'np.ndarray') -> 'np.ndarray':
        """Remove noise while preserving edges (non-local means)."""
        if len(image.shape) == 3:
            return cv2.fastNlMeansDenoisingColored(image, None, 10, 10, 7, 21)
        return cv2.fastNlMeansDenoising(image, None, 10, 7, 21)

    def _step_sharpen(self, image: 'np.ndarray') -> 'np.ndarray':
        """Sharpen the image with a standard 3x3 sharpening kernel."""
        kernel = np.array([[-1, -1, -1],
                           [-1, 9, -1],
                           [-1, -1, -1]])
        return cv2.filter2D(image, -1, kernel)

    def _step_brighten(self, image: 'np.ndarray') -> 'np.ndarray':
        """Increase image brightness."""
        return cv2.convertScaleAbs(image, alpha=1.2, beta=30)

    def _step_darken(self, image: 'np.ndarray') -> 'np.ndarray':
        """Decrease image brightness."""
        return cv2.convertScaleAbs(image, alpha=0.8, beta=-20)

    def _step_deskew(self, image: 'np.ndarray') -> 'np.ndarray':
        """Detect and correct image rotation. Returns the input unchanged
        when the library is missing, no skew is detected, the angle is
        insignificant (< 0.5 deg) or exceeds max_rotation."""
        if not DESKEW_AVAILABLE:
            return image

        try:
            if len(image.shape) == 3:
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            else:
                gray = image

            angle = determine_skew(gray)

            # determine_skew may return None when no dominant skew angle
            # can be found — handle it explicitly rather than relying on
            # abs(None) raising into the except clause below.
            if angle is None:
                return image

            if abs(angle) > self.max_rotation:
                logger.info(f"Rotation angle {angle} exceeds max {self.max_rotation}, skipping")
                return image

            if abs(angle) < 0.5:
                return image  # No significant rotation

            logger.info(f"Correcting rotation: {angle} degrees")

            if IMUTILS_AVAILABLE:
                # imutils is already imported at module level when this
                # flag is true; rotate_bound expands the canvas so no
                # corner content is cropped.
                return imutils.rotate_bound(image, -angle)
            else:
                # Manual rotation (keeps the original frame size).
                (h, w) = image.shape[:2]
                center = (w // 2, h // 2)
                M = cv2.getRotationMatrix2D(center, angle, 1.0)
                return cv2.warpAffine(image, M, (w, h),
                                      flags=cv2.INTER_CUBIC,
                                      borderMode=cv2.BORDER_REPLICATE)

        except Exception as e:
            logger.error(f"Error in deskew: {e}")
            return image
||||
|
||||
def preprocess_image(image_bytes: bytes) -> bytes:
    """
    Convenience function to preprocess image bytes.

    Args:
        image_bytes: Raw image bytes

    Returns:
        Preprocessed image bytes
    """
    # A throwaway preprocessor is cheap to build: __init__ only reads env vars.
    return ImagePreprocessor().preprocess(image_bytes)
|
||||
def preprocess_for_ocr(image_bytes: bytes) -> bytes:
    """Alias for preprocess_image, kept for API compatibility."""
    return preprocess_image(image_bytes)
||||
Reference in New Issue
Block a user