Files
sales-bot-stacks/sales-bot/ocr_processor.py
consultoria-as 5d9cbd4812 Commit inicial: Sales Bot - Sistema de Automatización de Ventas
- Stack completo con Mattermost, NocoDB y Sales Bot
- Procesamiento OCR de tickets con Tesseract
- Sistema de comisiones por tubos de tinte
- Comandos slash /metas y /ranking
- Documentación completa del proyecto

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-18 02:41:53 +00:00

525 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import re
from PIL import Image, ImageEnhance, ImageFilter, ImageOps
import pytesseract
import io
logger = logging.getLogger(__name__)
class OCRProcessor:
"""
OCR ultra-robusto para tickets tabulares
Maneja múltiples errores comunes del OCR
"""
def __init__(self):
"""Inicializa el procesador OCR"""
pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract'
self.configs = [
'--psm 6 -l eng',
'--psm 4 -l eng',
'--psm 3 -l eng',
]
def procesar_ticket(self, imagen_data):
"""Procesa una imagen de ticket y extrae información completa"""
try:
imagen_pil = Image.open(io.BytesIO(imagen_data))
mejor_resultado = None
max_productos = 0
for config in self.configs:
try:
imagen_procesada = self._preprocesar_imagen_para_tabla(imagen_pil.copy())
texto = pytesseract.image_to_string(imagen_procesada, config=config)
logger.info(f"Texto extraído con config '{config}':\n{texto}")
productos = self._extraer_productos_tabla(texto)
if len(productos) > max_productos:
max_productos = len(productos)
mejor_resultado = {
'texto_completo': texto,
'config_usada': config,
'productos': productos
}
except Exception as e:
logger.warning(f"Error con config '{config}': {str(e)}")
continue
if mejor_resultado is None:
imagen_procesada = self._preprocesar_imagen_para_tabla(imagen_pil)
texto = pytesseract.image_to_string(imagen_procesada, config=self.configs[0])
mejor_resultado = {
'texto_completo': texto,
'config_usada': self.configs[0],
'productos': []
}
texto = mejor_resultado['texto_completo']
mejor_resultado.update({
'monto_detectado': self._extraer_monto(texto),
'fecha_detectada': self._extraer_fecha(texto),
'subtotal': self._extraer_subtotal(texto),
'iva': self._extraer_iva(texto),
'tienda': self._extraer_tienda(texto),
'folio': self._extraer_folio(texto)
})
logger.info(f"Procesamiento completado: {len(mejor_resultado['productos'])} productos detectados")
return mejor_resultado
except Exception as e:
logger.error(f"Error procesando OCR: {str(e)}", exc_info=True)
return None
def _preprocesar_imagen_para_tabla(self, imagen):
"""Preprocesamiento optimizado para tickets tabulares"""
try:
if imagen.mode != 'L':
imagen = imagen.convert('L')
width, height = imagen.size
if width < 1500 or height < 1500:
scale = max(1500 / width, 1500 / height)
new_size = (int(width * scale), int(height * scale))
imagen = imagen.resize(new_size, Image.Resampling.LANCZOS)
enhancer = ImageEnhance.Contrast(imagen)
imagen = enhancer.enhance(3.0)
enhancer = ImageEnhance.Sharpness(imagen)
imagen = enhancer.enhance(2.5)
imagen = ImageOps.autocontrast(imagen, cutoff=1)
threshold = 140
imagen = imagen.point(lambda p: 255 if p > threshold else 0, mode='1')
imagen = imagen.convert('L')
return imagen
except Exception as e:
logger.error(f"Error en preprocesamiento: {str(e)}")
if imagen.mode != 'L':
imagen = imagen.convert('L')
return imagen
def _normalizar_precio(self, precio_str):
"""
Normaliza precios: "$50 00" → 50.00, "1,210 00" → 1210.00
"""
try:
precio_str = precio_str.replace('$', '').strip()
precio_str = precio_str.replace(',', '')
if ' ' in precio_str:
partes = precio_str.split()
if len(partes) == 2:
precio_str = partes[0] + '.' + partes[1]
return float(precio_str)
except:
return 0.0
def _extraer_productos_tabla(self, texto):
"""
Extrae productos con máxima robustez para errores de OCR
"""
productos = []
try:
lineas = texto.split('\n')
# Patrones ultra-robustos
patrones = [
# Patrón 0: NUEVO - Cuando los precios tienen punto decimal correcto
# "TINTE CROMATIQUE 6.7 4 $50.00 $200.00"
r'^(.+?)\s+(\d{1,3})\s+\$?\s*([\d,]+\.\d{2})\s+\$?\s*([\d,]+\.\d{2})\s*$',
# Patrón 1: Nombre Cantidad PU Importe (con espacios en precios)
# "TINTE CROMATIQUE 5.0 7 $50 00 $350 00"
r'^(.+?)\s+(\d{1,3})\s+\$?\s*([\d,]+\s\d{2})\s+\$?\s*([\d,]+\s\d{2})\s*$',
# Patrón 2: Nombre+Número Cantidad PU Importe
# "TINTE CROMATIQUE 67 4 $50 00 $200.00" (67 pegado)
r'^(.+?(?:\d{1,2}[\.\s]?\d{0,2}))\s+(\d{1,3})\s+\$?\s*([\d,]+[\s\.]?\d{0,2})\s+\$?\s*([\d,]+[\s\.]?\d{0,2})\s*$',
# Patrón 3: Errores OCR "IL |" o "IL l"
# "OXIDANTE 20 VOL IL | $120 00 $120 00"
r'^(.+?)\s+(?:IL|Il|il|lL)\s+(?:\||I|l|\d)\s+\$?\s*([\d,]+[\s\.]?\d{0,2})\s+\$?\s*([\d,]+[\s\.]?\d{0,2})\s*$',
]
palabras_ignorar = [
'producto', 'cantidad', 'p.u', 'importe', 'precio',
'total', 'subtotal', 'iva', 'fecha', 'folio', 'ticket',
'gracias', 'cambio', 'efectivo', 'tarjeta', 'rfc',
'direccion', 'tel', 'hora', 'vendedor', 'distribuidor'
]
for idx, linea in enumerate(lineas):
linea_original = linea
linea = linea.strip()
if len(linea) < 5:
continue
linea_lower = linea.lower()
if any(palabra in linea_lower for palabra in palabras_ignorar):
continue
if not any(char.isdigit() for char in linea):
continue
# Intentar extraer con cada patrón
for patron_idx, patron in enumerate(patrones):
match = re.search(patron, linea, re.IGNORECASE)
if match:
try:
grupos = match.groups()
producto = self._parsear_producto_robusto(grupos, patron_idx, linea_original)
if producto and self._validar_producto(producto):
# Evitar duplicados
if not any(p['linea_original'] == producto['linea_original'] for p in productos):
productos.append(producto)
logger.info(
f"✓ Producto {len(productos)} (patrón {patron_idx}): "
f"{producto['cantidad']}x {producto['marca']} {producto['producto']} "
f"@ ${producto['precio_unitario']} = ${producto['importe']}"
)
break
except Exception as e:
logger.debug(f"Error en línea '{linea}': {str(e)}")
continue
logger.info(f"Total productos detectados: {len(productos)}")
return productos
except Exception as e:
logger.error(f"Error extrayendo productos: {str(e)}", exc_info=True)
return []
def _parsear_producto_robusto(self, grupos, patron_idx, linea_original):
"""Parsea productos con máxima flexibilidad"""
try:
# Patrones 0, 1 y 2: formato normal con 4 campos
if patron_idx in [0, 1, 2] and len(grupos) >= 4:
nombre_completo = grupos[0].strip()
cantidad = int(grupos[1])
precio_unitario = self._normalizar_precio(grupos[2])
importe = self._normalizar_precio(grupos[3])
# Validar matemática (tolerancia amplia)
diferencia = abs((cantidad * precio_unitario) - importe)
if diferencia > 5.0:
logger.debug(f"Descartado por matemática: {cantidad} × ${precio_unitario} ≠ ${importe} (dif: ${diferencia})")
return None
# Patrón 3: errores OCR "IL |"
elif patron_idx == 3 and len(grupos) >= 3:
nombre_completo = grupos[0].strip()
cantidad = 1
precio_unitario = self._normalizar_precio(grupos[1])
importe = self._normalizar_precio(grupos[2])
# Para cantidad 1, los precios deben ser iguales
if abs(precio_unitario - importe) > 2.0:
return None
else:
return None
# Validaciones básicas
if not nombre_completo or len(nombre_completo) < 3:
return None
if cantidad <= 0 or cantidad > 999:
return None
if precio_unitario <= 0 or importe <= 0:
return None
if precio_unitario > 100000 or importe > 1000000:
return None
# Filtrar nombres que son solo números
if re.match(r'^[\d\s\$\.\,\-]+$', nombre_completo):
return None
# Separar marca y producto
marca, producto = self._separar_marca_producto(nombre_completo)
return {
'producto': producto,
'marca': marca,
'cantidad': cantidad,
'precio_unitario': round(precio_unitario, 2),
'importe': round(importe, 2),
'linea_original': linea_original.strip()
}
except Exception as e:
logger.debug(f"Error parseando: {str(e)}")
return None
def _validar_producto(self, producto):
"""Validación de producto"""
try:
if not all(key in producto for key in ['producto', 'marca', 'cantidad', 'importe']):
return False
if producto['cantidad'] <= 0 or producto['cantidad'] > 999:
return False
if producto['importe'] <= 0 or producto['importe'] > 1000000:
return False
if producto['precio_unitario'] <= 0 or producto['precio_unitario'] > 100000:
return False
nombre_limpio = producto['producto'].strip()
if len(nombre_limpio) < 2:
return False
if not any(c.isalpha() for c in nombre_limpio):
return False
return True
except Exception as e:
logger.debug(f"Error validando: {str(e)}")
return False
def _separar_marca_producto(self, nombre_completo):
"""Separa marca de producto"""
try:
nombre_completo = nombre_completo.strip()
# Marcas de productos de belleza
marcas_belleza = [
'tinte cromatique', 'cromatique', 'oxidante', 'decolorante',
'trat. capilar', 'trat capilar', 'tratamiento', 'prat. capilar',
'shampoo', 'acondicionador', 'mascarilla', 'ampolleta',
'serum', 'keratina', 'botox', 'alisado'
]
nombre_lower = nombre_completo.lower().strip()
# Buscar marcas conocidas
for marca in marcas_belleza:
if marca in nombre_lower:
inicio = nombre_lower.index(marca)
fin = inicio + len(marca)
marca_encontrada = nombre_completo[inicio:fin].strip()
producto = (nombre_completo[:inicio] + nombre_completo[fin:]).strip()
# Si el producto es solo un código numérico, usar nombre completo como producto
if not producto or len(producto) < 2:
return 'Genérico', nombre_completo
# Si el producto es solo números/punto (ej: "6.7"), usar nombre completo como producto
if re.match(r'^[\d\.]+$', producto):
return 'Genérico', nombre_completo
return marca_encontrada, producto
# Buscar patrones de código/número al final
patron_codigo = r'(.*?)\s+([\d\.]+)\s*$'
match = re.search(patron_codigo, nombre_completo)
if match:
# Para productos con código al final, usar todo como producto
# Ej: "TINTE CROMATIQUE 6.7" → marca: "Producto", producto: "TINTE CROMATIQUE 6.7"
return 'Producto', nombre_completo
# Heurística: primeras palabras como marca
palabras = nombre_completo.split()
if len(palabras) >= 3:
marca = ' '.join(palabras[:-1])
producto = palabras[-1]
# Si la última palabra es solo números, usar nombre completo
if re.match(r'^[\d\.]+$', producto):
return 'Producto', nombre_completo
return marca, producto
elif len(palabras) == 2:
return palabras[0], palabras[1]
else:
return 'Producto', nombre_completo
except Exception as e:
logger.error(f"Error separando marca/producto: {str(e)}")
return 'Producto', nombre_completo
def _extraer_tienda(self, texto):
"""Extrae nombre de tienda"""
try:
lineas = texto.split('\n')[:3]
for linea in lineas:
if re.search(r'S\.A\.|S\.A\. DE C\.V\.|DISTRIBUIDORA', linea, re.IGNORECASE):
tienda = linea.strip()
logger.info(f"Tienda: {tienda}")
return tienda
return None
except Exception as e:
logger.error(f"Error extrayendo tienda: {str(e)}")
return None
def _extraer_folio(self, texto):
"""Extrae folio"""
try:
patrones = [
r'ticket[:\s]*(\w+)',
r'folio[:\s]*(\w+)',
r'no\.\s*(\w+)',
]
for patron in patrones:
match = re.search(patron, texto, re.IGNORECASE)
if match:
folio = match.group(1)
logger.info(f"Folio: {folio}")
return folio
return None
except Exception as e:
logger.error(f"Error extrayendo folio: {str(e)}")
return None
def _extraer_monto(self, texto):
"""Extrae monto total"""
try:
patron_total = r'total\s*\$?\s*([\d,]+[\s\.]?\d{0,2})'
matches = re.finditer(patron_total, texto, re.IGNORECASE)
montos = []
for match in matches:
try:
monto = self._normalizar_precio(match.group(1))
if 1 <= monto <= 1000000:
montos.append(monto)
except:
continue
if montos:
monto_final = montos[-1]
logger.info(f"Total: ${monto_final:.2f}")
return monto_final
return None
except Exception as e:
logger.error(f"Error extrayendo monto: {str(e)}")
return None
def _extraer_subtotal(self, texto):
"""Extrae subtotal"""
try:
patron = r'subtotal\s*\$?\s*([\d,]+[\s\.]?\d{0,2})'
match = re.search(patron, texto, re.IGNORECASE)
if match:
subtotal = self._normalizar_precio(match.group(1))
logger.info(f"Subtotal: ${subtotal:.2f}")
return subtotal
return None
except Exception as e:
return None
def _extraer_iva(self, texto):
"""Extrae IVA"""
try:
patron = r'iva\s*\$?\s*([\d,]+[\s\.]?\d{0,2})'
match = re.search(patron, texto, re.IGNORECASE)
if match:
iva = self._normalizar_precio(match.group(1))
logger.info(f"IVA: ${iva:.2f}")
return iva
return None
except Exception as e:
return None
def _extraer_fecha(self, texto):
"""Extrae fecha"""
try:
patrones = [
r'fecha[:\s]*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})',
r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})',
]
for patron in patrones:
match = re.search(patron, texto, re.IGNORECASE)
if match:
fecha = match.group(1)
logger.info(f"Fecha: {fecha}")
return fecha
return None
except Exception as e:
return None
def validar_monto_con_ocr(self, monto_usuario, monto_ocr, tolerancia=0.1):
"""Valida monto vs OCR"""
if monto_ocr is None:
return True, "No se pudo detectar monto en la imagen"
diferencia = abs(monto_usuario - monto_ocr)
porcentaje_diferencia = (diferencia / monto_ocr) * 100 if monto_ocr > 0 else 0
if porcentaje_diferencia <= (tolerancia * 100):
return True, f"✓ Monto verificado (OCR: ${monto_ocr:,.2f})"
else:
return False, f"⚠️ Discrepancia: Usuario ${monto_usuario:,.2f} vs Ticket ${monto_ocr:,.2f}"
def generar_reporte_deteccion(self, resultado_ocr):
"""Genera reporte legible"""
if not resultado_ocr:
return "No se pudo procesar el ticket"
reporte = []
reporte.append("=" * 50)
reporte.append("REPORTE DE DETECCIÓN OCR")
reporte.append("=" * 50)
if resultado_ocr.get('tienda'):
reporte.append(f"\n🏪 Tienda: {resultado_ocr['tienda']}")
if resultado_ocr.get('folio'):
reporte.append(f"📋 Folio: {resultado_ocr['folio']}")
if resultado_ocr.get('fecha_detectada'):
reporte.append(f"📅 Fecha: {resultado_ocr['fecha_detectada']}")
productos = resultado_ocr.get('productos', [])
if productos:
reporte.append(f"\n📦 PRODUCTOS DETECTADOS ({len(productos)}):")
reporte.append("-" * 50)
for i, prod in enumerate(productos, 1):
reporte.append(
f"{i}. {prod['cantidad']}x {prod['marca']} {prod['producto']}"
)
reporte.append(f" Precio unitario: ${prod['precio_unitario']:.2f}")
reporte.append(f" Importe: ${prod['importe']:.2f}")
reporte.append("")
if resultado_ocr.get('subtotal'):
reporte.append(f"Subtotal: ${resultado_ocr['subtotal']:.2f}")
if resultado_ocr.get('iva'):
reporte.append(f"IVA: ${resultado_ocr['iva']:.2f}")
if resultado_ocr.get('monto_detectado'):
reporte.append(f"\n💰 TOTAL: ${resultado_ocr['monto_detectado']:.2f}")
reporte.append("=" * 50)
return "\n".join(reporte)