# /home/Autopartes/pos/services/ai_chat.py
"""AI chat service for auto-parts lookup assistance.

Sends chat-completion requests to OpenAI-compatible backends (a private QWEN
endpoint, a Hermes Agent server and OpenRouter) and turns the model replies
into plain dicts for the caller.
"""
import hashlib as _hashlib
import json
import re as _re
import time as _time_chat

import requests

from config import OPENROUTER_API_KEY, HERMES_API_URL, HERMES_API_KEY
from config import QWEN_API_URL, QWEN_API_KEY, QWEN_MODEL

OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"

# A backend is enabled only when both its URL and its key are configured.
HERMES_ENABLED = bool(HERMES_API_KEY and HERMES_API_URL)
HERMES_CHAT_URL = (HERMES_API_URL.rstrip('/') + '/chat/completions') if HERMES_API_URL else None

QWEN_ENABLED = bool(QWEN_API_KEY and QWEN_API_URL)
QWEN_CHAT_URL = (QWEN_API_URL.rstrip('/') + '/chat/completions') if QWEN_API_URL else None

# WARNING: FREE MODELS ONLY — do not switch to paid models.
# The model id MUST end in ":free" to guarantee $0 cost.
MODEL = "qwen/qwen3.6-plus:free"

# Fallback chain: if the primary model is rate limited (429) or returns 404
# (deprecated), the next ones are tried. All of them are ":free". Providers
# are mixed on purpose because rate limits apply per provider.
# List updated 2026-04-09 after qwen3.6-plus was deprecated.
FALLBACK_MODELS = [
    "openai/gpt-oss-120b:free",                # OpenInference — broad coverage
    "google/gemma-4-31b-it:free",              # Google — new, 262K ctx
    "qwen/qwen3-next-80b-a3b-instruct:free",   # Alibaba — 262K ctx
    "z-ai/glm-4.5-air:free",                   # Z.AI
    "google/gemma-3-27b-it:free",              # Google — vision backup
    "meta-llama/llama-3.3-70b-instruct:free",  # Meta — last-resort fallback
]

# Hermes Agent model (OpenAI-compatible API server)
HERMES_MODEL = "hermes-agent"


def _validate_model(model_id):
    """Ensure only free models are used.

    Validation is skipped for the Hermes Agent model and the configured QWEN
    model (self-hosted / private API).

    Raises:
        ValueError: if ``model_id`` is neither of those and does not end in
            ``":free"``.
    """
    if model_id == HERMES_MODEL:
        return
    if model_id == QWEN_MODEL:
        return
    if not model_id.endswith(':free'):
        raise ValueError(f"BLOQUEADO: Solo se permiten modelos gratuitos (:free). Modelo '{model_id}' no es gratuito.")


def _post_chat_completion(url, api_key, model_id, messages, max_tokens=800,
                          temperature=0.3, timeout=25):
    """POST a chat-completion request to an OpenAI-compatible endpoint.

    Args:
        url: Full chat-completions URL of the backend.
        api_key: Bearer token sent in the Authorization header.
        model_id: Model identifier placed in the request body.
        messages: List of chat messages (role/content dicts).
        max_tokens: Completion token limit sent to the backend.
        temperature: Sampling temperature sent to the backend.
        timeout: Timeout in seconds passed to ``requests.post``.

    Returns:
        ``{"content": str, "finish_reason": str, "model": model_id}`` on
        success, or ``None`` on any failure (HTTP status >= 400, empty
        content, or any exception). Failures are logged with ``print``.
    """
    try:
        resp = requests.post(
            url,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json={
                "model": model_id,
                "messages": messages,
                "max_tokens": max_tokens,
                "temperature": temperature,
            },
            timeout=timeout,
        )
        if resp.status_code == 429:
            print(f"[AI] Rate limited on {model_id} ({url})")
            return None
        if resp.status_code >= 400:
            print(f"[AI] HTTP {resp.status_code} on {model_id} ({url}): {resp.text[:200]}")
            return None
        data = resp.json()
        # A missing/empty/null "choices" raises below and is reported by the
        # generic handler at the bottom.
        choice = data.get("choices", [{}])[0]
        content = choice.get("message", {}).get("content") or ""
        content = content.strip()
        finish = choice.get("finish_reason", "")
        if not content:
            print(f"[AI] Empty response from {model_id} (finish={finish})")
            return None
        return {"content": content, "finish_reason": finish, "model": model_id}
    except Exception as e:
        # Best-effort boundary: callers treat None as "try the next backend".
        print(f"[AI] Error with {model_id} ({url}): {e}")
        return None


