Initial commit: Sistema Autoparts DB

- Base de datos SQLite con información de vehículos
- Dashboard web con Flask y Bootstrap
- Scripts de web scraping para RockAuto
- Interfaz CLI para consultas
- Documentación completa del proyecto

Incluye:
- 12 marcas de vehículos
- 10,923 modelos
- 10,919 especificaciones de motores
- 12,075 combinaciones modelo-año-motor
This commit is contained in:
2026-01-19 08:45:03 +00:00
commit f395d67136
59 changed files with 10881 additions and 0 deletions

View File

@@ -0,0 +1,175 @@
"""
Manual Data Extraction Guide for RockAuto.com
Since RockAuto has strong anti-bot measures, here's a manual approach to extract vehicle data:
1. Visit https://www.rockauto.com/
2. Click on "Catalog" in the navigation menu
3. You'll see a list of vehicle manufacturers (makes)
4. For each make, manually note down the models, years, and engines
This script provides a framework to input the manually collected data into your database.
"""
import sqlite3
from typing import List, Dict
class ManualDataInput:
    """Insert manually collected RockAuto vehicle data into the SQLite database.

    The database is expected to already contain the tables ``brands``,
    ``years``, ``engines``, ``models`` and ``model_year_engine`` with
    unique constraints matching the INSERT OR IGNORE statements used here.
    """

    def __init__(self, db_path: str = "../vehicle_database/vehicle_database.db"):
        # Path to the SQLite file; a fresh connection is opened per call,
        # so the instance itself never holds an open resource.
        self.db_path = db_path

    @staticmethod
    def _get_or_create_id(cursor, table: str, columns, values) -> int:
        """Insert-or-ignore a row and return its ``id``.

        NOTE: ``table``/``columns`` are interpolated into the SQL text, so
        this helper must only be called with the hard-coded identifiers
        used below -- never with external input.
        """
        col_list = ", ".join(columns)
        placeholders = ", ".join("?" for _ in values)
        cursor.execute(
            f"INSERT OR IGNORE INTO {table} ({col_list}) VALUES ({placeholders})",
            values,
        )
        where = " AND ".join(f"{col} = ?" for col in columns)
        cursor.execute(f"SELECT id FROM {table} WHERE {where}", values)
        return cursor.fetchone()[0]

    def add_vehicle_data(self, make: str, model: str, year: int, engine: str = "Unknown"):
        """Add a single vehicle entry to the database.

        Errors are reported on stdout rather than raised so that batch
        imports keep going past a single bad record.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            brand_id = self._get_or_create_id(cursor, "brands", ("name",), (make,))
            year_id = self._get_or_create_id(cursor, "years", ("year",), (year,))
            engine_id = self._get_or_create_id(cursor, "engines", ("name",), (engine,))
            model_id = self._get_or_create_id(
                cursor, "models", ("brand_id", "name"), (brand_id, model)
            )
            # Link model, year and engine in the junction table.
            cursor.execute(
                """INSERT OR IGNORE INTO model_year_engine
                   (model_id, year_id, engine_id) VALUES (?, ?, ?)""",
                (model_id, year_id, engine_id),
            )
            conn.commit()
            print(f"Added: {year} {make} {model} with {engine}")
        except Exception as e:
            print(f"Error adding vehicle: {e}")
        finally:
            conn.close()

    def add_multiple_vehicles(self, vehicles: List[Dict]):
        """Add multiple vehicles at once.

        Each dict may provide ``make``, ``model``, ``year`` and ``engine``;
        missing keys fall back to neutral defaults.
        """
        for vehicle in vehicles:
            self.add_vehicle_data(
                make=vehicle.get('make', ''),
                model=vehicle.get('model', ''),
                year=vehicle.get('year', 0),
                engine=vehicle.get('engine', 'Unknown')
            )

    def show_extraction_guide(self):
        """Print the step-by-step manual extraction guide to stdout."""
        guide = """
================================================
Manual RockAuto Data Extraction Guide
================================================
1. OPEN YOUR WEB BROWSER and go to: https://www.rockauto.com
2. CLICK on the "Catalog" link in the navigation menu
3. YOU WILL SEE a list of vehicle manufacturers (makes) like:
- Acura
- Audi
- BMW
- Chevrolet
- Ford
- Honda
- Toyota
- And many more...
4. FOR EACH MANUFACTURER:
a) Click on the manufacturer name
b) You'll see a page with vehicle models organized by year
c) Note down the models and years you see
d) Example format: 2020 Honda Civic, 2019 Ford F-150, etc.
5. TO FIND ENGINE INFORMATION:
a) Click on a specific model/year combination
b) You'll see parts categories for that vehicle
c) Look for "Engine" or "Engine Mechanical" category
d) Note down the engine type/specifications
6. USE THE FOLLOWING COMMANDS to add data to your database:
Example Python commands:
>>> from manual_input import ManualDataInput
>>> input_tool = ManualDataInput()
>>> input_tool.add_vehicle_data("Toyota", "Camry", 2020, "2.5L 4-Cylinder")
>>> input_tool.add_vehicle_data("Honda", "Civic", 2019, "1.5L Turbo")
Or add multiple at once:
>>> vehicles = [
... {"make": "Ford", "model": "F-150", "year": 2021, "engine": "3.5L V6"},
... {"make": "BMW", "model": "X3", "year": 2020, "engine": "2.0L 4-Cylinder Turbo"}
... ]
>>> input_tool.add_multiple_vehicles(vehicles)
7. TIPS FOR EFFICIENT DATA COLLECTION:
- Focus on popular makes/models first
- Record data in a spreadsheet as you go
- Take screenshots of pages for reference
- Be systematic - go alphabetically or by make popularity
================================================
"""
        print(guide)
def main():
    """Entry point: print the guide, then optionally load sample data."""
    print("Manual RockAuto Data Extraction Tool")
    print("=====================================")
    tool = ManualDataInput()
    # Walk the user through the manual collection process first.
    tool.show_extraction_guide()
    print("\nExample - Adding sample data:")
    sample_vehicles = [
        {"make": "Toyota", "model": "Camry", "year": 2020, "engine": "2.5L 4-Cylinder"},
        {"make": "Honda", "model": "Civic", "year": 2019, "engine": "1.5L Turbo"},
        {"make": "Ford", "model": "F-150", "year": 2021, "engine": "3.5L V6"},
        {"make": "BMW", "model": "X3", "year": 2020, "engine": "2.0L 4-Cylinder Turbo"},
        {"make": "Chevrolet", "model": "Silverado", "year": 2022, "engine": "5.3L V8"}
    ]
    print("Would you like to add these sample vehicles to your database? (y/n): ", end="")
    answer = input().lower()
    if answer == 'y':
        tool.add_multiple_vehicles(sample_vehicles)
        print("\nSample vehicles added to database!")
    print("\nYou can now use the ManualDataInput class to add more vehicles manually.")
    print("Import it in Python with: from manual_input import ManualDataInput")

if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,171 @@
"""
Manual Data Extraction Guide for RockAuto.com
Since RockAuto has strong anti-bot measures, here's a manual approach to extract vehicle data:
1. Visit https://www.rockauto.com/
2. Click on "Catalog" in the navigation menu
3. You'll see a list of vehicle manufacturers (makes)
4. For each make, manually note down the models, years, and engines
This script provides a framework to input the manually collected data into your database.
"""
import sqlite3
from typing import List, Dict
class ManualDataInput:
    """Insert manually collected RockAuto vehicle data into the SQLite database.

    The database is expected to already contain the tables ``brands``,
    ``years``, ``engines``, ``models`` and ``model_year_engine`` with
    unique constraints matching the INSERT OR IGNORE statements used here.
    """

    def __init__(self, db_path: str = "../vehicle_database/vehicle_database.db"):
        # Path to the SQLite file; a fresh connection is opened per call,
        # so the instance itself never holds an open resource.
        self.db_path = db_path

    @staticmethod
    def _get_or_create_id(cursor, table: str, columns, values) -> int:
        """Insert-or-ignore a row and return its ``id``.

        NOTE: ``table``/``columns`` are interpolated into the SQL text, so
        this helper must only be called with the hard-coded identifiers
        used below -- never with external input.
        """
        col_list = ", ".join(columns)
        placeholders = ", ".join("?" for _ in values)
        cursor.execute(
            f"INSERT OR IGNORE INTO {table} ({col_list}) VALUES ({placeholders})",
            values,
        )
        where = " AND ".join(f"{col} = ?" for col in columns)
        cursor.execute(f"SELECT id FROM {table} WHERE {where}", values)
        return cursor.fetchone()[0]

    def add_vehicle_data(self, make: str, model: str, year: int, engine: str = "Unknown"):
        """Add a single vehicle entry to the database.

        Errors are reported on stdout rather than raised so that batch
        imports keep going past a single bad record.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            brand_id = self._get_or_create_id(cursor, "brands", ("name",), (make,))
            year_id = self._get_or_create_id(cursor, "years", ("year",), (year,))
            engine_id = self._get_or_create_id(cursor, "engines", ("name",), (engine,))
            model_id = self._get_or_create_id(
                cursor, "models", ("brand_id", "name"), (brand_id, model)
            )
            # Link model, year and engine in the junction table.
            cursor.execute(
                """INSERT OR IGNORE INTO model_year_engine
                   (model_id, year_id, engine_id) VALUES (?, ?, ?)""",
                (model_id, year_id, engine_id),
            )
            conn.commit()
            print(f"Added: {year} {make} {model} with {engine}")
        except Exception as e:
            print(f"Error adding vehicle: {e}")
        finally:
            conn.close()

    def add_multiple_vehicles(self, vehicles: List[Dict]):
        """Add multiple vehicles at once.

        Each dict may provide ``make``, ``model``, ``year`` and ``engine``;
        missing keys fall back to neutral defaults.
        """
        for vehicle in vehicles:
            self.add_vehicle_data(
                make=vehicle.get('make', ''),
                model=vehicle.get('model', ''),
                year=vehicle.get('year', 0),
                engine=vehicle.get('engine', 'Unknown')
            )

    def show_extraction_guide(self):
        """Print the step-by-step manual extraction guide to stdout."""
        guide = """
================================================
Manual RockAuto Data Extraction Guide
================================================
1. OPEN YOUR WEB BROWSER and go to: https://www.rockauto.com
2. CLICK on the "Catalog" link in the navigation menu
3. YOU WILL SEE a list of vehicle manufacturers (makes) like:
- Acura
- Audi
- BMW
- Chevrolet
- Ford
- Honda
- Toyota
- And many more...
4. FOR EACH MANUFACTURER:
a) Click on the manufacturer name
b) You'll see a page with vehicle models organized by year
c) Note down the models and years you see
d) Example format: 2020 Honda Civic, 2019 Ford F-150, etc.
5. TO FIND ENGINE INFORMATION:
a) Click on a specific model/year combination
b) You'll see parts categories for that vehicle
c) Look for "Engine" or "Engine Mechanical" category
d) Note down the engine type/specifications
6. USE THE FOLLOWING COMMANDS to add data to your database:
Example Python commands:
>>> from manual_input import ManualDataInput
>>> input_tool = ManualDataInput()
>>> input_tool.add_vehicle_data("Toyota", "Camry", 2020, "2.5L 4-Cylinder")
>>> input_tool.add_vehicle_data("Honda", "Civic", 2019, "1.5L Turbo")
Or add multiple at once:
>>> vehicles = [
... {"make": "Ford", "model": "F-150", "year": 2021, "engine": "3.5L V6"},
... {"make": "BMW", "model": "X3", "year": 2020, "engine": "2.0L 4-Cylinder Turbo"}
... ]
>>> input_tool.add_multiple_vehicles(vehicles)
7. TIPS FOR EFFICIENT DATA COLLECTION:
- Focus on popular makes/models first
- Record data in a spreadsheet as you go
- Take screenshots of pages for reference
- Be systematic - go alphabetically or by make popularity
================================================
"""
        print(guide)
