#!/usr/bin/env python3
"""
Multi-brand scraper v3 - ALL ROCKAUTO BRANDS.

- Includes every brand with vehicles in the 1975-2026 range
- Processes years in batches of 5
- Waits 3 minutes between batches (skippable with ENTER)
- Interactive menu to choose the brand and the starting batch
- Years: 1975-2026
"""

import os
import re
import select
import sqlite3
import sys
import time
from urllib.parse import unquote

import requests
from bs4 import BeautifulSoup

# Detect the project base path: when this script lives in "vehicle_scraper",
# the project root is its parent directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if os.path.basename(SCRIPT_DIR) == "vehicle_scraper":
    BASE_DIR = os.path.dirname(SCRIPT_DIR)
else:
    BASE_DIR = SCRIPT_DIR

DB_PATH = os.path.join(BASE_DIR, "vehicle_database", "vehicle_database.db")
BASE_URL = "https://www.rockauto.com/en/catalog"

# ALL ROCKAUTO BRANDS (with vehicles in 1975-2026), sorted alphabetically.
BRANDS = [
    # A
    "ABARTH", "AC", "ACURA", "ALFA ROMEO", "ALPINE", "AM GENERAL",
    "AMERICAN MOTORS", "ASTON MARTIN", "AUDI", "AUSTIN", "AUSTIN-HEALEY",
    "AVANTI",
    # B
    "BENTLEY", "BERTONE", "BMW", "BRICKLIN", "BRISTOL", "BUGATTI", "BUICK",
    "BYD",
    # C
    "CADILLAC", "CHECKER", "CHEVROLET", "CHRYSLER", "CITROEN", "CUPRA",
    # D
    "DAEWOO", "DAIHATSU", "DATSUN", "DELOREAN", "DODGE",
    # E
    "EAGLE", "EDSEL", "EXCALIBUR",
    # F
    "FACEL VEGA", "FERRARI", "FIAT", "FISKER", "FORD", "FREIGHTLINER",
    # G
    "GENESIS", "GEO", "GMC",
    # H
    "HILLMAN", "HONDA", "HUMMER", "HYUNDAI",
    # I
    "INEOS", "INFINITI", "INTERNATIONAL", "ISUZU",
    # J
    "JAGUAR", "JEEP", "JENSEN",
    # K
    "KARMA", "KENWORTH", "KIA",
    # L
    "LAFORZA", "LAMBORGHINI", "LANCIA", "LAND ROVER", "LEXUS", "LINCOLN",
    "LOTUS", "LUCID",
    # M
    "MACK", "MASERATI", "MAYBACH", "MAZDA", "MCLAREN", "MERCEDES-BENZ",
    "MERCURY", "MERKUR", "MG", "MINI", "MITSUBISHI", "MITSUBISHI FUSO",
    "MOBILITY VENTURES", "MORGAN",
    # N
    "NISSAN",
    # O
    "OLDSMOBILE", "OPEL",
    # P
    "PANOZ", "PEUGEOT", "PLYMOUTH", "POLESTAR", "PONTIAC", "PORSCHE",
    # Q
    "QVALE",
    # R
    "RAM", "RENAULT", "RIVIAN", "ROLLS-ROYCE", "ROVER",
    # S
    "SAAB", "SALEEN", "SATURN", "SCION", "SEAT", "SHELBY", "SMART", "SPYKER",
    "SRT", "SSANGYONG", "STERLING", "STUDEBAKER", "SUBARU", "SUNBEAM",
    "SUZUKI",
    # T
    "TESLA", "TOYOTA", "TRIUMPH", "TVR",
    # U
    "UD",
    # V
    "VOLKSWAGEN", "VOLVO", "VPG",
    # W
    "WORKHORSE",
    # Y
    "YUGO",
]