def _parse_json_content(content):
    """Parse a model reply that is expected to be a single JSON object.

    Accepts bare JSON, or JSON wrapped in a Markdown code fence
    (```` ``` ```` or ```` ```json ````). The opening fence line is always
    dropped; the last line is dropped only when it really is a closing fence,
    so a reply whose closing fence is missing does not lose its last line.

    Returns:
        The decoded dict.

    Raises:
        ValueError: if the text is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError`` subclass) or decodes to something other than
            a JSON object.
    """
    text = content.strip()
    if text.startswith("```"):
        body_lines = text.split("\n")[1:]
        if body_lines and body_lines[-1].strip().startswith("```"):
            body_lines = body_lines[:-1]
        text = "\n".join(body_lines)
    parsed = json.loads(text)
    if not isinstance(parsed, dict):
        raise ValueError("model reply is valid JSON but not an object")
    return parsed


# NOTE(review): the whitespace inside the three prompt strings below is kept
# exactly as found in this copy of the file (single spaces, no line breaks).
# The trailing backslashes are line continuations and add nothing to the
# runtime value. If the deployed prompts use real line breaks, restore them
# from that copy.
SYSTEM_PROMPT_SHORT = """Eres un asistente de refaccionaria automotriz mexicana. \
Ayuda a encontrar autopartes. \
Responde SIEMPRE en formato JSON: \
{"message":"...","search_query":"...","vehicle":{"brand":"...","model":"...","year":...}} \
search_query va EN INGLES cuando el usuario pide una parte. \
Traducciones: Balatas=Brake Pad, Disco de freno=Brake Disc, Amortiguador=Shock Absorber, \
Filtro de aceite=Oil Filter, Filtro de aire=Air Filter, Bujias=Spark Plug, Banda=V-Belt, \
Bomba de agua=Water Pump, Alternador=Alternator, Radiador=Radiator, \
Sensor de oxigeno=Oxygen Sensor, Terminal de direccion=Tie Rod End, \
Bomba de gasolina=Fuel Pump, Clutch=Clutch Kit, Mofle=Exhaust, Inyector=Injector. \
No preguntes mas si ya puedes buscar. \
Si el usuario describe un sintoma, diagnostica y sugiere partes. \
Cuando pida cotizacion o multiples partes, search_query DEBE usar | para separar cada parte: \
"Brake Pad|Air Filter|Oil Filter|Spark Plug". """