def main():
    """Entry point: print the guide, then load a fixed set of sample vehicles."""
    print("Manual RockAuto Data Extraction Tool")
    print("=====================================")
    tool = ManualDataInput()
    # Walk the user through the manual collection process first.
    tool.show_extraction_guide()
    print("\nAdding sample vehicles to database:")
    samples = [
        {"make": "Toyota", "model": "Camry", "year": 2020, "engine": "2.5L 4-Cylinder"},
        {"make": "Honda", "model": "Civic", "year": 2019, "engine": "1.5L Turbo"},
        {"make": "Ford", "model": "F-150", "year": 2021, "engine": "3.5L V6"},
        {"make": "BMW", "model": "X3", "year": 2020, "engine": "2.0L 4-Cylinder Turbo"},
        {"make": "Chevrolet", "model": "Silverado", "year": 2022, "engine": "5.3L V8"}
    ]
    tool.add_multiple_vehicles(samples)
    print("\nSample vehicles added to database!")
    print("\nYou can now use the ManualDataInput class to add more vehicles manually.")
    print("Import it in Python with: from manual_input import ManualDataInput")

if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,3 @@
requests
beautifulsoup4
lxml

View File

@@ -0,0 +1,292 @@
#!/usr/bin/env python3
"""
RockAuto Vehicle Data Scraper
Extracts vehicle information (brands, models, years, engines) from RockAuto.com
"""
import requests
from bs4 import BeautifulSoup
import time
import random
from urllib.parse import urljoin, urlparse
import json
import sqlite3
from typing import List, Dict, Optional
class RockAutoScraper:
    """Scrape vehicle data (brands, models, years, engines) from RockAuto.

    Network access goes through a shared ``requests.Session`` with a
    browser-like User-Agent and a randomized delay, so the scraper stays
    polite.  Scraped records are persisted with :meth:`save_to_database`.
    """

    def __init__(self, db_path: str = "../vehicle_database/vehicle_database.db"):
        self.base_url = "https://www.rockauto.com"
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        # SQLite file scraped rows are written to.
        self.db_path = db_path
        # Map RockAuto URL slugs to standardized brand names.
        self.brand_mapping = {
            'acura': 'Acura',
            'alfa-romeo': 'Alfa Romeo',
            'audi': 'Audi',
            'bmw': 'BMW',
            'buick': 'Buick',
            'cadillac': 'Cadillac',
            'chevrolet': 'Chevrolet',
            'chrysler': 'Chrysler',
            'dodge': 'Dodge',
            'fiat': 'Fiat',
            'ford': 'Ford',
            'gmc': 'GMC',
            'honda': 'Honda',
            'hyundai': 'Hyundai',
            'infiniti': 'Infiniti',
            'isuzu': 'Isuzu',
            'jaguar': 'Jaguar',
            'jeep': 'Jeep',
            'kia': 'Kia',
            'land-rover': 'Land Rover',
            'lexus': 'Lexus',
            'lincoln': 'Lincoln',
            'mazda': 'Mazda',
            'mercedes-benz': 'Mercedes-Benz',
            'mercury': 'Mercury',
            'mitsubishi': 'Mitsubishi',
            'nissan': 'Nissan',
            'oldsmobile': 'Oldsmobile',
            'plymouth': 'Plymouth',
            'pontiac': 'Pontiac',
            'porsche': 'Porsche',
            'ram': 'Ram',
            'saab': 'Saab',
            'saturn': 'Saturn',
            'scion': 'Scion',
            'subaru': 'Subaru',
            'suzuki': 'Suzuki',
            'tesla': 'Tesla',
            'toyota': 'Toyota',
            'volkswagen': 'Volkswagen',
            'volvo': 'Volvo'
        }

    def get_page(self, url: str) -> Optional["BeautifulSoup"]:
        """Fetch ``url`` and return parsed HTML, or ``None`` on any HTTP error."""
        try:
            # Random delay to be respectful to the server.
            time.sleep(random.uniform(1, 3))
            # Timeout so a stalled connection cannot hang the scraper forever
            # (fix: the request previously had no timeout).
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return BeautifulSoup(response.content, 'html.parser')
        except requests.RequestException as e:
            print(f"Error fetching {url}: {e}")
            return None

    def get_makes(self) -> List[str]:
        """Get the list of vehicle makes from the RockAuto catalog page."""
        print("Fetching list of makes...")
        soup = self.get_page(f"{self.base_url}/catalog/catalog.php")
        if not soup:
            return []
        makes = []
        # Primary approach: catalog links whose path contains a known brand slug.
        make_elements = soup.find_all('a', href=lambda x: x and '/catalog/' in x and x.count('/') >= 3)
        for elem in make_elements:
            href = elem.get('href', '')
            for part in href.split('/'):
                if part in self.brand_mapping:
                    make = self.brand_mapping[part]
                    if make not in makes:
                        makes.append(make)
        # Fallback: scan every link for a brand slug anywhere in the URL.
        if not makes:
            for link in soup.find_all('a', href=True):
                href = link['href'].lower()
                for key, value in self.brand_mapping.items():
                    if key in href and value not in makes:
                        makes.append(value)
        print(f"Found {len(makes)} makes: {makes[:10]}{'...' if len(makes) > 10 else ''}")
        return makes

    def get_models_for_make(self, make: str) -> List[Dict]:
        """Get model/year records for a specific make.

        Heuristic: any link text containing a plausible 4-digit year is
        treated as a "<year> <model>" entry; engines are filled in later.
        """
        print(f"Fetching models for {make}...")
        # Translate the display name back to the RockAuto URL slug.
        make_key = None
        for key, value in self.brand_mapping.items():
            if value.lower() == make.lower():
                make_key = key
                break
        if not make_key:
            print(f"Make {make} not found in mapping")
            return []
        models = []
        soup = self.get_page(f"{self.base_url}/catalog/catalog.php?c={make_key}")
        if not soup:
            return models
        for link in soup.find_all('a', href=True):
            href = link['href']
            text = link.get_text().strip()
            if any(char.isdigit() for char in text) and len(text) > 2:
                parts = text.split()
                year = None
                model_parts = []
                for part in parts:
                    # A standalone 4-digit token in a sane range is the model year.
                    if part.isdigit() and len(part) == 4 and 1900 < int(part) < 2030:
                        year = int(part)
                    else:
                        model_parts.append(part)
                if model_parts and year:
                    record = {
                        'make': make,
                        'model': ' '.join(model_parts),
                        'year': year,
                        'engine': 'Unknown',  # Will need to extract from deeper pages
                        'href': href
                    }
                    if record not in models:
                        models.append(record)
        print(f"Found {len(models)} models for {make}")
        return models

    def scrape_vehicle_data(self) -> List[Dict]:
        """Scrape makes and their models; capped to a small sample for testing."""
        print("Starting RockAuto scraping...")
        all_vehicles = []
        makes = self.get_makes()
        # Limit to first 5 makes for testing.
        makes = makes[:5] if len(makes) > 5 else makes
        for make in makes:
            all_vehicles.extend(self.get_models_for_make(make))
            # Limit total records for testing.
            if len(all_vehicles) > 20:
                break
        print(f"Total vehicles found: {len(all_vehicles)}")
        return all_vehicles

    @staticmethod
    def _get_or_create_id(cursor, table: str, columns, values) -> int:
        """Insert-or-ignore a row and return its id (trusted identifiers only)."""
        col_list = ", ".join(columns)
        placeholders = ", ".join("?" for _ in values)
        cursor.execute(
            f"INSERT OR IGNORE INTO {table} ({col_list}) VALUES ({placeholders})",
            values,
        )
        where = " AND ".join(f"{col} = ?" for col in columns)
        cursor.execute(f"SELECT id FROM {table} WHERE {where}", values)
        return cursor.fetchone()[0]

    def save_to_database(self, vehicles: List[Dict]):
        """Save scraped data to the vehicle database.

        Per-vehicle failures are reported and skipped; the connection is
        always closed (fix: it used to leak if commit raised).
        """
        print(f"Saving {len(vehicles)} vehicles to database...")
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            for vehicle in vehicles:
                try:
                    brand_id = self._get_or_create_id(cursor, "brands", ("name",), (vehicle['make'],))
                    year_id = self._get_or_create_id(cursor, "years", ("year",), (vehicle['year'],))
                    engine_id = self._get_or_create_id(cursor, "engines", ("name",), (vehicle['engine'],))
                    model_id = self._get_or_create_id(
                        cursor, "models", ("brand_id", "name"), (brand_id, vehicle['model'])
                    )
                    # Link model, year, and engine.
                    cursor.execute(
                        """INSERT OR IGNORE INTO model_year_engine
                           (model_id, year_id, engine_id) VALUES (?, ?, ?)""",
                        (model_id, year_id, engine_id),
                    )
                except Exception as e:
                    print(f"Error saving vehicle {vehicle}: {e}")
            conn.commit()
        finally:
            conn.close()
        print("Data saved to database successfully!")