# Additional brands from specific markets (Mexico, China, etc.)
BRANDS_REGIONAL = [
    "BAIC", "BESTUNE", "CHANGAN", "CHIREY", "DFSK", "FAW", "FOTON", "GAC",
    "GEELY", "GIANT MOTORS", "JAC", "JAECOO", "JETOUR", "JMC", "OMODA",
    "SERES", "VAM", "VINFAST",
]

# Years from 1975 to 2026 (descending order)
ALL_YEARS = list(range(2026, 1974, -1))

# Batch configuration
BATCH_SIZE = 5   # years per batch
WAIT_TIME = 180  # 3 minutes between batches

# Matches catalog path segments that look like an engine description.
_ENGINE_HINT_RE = re.compile(
    r'\d+\.?\d*L|V\d|I\d|H\d|HYBRID|ELECTRIC|DIESEL', re.I
)

session = requests.Session()
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml',
    'Accept-Language': 'en-US,en;q=0.9',
})


def check_key_press():
    """Return True if keyboard input is pending, consuming it (non-blocking).

    On Windows a single pending key is read via ``msvcrt``; elsewhere stdin
    is polled with a zero-timeout ``select`` and one full line is consumed.
    """
    if sys.platform == 'win32':
        import msvcrt
        if msvcrt.kbhit():
            msvcrt.getch()
            return True
        return False

    # Linux/Mac
    readable, _, _ = select.select([sys.stdin], [], [], 0)
    if readable:
        sys.stdin.readline()
        return True
    return False


def wait_with_skip(seconds, message=""):
    """Count down ``seconds`` one second at a time; any input skips the wait.

    Args:
        seconds: Number of seconds to wait.
        message: Banner text printed before the countdown starts.
    """
    print(f"\n{'*'*60}")
    print(f" {message}")
    print(" ACTIVA/CAMBIA EL VPN AHORA")
    print(" >>> Presiona ENTER para saltar la espera <<<")
    print(f"{'*'*60}")

    # Drop input typed before the pause so it does not skip the wait.
    if sys.platform != 'win32':
        import termios
        try:
            termios.tcflush(sys.stdin, termios.TCIFLUSH)
        except (termios.error, OSError, ValueError):
            # Best effort: stdin may not be a terminal (pipe, redirect).
            pass

    for remaining in range(seconds, 0, -1):
        if check_key_press():
            print("\n >>> ESPERA SALTADA <<<")
            return
        mins, secs = divmod(remaining, 60)
        print(f"\r Continuando en {mins}:{secs:02d}... (ENTER para saltar) ",
              end="", flush=True)
        time.sleep(1)
    print()


def clean_name(name):
    """Decode a URL path segment into an upper-case, single-spaced name."""
    name = unquote(name.replace('+', ' '))
    return re.sub(r'\s+', ' ', name).strip().upper()


def get_soup(url, retries=3):
    """Fetch ``url`` and return it parsed, or None when it cannot be fetched.

    Args:
        url: Page to download with the shared module-level session.
        retries: Maximum number of attempts.

    Returns:
        A ``BeautifulSoup`` tree on HTTP 200. None on HTTP 403 (returned
        immediately, without further attempts) or once all attempts are used.
    """
    for attempt in range(retries):
        try:
            time.sleep(0.5)  # small delay before every request
            response = session.get(url, timeout=15)
            if response.status_code == 200:
                return BeautifulSoup(response.content, 'html.parser')
            elif response.status_code == 403:
                print("\n [!] Bloqueado (403) - Cambia el VPN")
                return None
        except Exception as e:
            # Deliberately broad: any failure on one page must not abort a
            # long scraping run. Only the last failure is reported.
            if attempt < retries - 1:
                time.sleep(3)
            else:
                print(f"\n Error: {e}")
    return None


