#!/usr/bin/env python3
"""
RockAuto Vehicle Data Scraper v2

Extracts vehicle information (brands, years, models, engines) from
RockAuto.com and persists it into a local SQLite database.
"""

import requests
from bs4 import BeautifulSoup
import time
import random
import sqlite3
import re
import sys
from typing import List, Dict, Set, Optional
from urllib.parse import unquote


class RockAutoScraperV2:
    """Polite scraper for RockAuto's public catalog pages.

    Navigation follows RockAuto's URL scheme:
        /en/catalog/BRAND[,YEAR[,MODEL[,ENGINE]]]
    Each level is discovered by scanning anchor hrefs on the previous page.
    """

    def __init__(self, db_path: str = "../vehicle_database/vehicle_database.db"):
        self.base_url = "https://www.rockauto.com/en/catalog"
        self.session = requests.Session()
        # Browser-like headers reduce the chance of being served a bot page.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
        })
        self.db_path = db_path
        self.delay_range = (1, 2)  # Seconds to wait between requests.

    def _delay(self):
        """Sleep a random interval between requests (rate limiting)."""
        time.sleep(random.uniform(*self.delay_range))

    def _get_soup(self, url: str) -> Optional[BeautifulSoup]:
        """Fetch *url* and parse it; return None on any fetch/parse error."""
        try:
            self._delay()
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return BeautifulSoup(response.content, 'html.parser')
        except Exception as e:
            # Best-effort scraping: report and let the caller skip this page.
            print(f" Error al obtener {url}: {e}")
            return None

    def _clean_name(self, name: str) -> str:
        """Decode a URL path segment into a normalized upper-case name."""
        # '+' is RockAuto's space encoding; decode it before percent-unquoting.
        name = unquote(name.replace('+', ' '))
        name = re.sub(r'\s+', ' ', name).strip()
        return name.upper()

    def get_all_brands(self) -> List[str]:
        """Return every brand name found on the catalog landing page."""
        print("Obteniendo lista de marcas...")
        soup = self._get_soup(f"{self.base_url}/")
        if not soup:
            return []

        brands = set()
        # Brand links look like /en/catalog/BRAND (no year/model suffix).
        brand_pattern = re.compile(r'/en/catalog/([^,/]+)$')
        for link in soup.find_all('a', href=True):
            match = brand_pattern.match(link['href'])
            if match:
                brand = self._clean_name(match.group(1))
                # Skip single characters and purely numeric artifacts.
                if brand and len(brand) > 1 and not brand.isdigit():
                    brands.add(brand)

        brands_list = sorted(brands)
        print(f" Encontradas {len(brands_list)} marcas")
        return brands_list

    def get_years_for_brand(self, brand: str) -> List[int]:
        """Return the model years available for *brand*, newest first."""
        brand_url = brand.lower().replace(' ', '+')
        soup = self._get_soup(f"{self.base_url}/{brand_url}")
        if not soup:
            return []

        years = set()
        for link in soup.find_all('a', href=True):
            # Year links look like /catalog/brand,YEAR
            match = re.search(rf'/catalog/{re.escape(brand_url)},(\d{{4}})',
                              link['href'], re.IGNORECASE)
            if match:
                year = int(match.group(1))
                # Sanity bounds to discard IDs that merely look like years.
                if 1900 < year <= 2030:
                    years.add(year)

        return sorted(years, reverse=True)

    def get_models_for_brand_year(self, brand: str, year: int) -> List[str]:
        """Return the model names for *brand* in *year*, sorted."""
        brand_url = brand.lower().replace(' ', '+')
        soup = self._get_soup(f"{self.base_url}/{brand_url},{year}")
        if not soup:
            return []

        models = set()
        # Model links look like /catalog/brand,year,MODEL
        pattern = re.compile(rf'/catalog/{re.escape(brand_url)},{year},([^,/]+)',
                             re.IGNORECASE)
        for link in soup.find_all('a', href=True):
            match = pattern.search(link['href'])
            if match:
                model = self._clean_name(match.group(1))
                if model and not model.isdigit():
                    models.add(model)

        return sorted(models)

    def get_engines_for_vehicle(self, brand: str, year: int, model: str) -> List[str]:
        """Return the engine designations for a specific brand/year/model."""
        brand_url = brand.lower().replace(' ', '+')
        model_url = model.lower().replace(' ', '+')
        soup = self._get_soup(f"{self.base_url}/{brand_url},{year},{model_url}")
        if not soup:
            return []

        engines = set()
        # Engine links look like /catalog/brand,year,model,ENGINE
        pattern = re.compile(
            rf'/catalog/{re.escape(brand_url)},{year},{re.escape(model_url)},([^,/]+)',
            re.IGNORECASE)
        for link in soup.find_all('a', href=True):
            match = pattern.search(link['href'])
            if match:
                engine = self._clean_name(match.group(1))
                # Keep only strings that resemble engine specs
                # (displacement like "2.4L", "V6"/"I4", or fuel/drivetrain keywords).
                if engine and re.search(r'\d+\.?\d*L|V\d|I\d|HYBRID|ELECTRIC|DIESEL',
                                        engine, re.IGNORECASE):
                    engines.add(engine)

        return sorted(engines)

    def scrape_brand(self, brand: str, max_years: Optional[int] = None,
                     max_models_per_year: Optional[int] = None) -> List[Dict]:
        """Scrape every year/model/engine combination for one brand.

        Args:
            brand: Brand name as listed by :meth:`get_all_brands`.
            max_years: If given, limit to the N most recent years.
            max_models_per_year: If given, limit models scraped per year.

        Returns:
            A list of dicts with keys 'brand', 'year', 'model', 'engine'.
        """
        print(f"\n{'='*50}")
        print(f"Procesando marca: {brand}")
        print('='*50)

        vehicles = []

        years = self.get_years_for_brand(brand)
        if max_years:
            years = years[:max_years]
        print(f" Años encontrados: {len(years)}")

        for year in years:
            print(f"\n Año {year}:")

            models = self.get_models_for_brand_year(brand, year)
            if max_models_per_year:
                models = models[:max_models_per_year]
            print(f" Modelos: {len(models)}")

            for model in models:
                engines = self.get_engines_for_vehicle(brand, year, model)

                if engines:
                    for engine in engines:
                        vehicles.append({
                            'brand': brand,
                            'year': year,
                            'model': model,
                            'engine': engine
                        })
                        print(f" {model} - {engine}")
                else:
                    # No engine page found: record the model with a generic engine
                    # so the vehicle is not lost entirely.
                    vehicles.append({
                        'brand': brand,
                        'year': year,
                        'model': model,
                        'engine': 'Standard'
                    })
                    print(f" {model} - (sin motor específico)")

        print(f"\n Total vehículos para {brand}: {len(vehicles)}")
        return vehicles

    def _get_or_create_id(self, cursor: sqlite3.Cursor, table: str,
                          columns: tuple, values: tuple) -> int:
        """Insert a row if absent and return its id.

        *table* and *columns* are internal constants (never user input), so
        interpolating them into the SQL is safe; values stay parameterized.
        """
        col_list = ", ".join(columns)
        placeholders = ", ".join("?" for _ in values)
        cursor.execute(
            f"INSERT OR IGNORE INTO {table} ({col_list}) VALUES ({placeholders})",
            values)
        where = " AND ".join(f"{c} = ?" for c in columns)
        cursor.execute(f"SELECT id FROM {table} WHERE {where}", values)
        return cursor.fetchone()[0]

    def save_to_database(self, vehicles: List[Dict]) -> None:
        """Persist scraped vehicles into the normalized SQLite schema.

        Expects tables: brands(name), years(year), engines(name),
        models(brand_id, name) and model_year_engine(model_id, year_id,
        engine_id), each with an 'id' column and uniqueness suitable for
        INSERT OR IGNORE.
        """
        if not vehicles:
            print("No hay vehículos para guardar")
            return

        print(f"\nGuardando {len(vehicles)} vehículos en la base de datos...")

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            saved = 0
            skipped = 0

            for vehicle in vehicles:
                try:
                    brand_id = self._get_or_create_id(
                        cursor, "brands", ("name",), (vehicle['brand'],))
                    year_id = self._get_or_create_id(
                        cursor, "years", ("year",), (vehicle['year'],))
                    engine_id = self._get_or_create_id(
                        cursor, "engines", ("name",), (vehicle['engine'],))
                    model_id = self._get_or_create_id(
                        cursor, "models", ("brand_id", "name"),
                        (brand_id, vehicle['model']))

                    # Link model-year-engine; rowcount == 0 means duplicate.
                    cursor.execute(
                        """INSERT OR IGNORE INTO model_year_engine
                           (model_id, year_id, engine_id) VALUES (?, ?, ?)""",
                        (model_id, year_id, engine_id))
                    if cursor.rowcount > 0:
                        saved += 1
                    else:
                        skipped += 1
                except Exception as e:
                    # One bad record must not abort the whole batch.
                    print(f" Error guardando {vehicle}: {e}")
                    skipped += 1

            conn.commit()
        finally:
            # Always release the connection, even on unexpected errors.
            conn.close()

        print(f" Guardados: {saved}, Omitidos (duplicados): {skipped}")

    def scrape_multiple_brands(self, brands: List[str], **kwargs) -> List[Dict]:
        """Scrape several brands sequentially; kwargs forwarded to scrape_brand."""
        all_vehicles = []
        for i, brand in enumerate(brands, 1):
            print(f"\n[{i}/{len(brands)}] ", end="")
            vehicles = self.scrape_brand(brand, **kwargs)
            all_vehicles.extend(vehicles)
        return all_vehicles


