"""Notification Engine: event-driven notifications via push, email, WhatsApp, in-app. Integrates with existing push_service.py for Web Push. Supports template rendering with Jinja2-style variable substitution. """ import re import json from datetime import datetime, timedelta def _render_template(template, context): """Simple variable substitution: {var_name} -> value.""" if not template: return '' result = template for key, value in context.items(): if value is None: value = '' result = result.replace(f'{{{key}}}', str(value)) return result def get_templates(conn, tenant_id, event_type=None, channel=None): cur = conn.cursor() params = [tenant_id] filters = "tenant_id = %s" if event_type: filters += " AND event_type = %s" params.append(event_type) if channel: filters += " AND channel = %s" params.append(channel) cur.execute(f""" SELECT id, event_type, channel, name, subject_template, body_template, is_active FROM notification_templates WHERE {filters} ORDER BY event_type, channel """, params) templates = [] for r in cur.fetchall(): templates.append({ 'id': r[0], 'event_type': r[1], 'channel': r[2], 'name': r[3], 'subject_template': r[4], 'body_template': r[5], 'is_active': r[6], }) cur.close() return templates def create_template(conn, tenant_id, event_type, channel, name, body_template, subject_template=None, is_active=True): cur = conn.cursor() cur.execute(""" INSERT INTO notification_templates (tenant_id, event_type, channel, name, subject_template, body_template, is_active) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (tenant_id, event_type, channel) DO UPDATE SET name = EXCLUDED.name, subject_template = EXCLUDED.subject_template, body_template = EXCLUDED.body_template, is_active = EXCLUDED.is_active, updated_at = NOW() RETURNING id """, (tenant_id, event_type, channel, name, subject_template, body_template, is_active)) tid = cur.fetchone()[0] conn.commit() cur.close() return tid def update_template(conn, template_id, data): cur = conn.cursor() allowed = ['event_type', 'channel', 'name', 'subject_template', 'body_template', 'is_active'] sets = [] vals = [] for field in allowed: if field in data: sets.append(f"{field} = %s") vals.append(data[field]) if not sets: cur.close() return False vals.append(template_id) cur.execute(f"UPDATE notification_templates SET {', '.join(sets)} WHERE id = %s", vals) conn.commit() cur.close() return True # ─── Event Dispatch ───────────────────────────── def dispatch_notification(conn, tenant_id, event_type, context, recipient_type='owner', recipient_id=None, channels=None): """Dispatch a notification event to all configured channels. Args: conn: DB connection tenant_id: tenant ID event_type: e.g. 'low_stock', 'order_ready' context: dict with template variables recipient_type: 'owner', 'employee', 'customer', 'role' recipient_id: specific recipient ID channels: list of channels to use, or None for all active templates Returns: list of notification log IDs """ cur = conn.cursor() # Get active templates for this event if channels: placeholders = ','.join(['%s'] * len(channels)) cur.execute(f""" SELECT id, channel, subject_template, body_template FROM notification_templates WHERE tenant_id = %s AND event_type = %s AND is_active = true AND channel IN ({placeholders}) """, [tenant_id, event_type] + list(channels)) else: cur.execute(""" SELECT id, channel, subject_template, body_template FROM notification_templates WHERE tenant_id = %s AND event_type = %s AND is_active = true """, (tenant_id, event_type)) templates = cur.fetchall() if not templates: cur.close() return [] log_ids = [] for tid, channel, subject_tmpl, body_tmpl in templates: subject = _render_template(subject_tmpl, context) body = _render_template(body_tmpl, context) # Insert log as pending cur.execute(""" INSERT INTO notification_logs (tenant_id, recipient_type, recipient_id, event_type, channel, subject, body, status, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending', %s) RETURNING id """, (tenant_id, recipient_type, recipient_id, event_type, channel, subject, body, json.dumps(context) if isinstance(context, dict) else None)) log_id = cur.fetchone()[0] log_ids.append(log_id) conn.commit() cur.close() # Send asynchronously (in production, this would go to a queue) for log_id in log_ids: _send_notification(conn, log_id) return log_ids def _send_notification(conn, log_id): """Send a single notification by its log entry.""" cur = conn.cursor() cur.execute(""" SELECT channel, subject, body, recipient_type, recipient_id, metadata, tenant_id FROM notification_logs WHERE id = %s """, (log_id,)) row = cur.fetchone() if not row: cur.close() return channel, subject, body, recipient_type, recipient_id, metadata, tenant_id = row try: if channel == 'push': _send_push(conn, tenant_id, recipient_type, recipient_id, subject, body, metadata) elif channel == 'whatsapp': _send_whatsapp(conn, tenant_id, recipient_type, recipient_id, body, metadata) elif channel == 'email': _send_email(conn, tenant_id, recipient_type, recipient_id, subject, body, metadata) elif channel == 'in_app': # In-app is just the log entry; UI polls notification_logs pass cur.execute(""" UPDATE notification_logs SET status = 'sent', sent_at = NOW() WHERE id = %s """, (log_id,)) conn.commit() except Exception as e: conn.rollback() cur2 = conn.cursor() try: cur2.execute(""" UPDATE notification_logs SET status = 'failed', error_message = %s WHERE id = %s """, (str(e)[:500], log_id)) conn.commit() except Exception: conn.rollback() finally: cur2.close() finally: cur.close() def _send_push(conn, tenant_id, recipient_type, recipient_id, title, body, metadata): """Send Web Push notification.""" try: from services.push_service import notify_owner # Try to find push subscriptions for recipient cur = conn.cursor() if recipient_type == 'owner': cur.execute(""" SELECT s.subscription_json FROM push_subscriptions s JOIN employees e ON s.employee_id = e.id WHERE e.tenant_id = %s AND e.role = 'owner' AND s.is_active = true """, (tenant_id,)) elif recipient_type == 'employee' and recipient_id: cur.execute(""" SELECT subscription_json FROM push_subscriptions WHERE employee_id = %s AND is_active = true """, (recipient_id,)) else: cur.close() return subs = [r[0] for r in cur.fetchall()] cur.close() if not subs: return # Send to all subscriptions for sub_json in subs: try: from services.push_service import send_push send_push(sub_json, title, body, metadata) except Exception: pass except ImportError: pass def _send_whatsapp(conn, tenant_id, recipient_type, recipient_id, body, metadata): """Send WhatsApp notification via existing service.""" try: from services.whatsapp_service import send_message cur = conn.cursor() phone = None if recipient_type == 'customer' and recipient_id: cur.execute("SELECT phone FROM customers WHERE id = %s", (recipient_id,)) row = cur.fetchone() if row: phone = row[0] cur.close() if phone: send_message(phone, body) except Exception: pass def _send_email(conn, tenant_id, recipient_type, recipient_id, subject, body, metadata): """Send email notification. Stub — requires SMTP config.""" # Stub: would integrate with SMTP or SendGrid/Postmark pass # ─── Convenience Event Dispatchers ───────────────────────────── def notify_low_stock(conn, tenant_id, inventory_id, stock, reorder_point, branch_id=None): """Notify when stock is below reorder point.""" cur = conn.cursor() cur.execute("SELECT part_number, name FROM inventory WHERE id = %s", (inventory_id,)) row = cur.fetchone() cur.close() if not row: return [] return dispatch_notification( conn, tenant_id, 'low_stock', { 'part_number': row[0], 'part_name': row[1], 'stock': stock, 'reorder_point': reorder_point, 'inventory_id': inventory_id, }, recipient_type='owner', ) def notify_order_ready(conn, tenant_id, service_order_id, customer_id): """Notify customer when service order is ready.""" cur = conn.cursor() cur.execute(""" SELECT order_number, c.name, c.phone FROM service_orders so LEFT JOIN customers c ON so.customer_id = c.id WHERE so.id = %s """, (service_order_id,)) row = cur.fetchone() cur.close() if not row: return [] order_number, customer_name, phone = row # Push to owners push_ids = dispatch_notification( conn, tenant_id, 'order_ready', {'order_number': order_number, 'customer_name': customer_name or 'Cliente'}, recipient_type='owner', ) # WhatsApp to customer if phone exists if phone: wa_ids = dispatch_notification( conn, tenant_id, 'order_ready', {'order_number': order_number, 'customer_name': customer_name or 'Cliente'}, recipient_type='customer', recipient_id=customer_id, channels=['whatsapp'], ) return push_ids + wa_ids return push_ids def notify_maintenance_due(conn, tenant_id, vehicle_id, schedule_id): """Notify when vehicle maintenance is due.""" cur = conn.cursor() cur.execute(""" SELECT v.plate, v.current_mileage, s.maintenance_type, s.next_due_km, s.next_due_at FROM fleet_vehicles v JOIN fleet_maintenance_schedules s ON s.vehicle_id = v.id WHERE v.id = %s AND s.id = %s """, (vehicle_id, schedule_id)) row = cur.fetchone() cur.close() if not row: return [] plate, mileage, mtype, next_km, next_at = row return dispatch_notification( conn, tenant_id, 'maintenance_due', { 'vehicle_plate': plate, 'current_mileage': mileage or 0, 'maintenance_type': mtype, 'next_due_km': next_km or 'N/A', 'next_due_date': str(next_at)[:10] if next_at else 'N/A', }, recipient_type='owner', ) def notify_new_sale(conn, tenant_id, sale_id, total, payment_method, employee_id=None): """Notify owners of new sale.""" return dispatch_notification( conn, tenant_id, 'new_sale', { 'sale_id': sale_id, 'total': f"${float(total):,.2f}", 'payment_method': payment_method or 'N/A', }, recipient_type='owner', ) def notify_po_received(conn, tenant_id, po_id, total): """Notify when purchase order is received.""" return dispatch_notification( conn, tenant_id, 'po_received', {'po_id': po_id, 'total': f"${float(total):,.2f}"}, recipient_type='owner', ) def get_notification_logs(conn, tenant_id, recipient_type=None, recipient_id=None, status=None, event_type=None, limit=50): cur = conn.cursor() where = ["tenant_id = %s"] params = [tenant_id] if recipient_type: where.append("recipient_type = %s") params.append(recipient_type) if recipient_id: where.append("recipient_id = %s") params.append(recipient_id) if status: where.append("status = %s") params.append(status) if event_type: where.append("event_type = %s") params.append(event_type) cur.execute(f""" SELECT id, event_type, channel, subject, body, status, sent_at, read_at, created_at FROM notification_logs WHERE {' AND '.join(where)} ORDER BY created_at DESC LIMIT %s """, params + [limit]) logs = [] for r in cur.fetchall(): logs.append({ 'id': r[0], 'event_type': r[1], 'channel': r[2], 'subject': r[3], 'body': r[4], 'status': r[5], 'sent_at': str(r[6]) if r[6] else None, 'read_at': str(r[7]) if r[7] else None, 'created_at': str(r[8]), }) cur.close() return logs def mark_as_read(conn, log_id): cur = conn.cursor() cur.execute(""" UPDATE notification_logs SET status = 'read', read_at = NOW() WHERE id = %s """, (log_id,)) conn.commit() cur.close() return True