def get_models(brand, year):
    """Return the sorted model names listed for ``brand`` in ``year``.

    Model names are taken from catalog links of the form
    ``/catalog/<brand>,<year>,<model>``. Returns an empty list when the page
    could not be fetched.
    """
    brand_url = brand.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{brand_url},{year}")
    if not soup:
        return []

    # Compiled once: the pattern does not change from link to link.
    pattern = re.compile(
        rf'/catalog/{re.escape(brand_url)},{year},([^,/]+)', re.I
    )
    models = set()
    for link in soup.find_all('a', href=True):
        match = pattern.search(link['href'])
        if match:
            model = clean_name(match.group(1))
            # Skip purely numeric and single-character segments.
            if model and not model.isdigit() and len(model) > 1:
                models.add(model)
    return sorted(models)


def get_engines(brand, year, model):
    """Return the sorted engine names for a brand/year/model.

    Falls back to ``['STANDARD']`` when the page could not be fetched or no
    link segment looks like an engine description.
    """
    brand_url = brand.lower().replace(' ', '+')
    model_url = model.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{brand_url},{year},{model_url}")
    if not soup:
        return ['STANDARD']

    # Compiled once: the pattern does not change from link to link.
    pattern = re.compile(
        rf'/catalog/{re.escape(brand_url)},{year},{re.escape(model_url)},([^,/]+)',
        re.I,
    )
    engines = set()
    for link in soup.find_all('a', href=True):
        match = pattern.search(link['href'])
        if match:
            engine = clean_name(match.group(1))
            if engine and _ENGINE_HINT_RE.search(engine):
                engines.add(engine)
    return sorted(engines) if engines else ['STANDARD']


def _get_or_create_id(cursor, insert_sql, select_sql, params):
    """Run an INSERT OR IGNORE followed by the SELECT that returns the row id."""
    cursor.execute(insert_sql, params)
    cursor.execute(select_sql, params)
    return cursor.fetchone()[0]


def save_to_db(conn, brand, year, model, engine):
    """Insert a brand/year/model/engine combination if it is not stored yet.

    Lookup rows (brands, years, engines, models) are created on demand. The
    transaction is not committed here; the caller commits.

    Returns:
        True when a new ``model_year_engine`` row was inserted, False when it
        already existed or a database error occurred.
    """
    cursor = conn.cursor()
    try:
        brand_id = _get_or_create_id(
            cursor,
            "INSERT OR IGNORE INTO brands (name) VALUES (?)",
            "SELECT id FROM brands WHERE name = ?",
            (brand,),
        )
        year_id = _get_or_create_id(
            cursor,
            "INSERT OR IGNORE INTO years (year) VALUES (?)",
            "SELECT id FROM years WHERE year = ?",
            (year,),
        )
        engine_id = _get_or_create_id(
            cursor,
            "INSERT OR IGNORE INTO engines (name) VALUES (?)",
            "SELECT id FROM engines WHERE name = ?",
            (engine,),
        )
        model_id = _get_or_create_id(
            cursor,
            "INSERT OR IGNORE INTO models (brand_id, name) VALUES (?, ?)",
            "SELECT id FROM models WHERE brand_id = ? AND name = ?",
            (brand_id, model),
        )
        cursor.execute(
            "INSERT OR IGNORE INTO model_year_engine (model_id, year_id, engine_id) VALUES (?, ?, ?)",
            (model_id, year_id, engine_id)
        )
        # rowcount is 0 when OR IGNORE skipped an already existing row.
        return cursor.rowcount > 0
    except sqlite3.Error as e:
        print(f" DB Error: {e}")
        return False


def get_existing_years(conn, brand):
    """Return the set of years that already have stored rows for ``brand``."""
    cursor = conn.cursor()
    cursor.execute("""
        SELECT DISTINCT y.year
        FROM years y
        JOIN model_year_engine mye ON y.id = mye.year_id
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        WHERE b.name = ?
    """, (brand,))
    return set(row[0] for row in cursor.fetchall())


