- Base de datos SQLite con información de vehículos - Dashboard web con Flask y Bootstrap - Scripts de web scraping para RockAuto - Interfaz CLI para consultas - Documentación completa del proyecto Incluye: - 12 marcas de vehículos - 10,923 modelos - 10,919 especificaciones de motores - 12,075 combinaciones modelo-año-motor
186 lines
6.1 KiB
Python
186 lines
6.1 KiB
Python
import requests
|
|
from bs4 import BeautifulSoup
|
|
import sqlite3
|
|
import time
|
|
import re
|
|
import os
|
|
from urllib.parse import unquote
|
|
|
|
# Base de datos - ruta directa desde C:\Autopartes
|
|
DB_PATH = "vehicle_database/vehicle_database.db"
|
|
|
|
BASE_URL = "https://www.rockauto.com/en/catalog"
|
|
|
|
MISSING_YEARS = [
|
|
2003, 2002, 2001, 2000, 1999, 1998, 1997, 1996, 1995, 1994,
|
|
1993, 1992, 1991, 1990, 1989, 1988, 1987, 1986, 1985, 1984,
|
|
1983, 1982, 1981, 1980, 1979, 1978, 1977, 1976, 1975
|
|
]
|
|
|
|
session = requests.Session()
|
|
session.headers.update({
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
|
|
'Accept': 'text/html,application/xhtml+xml',
|
|
})
|
|
|
|
def clean_name(name):
|
|
name = unquote(name.replace('+', ' '))
|
|
return re.sub(r'\s+', ' ', name).strip().upper()
|
|
|
|
def get_soup(url, retries=3):
|
|
for attempt in range(retries):
|
|
try:
|
|
time.sleep(0.5)
|
|
response = session.get(url, timeout=15)
|
|
if response.status_code == 200:
|
|
return BeautifulSoup(response.content, 'html.parser')
|
|
elif response.status_code == 403:
|
|
print("\n [!] BLOQUEADO (403) - Cambia el VPN!")
|
|
return None
|
|
except Exception as e:
|
|
if attempt < retries - 1:
|
|
time.sleep(3)
|
|
else:
|
|
print(f"\n Error: {e}")
|
|
return None
|
|
|
|
def get_models(brand, year):
|
|
brand_url = brand.lower()
|
|
soup = get_soup(f"{BASE_URL}/{brand_url},{year}")
|
|
if not soup:
|
|
return []
|
|
|
|
models = set()
|
|
for link in soup.find_all('a', href=True):
|
|
match = re.search(rf'/catalog/{brand_url},{year},([^,/]+)', link['href'], re.I)
|
|
if match:
|
|
model = clean_name(match.group(1))
|
|
if model and not model.isdigit() and len(model) > 1:
|
|
models.add(model)
|
|
return sorted(models)
|
|
|
|
def get_engines(brand, year, model):
|
|
brand_url = brand.lower()
|
|
model_url = model.lower().replace(' ', '+')
|
|
soup = get_soup(f"{BASE_URL}/{brand_url},{year},{model_url}")
|
|
if not soup:
|
|
return ['STANDARD']
|
|
|
|
engines = set()
|
|
for link in soup.find_all('a', href=True):
|
|
match = re.search(rf'/catalog/{brand_url},{year},{re.escape(model_url)},([^,/]+)', link['href'], re.I)
|
|
if match:
|
|
engine = clean_name(match.group(1))
|
|
if engine and re.search(r'\d+\.?\d*L|V\d|I\d|HYBRID|ELECTRIC|DIESEL', engine, re.I):
|
|
engines.add(engine)
|
|
return sorted(engines) if engines else ['STANDARD']
|
|
|
|
def save_to_db(conn, brand, year, model, engine):
|
|
cursor = conn.cursor()
|
|
try:
|
|
cursor.execute("INSERT OR IGNORE INTO brands (name) VALUES (?)", (brand,))
|
|
cursor.execute("SELECT id FROM brands WHERE name = ?", (brand,))
|
|
brand_id = cursor.fetchone()[0]
|
|
|
|
cursor.execute("INSERT OR IGNORE INTO years (year) VALUES (?)", (year,))
|
|
cursor.execute("SELECT id FROM years WHERE year = ?", (year,))
|
|
year_id = cursor.fetchone()[0]
|
|
|
|
cursor.execute("INSERT OR IGNORE INTO engines (name) VALUES (?)", (engine,))
|
|
cursor.execute("SELECT id FROM engines WHERE name = ?", (engine,))
|
|
engine_id = cursor.fetchone()[0]
|
|
|
|
cursor.execute("INSERT OR IGNORE INTO models (brand_id, name) VALUES (?, ?)", (brand_id, model))
|
|
cursor.execute("SELECT id FROM models WHERE brand_id = ? AND name = ?", (brand_id, model))
|
|
model_id = cursor.fetchone()[0]
|
|
|
|
cursor.execute(
|
|
"INSERT OR IGNORE INTO model_year_engine (model_id, year_id, engine_id) VALUES (?, ?, ?)",
|
|
(model_id, year_id, engine_id)
|
|
)
|
|
return cursor.rowcount > 0
|
|
except:
|
|
return False
|
|
|
|
def get_existing_years(conn, brand):
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT DISTINCT y.year FROM years y
|
|
JOIN model_year_engine mye ON y.id = mye.year_id
|
|
JOIN models m ON mye.model_id = m.id
|
|
JOIN brands b ON m.brand_id = b.id
|
|
WHERE b.name = ?
|
|
""", (brand,))
|
|
return set(row[0] for row in cursor.fetchall())
|
|
|
|
def main():
|
|
brand = "TOYOTA"
|
|
|
|
print("=" * 50)
|
|
print(" SCRAPER TOYOTA - WINDOWS")
|
|
print(" 3 anos por lote, 60 seg pausa para VPN")
|
|
print("=" * 50)
|
|
|
|
if not os.path.exists(DB_PATH):
|
|
print(f"\n[ERROR] No encuentro: {DB_PATH}")
|
|
print("Asegurate de correr desde C:\\Autopartes")
|
|
input("Presiona ENTER para salir...")
|
|
return
|
|
|
|
conn = sqlite3.connect(DB_PATH)
|
|
existing = get_existing_years(conn, brand)
|
|
years_to_do = [y for y in MISSING_YEARS if y not in existing]
|
|
|
|
if not years_to_do:
|
|
print("\nTodos los anos ya estan!")
|
|
conn.close()
|
|
return
|
|
|
|
print(f"\nFaltan {len(years_to_do)} anos: {years_to_do[0]} a {years_to_do[-1]}")
|
|
|
|
batches = [years_to_do[i:i+3] for i in range(0, len(years_to_do), 3)]
|
|
print(f"Lotes: {len(batches)}")
|
|
input("\nPresiona ENTER para comenzar...")
|
|
|
|
total_saved = 0
|
|
|
|
for batch_num, batch in enumerate(batches, 1):
|
|
print(f"\n{'='*50}")
|
|
print(f"LOTE {batch_num}/{len(batches)}: {batch}")
|
|
print("="*50)
|
|
|
|
for year in batch:
|
|
print(f"\n[{year}] ", end="", flush=True)
|
|
models = get_models(brand, year)
|
|
print(f"{len(models)} modelos")
|
|
|
|
for model in models:
|
|
engines = get_engines(brand, year, model)
|
|
for engine in engines:
|
|
if save_to_db(conn, brand, year, model, engine):
|
|
total_saved += 1
|
|
print(f" {model} - {engine}")
|
|
|
|
conn.commit()
|
|
print(f"\n>> Guardado! Total nuevos: {total_saved}")
|
|
|
|
if batch_num < len(batches):
|
|
print(f"\n{'*'*50}")
|
|
print(" PAUSA 60 SEG - CAMBIA EL VPN AHORA!")
|
|
print(f" Faltan {len(batches) - batch_num} lotes")
|
|
print("*"*50)
|
|
|
|
for s in range(60, 0, -1):
|
|
print(f"\r Continua en {s}s... ", end="", flush=True)
|
|
time.sleep(1)
|
|
print()
|
|
|
|
conn.close()
|
|
print(f"\n{'='*50}")
|
|
print(f" LISTO! Guardados: {total_saved} vehiculos nuevos")
|
|
print("="*50)
|
|
input("Presiona ENTER para salir...")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|