def main():
    """Run the scraper end-to-end and persist whatever it finds."""
    scraper = RockAutoScraper()
    print("Starting RockAuto data extraction...")
    print("Note: This may take several minutes due to rate limiting.")
    try:
        vehicles = scraper.scrape_vehicle_data()
        if not vehicles:
            print("No vehicles found. This could be due to:")
            print("1. RockAuto blocking automated requests")
            print("2. Changes in website structure")
            print("3. Network connectivity issues")
        else:
            print(f"\nFound {len(vehicles)} vehicles:")
            # Preview at most the first ten records.
            for idx, rec in enumerate(vehicles[:10]):
                print(f" {idx+1}. {rec['make']} {rec['model']} {rec['year']}")
            remaining = len(vehicles) - 10
            if remaining > 0:
                print(f" ... and {remaining} more")
            scraper.save_to_database(vehicles)
            print("\nScraping completed successfully!")
    except Exception as e:
        print(f"An error occurred during scraping: {e}")

if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,400 @@
#!/usr/bin/env python3
"""
RockAuto Vehicle Data Scraper - Enhanced Version
Extracts vehicle information (brands, models, years, engines) from RockAuto.com
"""
import requests
from bs4 import BeautifulSoup
import time
import random
from urllib.parse import urljoin, urlparse
import json
import sqlite3
from typing import List, Dict, Optional
class RockAutoScraper:
    """Scrape vehicle data (brands, models, years, engines) from RockAuto.

    Enhanced version: fuller browser-like headers, longer delays, multiple
    parsing strategies, and synthetic engine names when none were scraped.
    """

    def __init__(self, db_path: str = "../vehicle_database/vehicle_database.db"):
        self.base_url = "https://www.rockauto.com"
        self.session = requests.Session()
        # Full browser-like header set to reduce the odds of bot detection.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })
        # SQLite file scraped rows are written to.
        self.db_path = db_path
        # Map RockAuto URL slugs to standardized brand names.
        self.brand_mapping = {
            'acura': 'Acura',
            'alfa-romeo': 'Alfa Romeo',
            'audi': 'Audi',
            'bmw': 'BMW',
            'buick': 'Buick',
            'cadillac': 'Cadillac',
            'chevrolet': 'Chevrolet',
            'chrysler': 'Chrysler',
            'dodge': 'Dodge',
            'fiat': 'Fiat',
            'ford': 'Ford',
            'gmc': 'GMC',
            'honda': 'Honda',
            'hyundai': 'Hyundai',
            'infiniti': 'Infiniti',
            'isuzu': 'Isuzu',
            'jaguar': 'Jaguar',
            'jeep': 'Jeep',
            'kia': 'Kia',
            'land-rover': 'Land Rover',
            'lexus': 'Lexus',
            'lincoln': 'Lincoln',
            'mazda': 'Mazda',
            'mercedes-benz': 'Mercedes-Benz',
            'mercury': 'Mercury',
            'mitsubishi': 'Mitsubishi',
            'nissan': 'Nissan',
            'oldsmobile': 'Oldsmobile',
            'plymouth': 'Plymouth',
            'pontiac': 'Pontiac',
            'porsche': 'Porsche',
            'ram': 'Ram',
            'saab': 'Saab',
            'saturn': 'Saturn',
            'scion': 'Scion',
            'subaru': 'Subaru',
            'suzuki': 'Suzuki',
            'tesla': 'Tesla',
            'toyota': 'Toyota',
            'volkswagen': 'Volkswagen',
            'volvo': 'Volvo'
        }

    def get_page(self, url: str) -> Optional["BeautifulSoup"]:
        """Fetch ``url`` and return parsed HTML, or ``None`` on any HTTP error."""
        try:
            # Random delay to be respectful to the server.
            time.sleep(random.uniform(2, 4))
            # Timeout so a stalled connection cannot hang the scraper forever
            # (fix: the request previously had no timeout).
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return BeautifulSoup(response.content, 'html.parser')
        except requests.RequestException as e:
            print(f"Error fetching {url}: {e}")
            return None

    def get_makes_enhanced(self) -> List[str]:
        """Get makes via catalog link URLs, falling back to inline JavaScript."""
        import re  # local import: this module does not import re at file level
        print("Fetching list of makes (enhanced)...")
        makes = []
        # Approach 1: visit the main catalog page.
        soup = self.get_page(f"{self.base_url}/catalog/catalog.php")
        if not soup:
            return makes
        # Hrefs that embed a known brand slug as a path segment.
        for link in soup.find_all('a', href=True):
            href = link.get('href', '').lower()
            for key, value in self.brand_mapping.items():
                if f"/{key}/" in href and value not in makes:
                    makes.append(value)
        # Approach 2: brand slugs quoted inside inline <script> blocks.
        for script in soup.find_all('script'):
            if script.string:
                for match in re.findall(r'["\']([a-z-]+)["\']', script.string):
                    if match in self.brand_mapping and self.brand_mapping[match] not in makes:
                        makes.append(self.brand_mapping[match])
        print(f"Found {len(makes)} makes: {makes[:10]}{'...' if len(makes) > 10 else ''}")
        return makes

    def get_detailed_models_for_make(self, make: str) -> List[Dict]:
        """Get model/year records for a make by exploring its catalog page.

        Year-specific links are tried first, then a generic scan of every
        link mentioning the make slug.
        """
        # Fix: re was previously imported inside the first loop body, so the
        # fallback branch raised NameError whenever no year links existed.
        import re
        print(f"Fetching detailed models for {make}...")
        # Translate the display name back to the RockAuto URL slug.
        make_key = None
        for key, value in self.brand_mapping.items():
            if value.lower() == make.lower():
                make_key = key
                break
        if not make_key:
            print(f"Make {make} not found in mapping")
            return []
        models = []
        soup = self.get_page(f"{self.base_url}/catalog/catalog.php?c={make_key}")
        if not soup:
            return models
        # Pass 1: links whose URL embeds the make and a plausible year.
        year_links = soup.find_all('a', href=lambda x: x and f'/catalog/{make_key}/' in x and any(str(y) in x for y in range(1900, 2030)))
        for link in year_links:
            href = link.get('href', '')
            text = link.get_text().strip()
            year_match = re.search(r'\b(19|20)\d{2}\b', text)
            if not year_match:
                year_match = re.search(r'\b(19|20)\d{2}\b', href)
            if year_match:
                year = int(year_match.group())
                # Strip the year token from the text to leave the model name.
                model_text = re.sub(r'\b(19|20)\d{2}\b', '', text).strip()
                if model_text:
                    record = {
                        'make': make,
                        'model': model_text,
                        'year': year,
                        'engine': 'Unknown',  # Will need to extract from deeper pages
                        'href': href
                    }
                    if record not in models:
                        models.append(record)
        # Pass 2: if nothing matched, scan all links mentioning the make slug.
        if not models:
            for link in soup.find_all('a', href=True):
                href = link.get('href', '').lower()
                text = link.get_text().strip()
                if f"/{make_key}/" in href:
                    year_match = re.search(r'\b(19|20)\d{2}\b', text)
                    if not year_match:
                        year_match = re.search(r'\b(19|20)\d{2}\b', href)
                    if year_match:
                        year = int(year_match.group())
                        model_parts = [part for part in text.split() if not re.match(r'\b(19|20)\d{2}\b', part)]
                        model = ' '.join(model_parts)
                        if model:
                            record = {
                                'make': make,
                                'model': model,
                                'year': year,
                                'engine': 'Unknown',
                                'href': link.get('href')
                            }
                            if record not in models:
                                models.append(record)
        print(f"Found {len(models)} models for {make}")
        return models

    def explore_categories(self, make: str) -> List[Dict]:
        """Explore category elements of a make's page for model/year entries."""
        import re  # hoisted out of the loop body
        print(f"Exploring categories for {make}...")
        make_key = None
        for key, value in self.brand_mapping.items():
            if value.lower() == make.lower():
                make_key = key
                break
        if not make_key:
            print(f"Make {make} not found in mapping")
            return []
        models = []
        soup = self.get_page(f"{self.base_url}/catalog/catalog.php?c={make_key}")
        if not soup:
            return models
        # RockAuto typically organizes by year/model inside container elements.
        category_elements = soup.find_all(['div', 'section', 'ul'], class_=lambda x: x and any(keyword in x.lower() for keyword in ['year', 'model', 'catalog', 'vehicle']))
        if not category_elements:
            # No categorized containers: fall back to links with year info.
            for link in soup.find_all('a', href=True):
                href = link.get('href', '').lower()
                text = link.get_text().strip()
                if f"/{make_key}/" in href and any(str(year) in href for year in range(1900, 2030)):
                    year_match = re.search(r'\b(19|20)\d{2}\b', href)
                    if year_match:
                        year = int(year_match.group())
                        # Clean up text to extract the model name.
                        clean_text = re.sub(r'\b(19|20)\d{2}\b', '', text).strip(' -_')
                        if clean_text and len(clean_text) > 1:
                            record = {
                                'make': make,
                                'model': clean_text,
                                'year': year,
                                'engine': 'Unknown',
                                'href': link.get('href')
                            }
                            if record not in models:
                                models.append(record)
        print(f"Found {len(models)} entries for {make} through category exploration")
        return models

    def scrape_vehicle_data(self) -> List[Dict]:
        """Scrape makes and models; capped to a small sample for testing."""
        print("Starting enhanced RockAuto scraping...")
        all_vehicles = []
        makes = self.get_makes_enhanced()
        # Limit to first 3 makes for testing.
        makes = makes[:3] if len(makes) > 3 else makes
        for make in makes:
            models = self.get_detailed_models_for_make(make)
            # Fall back to category exploration when nothing was found.
            if not models:
                models = self.explore_categories(make)
            all_vehicles.extend(models)
            # Limit total records for testing.
            if len(all_vehicles) > 15:
                break
        print(f"Total vehicles found: {len(all_vehicles)}")
        return all_vehicles

    @staticmethod
    def _get_or_create_id(cursor, table: str, columns, values) -> int:
        """Insert-or-ignore a row and return its id (trusted identifiers only)."""
        col_list = ", ".join(columns)
        placeholders = ", ".join("?" for _ in values)
        cursor.execute(
            f"INSERT OR IGNORE INTO {table} ({col_list}) VALUES ({placeholders})",
            values,
        )
        where = " AND ".join(f"{col} = ?" for col in columns)
        cursor.execute(f"SELECT id FROM {table} WHERE {where}", values)
        return cursor.fetchone()[0]

    def save_to_database(self, vehicles: List[Dict]):
        """Save scraped data to the vehicle database.

        Per-vehicle failures are reported and skipped; the connection is
        always closed (fix: it used to leak if commit raised).
        """
        print(f"Saving {len(vehicles)} vehicles to database...")
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            for vehicle in vehicles:
                try:
                    brand_id = self._get_or_create_id(cursor, "brands", ("name",), (vehicle['make'],))
                    year_id = self._get_or_create_id(cursor, "years", ("year",), (vehicle['year'],))
                    # Synthesize a per-model engine name when none was scraped.
                    engine_name = vehicle['engine'] if vehicle['engine'] != 'Unknown' else f"Engine_{vehicle['year']}_{vehicle['model'][:10]}"
                    engine_id = self._get_or_create_id(cursor, "engines", ("name",), (engine_name,))
                    # Models are keyed by (brand_id, name); body_type is metadata only.
                    cursor.execute(
                        "INSERT OR IGNORE INTO models (brand_id, name, body_type) VALUES (?, ?, ?)",
                        (brand_id, vehicle['model'], 'Unknown')
                    )
                    cursor.execute("SELECT id FROM models WHERE brand_id = ? AND name = ?", (brand_id, vehicle['model']))
                    model_id = cursor.fetchone()[0]
                    # Link model, year, and engine.
                    cursor.execute(
                        """INSERT OR IGNORE INTO model_year_engine
                           (model_id, year_id, engine_id) VALUES (?, ?, ?)""",
                        (model_id, year_id, engine_id),
                    )
                except Exception as e:
                    print(f"Error saving vehicle {vehicle}: {e}")
            conn.commit()
        finally:
            conn.close()
        print("Data saved to database successfully!")
