Files
Autoparts-DB/pos/services/crm_engine.py
Nexus Dev 9ff3dc4c8b FASE 4-5-6: Infraestructura, CRM, Service Orders, Notificaciones, Ahorro, Logistica, API Publica
FASE 4:
- Redis cache de stock con fallback graceful
- Multi-moneda (MXN/USD) con contabilidad en MXN
- Proveedores y ordenes de compra completo
- Meilisearch 1.5M+ partes indexadas
- Metabase KPIs con dashboard auto-generado

FASE 5:
- CRM mejorado: activities, tags, loyalty program, analytics
- Imagenes de partes: upload, resize, thumbnails WebP
- Ordenes de servicio Kanban: received->diagnosis->repair->ready->delivered
- Garantias/RMA, alertas de reorden, multi-sucursal
- Stubs BNPL (APLAZO) y ERP Sync (Aspel/Contpaqi)

FASE 6:
- Notificaciones automaticas: push/WhatsApp/email/in-app
- Reportes de ahorro vs retail_price
- Logistica + tracking: DHL, FedEx, Estafeta, 99min, Uber
- API Publica: API keys, rate limiting, catalog search

Migraciones: v1.9-v3.0
Tests: 93/93 pasando
Backup: nexus_backup_20260427_045859.tar.gz
2026-04-27 05:23:30 +00:00

371 lines
12 KiB
Python

