Files
Autoparts-DB/scripts/import_inventory.py
consultoria-as e95f7cf684 feat: complete session — catalog, marketplace, WhatsApp, peer-to-peer, install scripts
Major features:
- Pixel-Perfect glassmorphism design (landing + POS + public catalog)
- OEM/Local catalog toggle with Nexpart taxonomy (14 groups, 108 subgroups, 558 part types)
- Marketplace B2B Phase 1 (bodegas, POs, status machine, WA+email notifications)
- Peer-to-peer inventory (multi-instance, LAN discovery)
- WhatsApp: photo→Vision AI, voice→Whisper, conversational quotations
- Smart unified search (VIN/plate/part_number/keyword auto-detect)
- Shop Supplies tab (vehicle-independent parts)
- Chatbot AI fallback chain (5 models) + response cache
- CSV inventory import tool + setup_instance.sh installer
- Tablet-responsive CSS + sidebar toggle
- Filters, export CSV, employee edit, business data save
- Quotation system (WA→POS) with auto-print on confirmation
- Live stats on landing page

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-18 05:35:53 +00:00

263 lines
9.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Nexus Autoparts — Inventory CSV Import Tool
Imports a refaccionaria's inventory from a CSV file into their tenant DB.
Handles flexible column names (Spanish, English, SICAR format) and creates
inventory + initial stock operations.
Usage:
python3 import_inventory.py --tenant=12 --csv=inventario.csv
python3 import_inventory.py --tenant=12 --csv=inventario.csv --branch=1 --dry-run
The CSV should have at least these columns (names are flexible):
part_number / numero_de_parte / codigo / clave / sku
name / nombre / descripcion
price / precio / precio_1 / precio_venta
stock / existencia / cantidad / qty
Optional columns:
brand / marca
cost / costo / precio_compra
price_2 / precio_2 / precio_mayoreo
price_3 / precio_3 / precio_credito
category / categoria / linea / departamento
location / ubicacion
min_stock / minimo / stock_minimo
unit / unidad (default: PZA)
barcode / codigo_barras
tax_rate / iva / impuesto / tasa_iva (default: 0.16)
"""
import argparse
import csv
import io
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'pos'))
from tenant_db import get_tenant_conn
# ─── Column name mapping (flexible) ──────────────────────────────────
COLUMN_ALIASES = {
'part_number': ['part_number', 'numero_de_parte', 'codigo', 'clave', 'sku', 'no_parte', 'num_parte', 'numero', 'code'],
'name': ['name', 'nombre', 'descripcion', 'description', 'producto', 'articulo'],
'brand': ['brand', 'marca', 'fabricante', 'manufacturer'],
'price': ['price', 'precio', 'precio_1', 'precio_venta', 'pvp', 'price_1'],
'cost': ['cost', 'costo', 'precio_compra', 'costo_unitario'],
'stock': ['stock', 'existencia', 'cantidad', 'qty', 'quantity', 'inventario', 'saldo'],
'price_2': ['price_2', 'precio_2', 'precio_mayoreo', 'mayoreo'],
'price_3': ['price_3', 'precio_3', 'precio_credito', 'credito'],
'category': ['category', 'categoria', 'linea', 'departamento', 'rubro', 'grupo'],
'location': ['location', 'ubicacion', 'almacen', 'warehouse'],
'min_stock': ['min_stock', 'minimo', 'stock_minimo', 'reorden', 'min'],
'unit': ['unit', 'unidad', 'um'],
'barcode': ['barcode', 'codigo_barras', 'ean', 'upc'],
'tax_rate': ['tax_rate', 'iva', 'impuesto', 'tasa_iva'],
}
def resolve_columns(header: list[str], aliases=None) -> dict:
    """Map CSV header names to our standard field names.

    Header names are normalized before being compared with the alias table:
    a UTF-8 byte-order mark is dropped (it sticks to the first column name
    when the file is read as plain ``utf-8``), accents are removed
    (``Descripción`` -> ``descripcion``), surrounding whitespace is stripped,
    the text is lower-cased and spaces become underscores.

    Args:
        header: Column names exactly as they appear in the CSV header row.
        aliases: Optional ``{standard_name: [alias, ...]}`` table. Defaults
            to the module-level ``COLUMN_ALIASES``. Aliases are tried in
            list order and the first one present in the header wins.

    Returns:
        ``{standard_name: csv_column_name}`` for each matched field. The
        value is the original, un-normalized header string so it can be used
        to index the rows produced by ``csv.DictReader``. Fields with no
        matching column are absent from the result.
    """
    import unicodedata  # local import keeps the block self-contained

    def _normalize(raw):
        # NFKD splits an accented letter into base letter + combining mark;
        # dropping the marks leaves the plain ASCII spelling used in aliases.
        text = unicodedata.normalize('NFKD', str(raw).replace('\ufeff', ''))
        text = ''.join(ch for ch in text if not unicodedata.combining(ch))
        return text.strip().lower().replace(' ', '_')

    if aliases is None:
        aliases = COLUMN_ALIASES

    # If two headers normalize to the same key the later one wins.
    normalized = {_normalize(h): h for h in header if h is not None}

    mapping = {}
    for field, candidates in aliases.items():
        for alias in candidates:
            if alias in normalized:
                mapping[field] = normalized[alias]
                break
    return mapping
def parse_number(val, default=0):
    """Parse a string that might be '$1,234.56' or '1234.56' or empty.

    Currency signs, thousands separators (``,``) and any whitespace are
    removed before conversion.

    Args:
        val: Raw cell value; anything falsy (``''``, ``None``, ``0``) yields
            ``default``.
        default: Value returned when ``val`` is empty or is not a finite
            number.

    Returns:
        The parsed value as a ``float``, or ``default`` unchanged.
    """
    import math  # local import keeps the block self-contained

    if not val:
        return default

    # str.split() with no argument also drops tabs and non-breaking spaces.
    cleaned = ''.join(str(val).split()).replace('$', '').replace(',', '')
    try:
        number = float(cleaned)
    except ValueError:
        return default

    # float() accepts 'nan', 'inf' and overflowing literals such as '1e999';
    # callers convert the result with int(), which raises on those values.
    if not math.isfinite(number):
        return default
    return number
def _read_csv_text(csv_path, encoding):
    """Return the file's text with a leading byte-order mark removed."""
    with open(csv_path, 'r', encoding=encoding, errors='replace') as f:
        content = f.read()
    # A BOM read as plain utf-8 would otherwise become part of the first
    # column name and prevent it from being recognized.
    return content.lstrip('\ufeff')