def process_batch(conn, brand, years_batch, batch_num, total_batches):
    """Scrape and store one batch of years for ``brand``.

    The batch is committed once, after all of its years have been processed.

    Returns:
        Tuple ``(new_rows_saved, combinations_found)``.
    """
    print(f"\n{'='*60}")
    print(f"[{brand}] LOTE {batch_num}/{total_batches}: Años {years_batch}")
    print('='*60)

    batch_saved = 0
    batch_total = 0

    for year in years_batch:
        print(f"\n[{brand} - Año {year}] Obteniendo modelos... ",
              end="", flush=True)
        models = get_models(brand, year)
        print(f"{len(models)} modelos encontrados")

        if not models:
            print(f" No se encontraron modelos para {year}")
            continue

        for model in models:
            for engine in get_engines(brand, year, model):
                batch_total += 1
                if save_to_db(conn, brand, year, model, engine):
                    batch_saved += 1
                    print(f" {model} - {engine}")

    # Persist the whole batch
    conn.commit()

    print(f"\n>> Lote {batch_num} completado: {batch_saved} nuevos de {batch_total} encontrados")
    return batch_saved, batch_total


def get_brand_batches(conn, brand):
    """Split the years still missing for ``brand`` into batches.

    Returns:
        Tuple ``(batches, existing)``: ``batches`` is a list of year lists of
        at most ``BATCH_SIZE`` items in descending year order (empty when
        nothing is pending); ``existing`` is the set of years already stored.
    """
    existing = get_existing_years(conn, brand)
    years_to_process = [y for y in ALL_YEARS if y not in existing]

    if not years_to_process:
        return [], existing

    batches = [years_to_process[i:i+BATCH_SIZE]
               for i in range(0, len(years_to_process), BATCH_SIZE)]
    return batches, existing


def process_brand(conn, brand, start_batch=1):
    """Process every pending batch of ``brand`` starting at ``start_batch``.

    Args:
        conn: Open SQLite connection.
        brand: Brand name to process.
        start_batch: 1-based index of the first batch to run; earlier
            batches are skipped.

    Returns:
        Tuple ``(total_saved, total_found)`` across the processed batches.
    """
    print(f"\n{'#'*60}")
    print(f" PROCESANDO MARCA: {brand}")
    print(f"{'#'*60}")

    # Same batching as the menus use, so batch numbers always agree.
    batches, existing = get_brand_batches(conn, brand)
    print(f"Años existentes de {brand}: {len(existing)} años")
    if existing:
        print(f" Rango existente: {min(existing)}-{max(existing)}")

    if not batches:
        print(f"\n[OK] {brand}: Todos los años ya estan en la base de datos!")
        return 0, 0

    years_to_process = [year for batch in batches for year in batch]
    print(f"\nAños por procesar para {brand}: {len(years_to_process)}")
    print(f" De {max(years_to_process)} a {min(years_to_process)}")

    total_batches = len(batches)
    print(f"Lotes de {BATCH_SIZE} años: {total_batches} lotes")

    if start_batch > 1:
        print(f"\n>>> Comenzando desde el lote {start_batch} <<<")

    total_saved = 0
    total_found = 0

    for i, batch in enumerate(batches, 1):
        # Skip batches before the requested starting batch
        if i < start_batch:
            continue

        saved, found = process_batch(conn, brand, batch, i, total_batches)
        total_saved += saved
        total_found += found

        # Unless this was the last batch, pause so the VPN can be changed
        if i < total_batches:
            wait_with_skip(
                WAIT_TIME,
                f"PAUSA DE {WAIT_TIME//60} MINUTOS - [{brand}] Lotes restantes: {total_batches - i}"
            )

    return total_saved, total_found