def main():
    """Run the enhanced scraper end-to-end and persist whatever it finds."""
    scraper = RockAutoScraper()
    print("Starting enhanced RockAuto data extraction...")
    print("Note: This may take several minutes due to rate limiting.")
    try:
        vehicles = scraper.scrape_vehicle_data()
        if not vehicles:
            print("No vehicles found. This could be due to:")
            print("1. RockAuto blocking automated requests")
            print("2. Changes in website structure")
            print("3. Network connectivity issues")
            print("4. Anti-bot measures implemented by RockAuto")
        else:
            print(f"\nFound {len(vehicles)} vehicles:")
            # Preview at most the first ten records.
            for idx, rec in enumerate(vehicles[:10]):
                print(f" {idx+1}. {rec['make']} {rec['model']} {rec['year']}")
            remaining = len(vehicles) - 10
            if remaining > 0:
                print(f" ... and {remaining} more")
            scraper.save_to_database(vehicles)
            print("\nScraping completed successfully!")
    except Exception as e:
        print(f"An error occurred during scraping: {e}")

if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,350 @@
#!/usr/bin/env python3
"""
RockAuto Vehicle Data Scraper v2
Extrae información de vehículos (marcas, años, modelos, motores) de RockAuto.com
"""
import requests
from bs4 import BeautifulSoup
import time
import random
import sqlite3
import re
import sys
from typing import List, Dict, Set, Optional
from urllib.parse import unquote
class RockAutoScraperV2:
def __init__(self, db_path: str = "../vehicle_database/vehicle_database.db"):
    # Catalog root; all brand/year/model URLs are built from this prefix.
    self.base_url = "https://www.rockauto.com/en/catalog"
    self.session = requests.Session()
    # Browser-like headers reduce the chance of being blocked as a bot.
    self.session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
    })
    # Path to the SQLite database the scraped rows are written to.
    self.db_path = db_path
    self.delay_range = (1, 2)  # Seconds between requests (politeness delay)
def _delay(self):
    """Sleep a random interval inside ``self.delay_range`` between requests."""
    low, high = self.delay_range
    time.sleep(random.uniform(low, high))
def _get_soup(self, url: str) -> Optional[BeautifulSoup]:
    """Download *url* and return its parsed HTML, or ``None`` on any failure."""
    try:
        self._delay()  # polite pacing before every request
        response = self.session.get(url, timeout=30)
        response.raise_for_status()
        return BeautifulSoup(response.content, 'html.parser')
    except Exception as exc:
        print(f" Error al obtener {url}: {exc}")
        return None
def _clean_name(self, name: str) -> str:
    """Decode URL escapes, collapse whitespace, and upper-case *name*."""
    decoded = unquote(name.replace('+', ' '))
    collapsed = re.sub(r'\s+', ' ', decoded).strip()
    return collapsed.upper()
def get_all_brands(self) -> List[str]:
    """Return the sorted list of brand names found on the catalog index."""
    print("Obteniendo lista de marcas...")
    soup = self._get_soup(f"{self.base_url}/")
    if soup is None:
        return []
    found = set()
    # Brand pages look like /en/catalog/<brand> with no trailing segments.
    for anchor in soup.find_all('a', href=True):
        hit = re.match(r'/en/catalog/([^,/]+)$', anchor['href'])
        if not hit:
            continue
        candidate = self._clean_name(hit.group(1))
        # Discard single characters and purely numeric artifacts.
        if candidate and len(candidate) > 1 and not candidate.isdigit():
            found.add(candidate)
    ordered = sorted(found)
    print(f" Encontradas {len(ordered)} marcas")
    return ordered
def get_years_for_brand(self, brand: str) -> List[int]:
    """Return the years available for *brand*, newest first."""
    slug = brand.lower().replace(' ', '+')
    soup = self._get_soup(f"{self.base_url}/{slug}")
    if soup is None:
        return []
    # Year links look like /catalog/<brand>,<year>.
    link_pattern = re.compile(rf'/catalog/{re.escape(slug)},(\d{{4}})', re.IGNORECASE)
    years = set()
    for anchor in soup.find_all('a', href=True):
        hit = link_pattern.search(anchor['href'])
        if not hit:
            continue
        candidate = int(hit.group(1))
        # Keep only plausible model years.
        if 1900 < candidate <= 2030:
            years.add(candidate)
    return sorted(years, reverse=True)
def get_models_for_brand_year(self, brand: str, year: int) -> List[str]:
    """Return the model names listed for *brand* in *year*, sorted."""
    slug = brand.lower().replace(' ', '+')
    soup = self._get_soup(f"{self.base_url}/{slug},{year}")
    if soup is None:
        return []
    # Model links look like /catalog/<brand>,<year>,<model>.
    link_pattern = re.compile(
        rf'/catalog/{re.escape(slug)},{year},([^,/]+)', re.IGNORECASE
    )
    found = set()
    for anchor in soup.find_all('a', href=True):
        hit = link_pattern.search(anchor['href'])
        if not hit:
            continue
        candidate = self._clean_name(hit.group(1))
        if candidate and not candidate.isdigit():
            found.add(candidate)
    return sorted(found)