def main():
    """CLI entry point: parse arguments, scrape, and store the results."""
    import argparse

    parser = argparse.ArgumentParser(description='Scraper de vehículos de RockAuto')
    parser.add_argument('--brands', nargs='+', help='Marcas específicas a extraer')
    parser.add_argument('--all-brands', action='store_true',
                        help='Extraer todas las marcas')
    parser.add_argument('--max-years', type=int, default=5,
                        help='Máximo de años por marca (default: 5)')
    parser.add_argument('--max-models', type=int, help='Máximo de modelos por año')
    parser.add_argument('--list-brands', action='store_true',
                        help='Solo listar marcas disponibles')
    parser.add_argument('--db', default='../vehicle_database/vehicle_database.db',
                        help='Ruta a la base de datos')
    args = parser.parse_args()

    scraper = RockAutoScraperV2(db_path=args.db)

    if args.list_brands:
        brands = scraper.get_all_brands()
        print("\nMarcas disponibles en RockAuto:")
        for i, brand in enumerate(brands, 1):
            print(f" {i:3}. {brand}")
        print(f"\nTotal: {len(brands)} marcas")
        return

    # Decide which brands to process.
    if args.brands:
        brands_to_scrape = [b.upper() for b in args.brands]
    elif args.all_brands:
        brands_to_scrape = scraper.get_all_brands()
    else:
        # Default: a handful of popular brands.
        brands_to_scrape = ['TOYOTA', 'HONDA', 'FORD', 'CHEVROLET', 'NISSAN']

    print(f"\nMarcas a procesar: {', '.join(brands_to_scrape)}")
    print(f"Máximo años por marca: {args.max_years}")
    if args.max_models:
        print(f"Máximo modelos por año: {args.max_models}")

    vehicles = scraper.scrape_multiple_brands(
        brands_to_scrape,
        max_years=args.max_years,
        max_models_per_year=args.max_models
    )

    if vehicles:
        scraper.save_to_database(vehicles)

        print(f"\n{'='*50}")
        print("RESUMEN")
        print('='*50)
        print(f"Total de vehículos extraídos: {len(vehicles)}")

        brands_count = len(set(v['brand'] for v in vehicles))
        models_count = len(set(f"{v['brand']}-{v['model']}" for v in vehicles))
        years_range = (f"{min(v['year'] for v in vehicles)} - "
                       f"{max(v['year'] for v in vehicles)}")
        print(f"Marcas: {brands_count}")
        print(f"Modelos únicos: {models_count}")
        print(f"Rango de años: {years_range}")
    else:
        print("\nNo se encontraron vehículos")


if __name__ == "__main__":
    main()