"""Scrape Toyota model/engine listings from the RockAuto catalog into SQLite.

For every year in MISSING_YEARS that the database does not yet contain for
the brand, the script fetches the brand/year catalog page, extracts the model
names from the links, fetches each model page to extract engine names, and
stores every (brand, year, model, engine) combination. Years are processed in
batches of three with a 60-second pause between batches so the operator can
switch VPN endpoints.
"""

import os
import re
import sqlite3
import time
from urllib.parse import unquote

import requests
from bs4 import BeautifulSoup

# Database - path is relative; run the script from C:\Autopartes
DB_PATH = "vehicle_database/vehicle_database.db"
BASE_URL = "https://www.rockauto.com/en/catalog"

# Years to scrape, newest first: 2003 down to 1975 inclusive.
MISSING_YEARS = list(range(2003, 1974, -1))

# Years fetched between two VPN pauses, and the length of that pause.
BATCH_SIZE = 3
VPN_PAUSE_SECONDS = 60

session = requests.Session()
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml',
})


def clean_name(name):
    """Return a URL path fragment as a normalized, upper-case display name.

    '+' is treated as a space, percent-escapes are decoded, runs of
    whitespace are collapsed to a single space, and the result is stripped.
    """
    name = unquote(name.replace('+', ' '))
    return re.sub(r'\s+', ' ', name).strip().upper()


def get_soup(url, retries=3):
    """Fetch *url* and return it parsed with BeautifulSoup, or None.

    Each attempt is preceded by a 0.5 s delay. A 200 response is parsed and
    returned. A 403 response prints a "blocked" notice and gives up at once.
    Any other status simply moves on to the next attempt. A request error
    waits 3 s before the next attempt; on the last attempt the error is
    printed instead. None is returned when every attempt is used up.
    """
    for attempt in range(retries):
        try:
            time.sleep(0.5)
            response = session.get(url, timeout=15)
            if response.status_code == 200:
                return BeautifulSoup(response.content, 'html.parser')
            elif response.status_code == 403:
                print("\n [!] BLOQUEADO (403) - Cambia el VPN!")
                return None
        except requests.RequestException as e:
            if attempt < retries - 1:
                time.sleep(3)
            else:
                print(f"\n Error: {e}")
    return None


def get_models(brand, year):
    """Return the sorted model names listed for *brand* in *year*.

    Model names are taken from catalog links of the form
    ``/catalog/<brand>,<year>,<model>``. Purely numeric names and names of a
    single character are discarded. Returns an empty list when the page
    could not be fetched.
    """
    brand_url = brand.lower()
    soup = get_soup(f"{BASE_URL}/{brand_url},{year}")
    if not soup:
        return []

    # Compiled once; the brand is escaped so regex metacharacters in it are
    # matched literally.
    pattern = re.compile(
        rf'/catalog/{re.escape(brand_url)},{year},([^,/]+)', re.I
    )
    models = set()
    for link in soup.find_all('a', href=True):
        match = pattern.search(link['href'])
        if match:
            model = clean_name(match.group(1))
            if model and not model.isdigit() and len(model) > 1:
                models.add(model)
    return sorted(models)


def get_engines(brand, year, model):
    """Return the sorted engine names listed for *brand* / *year* / *model*.

    Engine names are taken from catalog links of the form
    ``/catalog/<brand>,<year>,<model>,<engine>`` and kept only when they look
    like an engine description (a displacement such as ``2.4L``, ``V6``,
    ``I4``, or the words HYBRID / ELECTRIC / DIESEL). Returns ``['STANDARD']``
    when no engine is found or when the page could not be fetched.
    """
    brand_url = brand.lower()
    model_url = model.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{brand_url},{year},{model_url}")
    # NOTE(review): a failed fetch (e.g. a 403 block) also yields 'STANDARD',
    # so a blocked request stores a placeholder engine - confirm intended.
    if not soup:
        return ['STANDARD']

    pattern = re.compile(
        rf'/catalog/{re.escape(brand_url)},{year},{re.escape(model_url)},([^,/]+)',
        re.I,
    )
    engines = set()
    for link in soup.find_all('a', href=True):
        match = pattern.search(link['href'])
        if match:
            engine = clean_name(match.group(1))
            if engine and re.search(
                r'\d+\.?\d*L|V\d|I\d|HYBRID|ELECTRIC|DIESEL', engine, re.I
            ):
                engines.add(engine)
    return sorted(engines) if engines else ['STANDARD']


def _get_or_create_id(cursor, insert_sql, select_sql, params):
    """Run an INSERT OR IGNORE, then return the id of the matching row."""
    cursor.execute(insert_sql, params)
    cursor.execute(select_sql, params)
    return cursor.fetchone()[0]


