New brands: Dodge, Honda, Mitsubishi, Jeep, BMW, Fiat, Hyundai, Infiniti, Kia, Land Rover, Lexus Features: - Interactive menu to select brand and batch - Skip wait time by pressing ENTER - Years range: 1975-2026 - 5 years per batch with 3 min pause
502 lines
16 KiB
Python
Executable File
502 lines
16 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
Multi-brand scraper v2

- Brands: Dodge, Honda, Mitsubishi, Jeep, BMW, Fiat, Hyundai, Infiniti, Kia, Land Rover, Lexus
- Processes years in batches of 5
- Waits 3 minutes between batches (skippable by pressing ENTER; any key on Windows)
- Interactive menu to select brand and batch
- Years: 1975-2026
"""
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
import sqlite3
|
|
import time
|
|
import re
|
|
import os
|
|
import sys
|
|
import select
|
|
from urllib.parse import unquote
|
|
|
|
# Locate the project root. When this script sits in a directory named
# "vehicle_scraper", the root is that directory's parent; otherwise the
# script's own directory is used.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = (
    os.path.dirname(SCRIPT_DIR)
    if os.path.basename(SCRIPT_DIR) == "vehicle_scraper"
    else SCRIPT_DIR
)
# SQLite database file, relative to the project root.
DB_PATH = os.path.join(BASE_DIR, "vehicle_database", "vehicle_database.db")
|
|
|
|
# Root of the catalogue URLs that the scraper requests.
BASE_URL = "https://www.rockauto.com/en/catalog"

# Brands to scrape, in processing order.
BRANDS = [
    "DODGE",
    "HONDA",
    "MITSUBISHI",
    "JEEP",
    "BMW",
    "FIAT",
    "HYUNDAI",
    "INFINITI",
    "KIA",
    "LAND ROVER",
    "LEXUS",
]

# Model years 1975 through 2026, newest first.
ALL_YEARS = sorted(range(1975, 2027), reverse=True)

# Batch configuration.
BATCH_SIZE = 5  # years per batch
WAIT_TIME = 3 * 60  # seconds to pause between batches (3 minutes)
|
|
|
|
# One shared HTTP session for every request, sending browser-like headers.
session = requests.Session()
_browser_headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml',
    'Accept-Language': 'en-US,en;q=0.9',
}
session.headers.update(_browser_headers)
|
|
|
|
|
|
def check_key_press():
    """Report, without blocking, whether the user has pressed a key.

    On Windows any pending keystroke counts and is consumed. On other
    platforms stdin is polled with ``select`` and one pending line is
    consumed, so the user has to press ENTER.

    Returns:
        bool: True when input was pending and has been consumed, False
        otherwise. End-of-file on stdin, and a stdin that has no usable
        file descriptor, both count as "no key pressed".
    """
    if sys.platform == 'win32':
        import msvcrt

        if msvcrt.kbhit():
            msvcrt.getch()  # discard the keystroke so it is not seen twice
            return True
        return False

    # Linux/Mac
    try:
        ready, _, _ = select.select([sys.stdin], [], [], 0)
    except (OSError, ValueError):
        # stdin is closed or is an object without a file descriptor, so
        # nobody can press a key on it.
        return False
    if not ready:
        return False
    # At end-of-file select() keeps reporting stdin as readable while
    # readline() returns ''. That is not a key press; treating it as one
    # would skip every pause when stdin is /dev/null or a closed pipe.
    return sys.stdin.readline() != ''
|
|
|
|
|
|
def wait_with_skip(seconds, message=""):
    """Count down ``seconds`` seconds, ending early if the user presses a key.

    A banner containing ``message`` and a VPN reminder is printed first.
    The countdown then polls ``check_key_press()`` once per second and
    returns as soon as it reports input.

    Args:
        seconds: Length of the pause in whole seconds. Zero or a negative
            value skips the countdown.
        message: Text shown in the banner.

    Returns:
        None.
    """
    banner = '*' * 60
    print(f"\n{banner}")
    print(f" {message}")
    print(" ACTIVA/CAMBIA EL VPN AHORA")
    print(" >>> Presiona ENTER para saltar la espera <<<")
    print(banner)

    # Drop anything typed before the pause started so stale input does not
    # skip it. This is best effort: stdin may not be a terminal at all.
    if sys.platform != 'win32':
        import termios

        try:
            termios.tcflush(sys.stdin, termios.TCIFLUSH)
        except (termios.error, OSError, ValueError, TypeError):
            # Not a terminal, or no usable file descriptor. Only these
            # failures are ignored, so Ctrl+C still interrupts the script.
            pass

    for remaining in range(seconds, 0, -1):
        if check_key_press():
            print("\n >>> ESPERA SALTADA <<<")
            return
        mins, secs = divmod(remaining, 60)
        print(f"\r Continuando en {mins}:{secs:02d}... (ENTER para saltar) ", end="", flush=True)
        time.sleep(1)
    print()
|
|
|
|
|
|
def clean_name(name):
    """Turn a URL path fragment into a normalised upper-case name.

    Every '+' becomes a space, percent-escapes are decoded, runs of
    whitespace collapse to one space, surrounding whitespace is removed
    and the result is upper-cased.
    """
    decoded = unquote(name.replace('+', ' '))
    collapsed = re.sub(r'\s+', ' ', decoded)
    return collapsed.strip().upper()
|
|
|
|
|
|
def get_soup(url, retries=3):
    """Download ``url`` and return it parsed, or None if it cannot be fetched.

    Up to ``retries`` attempts are made, each preceded by a 0.5 s pause.
    A 200 response is parsed with BeautifulSoup's 'html.parser'. A 403
    response prints a warning and gives up at once. Any other status
    moves on to the next attempt. An exception is retried after 3 s,
    except on the final attempt, where the error is printed instead.
    """
    final_attempt = retries - 1
    for attempt in range(retries):
        try:
            time.sleep(0.5)
            response = session.get(url, timeout=15)
            status = response.status_code
            if status == 403:
                print("\n [!] Bloqueado (403) - Cambia el VPN")
                return None
            if status == 200:
                return BeautifulSoup(response.content, 'html.parser')
        except Exception as exc:
            if attempt >= final_attempt:
                print(f"\n Error: {exc}")
            else:
                time.sleep(3)
    return None
|
|
|
|
|
|
def get_models(brand, year):
    """Return the sorted model names listed for ``brand`` in ``year``.

    The brand/year catalogue page is fetched with ``get_soup`` and every
    link of the form ``/catalog/<brand>,<year>,<model>`` is collected.
    Names are normalised with ``clean_name``. Empty names, purely numeric
    names and single-character names are discarded.

    Args:
        brand: Brand name; spaces become '+' in the URL.
        year: Model year.

    Returns:
        list[str]: Sorted unique model names, or an empty list when the
        page could not be fetched.
    """
    brand_url = brand.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{brand_url},{year}")
    if not soup:
        return []

    # The pattern is the same for every link, so build and compile it
    # once instead of once per anchor.
    model_link = re.compile(rf'/catalog/{re.escape(brand_url)},{year},([^,/]+)', re.I)

    models = set()
    for link in soup.find_all('a', href=True):
        match = model_link.search(link['href'])
        if not match:
            continue
        model = clean_name(match.group(1))
        if model and not model.isdigit() and len(model) > 1:
            models.add(model)
    return sorted(models)
|
|
|
|
|
|
def get_engines(brand, year, model):
    """Return the sorted engine names listed for a brand/year/model.

    The model's catalogue page is fetched with ``get_soup`` and every
    link of the form ``/catalog/<brand>,<year>,<model>,<engine>`` is
    inspected. A candidate is kept only if, once normalised with
    ``clean_name``, it looks like an engine description: a displacement
    such as ``2.0L``, a cylinder layout such as ``V6``/``I4``/``H4``, or
    one of HYBRID, ELECTRIC, DIESEL.

    Args:
        brand: Brand name; spaces become '+' in the URL.
        year: Model year.
        model: Model name; spaces become '+' in the URL.

    Returns:
        list[str]: Sorted unique engine names, or ``['STANDARD']`` when
        the page could not be fetched or no engine link was recognised.
    """
    brand_url = brand.lower().replace(' ', '+')
    model_url = model.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{brand_url},{year},{model_url}")
    if not soup:
        # NOTE(review): a failed fetch (e.g. a 403 block) cannot be told
        # apart from a model that really has no engine links. Confirm
        # that storing 'STANDARD' in that case is intended.
        return ['STANDARD']

    # Both patterns are loop-invariant: compile them once, not per anchor.
    engine_link = re.compile(
        rf'/catalog/{re.escape(brand_url)},{year},{re.escape(model_url)},([^,/]+)',
        re.I,
    )
    engine_shape = re.compile(r'\d+\.?\d*L|V\d|I\d|H\d|HYBRID|ELECTRIC|DIESEL', re.I)

    engines = set()
    for link in soup.find_all('a', href=True):
        match = engine_link.search(link['href'])
        if not match:
            continue
        engine = clean_name(match.group(1))
        if engine and engine_shape.search(engine):
            engines.add(engine)
    return sorted(engines) if engines else ['STANDARD']
|
|
|
|
|
|
def save_to_db(conn, brand, year, model, engine):
    """Store one brand/year/model/engine combination.

    Rows are inserted with INSERT OR IGNORE into ``brands``, ``years``,
    ``engines`` and ``models``, and their ids are read back. The ids are
    then linked in ``model_year_engine``. Nothing is committed here.

    Returns:
        bool: True if the ``model_year_engine`` insert reported a new
        row. False if it reported none, or if any step raised, in which
        case the error is printed.
    """
    cur = conn.cursor()

    def fetch_id(insert_sql, select_sql, params):
        # Insert the row if it is missing, then read its id back.
        cur.execute(insert_sql, params)
        cur.execute(select_sql, params)
        return cur.fetchone()[0]

    try:
        brand_id = fetch_id(
            "INSERT OR IGNORE INTO brands (name) VALUES (?)",
            "SELECT id FROM brands WHERE name = ?",
            (brand,),
        )
        year_id = fetch_id(
            "INSERT OR IGNORE INTO years (year) VALUES (?)",
            "SELECT id FROM years WHERE year = ?",
            (year,),
        )
        engine_id = fetch_id(
            "INSERT OR IGNORE INTO engines (name) VALUES (?)",
            "SELECT id FROM engines WHERE name = ?",
            (engine,),
        )
        model_id = fetch_id(
            "INSERT OR IGNORE INTO models (brand_id, name) VALUES (?, ?)",
            "SELECT id FROM models WHERE brand_id = ? AND name = ?",
            (brand_id, model),
        )

        link_ids = (model_id, year_id, engine_id)
        cur.execute(
            "INSERT OR IGNORE INTO model_year_engine (model_id, year_id, engine_id) VALUES (?, ?, ?)",
            link_ids,
        )
        inserted = cur.rowcount > 0
        return inserted
    except Exception as exc:
        print(f" DB Error: {exc}")
        return False
|
|
|
|
|
|
def get_existing_years(conn, brand):
    """Return the set of years that already have stored rows for ``brand``.

    A year counts when a ``model_year_engine`` row links it to a model
    belonging to the brand.
    """
    query = """
        SELECT DISTINCT y.year
        FROM years y
        JOIN model_year_engine mye ON y.id = mye.year_id
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        WHERE b.name = ?
    """
    cur = conn.cursor()
    cur.execute(query, (brand,))
    return {row[0] for row in cur.fetchall()}
|
|
|
|
|
|
def process_batch(conn, brand, years_batch, batch_num, total_batches):
    """Scrape and store every model/engine of ``brand`` for a batch of years.

    For each year the models are fetched, then each model's engines, and
    each combination is passed to ``save_to_db``. The connection is
    committed once, after the whole batch.

    Returns:
        tuple[int, int]: (combinations newly stored, combinations found).
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print(f"[{brand}] LOTE {batch_num}/{total_batches}: Años {years_batch}")
    print(rule)

    new_rows = 0
    seen_rows = 0

    for year in years_batch:
        print(f"\n[{brand} - Año {year}] Obteniendo modelos... ", end="", flush=True)
        models = get_models(brand, year)
        print(f"{len(models)} modelos encontrados")

        if not models:
            print(f" No se encontraron modelos para {year}")
            continue

        for model in models:
            for engine in get_engines(brand, year, model):
                seen_rows += 1
                if save_to_db(conn, brand, year, model, engine):
                    new_rows += 1
                    # Only newly stored combinations are echoed.
                    print(f" {model} - {engine}")

    # Persist the batch's changes.
    conn.commit()
    print(f"\n>> Lote {batch_num} completado: {new_rows} nuevos de {seen_rows} encontrados")
    return new_rows, seen_rows