"""CRM Engine: customer activities, tags, loyalty, analytics.
Provides:
- Activity timeline logging and retrieval
- Customer tag management and assignment
- Loyalty points accrual, redemption, and balance tracking
- Customer analytics (LTV, frequency, churn risk)
"""
from datetime import datetime, timedelta
# ─── Customer Activities ─────────────────────────────
def log_activity(conn, customer_id, activity_type, title=None, description=None,
metadata=None, employee_id=None):
"""Log a customer activity to the timeline."""
cur = conn.cursor()
cur.execute("""
INSERT INTO customer_activities
(customer_id, activity_type, title, description, metadata, employee_id)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id
""", (customer_id, activity_type, title, description,
metadata if metadata else None, employee_id))
activity_id = cur.fetchone()[0]
conn.commit()
cur.close()
return activity_id
def get_activities(conn, customer_id, activity_type=None, limit=50):
"""Get customer activity timeline."""
cur = conn.cursor()
params = [customer_id]
type_filter = ""
if activity_type:
type_filter = "AND activity_type = %s"
params.append(activity_type)
cur.execute(f"""
SELECT a.id, a.activity_type, a.title, a.description, a.metadata,
a.employee_id, e.name as employee_name, a.created_at
FROM customer_activities a
LEFT JOIN employees e ON a.employee_id = e.id
WHERE a.customer_id = %s {type_filter}
ORDER BY a.created_at DESC
LIMIT %s
""", params + [limit])
activities = []
for r in cur.fetchall():
activities.append({
'id': r[0], 'activity_type': r[1], 'title': r[2],
'description': r[3], 'metadata': r[4],
'employee_id': r[5], 'employee_name': r[6],
'created_at': str(r[7]),
})
cur.close()
return activities
# ─── Customer Tags ─────────────────────────────
def create_tag(conn, tenant_id, name, color='#6B7280', description=None):
cur = conn.cursor()
cur.execute("""
INSERT INTO customer_tags (tenant_id, name, color, description)
VALUES (%s, %s, %s, %s)
RETURNING id
""", (tenant_id, name, color, description))
tag_id = cur.fetchone()[0]
conn.commit()
cur.close()
return tag_id
def list_tags(conn, tenant_id):
cur = conn.cursor()
cur.execute("""
SELECT id, name, color, description, created_at
FROM customer_tags
WHERE tenant_id = %s
ORDER BY name
""", (tenant_id,))
tags = []
for r in cur.fetchall():
tags.append({
'id': r[0], 'name': r[1], 'color': r[2],
'description': r[3], 'created_at': str(r[4]),
})
cur.close()
return tags
def assign_tag(conn, customer_id, tag_id, assigned_by=None):
cur = conn.cursor()
cur.execute("""
INSERT INTO customer_tag_assignments (customer_id, tag_id, assigned_by)
VALUES (%s, %s, %s)
ON CONFLICT (customer_id, tag_id) DO NOTHING
""", (customer_id, tag_id, assigned_by))
conn.commit()
cur.close()
return True
def remove_tag(conn, customer_id, tag_id):
cur = conn.cursor()
cur.execute("""
DELETE FROM customer_tag_assignments
WHERE customer_id = %s AND tag_id = %s
""", (customer_id, tag_id))
conn.commit()
cur.close()
return True
def get_customer_tags(conn, customer_id):
cur = conn.cursor()
cur.execute("""
SELECT t.id, t.name, t.color
FROM customer_tags t
JOIN customer_tag_assignments a ON t.id = a.tag_id
WHERE a.customer_id = %s
ORDER BY t.name
""", (customer_id,))
tags = [{'id': r[0], 'name': r[1], 'color': r[2]} for r in cur.fetchall()]
cur.close()
return tags
# ─── Loyalty Program ─────────────────────────────
def add_loyalty_points(conn, customer_id, points, points_type='earned',
source_type=None, source_id=None, description=None,
expires_at=None):
"""Add loyalty points to a customer. Updates denormalized balance."""
cur = conn.cursor()
cur.execute("""
INSERT INTO loyalty_points
(customer_id, points, points_type, source_type, source_id, description, expires_at)
VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING id
""", (customer_id, points, points_type, source_type, source_id, description, expires_at))
point_id = cur.fetchone()[0]
# Update denormalized balance
_recalculate_loyalty_balance(conn, customer_id, cur)
conn.commit()
cur.close()
return point_id
def redeem_points(conn, customer_id, points_to_use, reward_id=None,
reward_value=None, description=None, employee_id=None):
"""Redeem loyalty points. Returns redemption_id or raises ValueError."""
cur = conn.cursor()
# Check available balance
cur.execute("""
SELECT COALESCE(SUM(points), 0)
FROM loyalty_points
WHERE customer_id = %s
AND (expires_at IS NULL OR expires_at > NOW())
""", (customer_id,))
available = cur.fetchone()[0] or 0
if available < points_to_use:
cur.close()
raise ValueError(f"Insufficient points: available={available}, requested={points_to_use}")
# Record redemption
cur.execute("""
INSERT INTO loyalty_redemptions
(customer_id, reward_id, points_used, reward_value, description, employee_id)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id
""", (customer_id, reward_id, points_to_use, reward_value, description, employee_id))
redemption_id = cur.fetchone()[0]
# Deduct points (record negative entry)
cur.execute("""
INSERT INTO loyalty_points
(customer_id, points, points_type, source_type, description)
VALUES (%s, %s, 'redeemed', 'redemption', %s)
""", (customer_id, -points_to_use, description))
_recalculate_loyalty_balance(conn, customer_id, cur)
conn.commit()
cur.close()
return redemption_id
def _recalculate_loyalty_balance(conn, customer_id, cur=None):
should_close = cur is None
if should_close:
cur = conn.cursor()
# Calculate total non-expired points
cur.execute("""
SELECT COALESCE(SUM(points), 0)
FROM loyalty_points
WHERE customer_id = %s
AND (expires_at IS NULL OR expires_at > NOW())
""", (customer_id,))
balance = cur.fetchone()[0] or 0
# Determine tier
tier = 'bronze'
if balance >= 5000:
tier = 'platinum'
elif balance >= 2000:
tier = 'gold'
elif balance >= 500:
tier = 'silver'
cur.execute("""
UPDATE customers
SET loyalty_points_balance = %s, loyalty_tier = %s
WHERE id = %s
""", (balance, tier, customer_id))
if should_close:
conn.commit()
cur.close()
def get_loyalty_history(conn, customer_id, limit=50):
cur = conn.cursor()
cur.execute("""
SELECT id, points, points_type, source_type, source_id,
description, expires_at, created_at
FROM loyalty_points
WHERE customer_id = %s
ORDER BY created_at DESC
LIMIT %s
""", (customer_id, limit))
history = []
for r in cur.fetchall():
history.append({
'id': r[0], 'points': r[1], 'points_type': r[2],
'source_type': r[3], 'source_id': r[4],
'description': r[5], 'expires_at': str(r[6]) if r[6] else None,
'created_at': str(r[7]),
})
cur.close()
return history
def create_reward(conn, tenant_id, name, points_cost, reward_type='discount',
reward_value=None, description=None):
cur = conn.cursor()
cur.execute("""
INSERT INTO loyalty_rewards
(tenant_id, name, description, points_cost, reward_type, reward_value)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id
""", (tenant_id, name, description, points_cost, reward_type, reward_value))
reward_id = cur.fetchone()[0]
conn.commit()
cur.close()
return reward_id
def list_rewards(conn, tenant_id):
cur = conn.cursor()
cur.execute("""
SELECT id, name, description, points_cost, reward_type, reward_value, is_active
FROM loyalty_rewards
WHERE tenant_id = %s AND is_active = true
ORDER BY points_cost
""", (tenant_id,))
rewards = []
for r in cur.fetchall():
rewards.append({
'id': r[0], 'name': r[1], 'description': r[2],
'points_cost': r[3], 'reward_type': r[4], 'reward_value': float(r[5]) if r[5] else None,
'is_active': r[6],
})
cur.close()
return rewards
# ─── Customer Analytics ─────────────────────────────
def get_customer_analytics(conn, customer_id):
"""Compute LTV, purchase frequency, favorite categories, churn risk."""
cur = conn.cursor()
# LTV and frequency
cur.execute("""
SELECT
COALESCE(SUM(total), 0) as ltv,
COUNT(*) as total_orders,
MIN(created_at) as first_purchase,
MAX(created_at) as last_purchase,
COALESCE(AVG(total), 0) as avg_order_value
FROM sales
WHERE customer_id = %s AND status = 'completed'
""", (customer_id,))
ltv, total_orders, first_purchase, last_purchase, aov = cur.fetchone()
# Days since last purchase
days_since_last = None
if last_purchase:
days_since_last = (datetime.utcnow() - last_purchase).days
# Purchase frequency (orders per month)
frequency = 0.0
if first_purchase and last_purchase and total_orders > 1:
months = max(1, (last_purchase - first_purchase).days / 30.0)
frequency = round(total_orders / months, 2)
# Churn risk: no purchase in 90+ days = high, 60-90 = medium, <60 = low
churn_risk = 'low'
if days_since_last is not None:
if days_since_last > 90:
churn_risk = 'high'
elif days_since_last > 60:
churn_risk = 'medium'
# Favorite categories (from inventory via sale_items)
cur.execute("""
SELECT COALESCE(i.category_id::text, 'Uncategorized') as category,
COUNT(*) as cnt, SUM(si.subtotal) as revenue
FROM sale_items si
JOIN sales s ON s.id = si.sale_id
LEFT JOIN inventory i ON si.inventory_id = i.id
WHERE s.customer_id = %s AND s.status = 'completed'
GROUP BY i.category_id
ORDER BY revenue DESC
LIMIT 5
""", (customer_id,))
categories = []
for r in cur.fetchall():
categories.append({
'category': r[0] or 'Uncategorized',
'order_count': r[1],
'revenue': float(r[2]) if r[2] else 0,
})
# Loyalty status
cur.execute("""
SELECT loyalty_points_balance, loyalty_tier
FROM customers WHERE id = %s
""", (customer_id,))
row = cur.fetchone()
loyalty_balance = row[0] or 0
loyalty_tier = row[1] or 'bronze'
cur.close()
return {
'ltv': float(ltv) if ltv else 0,
'total_orders': total_orders or 0,
'avg_order_value': float(aov) if aov else 0,
'first_purchase': str(first_purchase) if first_purchase else None,
'last_purchase': str(last_purchase) if last_purchase else None,
'days_since_last_purchase': days_since_last,
'purchase_frequency_monthly': frequency,
'churn_risk': churn_risk,
'favorite_categories': categories,
'loyalty': {
'points_balance': loyalty_balance,
'tier': loyalty_tier,
}
}