def _detect_delimiter(content):
    """Guess the delimiter (tab, semicolon or comma) from the header line."""
    first_line = content.split('\n', 1)[0]
    if '\t' in first_line:
        return '\t'
    if ';' in first_line and ',' not in first_line:
        return ';'
    return ','


def _cell(row, col_map, field, default=''):
    """Return the stripped text of ``field`` in ``row`` (``default`` if unmapped)."""
    csv_col = col_map.get(field)
    if not csv_col:
        return default
    return (row.get(csv_col) or '').strip()


def _to_int(value):
    """Truncate a parsed number to ``int``; non-finite values become 0."""
    try:
        return int(value)
    except (ValueError, OverflowError):
        return 0


def _parse_tax_rate(raw):
    """Parse a tax cell into a fraction, e.g. 0.16 for '0.16', '16' or '16%'."""
    import math

    rate = parse_number(raw.replace('%', ''), default=0.16)
    if not math.isfinite(rate):
        return 0.16
    # The 0.16 default shows rates are handled as fractions, so a value above
    # 1 is read as a percentage.
    # NOTE(review): confirm no tenant legitimately uses a rate above 100 %.
    if rate > 1:
        rate /= 100
    return rate


def _parse_row(row, col_map):
    """Convert one CSV row into a dict of inventory values.

    Returns ``None`` when the row has neither a part number nor a name.
    """
    part_number = _cell(row, col_map, 'part_number')
    name = _cell(row, col_map, 'name')
    if not part_number and not name:
        return None
    if not part_number:
        # No code supplied: derive one from the product name.
        part_number = name[:50].upper().replace(' ', '-')
    return {
        'part_number': part_number,
        'name': name,
        'brand': _cell(row, col_map, 'brand'),
        'price_1': parse_number(_cell(row, col_map, 'price')),
        'price_2': parse_number(_cell(row, col_map, 'price_2')),
        'price_3': parse_number(_cell(row, col_map, 'price_3')),
        'cost': parse_number(_cell(row, col_map, 'cost')),
        'tax_rate': _parse_tax_rate(_cell(row, col_map, 'tax_rate')),
        'unit': _cell(row, col_map, 'unit') or 'PZA',
        'category': _cell(row, col_map, 'category'),
        'location': _cell(row, col_map, 'location') or 'Principal',
        'min_stock': _to_int(parse_number(_cell(row, col_map, 'min_stock'))),
        'barcode': _cell(row, col_map, 'barcode'),
        'stock': _to_int(parse_number(_cell(row, col_map, 'stock'))),
    }


