Files
Autoparts-DB/vehicle_scraper/scrape_nissan_ford_chevrolet.py
consultoria-as f395d67136 Initial commit: Sistema Autoparts DB
- Base de datos SQLite con información de vehículos
- Dashboard web con Flask y Bootstrap
- Scripts de web scraping para RockAuto
- Interfaz CLI para consultas
- Documentación completa del proyecto

Incluye:
- 12 marcas de vehículos
- 10,923 modelos
- 10,919 especificaciones de motores
- 12,075 combinaciones modelo-año-motor
2026-01-19 08:45:03 +00:00

394 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Scraper de Ford y Chevrolet
- Procesa de 5 en 5 años
- Espera 3 minutos (180 segundos) entre lotes para activar VPN
- Presiona ENTER para saltar la espera
- Años: 1975-2026
"""
import requests
from bs4 import BeautifulSoup
import sqlite3
import time
import re
import os
import sys
import threading
from urllib.parse import unquote
# Resolve the project root: when this script sits in a "vehicle_scraper"
# folder the root is that folder's parent, otherwise the script's own folder.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = (
    os.path.dirname(SCRIPT_DIR)
    if os.path.basename(SCRIPT_DIR) == "vehicle_scraper"
    else SCRIPT_DIR
)
DB_PATH = os.path.join(BASE_DIR, "vehicle_database", "vehicle_database.db")
BASE_URL = "https://www.rockauto.com/en/catalog"

# Brands to scrape (Nissan was already processed).
BRANDS = ["FORD", "CHEVROLET"]

# Model years 2026 down to 1975, newest first.
ALL_YEARS = list(reversed(range(1975, 2027)))

# Batch configuration.
BATCH_SIZE = 5   # years per batch
WAIT_TIME = 180  # seconds between batches (3 minutes)
# One HTTP session shared by every request; the browser-like headers below
# are attached to each call made through it.
session = requests.Session()
session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
session.headers['Accept'] = 'text/html,application/xhtml+xml'
session.headers['Accept-Language'] = 'en-US,en;q=0.9'

# Module-level flag that wait_with_skip() polls to end a wait early.
skip_wait = False
def wait_with_skip(seconds, message=""):
    """Count down ``seconds`` seconds, returning early if ENTER is pressed.

    A banner containing ``message`` and a reminder to switch the VPN is
    printed first. A daemon thread then blocks on ``input()``; once a line
    is read it sets the module-level ``skip_wait`` flag, which the
    countdown loop checks once per second.

    The input thread is never joined: if the countdown runs out without
    ENTER being pressed, that thread stays blocked on ``input()`` until the
    next line arrives or the process exits.

    Args:
        seconds: Countdown length in seconds. Zero or a negative value
            skips the countdown entirely.
        message: Text shown inside the banner.
    """
    global skip_wait
    skip_wait = False

    banner = '*' * 60
    print(f"\n{banner}")
    print(f" {message}")
    print(" ACTIVA/CAMBIA EL VPN AHORA")
    print(" >>> Presiona ENTER para saltar la espera <<<")
    print(banner)

    def check_input():
        """Block until a line is read from stdin, then request the skip."""
        global skip_wait
        try:
            input()
        except Exception:
            # stdin may be closed, redirected or otherwise unreadable; in
            # that case the wait simply cannot be skipped. Unlike a bare
            # ``except:``, this does not swallow SystemExit or
            # KeyboardInterrupt.
            return
        skip_wait = True

    threading.Thread(target=check_input, daemon=True).start()

    for remaining in range(seconds, 0, -1):
        if skip_wait:
            print("\n >>> ESPERA SALTADA <<<")
            return
        mins, secs = divmod(remaining, 60)
        # "\r" rewrites the same terminal line on every tick.
        print(f"\r Continuando en {mins}:{secs:02d}... (ENTER para saltar) ", end="", flush=True)
        time.sleep(1)
    print()
def clean_name(name):
    """Normalize a URL path fragment into an upper-case display name.

    '+' is treated as a space, percent-escapes are decoded, every run of
    whitespace collapses to a single space, and the result is stripped and
    upper-cased.
    """
    decoded = unquote(name.replace('+', ' '))
    collapsed = re.sub(r'\s+', ' ', decoded)
    return collapsed.strip().upper()
def get_soup(url, retries=3):
    """Download ``url`` with the shared session and parse it as HTML.

    Each attempt sleeps 0.5 s before the request. HTTP 200 returns the
    parsed BeautifulSoup document. HTTP 403 prints a "blocked" notice and
    returns None at once. Any other status falls through to the next
    attempt. When an attempt raises, the function sleeps 3 s and retries;
    on the final attempt it prints the error instead.

    Args:
        url: Page address to fetch.
        retries: Maximum number of attempts.

    Returns:
        The parsed document, or None when no attempt produced one.
    """
    final_attempt = retries - 1
    for attempt in range(retries):
        try:
            time.sleep(0.5)
            response = session.get(url, timeout=15)
            status = response.status_code
            if status == 200:
                return BeautifulSoup(response.content, 'html.parser')
            if status == 403:
                print(f"\n [!] Bloqueado (403) - Cambia el VPN")
                return None
        except Exception as e:
            if attempt == final_attempt:
                print(f"\n Error: {e}")
            else:
                time.sleep(3)
    return None
def get_models(brand, year):
    """Return the sorted model names listed on a brand/year catalog page.

    The page ``{BASE_URL}/{brand},{year}`` is fetched with get_soup() and
    every link whose href continues that path with one more comma-separated
    segment is treated as a model link. The segment is normalized with
    clean_name(); names that are empty, purely numeric or a single
    character are discarded.

    Args:
        brand: Brand name; lower-cased, with spaces turned into '+', for
            the URL.
        year: Model year placed in the URL.

    Returns:
        A sorted list of unique model names, or an empty list when the
        page could not be fetched.
    """
    brand_url = brand.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{brand_url},{year}")
    if not soup:
        return []

    # The pattern depends only on brand and year, so build it once per page
    # instead of once per link.
    model_link = re.compile(
        rf'/catalog/{re.escape(brand_url)},{year},([^,/]+)', re.I
    )

    models = set()
    for link in soup.find_all('a', href=True):
        match = model_link.search(link['href'])
        if not match:
            continue
        model = clean_name(match.group(1))
        if model and not model.isdigit() and len(model) > 1:
            models.add(model)
    return sorted(models)
def get_engines(brand, year, model):
    """Return the sorted engine names listed for a brand/year/model page.

    The page ``{BASE_URL}/{brand},{year},{model}`` is fetched with
    get_soup() and every link whose href continues that path with one more
    comma-separated segment is a candidate. A candidate is kept only when
    its cleaned name looks like an engine: a displacement such as "5.0L", a
    V/I/H cylinder layout followed by a digit, or one of HYBRID, ELECTRIC,
    DIESEL.

    Args:
        brand: Brand name; lower-cased, with spaces turned into '+', for
            the URL.
        year: Model year placed in the URL.
        model: Model name as produced by get_models().

    Returns:
        A sorted list of unique engine names, or ``['STANDARD']`` when the
        page could not be fetched or no engine link was recognized.
    """
    brand_url = brand.lower().replace(' ', '+')
    # NOTE(review): `model` has been percent-decoded by clean_name(), and
    # only spaces are re-encoded here. If the site percent-encodes other
    # characters in model names, the pattern below would not match those
    # hrefs - confirm against real pages.
    model_url = model.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{brand_url},{year},{model_url}")
    if not soup:
        return ['STANDARD']

    # Both patterns are loop-invariant, so compile them once per page.
    engine_link = re.compile(
        rf'/catalog/{re.escape(brand_url)},{year},{re.escape(model_url)},([^,/]+)',
        re.I,
    )
    looks_like_engine = re.compile(
        r'\d+\.?\d*L|V\d|I\d|H\d|HYBRID|ELECTRIC|DIESEL', re.I
    )

    engines = set()
    for link in soup.find_all('a', href=True):
        match = engine_link.search(link['href'])
        if not match:
            continue
        engine = clean_name(match.group(1))
        if engine and looks_like_engine.search(engine):
            engines.add(engine)
    return sorted(engines) if engines else ['STANDARD']
def save_to_db(conn, brand, year, model, engine):
    """Store one brand/year/model/engine combination.

    Brand, year, engine and model rows are created with INSERT OR IGNORE
    when missing and their ids are looked up; the combination is then
    linked in ``model_year_engine``. Nothing is committed here.

    Args:
        conn: Open SQLite connection.
        brand: Brand name.
        year: Model year.
        model: Model name, stored under the brand.
        engine: Engine name.

    Returns:
        True when the link row was newly inserted. False when it already
        existed or when any statement raised (the error is printed).
    """
    cur = conn.cursor()

    def ensure_row(insert_sql, select_sql, params):
        # Insert if absent, then fetch the id of the matching row.
        cur.execute(insert_sql, params)
        cur.execute(select_sql, params)
        return cur.fetchone()[0]

    try:
        brand_id = ensure_row(
            "INSERT OR IGNORE INTO brands (name) VALUES (?)",
            "SELECT id FROM brands WHERE name = ?",
            (brand,),
        )
        year_id = ensure_row(
            "INSERT OR IGNORE INTO years (year) VALUES (?)",
            "SELECT id FROM years WHERE year = ?",
            (year,),
        )
        engine_id = ensure_row(
            "INSERT OR IGNORE INTO engines (name) VALUES (?)",
            "SELECT id FROM engines WHERE name = ?",
            (engine,),
        )
        model_id = ensure_row(
            "INSERT OR IGNORE INTO models (brand_id, name) VALUES (?, ?)",
            "SELECT id FROM models WHERE brand_id = ? AND name = ?",
            (brand_id, model),
        )
        cur.execute(
            "INSERT OR IGNORE INTO model_year_engine (model_id, year_id, engine_id) VALUES (?, ?, ?)",
            (model_id, year_id, engine_id),
        )
        # rowcount is 0 when the OR IGNORE clause skipped a duplicate.
        return cur.rowcount > 0
    except Exception as e:
        print(f" DB Error: {e}")
        return False
def get_existing_years(conn, brand):
    """Return the set of years already stored for ``brand``.

    A year counts only when at least one ``model_year_engine`` row links it
    to a model of that brand.
    """
    # Adjacent literals keep the SQL text exactly as it was while letting
    # the code be indented normally.
    query = (
        "\n"
        "SELECT DISTINCT y.year\n"
        "FROM years y\n"
        "JOIN model_year_engine mye ON y.id = mye.year_id\n"
        "JOIN models m ON mye.model_id = m.id\n"
        "JOIN brands b ON m.brand_id = b.id\n"
        "WHERE b.name = ?\n"
    )
    cur = conn.cursor()
    cur.execute(query, (brand,))
    return {record[0] for record in cur.fetchall()}
def process_batch(conn, brand, years_batch, batch_num, total_batches):
    """Scrape and store every model/engine combination for a batch of years.

    For each year the models are fetched, then the engines of each model,
    and every combination is passed to save_to_db(). Years with no models
    are reported and skipped. The connection is committed once, after the
    whole batch.

    Args:
        conn: Open SQLite connection.
        brand: Brand being scraped.
        years_batch: Years belonging to this batch.
        batch_num: 1-based number of this batch, used for display.
        total_batches: Total batch count, used for display.

    Returns:
        A ``(saved, found)`` tuple: newly inserted combinations and all
        combinations seen.
    """
    separator = '=' * 60
    print(f"\n{separator}")
    print(f"[{brand}] LOTE {batch_num}/{total_batches}: Años {years_batch}")
    print(separator)

    new_rows = 0
    seen_rows = 0
    for year in years_batch:
        print(f"\n[{brand} - Año {year}] Obteniendo modelos... ", end="", flush=True)
        models = get_models(brand, year)
        print(f"{len(models)} modelos encontrados")
        if not models:
            print(f" No se encontraron modelos para {year}")
            continue

        # Lazy pairs: engines for a model are fetched only when iteration
        # reaches that model.
        combos = (
            (model, engine)
            for model in models
            for engine in get_engines(brand, year, model)
        )
        for model, engine in combos:
            seen_rows += 1
            if save_to_db(conn, brand, year, model, engine):
                new_rows += 1
                print(f" {model} - {engine}")

    # Persist everything gathered for this batch.
    conn.commit()
    print(f"\n>> Lote {batch_num} completado: {new_rows} nuevos de {seen_rows} encontrados")
    return new_rows, seen_rows
def get_brand_batches(conn, brand):
    """Split the years still missing for ``brand`` into batches.

    Returns:
        A ``(batches, existing)`` tuple. ``batches`` is a list of year
        lists, each at most BATCH_SIZE long and in ALL_YEARS order; it is
        empty when nothing is missing. ``existing`` is the set of years
        already stored.
    """
    existing = get_existing_years(conn, brand)
    pending = [year for year in ALL_YEARS if year not in existing]

    batches = []
    for start in range(0, len(pending), BATCH_SIZE):
        batches.append(pending[start:start + BATCH_SIZE])
    return batches, existing
def process_brand(conn, brand, start_batch=1):
    """Scrape every pending batch of ``brand``, starting at ``start_batch``.

    The pending batches come from get_brand_batches(), the same helper the
    selection menu uses, so the batch numbers shown there and used here are
    produced by one piece of logic. After each batch except the last,
    wait_with_skip() pauses so the VPN can be switched.

    Args:
        conn: Open SQLite connection.
        brand: Brand to process.
        start_batch: 1-based number of the first batch to run; earlier
            batches are skipped.

    Returns:
        A ``(saved, found)`` tuple summed over the processed batches, or
        ``(0, 0)`` when no year is pending.
    """
    rule = '#' * 60
    print(f"\n{rule}")
    print(f" PROCESANDO MARCA: {brand}")
    print(rule)

    batches, existing = get_brand_batches(conn, brand)

    # Report what the database already holds.
    print(f"Años existentes de {brand}: {len(existing)} años")
    if existing:
        print(f" Rango existente: {min(existing)}-{max(existing)}")

    if not batches:
        print(f"\n[OK] {brand}: Todos los años ya están en la base de datos!")
        return 0, 0

    years_to_process = [year for batch in batches for year in batch]
    print(f"\nAños por procesar para {brand}: {len(years_to_process)}")
    print(f" De {max(years_to_process)} a {min(years_to_process)}")

    total_batches = len(batches)
    print(f"Lotes de {BATCH_SIZE} años: {total_batches} lotes")
    if start_batch > 1:
        print(f"\n>>> Comenzando desde el lote {start_batch} <<<")

    total_saved = 0
    total_found = 0
    for i, batch in enumerate(batches, 1):
        # Skip batches before the requested starting point.
        if i < start_batch:
            continue
        saved, found = process_batch(conn, brand, batch, i, total_batches)
        total_saved += saved
        total_found += found
        # Pause for a VPN change unless this was the last batch.
        if i < total_batches:
            wait_with_skip(WAIT_TIME, f"PAUSA DE {WAIT_TIME//60} MINUTOS - [{brand}] Lotes restantes: {total_batches - i}")
    return total_saved, total_found
def show_batch_menu(conn):
    """Ask which brand to process and which batch to start from.

    The pending batches of every brand in BRANDS are listed first. The
    user then picks a brand by number ('0' or an empty line means "all
    brands") and, when that brand has pending batches, a starting batch
    (an empty line means batch 1). Invalid entries are asked for again.

    Returns:
        ``(None, 1)`` to process everything, otherwise
        ``(brand, start_batch)`` with a 1-based ``start_batch``.
    """
    rule = "=" * 60
    print("\n" + rule)
    print(" MENÚ DE SELECCIÓN DE LOTES")
    print(rule)

    pending_by_brand = {}
    for number, brand in enumerate(BRANDS, 1):
        batches, existing = get_brand_batches(conn, brand)
        pending_by_brand[brand] = batches
        if not batches:
            print(f"\n {number}. {brand} - [COMPLETO]")
            continue
        print(f"\n {number}. {brand}")
        print(f" Años existentes: {len(existing)}")
        print(f" Lotes pendientes: {len(batches)}")
        for j, batch in enumerate(batches, 1):
            print(f" Lote {j}: años {batch[0]}-{batch[-1]}")
    print(f"\n 0. Procesar todo desde el inicio")
    print(rule)

    # Brand selection: loop until a valid number is entered.
    selected_brand = None
    while selected_brand is None:
        try:
            choice = input("\nSelecciona marca (0 para todo): ").strip()
            if choice in ('0', ''):
                return None, 1  # process everything
            position = int(choice)
            if 1 <= position <= len(BRANDS):
                selected_brand = BRANDS[position - 1]
            else:
                print("Opción inválida")
        except ValueError:
            print("Ingresa un número válido")

    batches = pending_by_brand[selected_brand]
    if not batches:
        print(f"\n{selected_brand} ya está completo!")
        return selected_brand, 1

    # Batch selection.
    print(f"\n--- Lotes de {selected_brand} ---")
    for j, batch in enumerate(batches, 1):
        print(f" {j}. Lote {j}: años {batch[0]}-{batch[-1]}")

    batch_count = len(batches)
    while True:
        try:
            batch_choice = input(f"\nComenzar desde lote (1-{batch_count}): ").strip()
            if not batch_choice:
                return selected_brand, 1
            batch_num = int(batch_choice)
            if 1 <= batch_num <= batch_count:
                return selected_brand, batch_num
            print(f"Ingresa un número entre 1 y {batch_count}")
        except ValueError:
            print("Ingresa un número válido")
def main():
    """Run the interactive scraper and print a final summary.

    Steps: print the banner, check that the database file exists (exiting
    with status 1 if not), open the connection, show the batch menu,
    process the chosen brand(s) with a pause between brands, and print
    per-brand and overall totals.

    The connection is closed in a ``finally`` clause, so it is released
    even when the menu or the scraping raises (Ctrl-C included). In that
    case the exception propagates and the summary is not printed.
    """
    rule = "=" * 60
    print(rule)
    print(" SCRAPER FORD, CHEVROLET")
    print(f" Años: 1975-2026 | Lotes de {BATCH_SIZE} años")
    print(f" Pausa entre lotes: {WAIT_TIME//60} minutos")
    print(" >>> Presiona ENTER para saltar esperas <<<")
    print(rule)

    # The database must already exist; this script does not create it.
    if not os.path.exists(DB_PATH):
        print(f"\n[ERROR] Base de datos no encontrada: {DB_PATH}")
        print("Verifica que la ruta sea correcta.")
        sys.exit(1)
    print(f"\nBase de datos: {DB_PATH}")

    grand_total_saved = 0
    grand_total_found = 0
    brand_stats = {}

    conn = sqlite3.connect(DB_PATH)
    try:
        # Show the initial state.
        print(f"\nMarcas a procesar: {', '.join(BRANDS)}")
        print(f"Rango de años: {min(ALL_YEARS)}-{max(ALL_YEARS)} ({len(ALL_YEARS)} años)")

        selected_brand, start_batch = show_batch_menu(conn)
        if selected_brand:
            # Only the chosen brand, from the chosen batch onwards.
            brands_to_process = [selected_brand]
        else:
            # Every brand, each from its first batch.
            brands_to_process = BRANDS
            start_batch = 1

        for position, brand in enumerate(brands_to_process):
            saved, found = process_brand(conn, brand, start_batch)
            brand_stats[brand] = {'saved': saved, 'found': found}
            grand_total_saved += saved
            grand_total_found += found

            # Pause before the next brand, if there is one.
            next_position = position + 1
            if next_position < len(brands_to_process):
                wait_with_skip(WAIT_TIME, f"PAUSA ENTRE MARCAS - Siguiente: {brands_to_process[next_position]}")
    finally:
        conn.close()

    print("\n" + rule)
    print(" RESUMEN FINAL")
    print(rule)
    for brand, stats in brand_stats.items():
        print(f" {brand}:")
        print(f" Encontrados: {stats['found']}")
        print(f" Nuevos guardados: {stats['saved']}")
    print("-" * 60)
    print(" TOTAL:")
    print(f" Vehículos encontrados: {grand_total_found}")
    print(f" Nuevos guardados: {grand_total_saved}")
    print(rule)
# Run the scraper only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()