def get_engines_for_vehicle(self, brand: str, year: int, model: str) -> List[str]:
"""Obtiene los motores para un vehículo específico"""
brand_url = brand.lower().replace(' ', '+')
model_url = model.lower().replace(' ', '+')
soup = self._get_soup(f"{self.base_url}/{brand_url},{year},{model_url}")
if not soup:
return []
engines = set()
links = soup.find_all('a', href=True)
for link in links:
href = link['href']
text = link.get_text().strip()
# Buscar patrones de motor en el href
pattern = rf'/catalog/{re.escape(brand_url)},{year},{re.escape(model_url)},([^,/]+)'
match = re.search(pattern, href, re.IGNORECASE)
if match:
engine = self._clean_name(match.group(1))
# Filtrar solo motores válidos (contienen L, V, cilindros, etc.)
if engine and re.search(r'\d+\.?\d*L|V\d|I\d|HYBRID|ELECTRIC|DIESEL', engine, re.IGNORECASE):
engines.add(engine)
return sorted(engines)
    def scrape_brand(self, brand: str, max_years: int = None, max_models_per_year: int = None) -> List[Dict]:
        """Scrape every model-year-engine combination for one brand.

        Args:
            brand: Make name as listed on RockAuto (e.g. "TOYOTA").
            max_years: Optional cap on how many (newest) years to visit.
            max_models_per_year: Optional cap on models fetched per year.

        Returns:
            A list of dicts with keys 'brand', 'year', 'model', 'engine'.
        """
        print(f"\n{'='*50}")
        print(f"Procesando marca: {brand}")
        print('='*50)
        vehicles = []
        # Years available for this brand (newest first).
        years = self.get_years_for_brand(brand)
        if max_years:
            years = years[:max_years]
        print(f" Años encontrados: {len(years)}")
        for year in years:
            print(f"\n Año {year}:")
            # Models offered for this brand/year.
            models = self.get_models_for_brand_year(brand, year)
            if max_models_per_year:
                models = models[:max_models_per_year]
            print(f" Modelos: {len(models)}")
            for model in models:
                # Engine options for this specific vehicle.
                engines = self.get_engines_for_vehicle(brand, year, model)
                if engines:
                    for engine in engines:
                        vehicle = {
                            'brand': brand,
                            'year': year,
                            'model': model,
                            'engine': engine
                        }
                        vehicles.append(vehicle)
                        print(f" {model} - {engine}")
                else:
                    # No engine links parsed: record the model with a generic engine.
                    vehicle = {
                        'brand': brand,
                        'year': year,
                        'model': model,
                        'engine': 'Standard'
                    }
                    vehicles.append(vehicle)
                    print(f" {model} - (sin motor específico)")
        print(f"\n Total vehículos para {brand}: {len(vehicles)}")
        return vehicles
    def save_to_database(self, vehicles: List[Dict]):
        """Persist scraped vehicles into the SQLite database.

        Each vehicle dict must carry 'brand', 'year', 'model' and 'engine'.
        Dimension rows are created on demand with INSERT OR IGNORE; a
        duplicate model/year/engine combination counts as skipped.
        """
        if not vehicles:
            print("No hay vehículos para guardar")
            return
        print(f"\nGuardando {len(vehicles)} vehículos en la base de datos...")
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        saved = 0
        skipped = 0
        for vehicle in vehicles:
            try:
                # Insert or fetch the brand row.
                cursor.execute(
                    "INSERT OR IGNORE INTO brands (name) VALUES (?)",
                    (vehicle['brand'],)
                )
                cursor.execute("SELECT id FROM brands WHERE name = ?", (vehicle['brand'],))
                brand_id = cursor.fetchone()[0]
                # Insert or fetch the year row.
                cursor.execute(
                    "INSERT OR IGNORE INTO years (year) VALUES (?)",
                    (vehicle['year'],)
                )
                cursor.execute("SELECT id FROM years WHERE year = ?", (vehicle['year'],))
                year_id = cursor.fetchone()[0]
                # Insert or fetch the engine row.
                cursor.execute(
                    "INSERT OR IGNORE INTO engines (name) VALUES (?)",
                    (vehicle['engine'],)
                )
                cursor.execute("SELECT id FROM engines WHERE name = ?", (vehicle['engine'],))
                engine_id = cursor.fetchone()[0]
                # Insert or fetch the model row (scoped to the brand).
                cursor.execute(
                    "INSERT OR IGNORE INTO models (brand_id, name) VALUES (?, ?)",
                    (brand_id, vehicle['model'])
                )
                cursor.execute(
                    "SELECT id FROM models WHERE brand_id = ? AND name = ?",
                    (brand_id, vehicle['model'])
                )
                model_id = cursor.fetchone()[0]
                # Link model, year and engine; rowcount == 0 means duplicate.
                cursor.execute(
                    """INSERT OR IGNORE INTO model_year_engine
                    (model_id, year_id, engine_id) VALUES (?, ?, ?)""",
                    (model_id, year_id, engine_id)
                )
                if cursor.rowcount > 0:
                    saved += 1
                else:
                    skipped += 1
            except Exception as e:
                print(f" Error guardando {vehicle}: {e}")
                skipped += 1
        conn.commit()
        conn.close()
        print(f" Guardados: {saved}, Omitidos (duplicados): {skipped}")
def scrape_multiple_brands(self, brands: List[str], **kwargs) -> List[Dict]:
"""Extrae vehículos de múltiples marcas"""
all_vehicles = []
for i, brand in enumerate(brands, 1):
print(f"\n[{i}/{len(brands)}] ", end="")
vehicles = self.scrape_brand(brand, **kwargs)
all_vehicles.extend(vehicles)
return all_vehicles
def main():
    """CLI entry point: scrape selected brands and store them in SQLite."""
    import argparse
    parser = argparse.ArgumentParser(description='Scraper de vehículos de RockAuto')
    parser.add_argument('--brands', nargs='+', help='Marcas específicas a extraer')
    parser.add_argument('--all-brands', action='store_true', help='Extraer todas las marcas')
    parser.add_argument('--max-years', type=int, default=5, help='Máximo de años por marca (default: 5)')
    parser.add_argument('--max-models', type=int, help='Máximo de modelos por año')
    parser.add_argument('--list-brands', action='store_true', help='Solo listar marcas disponibles')
    parser.add_argument('--db', default='../vehicle_database/vehicle_database.db', help='Ruta a la base de datos')
    args = parser.parse_args()
    scraper = RockAutoScraperV2(db_path=args.db)
    # --list-brands only prints the catalog makes and exits.
    if args.list_brands:
        brands = scraper.get_all_brands()
        print("\nMarcas disponibles en RockAuto:")
        for i, brand in enumerate(brands, 1):
            print(f" {i:3}. {brand}")
        print(f"\nTotal: {len(brands)} marcas")
        return
    # Decide which brands to process.
    if args.brands:
        brands_to_scrape = [b.upper() for b in args.brands]
    elif args.all_brands:
        brands_to_scrape = scraper.get_all_brands()
    else:
        # Default: a handful of popular brands.
        brands_to_scrape = ['TOYOTA', 'HONDA', 'FORD', 'CHEVROLET', 'NISSAN']
    print(f"\nMarcas a procesar: {', '.join(brands_to_scrape)}")
    print(f"Máximo años por marca: {args.max_years}")
    if args.max_models:
        print(f"Máximo modelos por año: {args.max_models}")
    # Scrape the data.
    vehicles = scraper.scrape_multiple_brands(
        brands_to_scrape,
        max_years=args.max_years,
        max_models_per_year=args.max_models
    )
    # Persist and print summary statistics.
    if vehicles:
        scraper.save_to_database(vehicles)
        print(f"\n{'='*50}")
        print("RESUMEN")
        print('='*50)
        print(f"Total de vehículos extraídos: {len(vehicles)}")
        # Summary statistics derived from the scraped records.
        brands_count = len(set(v['brand'] for v in vehicles))
        models_count = len(set(f"{v['brand']}-{v['model']}" for v in vehicles))
        years_range = f"{min(v['year'] for v in vehicles)} - {max(v['year'] for v in vehicles)}"
        print(f"Marcas: {brands_count}")
        print(f"Modelos únicos: {models_count}")
        print(f"Rango de años: {years_range}")
    else:
        print("\nNo se encontraron vehículos")
# Entry point when run as a script.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,393 @@
#!/usr/bin/env python3
"""
Scraper de Ford y Chevrolet
- Procesa de 5 en 5 años
- Espera 3 minutos (180 segundos) entre lotes para activar VPN
- Presiona ENTER para saltar la espera
- Años: 1975-2026
"""
import requests
from bs4 import BeautifulSoup
import sqlite3
import time
import re
import os
import sys
import threading
from urllib.parse import unquote
# Resolve the project base directory from this script's location; if we
# live inside vehicle_scraper/, the project root is one level up.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if os.path.basename(SCRIPT_DIR) == "vehicle_scraper":
    BASE_DIR = os.path.dirname(SCRIPT_DIR)
else:
    BASE_DIR = SCRIPT_DIR
DB_PATH = os.path.join(BASE_DIR, "vehicle_database", "vehicle_database.db")
BASE_URL = "https://www.rockauto.com/en/catalog"
# Brands to scrape (Nissan was already processed in an earlier run).
BRANDS = ["FORD", "CHEVROLET"]
# Years 1975-2026, descending.
ALL_YEARS = list(range(2026, 1974, -1))
# Batch configuration.
BATCH_SIZE = 5  # years per batch
WAIT_TIME = 180  # 3 minutes between batches (time to rotate the VPN)
session = requests.Session()
# Browser-like headers reduce the chance of being blocked outright.
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml',
    'Accept-Language': 'en-US,en;q=0.9',
})
# Global flag set by the input-listener thread to skip the countdown wait.
skip_wait = False
def wait_with_skip(seconds, message=""):
    """Count down *seconds* while letting the user press ENTER to skip.

    Used between scraping batches so the operator has time to rotate the
    VPN; a daemon thread blocks on stdin and flips the global skip_wait
    flag, which the countdown loop polls once per second.
    """
    global skip_wait
    skip_wait = False
    print(f"\n{'*'*60}")
    print(f" {message}")
    print(f" ACTIVA/CAMBIA EL VPN AHORA")
    print(f" >>> Presiona ENTER para saltar la espera <<<")
    print(f"{'*'*60}")
    # Background thread: waits for ENTER and sets the skip flag.
    def check_input():
        global skip_wait
        try:
            input()
            skip_wait = True
        except:
            # stdin may be closed (non-interactive run); just give up.
            pass
    input_thread = threading.Thread(target=check_input, daemon=True)
    input_thread.start()
    for sec in range(seconds, 0, -1):
        if skip_wait:
            print(f"\n >>> ESPERA SALTADA <<<")
            return
        mins = sec // 60
        secs = sec % 60
        print(f"\r Continuando en {mins}:{secs:02d}... (ENTER para saltar) ", end="", flush=True)
        time.sleep(1)
    print()