def save_to_db(conn, brand, year, model, engine):
    """Store one brand/year/model/engine combination.

    Lookup rows in ``brands``, ``years``, ``engines`` and ``models`` are
    created on demand, then the combination is inserted into
    ``model_year_engine``. The caller is responsible for committing.

    Returns True when a new ``model_year_engine`` row was inserted, False
    when it already existed or a database error occurred (the error is
    printed rather than raised so one bad row does not stop the scrape).
    """
    cursor = conn.cursor()
    try:
        brand_id = _get_or_create_id(
            cursor,
            "INSERT OR IGNORE INTO brands (name) VALUES (?)",
            "SELECT id FROM brands WHERE name = ?",
            (brand,),
        )
        year_id = _get_or_create_id(
            cursor,
            "INSERT OR IGNORE INTO years (year) VALUES (?)",
            "SELECT id FROM years WHERE year = ?",
            (year,),
        )
        engine_id = _get_or_create_id(
            cursor,
            "INSERT OR IGNORE INTO engines (name) VALUES (?)",
            "SELECT id FROM engines WHERE name = ?",
            (engine,),
        )
        model_id = _get_or_create_id(
            cursor,
            "INSERT OR IGNORE INTO models (brand_id, name) VALUES (?, ?)",
            "SELECT id FROM models WHERE brand_id = ? AND name = ?",
            (brand_id, model),
        )
        cursor.execute(
            "INSERT OR IGNORE INTO model_year_engine (model_id, year_id, engine_id) VALUES (?, ?, ?)",
            (model_id, year_id, engine_id)
        )
        return cursor.rowcount > 0
    except (sqlite3.Error, TypeError) as exc:
        # TypeError: a lookup SELECT returned no row (fetchone() gave None).
        print(f"\n Error DB: {exc}")
        return False


def get_existing_years(conn, brand):
    """Return the set of years that already have at least one entry for *brand*."""
    cursor = conn.cursor()
    cursor.execute("""
        SELECT DISTINCT y.year
        FROM years y
        JOIN model_year_engine mye ON y.id = mye.year_id
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        WHERE b.name = ?
    """, (brand,))
    return {row[0] for row in cursor.fetchall()}


def _scrape_year(conn, brand, year):
    """Scrape and store every model/engine of *year*; return the new-row count."""
    saved = 0
    print(f"\n[{year}] ", end="", flush=True)
    models = get_models(brand, year)
    print(f"{len(models)} modelos")
    for model in models:
        for engine in get_engines(brand, year, model):
            if save_to_db(conn, brand, year, model, engine):
                saved += 1
                print(f" {model} - {engine}")
    return saved


def _pause_for_vpn(remaining_batches, seconds=VPN_PAUSE_SECONDS):
    """Show a countdown so the operator can switch VPN before the next batch."""
    print(f"\n{'*'*50}")
    print(" PAUSA 60 SEG - CAMBIA EL VPN AHORA!")
    print(f" Faltan {remaining_batches} lotes")
    print("*"*50)
    for s in range(seconds, 0, -1):
        print(f"\r Continua en {s}s... ", end="", flush=True)
        time.sleep(1)
    print()


def main():
    """Interactively scrape every missing Toyota year into the database."""
    brand = "TOYOTA"
    print("=" * 50)
    print(" SCRAPER TOYOTA - WINDOWS")
    print(" 3 anos por lote, 60 seg pausa para VPN")
    print("=" * 50)

    if not os.path.exists(DB_PATH):
        print(f"\n[ERROR] No encuentro: {DB_PATH}")
        print("Asegurate de correr desde C:\\Autopartes")
        input("Presiona ENTER para salir...")
        return

    conn = sqlite3.connect(DB_PATH)
    # try/finally so the connection is closed even if the scrape is
    # interrupted (Ctrl+C) or an unexpected error escapes.
    try:
        existing = get_existing_years(conn, brand)
        years_to_do = [y for y in MISSING_YEARS if y not in existing]
        if not years_to_do:
            print("\nTodos los anos ya estan!")
            return

        print(f"\nFaltan {len(years_to_do)} anos: {years_to_do[0]} a {years_to_do[-1]}")
        batches = [
            years_to_do[i:i + BATCH_SIZE]
            for i in range(0, len(years_to_do), BATCH_SIZE)
        ]
        print(f"Lotes: {len(batches)}")
        input("\nPresiona ENTER para comenzar...")

        total_saved = 0
        for batch_num, batch in enumerate(batches, 1):
            print(f"\n{'='*50}")
            print(f"LOTE {batch_num}/{len(batches)}: {batch}")
            print("="*50)
            for year in batch:
                total_saved += _scrape_year(conn, brand, year)
            # Commit once per batch so a later failure keeps finished batches.
            conn.commit()
            print(f"\n>> Guardado! Total nuevos: {total_saved}")
            if batch_num < len(batches):
                _pause_for_vpn(len(batches) - batch_num)
    finally:
        conn.close()

    print(f"\n{'='*50}")
    print(f" LISTO! Guardados: {total_saved} vehiculos nuevos")
    print("="*50)
    input("Presiona ENTER para salir...")


if __name__ == "__main__":
    main()