# Project contents: SQLite database with vehicle information; Flask/Bootstrap
# web dashboard; RockAuto web-scraping scripts; CLI query interface; full
# project documentation.
# Data set: 12 vehicle brands, 10,923 models, 10,919 engine specifications,
# 12,075 model-year-engine combinations.
#!/usr/bin/env python3
|
|
"""
|
|
RockAuto Vehicle Data Scraper v2
|
|
Extrae información de vehículos (marcas, años, modelos, motores) de RockAuto.com
|
|
"""
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
import time
|
|
import random
|
|
import sqlite3
|
|
import re
|
|
import sys
|
|
from typing import List, Dict, Set, Optional
|
|
from urllib.parse import unquote
|
|
|
|
|
|
class RockAutoScraperV2:
    """Scrapes vehicle data (brands, years, models, engines) from RockAuto.com
    and stores it in a normalized SQLite database.

    The catalog URL scheme is hierarchical:
        /en/catalog/<brand>[,<year>[,<model>[,<engine>]]]
    so each level is discovered by fetching the parent page and matching
    anchor hrefs against the expected URL pattern.
    """

    def __init__(self, db_path: str = "../vehicle_database/vehicle_database.db"):
        """Create a scraper with a persistent HTTP session.

        Args:
            db_path: Path to the SQLite database. The schema (``brands``,
                ``years``, ``engines``, ``models``, ``model_year_engine``)
                is assumed to exist already — TODO confirm against the
                database setup script.
        """
        self.base_url = "https://www.rockauto.com/en/catalog"
        self.session = requests.Session()
        # Browser-like headers so the site serves the regular HTML catalog.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
        })
        self.db_path = db_path
        self.delay_range = (1, 2)  # seconds between requests (politeness)

    def _delay(self) -> None:
        """Sleep a random interval within ``delay_range`` between requests."""
        time.sleep(random.uniform(*self.delay_range))

    def _get_soup(self, url: str) -> Optional["BeautifulSoup"]:
        """Fetch *url* and return the parsed page, or ``None`` on any error."""
        try:
            self._delay()
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return BeautifulSoup(response.content, 'html.parser')
        except Exception as e:
            # Any network/HTTP failure is reported and treated as "no page",
            # so callers degrade to returning empty results.
            print(f" Error al obtener {url}: {e}")
            return None

    def _clean_name(self, name: str) -> str:
        """URL-decode a catalog path segment, collapse whitespace, upper-case."""
        name = unquote(name.replace('+', ' '))
        name = re.sub(r'\s+', ' ', name).strip()
        return name.upper()

    def get_all_brands(self) -> List[str]:
        """Return the sorted list of all brand names on the catalog index."""
        print("Obteniendo lista de marcas...")
        soup = self._get_soup(f"{self.base_url}/")

        if not soup:
            return []

        brands = set()
        for link in soup.find_all('a', href=True):
            # Brand pages look like /en/catalog/BRAND (no comma, no sub-path).
            match = re.match(r'/en/catalog/([^,/]+)$', link['href'])
            if match:
                brand = self._clean_name(match.group(1))
                # Filter out single characters and purely numeric segments.
                if brand and len(brand) > 1 and not brand.isdigit():
                    brands.add(brand)

        brands_list = sorted(brands)
        print(f" Encontradas {len(brands_list)} marcas")
        return brands_list

    def get_years_for_brand(self, brand: str) -> List[int]:
        """Return the model years available for *brand*, newest first."""
        brand_url = brand.lower().replace(' ', '+')
        soup = self._get_soup(f"{self.base_url}/{brand_url}")

        if not soup:
            return []

        years = set()
        for link in soup.find_all('a', href=True):
            # Year pages look like /catalog/brand,YEAR
            match = re.search(rf'/catalog/{re.escape(brand_url)},(\d{{4}})',
                              link['href'], re.IGNORECASE)
            if match:
                year = int(match.group(1))
                # Sanity window: discard 4-digit numbers that are not years.
                if 1900 < year <= 2030:
                    years.add(year)

        return sorted(years, reverse=True)

    def get_models_for_brand_year(self, brand: str, year: int) -> List[str]:
        """Return the model names for *brand* in *year*, sorted."""
        brand_url = brand.lower().replace(' ', '+')
        soup = self._get_soup(f"{self.base_url}/{brand_url},{year}")

        if not soup:
            return []

        models = set()
        # Model pages look like /catalog/brand,year,MODEL
        pattern = rf'/catalog/{re.escape(brand_url)},{year},([^,/]+)'
        for link in soup.find_all('a', href=True):
            match = re.search(pattern, link['href'], re.IGNORECASE)
            if match:
                model = self._clean_name(match.group(1))
                if model and len(model) > 0 and not model.isdigit():
                    models.add(model)

        return sorted(models)

    def get_engines_for_vehicle(self, brand: str, year: int, model: str) -> List[str]:
        """Return the engine names for a specific brand/year/model, sorted."""
        brand_url = brand.lower().replace(' ', '+')
        model_url = model.lower().replace(' ', '+')
        soup = self._get_soup(f"{self.base_url}/{brand_url},{year},{model_url}")

        if not soup:
            return []

        engines = set()
        # Engine pages look like /catalog/brand,year,model,ENGINE
        pattern = rf'/catalog/{re.escape(brand_url)},{year},{re.escape(model_url)},([^,/]+)'
        for link in soup.find_all('a', href=True):
            match = re.search(pattern, link['href'], re.IGNORECASE)
            if match:
                engine = self._clean_name(match.group(1))
                # Keep only names that resemble engines (displacement like
                # "2.0L", cylinder layouts "V6"/"I4", or drivetrain keywords).
                if engine and re.search(r'\d+\.?\d*L|V\d|I\d|HYBRID|ELECTRIC|DIESEL',
                                        engine, re.IGNORECASE):
                    engines.add(engine)

        return sorted(engines)

    def scrape_brand(self, brand: str, max_years: Optional[int] = None,
                     max_models_per_year: Optional[int] = None) -> List[Dict]:
        """Scrape every year/model/engine combination for one brand.

        Args:
            brand: Brand name as it appears in the catalog.
            max_years: If given, limit to the newest N years.
            max_models_per_year: If given, limit models per year.

        Returns:
            A list of dicts with keys ``brand``, ``year``, ``model``,
            ``engine``. Vehicles with no engine links get ``'Standard'``.
        """
        print(f"\n{'='*50}")
        print(f"Procesando marca: {brand}")
        print('='*50)

        vehicles = []

        years = self.get_years_for_brand(brand)
        if max_years:
            years = years[:max_years]

        print(f" Años encontrados: {len(years)}")

        for year in years:
            print(f"\n Año {year}:")

            models = self.get_models_for_brand_year(brand, year)
            if max_models_per_year:
                models = models[:max_models_per_year]

            print(f" Modelos: {len(models)}")

            for model in models:
                engines = self.get_engines_for_vehicle(brand, year, model)

                if engines:
                    for engine in engines:
                        vehicles.append({
                            'brand': brand,
                            'year': year,
                            'model': model,
                            'engine': engine,
                        })
                        print(f" {model} - {engine}")
                else:
                    # No engine-specific links: record a generic placeholder
                    # so the model/year combination is not lost.
                    vehicles.append({
                        'brand': brand,
                        'year': year,
                        'model': model,
                        'engine': 'Standard',
                    })
                    print(f" {model} - (sin motor específico)")

        print(f"\n Total vehículos para {brand}: {len(vehicles)}")
        return vehicles

    @staticmethod
    def _get_or_create_id(cursor: sqlite3.Cursor, table: str,
                          cols: tuple, vals: tuple) -> int:
        """INSERT OR IGNORE a row into *table* and return its ``id``.

        Works for both freshly inserted and pre-existing rows; table and
        column names are internal constants, never user input.
        """
        placeholders = ", ".join("?" for _ in vals)
        cursor.execute(
            f"INSERT OR IGNORE INTO {table} ({', '.join(cols)}) VALUES ({placeholders})",
            vals,
        )
        where = " AND ".join(f"{c} = ?" for c in cols)
        cursor.execute(f"SELECT id FROM {table} WHERE {where}", vals)
        return cursor.fetchone()[0]

    def save_to_database(self, vehicles: List[Dict]) -> None:
        """Persist scraped vehicles into the normalized schema.

        Each vehicle dict is split across the ``brands``/``years``/``engines``/
        ``models`` lookup tables plus the ``model_year_engine`` junction table.
        The connection is always closed, even if an unexpected error escapes.
        """
        if not vehicles:
            print("No hay vehículos para guardar")
            return

        print(f"\nGuardando {len(vehicles)} vehículos en la base de datos...")

        conn = sqlite3.connect(self.db_path)
        saved = 0
        skipped = 0

        try:
            cursor = conn.cursor()

            for vehicle in vehicles:
                try:
                    brand_id = self._get_or_create_id(
                        cursor, "brands", ("name",), (vehicle['brand'],))
                    year_id = self._get_or_create_id(
                        cursor, "years", ("year",), (vehicle['year'],))
                    engine_id = self._get_or_create_id(
                        cursor, "engines", ("name",), (vehicle['engine'],))
                    model_id = self._get_or_create_id(
                        cursor, "models", ("brand_id", "name"),
                        (brand_id, vehicle['model']))

                    # Junction row; IGNORE makes re-runs idempotent.
                    cursor.execute(
                        """INSERT OR IGNORE INTO model_year_engine
                        (model_id, year_id, engine_id) VALUES (?, ?, ?)""",
                        (model_id, year_id, engine_id)
                    )

                    # rowcount > 0 means the junction row was actually new.
                    if cursor.rowcount > 0:
                        saved += 1
                    else:
                        skipped += 1

                except Exception as e:
                    # Best-effort per vehicle: report and keep going.
                    print(f" Error guardando {vehicle}: {e}")
                    skipped += 1

            conn.commit()
        finally:
            # Fix: original leaked the connection when an error escaped
            # before conn.close().
            conn.close()

        print(f" Guardados: {saved}, Omitidos (duplicados): {skipped}")

    def scrape_multiple_brands(self, brands: List[str], **kwargs) -> List[Dict]:
        """Scrape several brands in sequence; kwargs forwarded to scrape_brand."""
        all_vehicles = []

        for i, brand in enumerate(brands, 1):
            print(f"\n[{i}/{len(brands)}] ", end="")
            all_vehicles.extend(self.scrape_brand(brand, **kwargs))

        return all_vehicles
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, scrape the requested brands,
    persist the results, and print a summary.

    Exits early (without scraping) when --list-brands is given or when
    no brands can be determined.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Scraper de vehículos de RockAuto')
    parser.add_argument('--brands', nargs='+', help='Marcas específicas a extraer')
    parser.add_argument('--all-brands', action='store_true', help='Extraer todas las marcas')
    parser.add_argument('--max-years', type=int, default=5, help='Máximo de años por marca (default: 5)')
    parser.add_argument('--max-models', type=int, help='Máximo de modelos por año')
    parser.add_argument('--list-brands', action='store_true', help='Solo listar marcas disponibles')
    parser.add_argument('--db', default='../vehicle_database/vehicle_database.db', help='Ruta a la base de datos')

    args = parser.parse_args()

    scraper = RockAutoScraperV2(db_path=args.db)

    if args.list_brands:
        brands = scraper.get_all_brands()
        print("\nMarcas disponibles en RockAuto:")
        for i, brand in enumerate(brands, 1):
            print(f" {i:3}. {brand}")
        print(f"\nTotal: {len(brands)} marcas")
        return

    # Decide which brands to process.
    if args.brands:
        brands_to_scrape = [b.upper() for b in args.brands]
    elif args.all_brands:
        brands_to_scrape = scraper.get_all_brands()
    else:
        # Default: a handful of popular brands.
        brands_to_scrape = ['TOYOTA', 'HONDA', 'FORD', 'CHEVROLET', 'NISSAN']

    # Robustness fix: --all-brands can come back empty (e.g. network
    # failure); bail out instead of announcing an empty scrape run.
    if not brands_to_scrape:
        print("\nNo se encontraron marcas para procesar")
        return

    print(f"\nMarcas a procesar: {', '.join(brands_to_scrape)}")
    print(f"Máximo años por marca: {args.max_years}")
    if args.max_models:
        print(f"Máximo modelos por año: {args.max_models}")

    # Scrape everything requested.
    vehicles = scraper.scrape_multiple_brands(
        brands_to_scrape,
        max_years=args.max_years,
        max_models_per_year=args.max_models
    )

    if vehicles:
        scraper.save_to_database(vehicles)

        print(f"\n{'='*50}")
        print("RESUMEN")
        print('='*50)
        print(f"Total de vehículos extraídos: {len(vehicles)}")

        # Summary statistics over the scraped set.
        brands_count = len(set(v['brand'] for v in vehicles))
        models_count = len(set(f"{v['brand']}-{v['model']}" for v in vehicles))
        years_range = f"{min(v['year'] for v in vehicles)} - {max(v['year'] for v in vehicles)}"

        print(f"Marcas: {brands_count}")
        print(f"Modelos únicos: {models_count}")
        print(f"Rango de años: {years_range}")
    else:
        print("\nNo se encontraron vehículos")
|
|
|
|
|
|
# Entry-point guard: run the CLI only when executed as a script,
# not when imported as a module.
if __name__ == "__main__":
    main()
|