Files
Autoparts-DB/vehicle_scraper/scrape_multibrand.py
consultoria-as 61474f7abe Update multibrand scraper v3 with all RockAuto brands
Added 108 brands total:
- 90 main brands (international)
- 18 regional brands (Mexico, China, etc.)

Features:
- Interactive menu with multiple options
- Search brand by name
- Select by initial letter
- Process range of brands (e.g., 1-10)
- View the status of all brands, with pagination
- Skip wait time with ENTER
- Years: 1975-2026, 5 years per batch
2026-01-19 09:17:54 +00:00

792 lines
22 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Scraper Multimarca v3 - TODAS LAS MARCAS DE ROCKAUTO
- Incluye todas las marcas con vehiculos en rango 1975-2026
- Procesa de 5 en 5 años
- Espera 3 minutos entre lotes (saltable con ENTER)
- Menu interactivo para seleccionar marca y lote
- Años: 1975-2026
"""
import requests
from bs4 import BeautifulSoup
import sqlite3
import time
import re
import os
import sys
import select
from urllib.parse import unquote
# Resolve the project root: when this script sits in a directory named
# "vehicle_scraper", the project root is that directory's parent; otherwise
# the script's own directory is used.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = (
    os.path.dirname(SCRIPT_DIR)
    if os.path.basename(SCRIPT_DIR) == "vehicle_scraper"
    else SCRIPT_DIR
)
# SQLite database file used by the scraper, and the catalog root URL.
DB_PATH = os.path.join(BASE_DIR, "vehicle_database", "vehicle_database.db")
BASE_URL = "https://www.rockauto.com/en/catalog"
# All RockAuto makes (with vehicles in the 1975-2026 range).
# Kept in alphabetical order, grouped by initial letter.
BRANDS = [
    # A
    "ABARTH", "AC", "ACURA", "ALFA ROMEO", "ALPINE", "AM GENERAL",
    "AMERICAN MOTORS", "ASTON MARTIN", "AUDI", "AUSTIN", "AUSTIN-HEALEY",
    "AVANTI",
    # B
    "BENTLEY", "BERTONE", "BMW", "BRICKLIN", "BRISTOL", "BUGATTI", "BUICK",
    "BYD",
    # C
    "CADILLAC", "CHECKER", "CHEVROLET", "CHRYSLER", "CITROEN", "CUPRA",
    # D
    "DAEWOO", "DAIHATSU", "DATSUN", "DELOREAN", "DODGE",
    # E
    "EAGLE", "EDSEL", "EXCALIBUR",
    # F
    "FACEL VEGA", "FERRARI", "FIAT", "FISKER", "FORD", "FREIGHTLINER",
    # G
    "GENESIS", "GEO", "GMC",
    # H
    "HILLMAN", "HONDA", "HUMMER", "HYUNDAI",
    # I
    "INEOS", "INFINITI", "INTERNATIONAL", "ISUZU",
    # J
    "JAGUAR", "JEEP", "JENSEN",
    # K
    "KARMA", "KENWORTH", "KIA",
    # L
    "LAFORZA", "LAMBORGHINI", "LANCIA", "LAND ROVER", "LEXUS", "LINCOLN",
    "LOTUS", "LUCID",
    # M
    "MACK", "MASERATI", "MAYBACH", "MAZDA", "MCLAREN", "MERCEDES-BENZ",
    "MERCURY", "MERKUR", "MG", "MINI", "MITSUBISHI", "MITSUBISHI FUSO",
    "MOBILITY VENTURES", "MORGAN",
    # N
    "NISSAN",
    # O
    "OLDSMOBILE", "OPEL",
    # P
    "PANOZ", "PEUGEOT", "PLYMOUTH", "POLESTAR", "PONTIAC", "PORSCHE",
    # Q
    "QVALE",
    # R
    "RAM", "RENAULT", "RIVIAN", "ROLLS-ROYCE", "ROVER",
    # S
    "SAAB", "SALEEN", "SATURN", "SCION", "SEAT", "SHELBY", "SMART", "SPYKER",
    "SRT", "SSANGYONG", "STERLING", "STUDEBAKER", "SUBARU", "SUNBEAM",
    "SUZUKI",
    # T
    "TESLA", "TOYOTA", "TRIUMPH", "TVR",
    # U
    "UD",
    # V
    "VOLKSWAGEN", "VOLVO", "VPG",
    # W
    "WORKHORSE",
    # Y
    "YUGO",
]
# Additional makes from specific markets (Mexico, China, etc.).
BRANDS_REGIONAL = [
    "BAIC", "BESTUNE", "CHANGAN", "CHIREY", "DFSK", "FAW",
    "FOTON", "GAC", "GEELY", "GIANT MOTORS", "JAC", "JAECOO",
    "JETOUR", "JMC", "OMODA", "SERES", "VAM", "VINFAST",
]
# Model years 1975 through 2026 inclusive, newest first.
ALL_YEARS = list(reversed(range(1975, 2027)))
# Batch settings.
BATCH_SIZE = 5  # years per batch
WAIT_TIME = 3 * 60  # seconds between batches (3 minutes)
# One HTTP session shared by every request; it always sends these
# browser-like headers.
session = requests.Session()
session.headers.update(
    {
        'User-Agent': (
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
            'AppleWebKit/537.36 (KHTML, like Gecko) '
            'Chrome/120.0.0.0 Safari/537.36'
        ),
        'Accept': 'text/html,application/xhtml+xml',
        'Accept-Language': 'en-US,en;q=0.9',
    }
)
def check_key_press():
    """Report, without blocking, whether the user has pressed a key.

    On Windows any pending keystroke counts and is consumed. On other
    platforms a readable stdin counts, and one line is consumed from it
    (so in practice the user presses ENTER).

    Returns:
        True if pending input was found and consumed, False otherwise.
        End-of-file on stdin, or a stdin that cannot be polled, yields
        False rather than being mistaken for a key press.
    """
    if sys.platform == 'win32':
        import msvcrt  # Windows-only module, imported only where it exists

        if msvcrt.kbhit():
            msvcrt.getch()  # consume the key so it is not reported twice
            return True
        return False

    # Linux/Mac: poll stdin with a zero timeout so the call never blocks.
    try:
        ready, _, _ = select.select([sys.stdin], [], [], 0)
    except (OSError, ValueError, TypeError):
        # stdin is closed, missing, or has no usable file descriptor
        # (for example it was replaced by an in-memory object).
        return False
    if not ready:
        return False
    # select() also reports a stdin at end-of-file as readable, and the read
    # then returns an empty string. Counting that as a key press would skip
    # every wait, so only a real line counts.
    return sys.stdin.readline() != ""
def wait_with_skip(seconds, message=""):
    """Count down ``seconds`` seconds, stopping early on a key press.

    Prints a banner with ``message`` and a reminder to switch the VPN,
    discards input typed before the countdown began (non-Windows only),
    then calls ``check_key_press()`` once per second while refreshing a
    countdown line.

    Args:
        seconds: Length of the pause in seconds. With zero or a negative
            value only the banner and a final newline are printed.
        message: Text shown inside the banner.
    """
    banner_rule = '*' * 60
    print(f"\n{banner_rule}")
    print(f" {message}")
    print(" ACTIVA/CAMBIA EL VPN AHORA")
    print(" >>> Presiona ENTER para saltar la espera <<<")
    print(banner_rule)
    # Drop anything already typed so an earlier ENTER does not skip this wait.
    if sys.platform != 'win32':
        import termios

        try:
            termios.tcflush(sys.stdin, termios.TCIFLUSH)
        except (termios.error, OSError, ValueError, TypeError):
            # Best effort only: stdin is not a terminal or cannot be
            # flushed. Named explicitly so KeyboardInterrupt and unrelated
            # errors are no longer swallowed.
            pass
    for sec in range(seconds, 0, -1):
        if check_key_press():
            print("\n >>> ESPERA SALTADA <<<")
            return
        mins, secs = divmod(sec, 60)
        print(f"\r Continuando en {mins}:{secs:02d}... (ENTER para saltar) ", end="", flush=True)
        time.sleep(1)
    print()
def clean_name(name):
    """Turn a URL path fragment into a normalized upper-case name.

    '+' is replaced by a space, percent-escapes are decoded, runs of
    whitespace collapse to a single space, and the result is stripped
    and upper-cased.
    """
    decoded = unquote(name.replace('+', ' '))
    collapsed = re.sub(r'\s+', ' ', decoded)
    return collapsed.strip().upper()
def get_soup(url, retries=3):
    """Fetch ``url`` and return it parsed, or None if it could not be fetched.

    Makes up to ``retries`` attempts, each preceded by a 0.5 s delay.
    A 200 response is parsed with BeautifulSoup and returned. A 403
    response prints a "blocked" notice and gives up immediately. Any other
    status simply moves on to the next attempt. If an attempt raises, the
    function sleeps 3 s before retrying, or prints the error when it was
    the final attempt.
    """
    final_attempt = retries - 1
    for attempt in range(retries):
        try:
            time.sleep(0.5)
            response = session.get(url, timeout=15)
            status = response.status_code
            if status == 403:
                print(f"\n [!] Bloqueado (403) - Cambia el VPN")
                return None
            if status == 200:
                return BeautifulSoup(response.content, 'html.parser')
        except Exception as e:
            if attempt >= final_attempt:
                print(f"\n Error: {e}")
            else:
                time.sleep(3)
    return None
def get_models(brand, year):
    """Return the sorted model names linked from a brand/year catalog page.

    The page at ``BASE_URL/<brand>,<year>`` is fetched with ``get_soup``;
    every link whose href contains ``/catalog/<brand>,<year>,<model>``
    contributes ``<model>`` after ``clean_name``. Names that are empty,
    purely numeric, or a single character are dropped. An empty list is
    returned when the page could not be fetched.
    """
    brand_slug = brand.lower().replace(' ', '+')
    page = get_soup(f"{BASE_URL}/{brand_slug},{year}")
    if not page:
        return []
    # The pattern is the same for every link, so build it once.
    href_re = re.compile(rf'/catalog/{re.escape(brand_slug)},{year},([^,/]+)', re.I)
    found = set()
    for anchor in page.find_all('a', href=True):
        hit = href_re.search(anchor['href'])
        if not hit:
            continue
        name = clean_name(hit.group(1))
        if name and not name.isdigit() and len(name) > 1:
            found.add(name)
    return sorted(found)
def get_engines(brand, year, model):
    """Return the sorted engine names linked from a brand/year/model page.

    Links whose href contains ``/catalog/<brand>,<year>,<model>,<engine>``
    contribute ``<engine>`` after ``clean_name``, but only when the name
    looks like an engine (a displacement such as "2.0L", a V/I/H cylinder
    code, or HYBRID/ELECTRIC/DIESEL). ``['STANDARD']`` is returned both
    when the page could not be fetched and when no engine link matched.
    """
    fallback = ['STANDARD']
    brand_slug = brand.lower().replace(' ', '+')
    model_slug = model.lower().replace(' ', '+')
    page = get_soup(f"{BASE_URL}/{brand_slug},{year},{model_slug}")
    if not page:
        return fallback
    # The pattern is the same for every link, so build it once.
    href_re = re.compile(
        rf'/catalog/{re.escape(brand_slug)},{year},{re.escape(model_slug)},([^,/]+)',
        re.I,
    )
    found = set()
    for anchor in page.find_all('a', href=True):
        hit = href_re.search(anchor['href'])
        if not hit:
            continue
        name = clean_name(hit.group(1))
        if name and re.search(r'\d+\.?\d*L|V\d|I\d|H\d|HYBRID|ELECTRIC|DIESEL', name, re.I):
            found.add(name)
    if not found:
        return fallback
    return sorted(found)
def save_to_db(conn, brand, year, model, engine):
    """Store one brand/year/model/engine combination.

    Makes sure rows exist in ``brands``, ``years``, ``engines`` and
    ``models`` (insert-or-ignore followed by an id lookup), then links
    them in ``model_year_engine``. Nothing is committed here.

    Returns:
        True when a new ``model_year_engine`` row was inserted; False when
        it already existed or when any step raised (the error is printed).
    """
    cur = conn.cursor()

    def ensure_row(insert_sql, select_sql, params):
        # Insert the row if it is missing, then return its id.
        cur.execute(insert_sql, params)
        cur.execute(select_sql, params)
        return cur.fetchone()[0]

    try:
        brand_id = ensure_row(
            "INSERT OR IGNORE INTO brands (name) VALUES (?)",
            "SELECT id FROM brands WHERE name = ?",
            (brand,),
        )
        year_id = ensure_row(
            "INSERT OR IGNORE INTO years (year) VALUES (?)",
            "SELECT id FROM years WHERE year = ?",
            (year,),
        )
        engine_id = ensure_row(
            "INSERT OR IGNORE INTO engines (name) VALUES (?)",
            "SELECT id FROM engines WHERE name = ?",
            (engine,),
        )
        model_id = ensure_row(
            "INSERT OR IGNORE INTO models (brand_id, name) VALUES (?, ?)",
            "SELECT id FROM models WHERE brand_id = ? AND name = ?",
            (brand_id, model),
        )
        cur.execute(
            "INSERT OR IGNORE INTO model_year_engine (model_id, year_id, engine_id) VALUES (?, ?, ?)",
            (model_id, year_id, engine_id),
        )
        # rowcount is 0 when the insert was ignored as a duplicate.
        return cur.rowcount > 0
    except Exception as e:
        print(f" DB Error: {e}")
        return False
def get_existing_years(conn, brand):
    """Return the set of years already stored for ``brand``.

    A year is included when at least one ``model_year_engine`` row links
    it to a model belonging to a brand with that name.
    """
    query = """
        SELECT DISTINCT y.year
        FROM years y
        JOIN model_year_engine mye ON y.id = mye.year_id
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        WHERE b.name = ?
    """
    cur = conn.cursor()
    cur.execute(query, (brand,))
    return {row[0] for row in cur.fetchall()}
def process_batch(conn, brand, years_batch, batch_num, total_batches):
    """Scrape and store every model/engine for one batch of years.

    For each year the models are fetched with ``get_models``; for each
    model the engines are fetched with ``get_engines``; each combination
    is passed to ``save_to_db``. Newly stored combinations are printed.
    The connection is committed once, after all years of the batch.

    Returns:
        ``(saved, found)``: newly inserted combinations and total
        combinations seen.
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print(f"[{brand}] LOTE {batch_num}/{total_batches}: Años {years_batch}")
    print(rule)
    new_rows = 0
    seen_rows = 0
    for year in years_batch:
        print(f"\n[{brand} - Año {year}] Obteniendo modelos... ", end="", flush=True)
        models = get_models(brand, year)
        print(f"{len(models)} modelos encontrados")
        if not models:
            print(f" No se encontraron modelos para {year}")
            continue
        for model in models:
            for engine in get_engines(brand, year, model):
                seen_rows += 1
                if not save_to_db(conn, brand, year, model, engine):
                    continue
                new_rows += 1
                print(f" {model} - {engine}")
    # Persist everything stored for this batch.
    conn.commit()
    print(f"\n>> Lote {batch_num} completado: {new_rows} nuevos de {seen_rows} encontrados")
    return new_rows, seen_rows