def show_main_menu(conn):
    """Run the main menu loop until a processing choice or exit is made.

    Returns:
        Tuple ``(selected, start_batch, brands_list)``. ``selected`` is None
        to exit, ``'ALL'`` or ``'MULTIPLE'`` with the brands in
        ``brands_list``, or a single brand name with its starting batch.
    """
    all_brands = BRANDS + BRANDS_REGIONAL

    while True:
        print("\n" + "="*60)
        print(" SCRAPER MULTIMARCA v3 - MENU PRINCIPAL")
        print("="*60)
        print(f"\n Total de marcas disponibles: {len(all_brands)}")
        print("\n Opciones:")
        print(" 1. Ver estado de todas las marcas")
        print(" 2. Buscar marca por nombre")
        print(" 3. Seleccionar marca por letra inicial")
        print(" 4. Procesar multiples marcas (rango)")
        print(" 5. Procesar TODAS las marcas pendientes")
        print(" 0. Salir")
        print("="*60)

        choice = input("\nSelecciona opcion: ").strip()

        if choice == '0':
            return None, None, None
        elif choice == '1':
            show_all_brands_status(conn, all_brands)
        elif choice == '2':
            result = search_brand_menu(conn, all_brands)
            if result[0] is not None:
                return result
        elif choice == '3':
            result = select_by_letter_menu(conn, all_brands)
            if result[0] is not None:
                return result
        elif choice == '4':
            result = select_range_menu(conn, all_brands)
            if result[0] is not None:
                return result
        elif choice == '5':
            return 'ALL', 1, all_brands
        else:
            print("Opcion invalida")


def show_all_brands_status(conn, all_brands):
    """Show the stored/pending status of every brand, 20 brands per page."""
    print("\n" + "-"*70)
    print(" ESTADO DE TODAS LAS MARCAS")
    print("-"*70)

    page_size = 20
    total_pages = (len(all_brands) + page_size - 1) // page_size
    current_page = 0

    while True:
        start_idx = current_page * page_size
        end_idx = min(start_idx + page_size, len(all_brands))

        print(f"\n Pagina {current_page + 1}/{total_pages}")
        print("-"*70)
        print(f" {'#':<4} {'MARCA':<20} {'EXISTENTES':<12} {'ESTADO':<30}")
        print("-"*70)

        for i, brand in enumerate(all_brands[start_idx:end_idx], start_idx + 1):
            batches, existing = get_brand_batches(conn, brand)
            years_pending = sum(len(b) for b in batches)

            if not batches:
                status = "COMPLETO"
            else:
                status = f"{years_pending} años pend. ({len(batches)} lotes)"

            print(f" {i:<4} {brand:<20} {len(existing):<12} {status:<30}")

        print("-"*70)
        print(" [N] Siguiente | [P] Anterior | [Q] Volver")

        nav = input("\nNavegacion: ").strip().upper()
        if nav == 'N' and current_page < total_pages - 1:
            current_page += 1
        elif nav == 'P' and current_page > 0:
            current_page -= 1
        elif nav == 'Q' or nav == '':
            break


def search_brand_menu(conn, all_brands):
    """Search brands by substring and let the user pick one.

    Returns:
        The result of ``select_batch_for_brand`` for the chosen brand, or
        ``(None, None, None)`` when nothing was selected.
    """
    print("\n" + "="*60)
    print(" BUSCAR MARCA")
    print("="*60)

    search = input("\nIngresa nombre de marca (o parte): ").strip().upper()
    if not search:
        return None, None, None

    matches = [b for b in all_brands if search in b]

    if not matches:
        print(f"\n No se encontraron marcas con '{search}'")
        input("\nPresiona ENTER para continuar...")
        return None, None, None

    print(f"\n Marcas encontradas ({len(matches)}):")
    for i, brand in enumerate(matches, 1):
        batches, existing = get_brand_batches(conn, brand)
        status = "COMPLETO" if not batches else f"{len(batches)} lotes pendientes"
        print(f" {i}. {brand} - {status}")

    print("\n 0. Volver")

    while True:
        try:
            choice = input("\nSelecciona marca: ").strip()
            if choice == '0' or choice == '':
                return None, None, None

            idx = int(choice) - 1
            if 0 <= idx < len(matches):
                return select_batch_for_brand(conn, matches[idx])
            print("Opcion invalida")
        except ValueError:
            print("Ingresa un numero valido")