def clean_name(name):
    """Decode '+'/percent escapes, collapse whitespace, and upper-case."""
    decoded = unquote(name.replace('+', ' '))
    return re.sub(r'\s+', ' ', decoded).strip().upper()
def get_soup(url, retries=3):
    """Fetch *url* and return parsed HTML.

    Retries transient failures up to *retries* times; returns None when
    blocked with HTTP 403, on persistent errors, or on other status codes.
    """
    attempt = 0
    while attempt < retries:
        try:
            time.sleep(0.5)  # polite delay before every request
            response = session.get(url, timeout=15)
            if response.status_code == 200:
                return BeautifulSoup(response.content, 'html.parser')
            if response.status_code == 403:
                # Hard block: retrying is pointless until the VPN changes.
                print(f"\n [!] Bloqueado (403) - Cambia el VPN")
                return None
        except Exception as e:
            if attempt < retries - 1:
                time.sleep(3)  # back off before retrying
            else:
                print(f"\n Error: {e}")
        attempt += 1
    return None
def get_models(brand, year):
    """Return the sorted model names listed for *brand* in *year*."""
    slug = brand.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{slug},{year}")
    if not soup:
        return []
    model_re = re.compile(rf'/catalog/{re.escape(slug)},{year},([^,/]+)', re.I)
    names = set()
    for anchor in soup.find_all('a', href=True):
        hit = model_re.search(anchor['href'])
        if hit:
            name = clean_name(hit.group(1))
            # Skip empty, purely numeric, or single-character matches.
            if name and not name.isdigit() and len(name) > 1:
                names.add(name)
    return sorted(names)
def get_engines(brand, year, model):
    """Return engine names for a vehicle, or ['STANDARD'] when none parse."""
    slug = brand.lower().replace(' ', '+')
    model_slug = model.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{slug},{year},{model_slug}")
    if not soup:
        return ['STANDARD']
    engine_re = re.compile(
        rf'/catalog/{re.escape(slug)},{year},{re.escape(model_slug)},([^,/]+)', re.I)
    # Only keep strings that look like engine descriptions: displacement,
    # cylinder layout, or fuel type.
    looks_like_engine = re.compile(r'\d+\.?\d*L|V\d|I\d|H\d|HYBRID|ELECTRIC|DIESEL', re.I)
    found = set()
    for anchor in soup.find_all('a', href=True):
        hit = engine_re.search(anchor['href'])
        if hit:
            engine = clean_name(hit.group(1))
            if engine and looks_like_engine.search(engine):
                found.add(engine)
    return sorted(found) if found else ['STANDARD']
def save_to_db(conn, brand, year, model, engine):
    """Insert one brand/year/model/engine combination.

    Dimension rows (brands, years, engines, models) are created on demand
    via INSERT OR IGNORE; the linking row goes into model_year_engine.
    Returns True when a new combination was inserted, False for duplicates
    or on error. Committing is left to the caller.
    """
    cur = conn.cursor()
    try:
        # Brand dimension.
        cur.execute("INSERT OR IGNORE INTO brands (name) VALUES (?)", (brand,))
        cur.execute("SELECT id FROM brands WHERE name = ?", (brand,))
        brand_id = cur.fetchone()[0]
        # Year dimension.
        cur.execute("INSERT OR IGNORE INTO years (year) VALUES (?)", (year,))
        cur.execute("SELECT id FROM years WHERE year = ?", (year,))
        year_id = cur.fetchone()[0]
        # Engine dimension.
        cur.execute("INSERT OR IGNORE INTO engines (name) VALUES (?)", (engine,))
        cur.execute("SELECT id FROM engines WHERE name = ?", (engine,))
        engine_id = cur.fetchone()[0]
        # Model dimension, scoped to the brand.
        cur.execute("INSERT OR IGNORE INTO models (brand_id, name) VALUES (?, ?)", (brand_id, model))
        cur.execute("SELECT id FROM models WHERE brand_id = ? AND name = ?", (brand_id, model))
        model_id = cur.fetchone()[0]
        # Linking row; rowcount == 0 means the combination already existed.
        cur.execute(
            "INSERT OR IGNORE INTO model_year_engine (model_id, year_id, engine_id) VALUES (?, ?, ?)",
            (model_id, year_id, engine_id)
        )
        return cur.rowcount > 0
    except Exception as e:
        print(f" DB Error: {e}")
        return False
def get_existing_years(conn, brand):
    """Return the set of years already stored for *brand*."""
    query = """
        SELECT DISTINCT y.year
        FROM years y
        JOIN model_year_engine mye ON y.id = mye.year_id
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        WHERE b.name = ?
    """
    return {row[0] for row in conn.execute(query, (brand,))}
def process_batch(conn, brand, years_batch, batch_num, total_batches):
    """Scrape one batch of years for *brand* and commit the results.

    Returns (saved, found): new rows inserted vs. combinations seen.
    """
    print(f"\n{'='*60}")
    print(f"[{brand}] LOTE {batch_num}/{total_batches}: Años {years_batch}")
    print('='*60)
    batch_saved = 0
    batch_total = 0
    for year in years_batch:
        print(f"\n[{brand} - Año {year}] Obteniendo modelos... ", end="", flush=True)
        models = get_models(brand, year)
        print(f"{len(models)} modelos encontrados")
        if not models:
            print(f" No se encontraron modelos para {year}")
            continue
        for model in models:
            engines = get_engines(brand, year, model)
            for engine in engines:
                batch_total += 1
                if save_to_db(conn, brand, year, model, engine):
                    batch_saved += 1
                    print(f" {model} - {engine}")
    # Commit once per batch so progress survives a VPN swap or a crash.
    conn.commit()
    print(f"\n>> Lote {batch_num} completado: {batch_saved} nuevos de {batch_total} encontrados")
    return batch_saved, batch_total
def get_brand_batches(conn, brand):
    """Split the years still missing for *brand* into BATCH_SIZE-year lots.

    Returns (batches, existing_years); batches is empty when the brand is
    already complete.
    """
    existing = get_existing_years(conn, brand)
    pending = [year for year in ALL_YEARS if year not in existing]
    if not pending:
        return [], existing
    batches = [pending[start:start + BATCH_SIZE]
               for start in range(0, len(pending), BATCH_SIZE)]
    return batches, existing
def process_brand(conn, brand, start_batch=1):
    """Scrape every missing year for *brand*, batch by batch.

    Skips years already present in the database, pauses WAIT_TIME seconds
    between batches (VPN rotation), and can resume from *start_batch*.
    Returns (total_saved, total_found).
    """
    print(f"\n{'#'*60}")
    print(f" PROCESANDO MARCA: {brand}")
    print(f"{'#'*60}")
    # Which years are already stored for this brand?
    existing = get_existing_years(conn, brand)
    print(f"Años existentes de {brand}: {len(existing)} años")
    if existing:
        print(f" Rango existente: {min(existing)}-{max(existing)}")
    # Only the missing years need work.
    years_to_process = [y for y in ALL_YEARS if y not in existing]
    if not years_to_process:
        print(f"\n[OK] {brand}: Todos los años ya están en la base de datos!")
        return 0, 0
    print(f"\nAños por procesar para {brand}: {len(years_to_process)}")
    print(f" De {max(years_to_process)} a {min(years_to_process)}")
    # Split into BATCH_SIZE-year lots.
    batches = [years_to_process[i:i+BATCH_SIZE] for i in range(0, len(years_to_process), BATCH_SIZE)]
    total_batches = len(batches)
    print(f"Lotes de {BATCH_SIZE} años: {total_batches} lotes")
    if start_batch > 1:
        print(f"\n>>> Comenzando desde el lote {start_batch} <<<")
    total_saved = 0
    total_found = 0
    for i, batch in enumerate(batches, 1):
        # Skip batches before the requested starting point (resume support).
        if i < start_batch:
            continue
        saved, found = process_batch(conn, brand, batch, i, total_batches)
        total_saved += saved
        total_found += found
        # Pause before the next batch so the operator can rotate the VPN.
        if i < total_batches:
            wait_with_skip(WAIT_TIME, f"PAUSA DE {WAIT_TIME//60} MINUTOS - [{brand}] Lotes restantes: {total_batches - i}")
    return total_saved, total_found
def show_batch_menu(conn):
    """Interactive menu: pick a brand and the batch to resume from.

    Returns (brand, start_batch); brand is None when the user chose to
    process everything from the beginning.
    """
    print("\n" + "="*60)
    print(" MENÚ DE SELECCIÓN DE LOTES")
    print("="*60)
    brand_info = {}
    # Show each brand's progress and its pending batches.
    for i, brand in enumerate(BRANDS, 1):
        batches, existing = get_brand_batches(conn, brand)
        brand_info[brand] = {'batches': batches, 'existing': existing}
        if batches:
            print(f"\n {i}. {brand}")
            print(f" Años existentes: {len(existing)}")
            print(f" Lotes pendientes: {len(batches)}")
            for j, batch in enumerate(batches, 1):
                print(f" Lote {j}: años {batch[0]}-{batch[-1]}")
        else:
            print(f"\n {i}. {brand} - [COMPLETO]")
    print(f"\n 0. Procesar todo desde el inicio")
    print("="*60)
    # Brand selection loop (re-prompts until valid).
    while True:
        try:
            choice = input("\nSelecciona marca (0 para todo): ").strip()
            if choice == '0' or choice == '':
                return None, 1  # process everything from the start
            brand_idx = int(choice) - 1
            if 0 <= brand_idx < len(BRANDS):
                selected_brand = BRANDS[brand_idx]
                break
            print("Opción inválida")
        except ValueError:
            print("Ingresa un número válido")
    batches = brand_info[selected_brand]['batches']
    if not batches:
        print(f"\n{selected_brand} ya está completo!")
        return selected_brand, 1
    # Batch selection loop for the chosen brand.
    print(f"\n--- Lotes de {selected_brand} ---")
    for j, batch in enumerate(batches, 1):
        print(f" {j}. Lote {j}: años {batch[0]}-{batch[-1]}")
    while True:
        try:
            batch_choice = input(f"\nComenzar desde lote (1-{len(batches)}): ").strip()
            if batch_choice == '':
                return selected_brand, 1
            batch_num = int(batch_choice)
            if 1 <= batch_num <= len(batches):
                return selected_brand, batch_num
            print(f"Ingresa un número entre 1 y {len(batches)}")
        except ValueError:
            print("Ingresa un número válido")