def _upsert_row(cur, item, branch_id):
    """Insert or update one inventory row; return True if it was inserted.

    A newly inserted row with positive stock also gets an initial
    ``inventory_operations`` entry. Rows that already existed keep their
    stock untouched.
    """
    # UPSERT by part_number within this branch.  "xmax = 0" is true only for
    # a freshly inserted row, which distinguishes inserts from updates.
    cur.execute("""
        INSERT INTO inventory
        (part_number, name, brand, price_1, price_2, price_3,
        cost, tax_rate, unit, category, location,
        min_stock, barcode, branch_id, is_active)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, TRUE)
        ON CONFLICT (part_number, branch_id)
        DO UPDATE SET
        name = EXCLUDED.name,
        brand = EXCLUDED.brand,
        price_1 = EXCLUDED.price_1,
        price_2 = EXCLUDED.price_2,
        price_3 = EXCLUDED.price_3,
        cost = EXCLUDED.cost,
        tax_rate = EXCLUDED.tax_rate,
        unit = EXCLUDED.unit,
        category = EXCLUDED.category,
        location = EXCLUDED.location,
        min_stock = EXCLUDED.min_stock,
        barcode = EXCLUDED.barcode
        RETURNING id, (xmax = 0) AS was_insert
    """, (item['part_number'], item['name'], item['brand'],
          item['price_1'], item['price_2'], item['price_3'],
          item['cost'], item['tax_rate'], item['unit'], item['category'],
          item['location'], item['min_stock'], item['barcode'], branch_id))
    inv_id, was_insert = cur.fetchone()

    # Set initial stock via inventory_operations (append-only model)
    if item['stock'] > 0 and was_insert:
        cur.execute("""
            INSERT INTO inventory_operations
            (inventory_id, operation_type, quantity, reference, notes)
            VALUES (%s, 'adjustment', %s, 'CSV Import', 'Carga inicial desde CSV')
        """, (inv_id, item['stock']))
    return bool(was_insert)