def select_by_letter_menu(conn, all_brands):
    """List the brands starting with a letter; pick one brand or all of them.

    Returns:
        ``('MULTIPLE', 1, matches)`` for option A, the result of
        ``select_batch_for_brand`` for a single brand, or
        ``(None, None, None)`` when nothing was selected.
    """
    print("\n" + "="*60)
    print(" SELECCIONAR POR LETRA")
    print("="*60)

    # Collect the available initial letters
    letters = sorted(set(b[0] for b in all_brands))
    print("\n Letras disponibles:")
    print(f" {' '.join(letters)}")

    letter = input("\nIngresa letra: ").strip().upper()
    if not letter or letter not in letters:
        return None, None, None

    matches = [b for b in all_brands if b.startswith(letter)]

    print(f"\n Marcas con '{letter}' ({len(matches)}):")
    for i, brand in enumerate(matches, 1):
        batches, existing = get_brand_batches(conn, brand)
        status = "COMPLETO" if not batches else f"{len(batches)} lotes pendientes"
        print(f" {i}. {brand} - {status}")

    print("\n 0. Volver")
    print(f" A. Procesar TODAS las marcas con '{letter}'")

    while True:
        choice = input("\nSelecciona: ").strip().upper()
        if choice == '0' or choice == '':
            return None, None, None
        if choice == 'A':
            return 'MULTIPLE', 1, matches

        try:
            idx = int(choice) - 1
            if 0 <= idx < len(matches):
                return select_batch_for_brand(conn, matches[idx])
            print("Opcion invalida")
        except ValueError:
            print("Ingresa un numero valido o 'A'")


def select_range_menu(conn, all_brands):
    """Let the user choose a 1-based inclusive range of brands to process.

    Returns:
        ``('MULTIPLE', 1, selected)`` when a valid range is confirmed with
        'S', otherwise ``(None, None, None)``.
    """
    print("\n" + "="*60)
    print(" SELECCIONAR RANGO DE MARCAS")
    print("="*60)

    # Show every brand numbered, four per row
    print("\n Marcas disponibles:")
    for i, brand in enumerate(all_brands, 1):
        if i % 4 == 0:
            print(f" {i:3}. {brand}")
        else:
            print(f" {i:3}. {brand:<18}", end="")
    print()

    print(f"\n Total: {len(all_brands)} marcas")
    print("\n Ingresa rango (ej: 1-10, 5-20, etc.)")

    range_input = input("\nRango: ").strip()
    if not range_input:
        return None, None, None

    try:
        if '-' in range_input:
            start, end = map(int, range_input.split('-'))
        else:
            start = end = int(range_input)

        if 1 <= start <= end <= len(all_brands):
            selected = all_brands[start-1:end]
            print(f"\n Marcas seleccionadas ({len(selected)}):")
            for b in selected:
                print(f" - {b}")

            confirm = input("\nProcesar estas marcas? (S/N): ").strip().upper()
            if confirm == 'S':
                return 'MULTIPLE', 1, selected
        else:
            print("Rango invalido")
    except ValueError:
        # Non-numeric input, or more than one '-' in the range
        print("Formato invalido. Usa: inicio-fin")

    return None, None, None


def select_batch_for_brand(conn, brand):
    """Ask which pending batch of ``brand`` processing should start from.

    Returns:
        ``(brand, batch_num, None)`` for a valid choice, or
        ``(None, None, None)`` when the brand is complete or the user goes
        back.
    """
    batches, existing = get_brand_batches(conn, brand)

    if not batches:
        print(f"\n {brand} ya esta completo!")
        input("\nPresiona ENTER para continuar...")
        return None, None, None

    print(f"\n{'='*60}")
    print(f" LOTES PARA {brand}")
    print(f"{'='*60}")
    print(f"\n Años existentes: {len(existing)}")
    print(f" Lotes pendientes: {len(batches)}")

    # Show the batches
    print("\n Lotes disponibles:")
    for j, batch in enumerate(batches, 1):
        print(f" {j}. Años {batch[0]} - {batch[-1]} ({len(batch)} años)")

    print("\n 0. Volver")

    while True:
        try:
            batch_choice = input(f"\nComenzar desde lote (1-{len(batches)}): ").strip()
            if batch_choice == '0' or batch_choice == '':
                return None, None, None

            batch_num = int(batch_choice)
            if 1 <= batch_num <= len(batches):
                return brand, batch_num, None
            print(f"Ingresa un numero entre 1 y {len(batches)}")
        except ValueError:
            print("Ingresa un numero valido")