SYSTEM_PROMPT = """Eres un asistente de refaccionaria automotriz mexicana. \
Tu trabajo es ayudar a encontrar autopartes. \
IMPORTANTE: Responde SIEMPRE en formato JSON valido con esta estructura: \
{ \
"message": "Tu respuesta al usuario en español", \
"search_query": "termino de busqueda EN INGLES para el catalogo", \
"vehicle": {"brand": "TOYOTA", "model": "Corolla", "year": 2020} \
} \
Reglas OBLIGATORIAS: \
1. "search_query" SIEMPRE debe tener un valor cuando el usuario menciona una parte. NUNCA dejes null si el usuario pide algo. \
2. "search_query" debe estar EN INGLES porque el catalogo TecDoc tiene nombres en ingles. \
Traducciones comunes: \
- Balatas/Pastillas de freno = "Brake Pad" \
- Discos de freno = "Brake Disc" \
- Amortiguador = "Shock Absorber" \
- Filtro de aceite = "Oil Filter" \
- Filtro de aire = "Air Filter" \
- Bujias = "Spark Plug" \
- Banda serpentina = "V-Belt" o "Serpentine Belt" \
- Bomba de agua = "Water Pump" \
- Alternador = "Alternator" \
- Radiador = "Radiator" \
- Sensor de oxigeno = "Oxygen Sensor" \
- Terminal de direccion = "Tie Rod End" \
- Bomba de gasolina = "Fuel Pump" \
- Clutch/Embrague = "Clutch Kit" \
- Mofle/Escape = "Exhaust" \
- Inyector = "Injector" \
3. "vehicle" extrae marca, modelo y ano. La marca en MAYUSCULAS. \
4. Nombres mexicanos: Tsuru = TSURU, Aveo = AVEO, Jetta = JETTA, Pointer = POINTER, Chevy = CORSA, Vocho = BEETLE. \
5. No preguntes mas info si ya puedes buscar. Si el usuario dice "balatas para Tsuru 2015", busca directo. \
6. "message" es breve y directo: "Buscando balatas para Nissan Tsuru 2015..." \
Cuando el usuario describe un SINTOMA del vehiculo (no una parte especifica), diagnostica el problema y sugiere las partes que podrian necesitar reemplazo. \
Ejemplos de sintomas: \
- "el carro vibra al frenar" → Discos de freno y/o balatas desgastadas. search_query: "Brake Disc" \
- "se calienta el motor" → Termostato, bomba de agua, radiador. search_query: "Thermostat" \
- "hace ruido al dar vuelta" → Juntas homocineticas. search_query: "CV Joint" \
- "no arranca" → Bateria, alternador, motor de arranque. search_query: "Starter Motor" \
- "gasta mucha gasolina" → Filtro de aire, bujias, inyectores. search_query: "Air Filter" \
- "huele a gasolina" → Inyectores, bomba de gasolina, mangueras. search_query: "Fuel Pump" \
- "se jala a un lado" → Terminales de direccion, rotulas, alineacion. search_query: "Tie Rod End" \
- "hace ruido al arrancar" → Banda serpentina, tensor, marcha. search_query: "Serpentine Belt" \
- "pierde aceite" → Junta de tapa de valvulas, empaques. search_query: "Gasket" \
- "el aire no enfria" → Compresor de AC, gas refrigerante. search_query: "A/C Compressor" \
Si detectas un sintoma, responde con: \
1. Diagnostico probable \
2. Lista de partes que podrian necesitar reemplazo (en orden de probabilidad) \
3. search_query con la parte mas probable \
Cuando el usuario pida una COTIZACION o diga "cotizame", "cuanto cuesta", "precio de": \
1. Identifica TODAS las partes necesarias para el trabajo completo \
2. Devuelve multiples search_queries separadas por | \
Ejemplo: "cotizame frenos completos para Corolla 2020" \
search_query: "Brake Pad|Brake Disc|Brake Fluid|Brake Hose" \
Ejemplo: "servicio completo para Tsuru 2015" \
search_query: "Oil Filter|Air Filter|Spark Plug|Coolant|Brake Fluid" \
Ejemplo: "kit de distribucion para Jetta 2018" \
search_query: "Timing Belt|Tensioner|Idler Pulley|Water Pump" \
Detecta el idioma del usuario y responde en el mismo idioma. \
Si escribe en ingles, responde en ingles. Si escribe en espanol, responde en espanol. \
El search_query SIEMPRE debe ser en ingles (el catalogo TecDoc esta en ingles). """