def get_brand_batches(conn, brand):
    """Return the pending year batches for ``brand`` and its stored years.

    Years from ``ALL_YEARS`` that are not yet stored for the brand are cut,
    in order, into lists of at most ``BATCH_SIZE`` years.

    Returns:
        ``(batches, existing)``: the list of year lists (empty when nothing
        is pending) and the set of years already stored.
    """
    existing = get_existing_years(conn, brand)
    pending = [year for year in ALL_YEARS if year not in existing]
    batches = []
    for start in range(0, len(pending), BATCH_SIZE):
        batches.append(pending[start:start + BATCH_SIZE])
    return batches, existing
def process_brand(conn, brand, start_batch=1):
    """Scrape every pending year of ``brand``, beginning at ``start_batch``.

    Pending years (those in ``ALL_YEARS`` not yet stored) are split into
    batches of ``BATCH_SIZE``. Batches numbered below ``start_batch`` are
    skipped. After each processed batch except the last one, the function
    pauses with ``wait_with_skip`` so the VPN can be switched.

    Returns:
        ``(saved, found)`` totals over the processed batches, or ``(0, 0)``
        when the brand has no pending years.
    """
    hashes = '#' * 60
    print(f"\n{hashes}")
    print(f" PROCESANDO MARCA: {brand}")
    print(hashes)
    # Which years are already stored?
    existing = get_existing_years(conn, brand)
    print(f"Años existentes de {brand}: {len(existing)} años")
    if existing:
        print(f" Rango existente: {min(existing)}-{max(existing)}")
    # Keep only the missing ones.
    years_to_process = [year for year in ALL_YEARS if year not in existing]
    if not years_to_process:
        print(f"\n[OK] {brand}: Todos los años ya estan en la base de datos!")
        return 0, 0
    print(f"\nAños por procesar para {brand}: {len(years_to_process)}")
    print(f" De {max(years_to_process)} a {min(years_to_process)}")
    # Cut into batches.
    batches = []
    for offset in range(0, len(years_to_process), BATCH_SIZE):
        batches.append(years_to_process[offset:offset + BATCH_SIZE])
    total_batches = len(batches)
    print(f"Lotes de {BATCH_SIZE} años: {total_batches} lotes")
    if start_batch > 1:
        print(f"\n>>> Comenzando desde el lote {start_batch} <<<")
    total_saved = 0
    total_found = 0
    # Batch numbers are 1-based; start at the requested one (never below 1).
    for number in range(max(start_batch, 1), total_batches + 1):
        saved, found = process_batch(conn, brand, batches[number - 1], number, total_batches)
        total_saved += saved
        total_found += found
        # Unless this was the last batch, pause so the VPN can be changed.
        if number < total_batches:
            wait_with_skip(WAIT_TIME, f"PAUSA DE {WAIT_TIME//60} MINUTOS - [{brand}] Lotes restantes: {total_batches - number}")
    return total_saved, total_found
