FASE 4:
- Redis cache de stock con fallback graceful
- Multi-moneda (MXN/USD) con contabilidad en MXN
- Proveedores y ordenes de compra completo
- Meilisearch 1.5M+ partes indexadas
- Metabase KPIs con dashboard auto-generado

FASE 5:
- CRM mejorado: activities, tags, loyalty program, analytics
- Imagenes de partes: upload, resize, thumbnails WebP
- Ordenes de servicio Kanban: received->diagnosis->repair->ready->delivered
- Garantias/RMA, alertas de reorden, multi-sucursal
- Stubs BNPL (APLAZO) y ERP Sync (Aspel/Contpaqi)

FASE 6:
- Notificaciones automaticas: push/WhatsApp/email/in-app
- Reportes de ahorro vs retail_price
- Logistica + tracking: DHL, FedEx, Estafeta, 99min, Uber
- API Publica: API keys, rate limiting, catalog search

Migraciones: v1.9-v3.0
Tests: 93/93 pasando
Backup: nexus_backup_20260427_045859.tar.gz
423 lines
14 KiB
Python
423 lines
14 KiB
Python
"""Notification Engine: event-driven notifications via push, email, WhatsApp, in-app.
|
|
|
|
Integrates with existing push_service.py for Web Push.
|
|
Supports template rendering with Jinja2-style variable substitution.
|
|
"""
|
|
|
|
import re
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
def _render_template(template, context):
    """Render *template* by replacing ``{name}`` placeholders with context values.

    Args:
        template: Template text; ``None`` or an empty string renders as ``''``.
        context: Mapping of placeholder name -> value. ``None`` values render
            as an empty string, everything else goes through ``str()``. A
            falsy context (``None`` / ``{}``) leaves the template untouched.

    Returns:
        The rendered string. Placeholders with no matching context key are
        left in the output verbatim.
    """
    if not template:
        return ''
    if not context:
        return template

    # Map each literal placeholder ("{key}") to its replacement text. The
    # first key wins if two keys stringify to the same placeholder.
    replacements = {}
    for key, value in context.items():
        replacements.setdefault(f'{{{key}}}', '' if value is None else str(value))

    # Substitute in a single pass so text coming from one value is never
    # re-scanned for other placeholders: with chained str.replace calls a
    # customer name such as "{total}" would itself be expanded. Longest
    # placeholders go first so the alternation never stops at a shorter prefix.
    pattern = re.compile('|'.join(
        re.escape(token) for token in sorted(replacements, key=len, reverse=True)
    ))
    return pattern.sub(lambda match: replacements[match.group(0)], template)
|
|
|
|
|
|
def get_templates(conn, tenant_id, event_type=None, channel=None):
    """List a tenant's notification templates, optionally narrowed by filters.

    Args:
        conn: DB-API connection (queries use the ``%s`` parameter style).
        tenant_id: Tenant whose templates are returned.
        event_type: If truthy, only templates for this event type.
        channel: If truthy, only templates for this channel.

    Returns:
        A list of dicts with keys ``id``, ``event_type``, ``channel``,
        ``name``, ``subject_template``, ``body_template`` and ``is_active``,
        ordered by event type and then channel.
    """
    conditions = ["tenant_id = %s"]
    args = [tenant_id]
    # Optional filters only add bound parameters; values are never inlined.
    for column, wanted in (('event_type', event_type), ('channel', channel)):
        if wanted:
            conditions.append(f"{column} = %s")
            args.append(wanted)
    filters = " AND ".join(conditions)

    cursor = conn.cursor()
    cursor.execute(f"""
        SELECT id, event_type, channel, name, subject_template, body_template, is_active
        FROM notification_templates
        WHERE {filters}
        ORDER BY event_type, channel
    """, args)

    # Column order here mirrors the SELECT list above.
    columns = ('id', 'event_type', 'channel', 'name',
               'subject_template', 'body_template', 'is_active')
    templates = [dict(zip(columns, record)) for record in cursor.fetchall()]
    cursor.close()
    return templates