def get_inventory_context(tenant_conn, branch_id=None):
    """Build a summary string of the tenant's inventory for AI context.

    Runs three queries on ``tenant_conn``: the number of active inventory
    rows, the 15 most frequent non-empty brands, and the number of active
    rows whose stock (from ``inventory_stock_summary``, 0 when missing) is
    <= 3.

    Args:
        tenant_conn: DB connection exposing ``cursor()``; queries use ``%s``
            placeholders.
        branch_id: When truthy, restricts every query to that branch.

    Returns:
        A multi-line string such as::

            CONTEXTO DEL INVENTARIO:
            Este negocio tiene 1234 productos en inventario.
            Marcas disponibles: BOSCH (45), MONROE (32), ACDelco (28)
            Productos con stock bajo (<=3 unidades): 15
            IMPORTANTE: ...

        a short "no products yet" notice when there are no active rows, or
        ``""`` when any query fails (the failure is logged).
    """
    cur = tenant_conn.cursor()
    try:
        # The WHERE fragment is built only from constants; the branch id is
        # always passed as a bound parameter.
        where = "i.is_active = true"
        params = []
        if branch_id:
            where += " AND i.branch_id = %s"
            params.append(branch_id)

        # Total items
        cur.execute(f"SELECT COUNT(*) FROM inventory i WHERE {where}", params)
        total = cur.fetchone()[0] or 0
        if total == 0:
            return "CONTEXTO DEL INVENTARIO:\nEste negocio aun no tiene productos en inventario."

        # Top brands with counts
        cur.execute(f"""
            SELECT i.brand, COUNT(*) as cnt
            FROM inventory i
            WHERE {where} AND i.brand IS NOT NULL AND i.brand != ''
            GROUP BY i.brand
            ORDER BY cnt DESC
            LIMIT 15
        """, params)
        brands = cur.fetchall()
        brand_list = ", ".join(f"{row[0]} ({row[1]})" for row in brands if row[0])

        # Products with low stock (<=3)
        cur.execute(f"""
            SELECT COUNT(*)
            FROM inventory i
            WHERE {where}
              AND COALESCE((SELECT stock FROM inventory_stock_summary WHERE inventory_id = i.id), 0) <= 3
        """, params)
        low_stock = cur.fetchone()[0] or 0

        lines = [
            "CONTEXTO DEL INVENTARIO:",
            f"Este negocio tiene {total} productos en inventario.",
        ]
        if brand_list:
            lines.append(f"Marcas disponibles: {brand_list}")
        lines.append(f"Productos con stock bajo (<=3 unidades): {low_stock}")
        lines.append("IMPORTANTE: Cuando busques partes, SIEMPRE prioriza lo que el negocio tiene en inventario local.")
        return "\n".join(lines)
    except Exception as e:
        # The context is optional, so a failure must not break the chat —
        # but it is logged instead of disappearing silently.
        print(f"[AI] Could not build inventory context: {e}")
        return ""
    finally:
        cur.close()


VISION_MODEL = "google/gemma-3-27b-it:free"
HERMES_VISION_MODEL = "hermes-agent"

VISION_SYSTEM_PROMPT = """Eres un experto en identificación de autopartes. \
El usuario te envía una foto de una parte automotriz. \
Tu trabajo es: \
1. Identificar que parte es (nombre en español e inglés) \
2. Describir características visibles (material, desgaste, marca si es visible) \
3. Sugerir términos de búsqueda para encontrarla en un catálogo \
IMPORTANTE: Responde SIEMPRE en formato JSON válido con esta estructura: \
{ \
"message": "Descripción de la parte identificada en español", \
"search_query": "término de búsqueda EN INGLÉS para el catálogo", \
"vehicle": null \
} \
Ejemplos de partes comunes: \
- Pastillas/balatas de freno = "Brake Pad" \
- Disco de freno = "Brake Disc" \
- Filtro de aceite = "Oil Filter" \
- Bujía = "Spark Plug" \
- Amortiguador = "Shock Absorber" \
- Bomba de agua = "Water Pump" \
- Sensor de oxígeno = "Oxygen Sensor" """