def show_main_menu(conn):
    """Run the main menu loop until the user picks something to process.

    Returns:
        A ``(selection, start_batch, brands)`` tuple:
        ``(None, None, None)`` when the user exits, ``('ALL', 1, all brands)``
        for option 5, or whatever a sub-menu (options 2-4) returned once its
        first element is not None.
    """
    all_brands = BRANDS + BRANDS_REGIONAL
    # Options that open a sub-menu returning a (selection, batch, brands) tuple.
    submenus = {
        '2': search_brand_menu,
        '3': select_by_letter_menu,
        '4': select_range_menu,
    }
    rule = "=" * 60
    while True:
        print("\n" + rule)
        print(" SCRAPER MULTIMARCA v3 - MENU PRINCIPAL")
        print(rule)
        print(f"\n Total de marcas disponibles: {len(all_brands)}")
        print("\n Opciones:")
        for option_line in (
            " 1. Ver estado de todas las marcas",
            " 2. Buscar marca por nombre",
            " 3. Seleccionar marca por letra inicial",
            " 4. Procesar multiples marcas (rango)",
            " 5. Procesar TODAS las marcas pendientes",
            " 0. Salir",
        ):
            print(option_line)
        print(rule)
        choice = input("\nSelecciona opcion: ").strip()
        if choice == '0':
            return None, None, None
        if choice == '5':
            return 'ALL', 1, all_brands
        if choice == '1':
            show_all_brands_status(conn, all_brands)
            continue
        submenu = submenus.get(choice)
        if submenu is None:
            print("Opcion invalida")
            continue
        result = submenu(conn, all_brands)
        # A sub-menu that was cancelled returns None first; show the menu again.
        if result[0] is not None:
            return result