|
|
|
|
|
|
def get_brand_batches(conn, brand):
    """Split the years still missing for ``brand`` into batches.

    Returns:
        tuple[list[list[int]], set]: The pending years, in ``ALL_YEARS``
        order, cut into lists of at most ``BATCH_SIZE`` (empty when
        nothing is pending), and the set of years already stored.
    """
    existing = get_existing_years(conn, brand)
    pending = [year for year in ALL_YEARS if year not in existing]

    batches = []
    # With nothing pending the range is empty and so is the result.
    for start in range(0, len(pending), BATCH_SIZE):
        batches.append(pending[start:start + BATCH_SIZE])
    return batches, existing
|
|
|
|
|
|
def process_brand(conn, brand, start_batch=1):
    """Scrape every pending year of ``brand``, batch by batch.

    Years already stored are skipped. The remaining years are cut into
    batches of ``BATCH_SIZE``, and batches numbered below ``start_batch``
    (1-based) are skipped. After every processed batch except the last
    one the function pauses with ``wait_with_skip``.

    Returns:
        tuple[int, int]: (combinations newly stored, combinations found),
        or (0, 0) when no year is pending.
    """
    hashes = '#' * 60
    print(f"\n{hashes}")
    print(f" PROCESANDO MARCA: {brand}")
    print(hashes)

    # Find out which years are already stored.
    existing = get_existing_years(conn, brand)
    print(f"Años existentes de {brand}: {len(existing)} años")
    if existing:
        print(f" Rango existente: {min(existing)}-{max(existing)}")

    # Keep only the missing years.
    pending_years = [year for year in ALL_YEARS if year not in existing]
    if not pending_years:
        print(f"\n[OK] {brand}: Todos los años ya están en la base de datos!")
        return 0, 0

    print(f"\nAños por procesar para {brand}: {len(pending_years)}")
    print(f" De {max(pending_years)} a {min(pending_years)}")

    # Cut the pending years into batches.
    batches = [
        pending_years[start:start + BATCH_SIZE]
        for start in range(0, len(pending_years), BATCH_SIZE)
    ]
    total_batches = len(batches)
    print(f"Lotes de {BATCH_SIZE} años: {total_batches} lotes")

    if start_batch > 1:
        print(f"\n>>> Comenzando desde el lote {start_batch} <<<")

    total_saved = total_found = 0
    for batch_num, batch in enumerate(batches, 1):
        if batch_num >= start_batch:
            saved, found = process_batch(conn, brand, batch, batch_num, total_batches)
            total_saved += saved
            total_found += found

            # Pause so the VPN can be changed, unless this was the last batch.
            if batch_num != total_batches:
                wait_with_skip(WAIT_TIME, f"PAUSA DE {WAIT_TIME//60} MINUTOS - [{brand}] Lotes restantes: {total_batches - batch_num}")

    return total_saved, total_found