def chat_with_image(user_message, image_base64, conversation_history=None, inventory_context=None):
    """Send a message with an image to a vision-capable AI model.

    Hermes is tried first (when enabled), then OpenRouter with
    ``VISION_MODEL``. The first backend that answers decides the result.

    Args:
        user_message: The user's chat message; a default "identify this
            part" instruction is used when it is empty.
        image_base64: Base64-encoded image (with or without data URL prefix).
        conversation_history: Previous messages in the conversation; only
            entries whose ``content`` is a plain string are forwarded.
        inventory_context: Optional inventory summary string appended to the
            system prompt.

    Returns:
        The JSON object decoded from the model reply, or a dict with
        ``message`` / ``search_query`` / ``vehicle`` keys when the reply is
        not a JSON object or no backend could answer.
    """
    _validate_model(VISION_MODEL)

    system_content = VISION_SYSTEM_PROMPT
    if inventory_context:
        system_content = VISION_SYSTEM_PROMPT + "\n\n" + inventory_context

    # Ensure proper data URL format
    if image_base64 and not image_base64.startswith('data:'):
        image_base64 = 'data:image/jpeg;base64,' + image_base64

    messages = [{"role": "system", "content": system_content}]
    if conversation_history:
        # Only add text-only history messages
        for h in conversation_history:
            if isinstance(h.get('content'), str):
                messages.append(h)

    # Build multimodal user message
    user_content = [
        {"type": "image_url", "image_url": {"url": image_base64}},
        {"type": "text", "text": user_message or "Identifica esta parte automotriz y sugiere términos de búsqueda."}
    ]
    messages.append({"role": "user", "content": user_content})

    # Try Hermes first for vision (if enabled), fallback to OpenRouter
    backends = []
    if HERMES_ENABLED:
        backends.append((HERMES_CHAT_URL, HERMES_API_KEY, HERMES_VISION_MODEL))
    if OPENROUTER_API_KEY:
        backends.append((OPENROUTER_URL, OPENROUTER_API_KEY, VISION_MODEL))

    last_error = None
    for url, key, model_id in backends:
        _validate_model(model_id)
        result = _post_chat_completion(url, key, model_id, messages,
                                       max_tokens=500, temperature=0.3, timeout=30)
        if result is None:
            last_error = "api_error"
            continue
        content = result["content"]
        try:
            return _parse_json_content(content)
        except ValueError:
            # The model answered, just not with a JSON object: pass the raw
            # text through as the message.
            return {"message": content, "search_query": None, "vehicle": None}

    if last_error == "api_error":
        return {"message": "El asistente esta ocupado. Intenta de nuevo en unos segundos.",
                "search_query": None, "vehicle": None}

    # Reached only when no backend is configured. The previous message
    # interpolated last_error here and showed the user "...: None".
    print("[AI] No vision backend configured")
    return {
        "message": "El asistente no está disponible en este momento. Intenta de nuevo en unos segundos.",
        "search_query": None,
        "vehicle": None,
    }


def classify_part(part_number):
    """Ask AI to identify a part by its OEM number.

    Hermes is tried first (when enabled), then OpenRouter with ``MODEL``.
    A backend whose reply is not a JSON object is skipped.

    Args:
        part_number: The part number, interpolated into the prompt.

    Returns:
        The JSON object decoded from the first usable reply (the prompt asks
        for ``name`` / ``brand`` / ``vehicle`` / ``category``), or a dict
        with those four keys set to ``None`` when no backend produced one.
    """
    _validate_model(MODEL)
    prompt = (
        f"Given auto part number '{part_number}', identify:\n"
        f"1) What part it is (name in Spanish)\n"
        f"2) Which brand makes it\n"
        f"3) What vehicle it fits\n"
        f"4) What category it belongs to (e.g. Frenos, Motor, Suspensión, Eléctrico, Filtros, Transmisión)\n"
        f"Respond ONLY in valid JSON: {{\"name\": \"...\", \"brand\": \"...\", \"vehicle\": \"...\", \"category\": \"...\"}}"
    )
    messages = [
        {"role": "system", "content": "Eres un experto en autopartes. Responde SOLO en JSON válido, sin texto adicional."},
        {"role": "user", "content": prompt}
    ]

    # Try Hermes first (if enabled), fallback to OpenRouter.
    # NOTE(review): the FALLBACK_MODELS comment says qwen3.6-plus (MODEL) was
    # deprecated, and this function does not walk FALLBACK_MODELS — confirm
    # whether the OpenRouter leg can still succeed.
    backends = []
    if HERMES_ENABLED:
        backends.append((HERMES_CHAT_URL, HERMES_API_KEY, HERMES_MODEL))
    if OPENROUTER_API_KEY:
        backends.append((OPENROUTER_URL, OPENROUTER_API_KEY, MODEL))

    for url, key, model_id in backends:
        _validate_model(model_id)
        result = _post_chat_completion(url, key, model_id, messages,
                                       max_tokens=300, temperature=0.2, timeout=15)
        if result is None:
            continue
        try:
            return _parse_json_content(result["content"])
        except ValueError:
            # Unusable reply: give the next backend a chance.
            continue

    return {"name": None, "brand": None, "vehicle": None, "category": None}


# ═══════════════════════════════════════════════════════════════════════════
# RESPONSE CACHE — reduces OpenRouter calls for repeated questions
# ═══════════════════════════════════════════════════════════════════════════
# Keyed by a normalized form of the user message. TTL 1 hour. Bypasses
# caching for messages containing VINs or specific part numbers (where the
# answer depends on the exact string).
# (The aliased hashlib / re / time imports used by the cache code live in the
# import group at the top of the file.)