def main():
    """Command-line entry point: import an inventory CSV into a tenant DB.

    Reads the CSV named by ``--csv``, maps its columns to the standard field
    names, then upserts every row into ``inventory`` for ``--branch``.
    With ``--dry-run`` the rows are parsed and the first five are previewed,
    but nothing is written. A row that fails is rolled back on its own and
    reported in the summary; the remaining rows are still imported.

    Exits with status 1 when the file is missing, has no header row, or
    lacks a part-number or name column.
    """
    parser = argparse.ArgumentParser(description='Import inventory CSV into a Nexus tenant')
    parser.add_argument('--tenant', type=int, required=True, help='Tenant ID')
    parser.add_argument('--csv', required=True, help='Path to CSV file')
    parser.add_argument('--branch', type=int, default=1, help='Branch ID (default: 1)')
    parser.add_argument('--dry-run', action='store_true', help='Parse but don\'t insert')
    parser.add_argument('--encoding', default='utf-8', help='CSV encoding (default: utf-8, try latin-1 for SICAR)')
    args = parser.parse_args()

    # Read CSV
    csv_path = args.csv
    if not os.path.exists(csv_path):
        print(f'ERROR: File not found: {csv_path}')
        sys.exit(1)
    content = _read_csv_text(csv_path, args.encoding)

    reader = csv.DictReader(io.StringIO(content), delimiter=_detect_delimiter(content))
    if not reader.fieldnames:
        print('ERROR: CSV has no header row')
        sys.exit(1)

    col_map = resolve_columns(reader.fieldnames)
    print(f'CSV columns detected: {list(reader.fieldnames)}')
    print(f'Mapped to: {col_map}')
    print()

    # Validate required columns
    required = ['part_number', 'name']
    missing = [r for r in required if r not in col_map]
    if missing:
        print(f'ERROR: Required columns not found: {missing}')
        print(f'Available: {list(reader.fieldnames)}')
        sys.exit(1)
    if 'price' not in col_map:
        print('WARNING: No price column found — all prices will be 0')
    if 'stock' not in col_map:
        print('WARNING: No stock column found — all stock will be 0')

    rows_parsed = 0
    rows_inserted = 0
    rows_updated = 0
    rows_skipped = 0
    errors = []

    conn = None
    cur = None
    try:
        # Connect to tenant DB
        # NOTE(review): the savepoints below assume a non-autocommit
        # PostgreSQL connection — confirm against get_tenant_conn.
        conn = get_tenant_conn(args.tenant)
        cur = conn.cursor()

        # Row numbers start at 2 because line 1 is the header.
        for i, row in enumerate(reader, start=2):
            rows_parsed += 1
            item = _parse_row(row, col_map)
            if item is None:
                rows_skipped += 1
                continue

            if args.dry_run:
                if rows_parsed <= 5:
                    part_number = item['part_number']
                    name = item['name']
                    price_1 = item['price_1']
                    stock = item['stock']
                    brand = item['brand']
                    print(f' [DRY] {part_number:20} {name[:30]:30} ${price_1:.2f} stock={stock} brand={brand}')
                continue

            # Each row runs inside its own savepoint so a failure undoes only
            # that row.  A plain conn.rollback() here would silently discard
            # every row imported so far while the counters still report them.
            cur.execute('SAVEPOINT import_row')
            try:
                was_insert = _upsert_row(cur, item, args.branch)
                cur.execute('RELEASE SAVEPOINT import_row')
            except Exception as e:
                cur.execute('ROLLBACK TO SAVEPOINT import_row')
                cur.execute('RELEASE SAVEPOINT import_row')
                errors.append(f'Row {i}: {str(e)[:100]}')
                rows_skipped += 1
                continue

            # Count only after the whole row (including stock) succeeded.
            if was_insert:
                rows_inserted += 1
            else:
                rows_updated += 1

        if not args.dry_run:
            conn.commit()
    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

    # Summary
    print()
    print('╔══════════════════════════════════════════════╗')
    print('║ IMPORT COMPLETE ║')
    print('╚══════════════════════════════════════════════╝')
    print(f' Parsed: {rows_parsed}')
    print(f' Inserted: {rows_inserted}')
    print(f' Updated: {rows_updated}')
    print(f' Skipped: {rows_skipped}')
    if errors:
        print(f' Errors: {len(errors)}')
        for e in errors[:10]:
            print(f'{e}')
        if len(errors) > 10:
            print(f' ... and {len(errors) - 10} more')
    if args.dry_run:
        print()
        print(' (DRY RUN — nothing was written to the database)')
    print()
# Run the importer only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()