|
|
|
|
|
|
def create_template(conn, tenant_id, event_type, channel, name, body_template,
                    subject_template=None, is_active=True):
    """Create a notification template, or overwrite the existing one.

    The row is upserted on ``(tenant_id, event_type, channel)``: when a
    template already exists for that combination, its name, subject, body
    and active flag are replaced and ``updated_at`` is refreshed. The change
    is committed before returning.

    Args:
        conn: DB-API connection (``%s`` parameter style).
        tenant_id: Owning tenant.
        event_type: Event the template reacts to.
        channel: Delivery channel the template is for.
        name: Display name of the template.
        body_template: Body text with ``{placeholder}`` variables.
        subject_template: Optional subject text with placeholders.
        is_active: Whether the template is active.

    Returns:
        The ``id`` of the inserted or updated template row.
    """
    upsert_sql = """
        INSERT INTO notification_templates
            (tenant_id, event_type, channel, name, subject_template, body_template, is_active)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (tenant_id, event_type, channel) DO UPDATE SET
            name = EXCLUDED.name,
            subject_template = EXCLUDED.subject_template,
            body_template = EXCLUDED.body_template,
            is_active = EXCLUDED.is_active,
            updated_at = NOW()
        RETURNING id
    """
    row_values = (tenant_id, event_type, channel, name,
                  subject_template, body_template, is_active)

    cursor = conn.cursor()
    cursor.execute(upsert_sql, row_values)
    template_id = cursor.fetchone()[0]
    conn.commit()
    cursor.close()
    return template_id
|
|
|
|
|
|
def update_template(conn, template_id, data):
    """Apply a partial update to one notification template.

    Only whitelisted columns present in ``data`` are written; any other key
    is ignored, so column names in the generated SQL never come from
    caller-supplied text.

    Args:
        conn: DB-API connection (``%s`` parameter style).
        template_id: ``notification_templates.id`` of the row to update.
        data: Mapping of column name -> new value.

    Returns:
        ``False`` when ``data`` holds none of the editable columns (nothing
        is executed). ``True`` once the UPDATE has been executed and
        committed, whether or not a row matched ``template_id``.
    """
    editable = ('event_type', 'channel', 'name',
                'subject_template', 'body_template', 'is_active')
    cursor = conn.cursor()

    changes = [(column, data[column]) for column in editable if column in data]
    if not changes:
        cursor.close()
        return False

    assignments = ', '.join(f"{column} = %s" for column, _ in changes)
    values = [value for _, value in changes]
    values.append(template_id)  # bound to the trailing "WHERE id = %s"

    cursor.execute(f"UPDATE notification_templates SET {assignments} WHERE id = %s", values)
    conn.commit()
    cursor.close()
    return True
|
|
|
|
|
|
# ─── Event Dispatch ─────────────────────────────
|
|
|
|
def dispatch_notification(conn, tenant_id, event_type, context, recipient_type='owner',
                          recipient_id=None, channels=None):
    """Dispatch a notification event to all configured channels.

    One ``notification_logs`` row (status ``'pending'``) is written per active
    template of ``event_type``; the rows are committed together and each one
    is then delivered through ``_send_notification``.

    Args:
        conn: DB connection
        tenant_id: tenant ID
        event_type: e.g. 'low_stock', 'order_ready'
        context: dict with template variables
        recipient_type: 'owner', 'employee', 'customer', 'role'
        recipient_id: specific recipient ID
        channels: list of channels to use, or None for all active templates

    Returns:
        list of notification log IDs (empty when no active template matches)

    Raises:
        Exception: whatever is raised while reading templates, rendering or
            writing the log rows; the open transaction is rolled back first
            so no partial set of log rows is left behind.
    """
    query = """
        SELECT id, channel, subject_template, body_template
        FROM notification_templates
        WHERE tenant_id = %s AND event_type = %s AND is_active = true
    """
    params = [tenant_id, event_type]
    if channels:
        wanted = list(channels)  # accept any iterable, not just a list
        placeholders = ','.join(['%s'] * len(wanted))
        query += f"AND channel IN ({placeholders})"
        params.extend(wanted)

    insert_sql = """
        INSERT INTO notification_logs
            (tenant_id, recipient_type, recipient_id, event_type, channel,
             subject, body, status, metadata)
        VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending', %s)
        RETURNING id
    """

    log_ids = []
    cur = conn.cursor()
    try:
        # Get active templates for this event
        cur.execute(query, params)
        templates = cur.fetchall()
        if not templates:
            return []

        # default=str: context values frequently come straight from DB rows
        # (Decimal, date, datetime), which plain json.dumps rejects.
        metadata = json.dumps(context, default=str) if isinstance(context, dict) else None

        for _template_id, channel, subject_tmpl, body_tmpl in templates:
            subject = _render_template(subject_tmpl, context)
            body = _render_template(body_tmpl, context)

            # Insert log as pending
            cur.execute(insert_sql, (tenant_id, recipient_type, recipient_id, event_type,
                                     channel, subject, body, metadata))
            log_ids.append(cur.fetchone()[0])

        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()

    # Delivery is synchronous for now (in production, this would go to a
    # queue). It runs only after the commit above, so every log row exists
    # before its sender looks it up.
    for log_id in log_ids:
        _send_notification(conn, log_id)

    return log_ids
