# /home/Autopartes/pos/blueprints/cashregister_bp.py """Cash register blueprint: open/close register, cash movements, X/Z cuts.""" from datetime import datetime from flask import Blueprint, request, jsonify, g from middleware import require_auth from tenant_db import get_tenant_conn from services.audit import log_action cashregister_bp = Blueprint('cashregister', __name__, url_prefix='/pos/api/register') @cashregister_bp.route('/open', methods=['POST']) @require_auth('pos.sell') def open_register(): """Open a cash register session. Body: {register_number: int, opening_amount: float} Business rules: - An employee can only have one open register at a time - Register number identifies the physical register (1, 2, 3...) """ data = request.get_json() or {} register_number = data.get('register_number') opening_amount = float(data.get('opening_amount', 0)) if not register_number: return jsonify({'error': 'register_number required'}), 400 if opening_amount < 0: return jsonify({'error': 'opening_amount cannot be negative'}), 400 conn = get_tenant_conn(g.tenant_id) cur = conn.cursor() # Check if employee already has an open register cur.execute(""" SELECT id, register_number FROM cash_registers WHERE employee_id = %s AND status = 'open' """, (g.employee_id,)) existing = cur.fetchone() if existing: cur.close(); conn.close() return jsonify({ 'error': f'You already have register #{existing[1]} open (id={existing[0]}). Close it first.' }), 409 # Check if this register number is already open at this branch cur.execute(""" SELECT id, employee_id FROM cash_registers WHERE branch_id = %s AND register_number = %s AND status = 'open' """, (g.branch_id, register_number)) in_use = cur.fetchone() if in_use: cur.close(); conn.close() return jsonify({'error': f'Register #{register_number} is already open by another employee'}), 409 try: cur.execute(""" INSERT INTO cash_registers (branch_id, employee_id, register_number, opening_amount, status) VALUES (%s,%s,%s,%s,'open') RETURNING id, opened_at """, (g.branch_id, g.employee_id, register_number, opening_amount)) reg_id, opened_at = cur.fetchone() log_action(conn, 'REGISTER_OPEN', 'cash_register', reg_id, new_value={'register_number': register_number, 'opening_amount': opening_amount}) conn.commit() cur.close(); conn.close() return jsonify({ 'id': reg_id, 'register_number': register_number, 'opening_amount': opening_amount, 'opened_at': str(opened_at), 'message': f'Register #{register_number} opened' }), 201 except Exception as e: conn.rollback() cur.close(); conn.close() return jsonify({'error': str(e)}), 500 @cashregister_bp.route('/current', methods=['GET']) @require_auth('pos.sell') def current_register(): """Get the current open register for this employee.""" conn = get_tenant_conn(g.tenant_id) cur = conn.cursor() cur.execute(""" SELECT cr.id, cr.register_number, cr.opening_amount, cr.opened_at, cr.branch_id, b.name as branch_name FROM cash_registers cr LEFT JOIN branches b ON cr.branch_id = b.id WHERE cr.employee_id = %s AND cr.status = 'open' """, (g.employee_id,)) row = cur.fetchone() if not row: cur.close(); conn.close() return jsonify({'register': None, 'message': 'No open register'}) register = { 'id': row[0], 'register_number': row[1], 'opening_amount': float(row[2]) if row[2] else 0, 'opened_at': str(row[3]), 'branch_id': row[4], 'branch_name': row[5], } cur.close(); conn.close() return jsonify({'register': register}) @cashregister_bp.route('/movement', methods=['POST']) @require_auth('pos.sell') def cash_movement(): """Record a cash in/out movement with mandatory reason. Body: {type: 'in'|'out', amount: float, reason: str} """ data = request.get_json() or {} mov_type = data.get('type') amount = float(data.get('amount', 0)) reason = data.get('reason', '').strip() if mov_type not in ('in', 'out'): return jsonify({'error': "type must be 'in' or 'out'"}), 400 if amount <= 0: return jsonify({'error': 'amount must be positive'}), 400 if not reason or len(reason) < 3: return jsonify({'error': 'reason required (min 3 characters)'}), 400 conn = get_tenant_conn(g.tenant_id) cur = conn.cursor() # Get employee's open register cur.execute(""" SELECT id FROM cash_registers WHERE employee_id = %s AND status = 'open' """, (g.employee_id,)) reg = cur.fetchone() if not reg: cur.close(); conn.close() return jsonify({'error': 'No open register. Open a register first.'}), 400 register_id = reg[0] try: cur.execute(""" INSERT INTO cash_movements (register_id, type, amount, reason, employee_id) VALUES (%s,%s,%s,%s,%s) RETURNING id, created_at """, (register_id, mov_type, amount, reason, g.employee_id)) mov_id, created_at = cur.fetchone() log_action(conn, f'CASH_{mov_type.upper()}', 'cash_register', register_id, new_value={'movement_id': mov_id, 'type': mov_type, 'amount': amount, 'reason': reason}) conn.commit() cur.close(); conn.close() return jsonify({ 'id': mov_id, 'type': mov_type, 'amount': amount, 'reason': reason, 'created_at': str(created_at), 'message': 'Movement recorded' }), 201 except Exception as e: conn.rollback() cur.close(); conn.close() return jsonify({'error': str(e)}), 500 def _compute_register_summary(conn, register_id): """Compute the register summary for X-cut or Z-cut. Returns a dict with totals by payment method, cash movements, and expected cash amount. """ cur = conn.cursor() # Get register info cur.execute(""" SELECT opening_amount, opened_at, employee_id FROM cash_registers WHERE id = %s """, (register_id,)) reg = cur.fetchone() opening_amount = float(reg[0]) if reg[0] else 0 # Sales totals by payment method cur.execute(""" SELECT payment_method, COUNT(*), COALESCE(SUM(total), 0) FROM sales WHERE register_id = %s AND status = 'completed' GROUP BY payment_method """, (register_id,)) sales_by_method = {} total_sales = 0.0 total_sales_count = 0 for r in cur.fetchall(): method, count, amount = r[0], r[1], float(r[2]) sales_by_method[method] = {'count': count, 'amount': amount} total_sales += amount total_sales_count += count # Cash sales specifically (for expected cash calculation) cash_from_sales = sales_by_method.get('efectivo', {}).get('amount', 0) # Change given (cash out) cur.execute(""" SELECT COALESCE(SUM(change_given), 0) FROM sales WHERE register_id = %s AND status = 'completed' AND payment_method = 'efectivo' """, (register_id,)) change_given = float(cur.fetchone()[0]) # Cash movements cur.execute(""" SELECT type, COALESCE(SUM(amount), 0) FROM cash_movements WHERE register_id = %s GROUP BY type """, (register_id,)) movements = {'in': 0.0, 'out': 0.0} for r in cur.fetchall(): movements[r[0]] = float(r[1]) # Cancelled sales cur.execute(""" SELECT COUNT(*), COALESCE(SUM(total), 0) FROM sales WHERE register_id = %s AND status = 'cancelled' """, (register_id,)) cancelled = cur.fetchone() cancelled_count = cancelled[0] cancelled_amount = float(cancelled[1]) # Expected cash = opening + cash sales - change + cash_in - cash_out expected_cash = round( opening_amount + cash_from_sales - change_given + movements['in'] - movements['out'], 2 ) # Detail of cash movements cur.execute(""" SELECT id, type, amount, reason, created_at FROM cash_movements WHERE register_id = %s ORDER BY created_at """, (register_id,)) movement_detail = [] for r in cur.fetchall(): movement_detail.append({ 'id': r[0], 'type': r[1], 'amount': float(r[2]), 'reason': r[3], 'created_at': str(r[4]) }) cur.close() return { 'opening_amount': opening_amount, 'sales_by_method': sales_by_method, 'total_sales': round(total_sales, 2), 'total_sales_count': total_sales_count, 'cash_from_sales': round(cash_from_sales, 2), 'change_given': round(change_given, 2), 'cash_movements_in': round(movements['in'], 2), 'cash_movements_out': round(movements['out'], 2), 'movement_detail': movement_detail, 'cancelled_count': cancelled_count, 'cancelled_amount': round(cancelled_amount, 2), 'expected_cash': expected_cash, } @cashregister_bp.route('/cut-x', methods=['GET']) @require_auth('pos.sell') def cut_x(): """Partial cut (corte X): read-only summary without closing the register.""" conn = get_tenant_conn(g.tenant_id) cur = conn.cursor() cur.execute(""" SELECT id FROM cash_registers WHERE employee_id = %s AND status = 'open' """, (g.employee_id,)) reg = cur.fetchone() if not reg: cur.close(); conn.close() return jsonify({'error': 'No open register'}), 400 register_id = reg[0] summary = _compute_register_summary(conn, register_id) cur.close(); conn.close() return jsonify({ 'type': 'X', 'register_id': register_id, 'status': 'open', **summary, }) @cashregister_bp.route('/cut-z', methods=['POST']) @require_auth('pos.sell') def cut_z(): """Final cut (corte Z): close the register. Body: {closing_amount: float} (the amount physically counted in the register) """ data = request.get_json() or {} closing_amount = float(data.get('closing_amount', 0)) conn = get_tenant_conn(g.tenant_id) cur = conn.cursor() cur.execute(""" SELECT id FROM cash_registers WHERE employee_id = %s AND status = 'open' """, (g.employee_id,)) reg = cur.fetchone() if not reg: cur.close(); conn.close() return jsonify({'error': 'No open register'}), 400 register_id = reg[0] summary = _compute_register_summary(conn, register_id) expected = summary['expected_cash'] difference = round(closing_amount - expected, 2) try: cur.execute(""" UPDATE cash_registers SET closing_amount = %s, expected_amount = %s, difference = %s, status = 'closed', closed_at = NOW() WHERE id = %s """, (closing_amount, expected, difference, register_id)) log_action(conn, 'REGISTER_CLOSE', 'cash_register', register_id, new_value={ 'closing_amount': closing_amount, 'expected_amount': expected, 'difference': difference, 'total_sales': summary['total_sales'], }) conn.commit() cur.close(); conn.close() return jsonify({ 'type': 'Z', 'register_id': register_id, 'status': 'closed', 'closing_amount': closing_amount, 'expected_amount': expected, 'difference': difference, **summary, }) except Exception as e: conn.rollback() cur.close(); conn.close() return jsonify({'error': str(e)}), 500 @cashregister_bp.route('/history', methods=['GET']) @require_auth('pos.view') def register_history(): """List closed registers with summary. Query params: date_from, date_to, employee_id, page, per_page """ conn = get_tenant_conn(g.tenant_id) cur = conn.cursor() page = int(request.args.get('page', 1)) per_page = min(int(request.args.get('per_page', 50)), 200) where_clauses = ["cr.status = 'closed'"] params = [] if g.branch_id: where_clauses.append("cr.branch_id = %s") params.append(g.branch_id) date_from = request.args.get('date_from') date_to = request.args.get('date_to') employee_id = request.args.get('employee_id') if date_from: where_clauses.append("cr.closed_at >= %s") params.append(date_from) if date_to: where_clauses.append("cr.closed_at < %s::date + interval '1 day'") params.append(date_to) if employee_id: where_clauses.append("cr.employee_id = %s") params.append(int(employee_id)) where = " AND ".join(where_clauses) cur.execute(f"SELECT count(*) FROM cash_registers cr WHERE {where}", params) total = cur.fetchone()[0] cur.execute(f""" SELECT cr.id, cr.register_number, cr.opening_amount, cr.closing_amount, cr.expected_amount, cr.difference, cr.opened_at, cr.closed_at, cr.employee_id, e.name as employee_name FROM cash_registers cr LEFT JOIN employees e ON cr.employee_id = e.id WHERE {where} ORDER BY cr.closed_at DESC LIMIT %s OFFSET %s """, params + [per_page, (page - 1) * per_page]) registers = [] for r in cur.fetchall(): registers.append({ 'id': r[0], 'register_number': r[1], 'opening_amount': float(r[2]) if r[2] else 0, 'closing_amount': float(r[3]) if r[3] else 0, 'expected_amount': float(r[4]) if r[4] else 0, 'difference': float(r[5]) if r[5] else 0, 'opened_at': str(r[6]), 'closed_at': str(r[7]), 'employee_id': r[8], 'employee_name': r[9], }) cur.close(); conn.close() total_pages = (total + per_page - 1) // per_page return jsonify({ 'data': registers, 'pagination': {'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages} }) @cashregister_bp.route('/daily-summary', methods=['GET']) @require_auth('pos.view') def daily_summary(): """Consolidated daily summary across all registers for a given date. Query params: date: YYYY-MM-DD (default: today) Returns aggregated totals: total sales, total by payment method, total movements, and per-register breakdown. """ from datetime import date as date_type target_date = request.args.get('date', date_type.today().isoformat()) conn = get_tenant_conn(g.tenant_id) cur = conn.cursor() branch_clause = "" params = [target_date, target_date] if g.branch_id: branch_clause = "AND s.branch_id = %s" params.append(g.branch_id) # Total sales by payment method for the date cur.execute(f""" SELECT s.payment_method, COUNT(*), COALESCE(SUM(s.total), 0) FROM sales s WHERE s.created_at >= %s::date AND s.created_at < %s::date + interval '1 day' AND s.status = 'completed' {branch_clause} GROUP BY s.payment_method """, params) sales_by_method = {} total_sales = 0.0 total_sales_count = 0 for method, count, amount in cur.fetchall(): sales_by_method[method] = {'count': count, 'amount': float(amount)} total_sales += float(amount) total_sales_count += count # Cancelled sales cancel_params = [target_date, target_date] if g.branch_id: cancel_params.append(g.branch_id) cur.execute(f""" SELECT COUNT(*), COALESCE(SUM(s.total), 0) FROM sales s WHERE s.created_at >= %s::date AND s.created_at < %s::date + interval '1 day' AND s.status = 'cancelled' {branch_clause} """, cancel_params) cancelled = cur.fetchone() cancelled_count = cancelled[0] cancelled_amount = float(cancelled[1]) # Cash movements for the date mov_params = [target_date, target_date] mov_branch_clause = "" if g.branch_id: mov_branch_clause = "AND cr.branch_id = %s" mov_params.append(g.branch_id) cur.execute(f""" SELECT cm.type, COUNT(*), COALESCE(SUM(cm.amount), 0) FROM cash_movements cm JOIN cash_registers cr ON cm.register_id = cr.id WHERE cm.created_at >= %s::date AND cm.created_at < %s::date + interval '1 day' {mov_branch_clause} GROUP BY cm.type """, mov_params) movements = {'in': {'count': 0, 'amount': 0.0}, 'out': {'count': 0, 'amount': 0.0}} for mov_type, count, amount in cur.fetchall(): movements[mov_type] = {'count': count, 'amount': float(amount)} # Per-register breakdown reg_params = [target_date, target_date] reg_branch_clause = "" if g.branch_id: reg_branch_clause = "AND cr.branch_id = %s" reg_params.append(g.branch_id) cur.execute(f""" SELECT cr.id, cr.register_number, cr.status, cr.opening_amount, cr.closing_amount, cr.expected_amount, cr.difference, cr.opened_at, cr.closed_at, e.name as employee_name, (SELECT COUNT(*) FROM sales s WHERE s.register_id = cr.id AND s.status = 'completed') as sale_count, (SELECT COALESCE(SUM(s.total), 0) FROM sales s WHERE s.register_id = cr.id AND s.status = 'completed') as sale_total FROM cash_registers cr LEFT JOIN employees e ON cr.employee_id = e.id WHERE cr.opened_at >= %s::date AND cr.opened_at < %s::date + interval '1 day' {reg_branch_clause} ORDER BY cr.opened_at """, reg_params) registers = [] for r in cur.fetchall(): registers.append({ 'id': r[0], 'register_number': r[1], 'status': r[2], 'opening_amount': float(r[3]) if r[3] else 0, 'closing_amount': float(r[4]) if r[4] else 0, 'expected_amount': float(r[5]) if r[5] else 0, 'difference': float(r[6]) if r[6] else 0, 'opened_at': str(r[7]) if r[7] else None, 'closed_at': str(r[8]) if r[8] else None, 'employee_name': r[9], 'sale_count': r[10], 'sale_total': float(r[11]), }) cur.close(); conn.close() return jsonify({ 'date': target_date, 'total_sales': round(total_sales, 2), 'total_sales_count': total_sales_count, 'sales_by_method': sales_by_method, 'cancelled_count': cancelled_count, 'cancelled_amount': round(cancelled_amount, 2), 'movements_in': movements['in'], 'movements_out': movements['out'], 'registers': registers, })