Files
Autoparts-DB/pos/services/notification_engine.py
Nexus Dev 9ff3dc4c8b FASE 4-5-6: Infraestructura, CRM, Service Orders, Notificaciones, Ahorro, Logistica, API Publica
FASE 4:
- Redis cache de stock con fallback graceful
- Multi-moneda (MXN/USD) con contabilidad en MXN
- Proveedores y ordenes de compra completo
- Meilisearch 1.5M+ partes indexadas
- Metabase KPIs con dashboard auto-generado

FASE 5:
- CRM mejorado: activities, tags, loyalty program, analytics
- Imagenes de partes: upload, resize, thumbnails WebP
- Ordenes de servicio Kanban: received->diagnosis->repair->ready->delivered
- Garantias/RMA, alertas de reorden, multi-sucursal
- Stubs BNPL (APLAZO) y ERP Sync (Aspel/Contpaqi)

FASE 6:
- Notificaciones automaticas: push/WhatsApp/email/in-app
- Reportes de ahorro vs retail_price
- Logistica + tracking: DHL, FedEx, Estafeta, 99min, Uber
- API Publica: API keys, rate limiting, catalog search

Migraciones: v1.9-v3.0
Tests: 93/93 pasando
Backup: nexus_backup_20260427_045859.tar.gz
2026-04-27 05:23:30 +00:00

423 lines
14 KiB
Python

"""Notification Engine: event-driven notifications via push, email, WhatsApp, in-app.
Integrates with existing push_service.py for Web Push.
Supports template rendering with simple {var_name} placeholder substitution.
"""
import json
import logging
import re
from datetime import datetime, timedelta
def _render_template(template, context):
    """Render *template* by replacing ``{var_name}`` placeholders with context values.

    All placeholders are substituted in a single pass, so text that comes from
    a context value is never scanned again: a value such as ``"{total}"`` is
    emitted literally instead of being expanded by another key.

    Args:
        template: Template text; ``None`` or an empty string renders as ``''``.
        context: Mapping of placeholder name to value. ``None`` values render
            as empty strings; every other value is converted with ``str()``.
            An empty or missing context leaves the template untouched.

    Returns:
        The rendered string. Placeholders without a matching key are kept as-is.
    """
    if not template:
        return ''
    if not context:
        return template
    replacements = {
        f'{{{key}}}': '' if value is None else str(value)
        for key, value in context.items()
    }
    # Longest tokens first so a token that is a prefix of another one cannot
    # shadow it inside the alternation.
    tokens = sorted(replacements, key=len, reverse=True)
    pattern = re.compile('|'.join(re.escape(token) for token in tokens))
    return pattern.sub(lambda match: replacements[match.group(0)], template)
def get_templates(conn, tenant_id, event_type=None, channel=None):
    """List a tenant's notification templates.

    Args:
        conn: DB connection.
        tenant_id: Tenant whose templates are listed.
        event_type: Optional event type to filter on (ignored when falsy).
        channel: Optional channel to filter on (ignored when falsy).

    Returns:
        A list of dicts with the keys ``id``, ``event_type``, ``channel``,
        ``name``, ``subject_template``, ``body_template`` and ``is_active``,
        ordered by event type and then channel.
    """
    columns = ('id', 'event_type', 'channel', 'name',
               'subject_template', 'body_template', 'is_active')

    # The WHERE clause is assembled from fixed fragments only; every value
    # travels as a bound parameter.
    args = [tenant_id]
    where_sql = "tenant_id = %s"
    for column, wanted in (("event_type", event_type), ("channel", channel)):
        if wanted:
            where_sql += f" AND {column} = %s"
            args.append(wanted)

    cursor = conn.cursor()
    cursor.execute(f"""
        SELECT id, event_type, channel, name, subject_template, body_template, is_active
        FROM notification_templates
        WHERE {where_sql}
        ORDER BY event_type, channel
    """, args)
    found = [
        {key: record[position] for position, key in enumerate(columns)}
        for record in cursor.fetchall()
    ]
    cursor.close()
    return found