|
|
|
|
|
|
def _send_notification(conn, log_id):
    """Send a single notification by its log entry and record the outcome.

    Loads the ``notification_logs`` row, hands it to the sender for its
    channel, then marks the row ``'sent'`` (with ``sent_at``). If the sender
    raises, or the channel is not one this engine knows, the row is marked
    ``'failed'`` with the error text truncated to 500 characters. Delivery
    errors are recorded, never re-raised. A missing log row is a no-op.

    Args:
        conn: DB-API connection; this function commits / rolls back on it.
        log_id: ``notification_logs.id`` of the entry to send.
    """
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT channel, subject, body, recipient_type, recipient_id, metadata, tenant_id
            FROM notification_logs WHERE id = %s
        """, (log_id,))
        row = cur.fetchone()
        if not row:
            return

        channel, subject, body, recipient_type, recipient_id, metadata, tenant_id = row

        try:
            if channel == 'push':
                _send_push(conn, tenant_id, recipient_type, recipient_id, subject, body, metadata)
            elif channel == 'whatsapp':
                _send_whatsapp(conn, tenant_id, recipient_type, recipient_id, body, metadata)
            elif channel == 'email':
                _send_email(conn, tenant_id, recipient_type, recipient_id, subject, body, metadata)
            elif channel == 'in_app':
                # In-app is just the log entry; UI polls notification_logs
                pass
            else:
                # Nothing was delivered, so the row must not end up as 'sent'.
                raise ValueError(f"Unsupported notification channel: {channel!r}")

            cur.execute("""
                UPDATE notification_logs SET status = 'sent', sent_at = NOW() WHERE id = %s
            """, (log_id,))
            conn.commit()
        except Exception as exc:
            # Clear any aborted transaction before writing the failure marker.
            conn.rollback()
            fail_cur = conn.cursor()
            try:
                fail_cur.execute("""
                    UPDATE notification_logs SET status = 'failed', error_message = %s WHERE id = %s
                """, (str(exc)[:500], log_id))
                conn.commit()
            except Exception:
                # Recording the failure is best effort; leave the row pending.
                conn.rollback()
            finally:
                fail_cur.close()
    finally:
        cur.close()
|
|
|
|
|
|
def _send_push(conn, tenant_id, recipient_type, recipient_id, title, body, metadata):
    """Send a Web Push notification to every active subscription of the recipient.

    Supported recipients:
      * ``'owner'``: subscriptions of the tenant's employees with role 'owner'.
      * ``'employee'`` with a ``recipient_id``: that employee's subscriptions.
    Any other recipient is skipped. If ``services.push_service`` cannot be
    imported, the call is a silent no-op.

    Delivery is best effort per subscription: a failing subscription is
    logged and the remaining ones are still attempted.

    Args:
        conn: DB-API connection used to look up subscriptions.
        tenant_id: Tenant used for the owner lookup.
        recipient_type: ``'owner'`` or ``'employee'``.
        recipient_id: Employee ID when ``recipient_type == 'employee'``.
        title: Notification title.
        body: Notification body.
        metadata: Passed through to ``send_push`` unchanged.

    Raises:
        Exception: whatever the DB driver raises while looking up
            subscriptions.
    """
    import logging

    try:
        # Only send_push is needed. Importing other names here would turn a
        # missing, unused helper into "push silently disabled".
        from services.push_service import send_push
    except ImportError:
        # Push support is optional; without the service there is no transport.
        return

    if recipient_type == 'owner':
        query = """
            SELECT s.subscription_json
            FROM push_subscriptions s
            JOIN employees e ON s.employee_id = e.id
            WHERE e.tenant_id = %s AND e.role = 'owner' AND s.is_active = true
        """
        params = (tenant_id,)
    elif recipient_type == 'employee' and recipient_id:
        # NOTE(review): this lookup is not scoped by tenant_id - confirm that
        # employee IDs are only ever passed in for the caller's own tenant.
        query = """
            SELECT subscription_json FROM push_subscriptions
            WHERE employee_id = %s AND is_active = true
        """
        params = (recipient_id,)
    else:
        return

    cur = conn.cursor()
    try:
        cur.execute(query, params)
        subs = [record[0] for record in cur.fetchall()]
    finally:
        cur.close()

    # Send to all subscriptions
    log = logging.getLogger(__name__)
    for sub_json in subs:
        try:
            send_push(sub_json, title, body, metadata)
        except Exception:
            # One failing subscription must not block the others.
            log.exception("Web Push delivery failed (tenant_id=%s, recipient_type=%s)",
                          tenant_id, recipient_type)
|
|
|
|
|
|
def _send_whatsapp(conn, tenant_id, recipient_type, recipient_id, body, metadata):
    """Send WhatsApp notification via existing service.

    Only ``recipient_type == 'customer'`` with a ``recipient_id`` is
    supported; the phone number is read from ``customers.phone``. Any other
    recipient, an unknown customer, or a customer without a phone is skipped.
    If ``services.whatsapp_service`` cannot be imported, the call is a silent
    no-op.

    Args:
        conn: DB-API connection used to look up the customer's phone.
        tenant_id: Not used by this sender.
        recipient_type: Recipient kind; only ``'customer'`` triggers a send.
        recipient_id: ``customers.id`` of the recipient.
        body: Message text.
        metadata: Not used by this sender.

    Raises:
        Exception: errors from the phone lookup or from ``send_message``
            propagate, so the caller can record the notification as failed
            instead of reporting an undelivered message as sent.
    """
    try:
        from services.whatsapp_service import send_message
    except ImportError:
        # WhatsApp support is optional; without the service nothing is sent.
        return

    if recipient_type != 'customer' or not recipient_id:
        return

    cur = conn.cursor()
    try:
        cur.execute("SELECT phone FROM customers WHERE id = %s", (recipient_id,))
        row = cur.fetchone()
    finally:
        cur.close()

    phone = row[0] if row else None
    if phone:
        send_message(phone, body)
|
|
|
|
|
|
def _send_email(conn, tenant_id, recipient_type, recipient_id, subject, body, metadata):
    """Email delivery placeholder: accepts the arguments and sends nothing.

    No SMTP or provider integration (e.g. SendGrid/Postmark) is wired in
    yet, so every argument is currently ignored.
    """
    # Stub: nothing to deliver until an email backend is configured.
    return None
|
|
|
|
|
|
# ─── Convenience Event Dispatchers ─────────────────────────────
|
|
|
|
def notify_low_stock(conn, tenant_id, inventory_id, stock, reorder_point, branch_id=None):
    """Raise a ``low_stock`` notification for one inventory item.

    Looks up the item's part number and name, then dispatches the event to
    the tenant owner with the template variables ``part_number``,
    ``part_name``, ``stock``, ``reorder_point`` and ``inventory_id``.

    Args:
        conn: DB-API connection.
        tenant_id: Tenant the notification belongs to.
        inventory_id: ``inventory.id`` of the item.
        stock: Current stock level to report.
        reorder_point: Reorder threshold to report.
        branch_id: Accepted but not used by this function.

    Returns:
        The log IDs returned by ``dispatch_notification``, or ``[]`` when the
        inventory item does not exist.
    """
    cursor = conn.cursor()
    cursor.execute("SELECT part_number, name FROM inventory WHERE id = %s", (inventory_id,))
    item = cursor.fetchone()
    cursor.close()

    if not item:
        return []

    part_number, part_name = item
    variables = {
        'part_number': part_number,
        'part_name': part_name,
        'stock': stock,
        'reorder_point': reorder_point,
        'inventory_id': inventory_id,
    }
    return dispatch_notification(conn, tenant_id, 'low_stock', variables,
                                 recipient_type='owner')
|
|
|
|
|
|
def notify_order_ready(conn, tenant_id, service_order_id, customer_id):
    """Announce that a service order is ready.

    Two dispatches of the ``order_ready`` event are made with the variables
    ``order_number`` and ``customer_name`` (``'Cliente'`` when the order has
    no customer name):

    1. To the owner, through every active ``order_ready`` template.
    2. To the customer over WhatsApp only, and only when the order's
       customer has a phone number on file.

    Args:
        conn: DB-API connection.
        tenant_id: Tenant the notification belongs to.
        service_order_id: ``service_orders.id`` of the finished order.
        customer_id: Recipient ID used for the customer dispatch.

    Returns:
        Log IDs of the owner dispatch followed by those of the customer
        dispatch (if any), or ``[]`` when the service order does not exist.
    """
    cursor = conn.cursor()
    cursor.execute("""
        SELECT order_number, c.name, c.phone
        FROM service_orders so
        LEFT JOIN customers c ON so.customer_id = c.id
        WHERE so.id = %s
    """, (service_order_id,))
    order = cursor.fetchone()
    cursor.close()

    if not order:
        return []

    order_number, customer_name, phone = order
    variables = {'order_number': order_number, 'customer_name': customer_name or 'Cliente'}

    # Owner-facing notification (no channel restriction).
    log_ids = dispatch_notification(conn, tenant_id, 'order_ready', variables,
                                    recipient_type='owner')
    if not phone:
        return log_ids

    # Customer-facing WhatsApp message; each dispatch gets its own context dict.
    customer_log_ids = dispatch_notification(
        conn, tenant_id, 'order_ready', dict(variables),
        recipient_type='customer', recipient_id=customer_id,
        channels=['whatsapp'],
    )
    return log_ids + customer_log_ids
|
|
|
|
|
|
def notify_maintenance_due(conn, tenant_id, vehicle_id, schedule_id):
    """Raise a ``maintenance_due`` notification for a fleet vehicle.

    Reads the vehicle and its maintenance schedule, then dispatches the
    event to the tenant owner with the variables ``vehicle_plate``,
    ``current_mileage`` (0 when empty), ``maintenance_type``,
    ``next_due_km`` (``'N/A'`` when empty) and ``next_due_date`` (first ten
    characters of the due timestamp's string form, or ``'N/A'``).

    Args:
        conn: DB-API connection.
        tenant_id: Tenant the notification belongs to.
        vehicle_id: ``fleet_vehicles.id``.
        schedule_id: ``fleet_maintenance_schedules.id`` for that vehicle.

    Returns:
        The log IDs returned by ``dispatch_notification``, or ``[]`` when no
        row matches the vehicle/schedule pair.
    """
    lookup_sql = """
        SELECT v.plate, v.current_mileage, s.maintenance_type, s.next_due_km, s.next_due_at
        FROM fleet_vehicles v
        JOIN fleet_maintenance_schedules s ON s.vehicle_id = v.id
        WHERE v.id = %s AND s.id = %s
    """
    cursor = conn.cursor()
    cursor.execute(lookup_sql, (vehicle_id, schedule_id))
    schedule = cursor.fetchone()
    cursor.close()

    if not schedule:
        return []

    plate, mileage, mtype, next_km, next_at = schedule
    if next_at:
        due_date = str(next_at)[:10]  # keep only the YYYY-MM-DD part
    else:
        due_date = 'N/A'

    variables = {
        'vehicle_plate': plate,
        'current_mileage': mileage or 0,
        'maintenance_type': mtype,
        'next_due_km': next_km or 'N/A',
        'next_due_date': due_date,
    }
    return dispatch_notification(conn, tenant_id, 'maintenance_due', variables,
                                 recipient_type='owner')
|
|
|
|
|
|
def notify_new_sale(conn, tenant_id, sale_id, total, payment_method, employee_id=None):
    """Raise a ``new_sale`` notification addressed to the tenant owner.

    Template variables: ``sale_id``, ``total`` (formatted like ``$1,234.50``)
    and ``payment_method`` (``'N/A'`` when empty).

    Args:
        conn: DB-API connection.
        tenant_id: Tenant the notification belongs to.
        sale_id: Identifier of the sale.
        total: Sale total; must be convertible with ``float()``.
        payment_method: Payment method label, may be empty.
        employee_id: Accepted but not used by this function.

    Returns:
        The log IDs returned by ``dispatch_notification``.
    """
    amount = float(total)
    variables = {
        'sale_id': sale_id,
        'total': f"${amount:,.2f}",
        'payment_method': payment_method or 'N/A',
    }
    return dispatch_notification(conn, tenant_id, 'new_sale', variables,
                                 recipient_type='owner')
|
|
|
|
|
|
def notify_po_received(conn, tenant_id, po_id, total):
    """Raise a ``po_received`` notification addressed to the tenant owner.

    Template variables: ``po_id`` and ``total`` (formatted like ``$1,234.50``).

    Args:
        conn: DB-API connection.
        tenant_id: Tenant the notification belongs to.
        po_id: Identifier of the purchase order.
        total: Order total; must be convertible with ``float()``.

    Returns:
        The log IDs returned by ``dispatch_notification``.
    """
    amount = float(total)
    variables = {'po_id': po_id, 'total': f"${amount:,.2f}"}
    return dispatch_notification(conn, tenant_id, 'po_received', variables,
                                 recipient_type='owner')
|
|
|
|
|
|
def get_notification_logs(conn, tenant_id, recipient_type=None, recipient_id=None,
                          status=None, event_type=None, limit=50):
    """Return a tenant's most recent notification log entries.

    Args:
        conn: DB-API connection (``%s`` parameter style).
        tenant_id: Tenant whose logs are returned.
        recipient_type: If truthy, only logs for this recipient type.
        recipient_id: If truthy, only logs for this recipient ID.
        status: If truthy, only logs with this status.
        event_type: If truthy, only logs for this event type.
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts, newest first, with keys ``id``, ``event_type``,
        ``channel``, ``subject``, ``body``, ``status``, ``sent_at``,
        ``read_at`` and ``created_at``. Timestamps are converted with
        ``str()``; empty ``sent_at`` / ``read_at`` values become ``None``.
    """
    optional_filters = (
        ('recipient_type', recipient_type),
        ('recipient_id', recipient_id),
        ('status', status),
        ('event_type', event_type),
    )
    where = ["tenant_id = %s"]
    params = [tenant_id]
    for column, wanted in optional_filters:
        if wanted:
            where.append(f"{column} = %s")
            params.append(wanted)
    where_clause = ' AND '.join(where)

    cursor = conn.cursor()
    cursor.execute(f"""
        SELECT id, event_type, channel, subject, body, status,
               sent_at, read_at, created_at
        FROM notification_logs
        WHERE {where_clause}
        ORDER BY created_at DESC
        LIMIT %s
    """, params + [limit])

    logs = [
        {
            'id': record[0],
            'event_type': record[1],
            'channel': record[2],
            'subject': record[3],
            'body': record[4],
            'status': record[5],
            'sent_at': str(record[6]) if record[6] else None,
            'read_at': str(record[7]) if record[7] else None,
            'created_at': str(record[8]),
        }
        for record in cursor.fetchall()
    ]
    cursor.close()
    return logs
|
|
|
|
|
|
def mark_as_read(conn, log_id):
    """Flag a notification log entry as read.

    Sets ``status = 'read'`` and ``read_at = NOW()`` on the row and commits.

    Args:
        conn: DB-API connection (``%s`` parameter style).
        log_id: ``notification_logs.id`` of the entry.

    Returns:
        Always ``True``, whether or not a row matched ``log_id``.
    """
    mark_sql = """
        UPDATE notification_logs SET status = 'read', read_at = NOW() WHERE id = %s
    """
    cursor = conn.cursor()
    cursor.execute(mark_sql, (log_id,))
    conn.commit()
    cursor.close()
    return True
|