def show_all_brands_status(conn, all_brands):
    """Show the status of every brand, 20 per page, with N/P/Q navigation.

    For each brand on the current page the number of stored years and
    either "COMPLETO" or the count of pending years and batches is printed.
    'N' and 'P' move between pages (ignored at the ends); 'Q' or an empty
    answer returns; anything else redraws the current page.
    """
    rule = "-" * 70
    print("\n" + rule)
    print(" ESTADO DE TODAS LAS MARCAS")
    print(rule)
    page_size = 20
    full_pages, remainder = divmod(len(all_brands), page_size)
    total_pages = full_pages + (1 if remainder else 0)
    page = 0
    while True:
        first = page * page_size
        last = min(first + page_size, len(all_brands))
        print(f"\n Pagina {page + 1}/{total_pages}")
        print(rule)
        print(f" {'#':<4} {'MARCA':<20} {'EXISTENTES':<12} {'ESTADO':<30}")
        print(rule)
        for number, brand in enumerate(all_brands[first:last], first + 1):
            batches, existing = get_brand_batches(conn, brand)
            if batches:
                years_pending = sum(len(b) for b in batches)
                status = f"{years_pending} años pend. ({len(batches)} lotes)"
            else:
                status = "COMPLETO"
            print(f" {number:<4} {brand:<20} {len(existing):<12} {status:<30}")
        print(rule)
        print(f" [N] Siguiente | [P] Anterior | [Q] Volver")
        nav = input("\nNavegacion: ").strip().upper()
        if nav in ('Q', ''):
            break
        if nav == 'N' and page < total_pages - 1:
            page += 1
        elif nav == 'P' and page > 0:
            page -= 1
