SaaS + Aftermarket Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Add JWT authentication with role-based access, warehouse inventory uploads with flexible column mapping, and migrate 357K AFT- aftermarket parts to their proper table.
Architecture: Monolith Flask + PostgreSQL. Auth via JWT (PyJWT + bcrypt). File uploads via openpyxl/csv. All new tables in same DB. Middleware decorator for route protection.
Tech Stack: Flask, PostgreSQL, PyJWT, bcrypt (already installed), openpyxl (new), python-dotenv (optional)
Task 1: Install Dependencies & Update Config
Files:
- Modify: /home/Autopartes/requirements.txt
- Modify: /home/Autopartes/config.py
Step 1: Install new packages
Run:
pip install PyJWT openpyxl
Step 2: Update requirements.txt
Add to /home/Autopartes/requirements.txt:
PyJWT>=2.8
openpyxl>=3.1
Step 3: Add JWT config to config.py
Add to /home/Autopartes/config.py:
import os
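# Development fallback only: set JWT_SECRET in the environment for any real deployment
JWT_SECRET = os.environ.get("JWT_SECRET", "nexus-saas-secret-change-in-prod-2026")
JWT_ACCESS_EXPIRES = 900        # 15 minutes, in seconds
JWT_REFRESH_EXPIRES = 2592000   # 30 days, in seconds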
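The fallback secret in config.py is convenient for local development, but a deployment that forgets to set JWT_SECRET would sign tokens with a value that is checked into the repository. A minimal sketch of a guard follows. The helper name `load_jwt_secret` and the `APP_ENV` variable are assumptions made for illustration and are not part of this plan.

```python
import os

# Same development fallback as in config.py
DEV_FALLBACK_SECRET = "nexus-saas-secret-change-in-prod-2026"


def load_jwt_secret(env=None):
    """Return the JWT signing secret, refusing the dev fallback in production.

    `env` defaults to os.environ; APP_ENV is a hypothetical variable name.
    """
    env = os.environ if env is None else env
    secret = env.get("JWT_SECRET", DEV_FALLBACK_SECRET)
    is_prod = env.get("APP_ENV", "development") == "production"
    if is_prod and secret == DEV_FALLBACK_SECRET:
        raise RuntimeError("JWT_SECRET must be set in production")
    return secret
```

If adopted, config.py could assign `JWT_SECRET = load_jwt_secret()` so that a misconfigured production process fails at startup rather than issuing forgeable tokens.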
Step 4: Commit
git add requirements.txt config.py
git commit -m "feat: add JWT and openpyxl dependencies"
Task 2: Database Schema Changes
Files:
- Create: /home/Autopartes/scripts/migrate_saas_schema.py
Step 1: Write migration script
Create /home/Autopartes/scripts/migrate_saas_schema.py:
#!/usr/bin/env python3
"""Add SaaS tables: sessions, warehouse_inventory, inventory_uploads, inventory_column_mappings.
Alter users table: add business_name, is_active, created_at, last_login.
Update roles to new names."""
import psycopg2

DB_URL = "postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts"


def run():
    conn = psycopg2.connect(DB_URL)
    conn.autocommit = True
    cur = conn.cursor()

    # Update role names
    print("Updating roles...")
    cur.execute("UPDATE roles SET name_rol = 'ADMIN' WHERE id_rol = 1")
    cur.execute("UPDATE roles SET name_rol = 'OWNER' WHERE id_rol = 2")
    cur.execute("UPDATE roles SET name_rol = 'TALLER' WHERE id_rol = 3")
    cur.execute("UPDATE roles SET name_rol = 'BODEGA' WHERE id_rol = 4")

    # Insert if missing
    cur.execute("""
        INSERT INTO roles (id_rol, name_rol) VALUES (3, 'TALLER')
        ON CONFLICT (id_rol) DO UPDATE SET name_rol = EXCLUDED.name_rol
    """)
    cur.execute("""
        INSERT INTO roles (id_rol, name_rol) VALUES (4, 'BODEGA')
        ON CONFLICT (id_rol) DO UPDATE SET name_rol = EXCLUDED.name_rol
    """)

    # Alter users table
    print("Altering users table...")
    alterations = [
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS business_name VARCHAR(200)",
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS is_active BOOLEAN DEFAULT false",
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS created_at TIMESTAMP DEFAULT now()",
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS last_login TIMESTAMP",
    ]
    for sql in alterations:
        try:
            cur.execute(sql)
        except Exception as e:
            print(f"  Skip: {e}")

    # Make email unique if not already
    cur.execute("""
        DO $$ BEGIN
            IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'users_email_unique') THEN
                CREATE UNIQUE INDEX users_email_unique ON users(email);
            END IF;
        END $$;
    """)

    # Sessions table
    print("Creating sessions table...")
    cur.execute("""
        CREATE TABLE IF NOT EXISTS sessions (
            id_session SERIAL PRIMARY KEY,
            user_id INTEGER NOT NULL REFERENCES users(id_user) ON DELETE CASCADE,
            refresh_token VARCHAR(500) UNIQUE NOT NULL,
            expires_at TIMESTAMP NOT NULL,
            created_at TIMESTAMP DEFAULT now()
        )
    """)
    cur.execute("CREATE INDEX IF NOT EXISTS idx_sessions_token ON sessions(refresh_token)")
    cur.execute("CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id)")

    # Warehouse inventory table
    print("Creating warehouse_inventory table...")
    cur.execute("""
        CREATE TABLE IF NOT EXISTS warehouse_inventory (
            id_inventory BIGSERIAL PRIMARY KEY,
            user_id INTEGER NOT NULL REFERENCES users(id_user) ON DELETE CASCADE,
            part_id INTEGER NOT NULL REFERENCES parts(id_part) ON DELETE CASCADE,
            price NUMERIC(12,2),
            stock_quantity INTEGER DEFAULT 0,
            min_order_quantity INTEGER DEFAULT 1,
            warehouse_location VARCHAR(100) DEFAULT 'Principal',
            updated_at TIMESTAMP DEFAULT now(),
            UNIQUE(user_id, part_id, warehouse_location)
        )
    """)
    cur.execute("CREATE INDEX IF NOT EXISTS idx_wi_part ON warehouse_inventory(part_id)")
    cur.execute("CREATE INDEX IF NOT EXISTS idx_wi_user ON warehouse_inventory(user_id)")

    # Inventory uploads table
    print("Creating inventory_uploads table...")
    cur.execute("""
        CREATE TABLE IF NOT EXISTS inventory_uploads (
            id_upload SERIAL PRIMARY KEY,
            user_id INTEGER NOT NULL REFERENCES users(id_user) ON DELETE CASCADE,
            filename VARCHAR(200),
            status VARCHAR(20) DEFAULT 'pending',
            rows_total INTEGER DEFAULT 0,
            rows_imported INTEGER DEFAULT 0,
            rows_errors INTEGER DEFAULT 0,
            error_log TEXT,
            created_at TIMESTAMP DEFAULT now(),
            completed_at TIMESTAMP
        )
    """)

    # Column mappings table
    print("Creating inventory_column_mappings table...")
    cur.execute("""
        CREATE TABLE IF NOT EXISTS inventory_column_mappings (
            id_mapping SERIAL PRIMARY KEY,
            user_id INTEGER NOT NULL REFERENCES users(id_user) ON DELETE CASCADE UNIQUE,
            mapping JSONB NOT NULL DEFAULT '{}'::jsonb
        )
    """)

    # Set existing admin user as active
    cur.execute("UPDATE users SET is_active = true WHERE id_rol = 1")

    cur.close()
    conn.close()
    print("Migration complete!")


if __name__ == "__main__":
    run()
Step 2: Run migration
Run:
python3 /home/Autopartes/scripts/migrate_saas_schema.py
Expected: All tables created, roles updated, existing admin set active.
Step 3: Verify
Run:
PGPASSWORD=nexus_autoparts_2026 psql -U nexus -d nexus_autoparts -h localhost \
-c "SELECT * FROM roles;" \
-c "\d sessions" \
-c "\d warehouse_inventory" \
-c "\d inventory_uploads" \
-c "\d inventory_column_mappings"
Step 4: Commit
git add scripts/migrate_saas_schema.py
git commit -m "feat: add SaaS schema — sessions, inventory, mappings tables"
Task 3: Auth Middleware & Helpers
Files:
- Create: /home/Autopartes/dashboard/auth.py
Step 1: Create auth module
Create /home/Autopartes/dashboard/auth.py:
"""JWT authentication helpers for Nexus Autoparts SaaS."""
import jwt
import bcrypt
import psycopg2
import secrets
from datetime import datetime, timedelta, timezone
from functools import wraps
from flask import request, jsonify, g
import sys
sys.path.insert(0, '/home/Autopartes')
from config import DB_URL, JWT_SECRET, JWT_ACCESS_EXPIRES, JWT_REFRESH_EXPIRES


def get_db():
    """Get a psycopg2 connection."""
    return psycopg2.connect(DB_URL)


def hash_password(password):
    return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')


def check_password(password, hashed):
    return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))


def create_access_token(user_id, role, business_name):
    payload = {
        'user_id': user_id,
        'role': role,
        'business_name': business_name or '',
        'exp': datetime.now(timezone.utc) + timedelta(seconds=JWT_ACCESS_EXPIRES),
        'type': 'access'
    }
    return jwt.encode(payload, JWT_SECRET, algorithm='HS256')


def create_refresh_token(user_id):
    """Create an opaque refresh token and persist it in the sessions table."""
    token = secrets.token_urlsafe(64)
    expires = datetime.now(timezone.utc) + timedelta(seconds=JWT_REFRESH_EXPIRES)
    conn = get_db()
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO sessions (user_id, refresh_token, expires_at) VALUES (%s, %s, %s)",
        (user_id, token, expires)
    )
    conn.commit()
    cur.close()
    conn.close()
    return token


def decode_token(token):
    """Return the decoded payload, or None if the token is invalid or expired."""
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # ExpiredSignatureError is a subclass of InvalidTokenError
        return None


def require_auth(*allowed_roles):
    """Decorator: require JWT with optional role check.

    Usage:
        @require_auth()                   # any authenticated user
        @require_auth('ADMIN', 'OWNER')   # only these roles
        @require_auth('BODEGA')           # only bodegas
    """
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            auth_header = request.headers.get('Authorization', '')
            if not auth_header.startswith('Bearer '):
                return jsonify({'error': 'Token required'}), 401
            payload = decode_token(auth_header[7:])
            if not payload or payload.get('type') != 'access':
                return jsonify({'error': 'Invalid or expired token'}), 401
            # .get() avoids a KeyError (HTTP 500) on a token without a role claim
            if allowed_roles and payload.get('role') not in allowed_roles:
                return jsonify({'error': 'Insufficient permissions'}), 403
            g.user = payload
            return f(*args, **kwargs)
        return decorated
    return decorator
Step 2: Commit
git add dashboard/auth.py
git commit -m "feat: add JWT auth module with middleware decorator"
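The decision logic inside require_auth can be exercised without Flask or a database. The sketch below isolates it in a plain function; `check_access` is a hypothetical helper introduced only for illustration, and it returns the HTTP status the decorator would produce for a given decoded payload.

```python
def check_access(payload, allowed_roles=()):
    """Return the HTTP status require_auth would produce for a decoded payload.

    `payload` is the dict returned by decode_token, or None if decoding failed.
    """
    if not payload or payload.get("type") != "access":
        return 401  # missing, invalid, expired, or non-access token
    if allowed_roles and payload.get("role") not in allowed_roles:
        return 403  # authenticated, but the role is not permitted
    return 200
```

An empty `allowed_roles` means any authenticated user passes, which matches the `@require_auth()` form in the usage notes.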
Task 4: Auth API Endpoints
Files:
- Modify: /home/Autopartes/dashboard/server.py (add auth routes)
Step 1: Add imports at top of server.py
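After existing imports (around line 12), add the lines below. The endpoints in this plan also use flask's g and the standard-library json module, so the first two imports are needed only if server.py does not already have them:
from flask import g
import json
from auth import (
    hash_password, check_password,
    create_access_token, create_refresh_token,
    decode_token, require_auth
)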
Step 2: Add auth endpoints
Add before the if __name__ == '__main__' block at the end of server.py:
# ─── Auth Endpoints ─────────────────────────────────────────

@app.route('/api/auth/register', methods=['POST'])
def auth_register():
    # silent=True returns None instead of raising on a missing or non-JSON body
    data = request.get_json(silent=True) or {}
    required = ['name', 'email', 'password', 'role']
    for f in required:
        if not data.get(f):
            return jsonify({'error': f'Missing field: {f}'}), 400
    role = data['role'].upper()
    if role not in ('TALLER', 'BODEGA'):
        return jsonify({'error': 'Role must be TALLER or BODEGA'}), 400
    role_map = {'TALLER': 3, 'BODEGA': 4}
    hashed = hash_password(data['password'])
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
            INSERT INTO users (name_user, email, pass, id_rol, business_name, phone, address, is_active)
            VALUES (%s, %s, %s, %s, %s, %s, %s, false)
            RETURNING id_user
        """, (data['name'], data['email'], hashed, role_map[role],
              data.get('business_name', ''), data.get('phone', ''), data.get('address', '')))
        user_id = cur.fetchone()[0]
        conn.commit()
        return jsonify({'message': 'Account created. Pending admin approval.', 'user_id': user_id}), 201
    except Exception as e:
        conn.rollback()
        if 'users_email_unique' in str(e) or 'duplicate' in str(e).lower():
            return jsonify({'error': 'Email already registered'}), 409
        return jsonify({'error': str(e)}), 500
    finally:
        cur.close()
        conn.close()


@app.route('/api/auth/login', methods=['POST'])
def auth_login():
    data = request.get_json(silent=True) or {}
    email = data.get('email', '')
    password = data.get('password', '')
    if not email or not password:
        return jsonify({'error': 'Email and password required'}), 400
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute("""
        SELECT u.id_user, u.name_user, u.pass, u.is_active, u.business_name, r.name_rol
        FROM users u JOIN roles r ON u.id_rol = r.id_rol
        WHERE u.email = %s
    """, (email,))
    row = cur.fetchone()
    if not row:
        cur.close(); conn.close()
        return jsonify({'error': 'Invalid credentials'}), 401
    user_id, name, hashed, is_active, business, role = row
    if not hashed or not check_password(password, hashed):
        cur.close(); conn.close()
        return jsonify({'error': 'Invalid credentials'}), 401
    if not is_active:
        cur.close(); conn.close()
        return jsonify({'error': 'Account pending approval'}), 403
    # Update last_login
    cur.execute("UPDATE users SET last_login = now() WHERE id_user = %s", (user_id,))
    conn.commit()
    cur.close()
    conn.close()
    access = create_access_token(user_id, role, business)
    refresh = create_refresh_token(user_id)
    return jsonify({
        'access_token': access,
        'refresh_token': refresh,
        'user': {'id': user_id, 'name': name, 'role': role, 'business_name': business}
    })


@app.route('/api/auth/refresh', methods=['POST'])
def auth_refresh():
    data = request.get_json(silent=True) or {}
    token = data.get('refresh_token', '')
    if not token:
        return jsonify({'error': 'Refresh token required'}), 400
    conn = get_db_connection()
    cur = conn.cursor()
    # The expiry comparison runs inside PostgreSQL: expires_at is a naive TIMESTAMP,
    # so treating it as UTC in Python is only correct when the session time zone is UTC
    cur.execute("""
        SELECT s.user_id, s.expires_at < now() AS expired,
               u.is_active, u.business_name, r.name_rol
        FROM sessions s
        JOIN users u ON s.user_id = u.id_user
        JOIN roles r ON u.id_rol = r.id_rol
        WHERE s.refresh_token = %s
    """, (token,))
    row = cur.fetchone()
    if not row:
        cur.close(); conn.close()
        return jsonify({'error': 'Invalid refresh token'}), 401
    user_id, expired, is_active, business, role = row
    if expired:
        cur.execute("DELETE FROM sessions WHERE refresh_token = %s", (token,))
        conn.commit(); cur.close(); conn.close()
        return jsonify({'error': 'Refresh token expired'}), 401
    cur.close()
    conn.close()
    # A deactivated account must not keep minting access tokens
    if not is_active:
        return jsonify({'error': 'Account deactivated'}), 403
    access = create_access_token(user_id, role, business)
    return jsonify({'access_token': access})


@app.route('/api/auth/me', methods=['GET'])
@require_auth()
def auth_me():
    return jsonify({'user': g.user})
Step 3: Commit
git add dashboard/server.py
git commit -m "feat: add auth endpoints — register, login, refresh, me"
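The validation rules in auth_register (four required fields, and self-registration limited to the TALLER and BODEGA roles) can be checked in isolation. The following is a minimal sketch under the assumption that the rules are extracted into a standalone function; `validate_registration` is a hypothetical name and does not exist in server.py.

```python
ROLE_IDS = {"TALLER": 3, "BODEGA": 4}  # same ids as role_map in auth_register
REQUIRED_FIELDS = ("name", "email", "password", "role")


def validate_registration(data):
    """Return (role_id, None) on success or (None, error_message) on failure."""
    for field in REQUIRED_FIELDS:
        if not data.get(field):
            return None, f"Missing field: {field}"
    role = str(data["role"]).upper()
    if role not in ROLE_IDS:
        # ADMIN and OWNER can never be obtained through self-registration
        return None, "Role must be TALLER or BODEGA"
    return ROLE_IDS[role], None
```

Keeping the role whitelist separate from the database insert makes it straightforward to confirm that a request with `"role": "admin"` is rejected before any SQL runs.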
Task 5: Admin User Management Endpoints
Files:
- Modify: /home/Autopartes/dashboard/server.py
Step 1: Add admin user endpoints
Add after the auth endpoints in server.py:
# ─── Admin User Management ──────────────────────────────────

@app.route('/api/admin/users', methods=['GET'])
@require_auth('ADMIN', 'OWNER')
def admin_list_users():
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute("""
        SELECT u.id_user, u.name_user, u.email, u.business_name,
               u.phone, u.is_active, u.created_at, u.last_login, r.name_rol
        FROM users u JOIN roles r ON u.id_rol = r.id_rol
        ORDER BY u.created_at DESC
    """)
    users = []
    for row in cur.fetchall():
        users.append({
            'id': row[0], 'name': row[1], 'email': row[2],
            'business_name': row[3], 'phone': row[4],
            'is_active': row[5], 'created_at': str(row[6]),
            'last_login': str(row[7]) if row[7] else None, 'role': row[8]
        })
    cur.close(); conn.close()
    return jsonify({'data': users})


@app.route('/api/admin/users/<int:user_id>/activate', methods=['PUT'])
@require_auth('ADMIN', 'OWNER')
def admin_toggle_user(user_id):
    data = request.get_json(silent=True) or {}
    active = data.get('is_active', True)
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute("UPDATE users SET is_active = %s WHERE id_user = %s", (active, user_id))
    updated = cur.rowcount
    conn.commit()
    cur.close(); conn.close()
    # Report a missing user instead of claiming success
    if not updated:
        return jsonify({'error': 'User not found'}), 404
    return jsonify({'message': f'User {"activated" if active else "deactivated"}'})
Step 2: Commit
git add dashboard/server.py
git commit -m "feat: add admin user management endpoints"
Task 6: Inventory Upload & Mapping Endpoints
Files:
- Modify: /home/Autopartes/dashboard/server.py
Step 1: Add inventory endpoints
Add after admin user endpoints in server.py:
# ─── Inventory Endpoints (BODEGA) ───────────────────────────
import csv
import io
import openpyxl
from datetime import datetime, timezone


@app.route('/api/inventory/mapping', methods=['GET'])
@require_auth('BODEGA', 'ADMIN')
def get_inventory_mapping():
    user_id = g.user['user_id']
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute("SELECT mapping FROM inventory_column_mappings WHERE user_id = %s", (user_id,))
    row = cur.fetchone()
    cur.close(); conn.close()
    return jsonify({'mapping': row[0] if row else {}})


@app.route('/api/inventory/mapping', methods=['PUT'])
@require_auth('BODEGA', 'ADMIN')
def set_inventory_mapping():
    user_id = g.user['user_id']
    mapping = (request.json or {}).get('mapping', {})
    required_keys = ['part_number', 'price', 'stock']
    for k in required_keys:
        if k not in mapping:
            return jsonify({'error': f'Mapping must include: {", ".join(required_keys)}'}), 400
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute("""
        INSERT INTO inventory_column_mappings (user_id, mapping)
        VALUES (%s, %s::jsonb)
        ON CONFLICT (user_id) DO UPDATE SET mapping = EXCLUDED.mapping
    """, (user_id, json.dumps(mapping)))
    conn.commit()
    cur.close(); conn.close()
    return jsonify({'message': 'Mapping saved'})
@app.route('/api/inventory/upload', methods=['POST'])
@require_auth('BODEGA', 'ADMIN')
def upload_inventory():
    user_id = g.user['user_id']
    if 'file' not in request.files:
        return jsonify({'error': 'No file uploaded'}), 400
    file = request.files['file']
    filename = file.filename or 'unknown'
    # openpyxl cannot read the legacy binary .xls format, so reject it up front
    if filename.lower().endswith('.xls'):
        return jsonify({'error': 'Legacy .xls files are not supported; save as .xlsx or CSV'}), 400

    # Get mapping
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute("SELECT mapping FROM inventory_column_mappings WHERE user_id = %s", (user_id,))
    row = cur.fetchone()
    if not row:
        cur.close(); conn.close()
        return jsonify({'error': 'Configure column mapping first'}), 400
    mapping = row[0]

    # Create upload record
    cur.execute("""
        INSERT INTO inventory_uploads (user_id, filename, status)
        VALUES (%s, %s, 'processing') RETURNING id_upload
    """, (user_id, filename))
    upload_id = cur.fetchone()[0]
    conn.commit()

    # Parse file
    try:
        rows = []
        if filename.lower().endswith('.xlsx'):
            wb = openpyxl.load_workbook(io.BytesIO(file.read()), read_only=True)
            ws = wb.active
            headers = [str(c.value or '').strip() for c in next(ws.iter_rows(min_row=1, max_row=1))]
            for row_cells in ws.iter_rows(min_row=2, values_only=True):
                rows.append({headers[i]: row_cells[i] for i in range(len(headers)) if i < len(row_cells)})
            wb.close()
        else:  # CSV
            content = file.read().decode('utf-8-sig')
            reader = csv.DictReader(io.StringIO(content))
            rows = list(reader)

        # Apply mapping and upsert
        imported = 0
        errors = []
        part_col = mapping['part_number']
        price_col = mapping['price']
        stock_col = mapping['stock']
        location_col = mapping.get('location')
        for i, r in enumerate(rows):
            try:
                part_num = str(r.get(part_col, '')).strip()
                if not part_num:
                    continue
                price = float(str(r.get(price_col, 0)).replace(',', '').replace('$', ''))
                stock = int(float(str(r.get(stock_col, 0)).replace(',', '')))
                location = str(r.get(location_col, 'Principal')).strip() if location_col else 'Principal'

                # Find part by oem_part_number
                cur.execute("SELECT id_part FROM parts WHERE oem_part_number = %s", (part_num,))
                part_row = cur.fetchone()
                if not part_row:
                    # Try aftermarket_parts
                    cur.execute("SELECT oem_part_id FROM aftermarket_parts WHERE part_number = %s LIMIT 1", (part_num,))
                    aft_row = cur.fetchone()
                    if aft_row:
                        part_id = aft_row[0]
                    else:
                        errors.append(f"Row {i+2}: Part '{part_num}' not found")
                        continue
                else:
                    part_id = part_row[0]

                cur.execute("""
                    INSERT INTO warehouse_inventory (user_id, part_id, price, stock_quantity, warehouse_location, updated_at)
                    VALUES (%s, %s, %s, %s, %s, now())
                    ON CONFLICT (user_id, part_id, warehouse_location)
                    DO UPDATE SET price = EXCLUDED.price, stock_quantity = EXCLUDED.stock_quantity, updated_at = now()
                """, (user_id, part_id, price, stock, location))
                imported += 1
            except Exception as e:
                # Row numbers are 1-based and account for the header row
                errors.append(f"Row {i+2}: {str(e)[:100]}")
        conn.commit()

        # Update upload record
        cur.execute("""
            UPDATE inventory_uploads
            SET status = 'completed', rows_total = %s, rows_imported = %s,
                rows_errors = %s, error_log = %s, completed_at = now()
            WHERE id_upload = %s
        """, (len(rows), imported, len(errors), '\n'.join(errors[:100]), upload_id))
        conn.commit()
        cur.close(); conn.close()
        return jsonify({
            'message': f'Imported {imported}/{len(rows)} rows',
            'upload_id': upload_id,
            'imported': imported,
            'errors': len(errors),
            'error_samples': errors[:10]
        })
    except Exception as e:
        # Clear any aborted transaction first so the status update can run
        conn.rollback()
        cur.execute("""
            UPDATE inventory_uploads SET status = 'failed', error_log = %s, completed_at = now()
            WHERE id_upload = %s
        """, (str(e), upload_id))
        conn.commit(); cur.close(); conn.close()
        return jsonify({'error': f'File processing failed: {str(e)}'}), 500
@app.route('/api/inventory/uploads', methods=['GET'])
@require_auth('BODEGA', 'ADMIN')
def list_inventory_uploads():
    user_id = g.user['user_id']
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute("""
        SELECT id_upload, filename, status, rows_total, rows_imported, rows_errors, created_at, completed_at
        FROM inventory_uploads WHERE user_id = %s ORDER BY created_at DESC LIMIT 50
    """, (user_id,))
    uploads = []
    for r in cur.fetchall():
        uploads.append({
            'id': r[0], 'filename': r[1], 'status': r[2],
            'rows_total': r[3], 'rows_imported': r[4], 'rows_errors': r[5],
            'created_at': str(r[6]), 'completed_at': str(r[7]) if r[7] else None
        })
    cur.close(); conn.close()
    return jsonify({'data': uploads})


@app.route('/api/inventory/items', methods=['GET'])
@require_auth('BODEGA', 'ADMIN')
def list_inventory_items():
    user_id = g.user['user_id']
    # type=int falls back to the default on non-numeric input; max() keeps OFFSET non-negative
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = min(max(request.args.get('per_page', 50, type=int), 1), 200)
    search = request.args.get('q', '')
    conn = get_db_connection()
    cur = conn.cursor()
    where = "WHERE wi.user_id = %s"
    params = [user_id]
    if search:
        where += " AND (p.oem_part_number ILIKE %s OR p.name_part ILIKE %s)"
        params.extend([f'%{search}%', f'%{search}%'])
    cur.execute(f"SELECT count(*) FROM warehouse_inventory wi JOIN parts p ON wi.part_id = p.id_part {where}", params)
    total = cur.fetchone()[0]
    cur.execute(f"""
        SELECT wi.id_inventory, p.oem_part_number, p.name_part, wi.price,
               wi.stock_quantity, wi.warehouse_location, wi.updated_at
        FROM warehouse_inventory wi
        JOIN parts p ON wi.part_id = p.id_part
        {where}
        ORDER BY wi.updated_at DESC
        LIMIT %s OFFSET %s
    """, params + [per_page, (page - 1) * per_page])
    items = []
    for r in cur.fetchall():
        items.append({
            'id': r[0], 'part_number': r[1], 'part_name': r[2],
            # "is not None" keeps a legitimate price of 0 from being reported as null
            'price': float(r[3]) if r[3] is not None else None,
            'stock': r[4], 'location': r[5], 'updated_at': str(r[6])
        })
    cur.close(); conn.close()
    total_pages = (total + per_page - 1) // per_page
    return jsonify({
        'data': items,
        'pagination': {'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages}
    })


@app.route('/api/inventory/items', methods=['DELETE'])
@require_auth('BODEGA', 'ADMIN')
def clear_inventory():
    user_id = g.user['user_id']
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute("DELETE FROM warehouse_inventory WHERE user_id = %s", (user_id,))
    deleted = cur.rowcount
    conn.commit()
    cur.close(); conn.close()
    return jsonify({'message': f'Deleted {deleted} inventory items'})
Step 2: Commit
git add dashboard/server.py
git commit -m "feat: add inventory upload, mapping, and listing endpoints"
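The column-mapping step of the upload can be tried on a small CSV without a database. The sketch below applies a mapping of the shape stored in inventory_column_mappings and cleans price and stock values in roughly the way the endpoint does. `parse_number` and `apply_mapping` are hypothetical helpers, the sample column names are invented, and treating blank cells as 0 is an assumption of this sketch rather than the endpoint's behavior.

```python
import csv
import io


def parse_number(raw, default=0.0):
    """Strip ',' and '$' before converting; blank cells fall back to `default`."""
    if raw is None or str(raw).strip() == "":
        return default
    return float(str(raw).replace(",", "").replace("$", ""))


def apply_mapping(csv_text, mapping):
    """Yield (part_number, price, stock, location) tuples from CSV text."""
    reader = csv.DictReader(io.StringIO(csv_text))
    location_col = mapping.get("location")
    for row in reader:
        part_number = str(row.get(mapping["part_number"], "")).strip()
        if not part_number:
            continue  # rows without a part number are skipped, as in the endpoint
        price = parse_number(row.get(mapping["price"]))
        stock = int(parse_number(row.get(mapping["stock"])))
        if location_col:
            location = (row.get(location_col) or "Principal").strip()
        else:
            location = "Principal"
        yield part_number, price, stock, location


sample = 'SKU,Precio,Existencia\nAC191,"$1,250.50",12\n,10,1\nBX-7,99,3\n'
mapping = {"part_number": "SKU", "price": "Precio", "stock": "Existencia"}
for item in apply_mapping(sample, mapping):
    print(item)
```

Because the mapping only names columns, each bodega can keep its own spreadsheet layout and the same parsing code applies to all of them.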
Task 7: Part Availability & Aftermarket Endpoints
Files:
- Modify: /home/Autopartes/dashboard/server.py
Step 1: Add availability and aftermarket endpoints
Add after inventory endpoints in server.py:
# ─── Part Availability & Aftermarket ────────────────────────

@app.route('/api/parts/<int:part_id>/availability', methods=['GET'])
@require_auth('TALLER', 'ADMIN', 'OWNER')
def part_availability(part_id):
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute("""
        SELECT u.business_name, wi.price, wi.stock_quantity,
               wi.warehouse_location, wi.updated_at
        FROM warehouse_inventory wi
        JOIN users u ON wi.user_id = u.id_user
        WHERE wi.part_id = %s AND wi.stock_quantity > 0 AND u.is_active = true
        ORDER BY wi.price ASC
    """, (part_id,))
    results = []
    for r in cur.fetchall():
        results.append({
            # "is not None" keeps a price of 0 from being reported as null
            'bodega': r[0], 'price': float(r[1]) if r[1] is not None else None,
            'stock': r[2], 'location': r[3], 'updated_at': str(r[4])
        })
    cur.close(); conn.close()
    return jsonify({'data': results})


@app.route('/api/parts/<int:part_id>/aftermarket', methods=['GET'])
def part_aftermarket(part_id):
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute("""
        SELECT ap.id_aftermarket_parts, ap.part_number, ap.name_aftermarket_parts,
               m.name_manufacture, qt.name_quality, ap.price_usd
        FROM aftermarket_parts ap
        LEFT JOIN manufacturers m ON ap.manufacturer_id = m.id_manufacture
        LEFT JOIN quality_tier qt ON ap.id_quality_tier = qt.id_quality_tier
        WHERE ap.oem_part_id = %s
        ORDER BY m.name_manufacture
    """, (part_id,))
    results = []
    for r in cur.fetchall():
        results.append({
            'id': r[0], 'part_number': r[1], 'name': r[2],
            'manufacturer': r[3], 'quality_tier': r[4],
            'price': float(r[5]) if r[5] is not None else None
        })

    # Also include cross-references as aftermarket alternatives
    cur.execute("""
        SELECT pcr.cross_reference_number, pcr.source_ref, pcr.notes
        FROM part_cross_references pcr
        WHERE pcr.part_id = %s
        ORDER BY pcr.source_ref
    """, (part_id,))
    for r in cur.fetchall():
        results.append({
            'id': None, 'part_number': r[0], 'name': None,
            'manufacturer': r[1], 'quality_tier': None,
            'price': None, 'source': 'cross_reference'
        })
    cur.close(); conn.close()
    return jsonify({'data': results})
Step 2: Commit
git add dashboard/server.py
git commit -m "feat: add part availability and aftermarket endpoints"
Task 8: Aftermarket Parts Migration Script
Files:
- Create: /home/Autopartes/scripts/migrate_aftermarket.py
Step 1: Write migration script
Create /home/Autopartes/scripts/migrate_aftermarket.py:
#!/usr/bin/env python3
"""Migrate AFT- parts from `parts` table into `aftermarket_parts`.

1. Parse AFT-{partNo}-{manufacturer} format
2. Find or create manufacturer
3. Find OEM part via cross-references or vehicle overlap
4. Insert into aftermarket_parts
5. Re-link vehicle_parts to OEM part
6. Delete AFT- part from parts
"""
import psycopg2
from psycopg2.extras import execute_values

DB_URL = "postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts"
BATCH_SIZE = 5000


def parse_aft_key(oem_part_number, manufacturer_set):
    """Parse 'AFT-AC191-PARTQUIP' → ('AC191', 'PARTQUIP').

    The manufacturer is the shortest dash-separated suffix after AFT- that
    exists in manufacturer_set. If no suffix matches, the last segment is
    taken as the manufacturer.
    """
    if not oem_part_number.startswith('AFT-'):
        return None, None
    rest = oem_part_number[4:]  # Remove 'AFT-'

    # Walk from the right: try the last segment first, then progressively
    # longer suffixes, so manufacturers whose names contain dashes still match
    segments = rest.split('-')
    for i in range(len(segments) - 1, 0, -1):
        candidate_mfr = '-'.join(segments[i:])
        candidate_part = '-'.join(segments[:i])
        if candidate_mfr in manufacturer_set:
            return candidate_part, candidate_mfr

    # Fallback: last segment is manufacturer
    parts = rest.rsplit('-', 1)
    if len(parts) == 2:
        return parts[0], parts[1]
    return rest, 'UNKNOWN'
def run():
    conn = psycopg2.connect(DB_URL)
    cur = conn.cursor()

    # Load manufacturer names
    print("Loading manufacturers...", flush=True)
    cur.execute("SELECT id_manufacture, name_manufacture FROM manufacturers")
    mfr_map = {r[1]: r[0] for r in cur.fetchall()}
    mfr_set = set(mfr_map.keys())
    print(f"  {len(mfr_set):,} manufacturers loaded", flush=True)

    # Load all AFT- parts
    print("Loading AFT- parts...", flush=True)
    cur.execute("""
        SELECT id_part, oem_part_number, name_part, description, group_id
        FROM parts WHERE oem_part_number LIKE 'AFT-%%'
    """)
    aft_parts = cur.fetchall()
    print(f"  {len(aft_parts):,} AFT- parts found", flush=True)

    stats = {'migrated': 0, 'no_oem': 0, 'mfr_created': 0, 'vp_relinked': 0, 'deleted': 0}

    for batch_start in range(0, len(aft_parts), BATCH_SIZE):
        batch = aft_parts[batch_start:batch_start + BATCH_SIZE]
        aft_inserts = []
        parts_to_delete = []
        vp_updates = []  # (old_part_id, new_part_id)

        for id_part, oem_pn, name_part, desc, group_id in batch:
            part_number, mfr_name = parse_aft_key(oem_pn, mfr_set)
            if not part_number:
                continue

            # Ensure manufacturer exists
            if mfr_name not in mfr_map:
                cur.execute("""
                    INSERT INTO manufacturers (name_manufacture)
                    VALUES (%s)
                    ON CONFLICT (name_manufacture) DO NOTHING
                    RETURNING id_manufacture
                """, (mfr_name,))
                row = cur.fetchone()
                if row:
                    mfr_map[mfr_name] = row[0]
                    mfr_set.add(mfr_name)
                    stats['mfr_created'] += 1
                else:
                    cur.execute("SELECT id_manufacture FROM manufacturers WHERE name_manufacture = %s", (mfr_name,))
                    mfr_map[mfr_name] = cur.fetchone()[0]
            mfr_id = mfr_map[mfr_name]

            # Find OEM part via cross-references
            cur.execute("""
                SELECT part_id FROM part_cross_references
                WHERE cross_reference_number = %s AND source_ref = %s
                LIMIT 1
            """, (part_number, mfr_name))
            oem_row = cur.fetchone()
            oem_part_id = None
            if oem_row:
                oem_part_id = oem_row[0]
            else:
                # Try finding OEM part linked to same vehicles
                cur.execute("""
                    SELECT DISTINCT vp2.part_id
                    FROM vehicle_parts vp1
                    JOIN vehicle_parts vp2 ON vp1.model_year_engine_id = vp2.model_year_engine_id
                    JOIN parts p ON vp2.part_id = p.id_part
                    WHERE vp1.part_id = %s
                      AND p.oem_part_number NOT LIKE 'AFT-%%'
                      AND p.group_id = %s
                    LIMIT 1
                """, (id_part, group_id))
                vp_row = cur.fetchone()
                if vp_row:
                    oem_part_id = vp_row[0]

            if not oem_part_id:
                stats['no_oem'] += 1
                continue

            # Queue aftermarket insert
            aft_inserts.append((oem_part_id, mfr_id, part_number, name_part))
            vp_updates.append((id_part, oem_part_id))
            parts_to_delete.append(id_part)

        # Batch insert aftermarket_parts
        if aft_inserts:
            execute_values(cur, """
                INSERT INTO aftermarket_parts (oem_part_id, manufacturer_id, part_number, name_aftermarket_parts)
                VALUES %s ON CONFLICT DO NOTHING
            """, aft_inserts, page_size=1000)

        # Re-link vehicle_parts
        for old_id, new_id in vp_updates:
            cur.execute("""
                UPDATE vehicle_parts SET part_id = %s
                WHERE part_id = %s
            """, (new_id, old_id))
            stats['vp_relinked'] += cur.rowcount

        # Delete migrated AFT- parts
        if parts_to_delete:
            # First delete cross-references pointing to these parts
            cur.execute(
                "DELETE FROM part_cross_references WHERE part_id = ANY(%s)",
                (parts_to_delete,)
            )
            cur.execute(
                "DELETE FROM parts WHERE id_part = ANY(%s)",
                (parts_to_delete,)
            )
            stats['deleted'] += len(parts_to_delete)

        conn.commit()
        stats['migrated'] += len(aft_inserts)
        print(f"  Batch {batch_start//BATCH_SIZE + 1}: "
              f"migrated={stats['migrated']:,} no_oem={stats['no_oem']:,} "
              f"deleted={stats['deleted']:,} vp_relinked={stats['vp_relinked']:,}", flush=True)

    cur.close()
    conn.close()

    print(f"\n{'='*50}", flush=True)
    print(f"AFTERMARKET MIGRATION COMPLETE", flush=True)
    print(f"  Migrated to aftermarket_parts: {stats['migrated']:,}", flush=True)
    print(f"  No OEM match (skipped): {stats['no_oem']:,}", flush=True)
    print(f"  Manufacturers created: {stats['mfr_created']:,}", flush=True)
    print(f"  Vehicle_parts re-linked: {stats['vp_relinked']:,}", flush=True)
    print(f"  AFT- parts deleted: {stats['deleted']:,}", flush=True)
    print(f"{'='*50}", flush=True)


if __name__ == "__main__":
    run()
Step 2: Commit
git add scripts/migrate_aftermarket.py
git commit -m "feat: add aftermarket migration script — AFT- parts to aftermarket_parts"
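Before running the migration against the full table, the key-splitting rule can be checked on a handful of sample keys. The sketch below is a condensed, standalone version of the suffix matching in parse_aft_key; `split_aft_key` is a hypothetical name, and the manufacturer set is illustrative rather than taken from the real manufacturers table.

```python
def split_aft_key(key, manufacturers):
    """Split 'AFT-<part>-<manufacturer>' into (part, manufacturer).

    Tries the shortest dash-separated suffix first, then longer ones,
    so manufacturer names that contain dashes can still be recognized.
    """
    if not key.startswith("AFT-"):
        return None, None
    segments = key[4:].split("-")
    for i in range(len(segments) - 1, 0, -1):
        mfr = "-".join(segments[i:])
        if mfr in manufacturers:
            return "-".join(segments[:i]), mfr
    if len(segments) >= 2:
        # Fallback: treat the last segment as the manufacturer
        return "-".join(segments[:-1]), segments[-1]
    return segments[0], "UNKNOWN"


known = {"PARTQUIP", "MANN-FILTER"}  # illustrative manufacturer names
for sample_key in ("AFT-AC191-PARTQUIP", "AFT-W-712-MANN-FILTER", "AFT-12345"):
    print(sample_key, "->", split_aft_key(sample_key, known))
```

Part numbers that themselves contain dashes are the main risk: when no known manufacturer matches, the fallback assigns the final segment as the manufacturer, so the "Manufacturers created" count in the migration summary is worth reviewing for unexpected names.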
Task 9: Fix Import Pipeline (Stop Creating AFT- Parts)
Files:
- Modify: /home/Autopartes/scripts/import_live.py (find AFT- creation logic)
- Modify: /home/Autopartes/scripts/import_phase1.py (find AFT- creation logic)
- Modify: /home/Autopartes/scripts/link_vehicle_parts.py (update AFT- lookup)
Step 1: Update import_live.py
Change the AFT- part creation to insert into aftermarket_parts instead of parts. Where the script creates AFT-{article_no}-{supplier} in the parts table, replace with:
- Skip creating the AFT- part in parts
- Instead, find the OEM part from articleOemNo
- Insert into aftermarket_parts with clean part_number and manufacturer
Step 2: Update import_phase1.py
Same change — line 128 area where AFT- keys are created. Replace with aftermarket_parts insert.
Step 3: Update link_vehicle_parts.py
Line 218: Remove the AFT- fallback lookup in part_cache. Vehicle_parts should only link to real OEM parts.
# REMOVE these lines (218-220):
# aft_key = f"AFT-{article_no}-{supplier}"
# pid = part_cache.get(aft_key)
# if pid:
# part_ids.add(pid)
Step 4: Commit
git add scripts/import_live.py scripts/import_phase1.py scripts/link_vehicle_parts.py
git commit -m "fix: stop creating AFT- parts in pipeline, use aftermarket_parts instead"
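The replacement logic for Step 1 and Step 2 can be sketched as a pure function that resolves one article to an aftermarket_parts row. Everything here is an assumption made for illustration: the function name `build_aftermarket_row`, its parameters, and the lookup dictionaries do not come from import_live.py or import_phase1.py, whose actual structure is not shown in this plan.

```python
def build_aftermarket_row(article_no, supplier, oem_numbers, oem_part_ids, mfr_ids):
    """Return (oem_part_id, manufacturer_id, part_number), or None if unresolvable.

    oem_numbers:  OEM numbers listed for the article (e.g. from articleOemNo)
    oem_part_ids: dict mapping oem_part_number -> parts.id_part
    mfr_ids:      dict mapping manufacturer name -> manufacturers.id_manufacture
    """
    mfr_id = mfr_ids.get(supplier)
    if mfr_id is None:
        return None  # unknown manufacturer: caller decides whether to create it
    for oem_no in oem_numbers:
        part_id = oem_part_ids.get(oem_no)
        if part_id is not None:
            # First OEM number that exists in parts wins; no AFT- key is built
            return part_id, mfr_id, article_no
    return None
```

A row produced this way carries the clean article number as part_number, so nothing prefixed with AFT- reaches the parts table and vehicle_parts continues to reference OEM parts only.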
Task 10: Login Page
Files:
- Create: /home/Autopartes/dashboard/login.html
- Create: /home/Autopartes/dashboard/login.js
- Create: /home/Autopartes/dashboard/login.css
Step 1: Create login page
Create a login/register page with:
- Email + password login form
- Registration form (name, email, password, business_name, phone, role selector: TALLER/BODEGA)
- Toggle between login/register
- On login success: store tokens in localStorage, redirect based on role:
  - ADMIN/OWNER → /admin.html
  - BODEGA → /bodega.html
  - TALLER → /index.html
Step 2: Add static route in server.py
Add route to serve login.html (same pattern as other pages).
Step 3: Commit
git add dashboard/login.html dashboard/login.js dashboard/login.css
git commit -m "feat: add login/register page"
Task 11: Bodega Dashboard Page
Files:
- Create: /home/Autopartes/dashboard/bodega.html
- Create: /home/Autopartes/dashboard/bodega.js
- Create: /home/Autopartes/dashboard/bodega.css
Step 1: Create bodega panel
Page with 3 tabs:
- Mapeo de columnas — configure which columns in their CSV/Excel map to part_number, price, stock, location
- Subir inventario — file upload with drag & drop, shows progress and results
- Mi inventario — searchable paginated table of their current inventory
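To make the column mapping concrete, here is a minimal sketch of how a saved mapping could be applied to an uploaded CSV on the server side. The function names, the shape of the `mapping` dict (canonical field to column header in the warehouse's file), and the rule of skipping rows without a part number are all assumptions for illustration. Excel files would need the same logic on top of openpyxl rows.

```python
import csv
import io


def _to_number(value, cast):
    """Convert a cell to a number, treating blank cells as None."""
    value = (value or "").strip()
    return cast(value) if value else None


def apply_column_mapping(csv_text: str, mapping: dict[str, str]) -> list[dict]:
    """Translate uploaded rows into canonical inventory rows.

    mapping: canonical field name -> header used in the uploaded file,
    e.g. {"part_number": "SKU", "price": "Precio", ...} (hypothetical shape).
    """
    rows = []
    for raw in csv.DictReader(io.StringIO(csv_text)):
        part_number = (raw.get(mapping["part_number"]) or "").strip()
        if not part_number:
            continue  # assumption: rows without a part number are skipped
        rows.append({
            "part_number": part_number,
            "price": _to_number(raw.get(mapping["price"]), float),
            "stock": _to_number(raw.get(mapping["stock"]), int),
            # location is treated as optional in this sketch
            "location": (raw.get(mapping.get("location", "")) or "").strip(),
        })
    return rows
```

For example, a file with the headers `SKU,Precio,Existencia,Pasillo` would use the mapping `{"part_number": "SKU", "price": "Precio", "stock": "Existencia", "location": "Pasillo"}`.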
Step 2: Add static route in server.py
Step 3: Commit
git add dashboard/bodega.html dashboard/bodega.js dashboard/bodega.css
git commit -m "feat: add bodega dashboard — mapping, upload, inventory view"
Task 12: Update Nav & Catalog for Auth
Files:
- Modify: /home/Autopartes/dashboard/nav.js
- Modify: /home/Autopartes/dashboard/demo.html (or its JS)
- Modify: /home/Autopartes/dashboard/index.html
Step 1: Update nav.js
Add to the nav bar:
- If token in localStorage → show username + logout button
- If no token → show "Iniciar Sesión" link to /login.html
- Auto-refresh token when near expiry
Step 2: Update catalog part detail
In the part detail modal/section:
- Add "Alternativas aftermarket" section (public, calls /api/parts/{id}/aftermarket)
- Add "Disponibilidad en bodegas" section (only if authenticated TALLER, calls /api/parts/{id}/availability)
Step 3: Commit
git add dashboard/nav.js dashboard/demo.html dashboard/index.html
git commit -m "feat: add auth to nav, aftermarket + availability to catalog"
Task 13: Admin Users Tab
Files:
- Modify: /home/Autopartes/dashboard/admin.html
- Modify: /home/Autopartes/dashboard/admin.js
Step 1: Add users tab to admin panel
Add a new tab "Usuarios" to the admin sidebar that:
- Lists all users with name, email, role, business_name, is_active, last_login
- Toggle activate/deactivate button per user
- Badge showing pending (inactive) users count
Step 2: Commit
git add dashboard/admin.html dashboard/admin.js
git commit -m "feat: add users management tab to admin panel"
Task 14: Create Admin User & Test
Step 1: Set password for existing admin
Run:
python3 -c "
import bcrypt, psycopg2
pw = bcrypt.hashpw(b'admin2026', bcrypt.gensalt()).decode()
conn = psycopg2.connect('postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts')
cur = conn.cursor()
cur.execute('UPDATE users SET pass = %s, email = %s, is_active = true WHERE id_rol = 1', (pw, 'admin@nexus.com'))
conn.commit()
print('Admin password set')
"
Step 2: Test auth flow
# Login
curl -X POST http://localhost:5000/api/auth/login \
-H 'Content-Type: application/json' \
-d '{"email":"admin@nexus.com","password":"admin2026"}'
# Register a bodega
curl -X POST http://localhost:5000/api/auth/register \
-H 'Content-Type: application/json' \
-d '{"name":"Test Bodega","email":"bodega@test.com","password":"test123","role":"BODEGA","business_name":"Bodega Central"}'
# Activate the bodega (use the admin token from the login response; replace 2 with the bodega user's id)
curl -X PUT http://localhost:5000/api/admin/users/2/activate \
-H 'Authorization: Bearer <admin_token>' \
-H 'Content-Type: application/json' \
-d '{"is_active":true}'
Step 3: Commit
git commit --allow-empty -m "chore: set up admin user credentials"
Task 15: Run Aftermarket Migration
Step 1: Run migration
python3 /home/Autopartes/scripts/migrate_aftermarket.py 2>&1 | tee /tmp/aftermarket_migration.log
Step 2: Verify results
PGPASSWORD=nexus_autoparts_2026 psql -U nexus -d nexus_autoparts -h localhost \
-c "SELECT count(*) as aftermarket_parts FROM aftermarket_parts;" \
-c "SELECT count(*) as remaining_aft FROM parts WHERE oem_part_number LIKE 'AFT-%';" \
-c "SELECT count(*) as total_parts FROM parts;"
Expected: aftermarket_parts is populated, remaining_aft is small (only AFT- parts with no OEM match), and total_parts is lower than before the migration.
Step 3: Commit
git commit --allow-empty -m "chore: run aftermarket migration — 357K AFT- parts moved"
Execution Order
The tasks should be executed in this order due to dependencies:
- Task 1 (deps) → Task 2 (schema) → Task 3 (auth module)
- Task 4 (auth endpoints) → Task 5 (admin users)
- Task 6 (inventory endpoints) → Task 7 (availability endpoints)
- Task 8 (aftermarket migration script) → Task 9 (fix pipeline) → Task 15 (run migration)
- Task 10 (login page) → Task 11 (bodega page) → Task 12 (nav update) → Task 13 (admin users tab)
- Task 14 (test)
Tasks 1-3 are sequential. After that, the auth branch (4-5) and aftermarket branch (8-9) can run in parallel. Frontend tasks (10-13) depend on the backend being ready.
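The ordering above can be checked mechanically with a topological sort. The sketch below encodes the stated chains and groups tasks into batches that could run in parallel. Several edges are assumptions rather than statements from this plan: that Task 6 starts after Task 3, that "backend ready" for the frontend means Tasks 5 and 7 are done, and that Task 14 only needs Task 5. Adjust `DEPENDS_ON` if those do not hold.

```python
from graphlib import TopologicalSorter

# task number -> set of tasks it depends on
DEPENDS_ON = {
    1: set(), 2: {1}, 3: {2},            # deps -> schema -> auth module
    4: {3}, 5: {4},                      # auth endpoints -> admin users
    6: {3}, 7: {6},                      # inventory -> availability (6 after 3 is assumed)
    8: {3}, 9: {8}, 15: {9},             # aftermarket script -> pipeline fix -> run
    10: {5, 7}, 11: {10}, 12: {11}, 13: {12},  # frontend after backend (assumed: 5 and 7)
    14: {5},                             # auth flow test (assumed to need only Task 5)
}


def parallel_batches(depends_on: dict[int, set[int]]) -> list[list[int]]:
    """Group tasks into batches; every task's dependencies sit in earlier batches."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()  # raises graphlib.CycleError if the plan is circular
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


if __name__ == "__main__":
    for number, batch in enumerate(parallel_batches(DEPENDS_ON), start=1):
        print(f"batch {number}: tasks {batch}")
```

Under these assumptions, Tasks 4, 6 and 8 land in the same batch, which matches the note that the auth and aftermarket branches can proceed in parallel once Tasks 1-3 are done.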