Files
Autoparts-DB/pos/blueprints/cashregister_bp.py
consultoria-as 5d5a2777eb feat(pos): add 3 improvements — Spanish translations, PDF quotes, push notifications
1. Spanish translations for TecDoc catalog (translations.py) applied to
   catalog_service.py and dashboard server.py endpoints
2. Printable quotation HTML endpoint (/pos/api/quotations/<id>/pdf) with
   @media print CSS for clean browser-to-PDF output
3. Web Push notifications to owner/admin on sale cancellation, stock zero,
   and cash register differences > $500. Includes service worker, VAPID
   key management, and subscription endpoints.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 08:05:11 +00:00

579 lines
19 KiB
Python

# /home/Autopartes/pos/blueprints/cashregister_bp.py
"""Cash register blueprint: open/close register, cash movements, X/Z cuts."""
from datetime import datetime
from flask import Blueprint, request, jsonify, g
from middleware import require_auth
from tenant_db import get_tenant_conn
from services.audit import log_action
cashregister_bp = Blueprint('cashregister', __name__, url_prefix='/pos/api/register')
@cashregister_bp.route('/open', methods=['POST'])
@require_auth('pos.sell')
def open_register():
"""Open a cash register session.
Body: {register_number: int, opening_amount: float}
Business rules:
- An employee can only have one open register at a time
- Register number identifies the physical register (1, 2, 3...)
"""
data = request.get_json() or {}
register_number = data.get('register_number')
opening_amount = float(data.get('opening_amount', 0))
if not register_number:
return jsonify({'error': 'register_number required'}), 400
if opening_amount < 0:
return jsonify({'error': 'opening_amount cannot be negative'}), 400
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
# Check if employee already has an open register
cur.execute("""
SELECT id, register_number FROM cash_registers
WHERE employee_id = %s AND status = 'open'
""", (g.employee_id,))
existing = cur.fetchone()
if existing:
cur.close(); conn.close()
return jsonify({
'error': f'You already have register #{existing[1]} open (id={existing[0]}). Close it first.'
}), 409
# Check if this register number is already open at this branch
cur.execute("""
SELECT id, employee_id FROM cash_registers
WHERE branch_id = %s AND register_number = %s AND status = 'open'
""", (g.branch_id, register_number))
in_use = cur.fetchone()
if in_use:
cur.close(); conn.close()
return jsonify({'error': f'Register #{register_number} is already open by another employee'}), 409
try:
cur.execute("""
INSERT INTO cash_registers
(branch_id, employee_id, register_number, opening_amount, status)
VALUES (%s,%s,%s,%s,'open')
RETURNING id, opened_at
""", (g.branch_id, g.employee_id, register_number, opening_amount))
reg_id, opened_at = cur.fetchone()
log_action(conn, 'REGISTER_OPEN', 'cash_register', reg_id,
new_value={'register_number': register_number, 'opening_amount': opening_amount})
conn.commit()
cur.close(); conn.close()
return jsonify({
'id': reg_id,
'register_number': register_number,
'opening_amount': opening_amount,
'opened_at': str(opened_at),
'message': f'Register #{register_number} opened'
}), 201
except Exception as e:
conn.rollback()
cur.close(); conn.close()
return jsonify({'error': str(e)}), 500
@cashregister_bp.route('/current', methods=['GET'])
@require_auth('pos.sell')
def current_register():
"""Get the current open register for this employee."""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
cur.execute("""
SELECT cr.id, cr.register_number, cr.opening_amount, cr.opened_at,
cr.branch_id, b.name as branch_name
FROM cash_registers cr
LEFT JOIN branches b ON cr.branch_id = b.id
WHERE cr.employee_id = %s AND cr.status = 'open'
""", (g.employee_id,))
row = cur.fetchone()
if not row:
cur.close(); conn.close()
return jsonify({'register': None, 'message': 'No open register'})
register = {
'id': row[0], 'register_number': row[1],
'opening_amount': float(row[2]) if row[2] else 0,
'opened_at': str(row[3]),
'branch_id': row[4], 'branch_name': row[5],
}
cur.close(); conn.close()
return jsonify({'register': register})
@cashregister_bp.route('/movement', methods=['POST'])
@require_auth('pos.sell')
def cash_movement():
"""Record a cash in/out movement with mandatory reason.
Body: {type: 'in'|'out', amount: float, reason: str}
"""
data = request.get_json() or {}
mov_type = data.get('type')
amount = float(data.get('amount', 0))
reason = data.get('reason', '').strip()
if mov_type not in ('in', 'out'):
return jsonify({'error': "type must be 'in' or 'out'"}), 400
if amount <= 0:
return jsonify({'error': 'amount must be positive'}), 400
if not reason or len(reason) < 3:
return jsonify({'error': 'reason required (min 3 characters)'}), 400
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
# Get employee's open register
cur.execute("""
SELECT id FROM cash_registers
WHERE employee_id = %s AND status = 'open'
""", (g.employee_id,))
reg = cur.fetchone()
if not reg:
cur.close(); conn.close()
return jsonify({'error': 'No open register. Open a register first.'}), 400
register_id = reg[0]
try:
cur.execute("""
INSERT INTO cash_movements (register_id, type, amount, reason, employee_id)
VALUES (%s,%s,%s,%s,%s)
RETURNING id, created_at
""", (register_id, mov_type, amount, reason, g.employee_id))
mov_id, created_at = cur.fetchone()
log_action(conn, f'CASH_{mov_type.upper()}', 'cash_register', register_id,
new_value={'movement_id': mov_id, 'type': mov_type,
'amount': amount, 'reason': reason})
conn.commit()
cur.close(); conn.close()
return jsonify({
'id': mov_id,
'type': mov_type,
'amount': amount,
'reason': reason,
'created_at': str(created_at),
'message': 'Movement recorded'
}), 201
except Exception as e:
conn.rollback()
cur.close(); conn.close()
return jsonify({'error': str(e)}), 500
def _compute_register_summary(conn, register_id):
"""Compute the register summary for X-cut or Z-cut.
Returns a dict with totals by payment method, cash movements, and expected cash amount.
"""
cur = conn.cursor()
# Get register info
cur.execute("""
SELECT opening_amount, opened_at, employee_id
FROM cash_registers WHERE id = %s
""", (register_id,))
reg = cur.fetchone()
opening_amount = float(reg[0]) if reg[0] else 0
# Sales totals by payment method
cur.execute("""
SELECT payment_method, COUNT(*), COALESCE(SUM(total), 0)
FROM sales
WHERE register_id = %s AND status = 'completed'
GROUP BY payment_method
""", (register_id,))
sales_by_method = {}
total_sales = 0.0
total_sales_count = 0
for r in cur.fetchall():
method, count, amount = r[0], r[1], float(r[2])
sales_by_method[method] = {'count': count, 'amount': amount}
total_sales += amount
total_sales_count += count
# Cash sales specifically (for expected cash calculation)
cash_from_sales = sales_by_method.get('efectivo', {}).get('amount', 0)
# Change given (cash out)
cur.execute("""
SELECT COALESCE(SUM(change_given), 0) FROM sales
WHERE register_id = %s AND status = 'completed' AND payment_method = 'efectivo'
""", (register_id,))
change_given = float(cur.fetchone()[0])
# Cash movements
cur.execute("""
SELECT type, COALESCE(SUM(amount), 0) FROM cash_movements
WHERE register_id = %s GROUP BY type
""", (register_id,))
movements = {'in': 0.0, 'out': 0.0}
for r in cur.fetchall():
movements[r[0]] = float(r[1])
# Cancelled sales
cur.execute("""
SELECT COUNT(*), COALESCE(SUM(total), 0) FROM sales
WHERE register_id = %s AND status = 'cancelled'
""", (register_id,))
cancelled = cur.fetchone()
cancelled_count = cancelled[0]
cancelled_amount = float(cancelled[1])
# Expected cash = opening + cash sales - change + cash_in - cash_out
expected_cash = round(
opening_amount + cash_from_sales - change_given + movements['in'] - movements['out'],
2
)
# Detail of cash movements
cur.execute("""
SELECT id, type, amount, reason, created_at
FROM cash_movements WHERE register_id = %s ORDER BY created_at
""", (register_id,))
movement_detail = []
for r in cur.fetchall():
movement_detail.append({
'id': r[0], 'type': r[1], 'amount': float(r[2]),
'reason': r[3], 'created_at': str(r[4])
})
cur.close()
return {
'opening_amount': opening_amount,
'sales_by_method': sales_by_method,
'total_sales': round(total_sales, 2),
'total_sales_count': total_sales_count,
'cash_from_sales': round(cash_from_sales, 2),
'change_given': round(change_given, 2),
'cash_movements_in': round(movements['in'], 2),
'cash_movements_out': round(movements['out'], 2),
'movement_detail': movement_detail,
'cancelled_count': cancelled_count,
'cancelled_amount': round(cancelled_amount, 2),
'expected_cash': expected_cash,
}
@cashregister_bp.route('/cut-x', methods=['GET'])
@require_auth('pos.sell')
def cut_x():
"""Partial cut (corte X): read-only summary without closing the register."""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
cur.execute("""
SELECT id FROM cash_registers
WHERE employee_id = %s AND status = 'open'
""", (g.employee_id,))
reg = cur.fetchone()
if not reg:
cur.close(); conn.close()
return jsonify({'error': 'No open register'}), 400
register_id = reg[0]
summary = _compute_register_summary(conn, register_id)
cur.close(); conn.close()
return jsonify({
'type': 'X',
'register_id': register_id,
'status': 'open',
**summary,
})
@cashregister_bp.route('/cut-z', methods=['POST'])
@require_auth('pos.sell')
def cut_z():
"""Final cut (corte Z): close the register.
Body: {closing_amount: float} (the amount physically counted in the register)
"""
data = request.get_json() or {}
closing_amount = float(data.get('closing_amount', 0))
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
cur.execute("""
SELECT id FROM cash_registers
WHERE employee_id = %s AND status = 'open'
""", (g.employee_id,))
reg = cur.fetchone()
if not reg:
cur.close(); conn.close()
return jsonify({'error': 'No open register'}), 400
register_id = reg[0]
summary = _compute_register_summary(conn, register_id)
expected = summary['expected_cash']
difference = round(closing_amount - expected, 2)
try:
cur.execute("""
UPDATE cash_registers
SET closing_amount = %s, expected_amount = %s, difference = %s,
status = 'closed', closed_at = NOW()
WHERE id = %s
""", (closing_amount, expected, difference, register_id))
log_action(conn, 'REGISTER_CLOSE', 'cash_register', register_id,
new_value={
'closing_amount': closing_amount,
'expected_amount': expected,
'difference': difference,
'total_sales': summary['total_sales'],
})
conn.commit()
# Push notification to owner if cash difference > $500
if abs(difference) > 500:
try:
from services.push_service import notify_owner
emp_name = getattr(g, 'employee_name', 'Empleado')
notify_owner(
conn,
'Diferencia en Caja',
f'Corte Z caja #{register_id}: diferencia de ${difference:,.2f} ({emp_name})',
'/pos'
)
except Exception:
pass # Push failures never block business logic
cur.close(); conn.close()
return jsonify({
'type': 'Z',
'register_id': register_id,
'status': 'closed',
'closing_amount': closing_amount,
'expected_amount': expected,
'difference': difference,
**summary,
})
except Exception as e:
conn.rollback()
cur.close(); conn.close()
return jsonify({'error': str(e)}), 500
@cashregister_bp.route('/history', methods=['GET'])
@require_auth('pos.view')
def register_history():
"""List closed registers with summary.
Query params: date_from, date_to, employee_id, page, per_page
"""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
page = int(request.args.get('page', 1))
per_page = min(int(request.args.get('per_page', 50)), 200)
where_clauses = ["cr.status = 'closed'"]
params = []
if g.branch_id:
where_clauses.append("cr.branch_id = %s")
params.append(g.branch_id)
date_from = request.args.get('date_from')
date_to = request.args.get('date_to')
employee_id = request.args.get('employee_id')
if date_from:
where_clauses.append("cr.closed_at >= %s")
params.append(date_from)
if date_to:
where_clauses.append("cr.closed_at < %s::date + interval '1 day'")
params.append(date_to)
if employee_id:
where_clauses.append("cr.employee_id = %s")
params.append(int(employee_id))
where = " AND ".join(where_clauses)
cur.execute(f"SELECT count(*) FROM cash_registers cr WHERE {where}", params)
total = cur.fetchone()[0]
cur.execute(f"""
SELECT cr.id, cr.register_number, cr.opening_amount, cr.closing_amount,
cr.expected_amount, cr.difference, cr.opened_at, cr.closed_at,
cr.employee_id, e.name as employee_name
FROM cash_registers cr
LEFT JOIN employees e ON cr.employee_id = e.id
WHERE {where}
ORDER BY cr.closed_at DESC
LIMIT %s OFFSET %s
""", params + [per_page, (page - 1) * per_page])
registers = []
for r in cur.fetchall():
registers.append({
'id': r[0], 'register_number': r[1],
'opening_amount': float(r[2]) if r[2] else 0,
'closing_amount': float(r[3]) if r[3] else 0,
'expected_amount': float(r[4]) if r[4] else 0,
'difference': float(r[5]) if r[5] else 0,
'opened_at': str(r[6]), 'closed_at': str(r[7]),
'employee_id': r[8], 'employee_name': r[9],
})
cur.close(); conn.close()
total_pages = (total + per_page - 1) // per_page
return jsonify({
'data': registers,
'pagination': {'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages}
})
@cashregister_bp.route('/daily-summary', methods=['GET'])
@require_auth('pos.view')
def daily_summary():
"""Consolidated daily summary across all registers for a given date.
Query params:
date: YYYY-MM-DD (default: today)
Returns aggregated totals: total sales, total by payment method,
total movements, and per-register breakdown.
"""
from datetime import date as date_type
target_date = request.args.get('date', date_type.today().isoformat())
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
branch_clause = ""
params = [target_date, target_date]
if g.branch_id:
branch_clause = "AND s.branch_id = %s"
params.append(g.branch_id)
# Total sales by payment method for the date
cur.execute(f"""
SELECT s.payment_method, COUNT(*), COALESCE(SUM(s.total), 0)
FROM sales s
WHERE s.created_at >= %s::date
AND s.created_at < %s::date + interval '1 day'
AND s.status = 'completed'
{branch_clause}
GROUP BY s.payment_method
""", params)
sales_by_method = {}
total_sales = 0.0
total_sales_count = 0
for method, count, amount in cur.fetchall():
sales_by_method[method] = {'count': count, 'amount': float(amount)}
total_sales += float(amount)
total_sales_count += count
# Cancelled sales
cancel_params = [target_date, target_date]
if g.branch_id:
cancel_params.append(g.branch_id)
cur.execute(f"""
SELECT COUNT(*), COALESCE(SUM(s.total), 0)
FROM sales s
WHERE s.created_at >= %s::date
AND s.created_at < %s::date + interval '1 day'
AND s.status = 'cancelled'
{branch_clause}
""", cancel_params)
cancelled = cur.fetchone()
cancelled_count = cancelled[0]
cancelled_amount = float(cancelled[1])
# Cash movements for the date
mov_params = [target_date, target_date]
mov_branch_clause = ""
if g.branch_id:
mov_branch_clause = "AND cr.branch_id = %s"
mov_params.append(g.branch_id)
cur.execute(f"""
SELECT cm.type, COUNT(*), COALESCE(SUM(cm.amount), 0)
FROM cash_movements cm
JOIN cash_registers cr ON cm.register_id = cr.id
WHERE cm.created_at >= %s::date
AND cm.created_at < %s::date + interval '1 day'
{mov_branch_clause}
GROUP BY cm.type
""", mov_params)
movements = {'in': {'count': 0, 'amount': 0.0}, 'out': {'count': 0, 'amount': 0.0}}
for mov_type, count, amount in cur.fetchall():
movements[mov_type] = {'count': count, 'amount': float(amount)}
# Per-register breakdown
reg_params = [target_date, target_date]
reg_branch_clause = ""
if g.branch_id:
reg_branch_clause = "AND cr.branch_id = %s"
reg_params.append(g.branch_id)
cur.execute(f"""
SELECT cr.id, cr.register_number, cr.status,
cr.opening_amount, cr.closing_amount, cr.expected_amount,
cr.difference, cr.opened_at, cr.closed_at,
e.name as employee_name,
(SELECT COUNT(*) FROM sales s WHERE s.register_id = cr.id AND s.status = 'completed') as sale_count,
(SELECT COALESCE(SUM(s.total), 0) FROM sales s WHERE s.register_id = cr.id AND s.status = 'completed') as sale_total
FROM cash_registers cr
LEFT JOIN employees e ON cr.employee_id = e.id
WHERE cr.opened_at >= %s::date
AND cr.opened_at < %s::date + interval '1 day'
{reg_branch_clause}
ORDER BY cr.opened_at
""", reg_params)
registers = []
for r in cur.fetchall():
registers.append({
'id': r[0], 'register_number': r[1], 'status': r[2],
'opening_amount': float(r[3]) if r[3] else 0,
'closing_amount': float(r[4]) if r[4] else 0,
'expected_amount': float(r[5]) if r[5] else 0,
'difference': float(r[6]) if r[6] else 0,
'opened_at': str(r[7]) if r[7] else None,
'closed_at': str(r[8]) if r[8] else None,
'employee_name': r[9],
'sale_count': r[10], 'sale_total': float(r[11]),
})
cur.close(); conn.close()
return jsonify({
'date': target_date,
'total_sales': round(total_sales, 2),
'total_sales_count': total_sales_count,
'sales_by_method': sales_by_method,
'cancelled_count': cancelled_count,
'cancelled_amount': round(cancelled_amount, 2),
'movements_in': movements['in'],
'movements_out': movements['out'],
'registers': registers,
})