def main():
    """Entry point: show the menu, process the selection, print a summary."""
    all_brands = BRANDS + BRANDS_REGIONAL

    print("="*60)
    print(" SCRAPER MULTIMARCA v3 - ROCKAUTO")
    print("="*60)
    print(f" Total marcas: {len(all_brands)}")
    print(f" Marcas principales: {len(BRANDS)}")
    print(f" Marcas regionales: {len(BRANDS_REGIONAL)}")
    print(f" Años: 1975-2026 | Lotes de {BATCH_SIZE} años")
    print(f" Pausa entre lotes: {WAIT_TIME//60} minutos")
    print(" >>> Presiona ENTER para saltar esperas <<<")
    print("="*60)

    # Check that the database exists
    if not os.path.exists(DB_PATH):
        print(f"\n[ERROR] Base de datos no encontrada: {DB_PATH}")
        print("Verifica que la ruta sea correcta.")
        sys.exit(1)

    print(f"\nBase de datos: {DB_PATH}")

    conn = sqlite3.connect(DB_PATH)
    # try/finally: the connection is closed even on an error or Ctrl+C.
    try:
        # Main menu
        selected, start_batch, brands_list = show_main_menu(conn)

        if selected is None:
            print("\nSaliendo...")
            return

        grand_total_saved = 0
        grand_total_found = 0
        brand_stats = {}

        # Decide which brands to process
        if selected == 'ALL':
            brands_to_process = brands_list or all_brands
            start_batches = {brand: 1 for brand in brands_to_process}
        elif selected == 'MULTIPLE':
            brands_to_process = brands_list
            start_batches = {brand: 1 for brand in brands_to_process}
        else:
            brands_to_process = [selected]
            start_batches = {selected: start_batch}

        print(f"\n{'='*60}")
        print(" INICIANDO PROCESAMIENTO")
        print(f" Marcas a procesar: {len(brands_to_process)}")
        print(f"{'='*60}")

        for idx, brand in enumerate(brands_to_process, 1):
            print(f"\n[{idx}/{len(brands_to_process)}] ", end="")

            saved, found = process_brand(conn, brand, start_batches.get(brand, 1))
            brand_stats[brand] = {'saved': saved, 'found': found}
            grand_total_saved += saved
            grand_total_found += found

            # Pause between brands (when another brand follows)
            if idx < len(brands_to_process):
                # idx is 1-based, so it indexes the next brand
                next_brand = brands_to_process[idx]
                wait_with_skip(WAIT_TIME, f"PAUSA ENTRE MARCAS - Siguiente: {next_brand}")
    finally:
        conn.close()

    print("\n" + "="*60)
    print(" RESUMEN FINAL")
    print("="*60)

    # Only list brands that produced data
    brands_with_data = {k: v for k, v in brand_stats.items()
                        if v['found'] > 0 or v['saved'] > 0}

    if brands_with_data:
        for brand, stats in brands_with_data.items():
            print(f" {brand}:")
            print(f" Encontrados: {stats['found']}")
            print(f" Nuevos guardados: {stats['saved']}")
    else:
        print(" No se encontraron nuevos datos")

    print("-"*60)
    print(" TOTAL:")
    print(f" Marcas procesadas: {len(brands_to_process)}")
    print(f" Vehiculos encontrados: {grand_total_found}")
    print(f" Nuevos guardados: {grand_total_saved}")
    print("="*60)


if __name__ == "__main__":
    main()