|
|
|
|
|
|
def show_main_menu(conn):
    """Run the main menu until the user makes a final choice.

    Returns:
        tuple: ``(None, None)`` to quit, ``('ALL', 1)`` to process every
        brand, or the ``(brand, start_batch)`` pair chosen in the batch
        menu. Showing the status, cancelling the batch menu and invalid
        input all redisplay the menu.
    """
    rule = "=" * 60
    menu_lines = (
        "\n" + rule,
        " SCRAPER MULTIMARCA - MENU PRINCIPAL",
        rule,
        "\n Opciones:",
        " 1. Ver estado de todas las marcas",
        " 2. Seleccionar marca y lote específico",
        " 3. Procesar todas las marcas pendientes",
        " 0. Salir",
        rule,
    )

    while True:
        for line in menu_lines:
            print(line)

        choice = input("\nSelecciona opción: ").strip()

        if choice == '0':
            return None, None
        if choice == '3':
            return 'ALL', 1
        if choice == '1':
            show_all_brands_status(conn)
            continue
        if choice == '2':
            result = show_batch_menu(conn)
            # (None, None) means the user backed out of the batch menu.
            if not (result[0] is None and result[1] is None):
                return result
            continue
        print("Opción inválida")
|
|
|
|
|
|
def show_all_brands_status(conn):
    """Print one status line per brand, then wait for ENTER.

    Each line shows how many years are stored and either the number of
    pending years and batches or that the brand is complete.
    """
    rule = "-" * 60
    print("\n" + rule)
    print(" ESTADO DE TODAS LAS MARCAS")
    print(rule)

    for brand in BRANDS:
        batches, existing = get_brand_batches(conn, brand)
        if batches:
            years_pending = sum(map(len, batches))
            status = f"{years_pending} años pendientes ({len(batches)} lotes)"
        else:
            status = "COMPLETO"
        print(f" {brand:15} | Existentes: {len(existing):3} | {status}")

    print(rule)
    input("\nPresiona ENTER para continuar...")