def search_brand_menu(conn, all_brands):
    """Let the user find a brand by (partial) name and pick one.

    The typed text is upper-cased and matched as a substring against
    ``all_brands``. The chosen brand is handed to
    ``select_batch_for_brand``, whose result is returned.
    ``(None, None, None)`` is returned when the search is empty, nothing
    matches, or the user goes back.
    """
    cancelled = (None, None, None)
    rule = "=" * 60
    print("\n" + rule)
    print(" BUSCAR MARCA")
    print(rule)
    search = input("\nIngresa nombre de marca (o parte): ").strip().upper()
    if not search:
        return cancelled
    matches = [name for name in all_brands if search in name]
    if not matches:
        print(f"\n No se encontraron marcas con '{search}'")
        input("\nPresiona ENTER para continuar...")
        return cancelled
    print(f"\n Marcas encontradas ({len(matches)}):")
    for number, brand in enumerate(matches, 1):
        batches, existing = get_brand_batches(conn, brand)
        if batches:
            status = f"{len(batches)} lotes pendientes"
        else:
            status = "COMPLETO"
        print(f" {number}. {brand} - {status}")
    print(f"\n 0. Volver")
    while True:
        try:
            choice = input("\nSelecciona marca: ").strip()
            if choice in ('0', ''):
                return cancelled
            position = int(choice) - 1
            if 0 <= position < len(matches):
                return select_batch_for_brand(conn, matches[position])
            print("Opcion invalida")
        except ValueError:
            print("Ingresa un numero valido")
