Files
Autoparts-DB/pos/blueprints/pos_bp.py
consultoria-as 042acd6207 OPCIÓN C + A1: Consolidación técnica + orjson
C1: Materialized view part_vehicle_preview (creación en progreso)
- Migración v3.3_materialized_view.sql
- catalog_service.py y dashboard/server.py ahora usan la MV
- Script refresh_part_vehicle_preview.py + warm_vehicle_cache.py actualizado

C2: Fix cache warming script (autónomo)
- Auto-re-ejecuta con sudo -u postgres si peer auth falla
- Args CLI: --dsn, --batch-size, --ttl, --dry-run

C3: CSS dinámico residual extraído
- sidebar.js → sidebar.css (nuevo)
- pos-utils.js → common.css (nuevo)
- Links agregados a 14 templates POS

C4: Script de load testing básico
- scripts/load_test.py: métricas p50/p95/p99, throughput, errores

C5: Documentación actualizada
- FASES_IMPLEMENTADAS.md: test count real, FASE 7 completa
- performance_audit_2026.md: anexo post-FASE 7, métricas actualizadas

A1: Serialización orjson
- pos/json_provider.py: DefaultJSONProvider con orjson.dumps/loads
- Aplicado a POS app y Dashboard server
- Fix indentation error en pos_bp.py

Tests: 73/73 pasando
2026-04-27 09:36:03 +00:00

1970 lines
70 KiB
Python

# /home/Autopartes/pos/blueprints/pos_bp.py
"""POS blueprint: sales, quotations, layaways.
All sale business logic is in services.pos_engine. This blueprint is the HTTP layer
that validates input, calls the engine, and returns JSON responses.
"""
import json
from datetime import datetime, date, timedelta
from flask import Blueprint, request, jsonify, g
from middleware import require_auth, has_permission
from tenant_db import get_tenant_conn
from services.pos_engine import (
process_sale, cancel_sale, calculate_totals,
get_price_for_customer, get_margin_info
)
from services.audit import log_action
# Blueprint for the POS HTTP API; every route registered on it is served under /pos/api.
pos_bp = Blueprint('pos', __name__, url_prefix='/pos/api')
def _enrich_items(cur, items, customer_id=None):
"""Look up inventory data for items that lack unit_price/tax_rate.
Uses batch queries to avoid N+1 performance issues.
Returns list of dicts with all fields needed by calculate_totals.
"""
inv_ids = [item.get('inventory_id') for item in items if item.get('inventory_id')]
if not inv_ids:
raise ValueError("No valid inventory items provided")
# Batch fetch all inventory items in one query
cur.execute("""
SELECT id, part_number, name, cost, price_1, price_2, price_3,
tax_rate, branch_id
FROM inventory WHERE id = ANY(%s) AND is_active = true
""", (inv_ids,))
inv_map = {r[0]: r for r in cur.fetchall()}
# Fetch customer price tier once (if provided)
price_tier = 1
if customer_id:
cur.execute("SELECT price_tier FROM customers WHERE id = %s", (customer_id,))
cust = cur.fetchone()
if cust and cust[0]:
price_tier = int(cust[0])
enriched = []
for item in items:
inv_id = item.get('inventory_id')
qty = int(item.get('quantity', 1))
if qty <= 0:
raise ValueError(f"Invalid quantity for inventory_id {inv_id}")
inv = inv_map.get(inv_id)
if not inv:
raise ValueError(f"Inventory item {inv_id} not found or inactive")
# price_1=inv[4], price_2=inv[5], price_3=inv[6]
tier_prices = {1: inv[4], 2: inv[5], 3: inv[6]}
default_price = float(tier_prices.get(price_tier, inv[4]) or inv[4])
unit_price = float(item.get('unit_price', default_price))
discount_pct = float(item.get('discount_pct', 0))
tax_rate = float(item.get('tax_rate', inv[7] or 0.16))
enriched.append({
'inventory_id': inv_id,
'part_number': inv[1],
'name': inv[2],
'quantity': qty,
'unit_price': unit_price,
'unit_cost': float(inv[3]) if inv[3] else 0,
'discount_pct': discount_pct,
'tax_rate': tax_rate,
'branch_id': inv[8],
})
return enriched
# ─── Sales ───────────────────────────────────────
@pos_bp.route('/sales', methods=['POST'])
@require_auth('pos.sell')
def create_sale():
    """Create a new sale.

    Body: {
        items: [{inventory_id, quantity, unit_price, discount_pct, tax_rate}],
        customer_id: int | null,
        payment_method: 'efectivo' | 'transferencia' | 'tarjeta' | 'mixto',
        sale_type: 'cash' | 'credit' | 'mixed',
        register_id: int,
        amount_paid: float,
        payment_details: [{method, amount, reference}],  (for mixed payments)
        notes: str,
        currency: 'MXN' | 'USD' (default 'MXN'),
        exchange_rate: float (optional, auto-fetched from tenant config if omitted)
    }

    Returns:
        201 with the sale returned by ``process_sale``; 400 when
        ``process_sale`` raises ``ValueError``; 500 for any other failure.
        The transaction is rolled back on both error paths.
    """
    data = request.get_json() or {}
    conn = get_tenant_conn(g.tenant_id)
    try:
        sale = process_sale(conn, data)
        conn.commit()
    except ValueError as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        # Single cleanup point: the connection is closed exactly once on
        # every path, and never before a rollback is attempted.
        conn.close()
    return jsonify(sale), 201
