FASE 4: - Redis cache de stock con fallback graceful - Multi-moneda (MXN/USD) con contabilidad en MXN - Proveedores y ordenes de compra completo - Meilisearch 1.5M+ partes indexadas - Metabase KPIs con dashboard auto-generado FASE 5: - CRM mejorado: activities, tags, loyalty program, analytics - Imagenes de partes: upload, resize, thumbnails WebP - Ordenes de servicio Kanban: received->diagnosis->repair->ready->delivered - Garantias/RMA, alertas de reorden, multi-sucursal - Stubs BNPL (APLAZO) y ERP Sync (Aspel/Contpaqi) FASE 6: - Notificaciones automaticas: push/WhatsApp/email/in-app - Reportes de ahorro vs retail_price - Logistica + tracking: DHL, FedEx, Estafeta, 99min, Uber - API Publica: API keys, rate limiting, catalog search Migraciones: v1.9-v3.0 Tests: 93/93 pasando Backup: nexus_backup_20260427_045859.tar.gz
254 lines
9.1 KiB
Python
254 lines
9.1 KiB
Python
#!/usr/bin/env python3
|
|
"""FASE 6 — VALIDATION SUITE
|
|
|
|
Tests:
|
|
1. Notifications: templates, dispatch, logs
|
|
2. Savings: retail price, calculation, reports
|
|
3. Logistics: couriers, shipments, tracking
|
|
4. Public API: keys, validation, rate limiting
|
|
"""
|
|
|
|
import os, sys, time
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from tenant_db import get_master_conn, get_tenant_conn_by_dbname
|
|
from services.notification_engine import (
|
|
get_templates, create_template, dispatch_notification,
|
|
get_notification_logs, mark_as_read, notify_low_stock,
|
|
)
|
|
from services.savings_engine import (
|
|
calculate_item_savings, record_sale_savings,
|
|
get_customer_savings_report, get_global_savings_stats,
|
|
)
|
|
from services.logistics_engine import (
|
|
create_shipment, get_shipment, list_shipments,
|
|
update_shipment_status, get_couriers, add_courier,
|
|
)
|
|
from services.public_api_engine import (
|
|
create_api_key, validate_api_key, check_rate_limit,
|
|
increment_rate_limit, list_api_keys, revoke_api_key,
|
|
)
|
|
|
|
MASTER_DB_URL = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts')
|
|
TENANT_TEMPLATE = os.environ.get('TENANT_DB_URL_TEMPLATE', 'postgresql://postgres@/{db_name}')
|
|
|
|
PASS = '\033[92mPASS\033[0m'
|
|
FAIL = '\033[91mFAIL\033[0m'
|
|
passed = 0
|
|
failed = 0
|
|
|
|
|
|
def ok(label, condition, detail=''):
|
|
global passed, failed
|
|
if condition:
|
|
print(f" [{PASS}] {label}")
|
|
passed += 1
|
|
else:
|
|
print(f" [{FAIL}] {label} {detail}")
|
|
failed += 1
|
|
|
|
|
|
# Get a test tenant
|
|
master = get_master_conn()
|
|
cur = master.cursor()
|
|
cur.execute("SELECT db_name FROM tenants WHERE is_active = true ORDER BY id LIMIT 1")
|
|
row = cur.fetchone()
|
|
cur.close(); master.close()
|
|
|
|
if not row:
|
|
print("No active tenant found!")
|
|
sys.exit(1)
|
|
|
|
db_name = row[0]
|
|
conn = get_tenant_conn_by_dbname(db_name)
|
|
|
|
print("=" * 60)
|
|
print("FASE 6: Notificaciones + Ahorro + Logística + API Pública")
|
|
print("=" * 60)
|
|
|
|
# ─── NOTIFICATIONS ─────────────────────────────
|
|
print("\n[NOTIFICACIONES]")
|
|
try:
|
|
# Templates
|
|
templates = get_templates(conn, 1, event_type='low_stock')
|
|
ok("Get templates", len(templates) >= 1, f"count={len(templates)}")
|
|
|
|
# Create custom template
|
|
tid = create_template(conn, 1, 'test_event', 'push', 'Test Template',
|
|
'Hello {name}', subject_template='Test: {name}')
|
|
if not tid:
|
|
# Template may already exist from previous run
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT id FROM notification_templates WHERE tenant_id = 1 AND event_type = 'test_event' AND channel = 'push'")
|
|
row = cur.fetchone()
|
|
tid = row[0] if row else None
|
|
cur.close()
|
|
ok("Create template", tid is not None)
|
|
|
|
# Dispatch
|
|
log_ids = dispatch_notification(conn, 1, 'test_event', {'name': 'World'}, recipient_type='owner')
|
|
ok("Dispatch notification", len(log_ids) >= 1)
|
|
|
|
# Logs
|
|
logs = get_notification_logs(conn, 1, event_type='test_event')
|
|
ok("Get logs", len(logs) >= 1)
|
|
|
|
if logs:
|
|
mark_as_read(conn, logs[0]['id'])
|
|
logs2 = get_notification_logs(conn, 1, status='read')
|
|
ok("Mark as read", any(l['id'] == logs[0]['id'] for l in logs2))
|
|
|
|
# Low stock convenience
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT id FROM inventory WHERE is_active = true LIMIT 1")
|
|
inv_row = cur.fetchone()
|
|
cur.close()
|
|
if inv_row:
|
|
ls_ids = notify_low_stock(conn, 1, inv_row[0], stock=2, reorder_point=5)
|
|
ok("Low stock notify", len(ls_ids) >= 1)
|
|
except Exception as e:
|
|
conn.rollback()
|
|
ok("Notifications", False, str(e))
|
|
|
|
# ─── SAVINGS ─────────────────────────────
|
|
print("\n[AHORRO]")
|
|
try:
|
|
# Calculation
|
|
savings, pct = calculate_item_savings(100, 150, 2)
|
|
ok("Calculate savings", savings == 100.0 and pct == 33.3, f"savings={savings}, pct={pct}")
|
|
|
|
# Zero retail price
|
|
s2, p2 = calculate_item_savings(100, 0, 1)
|
|
ok("Zero retail price", s2 == 0.0 and p2 == 0.0)
|
|
|
|
# Set retail price on inventory
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT id, price_1 FROM inventory WHERE is_active = true LIMIT 1")
|
|
inv_row = cur.fetchone()
|
|
if inv_row:
|
|
inv_id, price = inv_row
|
|
retail = float(price) * 1.5 if price else 200.00
|
|
cur.execute("UPDATE inventory SET retail_price = %s WHERE id = %s", (retail, inv_id))
|
|
conn.commit()
|
|
|
|
# Create a sale with that item to test record_sale_savings
|
|
cur.execute("""
|
|
INSERT INTO sales (branch_id, customer_id, employee_id, subtotal, discount_total, tax_total, total, payment_method, status, sale_type)
|
|
VALUES (NULL, NULL, NULL, 100, 0, 16, 116, 'efectivo', 'completed', 'cash')
|
|
RETURNING id
|
|
""")
|
|
sale_id = cur.fetchone()[0]
|
|
cur.execute("""
|
|
INSERT INTO sale_items (sale_id, inventory_id, part_number, name, quantity, unit_price, unit_cost, discount_pct, discount_amount, tax_rate, tax_amount, subtotal, retail_price)
|
|
VALUES (%s, %s, 'TEST', 'Test Item', 1, 100, 50, 0, 0, 0.16, 16, 100, %s)
|
|
""", (sale_id, inv_id, retail))
|
|
conn.commit()
|
|
cur.close()
|
|
|
|
total_saved = record_sale_savings(conn, sale_id)
|
|
ok("Record sale savings", total_saved > 0, f"saved={total_saved}")
|
|
|
|
# Check sale has savings
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT total_savings FROM sales WHERE id = %s", (sale_id,))
|
|
sale_savings = cur.fetchone()[0]
|
|
cur.close()
|
|
ok("Sale savings recorded", sale_savings is not None and float(sale_savings) > 0, f"sale_savings={sale_savings}")
|
|
else:
|
|
cur.close()
|
|
ok("Savings inventory", False, "No inventory item found")
|
|
|
|
# Global stats
|
|
stats = get_global_savings_stats(conn, 1)
|
|
ok("Global savings stats", isinstance(stats, dict) and 'total_saved' in stats)
|
|
except Exception as e:
|
|
conn.rollback()
|
|
ok("Savings", False, str(e))
|
|
|
|
# ─── LOGISTICS ─────────────────────────────
|
|
print("\n[LOGÍSTICA]")
|
|
try:
|
|
couriers = get_couriers(conn, 1)
|
|
ok("Default couriers", len(couriers) >= 4, f"count={len(couriers)}")
|
|
|
|
# Add courier
|
|
courier_code = f'test_courier_{int(time.time())}'
|
|
cid = add_courier(conn, 1, 'Test Courier', courier_code,
|
|
tracking_url_template='https://track.example.com/{tracking_number}')
|
|
ok("Add courier", cid is not None)
|
|
|
|
# Create shipment
|
|
result = create_shipment(conn, {
|
|
'tenant_id': 1, 'shipment_type': 'outbound', 'related_type': 'sale', 'related_id': 1,
|
|
'courier_id': cid, 'tracking_number': 'TEST123456',
|
|
'destination_address': 'Calle Falsa 123, CDMX',
|
|
'recipient_name': 'Juan Pérez', 'recipient_phone': '5551234567',
|
|
'shipping_cost': 150.00, 'notes': 'Test shipment',
|
|
})
|
|
ok("Create shipment", result.get('shipment_id') is not None and result.get('tracking_url') is not None)
|
|
shipment_id = result['shipment_id']
|
|
|
|
# Get shipment
|
|
ship = get_shipment(conn, shipment_id)
|
|
ok("Get shipment", ship is not None and ship['status'] == 'pending')
|
|
|
|
# Update status
|
|
update_shipment_status(conn, shipment_id, 'in_transit', location='CDMX Hub',
|
|
description='Paquete en tránsito')
|
|
ship2 = get_shipment(conn, shipment_id)
|
|
ok("Update status", ship2['status'] == 'in_transit')
|
|
ok("Tracking history", len(ship2.get('tracking_history', [])) >= 2)
|
|
|
|
# List shipments
|
|
shipments = list_shipments(conn, 1)
|
|
ok("List shipments", shipments.get('data') is not None)
|
|
except Exception as e:
|
|
conn.rollback()
|
|
ok("Logistics", False, str(e))
|
|
|
|
# ─── PUBLIC API ─────────────────────────────
|
|
print("\n[API PÚBLICA]")
|
|
try:
|
|
# Create key
|
|
key_id, full_key = create_api_key(conn, 1, 'Test Key', scopes=['read', 'write'])
|
|
ok("Create API key", key_id is not None and full_key.startswith('nx_'))
|
|
|
|
# Validate key
|
|
info = validate_api_key(conn, full_key)
|
|
ok("Validate key", info is not None and info.get('valid') is True)
|
|
ok("Key scopes", 'read' in info.get('scopes', []))
|
|
|
|
# Invalid key
|
|
bad = validate_api_key(conn, 'nx_invalid_key_here')
|
|
ok("Invalid key rejected", bad is None or bad.get('valid') is False)
|
|
|
|
# Rate limit check
|
|
allowed, headers = check_rate_limit(conn, key_id, 60, 10000)
|
|
ok("Rate limit check", allowed is True)
|
|
ok("Rate limit headers", 'X-RateLimit-Limit-Minute' in headers)
|
|
|
|
# Increment and check again
|
|
increment_rate_limit(conn, key_id)
|
|
allowed2, _ = check_rate_limit(conn, key_id, 60, 10000)
|
|
ok("Rate limit increment", allowed2 is True)
|
|
|
|
# Revoke
|
|
revoke_api_key(conn, key_id)
|
|
revoked = validate_api_key(conn, full_key)
|
|
ok("Revoke key", revoked is not None and revoked.get('valid') is False)
|
|
|
|
# List keys
|
|
keys = list_api_keys(conn, 1)
|
|
ok("List keys", isinstance(keys, list))
|
|
except Exception as e:
|
|
conn.rollback()
|
|
ok("Public API", False, str(e))
|
|
|
|
conn.close()
|
|
|
|
print("\n" + "=" * 60)
|
|
print(f"RESULTS: {PASS} {passed} passed, {FAIL} {failed} failed")
|
|
print("=" * 60)
|
|
|
|
sys.exit(0 if failed == 0 else 1)
|