def select_by_letter_menu(conn, all_brands):
    """Let the user pick a brand, or all brands, starting with one letter.

    Returns:
        ``('MULTIPLE', 1, matches)`` when 'A' (all) is chosen, the result
        of ``select_batch_for_brand`` for a single brand, or
        ``(None, None, None)`` when the letter is empty or unknown or the
        user goes back.
    """
    rule = "=" * 60
    print("\n" + rule)
    print(" SELECCIONAR POR LETRA")
    print(rule)
    # Initial letters that actually occur in the brand list.
    letters = sorted({name[0] for name in all_brands})
    print("\n Letras disponibles:")
    print(f" {' '.join(letters)}")
    letter = input("\nIngresa letra: ").strip().upper()
    if not letter or letter not in letters:
        return None, None, None
    matches = [name for name in all_brands if name.startswith(letter)]
    print(f"\n Marcas con '{letter}' ({len(matches)}):")
    for number, brand in enumerate(matches, 1):
        batches, existing = get_brand_batches(conn, brand)
        if batches:
            status = f"{len(batches)} lotes pendientes"
        else:
            status = "COMPLETO"
        print(f" {number}. {brand} - {status}")
    print(f"\n 0. Volver")
    print(f" A. Procesar TODAS las marcas con '{letter}'")
    while True:
        choice = input("\nSelecciona: ").strip().upper()
        if choice in ('0', ''):
            return None, None, None
        if choice == 'A':
            return 'MULTIPLE', 1, matches
        try:
            position = int(choice) - 1
            if 0 <= position < len(matches):
                return select_batch_for_brand(conn, matches[position])
            print("Opcion invalida")
        except ValueError:
            print("Ingresa un numero valido o 'A'")