@pos_bp.route('/sales', methods=['GET'])
@require_auth('pos.view')
def list_sales():
    """List sales, newest first, with optional filters and pagination.

    Query params:
        date_from: YYYY-MM-DD (inclusive)
        date_to: YYYY-MM-DD (inclusive; compared as < date_to + 1 day)
        employee_id: int
        customer_id: int
        status: completed | cancelled | returned
        register_id: int
        page: int (default 1; values below 1 are treated as 1)
        per_page: int (default 50, clamped to the range 1..200)

    When ``g.branch_id`` is set the listing is restricted to that branch.

    Returns:
        JSON ``{data: [...], pagination: {page, per_page, total, total_pages}}``,
        or 400 when a numeric parameter is not an integer.
    """
    args = request.args

    # Parse numeric input before opening a connection: malformed values give
    # a 400 instead of an unhandled ValueError with the connection left open.
    try:
        page = max(int(args.get('page', 1)), 1)
        per_page = min(max(int(args.get('per_page', 50)), 1), 200)
        employee_id = int(args['employee_id']) if args.get('employee_id') else None
        customer_id = int(args['customer_id']) if args.get('customer_id') else None
        register_id = int(args['register_id']) if args.get('register_id') else None
    except ValueError:
        return jsonify({
            'error': 'page, per_page, employee_id, customer_id and register_id must be integers'
        }), 400

    # (SQL fragment, bound value) pairs; only constant fragments are joined
    # into the statement, every value is passed as a query parameter.
    filters = []
    if args.get('date_from'):
        filters.append(("s.created_at >= %s", args['date_from']))
    if args.get('date_to'):
        filters.append(("s.created_at < %s::date + interval '1 day'", args['date_to']))
    if employee_id is not None:
        filters.append(("s.employee_id = %s", employee_id))
    if customer_id is not None:
        filters.append(("s.customer_id = %s", customer_id))
    if args.get('status'):
        filters.append(("s.status = %s", args['status']))
    if register_id is not None:
        filters.append(("s.register_id = %s", register_id))
    # Default to current branch
    if g.branch_id:
        filters.append(("s.branch_id = %s", g.branch_id))

    where = " AND ".join(["1=1"] + [clause for clause, _ in filters])
    params = [value for _, value in filters]

    def money(value):
        # NULL/zero amounts are reported as 0.
        return float(value) if value else 0

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute(f"SELECT count(*) FROM sales s WHERE {where}", params)
        total = cur.fetchone()[0]
        cur.execute(f"""
            SELECT s.id, s.branch_id, s.customer_id, s.employee_id, s.register_id,
                   s.sale_type, s.payment_method, s.subtotal, s.discount_total,
                   s.tax_total, s.total, s.amount_paid, s.change_given,
                   s.status, s.created_at,
                   e.name as employee_name,
                   c.name as customer_name
            FROM sales s
            LEFT JOIN employees e ON s.employee_id = e.id
            LEFT JOIN customers c ON s.customer_id = c.id
            WHERE {where}
            ORDER BY s.created_at DESC
            LIMIT %s OFFSET %s
        """, params + [per_page, (page - 1) * per_page])
        rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    sales = [{
        'id': r[0], 'branch_id': r[1], 'customer_id': r[2],
        'employee_id': r[3], 'register_id': r[4],
        'sale_type': r[5], 'payment_method': r[6],
        'subtotal': money(r[7]),
        'discount_total': money(r[8]),
        'tax_total': money(r[9]),
        'total': money(r[10]),
        'amount_paid': money(r[11]),
        'change_given': money(r[12]),
        'status': r[13], 'created_at': str(r[14]),
        'employee_name': r[15], 'customer_name': r[16],
    } for r in rows]

    total_pages = (total + per_page - 1) // per_page
    return jsonify({
        'data': sales,
        'pagination': {'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages}
    })
@pos_bp.route('/sales/<int:sale_id>', methods=['GET'])
@require_auth('pos.view')
def get_sale(sale_id):
    """Get sale detail with its line items and payments.

    Returns:
        JSON object with every ``sales`` column plus ``employee_name``,
        ``customer_name``, ``items`` and ``payments``; 404 when no sale has
        the given id.
    """
    def money(value):
        # NULL/zero amounts are reported as 0.
        return float(value) if value else 0

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT s.*, e.name as employee_name, c.name as customer_name
            FROM sales s
            LEFT JOIN employees e ON s.employee_id = e.id
            LEFT JOIN customers c ON s.customer_id = c.id
            WHERE s.id = %s
        """, (sale_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Sale not found'}), 404

        cols = [desc[0] for desc in cur.description]
        sale = dict(zip(cols, row))
        # Convert Decimal fields
        for k in ('subtotal', 'discount_total', 'tax_total', 'total', 'amount_paid', 'change_given'):
            if sale.get(k) is not None:
                sale[k] = float(sale[k])
        if sale.get('created_at'):
            sale['created_at'] = str(sale['created_at'])

        # Get items
        cur.execute("""
            SELECT id, inventory_id, part_number, name, quantity, unit_price,
                   unit_cost, discount_pct, discount_amount, tax_rate, tax_amount, subtotal,
                   clave_prod_serv, clave_unidad
            FROM sale_items WHERE sale_id = %s ORDER BY id
        """, (sale_id,))
        sale['items'] = [{
            'id': r[0], 'inventory_id': r[1], 'part_number': r[2], 'name': r[3],
            'quantity': r[4], 'unit_price': money(r[5]),
            'unit_cost': money(r[6]),
            'discount_pct': money(r[7]),
            'discount_amount': money(r[8]),
            'tax_rate': money(r[9]),
            'tax_amount': money(r[10]),
            'subtotal': money(r[11]),
            'clave_prod_serv': r[12], 'clave_unidad': r[13],
        } for r in cur.fetchall()]

        # Get payments
        cur.execute("""
            SELECT id, method, amount, reference, created_at
            FROM sale_payments WHERE sale_id = %s ORDER BY id
        """, (sale_id,))
        sale['payments'] = [{
            'id': r[0], 'method': r[1], 'amount': money(r[2]),
            'reference': r[3], 'created_at': str(r[4]) if r[4] else None,
        } for r in cur.fetchall()]

        return jsonify(sale)
    finally:
        # Runs on the 404 path, the success path and on query errors alike.
        cur.close()
        conn.close()
@pos_bp.route('/sales/<int:sale_id>/cancel', methods=['PUT'])
@require_auth('pos.sell')
def api_cancel_sale(sale_id):
    """Cancel a sale. Requires mandatory reason.

    Body: {reason: str}

    The reason is stripped and handed to ``cancel_sale``; a missing or
    ``null`` reason is passed as an empty string.

    Returns:
        The result of ``cancel_sale`` as JSON; 400 when it raises
        ``ValueError``; 500 for any other failure. The transaction is rolled
        back on both error paths.
    """
    data = request.get_json() or {}
    # `reason` may arrive as null or as a non-string JSON value; coerce it
    # instead of failing on .strip().
    reason = str(data.get('reason') or '').strip()
    conn = get_tenant_conn(g.tenant_id)
    try:
        result = cancel_sale(conn, sale_id, reason)
        conn.commit()
    except ValueError as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        conn.close()
    return jsonify(result)
@pos_bp.route('/sales/last', methods=['GET'])
@require_auth('pos.view')
def get_last_sale():
    """Get the last sale for the current employee.

    The most recent sale (by ``created_at``) made by ``g.employee_id`` is
    returned, restricted to ``g.branch_id`` when one is set, together with
    its line items and payments.

    Returns:
        JSON sale object with ``items`` and ``payments``; 404 when the
        employee has no sales.
    """
    def money(value):
        # NULL/zero amounts are reported as 0.
        return float(value) if value else 0

    where_clauses = ["s.employee_id = %s"]
    params = [g.employee_id]
    if g.branch_id:
        where_clauses.append("s.branch_id = %s")
        params.append(g.branch_id)
    where = " AND ".join(where_clauses)

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute(f"""
            SELECT s.id, s.branch_id, s.customer_id, s.employee_id, s.register_id,
                   s.sale_type, s.payment_method, s.subtotal, s.discount_total,
                   s.tax_total, s.total, s.amount_paid, s.change_given,
                   s.status, s.created_at,
                   e.name as employee_name,
                   c.name as customer_name
            FROM sales s
            LEFT JOIN employees e ON s.employee_id = e.id
            LEFT JOIN customers c ON s.customer_id = c.id
            WHERE {where}
            ORDER BY s.created_at DESC
            LIMIT 1
        """, params)
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'No sales found'}), 404

        sale = {
            'id': row[0], 'branch_id': row[1], 'customer_id': row[2],
            'employee_id': row[3], 'register_id': row[4],
            'sale_type': row[5], 'payment_method': row[6],
            'subtotal': money(row[7]),
            'discount_total': money(row[8]),
            'tax_total': money(row[9]),
            'total': money(row[10]),
            'amount_paid': money(row[11]),
            'change_given': money(row[12]),
            'status': row[13], 'created_at': str(row[14]),
            'employee_name': row[15], 'customer_name': row[16],
        }

        # Get items
        cur.execute("""
            SELECT id, inventory_id, part_number, name, quantity, unit_price,
                   unit_cost, discount_pct, discount_amount, tax_rate, tax_amount, subtotal,
                   clave_prod_serv, clave_unidad
            FROM sale_items WHERE sale_id = %s ORDER BY id
        """, (sale['id'],))
        sale['items'] = [{
            'id': r[0], 'inventory_id': r[1], 'part_number': r[2], 'name': r[3],
            'quantity': r[4], 'unit_price': money(r[5]),
            'unit_cost': money(r[6]),
            'discount_pct': money(r[7]),
            'discount_amount': money(r[8]),
            'tax_rate': money(r[9]),
            'tax_amount': money(r[10]),
            'subtotal': money(r[11]),
            'clave_prod_serv': r[12], 'clave_unidad': r[13],
        } for r in cur.fetchall()]

        # Get payments
        cur.execute("""
            SELECT id, method, amount, reference, created_at
            FROM sale_payments WHERE sale_id = %s ORDER BY id
        """, (sale['id'],))
        sale['payments'] = [{
            'id': r[0], 'method': r[1], 'amount': money(r[2]),
            'reference': r[3], 'created_at': str(r[4]) if r[4] else None,
        } for r in cur.fetchall()]

        return jsonify(sale)
    finally:
        # Release the connection on every path, including query failures.
        cur.close()
        conn.close()
# ─── Quotations ──────────────────────────────────
@pos_bp.route('/quotations', methods=['POST'])
@require_auth('pos.sell')
def create_quotation():
    """Save a quotation from current cart.

    Body: {
        items: [{inventory_id, quantity, unit_price, discount_pct, tax_rate}],
        customer_id: int | null,
        valid_days: int (default 7),
        notes: str,
        currency: 'MXN' | 'USD' (default 'MXN'),
        exchange_rate: float (optional, auto-fetched if not provided)
    }

    Items are enriched from inventory, totals come from ``calculate_totals``,
    and the quotation plus its items are inserted in one transaction.

    Returns:
        201 with ``{id, total, valid_until, created_at, message}``; 400 for
        an empty cart, an unsupported currency or any ``ValueError`` raised
        while validating input; 500 for any other failure.
    """
    from services.currency import get_exchange_rate

    data = request.get_json() or {}
    items = data.get('items', [])
    if not items:
        return jsonify({'error': 'No items in quotation'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Enrich items with inventory data (price, tax, etc.)
        enriched = _enrich_items(cur, items, data.get('customer_id'))
        # Calculate totals
        totals = calculate_totals(enriched)

        raw_days = data.get('valid_days')
        valid_days = 7 if raw_days is None else int(raw_days)
        valid_until = (date.today() + timedelta(days=valid_days)).isoformat()

        # Multi-currency for quotations
        currency = data.get('currency', 'MXN')
        if currency not in ('MXN', 'USD'):
            return jsonify({'error': f'Unsupported currency: {currency}'}), 400
        exchange_rate = data.get('exchange_rate')
        if currency != 'MXN' and exchange_rate is None:
            exchange_rate = float(get_exchange_rate(conn, currency, 'MXN'))
        exchange_rate = float(exchange_rate) if exchange_rate else 1.0

        cur.execute("""
            INSERT INTO quotations
                (branch_id, customer_id, employee_id, subtotal,
                 tax_total, total, status, valid_until, notes, currency, exchange_rate)
            VALUES (%s,%s,%s,%s,%s,%s,'active',%s,%s,%s,%s)
            RETURNING id, created_at
        """, (
            g.branch_id, data.get('customer_id'), g.employee_id,
            totals['subtotal'], totals['tax_total'],
            totals['total'], valid_until, data.get('notes'),
            currency, exchange_rate
        ))
        quot_id, created_at = cur.fetchone()

        # Insert quotation items
        for item in totals['items']:
            # Line subtotal is the discounted amount before tax.
            line_subtotal = round(
                item['unit_price'] * item['quantity'] * (1 - item['discount_pct'] / 100), 2
            )
            cur.execute("""
                INSERT INTO quotation_items
                    (quotation_id, inventory_id, part_number, name, quantity,
                     unit_price, discount_pct, tax_rate, subtotal, currency, exchange_rate)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
            """, (
                quot_id, item['inventory_id'],
                item.get('part_number', ''), item.get('name', ''),
                item['quantity'], item['unit_price'], item['discount_pct'],
                item['tax_rate'], line_subtotal,
                currency, exchange_rate
            ))

        log_action(conn, 'QUOTATION_CREATE', 'quotation', quot_id,
                   new_value={'total': totals['total'], 'items_count': len(items)})
        conn.commit()
        return jsonify({
            'id': quot_id,
            'total': totals['total'],
            'valid_until': valid_until,
            'created_at': str(created_at),
            'message': 'Quotation created'
        }), 201
    except ValueError as e:
        # Input problems (bad items, non-numeric valid_days/exchange_rate).
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        cur.close()
        conn.close()
@pos_bp.route('/quotations', methods=['GET'])
@require_auth('pos.view')
def list_quotations():
    """List quotations, newest first, with optional filters and pagination.

    Query params:
        customer_id: int
        status: active | converted | expired | cancelled
        page: int (default 1; values below 1 are treated as 1)
        per_page: int (default 50, clamped to the range 1..200)

    When ``g.branch_id`` is set, quotations of that branch and quotations
    without a branch are both returned. Each row carries a ``source`` of
    ``'whatsapp'`` when its notes start with ``WA:`` (the remainder of the
    notes is exposed as ``wa_phone``) and ``'pos'`` otherwise.

    Returns:
        JSON ``{data: [...], pagination: {page, per_page, total, total_pages}}``,
        or 400 when a numeric parameter is not an integer.
    """
    args = request.args

    # Validate numeric input before touching the database.
    try:
        page = max(int(args.get('page', 1)), 1)
        per_page = min(max(int(args.get('per_page', 50)), 1), 200)
        customer_id = int(args['customer_id']) if args.get('customer_id') else None
    except ValueError:
        return jsonify({'error': 'page, per_page and customer_id must be integers'}), 400

    where_clauses = ["1=1"]
    params = []
    if customer_id is not None:
        where_clauses.append("q.customer_id = %s")
        params.append(customer_id)
    if args.get('status'):
        where_clauses.append("q.status = %s")
        params.append(args['status'])
    if g.branch_id:
        # Show both this branch's quotes AND branchless ones (e.g. WhatsApp)
        where_clauses.append("(q.branch_id = %s OR q.branch_id IS NULL)")
        params.append(g.branch_id)
    where = " AND ".join(where_clauses)

    def money(value):
        # NULL/zero amounts are reported as 0.
        return float(value) if value else 0

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute(f"SELECT count(*) FROM quotations q WHERE {where}", params)
        total = cur.fetchone()[0]
        cur.execute(f"""
            SELECT q.id, q.customer_id, q.employee_id, q.subtotal, q.tax_total,
                   q.total, q.status, q.valid_until, q.created_at, q.notes,
                   c.name as customer_name, e.name as employee_name
            FROM quotations q
            LEFT JOIN customers c ON q.customer_id = c.id
            LEFT JOIN employees e ON q.employee_id = e.id
            WHERE {where}
            ORDER BY q.created_at DESC
            LIMIT %s OFFSET %s
        """, params + [per_page, (page - 1) * per_page])
        rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    quotations = []
    for r in rows:
        notes = r[9] or ''
        from_whatsapp = notes.startswith('WA:')
        quotations.append({
            'id': r[0], 'customer_id': r[1], 'employee_id': r[2],
            'subtotal': money(r[3]),
            'tax_total': money(r[4]),
            'total': money(r[5]),
            'status': r[6], 'valid_until': str(r[7]) if r[7] else None,
            'created_at': str(r[8]),
            'customer_name': r[10], 'employee_name': r[11],
            'source': 'whatsapp' if from_whatsapp else 'pos',
            # Strip only the leading marker, not later occurrences of "WA:".
            'wa_phone': notes[len('WA:'):] if from_whatsapp else None,
        })

    total_pages = (total + per_page - 1) // per_page
    return jsonify({
        'data': quotations,
        'pagination': {'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages}
    })
@pos_bp.route('/quotations/<int:quot_id>', methods=['DELETE'])
@require_auth('pos.sell')
def delete_quotation(quot_id):
    """Delete a quotation and its items.

    Items are removed first, then the quotation row, in one transaction.

    Returns:
        ``{ok: true, deleted_id}`` on success; 404 when no quotation row
        was deleted.
    """
    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("DELETE FROM quotation_items WHERE quotation_id = %s", (quot_id,))
        cur.execute("DELETE FROM quotations WHERE id = %s", (quot_id,))
        # rowcount reflects the last statement, i.e. the quotations DELETE.
        deleted = cur.rowcount
        conn.commit()
    except Exception:
        # Undo a partial delete, then let the error propagate as before.
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()
    if deleted == 0:
        return jsonify({'error': 'Cotización no encontrada'}), 404
    return jsonify({'ok': True, 'deleted_id': quot_id})
@pos_bp.route('/quotations/<int:quot_id>/print', methods=['POST'])
@require_auth('pos.sell')
def print_quotation_ticket(quot_id):
    """Generate a printable ticket for a quotation (ESC/POS or browser).

    Body (optional): {
        printer_type: str (default 'escpos_raw'; 'browser' returns JSON),
        width: int (default 80)
    }

    Returns:
        For ``printer_type == 'browser'`` the quotation data as JSON;
        otherwise the output of ``generate_quotation_ticket`` as an
        ``application/octet-stream`` attachment. 400 when ``width`` is not
        an integer, 404 when the quotation does not exist.
    """
    from flask import Response
    from services.thermal_printer import generate_quotation_ticket

    body = request.get_json(silent=True) or {}
    printer_type = body.get('printer_type', 'escpos_raw')
    try:
        width = int(body.get('width', 80))
    except (TypeError, ValueError):
        return jsonify({'error': 'width must be an integer'}), 400

    def money(value):
        # NULL/zero amounts are reported as 0.
        return float(value) if value else 0

    # Defaults used when tenant_config has no (or unreadable) business data.
    business_info = {'name': 'NEXUS AUTOPARTS', 'rfc': '', 'address': ''}
    config_fields = {
        'tenant_nombre': 'name',
        'tenant_rfc': 'rfc',
        'tenant_direccion': 'address',
    }

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT q.id, q.subtotal, q.tax_total, q.total, q.valid_until,
                   q.created_at, q.notes, c.name as customer_name
            FROM quotations q
            LEFT JOIN customers c ON q.customer_id = c.id
            WHERE q.id = %s
        """, (quot_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Quotation not found'}), 404

        cur.execute("""
            SELECT part_number, name, quantity, unit_price, subtotal
            FROM quotation_items WHERE quotation_id = %s ORDER BY id
        """, (quot_id,))
        items = [{'part_number': r[0], 'name': r[1], 'quantity': r[2],
                  'unit_price': money(r[3]),
                  'subtotal': money(r[4])} for r in cur.fetchall()]

        try:
            cur.execute("SELECT key, value FROM tenant_config WHERE key LIKE 'tenant_%'")
            for rw in cur.fetchall():
                field = config_fields.get(rw[0])
                if field:
                    business_info[field] = rw[1]
        except Exception:
            # Deliberately best-effort: the ticket is still printable with
            # the default business header. No further queries follow, so a
            # failed statement here does not affect anything else.
            pass
    finally:
        cur.close()
        conn.close()

    notes = row[6] or ''
    # Notes of WhatsApp quotations start with "WA:"; strip only that prefix.
    wa_phone = notes[len('WA:'):] if notes.startswith('WA:') else None
    quote_data = {
        'id': row[0],
        'subtotal': money(row[1]),
        'tax_total': money(row[2]),
        'total': money(row[3]),
        'valid_until': str(row[4]) if row[4] else None,
        'created_at': str(row[5]) if row[5] else '',
        'customer_name': row[7] or '',
        'wa_phone': wa_phone,
        'items': items,
    }
    if printer_type == 'browser':
        return jsonify(quote_data)
    raw = generate_quotation_ticket(quote_data, business_info, width=width)
    return Response(raw, mimetype='application/octet-stream',
                    headers={'Content-Disposition': f'attachment; filename=cotizacion_{quot_id}.bin'})
@pos_bp.route('/quotations/print-queue', methods=['GET'])
@require_auth('pos.sell')
def quotation_print_queue():
    """Return WhatsApp quotations that still need to be printed.

    Selects up to 10 quotations, newest first, whose status is
    ``'converted'``, whose notes start with ``WA:``, and for which no
    ``printed_quote_<id>`` key exists in ``tenant_config``.

    Returns: {data: [{id, total, wa_phone, created_at, customer_name}]}
    """
    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT q.id, q.total, q.notes, q.created_at,
                   c.name as customer_name
            FROM quotations q
            LEFT JOIN customers c ON q.customer_id = c.id
            WHERE q.status = 'converted'
              AND q.notes LIKE 'WA:%%'
              AND NOT EXISTS (
                  SELECT 1 FROM tenant_config
                  WHERE key = 'printed_quote_' || q.id::text
              )
            ORDER BY q.created_at DESC
            LIMIT 10
        """)
        rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    data = []
    for r in rows:
        notes = r[2] or ''
        data.append({
            'id': r[0],
            'total': float(r[1]) if r[1] else 0,
            # Strip only the leading "WA:" marker.
            'wa_phone': notes[len('WA:'):] if notes.startswith('WA:') else None,
            'created_at': str(r[3]) if r[3] else '',
            'customer_name': r[4] or '',
        })
    return jsonify({'data': data})
@pos_bp.route('/quotations/<int:quot_id>/mark-printed', methods=['POST'])
@require_auth('pos.sell')
def mark_quotation_printed(quot_id):
    """Mark a quotation as printed so it doesn't appear in the print queue again.

    Upserts the key ``printed_quote_<quot_id>`` with value ``'true'`` into
    ``tenant_config``. The quotation id is not checked for existence.

    Returns: {ok: true}
    """
    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("""
            INSERT INTO tenant_config (key, value) VALUES (%s, %s)
            ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value
        """, (f'printed_quote_{quot_id}', 'true'))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()
    return jsonify({'ok': True})
@pos_bp.route('/quotations/<int:quot_id>', methods=['GET'])
@require_auth('pos.view')
def get_quotation(quot_id):
    """Get quotation detail with items.

    Returns:
        JSON object with every ``quotations`` column plus ``customer_name``,
        ``employee_name`` and ``items``; 404 when the quotation does not
        exist.
    """
    def money(value):
        # NULL/zero amounts are reported as 0.
        return float(value) if value else 0

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT q.*, c.name as customer_name, e.name as employee_name
            FROM quotations q
            LEFT JOIN customers c ON q.customer_id = c.id
            LEFT JOIN employees e ON q.employee_id = e.id
            WHERE q.id = %s
        """, (quot_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Quotation not found'}), 404

        cols = [desc[0] for desc in cur.description]
        quot = dict(zip(cols, row))
        # Normalise Decimal amounts and date/datetime values for JSON output.
        for k in ('subtotal', 'tax_total', 'total'):
            if quot.get(k) is not None:
                quot[k] = float(quot[k])
        for k in ('created_at', 'valid_until'):
            if quot.get(k):
                quot[k] = str(quot[k])

        # Get items
        cur.execute("""
            SELECT id, inventory_id, part_number, name, quantity, unit_price,
                   discount_pct, tax_rate, subtotal
            FROM quotation_items WHERE quotation_id = %s ORDER BY id
        """, (quot_id,))
        quot['items'] = [{
            'id': r[0], 'inventory_id': r[1], 'part_number': r[2], 'name': r[3],
            'quantity': r[4], 'unit_price': money(r[5]),
            'discount_pct': money(r[6]),
            'tax_rate': money(r[7]),
            'subtotal': money(r[8]),
        } for r in cur.fetchall()]

        return jsonify(quot)
    finally:
        cur.close()
        conn.close()
@pos_bp.route('/quotations/<int:quot_id>/pdf', methods=['GET'])
@require_auth('pos.view')
def get_quotation_pdf(quot_id):
    """Get printable HTML for a quotation (browser print-to-PDF).

    Loads the quotation, its items, the customer (when one is linked) and
    the business data, then renders them with ``generate_quote_html``.

    Returns:
        The HTML document with ``Content-Type: text/html; charset=utf-8``;
        404 (JSON) when the quotation does not exist.
    """
    from services.pdf_generator import generate_quote_html

    def money(value):
        # NULL/zero amounts are reported as 0.
        return float(value) if value else 0

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Get quotation
        cur.execute("""
            SELECT q.id, q.subtotal, q.tax_total, q.total, q.valid_until,
                   q.created_at, q.notes, q.customer_id, q.employee_id,
                   e.name as employee_name
            FROM quotations q
            LEFT JOIN employees e ON q.employee_id = e.id
            WHERE q.id = %s
        """, (quot_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Quotation not found'}), 404

        cols = [desc[0] for desc in cur.description]
        quot = dict(zip(cols, row))
        for k in ('subtotal', 'tax_total', 'total'):
            if quot.get(k) is not None:
                quot[k] = float(quot[k])
        for k in ('created_at', 'valid_until'):
            if quot.get(k):
                quot[k] = str(quot[k])

        # Get items
        cur.execute("""
            SELECT part_number, name, quantity, unit_price, discount_pct, tax_rate, subtotal
            FROM quotation_items WHERE quotation_id = %s ORDER BY id
        """, (quot_id,))
        items = [{
            'part_number': r[0], 'name': r[1], 'quantity': r[2],
            'unit_price': money(r[3]),
            'discount_pct': money(r[4]),
            'tax_rate': money(r[5]),
            'subtotal': money(r[6]),
        } for r in cur.fetchall()]

        # Get customer info
        customer_info = None
        if quot.get('customer_id'):
            cur.execute("""
                SELECT name, rfc, phone, email FROM customers WHERE id = %s
            """, (quot['customer_id'],))
            cust = cur.fetchone()
            if cust:
                customer_info = {'name': cust[0], 'rfc': cust[1], 'phone': cust[2], 'email': cust[3]}

        # Get business info. This must stay the last query: if it fails, the
        # error is swallowed and no later statement runs on the connection.
        # NOTE(review): this reads a `config` table while the ticket printer
        # endpoint reads `tenant_config` — confirm which one is intended.
        business_info = None
        try:
            cur.execute("SELECT key, value FROM config WHERE key IN ('business_name','rfc','address','phone','email')")
            config_rows = cur.fetchall()
            if config_rows:
                business_info = {r[0]: r[1] for r in config_rows}
                business_info['name'] = business_info.pop('business_name', '')
        except Exception:
            pass  # config table may not exist
    finally:
        cur.close()
        conn.close()

    html = generate_quote_html(quot, items, business_info, customer_info)
    return html, 200, {'Content-Type': 'text/html; charset=utf-8'}
@pos_bp.route('/quotations/<int:quot_id>/email', methods=['POST'])
@require_auth('pos.sell')
def email_quotation(quot_id):
    """Send a quotation as HTML email.

    Body: {email: str}

    The quotation and its items are rendered into an HTML message and sent
    through the SMTP server configured in ``config`` (STARTTLS + login).
    After a successful send a ``QUOTATION_EMAIL`` audit entry is recorded.

    Returns:
        200 with a confirmation message; 400 for a missing or malformed
        address; 404 when the quotation does not exist; 503 when SMTP is not
        configured; 500 when sending or audit logging fails.
    """
    import smtplib
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart
    from html import escape as html_escape
    import config

    data = request.get_json() or {}
    email_to = str(data.get('email') or '').strip()
    # CR/LF inside the address would allow header/command injection.
    if not email_to or '@' not in email_to or '\r' in email_to or '\n' in email_to:
        return jsonify({'error': 'Valid email address required'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT q.id, q.subtotal, q.tax_total, q.total, q.valid_until,
                   q.notes, q.created_at, c.name as customer_name, e.name as employee_name
            FROM quotations q
            LEFT JOIN customers c ON q.customer_id = c.id
            LEFT JOIN employees e ON q.employee_id = e.id
            WHERE q.id = %s
        """, (quot_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Quotation not found'}), 404
        cur.execute("""
            SELECT part_number, name, quantity, unit_price, discount_pct, tax_rate, subtotal
            FROM quotation_items WHERE quotation_id = %s ORDER BY id
        """, (quot_id,))
        items = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    q_id, subtotal, tax_total, total, valid_until, notes, created_at, cust_name, emp_name = row

    def esc(value):
        # Database text is untrusted in an HTML context; NULL renders empty.
        return html_escape('' if value is None else str(value))

    def money(value):
        # NULL amounts render as $0.00 instead of failing in float().
        return f'${float(value or 0):,.2f}'

    # Build HTML email
    items_html = ''.join(
        f'<tr><td>{esc(it[0])}</td><td>{esc(it[1])}</td>'
        f'<td style="text-align:center">{esc(it[2])}</td>'
        f'<td style="text-align:right">{money(it[3])}</td>'
        f'<td style="text-align:right">{money(it[6])}</td></tr>'
        for it in items
    )
    notes_html = f'<p><em>Notas: {esc(notes)}</em></p>' if notes else ''
    html_body = f"""
    <html><body style="font-family:Arial,sans-serif;color:#333;">
    <h2>Cotizacion #{q_id} - Nexus Autoparts</h2>
    <p><strong>Cliente:</strong> {esc(cust_name or 'Publico general')}</p>
    <p><strong>Vendedor:</strong> {esc(emp_name or '-')}</p>
    <p><strong>Fecha:</strong> {esc(created_at)}</p>
    <p><strong>Vigencia:</strong> {esc(valid_until or 'N/A')}</p>
    <table border="1" cellpadding="6" cellspacing="0" style="border-collapse:collapse;width:100%;">
    <tr style="background:#f5a623;color:#fff;">
    <th>No. Parte</th><th>Descripcion</th><th>Cant.</th><th>P. Unit.</th><th>Subtotal</th>
    </tr>
    {items_html}
    </table>
    <p style="text-align:right;margin-top:12px;">
    <strong>Subtotal:</strong> {money(subtotal)}<br>
    <strong>IVA:</strong> {money(tax_total)}<br>
    <strong style="font-size:1.2em;">Total: {money(total)}</strong>
    </p>
    {notes_html}
    <p style="color:#888;font-size:12px;">Este es un documento informativo, no tiene validez fiscal.</p>
    </body></html>
    """

    msg = MIMEMultipart('alternative')
    msg['Subject'] = f'Cotizacion #{q_id} - Nexus Autoparts'
    msg['From'] = config.SMTP_FROM
    msg['To'] = email_to
    msg.attach(MIMEText(html_body, 'html'))

    if not config.SMTP_USER:
        return jsonify({'error': 'SMTP not configured on server'}), 503

    try:
        with smtplib.SMTP(config.SMTP_HOST, config.SMTP_PORT, timeout=15) as server:
            server.starttls()
            server.login(config.SMTP_USER, config.SMTP_PASS)
            server.sendmail(config.SMTP_FROM, [email_to], msg.as_string())
    except Exception as e:
        return jsonify({'error': f'Failed to send email: {str(e)}'}), 500

    # Record the audit entry on its own connection, which is committed and
    # always closed; a failure here is reported separately because the
    # message has already been delivered.
    audit_conn = get_tenant_conn(g.tenant_id)
    try:
        log_action(audit_conn, 'QUOTATION_EMAIL', 'quotation', quot_id,
                   new_value={'email': email_to})
        audit_conn.commit()
    except Exception as e:
        audit_conn.rollback()
        return jsonify({'error': f'Email sent but audit log failed: {str(e)}'}), 500
    finally:
        audit_conn.close()

    return jsonify({'message': f'Quotation #{q_id} sent to {email_to}'})
@pos_bp.route('/quotations/<int:quot_id>/convert', methods=['POST'])
@require_auth('pos.sell')
def convert_quotation(quot_id):
    """Convert a quotation to a sale, using the prices stored on the quotation.

    Body: {
        register_id: int,
        payment_method: str,
        sale_type: str,
        amount_paid: float,
        payment_details: [{method, amount, reference}]
    }

    The quotation row is locked for the duration of the transaction so two
    concurrent requests cannot both convert the same quotation. The sale is
    created through ``process_sale`` with the quotation's currency and
    exchange rate, and the quotation is then marked ``converted``.

    Returns:
        201 with the sale returned by ``process_sale``; 404 when the
        quotation does not exist; 400 when it is not ``active`` or
        ``process_sale`` raises ``ValueError``; 500 for any other failure.
    """
    data = request.get_json() or {}
    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Get quotation (include currency). FOR UPDATE serialises concurrent
        # conversions/cancellations of the same quotation.
        cur.execute(
            "SELECT id, customer_id, status, currency, exchange_rate "
            "FROM quotations WHERE id = %s FOR UPDATE",
            (quot_id,),
        )
        quot = cur.fetchone()
        if not quot:
            return jsonify({'error': 'Quotation not found'}), 404
        if quot[2] != 'active':
            return jsonify({'error': f'Quotation is {quot[2]}, cannot convert'}), 400
        quot_currency = quot[3] or 'MXN'
        quot_rate = quot[4] or 1.0

        # Get quotation items
        cur.execute("""
            SELECT inventory_id, quantity, unit_price, discount_pct, tax_rate
            FROM quotation_items WHERE quotation_id = %s
        """, (quot_id,))
        items = [{
            'inventory_id': r[0], 'quantity': r[1], 'unit_price': float(r[2]),
            'discount_pct': float(r[3]) if r[3] else 0,
            'tax_rate': float(r[4]) if r[4] else 0.16,
        } for r in cur.fetchall()]

        # Build sale_data (preserve quotation currency)
        sale_data = {
            'items': items,
            'customer_id': quot[1],
            'payment_method': data.get('payment_method', 'efectivo'),
            'sale_type': data.get('sale_type', 'cash'),
            'register_id': data.get('register_id'),
            'amount_paid': data.get('amount_paid', 0),
            'payment_details': data.get('payment_details', []),
            'notes': f'Convertida de cotizacion #{quot_id}',
            'currency': quot_currency,
            'exchange_rate': quot_rate,
        }

        sale = process_sale(conn, sale_data)
        # Mark quotation as converted
        cur.execute("""
            UPDATE quotations SET status = 'converted', converted_sale_id = %s
            WHERE id = %s
        """, (sale['id'], quot_id))
        conn.commit()
        return jsonify(sale), 201
    except ValueError as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        # Closing without commit on the early-return paths releases the lock.
        cur.close()
        conn.close()
@pos_bp.route('/quotations/<int:quot_id>/cancel', methods=['PUT'])
@require_auth('pos.sell')
def cancel_quotation(quot_id):
    """Cancel a quotation.

    Only quotations whose status is ``active`` can be cancelled.

    Returns:
        ``{message: 'Quotation cancelled'}`` on success; 404 when the
        quotation does not exist; 400 when it is not ``active``.
    """
    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Lock the row so the status cannot change between the check and the
        # update (e.g. a concurrent conversion to a sale).
        cur.execute("SELECT id, status FROM quotations WHERE id = %s FOR UPDATE", (quot_id,))
        quot = cur.fetchone()
        if not quot:
            return jsonify({'error': 'Quotation not found'}), 404
        if quot[1] != 'active':
            return jsonify({'error': f'Quotation is already {quot[1]}'}), 400
        cur.execute("UPDATE quotations SET status = 'cancelled' WHERE id = %s", (quot_id,))
        conn.commit()
        return jsonify({'message': 'Quotation cancelled'})
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()
# ─── Layaways (Apartados) ────────────────────────
@pos_bp.route('/layaways', methods=['POST'])
@require_auth('pos.sell')
def create_layaway():
    """Create a layaway with an initial partial payment and reserve its stock.

    Body: {
        items: [{inventory_id, quantity, unit_price, discount_pct, tax_rate}],
        customer_id: int (required),
        initial_payment: float (required, > 0),
        payment_method: str,
        reference: str,
        register_id: int,
        expires_days: int (default 30),
        notes: str
    }

    Returns 201 with the layaway summary, 400 on invalid input (including a
    ValueError raised while enriching items) and 500 on any other failure,
    in which case the transaction is rolled back.
    """
    data = request.get_json() or {}
    items = data.get('items', [])
    customer_id = data.get('customer_id')
    register_id = data.get('register_id')
    payment_method = data.get('payment_method', 'efectivo')

    # Parse numeric fields before touching the database so malformed input is
    # answered with a 400 instead of an unhandled exception.
    try:
        initial_payment = float(data.get('initial_payment', 0))
        expires_days = int(data.get('expires_days', 30))
        expires_at = (date.today() + timedelta(days=expires_days)).isoformat()
    except (TypeError, ValueError, OverflowError):
        return jsonify({'error': 'initial_payment and expires_days must be valid numbers'}), 400

    if not items:
        return jsonify({'error': 'No items in layaway'}), 400
    if not customer_id:
        return jsonify({'error': 'customer_id required for layaway'}), 400
    # "not x > 0" (rather than "x <= 0") also rejects NaN.
    if not initial_payment > 0:
        return jsonify({'error': 'Initial payment must be greater than 0'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Enrich items with inventory data
        try:
            enriched = _enrich_items(cur, items, customer_id)
        except ValueError as e:
            return jsonify({'error': str(e)}), 400

        totals = calculate_totals(enriched)
        if initial_payment > totals['total']:
            return jsonify({'error': 'Initial payment exceeds total'}), 400

        # Create layaway record
        cur.execute("""
            INSERT INTO layaways
                (branch_id, customer_id, employee_id, total, amount_paid,
                 status, expires_at, notes)
            VALUES (%s,%s,%s,%s,%s,'active',%s,%s)
            RETURNING id, created_at
        """, (
            g.branch_id, customer_id, g.employee_id,
            totals['total'], initial_payment, expires_at, data.get('notes')
        ))
        layaway_id, created_at = cur.fetchone()

        # Insert layaway items and reserve stock
        # (table created by migration v1.1_pos_tables.sql)
        from services.inventory_engine import record_operation
        for item in totals['items']:
            item_branch_id = item.get('branch_id', g.branch_id)
            line_subtotal = round(
                item['unit_price'] * item['quantity'] * (1 - item['discount_pct'] / 100), 2
            )
            cur.execute("""
                INSERT INTO layaway_items
                    (layaway_id, inventory_id, part_number, name, quantity,
                     unit_price, discount_pct, tax_rate, subtotal)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
            """, (
                layaway_id, item['inventory_id'],
                item.get('part_number', ''), item.get('name', ''),
                item['quantity'], item['unit_price'], item['discount_pct'],
                item['tax_rate'], line_subtotal
            ))
            # Reserve stock immediately (negative quantity = stock deduction)
            record_operation(
                conn, item['inventory_id'], item_branch_id,
                operation_type='LAYAWAY_RESERVE',
                quantity=-item['quantity'],
                notes=f'Apartado #{layaway_id} - reserva'
            )

        # Record initial payment
        cur.execute("""
            INSERT INTO layaway_payments
                (layaway_id, amount, payment_method, reference, employee_id)
            VALUES (%s,%s,%s,%s,%s)
        """, (
            layaway_id, initial_payment, payment_method,
            data.get('reference'), g.employee_id
        ))

        # Record cash movement on register if cash payment
        if register_id and payment_method == 'efectivo':
            cur.execute("""
                INSERT INTO cash_movements (register_id, type, amount, reason, employee_id)
                VALUES (%s, 'in', %s, %s, %s)
            """, (register_id, initial_payment,
                  f'Apartado #{layaway_id} - anticipo', g.employee_id))

        log_action(conn, 'LAYAWAY_CREATE', 'layaway', layaway_id,
                   new_value={'total': totals['total'],
                              'initial_payment': initial_payment,
                              'customer_id': customer_id})
        conn.commit()
        return jsonify({
            'id': layaway_id,
            'total': totals['total'],
            'amount_paid': initial_payment,
            'remaining': round(totals['total'] - initial_payment, 2),
            'expires_at': expires_at,
            'created_at': str(created_at),
            'message': 'Layaway created'
        }), 201
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        # Single cleanup point for every exit path above.
        cur.close()
        conn.close()
@pos_bp.route('/layaways', methods=['GET'])
@require_auth('pos.view')
def list_layaways():
    """List layaways, newest first, with pagination.

    Query params: customer_id, status, page (default 1), per_page (default
    50, capped at 200). page and per_page are clamped to a minimum of 1.
    Results are restricted to g.branch_id when it is set.

    Returns 400 when page, per_page or customer_id is not an integer.
    """
    # Parse and clamp before opening a connection: a per_page of 0 would
    # otherwise divide by zero in total_pages, and negative values would
    # produce an invalid LIMIT/OFFSET.
    try:
        page = max(int(request.args.get('page', 1)), 1)
        per_page = min(max(int(request.args.get('per_page', 50)), 1), 200)
        raw_customer_id = request.args.get('customer_id')
        customer_id = int(raw_customer_id) if raw_customer_id else None
    except ValueError:
        return jsonify({'error': 'page, per_page and customer_id must be integers'}), 400
    status = request.args.get('status')

    where_clauses = ["1=1"]
    params = []
    if customer_id is not None:
        where_clauses.append("l.customer_id = %s")
        params.append(customer_id)
    if status:
        where_clauses.append("l.status = %s")
        params.append(status)
    if g.branch_id:
        where_clauses.append("l.branch_id = %s")
        params.append(g.branch_id)
    # Only fixed SQL fragments are interpolated; all values go through params.
    where = " AND ".join(where_clauses)

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute(f"SELECT count(*) FROM layaways l WHERE {where}", params)
        total = cur.fetchone()[0]
        cur.execute(f"""
            SELECT l.id, l.customer_id, l.employee_id, l.total, l.amount_paid,
                   l.status, l.expires_at, l.created_at,
                   c.name as customer_name, e.name as employee_name
            FROM layaways l
            LEFT JOIN customers c ON l.customer_id = c.id
            LEFT JOIN employees e ON l.employee_id = e.id
            WHERE {where}
            ORDER BY l.created_at DESC
            LIMIT %s OFFSET %s
        """, params + [per_page, (page - 1) * per_page])
        layaways = [
            {
                'id': r[0], 'customer_id': r[1], 'employee_id': r[2],
                'total': float(r[3]) if r[3] else 0,
                'amount_paid': float(r[4]) if r[4] else 0,
                'remaining': round(float(r[3] or 0) - float(r[4] or 0), 2),
                'status': r[5], 'expires_at': str(r[6]) if r[6] else None,
                'created_at': str(r[7]),
                'customer_name': r[8], 'employee_name': r[9],
            }
            for r in cur.fetchall()
        ]
    finally:
        cur.close()
        conn.close()

    total_pages = (total + per_page - 1) // per_page
    return jsonify({
        'data': layaways,
        'pagination': {'page': page, 'per_page': per_page,
                       'total': total, 'total_pages': total_pages}
    })
@pos_bp.route('/layaways/<int:layaway_id>', methods=['GET'])
@require_auth('pos.view')
def get_layaway(layaway_id):
    """Get layaway detail with its items and payments.

    The response contains every column of the layaways row plus
    customer_name, employee_name, a computed 'remaining' balance, and the
    'items' and 'payments' lists. Returns 404 when the layaway does not exist.
    """
    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT l.*, c.name as customer_name, e.name as employee_name
            FROM layaways l
            LEFT JOIN customers c ON l.customer_id = c.id
            LEFT JOIN employees e ON l.employee_id = e.id
            WHERE l.id = %s
        """, (layaway_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Layaway not found'}), 404

        # cur.description must be read before the cursor runs another query.
        cols = [desc[0] for desc in cur.description]
        layaway = dict(zip(cols, row))
        for k in ('total', 'amount_paid'):
            if layaway.get(k) is not None:
                layaway[k] = float(layaway[k])
        # Treat a NULL amount as 0 so the balance never raises TypeError.
        layaway['remaining'] = round(
            (layaway.get('total') or 0) - (layaway.get('amount_paid') or 0), 2
        )
        if layaway.get('created_at'):
            layaway['created_at'] = str(layaway['created_at'])
        if layaway.get('expires_at'):
            layaway['expires_at'] = str(layaway['expires_at'])

        # Get items
        cur.execute("""
            SELECT id, inventory_id, part_number, name, quantity, unit_price,
                   discount_pct, tax_rate, subtotal
            FROM layaway_items WHERE layaway_id = %s ORDER BY id
        """, (layaway_id,))
        layaway['items'] = [
            {
                'id': r[0], 'inventory_id': r[1], 'part_number': r[2], 'name': r[3],
                'quantity': r[4], 'unit_price': float(r[5]) if r[5] else 0,
                'discount_pct': float(r[6]) if r[6] else 0,
                'tax_rate': float(r[7]) if r[7] else 0,
                'subtotal': float(r[8]) if r[8] else 0,
            }
            for r in cur.fetchall()
        ]

        # Get payments
        cur.execute("""
            SELECT id, amount, payment_method, reference, employee_id, created_at
            FROM layaway_payments WHERE layaway_id = %s ORDER BY created_at
        """, (layaway_id,))
        layaway['payments'] = [
            {
                'id': r[0], 'amount': float(r[1]) if r[1] else 0,
                'payment_method': r[2], 'reference': r[3],
                'employee_id': r[4], 'created_at': str(r[5]) if r[5] else None,
            }
            for r in cur.fetchall()
        ]
        return jsonify(layaway)
    finally:
        # Runs on every path so a failing query cannot leak the connection.
        cur.close()
        conn.close()
@pos_bp.route('/layaways/<int:layaway_id>/payment', methods=['POST'])
@require_auth('pos.sell')
def layaway_payment(layaway_id):
    """Add a payment to an active layaway.

    Body: {amount, payment_method, reference, register_id}

    Returns 400 when the amount is not a positive number, the layaway is not
    active, or the amount exceeds the remaining balance; 404 when the
    layaway does not exist; 500 (after rollback) on any other failure.
    """
    data = request.get_json() or {}
    try:
        amount = float(data.get('amount', 0))
    except (TypeError, ValueError):
        return jsonify({'error': 'Amount must be a number'}), 400
    # "not x > 0" (rather than "x <= 0") also rejects NaN.
    if not amount > 0:
        return jsonify({'error': 'Amount must be greater than 0'}), 400

    payment_method = data.get('payment_method', 'efectivo')
    register_id = data.get('register_id')

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Lock the row so two concurrent payments cannot both read the same
        # amount_paid and overwrite each other (or jointly overpay).
        cur.execute(
            "SELECT total, amount_paid, status FROM layaways WHERE id = %s FOR UPDATE",
            (layaway_id,)
        )
        row = cur.fetchone()
        if not row:
            conn.rollback()
            return jsonify({'error': 'Layaway not found'}), 404
        total, paid, status = float(row[0] or 0), float(row[1] or 0), row[2]
        if status != 'active':
            conn.rollback()  # release the row lock
            return jsonify({'error': f'Layaway is {status}'}), 400
        remaining = round(total - paid, 2)
        if amount > remaining:
            conn.rollback()  # release the row lock
            return jsonify({
                'error': f'Payment ${amount:.2f} exceeds remaining ${remaining:.2f}'
            }), 400

        # Record payment
        cur.execute("""
            INSERT INTO layaway_payments
                (layaway_id, amount, payment_method, reference, employee_id)
            VALUES (%s,%s,%s,%s,%s)
            RETURNING id
        """, (
            layaway_id, amount, payment_method,
            data.get('reference'), g.employee_id
        ))
        payment_id = cur.fetchone()[0]

        # Update amount_paid
        new_paid = round(paid + amount, 2)
        cur.execute("UPDATE layaways SET amount_paid = %s WHERE id = %s",
                    (new_paid, layaway_id))

        # Record cash movement if applicable
        if register_id and payment_method == 'efectivo':
            cur.execute("""
                INSERT INTO cash_movements (register_id, type, amount, reason, employee_id)
                VALUES (%s, 'in', %s, %s, %s)
            """, (register_id, amount, f'Apartado #{layaway_id} - abono', g.employee_id))

        conn.commit()
        new_remaining = round(total - new_paid, 2)
        return jsonify({
            'payment_id': payment_id,
            'amount': amount,
            'total_paid': new_paid,
            'remaining': new_remaining,
            'fully_paid': new_remaining <= 0,
            'message': 'Payment recorded'
        })
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        cur.close()
        conn.close()
@pos_bp.route('/layaways/<int:layaway_id>/complete', methods=['POST'])
@require_auth('pos.sell')
def complete_layaway(layaway_id):
    """Convert an active layaway into a completed sale.

    Body: {register_id: int, payment_method: str, reference: str}

    If a balance remains, it is recorded as a final layaway payment (and as
    a cash movement when register_id is given and the method is 'efectivo')
    before the sale is created. Inventory is not deducted here because it
    was reserved when the layaway was created.

    Returns 201 with {id, status, total, created_at} of the new sale, 404
    when the layaway does not exist, 400 when it is not active, and 500
    (after rollback) on any other failure.
    """
    data = request.get_json() or {}
    register_id = data.get('register_id')

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Lock the row so two concurrent "complete" calls cannot both see the
        # layaway as active and each create a sale.
        cur.execute("""
            SELECT id, customer_id, total, amount_paid, status, branch_id
            FROM layaways WHERE id = %s
            FOR UPDATE
        """, (layaway_id,))
        row = cur.fetchone()
        if not row:
            conn.rollback()
            return jsonify({'error': 'Layaway not found'}), 404
        _, cust_id, total, paid, status, branch_id = row
        total, paid = float(total or 0), float(paid or 0)
        if status != 'active':
            conn.rollback()  # release the row lock
            return jsonify({'error': f'Layaway is {status}'}), 400
        remaining = round(total - paid, 2)

        # If there's a remaining balance, accept a final payment with the complete call
        if remaining > 0:
            final_method = data.get('payment_method', 'efectivo')
            cur.execute("""
                INSERT INTO layaway_payments
                    (layaway_id, amount, payment_method, reference, employee_id)
                VALUES (%s,%s,%s,%s,%s)
            """, (layaway_id, remaining, final_method,
                  data.get('reference', 'Pago final al completar'), g.employee_id))
            cur.execute("UPDATE layaways SET amount_paid = total WHERE id = %s",
                        (layaway_id,))
            # Record cash movement for final payment
            if register_id and final_method == 'efectivo':
                cur.execute("""
                    INSERT INTO cash_movements (register_id, type, amount, reason, employee_id)
                    VALUES (%s, 'in', %s, %s, %s)
                """, (register_id, remaining,
                      f'Apartado #{layaway_id} - pago final', g.employee_id))

        # Get layaway items
        cur.execute("""
            SELECT inventory_id, quantity, unit_price, discount_pct, tax_rate
            FROM layaway_items WHERE layaway_id = %s
        """, (layaway_id,))
        items = [
            {
                'inventory_id': r[0], 'quantity': r[1],
                'unit_price': float(r[2]) if r[2] else 0,
                'discount_pct': float(r[3]) if r[3] else 0,
                'tax_rate': float(r[4]) if r[4] else 0.16,
            }
            for r in cur.fetchall()
        ]

        # Create sale record directly instead of calling process_sale(),
        # because stock was already reserved at layaway creation time via
        # LAYAWAY_RESERVE operations. Calling process_sale() would deduct
        # inventory again (double deduction).
        totals_calc = calculate_totals(items)
        cur.execute("""
            INSERT INTO sales
                (branch_id, customer_id, employee_id, register_id, sale_type,
                 payment_method, subtotal, discount_total, tax_total, total,
                 amount_paid, change_given, metodo_pago_sat, forma_pago_sat,
                 status, notes)
            VALUES (%s,%s,%s,%s,'cash','efectivo',%s,%s,%s,%s,%s,0,'PUE','01','completed',%s)
            RETURNING id, created_at
        """, (
            branch_id, cust_id, g.employee_id, register_id,
            totals_calc['subtotal'], totals_calc['discount_total'],
            totals_calc['tax_total'], totals_calc['total'], total,
            f'Completado de apartado #{layaway_id}',
        ))
        sale_id, sale_created = cur.fetchone()

        # Create sale_items (no inventory deduction — already reserved)
        for item in totals_calc['items']:
            cur.execute("SELECT part_number, name, cost FROM inventory WHERE id = %s",
                        (item['inventory_id'],))
            inv = cur.fetchone()
            cur.execute("""
                INSERT INTO sale_items
                    (sale_id, inventory_id, part_number, name, quantity,
                     unit_price, unit_cost, discount_pct, discount_amount,
                     tax_rate, tax_amount, subtotal)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
            """, (
                sale_id, item['inventory_id'],
                inv[0] if inv else '', inv[1] if inv else '',
                item['quantity'], item['unit_price'],
                float(inv[2]) if inv and inv[2] else 0,
                item['discount_pct'], item['discount_amount'],
                item['tax_rate'], item['tax_amount'], item['subtotal']
            ))

        # Record payment on register
        if register_id:
            cur.execute("""
                INSERT INTO sale_payments
                    (sale_id, register_id, method, amount, reference)
                VALUES (%s,%s,'efectivo',%s,%s)
            """, (sale_id, register_id, total, f'Apartado #{layaway_id} completado'))

        sale = {
            'id': sale_id, 'status': 'completed', 'total': totals_calc['total'],
            'created_at': str(sale_created),
        }

        # Mark layaway as completed
        cur.execute("""
            UPDATE layaways SET status = 'completed', converted_sale_id = %s
            WHERE id = %s
        """, (sale_id, layaway_id))
        log_action(conn, 'LAYAWAY_COMPLETE', 'layaway', layaway_id,
                   new_value={'sale_id': sale_id, 'total': total})
        conn.commit()
        return jsonify(sale), 201
    except ValueError as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        cur.close()
        conn.close()
@pos_bp.route('/layaways/<int:layaway_id>/cancel', methods=['PUT'])
@require_auth('pos.sell')
def cancel_layaway(layaway_id):
    """Cancel an active layaway and return its reserved stock.

    Body: {reason: str}  (required, at least 3 characters)

    Each layaway item gets a LAYAWAY_CANCEL inventory operation with a
    positive quantity. The amount already paid is reported back but not
    refunded here; refunds must be handled separately.

    Returns 400 when the reason is missing/too short or the layaway is not
    active, 404 when it does not exist, 500 (after rollback) on failure.
    """
    data = request.get_json() or {}
    # Coerce so an explicit null or non-string reason is rejected with a 400
    # instead of crashing on .strip().
    reason = str(data.get('reason') or '').strip()
    if len(reason) < 3:
        return jsonify({'error': 'Reason is required (min 3 characters)'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Lock the row so a concurrent cancel/complete cannot act on the same
        # layaway and release or consume the reserved stock twice.
        cur.execute(
            "SELECT id, status, amount_paid, total, branch_id "
            "FROM layaways WHERE id = %s FOR UPDATE",
            (layaway_id,)
        )
        row = cur.fetchone()
        if not row:
            conn.rollback()
            return jsonify({'error': 'Layaway not found'}), 404
        if row[1] != 'active':
            conn.rollback()  # release the row lock
            return jsonify({'error': f'Layaway is already {row[1]}'}), 400

        layaway_branch = row[4] or g.branch_id
        amount_paid = float(row[2] or 0)

        # Reverse stock reservations (return reserved items to available stock)
        from services.inventory_engine import record_operation
        cur.execute("""
            SELECT inventory_id, quantity FROM layaway_items WHERE layaway_id = %s
        """, (layaway_id,))
        layaway_items = cur.fetchall()
        for inv_id, qty in layaway_items:
            # Positive quantity = return stock
            record_operation(
                conn, inv_id, layaway_branch,
                operation_type='LAYAWAY_CANCEL',
                quantity=qty,
                notes=f'Cancelacion apartado #{layaway_id}: {reason}'
            )

        cur.execute("""
            UPDATE layaways SET status = 'cancelled',
                notes = COALESCE(notes || ' | ', '') || %s
            WHERE id = %s
        """, (f"CANCELADO: {reason}", layaway_id))
        log_action(conn, 'LAYAWAY_CANCEL', 'layaway', layaway_id,
                   old_value={'status': 'active', 'amount_paid': amount_paid},
                   new_value={'status': 'cancelled', 'reason': reason,
                              'items_unreserved': len(layaway_items)})
        conn.commit()
        return jsonify({
            'message': 'Layaway cancelled',
            'amount_paid': amount_paid,
            'items_unreserved': len(layaway_items),
            'note': 'Stock reservations reversed. Refund of paid amount must be processed separately.'
        })
    except Exception as e:
        # Undo any partially applied stock reversals before reporting failure.
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        cur.close()
        conn.close()
# ─── Returns / Warranty ───────────────────────────
@pos_bp.route('/returns', methods=['POST'])
@require_auth('pos.sell')
def create_return():
    """Process a product return against a completed sale.

    Body: {
        sale_id: int,
        items: [{sale_item_id: int, quantity: int, reason: str}],
        notes: str
    }

    For every item the requested quantity is checked against what was sold
    minus what was already returned (including earlier entries of this same
    request). Stock is restored with a RETURN inventory operation, the sale
    status becomes 'returned' or 'partially_returned', and the refund total
    is added to the customer's credit_balance when the sale has a customer.

    Returns 201 with the return summary, 400 on validation errors, 404 when
    the sale does not exist, 500 (after rollback) on any other failure.
    """
    data = request.get_json() or {}
    sale_id = data.get('sale_id')
    items = data.get('items', [])
    notes = data.get('notes', '')
    if not sale_id:
        return jsonify({'error': 'sale_id is required'}), 400
    if not items:
        return jsonify({'error': 'At least one return item required'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Validate sale exists and is completed. The row is locked so two
        # concurrent returns for the same sale cannot both pass the
        # remaining-quantity check.
        cur.execute("""
            SELECT id, customer_id, total, status, branch_id
            FROM sales WHERE id = %s
            FOR UPDATE
        """, (sale_id,))
        sale = cur.fetchone()
        if not sale:
            conn.rollback()
            return jsonify({'error': 'Sale not found'}), 404
        if sale[3] not in ('completed', 'partially_returned'):
            conn.rollback()  # release the row lock
            return jsonify({'error': f'Cannot return items from a {sale[3]} sale'}), 400
        sale_customer_id = sale[1]
        sale_branch_id = sale[4] or g.branch_id

        # Validate each return item against original sale items
        total_refund = 0
        validated_items = []
        # Quantities claimed so far in this request, per sale_item_id, so a
        # duplicated entry cannot return more than was sold.
        pending_qty = {}
        for ri in items:
            if not isinstance(ri, dict):
                raise ValueError('Each return item must be an object')
            si_id = ri.get('sale_item_id')
            try:
                ret_qty = int(ri.get('quantity') or 0)
            except (TypeError, ValueError):
                raise ValueError(
                    f'Invalid return quantity for sale_item_id {si_id}'
                ) from None
            reason = str(ri.get('reason') or '').strip()
            if ret_qty <= 0:
                raise ValueError(f'Invalid return quantity for sale_item_id {si_id}')
            if not reason:
                raise ValueError(f'Reason required for sale_item_id {si_id}')

            cur.execute("""
                SELECT id, inventory_id, quantity, unit_price, discount_pct, tax_rate, subtotal
                FROM sale_items WHERE id = %s AND sale_id = %s
            """, (si_id, sale_id))
            si = cur.fetchone()
            if not si:
                raise ValueError(f'Sale item {si_id} not found in sale #{sale_id}')
            original_qty = si[2]

            # Check how much has already been returned for this sale_item
            cur.execute("""
                SELECT COALESCE(SUM(ri2.quantity), 0)
                FROM return_items ri2
                JOIN returns r ON ri2.return_id = r.id
                WHERE ri2.sale_item_id = %s AND r.status = 'completed'
            """, (si_id,))
            already_returned = cur.fetchone()[0]
            remaining = original_qty - already_returned - pending_qty.get(si[0], 0)
            if ret_qty > remaining:
                raise ValueError(
                    f'Cannot return {ret_qty} of sale_item {si_id} — only {remaining} remaining'
                )
            pending_qty[si[0]] = pending_qty.get(si[0], 0) + ret_qty

            unit_price = float(si[3])
            discount_pct = float(si[4]) if si[4] else 0
            tax_rate = float(si[5]) if si[5] else 0.16
            price_after_discount = unit_price * (1 - discount_pct / 100)
            refund_amount = round(ret_qty * price_after_discount * (1 + tax_rate), 2)
            total_refund += refund_amount
            validated_items.append({
                'sale_item_id': si_id,
                'inventory_id': si[1],
                'quantity': ret_qty,
                'unit_price': unit_price,
                'refund_amount': refund_amount,
                'reason': reason,
            })
        # Summing rounded floats can leave binary drift (e.g. 0.30000000000000004).
        total_refund = round(total_refund, 2)

        # Create return record
        cur.execute("""
            INSERT INTO returns (sale_id, customer_id, employee_id, total_refund, reason, status)
            VALUES (%s, %s, %s, %s, %s, 'completed')
            RETURNING id
        """, (sale_id, sale_customer_id, g.employee_id, total_refund, notes or 'Devolucion'))
        return_id = cur.fetchone()[0]

        # Create return items and restore inventory
        from services.inventory_engine import record_operation
        for vi in validated_items:
            cur.execute("""
                INSERT INTO return_items
                    (return_id, sale_item_id, inventory_id, quantity, unit_price, refund_amount)
                VALUES (%s, %s, %s, %s, %s, %s)
            """, (return_id, vi['sale_item_id'], vi['inventory_id'],
                  vi['quantity'], vi['unit_price'], vi['refund_amount']))
            # Return stock to inventory
            record_operation(
                conn, vi['inventory_id'], sale_branch_id,
                operation_type='RETURN',
                quantity=vi['quantity'],
                notes=f'Devolucion #{return_id} de venta #{sale_id}: {vi["reason"]}'
            )

        # Update sale status if all items returned
        cur.execute("""
            SELECT COALESCE(SUM(ri2.quantity), 0), COALESCE(SUM(si2.quantity), 0)
            FROM sale_items si2
            LEFT JOIN (
                SELECT sale_item_id, SUM(quantity) as quantity
                FROM return_items ri3
                JOIN returns r2 ON ri3.return_id = r2.id
                WHERE r2.sale_id = %s AND r2.status = 'completed'
                GROUP BY sale_item_id
            ) ri2 ON ri2.sale_item_id = si2.id
            WHERE si2.sale_id = %s
        """, (sale_id, sale_id))
        returned_total, sold_total = cur.fetchone()
        new_status = 'returned' if returned_total >= sold_total else 'partially_returned'
        cur.execute("UPDATE sales SET status = %s WHERE id = %s", (new_status, sale_id))

        # Update customer credit if applicable
        if sale_customer_id:
            cur.execute("""
                UPDATE customers SET credit_balance = COALESCE(credit_balance, 0) + %s
                WHERE id = %s
            """, (total_refund, sale_customer_id))

        log_action(conn, 'RETURN_CREATE', 'return', return_id,
                   new_value={
                       'sale_id': sale_id,
                       'total_refund': total_refund,
                       'items_count': len(validated_items),
                       'sale_status': new_status
                   })
        conn.commit()
        return jsonify({
            'id': return_id,
            'sale_id': sale_id,
            'total_refund': total_refund,
            'items': validated_items,
            'sale_status': new_status,
            'message': f'Return #{return_id} created — ${total_refund:,.2f} refund'
        }), 201
    except ValueError as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        # Also covers the early 404/400 returns above.
        cur.close()
        conn.close()
@pos_bp.route('/returns', methods=['GET'])
@require_auth('pos.view')
def list_returns():
    """List returns, newest first, with pagination.

    Query params: sale_id, customer_id, page (default 1), per_page (default
    50, capped at 200). page and per_page are clamped to a minimum of 1.

    Returns 400 when any of the numeric parameters is not an integer.
    """
    # Parse and clamp before opening a connection: a per_page of 0 would
    # otherwise divide by zero in total_pages, and negative values would
    # produce an invalid LIMIT/OFFSET.
    try:
        page = max(int(request.args.get('page', 1)), 1)
        per_page = min(max(int(request.args.get('per_page', 50)), 1), 200)
        raw_sale_id = request.args.get('sale_id')
        raw_customer_id = request.args.get('customer_id')
        sale_id = int(raw_sale_id) if raw_sale_id else None
        customer_id = int(raw_customer_id) if raw_customer_id else None
    except ValueError:
        return jsonify({
            'error': 'page, per_page, sale_id and customer_id must be integers'
        }), 400

    where_clauses = ["1=1"]
    params = []
    if sale_id is not None:
        where_clauses.append("r.sale_id = %s")
        params.append(sale_id)
    if customer_id is not None:
        where_clauses.append("r.customer_id = %s")
        params.append(customer_id)
    # Only fixed SQL fragments are interpolated; all values go through params.
    where = " AND ".join(where_clauses)

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute(f"SELECT count(*) FROM returns r WHERE {where}", params)
        total = cur.fetchone()[0]
        cur.execute(f"""
            SELECT r.id, r.sale_id, r.customer_id, r.employee_id, r.total_refund,
                   r.reason, r.status, r.created_at,
                   e.name as employee_name, c.name as customer_name
            FROM returns r
            LEFT JOIN employees e ON r.employee_id = e.id
            LEFT JOIN customers c ON r.customer_id = c.id
            WHERE {where}
            ORDER BY r.created_at DESC
            LIMIT %s OFFSET %s
        """, params + [per_page, (page - 1) * per_page])
        returns = [
            {
                'id': row[0], 'sale_id': row[1], 'customer_id': row[2],
                'employee_id': row[3],
                'total_refund': float(row[4]) if row[4] else 0,
                'reason': row[5], 'status': row[6], 'created_at': str(row[7]),
                'employee_name': row[8], 'customer_name': row[9],
            }
            for row in cur.fetchall()
        ]
    finally:
        cur.close()
        conn.close()

    total_pages = (total + per_page - 1) // per_page
    return jsonify({
        'data': returns,
        'pagination': {'page': page, 'per_page': per_page,
                       'total': total, 'total_pages': total_pages}
    })
# ─── Push Notifications ───────────────────────────
@pos_bp.route('/push/subscribe', methods=['POST'])
@require_auth('pos.view')
def push_subscribe():
    """Save a push subscription for the current employee.

    Body: {subscription: <PushSubscription JSON from browser>}

    Returns 400 when the subscription is missing.
    """
    from services.push_service import save_subscription, ensure_push_table
    data = request.get_json() or {}
    subscription = data.get('subscription')
    if not subscription:
        return jsonify({'error': 'subscription required'}), 400
    conn = get_tenant_conn(g.tenant_id)
    try:
        ensure_push_table(conn)
        save_subscription(conn, g.employee_id, subscription)
    finally:
        # Close even if the push service raises, so the connection is not leaked.
        conn.close()
    return jsonify({'message': 'Push subscription saved'})
@pos_bp.route('/push/vapid-key', methods=['GET'])
@require_auth('pos.view')
def push_vapid_key():
    """Get the VAPID public key used for push subscription.

    Returns 503 when no public key is available.
    """
    from services.push_service import get_or_create_vapid_keys, ensure_push_table
    conn = get_tenant_conn(g.tenant_id)
    try:
        ensure_push_table(conn)
        _, public_key = get_or_create_vapid_keys(conn)
    finally:
        # Close even if the push service raises, so the connection is not leaked.
        conn.close()
    if not public_key:
        return jsonify({'error': 'Push not available (pywebpush not installed)'}), 503
    return jsonify({'public_key': public_key})
@pos_bp.route('/push/test', methods=['POST'])
@require_auth('pos.view')
def push_test():
    """Send a test push notification to the current employee.

    Returns 400 when send_push reports failure.
    """
    from services.push_service import send_push, ensure_push_table
    conn = get_tenant_conn(g.tenant_id)
    try:
        ensure_push_table(conn)
        ok = send_push(conn, g.employee_id, 'Prueba Nexus POS',
                       'Las notificaciones push estan funcionando correctamente.', '/pos')
    finally:
        # Close even if the push service raises, so the connection is not leaked.
        conn.close()
    if ok:
        return jsonify({'message': 'Test notification sent'})
    return jsonify({'error': 'No subscription found or push failed'}), 400
# ─── Thermal Printing ──────────────────────────────
@pos_bp.route('/sales/<int:sale_id>/print', methods=['POST'])
@require_auth('pos.sell')
def print_ticket(sale_id):
    """Generate a printable ticket for a sale.

    Body (optional): {printer_type: 'escpos_raw' | 'browser', width: 58 | 80}
    - escpos_raw (default): returns the output of generate_ticket as an
      application/octet-stream attachment.
    - browser: returns the ticket data as JSON for browser-side rendering.

    Returns 400 when width is not an integer and 404 when the sale does not
    exist.
    """
    from flask import Response
    from services.thermal_printer import generate_ticket

    body = request.get_json(silent=True) or {}
    printer_type = body.get('printer_type', 'escpos_raw')
    try:
        width = int(body.get('width', 80))
    except (TypeError, ValueError):
        return jsonify({'error': 'width must be an integer'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Fetch sale
        cur.execute("""
            SELECT s.*, e.name as employee_name, c.name as customer_name
            FROM sales s
            LEFT JOIN employees e ON s.employee_id = e.id
            LEFT JOIN customers c ON s.customer_id = c.id
            WHERE s.id = %s
        """, (sale_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Sale not found'}), 404
        # cur.description must be read before the cursor runs another query.
        cols = [desc[0] for desc in cur.description]
        sale = dict(zip(cols, row))
        for k in ('subtotal', 'discount_total', 'tax_total', 'total',
                  'amount_paid', 'change_given'):
            if sale.get(k) is not None:
                sale[k] = float(sale[k])

        # Fetch items
        cur.execute("""
            SELECT name, quantity, unit_price, subtotal
            FROM sale_items WHERE sale_id = %s ORDER BY id
        """, (sale_id,))
        items = [
            {
                'name': r[0], 'quantity': r[1],
                'unit_price': float(r[2]) if r[2] else 0,
                'subtotal': float(r[3]) if r[3] else 0,
            }
            for r in cur.fetchall()
        ]

        # Fetch business info from config
        business_info = {'name': 'NEXUS AUTOPARTS', 'rfc': '', 'address': ''}
        try:
            cur.execute(
                "SELECT key, value FROM config WHERE key IN ('business_name','rfc','address')"
            )
            for rw in cur.fetchall():
                if rw[0] == 'business_name':
                    business_info['name'] = rw[1]
                else:
                    business_info[rw[0]] = rw[1]
        except Exception:
            # Deliberate best-effort: if the config lookup fails, the ticket
            # is still printed with the default business info above.
            pass
    finally:
        # Release DB resources on every path, including query failures.
        cur.close()
        conn.close()

    sale_data = {
        'folio': f'V-{sale["id"]}',
        'date': str(sale.get('created_at', '')),
        'employee': sale.get('employee_name', ''),
        'customer': sale.get('customer_name', ''),
        'items': items,
        'subtotal': sale.get('subtotal', 0),
        'discount_total': sale.get('discount_total', 0),
        'tax_total': sale.get('tax_total', 0),
        'total': sale.get('total', 0),
        'payment_method': sale.get('payment_method', 'efectivo'),
        'amount_paid': sale.get('amount_paid'),
        'change_given': sale.get('change_given'),
    }
    if printer_type == 'browser':
        # Return the sale data as JSON for browser-side rendering
        return jsonify(sale_data)

    # Default: ESC/POS raw bytes
    raw = generate_ticket(sale_data, business_info, width=width)
    return Response(raw, mimetype='application/octet-stream',
                    headers={'Content-Disposition': f'attachment; filename=ticket_{sale_id}.bin'})