|
|
|
|
|
|
def show_batch_menu(conn):
    """Let the user pick a brand with pending batches and a starting batch.

    Every brand is listed. Brands with pending batches are numbered and
    complete ones are marked. The user picks a brand, then the batch to
    start from.

    Returns:
        tuple: ``(brand, batch_number)`` with a 1-based batch number, or
        ``(None, None)`` when every brand is complete or the user goes
        back. An empty answer at the batch prompt selects batch 1.
    """
    rule = "=" * 60
    print("\n" + rule)
    print(" SELECCIÓN DE MARCA Y LOTE")
    print(rule)

    pending_by_brand = {}
    available_brands = []

    for brand in BRANDS:
        batches, existing = get_brand_batches(conn, brand)
        if not batches:
            print(f"\n -. {brand} - [COMPLETO]")
            continue

        pending_by_brand[brand] = batches
        available_brands.append(brand)
        print(f"\n {len(available_brands)}. {brand}")
        print(f" Años existentes: {len(existing)}")
        print(f" Lotes pendientes: {len(batches)}")
        # Summarise the batches: show all of them up to five, otherwise
        # only the first and the last.
        if len(batches) > 5:
            first, last = batches[0], batches[-1]
            print(f" Lote 1: años {first[0]}-{first[-1]}")
            print(" ...")
            print(f" Lote {len(batches)}: años {last[0]}-{last[-1]}")
        else:
            for number, batch in enumerate(batches, 1):
                print(f" Lote {number}: años {batch[0]}-{batch[-1]}")

    if not available_brands:
        print("\n [!] Todas las marcas están completas!")
        input("\nPresiona ENTER para continuar...")
        return None, None

    print("\n 0. Volver al menú principal")
    print(rule)

    # Choose the brand.
    brand_count = len(available_brands)
    while True:
        try:
            choice = input("\nSelecciona marca (número): ").strip()
            if choice in ('0', ''):
                return None, None
            brand_idx = int(choice) - 1
        except ValueError:
            print("Ingresa un número válido")
            continue
        if 0 <= brand_idx < brand_count:
            selected_brand = available_brands[brand_idx]
            break
        print(f"Opción inválida. Ingresa un número entre 1 y {brand_count}")

    batches = pending_by_brand[selected_brand]
    batch_count = len(batches)

    # List the batches available for the chosen brand.
    print(f"\n{rule}")
    print(f" LOTES DISPONIBLES PARA {selected_brand}")
    print(rule)

    for number, batch in enumerate(batches, 1):
        print(f" {number}. Lote {number}: años {batch[0]} - {batch[-1]} ({len(batch)} años)")

    print("\n 0. Volver")
    print(rule)

    # Choose the starting batch.
    while True:
        try:
            batch_choice = input(f"\nComenzar desde lote (1-{batch_count}): ").strip()
            if batch_choice == '0':
                return None, None
            if batch_choice == '':
                return selected_brand, 1
            batch_num = int(batch_choice)
        except ValueError:
            print("Ingresa un número válido")
            continue
        if 1 <= batch_num <= batch_count:
            return selected_brand, batch_num
        print(f"Ingresa un número entre 1 y {batch_count}")
