Major features: - Pixel-Perfect glassmorphism design (landing + POS + public catalog) - OEM/Local catalog toggle with Nexpart taxonomy (14 groups, 108 subgroups, 558 part types) - Marketplace B2B Phase 1 (bodegas, POs, status machine, WA+email notifications) - Peer-to-peer inventory (multi-instance, LAN discovery) - WhatsApp: photo→Vision AI, voice→Whisper, conversational quotations - Smart unified search (VIN/plate/part_number/keyword auto-detect) - Shop Supplies tab (vehicle-independent parts) - Chatbot AI fallback chain (5 models) + response cache - CSV inventory import tool + setup_instance.sh installer - Tablet-responsive CSS + sidebar toggle - Filters, export CSV, employee edit, business data save - Quotation system (WA→POS) with auto-print on confirmation - Live stats on landing page Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
263 lines
9.6 KiB
Python
Executable File
263 lines
9.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Nexus Autoparts — Inventory CSV Import Tool
|
|
|
|
Imports a refaccionaria's inventory from a CSV file into their tenant DB.
|
|
Handles flexible column names (Spanish, English, SICAR format) and creates
|
|
inventory + initial stock operations.
|
|
|
|
Usage:
|
|
python3 import_inventory.py --tenant=12 --csv=inventario.csv
|
|
python3 import_inventory.py --tenant=12 --csv=inventario.csv --branch=1 --dry-run
|
|
|
|
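The CSV must have the part_number and name columns below; price and stock are
recommended (if either column is absent it is imported as 0, with a warning).
Column names are flexible: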
|
|
part_number / numero_de_parte / codigo / clave / sku
|
|
name / nombre / descripcion
|
|
price / precio / precio_1 / precio_venta
|
|
stock / existencia / cantidad / qty
|
|
|
|
Optional columns:
|
|
brand / marca
|
|
cost / costo / precio_compra
|
|
price_2 / precio_2 / precio_mayoreo
|
|
price_3 / precio_3 / precio_credito
|
|
category / categoria / linea / departamento
|
|
location / ubicacion
|
|
min_stock / minimo / stock_minimo
|
|
unit / unidad (default: PZA)
|
|
barcode / codigo_barras
tax_rate / iva / impuesto / tasa_iva (default: 0.16)
|
|
"""
|
|
|
|
import argparse
import csv
import io
import math
import os
import sys
import unicodedata
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'pos'))
|
|
from tenant_db import get_tenant_conn
|
|
|
|
|
|
# ─── Column name mapping (flexible) ──────────────────────────────────
|
|
|
|
# Canonical inventory field -> header spellings accepted for it.
# Aliases are written in normalized form (lowercase, underscores instead of
# spaces) because resolve_columns() compares them against normalized headers.
# Within one field the aliases are tried in order and the first match wins.
COLUMN_ALIASES = {
    'part_number': [
        'part_number', 'numero_de_parte', 'codigo', 'clave', 'sku',
        'no_parte', 'num_parte', 'numero', 'code',
    ],
    'name': [
        'name', 'nombre', 'descripcion', 'description', 'producto', 'articulo',
    ],
    'brand': [
        'brand', 'marca', 'fabricante', 'manufacturer',
    ],
    'price': [
        'price', 'precio', 'precio_1', 'precio_venta', 'pvp', 'price_1',
    ],
    'cost': [
        'cost', 'costo', 'precio_compra', 'costo_unitario',
    ],
    'stock': [
        'stock', 'existencia', 'cantidad', 'qty', 'quantity', 'inventario', 'saldo',
    ],
    'price_2': [
        'price_2', 'precio_2', 'precio_mayoreo', 'mayoreo',
    ],
    'price_3': [
        'price_3', 'precio_3', 'precio_credito', 'credito',
    ],
    'category': [
        'category', 'categoria', 'linea', 'departamento', 'rubro', 'grupo',
    ],
    'location': [
        'location', 'ubicacion', 'almacen', 'warehouse',
    ],
    'min_stock': [
        'min_stock', 'minimo', 'stock_minimo', 'reorden', 'min',
    ],
    'unit': [
        'unit', 'unidad', 'um',
    ],
    'barcode': [
        'barcode', 'codigo_barras', 'ean', 'upc',
    ],
    'tax_rate': [
        'tax_rate', 'iva', 'impuesto', 'tasa_iva',
    ],
}
|
|
|
|
|
|
def _normalize_header(raw: str) -> str:
    """Reduce a raw CSV header to the lowercase, unaccented form used by aliases."""
    # A UTF-8 byte-order mark (common in Excel "CSV UTF-8" exports) sticks to
    # the first header and is NOT removed by str.strip().
    text = raw.replace('\ufeff', '').strip().lower()
    # Fold accents so 'Descripción' / 'Código' match the unaccented aliases:
    # decompose, then drop the combining marks.
    decomposed = unicodedata.normalize('NFKD', text)
    text = ''.join(ch for ch in decomposed if not unicodedata.combining(ch))
    return text.replace(' ', '_').replace('-', '_')


def resolve_columns(header: list[str], aliases: "dict[str, list[str]] | None" = None) -> dict:
    """Map CSV header names to our standard field names.

    Headers are matched case-insensitively, ignoring surrounding whitespace,
    a leading byte-order mark and accents; spaces and hyphens count as
    underscores.

    Args:
        header: Column names exactly as they appear in the CSV.
        aliases: Optional {standard_name: [accepted_header, ...]} table.
            Defaults to the module-level COLUMN_ALIASES.

    Returns: {standard_name: csv_column_name} for each matched field. The
    value is the original, un-normalized header so it can be used directly as
    a key into csv.DictReader rows. Fields with no matching column are omitted.
    """
    if aliases is None:
        aliases = COLUMN_ALIASES

    # normalized header -> original header (a later duplicate overrides an earlier one)
    normalized = {_normalize_header(h): h for h in header}

    mapping = {}
    for field, names in aliases.items():
        # Aliases are ordered by preference; stop at the first one present.
        for alias in names:
            if alias in normalized:
                mapping[field] = normalized[alias]
                break
    return mapping
|
|
|
|
|
|
def parse_number(val, default=0):
    """Parse a string that might be '$1,234.56' or '1234.56' or empty.

    Dollar signs, thousands commas and any whitespace (including
    non-breaking spaces) are removed before conversion.

    Args:
        val: Raw cell value; normally a string, may be None.
        default: Value returned when *val* is missing, blank, not a number,
            or not finite.

    Returns:
        The parsed value as a float, or *default* unchanged.
    """
    # Only a missing value falls back here; an explicit numeric 0 is real data
    # and must not be replaced by the default.
    if val is None:
        return default

    stripped = str(val).replace('$', '').replace(',', '')
    cleaned = ''.join(stripped.split())  # drops every whitespace character
    if not cleaned:
        return default

    try:
        number = float(cleaned)
    except ValueError:
        return default

    # float() accepts 'nan', 'inf' and 'infinity'; those are never valid
    # prices or quantities and would make a later int() conversion raise.
    return number if math.isfinite(number) else default
|
|
|
|
|
|
def _detect_delimiter(first_line: str) -> str:
    """Guess the field delimiter (tab, semicolon or comma) from the header line."""
    if '\t' in first_line:
        return '\t'
    if ';' in first_line and ',' not in first_line:
        return ';'
    return ','


def _parse_tax_rate(raw: str, default: float = 0.16) -> float:
    """Turn a tax cell such as '0.16', '16' or '16%' into a fractional rate."""
    rate = parse_number(raw.replace('%', ''), default=default)
    # The default (0.16) shows rates are handled as fractions, so a value above
    # 1 is read as a percentage.
    # NOTE(review): a cell of exactly '1' is kept as 1.0 (100 %) — confirm.
    return rate / 100 if rate > 1 else rate


def main():
    """Command-line entry point: import an inventory CSV into a tenant database.

    Steps:
      1. Parse the --tenant / --csv / --branch / --dry-run / --encoding options.
      2. Read the file, strip a leading byte-order mark, detect the delimiter
         and map the header to standard field names with resolve_columns().
      3. Upsert every row into ``inventory`` keyed on (part_number, branch_id).
         Newly inserted rows with stock > 0 also get an 'adjustment' row in
         ``inventory_operations``; stock of existing rows is left untouched.
      4. Commit once at the end and print a summary.

    Each row runs inside its own SAVEPOINT so a failing row is undone by
    itself and the rows imported before it are kept. With --dry-run the first
    five parsed rows are printed and nothing is written.

    Exits with status 1 when the file is missing, has no header row, or lacks
    a part-number or name column.
    """
    parser = argparse.ArgumentParser(description='Import inventory CSV into a Nexus tenant')
    parser.add_argument('--tenant', type=int, required=True, help='Tenant ID')
    parser.add_argument('--csv', required=True, help='Path to CSV file')
    parser.add_argument('--branch', type=int, default=1, help='Branch ID (default: 1)')
    parser.add_argument('--dry-run', action='store_true', help='Parse but don\'t insert')
    parser.add_argument('--encoding', default='utf-8', help='CSV encoding (default: utf-8, try latin-1 for SICAR)')
    args = parser.parse_args()

    # Read CSV
    csv_path = args.csv
    if not os.path.exists(csv_path):
        print(f'ERROR: File not found: {csv_path}')
        sys.exit(1)

    with open(csv_path, 'r', encoding=args.encoding, errors='replace') as f:
        content = f.read()

    # A UTF-8 byte-order mark would otherwise become part of the first header
    # name and keep that column from being recognised.
    content = content.lstrip('\ufeff')

    # Detect delimiter (comma, semicolon, tab)
    delimiter = _detect_delimiter(content.split('\n', 1)[0])

    reader = csv.DictReader(io.StringIO(content), delimiter=delimiter)
    if not reader.fieldnames:
        print('ERROR: CSV has no header row')
        sys.exit(1)

    col_map = resolve_columns(reader.fieldnames)
    print(f'CSV columns detected: {list(reader.fieldnames)}')
    print(f'Mapped to: {col_map}')
    print()

    # Validate required columns
    required = ['part_number', 'name']
    missing = [r for r in required if r not in col_map]
    if missing:
        print(f'ERROR: Required columns not found: {missing}')
        print(f'Available: {list(reader.fieldnames)}')
        sys.exit(1)

    if 'price' not in col_map:
        print('WARNING: No price column found — all prices will be 0')
    if 'stock' not in col_map:
        print('WARNING: No stock column found — all stock will be 0')

    rows_parsed = 0
    rows_inserted = 0
    rows_updated = 0
    rows_skipped = 0
    errors = []

    def get_val(row, field, default=''):
        """Return the stripped cell for a standard field, or *default* if unmapped."""
        csv_col = col_map.get(field)
        if not csv_col:
            return default
        return (row.get(csv_col) or '').strip()

    # Connect to tenant DB
    conn = get_tenant_conn(args.tenant)
    cur = conn.cursor()
    # Savepoints only exist inside a transaction; on an autocommit connection
    # every statement is already committed on its own and SAVEPOINT would fail.
    use_savepoints = not args.dry_run and not getattr(conn, 'autocommit', False)

    try:
        # start=2: row 1 of the file is the header
        for i, row in enumerate(reader, start=2):
            rows_parsed += 1
            part_number = get_val(row, 'part_number')
            name = get_val(row, 'name')

            # Completely blank line
            if not part_number and not name:
                rows_skipped += 1
                continue

            # No code given: derive one from the name
            if not part_number:
                part_number = name[:50].upper().replace(' ', '-')

            try:
                price_1 = parse_number(get_val(row, 'price'))
                cost = parse_number(get_val(row, 'cost'))
                stock = int(parse_number(get_val(row, 'stock')))
                price_2 = parse_number(get_val(row, 'price_2'))
                price_3 = parse_number(get_val(row, 'price_3'))
                min_stock = int(parse_number(get_val(row, 'min_stock')))
                tax_rate = _parse_tax_rate(get_val(row, 'tax_rate'))
            except (ValueError, OverflowError) as e:
                # int() rejects NaN / infinity; report the row instead of
                # aborting the whole import.
                errors.append(f'Row {i}: {str(e)[:100]}')
                rows_skipped += 1
                continue

            brand = get_val(row, 'brand')
            category = get_val(row, 'category')
            location = get_val(row, 'location') or 'Principal'
            unit = get_val(row, 'unit') or 'PZA'
            barcode = get_val(row, 'barcode')

            if args.dry_run:
                if rows_parsed <= 5:
                    print(f' [DRY] {part_number:20} {name[:30]:30} ${price_1:.2f} stock={stock} brand={brand}')
                continue

            try:
                if use_savepoints:
                    cur.execute('SAVEPOINT import_row')

                # UPSERT by part_number within this branch
                cur.execute("""
                    INSERT INTO inventory
                        (part_number, name, brand, price_1, price_2, price_3,
                         cost, tax_rate, unit, category, location,
                         min_stock, barcode, branch_id, is_active)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, TRUE)
                    ON CONFLICT (part_number, branch_id)
                    DO UPDATE SET
                        name = EXCLUDED.name,
                        brand = EXCLUDED.brand,
                        price_1 = EXCLUDED.price_1,
                        price_2 = EXCLUDED.price_2,
                        price_3 = EXCLUDED.price_3,
                        cost = EXCLUDED.cost,
                        tax_rate = EXCLUDED.tax_rate,
                        unit = EXCLUDED.unit,
                        category = EXCLUDED.category,
                        location = EXCLUDED.location,
                        min_stock = EXCLUDED.min_stock,
                        barcode = EXCLUDED.barcode
                    RETURNING id, (xmax = 0) AS was_insert
                """, (part_number, name, brand, price_1, price_2, price_3,
                      cost, tax_rate, unit, category, location,
                      min_stock, barcode, args.branch))

                inv_id, was_insert = cur.fetchone()

                # Set initial stock via inventory_operations (append-only model).
                # Only for new rows: re-importing must not add the stock again.
                if stock > 0 and was_insert:
                    cur.execute("""
                        INSERT INTO inventory_operations
                            (inventory_id, operation_type, quantity, reference, notes)
                        VALUES (%s, 'adjustment', %s, 'CSV Import', 'Carga inicial desde CSV')
                    """, (inv_id, stock))

                if use_savepoints:
                    cur.execute('RELEASE SAVEPOINT import_row')

            except Exception as e:
                errors.append(f'Row {i}: {str(e)[:100]}')
                rows_skipped += 1
                if use_savepoints:
                    # Undo only this row; a full rollback would silently throw
                    # away every row imported so far.
                    cur.execute('ROLLBACK TO SAVEPOINT import_row')
                else:
                    conn.rollback()
                continue

            # Count only after both statements succeeded, so a row is never
            # reported as both inserted and skipped.
            if was_insert:
                rows_inserted += 1
            else:
                rows_updated += 1

        if not args.dry_run:
            conn.commit()
    finally:
        # Release the cursor and connection even if the import aborts.
        cur.close()
        conn.close()

    # Summary
    print()
    print('╔══════════════════════════════════════════════╗')
    print('║ IMPORT COMPLETE ║')
    print('╚══════════════════════════════════════════════╝')
    print(f' Parsed: {rows_parsed}')
    print(f' Inserted: {rows_inserted}')
    print(f' Updated: {rows_updated}')
    print(f' Skipped: {rows_skipped}')
    if errors:
        print(f' Errors: {len(errors)}')
        for e in errors[:10]:
            print(f' • {e}')
        if len(errors) > 10:
            print(f' ... and {len(errors) - 10} more')
    if args.dry_run:
        print()
        print(' (DRY RUN — nothing was written to the database)')
    print()
|
|
|
|
|
|
# Run the importer only when this file is executed as a script; importing the
# module (e.g. to reuse resolve_columns or parse_number) has no side effects
# from this guard.
if __name__ == '__main__':
    main()
|