def create_template(conn, tenant_id, event_type, channel, name, body_template,
                    subject_template=None, is_active=True):
    """Create a notification template, or overwrite the existing one.

    The insert is an upsert keyed on ``(tenant_id, event_type, channel)``: on
    conflict the name, subject, body and active flag are replaced and
    ``updated_at`` is set to ``NOW()``.

    Args:
        conn: DB connection; the change is committed before returning.
        tenant_id: Owning tenant.
        event_type: Event the template reacts to.
        channel: Delivery channel of the template.
        name: Display name.
        body_template: Body text with ``{var_name}`` placeholders.
        subject_template: Optional subject text.
        is_active: Whether the template is used for dispatching.

    Returns:
        The id of the inserted or updated template row.

    Raises:
        Exception: Any database error is re-raised after rolling back.
    """
    cur = conn.cursor()
    try:
        cur.execute("""
            INSERT INTO notification_templates
            (tenant_id, event_type, channel, name, subject_template, body_template, is_active)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (tenant_id, event_type, channel) DO UPDATE SET
            name = EXCLUDED.name,
            subject_template = EXCLUDED.subject_template,
            body_template = EXCLUDED.body_template,
            is_active = EXCLUDED.is_active,
            updated_at = NOW()
            RETURNING id
        """, (tenant_id, event_type, channel, name, subject_template, body_template, is_active))
        template_id = cur.fetchone()[0]
        conn.commit()
        return template_id
    except Exception:
        # Leave the connection usable for the caller instead of keeping a
        # failed transaction open.
        conn.rollback()
        raise
    finally:
        cur.close()
def update_template(conn, template_id, data):
    """Update selected fields of a notification template.

    Only the whitelisted fields ``event_type``, ``channel``, ``name``,
    ``subject_template``, ``body_template`` and ``is_active`` are taken from
    *data*; any other key is ignored. ``updated_at`` is refreshed together
    with the changed fields, matching what the upsert in ``create_template``
    does on conflict.

    Args:
        conn: DB connection; the change is committed before returning.
        template_id: Id of the template row to update.
        data: Mapping of field name to new value (may be empty or ``None``).

    Returns:
        ``False`` when *data* contains no updatable field (nothing is
        executed), ``True`` once the UPDATE statement has been committed.

    Raises:
        Exception: Any database error is re-raised after rolling back.
    """
    # NOTE(review): the row is addressed by id only, without a tenant filter -
    # confirm that callers verify ownership before calling.
    allowed = ('event_type', 'channel', 'name', 'subject_template', 'body_template', 'is_active')
    changes = [(field, data[field]) for field in allowed if field in data] if data else []
    if not changes:
        return False

    # Column names come from the whitelist above; values are bound parameters.
    assignments = [f"{field} = %s" for field, _ in changes]
    assignments.append("updated_at = NOW()")
    values = [value for _, value in changes]
    values.append(template_id)

    cur = conn.cursor()
    try:
        cur.execute(
            f"UPDATE notification_templates SET {', '.join(assignments)} WHERE id = %s",
            values,
        )
        conn.commit()
        return True
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
# ─── Event Dispatch ─────────────────────────────
def dispatch_notification(conn, tenant_id, event_type, context, recipient_type='owner',
                          recipient_id=None, channels=None):
    """Dispatch a notification event to all configured channels.

    For every active template matching the tenant and event type, the subject
    and body are rendered, a ``pending`` row is written to
    ``notification_logs`` and, once all rows are committed, each one is
    handed to ``_send_notification``.

    Args:
        conn: DB connection.
        tenant_id: Tenant ID.
        event_type: e.g. 'low_stock', 'order_ready'.
        context: Dict with template variables. A non-dict context renders the
            templates without substitutions and stores no metadata.
        recipient_type: 'owner', 'employee', 'customer', 'role'.
        recipient_id: Specific recipient ID.
        channels: Iterable of channels to use (a single channel name is also
            accepted), or None/empty for all active templates.

    Returns:
        List of notification log IDs (empty when no template matched).

    Raises:
        Exception: A database error while reading templates or writing the
            log rows is re-raised after rolling back.
    """
    # A bare string would otherwise be treated as a sequence of characters.
    if isinstance(channels, str):
        channels = [channels]

    query = """
        SELECT id, channel, subject_template, body_template
        FROM notification_templates
        WHERE tenant_id = %s AND event_type = %s AND is_active = true
    """
    params = [tenant_id, event_type]
    if channels:
        channel_list = list(channels)
        placeholders = ','.join(['%s'] * len(channel_list))
        query += f" AND channel IN ({placeholders})"
        params.extend(channel_list)

    if isinstance(context, dict):
        render_context = context
        # default=str keeps values json cannot encode natively (for example
        # Decimal or datetime objects taken from DB rows) from aborting the
        # whole dispatch; they are stored as their string form.
        metadata = json.dumps(context, default=str)
    else:
        render_context = {}
        metadata = None

    log_ids = []
    cur = conn.cursor()
    try:
        cur.execute(query, params)
        templates = cur.fetchall()
        if not templates:
            return []
        for _template_id, channel, subject_tmpl, body_tmpl in templates:
            subject = _render_template(subject_tmpl, render_context)
            body = _render_template(body_tmpl, render_context)
            # Insert log as pending
            cur.execute("""
                INSERT INTO notification_logs
                (tenant_id, recipient_type, recipient_id, event_type, channel,
                subject, body, status, metadata)
                VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending', %s)
                RETURNING id
            """, (tenant_id, recipient_type, recipient_id, event_type, channel,
                  subject, body, metadata))
            log_ids.append(cur.fetchone()[0])
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()

    # Sent inline, one by one (in production, this would go to a queue).
    for log_id in log_ids:
        _send_notification(conn, log_id)
    return log_ids