def select_range_menu(conn, all_brands):
    """Let the user pick a contiguous, 1-based range of brands.

    All brands are listed four per row. The answer is either "start-end"
    or a single number. After confirmation with 'S' the function returns
    ``('MULTIPLE', 1, selected brands)``; in every other case (empty or
    malformed input, range out of bounds, not confirmed) it returns
    ``(None, None, None)``.
    """
    rule = "=" * 60
    print("\n" + rule)
    print(" SELECCIONAR RANGO DE MARCAS")
    print(rule)
    # Numbered listing, four brands per row.
    print("\n Marcas disponibles:")
    for number, brand in enumerate(all_brands, 1):
        ends_row = number % 4 == 0
        cell = f" {number:3}. {brand}" if ends_row else f" {number:3}. {brand:<18}"
        print(cell, end="\n" if ends_row else "")
    print()
    print(f"\n Total: {len(all_brands)} marcas")
    print("\n Ingresa rango (ej: 1-10, 5-20, etc.)")
    range_input = input("\nRango: ").strip()
    if not range_input:
        return None, None, None
    try:
        if '-' in range_input:
            # Anything other than exactly two integers raises ValueError.
            start, end = [int(part) for part in range_input.split('-')]
        else:
            start = end = int(range_input)
        if not (1 <= start <= end <= len(all_brands)):
            print("Rango invalido")
        else:
            selected = all_brands[start - 1:end]
            print(f"\n Marcas seleccionadas ({len(selected)}):")
            for name in selected:
                print(f" - {name}")
            confirm = input("\nProcesar estas marcas? (S/N): ").strip().upper()
            if confirm == 'S':
                return 'MULTIPLE', 1, selected
    except ValueError:
        print("Formato invalido. Usa: inicio-fin")
    return None, None, None
def select_batch_for_brand(conn, brand):
    """Ask which pending batch of ``brand`` processing should start from.

    Returns:
        ``(brand, batch_number, None)`` for a valid 1-based batch number,
        or ``(None, None, None)`` when the brand has no pending batches or
        the user goes back ('0' or empty input).
    """
    batches, existing = get_brand_batches(conn, brand)
    if not batches:
        print(f"\n {brand} ya esta completo!")
        input("\nPresiona ENTER para continuar...")
        return None, None, None
    rule = '=' * 60
    total = len(batches)
    print(f"\n{rule}")
    print(f" LOTES PARA {brand}")
    print(rule)
    print(f"\n Años existentes: {len(existing)}")
    print(f" Lotes pendientes: {total}")
    # List the batches with their first and last year.
    print("\n Lotes disponibles:")
    for number, years in enumerate(batches, 1):
        print(f" {number}. Años {years[0]} - {years[-1]} ({len(years)} años)")
    print(f"\n 0. Volver")
    while True:
        try:
            answer = input(f"\nComenzar desde lote (1-{total}): ").strip()
            if answer in ('0', ''):
                return None, None, None
            batch_num = int(answer)
            if 1 <= batch_num <= total:
                return brand, batch_num, None
            print(f"Ingresa un numero entre 1 y {total}")
        except ValueError:
            print("Ingresa un numero valido")