_RESPONSE_CACHE = {}  # key → (expires_at, response_dict)
_CACHE_TTL_SECONDS = 3600  # 1 hour
_CACHE_MAX_SIZE = 1000
_CACHE_HITS = 0
_CACHE_MISSES = 0

# Stopwords that add noise but no meaning — stripped from cache keys.
_CACHE_STOPWORDS = {
    'necesito', 'necesitas', 'me', 'das', 'dame', 'tienes', 'tiene', 'hay',
    'quiero', 'quisiera', 'puedes', 'puede', 'favor', 'por', 'porfavor',
    'hola', 'buenos', 'dias', 'tardes', 'noches', 'holaa',
    'i', 'need', 'want', 'do', 'you', 'have', 'please',
}

# Patterns that disable caching — if the message contains any of these, we
# never cache the response because the answer is specific to that exact input.
# Rules designed to minimize false positives against normal Spanish queries # like "necesito balatas para corolla 2018". _CACHE_BYPASS_PATTERNS = [ # 17-char VIN (strict, no spaces, alphanumeric except I/O/Q) _re.compile(r'\b[A-HJ-NPR-Z0-9]{17}\b'), # Long numeric (12+ digits — too long to be a year/model code) _re.compile(r'\b\d{12,}\b'), # Mexican license plate: 3 letters + 3-4 digits _re.compile(r'\b[A-Z]{3}[-\s]?\d{3,4}\b'), # OEM with REQUIRED dash/slash separator(s), letters+digits on both sides, # and a total length that makes it unlikely to be a brand+year collision. # Example matches: "4G0-857-951-A", "0 986 4B7 013" (after normalizing). _re.compile(r'\b[A-Z0-9]{2,}[-/][A-Z0-9]{2,}([-/][A-Z0-9]+)+\b'), ] def _should_bypass_cache(message: str) -> bool: """True if the message has VIN / part number / plate — don't cache.""" if not message: return True upper = message.upper() for pat in _CACHE_BYPASS_PATTERNS: if pat.search(upper): return True return False def _normalize_for_cache(message: str) -> str: """Lowercase, strip punctuation, collapse whitespace, drop stopwords.""" if not message: return '' s = message.lower().strip() s = _re.sub(r'[¿?¡!.,;:()\[\]{}\'"]+', ' ', s) s = _re.sub(r'\s+', ' ', s).strip() tokens = [t for t in s.split() if t and t not in _CACHE_STOPWORDS] return ' '.join(tokens) def _cache_key(user_message: str, inventory_context: str | None) -> str | None: """Build a stable cache key for (message, inventory_context). Returns None if the message should bypass the cache. """ if _should_bypass_cache(user_message): return None normalized = _normalize_for_cache(user_message) if not normalized: return None # Hash the inventory context so same-tenant-same-question cache hits, # different-tenant-same-question does NOT (inventory context differs). 
ctx_hash = _hashlib.md5((inventory_context or '').encode()).hexdigest()[:12] return f"{normalized}::{ctx_hash}" def _cache_get(key: str): global _CACHE_HITS, _CACHE_MISSES if not key: _CACHE_MISSES += 1 return None entry = _RESPONSE_CACHE.get(key) if not entry: _CACHE_MISSES += 1 return None expires_at, data = entry if _time_chat.time() > expires_at: _RESPONSE_CACHE.pop(key, None) _CACHE_MISSES += 1 return None _CACHE_HITS += 1 return data def _cache_set(key: str, data: dict): if not key or not data: return _RESPONSE_CACHE[key] = (_time_chat.time() + _CACHE_TTL_SECONDS, data) # Bounded cache — evict oldest entries if we grow past the limit if len(_RESPONSE_CACHE) > _CACHE_MAX_SIZE: oldest_keys = sorted( _RESPONSE_CACHE.items(), key=lambda kv: kv[1][0] )[:200] for k, _v in oldest_keys: _RESPONSE_CACHE.pop(k, None) def chat_cache_stats() -> dict: """Diagnostic helper: hit rate and cache size.""" total = _CACHE_HITS + _CACHE_MISSES hit_rate = (_CACHE_HITS * 100 / total) if total else 0 return { 'entries': len(_RESPONSE_CACHE), 'hits': _CACHE_HITS, 'misses': _CACHE_MISSES, 'hit_rate_pct': round(hit_rate, 1), 'ttl_seconds': _CACHE_TTL_SECONDS, } def chat_cache_clear(): """Manual cache invalidation — e.g. after inventory bulk changes.""" _RESPONSE_CACHE.clear() def chat(user_message, conversation_history=None, inventory_context=None): """Send a message to the AI and get a response with search suggestions. Caches responses for repeated identical questions (subject to bypass rules — messages with VINs / part numbers / plates are never cached). Args: user_message: The user's chat message. conversation_history: Previous messages in the conversation. inventory_context: Optional inventory summary string to inject into the system prompt. 
""" # Cache lookup — only when there's no conversation history (stateless) cache_key = None if not conversation_history: cache_key = _cache_key(user_message, inventory_context) cached = _cache_get(cache_key) if cached is not None: print(f"[AI] Cache HIT for '{user_message[:40]}...'") return cached system_content = SYSTEM_PROMPT if inventory_context: system_content = SYSTEM_PROMPT + "\n\n" + inventory_context messages = [{"role": "system", "content": system_content}] if conversation_history: messages.extend(conversation_history) messages.append({"role": "user", "content": user_message}) last_error = None # Build backend list: QWEN first (fast, ~1s), then Hermes (specialized, ~30s), then OpenRouter backends = [] if QWEN_ENABLED: backends.append((QWEN_CHAT_URL, QWEN_API_KEY, QWEN_MODEL, 35, SYSTEM_PROMPT_SHORT, 4000)) if HERMES_ENABLED: backends.append((HERMES_CHAT_URL, HERMES_API_KEY, HERMES_MODEL, 45, SYSTEM_PROMPT, 800)) if OPENROUTER_API_KEY: for m in FALLBACK_MODELS: backends.append((OPENROUTER_URL, OPENROUTER_API_KEY, m, 25, SYSTEM_PROMPT, 800)) for url, key, model_id, timeout_sec, sys_prompt, max_tok in backends: _validate_model(model_id) # Use backend-specific system prompt and max_tokens sys_content = sys_prompt if inventory_context: sys_content = sys_prompt + "\n\n" + inventory_context msgs = [{"role": "system", "content": sys_content}] if conversation_history: msgs.extend(conversation_history) msgs.append({"role": "user", "content": user_message}) result = _post_chat_completion(url, key, model_id, msgs, max_tokens=max_tok, temperature=0.3, timeout=timeout_sec) if result is None: if url == QWEN_CHAT_URL: print(f"[AI] QWEN failed, trying Hermes fallback...") last_error = "qwen_failed" elif url == HERMES_CHAT_URL: print(f"[AI] Hermes failed, trying OpenRouter fallback...") last_error = "hermes_timeout" else: print(f"[AI] Rate limited on {model_id}, trying next model...") last_error = "rate_limit" continue content = result["content"] finish = 
result["finish_reason"] print(f"[AI] Response from {model_id} (finish={finish}, {len(content)} chars)") # Try to parse JSON response try: stripped = content.strip() if stripped.startswith("```"): lines = stripped.split("\n") json_str = "\n".join(lines[1:-1]) parsed = json.loads(json_str) else: parsed = json.loads(stripped) # Successful JSON response — cache it if cache_key: _cache_set(cache_key, parsed) return parsed except (json.JSONDecodeError, IndexError): fallback = {"message": content, "search_query": None, "vehicle": None} # Cache the fallback too — the model gave us a real answer, # it just wasn't JSON. Next hit saves the API call. if cache_key: _cache_set(cache_key, fallback) return fallback # All models exhausted — DON'T cache errors, we want retries next time if last_error == "rate_limit": return {"message": "El asistente está ocupado. Intenta de nuevo en unos segundos.", "search_query": None, "vehicle": None} if last_error == "hermes_timeout": return {"message": "El asistente tardó mucho en responder. Intenta de nuevo en un momento.", "search_query": None, "vehicle": None} return { "message": "El asistente no está disponible en este momento. Intenta de nuevo en unos segundos.", "search_query": None, "vehicle": None, }