def main():
    """Entry point: batch-scrape Ford and Chevrolet with VPN pauses."""
    print("="*60)
    print(" SCRAPER FORD, CHEVROLET")
    print(f" Años: 1975-2026 | Lotes de {BATCH_SIZE} años")
    print(f" Pausa entre lotes: {WAIT_TIME//60} minutos")
    print(" >>> Presiona ENTER para saltar esperas <<<")
    print("="*60)
    # Make sure the database exists before doing any network work.
    if not os.path.exists(DB_PATH):
        print(f"\n[ERROR] Base de datos no encontrada: {DB_PATH}")
        print("Verifica que la ruta sea correcta.")
        sys.exit(1)
    print(f"\nBase de datos: {DB_PATH}")
    conn = sqlite3.connect(DB_PATH)
    # Show the initial state.
    print(f"\nMarcas a procesar: {', '.join(BRANDS)}")
    print(f"Rango de años: {min(ALL_YEARS)}-{max(ALL_YEARS)} ({len(ALL_YEARS)} años)")
    # Let the user choose a brand and a starting batch.
    selected_brand, start_batch = show_batch_menu(conn)
    grand_total_saved = 0
    grand_total_found = 0
    brand_stats = {}
    # Decide which brands to process.
    if selected_brand:
        # Only the selected brand, resuming at the chosen batch.
        brands_to_process = [selected_brand]
        start_batches = {selected_brand: start_batch}
    else:
        # All brands, from the beginning.
        brands_to_process = BRANDS
        start_batches = {brand: 1 for brand in BRANDS}
    for brand in brands_to_process:
        saved, found = process_brand(conn, brand, start_batches.get(brand, 1))
        brand_stats[brand] = {'saved': saved, 'found': found}
        grand_total_saved += saved
        grand_total_found += found
        # Pause between brands (when another brand is still pending).
        if brand != brands_to_process[-1]:
            wait_with_skip(WAIT_TIME, f"PAUSA ENTRE MARCAS - Siguiente: {brands_to_process[brands_to_process.index(brand)+1]}")
    conn.close()
    print("\n" + "="*60)
    print(" RESUMEN FINAL")
    print("="*60)
    for brand, stats in brand_stats.items():
        print(f" {brand}:")
        print(f" Encontrados: {stats['found']}")
        print(f" Nuevos guardados: {stats['saved']}")
    print("-"*60)
    print(f" TOTAL:")
    print(f" Vehículos encontrados: {grand_total_found}")
    print(f" Nuevos guardados: {grand_total_saved}")
    print("="*60)