def _send_notification(conn, log_id):
    """Send a single notification by its log entry and record the outcome.

    The log row is loaded by id and routed to the sender for its channel.
    On success the row is marked ``sent`` with ``sent_at = NOW()``. If the
    sender (or the status update) raises, the transaction is rolled back and
    the row is marked ``failed`` with the first 500 characters of the error.
    A channel that has no sender is recorded as ``failed`` instead of being
    reported as sent.

    Args:
        conn: DB connection.
        log_id: Id of the ``notification_logs`` row; unknown ids are ignored.
    """
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT channel, subject, body, recipient_type, recipient_id, metadata, tenant_id
            FROM notification_logs WHERE id = %s
        """, (log_id,))
        row = cur.fetchone()
        if not row:
            return
        channel, subject, body, recipient_type, recipient_id, metadata, tenant_id = row
        try:
            if channel == 'push':
                _send_push(conn, tenant_id, recipient_type, recipient_id, subject, body, metadata)
            elif channel == 'whatsapp':
                _send_whatsapp(conn, tenant_id, recipient_type, recipient_id, body, metadata)
            elif channel == 'email':
                _send_email(conn, tenant_id, recipient_type, recipient_id, subject, body, metadata)
            elif channel != 'in_app':
                raise ValueError(f"Unsupported notification channel: {channel!r}")
            # In-app needs no transport: it is just the log entry, which the
            # UI polls from notification_logs.
            cur.execute("""
                UPDATE notification_logs SET status = 'sent', sent_at = NOW() WHERE id = %s
            """, (log_id,))
            conn.commit()
        except Exception as exc:
            conn.rollback()
            fail_cur = conn.cursor()
            try:
                fail_cur.execute("""
                    UPDATE notification_logs SET status = 'failed', error_message = %s WHERE id = %s
                """, (str(exc)[:500], log_id))
                conn.commit()
            except Exception:
                # Recording the failure is best-effort; never raise from here.
                conn.rollback()
            finally:
                fail_cur.close()
    finally:
        cur.close()
def _send_push(conn, tenant_id, recipient_type, recipient_id, title, body, metadata):
    """Send a Web Push notification to the recipient's active subscriptions.

    Subscriptions are looked up for all employees with role ``owner`` of the
    tenant (``recipient_type == 'owner'``) or for one employee
    (``recipient_type == 'employee'`` with a ``recipient_id``). Any other
    recipient is ignored. Delivery is best-effort per subscription: a failing
    subscription is logged and does not stop the remaining ones.

    Args:
        conn: DB connection used to read ``push_subscriptions``.
        tenant_id: Tenant ID (used for the owner lookup).
        recipient_type: 'owner' or 'employee'.
        recipient_id: Employee id when ``recipient_type`` is 'employee'.
        title: Notification title.
        body: Notification body.
        metadata: Extra payload forwarded to ``send_push`` unchanged.
    """
    try:
        # Imported once, and only the name that is actually used, so an
        # unrelated missing symbol cannot silently disable push delivery.
        from services.push_service import send_push
    except ImportError:
        # Push service not available in this deployment: nothing to send.
        return

    if recipient_type == 'owner':
        query = """
            SELECT s.subscription_json
            FROM push_subscriptions s
            JOIN employees e ON s.employee_id = e.id
            WHERE e.tenant_id = %s AND e.role = 'owner' AND s.is_active = true
        """
        params = (tenant_id,)
    elif recipient_type == 'employee' and recipient_id:
        query = """
            SELECT subscription_json FROM push_subscriptions
            WHERE employee_id = %s AND is_active = true
        """
        params = (recipient_id,)
    else:
        return

    cur = conn.cursor()
    try:
        cur.execute(query, params)
        subscriptions = [record[0] for record in cur.fetchall()]
    finally:
        cur.close()

    for sub_json in subscriptions:
        try:
            send_push(sub_json, title, body, metadata)
        except Exception:
            logging.getLogger(__name__).warning(
                "Web Push delivery failed for tenant %s", tenant_id, exc_info=True,
            )
def _send_whatsapp(conn, tenant_id, recipient_type, recipient_id, body, metadata):
    """Send a WhatsApp notification via the existing WhatsApp service.

    Only customers can be reached: the phone number is read from
    ``customers`` by ``recipient_id`` and the message is sent when a phone is
    on file. A missing WhatsApp service module is tolerated silently, but a
    failing lookup or send now propagates so that ``_send_notification`` can
    mark the log entry as failed instead of reporting it as sent.

    Args:
        conn: DB connection used to read the customer's phone.
        tenant_id: Tenant ID (not used by the lookup).
        recipient_type: Only 'customer' triggers a send.
        recipient_id: Customer id.
        body: Message text.
        metadata: Unused.

    Raises:
        Exception: Errors from the phone lookup or from ``send_message``.
    """
    try:
        from services.whatsapp_service import send_message
    except ImportError:
        # WhatsApp service not available in this deployment: nothing to send.
        return

    if recipient_type != 'customer' or not recipient_id:
        return

    cur = conn.cursor()
    try:
        cur.execute("SELECT phone FROM customers WHERE id = %s", (recipient_id,))
        row = cur.fetchone()
    finally:
        cur.close()

    phone = row[0] if row else None
    if phone:
        send_message(phone, body)
def _send_email(conn, tenant_id, recipient_type, recipient_id, subject, body, metadata):
    """Email delivery placeholder: accepts the message and does nothing.

    No SMTP or provider (SendGrid/Postmark) integration exists yet, so every
    argument is ignored and the call returns immediately.
    """
    # Stub: a real implementation would hand the message to SMTP or an email API.
    return None
# ─── Convenience Event Dispatchers ─────────────────────────────
def notify_low_stock(conn, tenant_id, inventory_id, stock, reorder_point, branch_id=None):
    """Dispatch a ``low_stock`` event to the tenant's owners.

    The part number and name are read from ``inventory`` by ``inventory_id``
    and passed as template variables together with the stock level, the
    reorder point and the inventory id. ``branch_id`` is accepted but not
    used.

    Returns:
        The notification log ids from ``dispatch_notification``, or ``[]``
        when the inventory row does not exist.
    """
    lookup = conn.cursor()
    lookup.execute("SELECT part_number, name FROM inventory WHERE id = %s", (inventory_id,))
    part = lookup.fetchone()
    lookup.close()
    if not part:
        return []

    payload = {
        'part_number': part[0],
        'part_name': part[1],
        'stock': stock,
        'reorder_point': reorder_point,
        'inventory_id': inventory_id,
    }
    return dispatch_notification(conn, tenant_id, 'low_stock', payload, recipient_type='owner')
def notify_order_ready(conn, tenant_id, service_order_id, customer_id):
    """Announce that a service order is ready.

    The order number and the customer's name and phone are loaded for
    ``service_order_id``. An ``order_ready`` event is dispatched to owners
    with no channel restriction; when the customer has a phone on file, a
    second dispatch restricted to the ``whatsapp`` channel goes to the
    customer identified by ``customer_id``.

    Returns:
        The combined notification log ids of both dispatches, or ``[]`` when
        the service order does not exist.
    """
    lookup = conn.cursor()
    lookup.execute("""
        SELECT order_number, c.name, c.phone
        FROM service_orders so
        LEFT JOIN customers c ON so.customer_id = c.id
        WHERE so.id = %s
    """, (service_order_id,))
    order = lookup.fetchone()
    lookup.close()
    if not order:
        return []

    order_number, customer_name, phone = order
    display_name = customer_name or 'Cliente'

    def _payload():
        # A fresh dict per dispatch, as each call receives its own context.
        return {'order_number': order_number, 'customer_name': display_name}

    # Owners first.
    owner_ids = dispatch_notification(
        conn, tenant_id, 'order_ready', _payload(), recipient_type='owner',
    )
    if not phone:
        return owner_ids

    # Customer is only contacted over WhatsApp, and only with a phone on file.
    customer_ids = dispatch_notification(
        conn, tenant_id, 'order_ready', _payload(),
        recipient_type='customer', recipient_id=customer_id,
        channels=['whatsapp'],
    )
    return owner_ids + customer_ids
def notify_maintenance_due(conn, tenant_id, vehicle_id, schedule_id):
    """Dispatch a ``maintenance_due`` event to the tenant's owners.

    The vehicle plate and mileage plus the schedule's maintenance type and
    next due km/date are read by joining ``fleet_vehicles`` with
    ``fleet_maintenance_schedules``. A falsy mileage is reported as ``0``,
    a falsy due km as ``'N/A'``, and the due date is the first 10 characters
    of its string form, or ``'N/A'`` when absent.

    Returns:
        The notification log ids from ``dispatch_notification``, or ``[]``
        when no matching vehicle/schedule pair exists.
    """
    lookup = conn.cursor()
    lookup.execute("""
        SELECT v.plate, v.current_mileage, s.maintenance_type, s.next_due_km, s.next_due_at
        FROM fleet_vehicles v
        JOIN fleet_maintenance_schedules s ON s.vehicle_id = v.id
        WHERE v.id = %s AND s.id = %s
    """, (vehicle_id, schedule_id))
    record = lookup.fetchone()
    lookup.close()
    if not record:
        return []

    plate, mileage, maintenance_type, due_km, due_at = record
    if due_at:
        due_date = str(due_at)[:10]
    else:
        due_date = 'N/A'

    payload = {
        'vehicle_plate': plate,
        'current_mileage': mileage or 0,
        'maintenance_type': maintenance_type,
        'next_due_km': due_km or 'N/A',
        'next_due_date': due_date,
    }
    return dispatch_notification(conn, tenant_id, 'maintenance_due', payload, recipient_type='owner')
def notify_new_sale(conn, tenant_id, sale_id, total, payment_method, employee_id=None):
    """Dispatch a ``new_sale`` event to the tenant's owners.

    The total is formatted as ``$`` followed by the amount with thousands
    separators and two decimals; a falsy payment method is reported as
    ``'N/A'``. ``employee_id`` is accepted but not used.

    Returns:
        The notification log ids from ``dispatch_notification``.
    """
    formatted_total = "${:,.2f}".format(float(total))
    method_label = payment_method if payment_method else 'N/A'
    payload = {
        'sale_id': sale_id,
        'total': formatted_total,
        'payment_method': method_label,
    }
    return dispatch_notification(conn, tenant_id, 'new_sale', payload, recipient_type='owner')
def notify_po_received(conn, tenant_id, po_id, total):
    """Dispatch a ``po_received`` event to the tenant's owners.

    The template variables are the purchase order id and its total formatted
    as ``$`` followed by the amount with thousands separators and two
    decimals.

    Returns:
        The notification log ids from ``dispatch_notification``.
    """
    formatted_total = "${:,.2f}".format(float(total))
    payload = {'po_id': po_id, 'total': formatted_total}
    return dispatch_notification(conn, tenant_id, 'po_received', payload, recipient_type='owner')
def get_notification_logs(conn, tenant_id, recipient_type=None, recipient_id=None,
                          status=None, event_type=None, limit=50):
    """Return a tenant's most recent notification log entries.

    Args:
        conn: DB connection.
        tenant_id: Tenant whose logs are listed.
        recipient_type: Optional filter (ignored when falsy).
        recipient_id: Optional filter (ignored when falsy).
        status: Optional filter (ignored when falsy).
        event_type: Optional filter (ignored when falsy).
        limit: Maximum number of rows, newest first.

    Returns:
        A list of dicts with the keys ``id``, ``event_type``, ``channel``,
        ``subject``, ``body``, ``status``, ``sent_at``, ``read_at`` and
        ``created_at``. Timestamps are strings; ``sent_at`` and ``read_at``
        are ``None`` when not set.
    """
    optional_filters = (
        ("recipient_type", recipient_type),
        ("recipient_id", recipient_id),
        ("status", status),
        ("event_type", event_type),
    )
    # Conditions are fixed fragments; every value is a bound parameter.
    conditions = ["tenant_id = %s"]
    arguments = [tenant_id]
    for column, wanted in optional_filters:
        if wanted:
            conditions.append(f"{column} = %s")
            arguments.append(wanted)
    arguments.append(limit)

    cursor = conn.cursor()
    cursor.execute(f"""
        SELECT id, event_type, channel, subject, body, status,
        sent_at, read_at, created_at
        FROM notification_logs
        WHERE {' AND '.join(conditions)}
        ORDER BY created_at DESC
        LIMIT %s
    """, arguments)

    entries = []
    for record in cursor.fetchall():
        sent_at, read_at = record[6], record[7]
        entries.append({
            'id': record[0],
            'event_type': record[1],
            'channel': record[2],
            'subject': record[3],
            'body': record[4],
            'status': record[5],
            'sent_at': None if not sent_at else str(sent_at),
            'read_at': None if not read_at else str(read_at),
            'created_at': str(record[8]),
        })
    cursor.close()
    return entries
def mark_as_read(conn, log_id):
    """Mark a notification log entry as read.

    Sets ``status = 'read'`` and ``read_at = NOW()`` on the row with the
    given id and commits.

    Args:
        conn: DB connection.
        log_id: Id of the ``notification_logs`` row.

    Returns:
        ``True`` once the UPDATE has been committed (also when no row matched
        the id).

    Raises:
        Exception: Any database error is re-raised after rolling back.
    """
    # NOTE(review): the row is addressed by id only, without a tenant or
    # recipient filter - confirm that callers verify ownership.
    cur = conn.cursor()
    try:
        cur.execute("""
            UPDATE notification_logs SET status = 'read', read_at = NOW() WHERE id = %s
        """, (log_id,))
        conn.commit()
        return True
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()