|
|
|
|
|
|
def main():
    """Run the interactive scraper from start to finish.

    Prints the configuration, checks that the database file exists
    (exiting with status 1 if it does not), shows the main menu,
    processes the selected brand or all brands with a pause between
    brands, and prints a summary.

    The database connection is closed on every path out of the function,
    including when scraping raises or the user interrupts it.
    """
    rule = "=" * 60
    print(rule)
    print(" SCRAPER MULTIMARCA v2")
    print(rule)
    print(f" Marcas: {len(BRANDS)}")
    for brand in BRANDS:
        print(f" - {brand}")
    print(f" Años: 1975-2026 | Lotes de {BATCH_SIZE} años")
    print(f" Pausa entre lotes: {WAIT_TIME//60} minutos")
    print(" >>> Presiona ENTER para saltar esperas <<<")
    print(rule)

    # Check that the database exists before connecting, because
    # sqlite3.connect would silently create an empty file.
    if not os.path.exists(DB_PATH):
        print(f"\n[ERROR] Base de datos no encontrada: {DB_PATH}")
        print("Verifica que la ruta sea correcta.")
        sys.exit(1)

    print(f"\nBase de datos: {DB_PATH}")

    grand_total_saved = 0
    grand_total_found = 0
    brand_stats = {}

    conn = sqlite3.connect(DB_PATH)
    try:
        # Main menu.
        selected_brand, start_batch = show_main_menu(conn)

        if selected_brand is None and start_batch is None:
            print("\nSaliendo...")
            return

        # Decide which brands to process.
        if selected_brand == 'ALL':
            # Every brand, each from its first pending batch.
            brands_to_process = BRANDS
            start_batches = {brand: 1 for brand in BRANDS}
        elif selected_brand:
            # Only the chosen brand, from the chosen batch.
            brands_to_process = [selected_brand]
            start_batches = {selected_brand: start_batch}
        else:
            return

        last_position = len(brands_to_process) - 1
        for position, brand in enumerate(brands_to_process):
            saved, found = process_brand(conn, brand, start_batches.get(brand, 1))
            brand_stats[brand] = {'saved': saved, 'found': found}
            grand_total_saved += saved
            grand_total_found += found

            # Pause between brands when another brand follows.
            if position < last_position:
                next_brand = brands_to_process[position + 1]
                wait_with_skip(WAIT_TIME, f"PAUSA ENTRE MARCAS - Siguiente: {next_brand}")
    finally:
        # Release the database even on an error or Ctrl+C.
        conn.close()

    print("\n" + rule)
    print(" RESUMEN FINAL")
    print(rule)
    for brand, stats in brand_stats.items():
        if stats['found'] > 0 or stats['saved'] > 0:
            print(f" {brand}:")
            print(f" Encontrados: {stats['found']}")
            print(f" Nuevos guardados: {stats['saved']}")
    print("-" * 60)
    print(" TOTAL:")
    print(f" Vehículos encontrados: {grand_total_found}")
    print(f" Nuevos guardados: {grand_total_saved}")
    print(rule)
|
|
|
|
|
|
# Start the interactive scraper only when the file is run as a script,
# not when it is imported.
if __name__ == "__main__":
    main()
|