# Entry point when run as a script.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""
Script optimizado para extraer todos los vehículos Toyota de RockAuto
Guarda datos incrementalmente para no perder progreso
"""
import requests
from bs4 import BeautifulSoup
import sqlite3
import time
import re
import sys
from urllib.parse import unquote
# NOTE(review): hard-coded absolute path — assumes a /home/Autopartes deployment;
# the sibling scripts resolve this relative to the script location instead.
DB_PATH = "/home/Autopartes/vehicle_database/vehicle_database.db"
BASE_URL = "https://www.rockauto.com/en/catalog"
session = requests.Session()
# Browser-like headers reduce the chance of being blocked outright.
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml',
    'Accept-Language': 'en-US,en;q=0.9',
})
def clean_name(name):
    """Normalize a URL fragment into an upper-case display name."""
    text = unquote(name.replace('+', ' '))
    collapsed = re.sub(r'\s+', ' ', text)
    return collapsed.strip().upper()
def get_soup(url, retries=3):
    """GET *url* with a short politeness delay; parsed HTML or None."""
    remaining = retries
    while remaining:
        try:
            time.sleep(0.3)  # short delay between requests
            resp = session.get(url, timeout=10)
            if resp.status_code == 200:
                return BeautifulSoup(resp.content, 'html.parser')
        except Exception as e:
            if remaining > 1:
                time.sleep(2)  # back off before retrying
            else:
                print(f" Error: {e}")
        remaining -= 1
    return None
def get_years(brand):
    """Return available catalog years for *brand*, newest first."""
    slug = brand.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{slug}")
    if not soup:
        return []
    year_re = re.compile(rf'/catalog/{re.escape(slug)},(\d{{4}})', re.I)
    found = set()
    for anchor in soup.find_all('a', href=True):
        hit = year_re.search(anchor['href'])
        if hit:
            value = int(hit.group(1))
            # Discard obviously bogus years picked up from unrelated links.
            if 1950 < value <= 2030:
                found.add(value)
    return sorted(found, reverse=True)
def get_models(brand, year):
    """Return the sorted model names for *brand* in *year*."""
    slug = brand.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{slug},{year}")
    if not soup:
        return []
    link_re = re.compile(rf'/catalog/{re.escape(slug)},{year},([^,/]+)', re.I)
    collected = set()
    for anchor in soup.find_all('a', href=True):
        found = link_re.search(anchor['href'])
        if found:
            candidate = clean_name(found.group(1))
            # Skip empty, purely numeric, or single-character matches.
            if candidate and not candidate.isdigit() and len(candidate) > 1:
                collected.add(candidate)
    return sorted(collected)
def get_engines(brand, year, model):
    """Engine names for a vehicle; ['Standard'] when none parse, [] on fetch failure."""
    slug = brand.lower().replace(' ', '+')
    model_slug = model.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{slug},{year},{model_slug}")
    if not soup:
        return []
    link_re = re.compile(
        rf'/catalog/{re.escape(slug)},{year},{re.escape(model_slug)},([^,/]+)', re.I)
    # Only keep strings that look like engines (displacement, layout, fuel).
    engine_hint = re.compile(r'\d+\.?\d*L|V\d|I\d|H\d|HYBRID|ELECTRIC|DIESEL', re.I)
    collected = set()
    for anchor in soup.find_all('a', href=True):
        found = link_re.search(anchor['href'])
        if found:
            candidate = clean_name(found.group(1))
            if candidate and engine_hint.search(candidate):
                collected.add(candidate)
    return sorted(collected) if collected else ['Standard']
def save_to_db(conn, brand, year, model, engine):
    """Store one vehicle combination; True if new, False if duplicate/error.

    The caller is responsible for committing.
    """
    cur = conn.cursor()

    def _dim_id(insert_sql, select_sql, value):
        # Create the dimension row if absent, then fetch its primary key.
        cur.execute(insert_sql, (value,))
        cur.execute(select_sql, (value,))
        return cur.fetchone()[0]

    try:
        brand_id = _dim_id("INSERT OR IGNORE INTO brands (name) VALUES (?)",
                           "SELECT id FROM brands WHERE name = ?", brand)
        year_id = _dim_id("INSERT OR IGNORE INTO years (year) VALUES (?)",
                          "SELECT id FROM years WHERE year = ?", year)
        engine_id = _dim_id("INSERT OR IGNORE INTO engines (name) VALUES (?)",
                            "SELECT id FROM engines WHERE name = ?", engine)
        # Models are scoped to the brand, so they need a two-column lookup.
        cur.execute("INSERT OR IGNORE INTO models (brand_id, name) VALUES (?, ?)", (brand_id, model))
        cur.execute("SELECT id FROM models WHERE brand_id = ? AND name = ?", (brand_id, model))
        model_id = cur.fetchone()[0]
        cur.execute(
            "INSERT OR IGNORE INTO model_year_engine (model_id, year_id, engine_id) VALUES (?, ?, ?)",
            (model_id, year_id, engine_id)
        )
        # rowcount == 0 means the combination already existed.
        return cur.rowcount > 0
    except Exception as e:
        print(f" DB Error: {e}")
        return False
def main():
    """Scrape all Toyota vehicles (1975-2026) and store them incrementally.

    Commits after every year so progress survives interruptions.
    """
    brand = "TOYOTA"
    print(f"Obteniendo años disponibles para {brand}...")
    years = get_years(brand)
    # BUGFIX: the original indexed years[0]/years[-1] unconditionally and
    # raised IndexError when the site blocked us and no years were found.
    if not years:
        print("No se encontraron años disponibles (posible bloqueo). Abortando.")
        return
    print(f"Encontrados {len(years)} años: {years[0]} - {years[-1]}")
    # Restrict to the range this project covers.
    years = [y for y in years if 1975 <= y <= 2026]
    print(f"Procesando años 1975-2026: {len(years)} años")
    print("=" * 60)
    conn = sqlite3.connect(DB_PATH)
    total_saved = 0
    total_vehicles = 0
    for i, year in enumerate(years, 1):
        print(f"\n[{i}/{len(years)}] Año {year}: ", end="", flush=True)
        models = get_models(brand, year)
        print(f"{len(models)} modelos")
        year_count = 0
        for model in models:
            engines = get_engines(brand, year, model)
            for engine in engines:
                total_vehicles += 1
                if save_to_db(conn, brand, year, model, engine):
                    total_saved += 1
                    year_count += 1
                    print(f" {model}: {engine}")
        # Commit per year so an interruption loses at most one year of work.
        conn.commit()
        print(f" -> Guardados: {year_count} nuevos")
    conn.close()
    print("\n" + "=" * 60)
    print(f"RESUMEN TOYOTA")
    print(f" Años procesados: {len(years)}")
    print(f" Total vehículos encontrados: {total_vehicles}")
    print(f" Nuevos guardados: {total_saved}")
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,240 @@
#!/usr/bin/env python3
"""
Scraper de Toyota para Windows
- Procesa de 3 en 3 años
- Espera 60 segundos entre lotes para activar VPN
- Años faltantes: 1975-2003
"""
import requests
from bs4 import BeautifulSoup
import sqlite3
import time
import re
import os
import sys
from urllib.parse import unquote
# Resolve the project base directory from this script's location.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# If we live inside vehicle_scraper/, the project root is one level up.
if os.path.basename(SCRIPT_DIR) == "vehicle_scraper":
    BASE_DIR = os.path.dirname(SCRIPT_DIR)
else:
    BASE_DIR = SCRIPT_DIR
DB_PATH = os.path.join(BASE_DIR, "vehicle_database", "vehicle_database.db")
BASE_URL = "https://www.rockauto.com/en/catalog"
# Toyota years still missing from the database (2003 back to 1975).
MISSING_YEARS = [
    2003, 2002, 2001, 2000, 1999, 1998, 1997, 1996, 1995, 1994,
    1993, 1992, 1991, 1990, 1989, 1988, 1987, 1986, 1985, 1984,
    1983, 1982, 1981, 1980, 1979, 1978, 1977, 1976, 1975
]
session = requests.Session()
# Browser-like headers reduce the chance of being blocked outright.
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml',
    'Accept-Language': 'en-US,en;q=0.9',
})
def clean_name(name):
    """Turn a catalog URL fragment into a clean upper-case name."""
    raw = name.replace('+', ' ')
    return re.sub(r'\s+', ' ', unquote(raw)).strip().upper()
def get_soup(url, retries=3):
    """Download *url* and parse it.

    Returns None on a 403 block (no point retrying until the VPN changes),
    after *retries* failed attempts, or on any other status code.
    """
    for attempt in range(retries):
        try:
            time.sleep(0.5)  # polite delay before every request
            resp = session.get(url, timeout=15)
            if resp.status_code == 200:
                return BeautifulSoup(resp.content, 'html.parser')
            if resp.status_code == 403:
                print(f"\n [!] Bloqueado (403) - Cambia el VPN")
                return None
        except Exception as exc:
            if attempt == retries - 1:
                print(f"\n Error: {exc}")
            else:
                time.sleep(3)  # back off before retrying
    return None
def get_models(brand, year):
    """Sorted model names for *brand* in *year* (empty list on failure)."""
    slug = brand.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{slug},{year}")
    if not soup:
        return []
    href_re = re.compile(rf'/catalog/{re.escape(slug)},{year},([^,/]+)', re.I)
    result = set()
    for tag in soup.find_all('a', href=True):
        m = href_re.search(tag['href'])
        if m:
            label = clean_name(m.group(1))
            # Drop empty, numeric-only, or single-character labels.
            if label and not label.isdigit() and len(label) > 1:
                result.add(label)
    return sorted(result)
def get_engines(brand, year, model):
    """Engine names for brand/year/model, with 'STANDARD' as the fallback."""
    slug = brand.lower().replace(' ', '+')
    model_slug = model.lower().replace(' ', '+')
    soup = get_soup(f"{BASE_URL}/{slug},{year},{model_slug}")
    if not soup:
        return ['STANDARD']
    href_re = re.compile(
        rf'/catalog/{re.escape(slug)},{year},{re.escape(model_slug)},([^,/]+)', re.I)
    # Heuristic filter: displacement, cylinder layout, or fuel type keywords.
    engine_hint = re.compile(r'\d+\.?\d*L|V\d|I\d|H\d|HYBRID|ELECTRIC|DIESEL', re.I)
    result = set()
    for tag in soup.find_all('a', href=True):
        m = href_re.search(tag['href'])
        if m:
            label = clean_name(m.group(1))
            if label and engine_hint.search(label):
                result.add(label)
    return sorted(result) if result else ['STANDARD']
def save_to_db(conn, brand, year, model, engine):
    """Insert one combination into the star schema.

    Returns True when a new model_year_engine row was added, False for
    duplicates or errors. The caller commits.
    """
    cur = conn.cursor()

    def lookup(table, column, value):
        # Table/column names come from fixed literals below, never user input.
        cur.execute(f"INSERT OR IGNORE INTO {table} ({column}) VALUES (?)", (value,))
        cur.execute(f"SELECT id FROM {table} WHERE {column} = ?", (value,))
        return cur.fetchone()[0]

    try:
        brand_id = lookup("brands", "name", brand)
        year_id = lookup("years", "year", year)
        engine_id = lookup("engines", "name", engine)
        # Models are unique per (brand_id, name), so handled separately.
        cur.execute("INSERT OR IGNORE INTO models (brand_id, name) VALUES (?, ?)", (brand_id, model))
        cur.execute("SELECT id FROM models WHERE brand_id = ? AND name = ?", (brand_id, model))
        model_id = cur.fetchone()[0]
        cur.execute(
            "INSERT OR IGNORE INTO model_year_engine (model_id, year_id, engine_id) VALUES (?, ?, ?)",
            (model_id, year_id, engine_id)
        )
        return cur.rowcount > 0
    except Exception as e:
        print(f" DB Error: {e}")
        return False
def get_existing_years(conn, brand):
    """Set of years already recorded for *brand* in model_year_engine."""
    sql = (
        "SELECT DISTINCT y.year "
        "FROM years y "
        "JOIN model_year_engine mye ON y.id = mye.year_id "
        "JOIN models m ON mye.model_id = m.id "
        "JOIN brands b ON m.brand_id = b.id "
        "WHERE b.name = ?"
    )
    cur = conn.cursor()
    cur.execute(sql, (brand,))
    return {year for (year,) in cur.fetchall()}
def process_batch(conn, brand, years_batch, batch_num, total_batches):
    """Scrape one 3-year batch for *brand* and commit the results.

    Returns (saved, found): new rows inserted vs. combinations seen.
    """
    print(f"\n{'='*60}")
    print(f"LOTE {batch_num}/{total_batches}: Años {years_batch}")
    print('='*60)
    batch_saved = 0
    batch_total = 0
    for year in years_batch:
        print(f"\n[Año {year}] Obteniendo modelos... ", end="", flush=True)
        models = get_models(brand, year)
        print(f"{len(models)} modelos encontrados")
        if not models:
            print(f" No se encontraron modelos para {year}")
            continue
        for model in models:
            engines = get_engines(brand, year, model)
            for engine in engines:
                batch_total += 1
                if save_to_db(conn, brand, year, model, engine):
                    batch_saved += 1
                    print(f" {model} - {engine}")
    # Commit once per batch so progress survives a VPN swap or a crash.
    conn.commit()
    print(f"\n>> Lote {batch_num} completado: {batch_saved} nuevos de {batch_total} encontrados")
    return batch_saved, batch_total
def main():
    """Entry point: scrape the missing Toyota years in 3-year batches."""
    brand = "TOYOTA"
    print("="*60)
    print(" SCRAPER TOYOTA - WINDOWS")
    print(" Procesa 3 años, guarda, espera 60s para VPN")
    print("="*60)
    # Make sure the database exists before doing any network work.
    if not os.path.exists(DB_PATH):
        print(f"\n[ERROR] Base de datos no encontrada: {DB_PATH}")
        print("Verifica que la ruta sea correcta.")
        sys.exit(1)
    print(f"\nBase de datos: {DB_PATH}")
    conn = sqlite3.connect(DB_PATH)
    # Which years are already stored?
    existing = get_existing_years(conn, brand)
    print(f"Años existentes de {brand}: {sorted(existing)}")
    # Only the missing years need work.
    years_to_process = [y for y in MISSING_YEARS if y not in existing]
    if not years_to_process:
        print("\n[OK] Todos los años ya están en la base de datos!")
        conn.close()
        return
    print(f"\nAños por procesar: {years_to_process}")
    print(f"Total: {len(years_to_process)} años")
    # Split into lots of 3 years.
    batches = [years_to_process[i:i+3] for i in range(0, len(years_to_process), 3)]
    total_batches = len(batches)
    print(f"Lotes de 3 años: {total_batches} lotes")
    input("\nPresiona ENTER para comenzar...")
    total_saved = 0
    total_found = 0
    for i, batch in enumerate(batches, 1):
        saved, found = process_batch(conn, brand, batch, i, total_batches)
        total_saved += saved
        total_found += found
        # Fixed 60-second pause before the next batch (VPN rotation window).
        if i < total_batches:
            print(f"\n{'*'*60}")
            print(f" PAUSA DE 60 SEGUNDOS - ACTIVA/CAMBIA EL VPN AHORA")
            print(f" Lotes restantes: {total_batches - i}")
            print(f"{'*'*60}")
            for sec in range(60, 0, -1):
                print(f"\r Continuando en {sec} segundos... ", end="", flush=True)
                time.sleep(1)
            print()
    conn.close()
    print("\n" + "="*60)
    print(" RESUMEN FINAL - TOYOTA")
    print("="*60)
    print(f" Años procesados: {len(years_to_process)}")
    print(f" Vehículos encontrados: {total_found}")
    print(f" Nuevos guardados: {total_saved}")
    print("="*60)
# Entry point when run as a script.
if __name__ == "__main__":
    main()