def main():
    """Entry point: show the menu, scrape the chosen brands, print a summary.

    Exits with status 1 when the database file at ``DB_PATH`` is missing.
    Otherwise a connection is opened, the main menu decides which brands
    (and, for a single brand, which starting batch) to process, and each
    brand is run through ``process_brand``. Between brands the function
    pauses with ``wait_with_skip``, but only after a brand that still had
    pending years, since a brand that was already complete made no
    requests. The connection is closed on every exit path.
    """
    all_brands = BRANDS + BRANDS_REGIONAL
    print("="*60)
    print(" SCRAPER MULTIMARCA v3 - ROCKAUTO")
    print("="*60)
    print(f" Total marcas: {len(all_brands)}")
    print(f" Marcas principales: {len(BRANDS)}")
    print(f" Marcas regionales: {len(BRANDS_REGIONAL)}")
    print(f" Años: 1975-2026 | Lotes de {BATCH_SIZE} años")
    print(f" Pausa entre lotes: {WAIT_TIME//60} minutos")
    print(" >>> Presiona ENTER para saltar esperas <<<")
    print("="*60)
    # Make sure the database exists before doing anything else.
    if not os.path.exists(DB_PATH):
        print(f"\n[ERROR] Base de datos no encontrada: {DB_PATH}")
        print("Verifica que la ruta sea correcta.")
        sys.exit(1)
    print(f"\nBase de datos: {DB_PATH}")
    conn = sqlite3.connect(DB_PATH)
    grand_total_saved = 0
    grand_total_found = 0
    brand_stats = {}
    # try/finally so the connection is released even when the run is
    # interrupted (Ctrl-C) or fails part-way through.
    try:
        # Main menu
        selected, start_batch, brands_list = show_main_menu(conn)
        if selected is None:
            print("\nSaliendo...")
            return
        # Work out which brands to process and from which batch.
        if selected == 'ALL':
            brands_to_process = brands_list or all_brands
            start_batches = {brand: 1 for brand in brands_to_process}
        elif selected == 'MULTIPLE':
            brands_to_process = brands_list
            start_batches = {brand: 1 for brand in brands_to_process}
        elif selected:
            brands_to_process = [selected]
            start_batches = {selected: start_batch}
        else:
            return
        print(f"\n{'='*60}")
        print(" INICIANDO PROCESAMIENTO")
        print(f" Marcas a procesar: {len(brands_to_process)}")
        print(f"{'='*60}")
        for idx, brand in enumerate(brands_to_process, 1):
            print(f"\n[{idx}/{len(brands_to_process)}] ", end="")
            # Check before processing whether this brand has anything left
            # to fetch; afterwards its years would already be stored.
            pending_batches, _ = get_brand_batches(conn, brand)
            saved, found = process_brand(conn, brand, start_batches.get(brand, 1))
            brand_stats[brand] = {'saved': saved, 'found': found}
            grand_total_saved += saved
            grand_total_found += found
            # Pause between brands only when another brand follows and this
            # one actually had pending years to scrape.
            if pending_batches and idx < len(brands_to_process):
                next_brand = brands_to_process[idx]
                wait_with_skip(WAIT_TIME, f"PAUSA ENTRE MARCAS - Siguiente: {next_brand}")
    finally:
        conn.close()
    print("\n" + "="*60)
    print(" RESUMEN FINAL")
    print("="*60)
    # Only list brands that produced data.
    brands_with_data = {k: v for k, v in brand_stats.items() if v['found'] > 0 or v['saved'] > 0}
    if brands_with_data:
        for brand, stats in brands_with_data.items():
            print(f" {brand}:")
            print(f" Encontrados: {stats['found']}")
            print(f" Nuevos guardados: {stats['saved']}")
    else:
        print(" No se encontraron nuevos datos")
    print("-"*60)
    print(" TOTAL:")
    print(f" Marcas procesadas: {len(brands_to_process)}")
    print(f" Vehiculos encontrados: {grand_total_found}")
    print(f" Nuevos guardados: {grand_total_saved}")
    print("="*60)


if __name__ == "__main__":
    main()