- Add thermal_printer.py service generating raw ESC/POS bytes for 58mm/80mm printers - Add /pos/api/sales/<id>/print endpoint (escpos_raw or browser mode) - Add printer.js with WebUSB and Web Serial support for direct browser-to-printer - Add thermal print button in ticket modal with connect/print workflow Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1794 lines
63 KiB
Python
1794 lines
63 KiB
Python
# /home/Autopartes/pos/blueprints/pos_bp.py
|
|
"""POS blueprint: sales, quotations, layaways.
|
|
|
|
All sale business logic is in services.pos_engine. This blueprint is the HTTP layer
|
|
that validates input, calls the engine, and returns JSON responses.
|
|
"""
|
|
|
|
import json
|
|
from datetime import datetime, date, timedelta
|
|
from flask import Blueprint, request, jsonify, g
|
|
from middleware import require_auth, has_permission
|
|
from tenant_db import get_tenant_conn
|
|
from services.pos_engine import (
|
|
process_sale, cancel_sale, calculate_totals,
|
|
get_price_for_customer, get_margin_info
|
|
)
|
|
from services.audit import log_action
|
|
|
|
pos_bp = Blueprint('pos', __name__, url_prefix='/pos/api')
|
|
|
|
|
|
def _enrich_items(cur, items, customer_id=None):
    """Complete cart items with inventory data so totals can be calculated.

    Every item is matched against an active ``inventory`` row. ``unit_price``,
    ``discount_pct`` and ``tax_rate`` are taken from the item when present;
    when a field is missing or explicitly ``None`` a default is used instead
    (tier price, 0 and the inventory tax rate respectively).

    Args:
        cur: Open DB-API cursor on the tenant database.
        items: List of dicts with ``inventory_id`` and optionally
            ``quantity`` (default 1), ``unit_price``, ``discount_pct``,
            ``tax_rate``.
        customer_id: Optional customer id. Its ``price_tier`` (1, 2 or 3)
            selects which inventory price column is the default unit price;
            tier 1 is used when there is no customer or no tier.

    Returns:
        A list of dicts with the keys ``inventory_id``, ``part_number``,
        ``name``, ``quantity``, ``unit_price``, ``unit_cost``,
        ``discount_pct``, ``tax_rate`` and ``branch_id``.

    Raises:
        ValueError: If a quantity is not a positive integer or an inventory
            item does not exist or is inactive.
    """
    def _number(item, key, fallback):
        # Absent key and explicit null both mean "use the default".
        value = item.get(key)
        return float(fallback if value is None else value)

    # The tier belongs to the customer, not to the line, so it is resolved
    # once instead of being re-queried for every item.
    price_tier = 1
    if customer_id and items:
        cur.execute("SELECT price_tier FROM customers WHERE id = %s", (customer_id,))
        cust = cur.fetchone()
        if cust and cust[0]:
            price_tier = int(cust[0])

    enriched = []
    for item in items:
        inv_id = item.get('inventory_id')
        try:
            qty = int(item.get('quantity', 1))
        except (TypeError, ValueError):
            # Surface malformed quantities as ValueError, the exception the
            # callers translate into a 400 response.
            raise ValueError(f"Invalid quantity for inventory_id {inv_id}") from None
        if qty <= 0:
            raise ValueError(f"Invalid quantity for inventory_id {inv_id}")

        cur.execute("""
            SELECT id, part_number, name, cost, price_1, price_2, price_3,
                   tax_rate, branch_id
            FROM inventory WHERE id = %s AND is_active = true
        """, (inv_id,))
        inv = cur.fetchone()
        if not inv:
            raise ValueError(f"Inventory item {inv_id} not found or inactive")

        # Row layout: price_1=inv[4], price_2=inv[5], price_3=inv[6].
        # An unknown tier or an empty tier price falls back to price_1.
        tier_prices = {1: inv[4], 2: inv[5], 3: inv[6]}
        default_price = float(tier_prices.get(price_tier, inv[4]) or inv[4])

        # NOTE(review): a falsy inventory tax_rate (NULL or 0) defaults to
        # 0.16 here - confirm that zero-rated parts are not expected.
        enriched.append({
            'inventory_id': inv_id,
            'part_number': inv[1],
            'name': inv[2],
            'quantity': qty,
            'unit_price': _number(item, 'unit_price', default_price),
            'unit_cost': float(inv[3]) if inv[3] else 0,
            'discount_pct': _number(item, 'discount_pct', 0),
            'tax_rate': _number(item, 'tax_rate', inv[7] or 0.16),
            'branch_id': inv[8],
        })
    return enriched
|
|
|
|
|
|
# ─── Sales ───────────────────────────────────────
|
|
|
|
@pos_bp.route('/sales', methods=['POST'])
@require_auth('pos.sell')
def create_sale():
    """Create a new sale.

    The request body is passed unchanged to ``process_sale``; this handler
    only manages the transaction and maps exceptions to HTTP responses.

    Body: {
        items: [{inventory_id, quantity, unit_price, discount_pct, tax_rate}],
        customer_id: int | null,
        payment_method: 'efectivo' | 'transferencia' | 'tarjeta' | 'mixto',
        sale_type: 'cash' | 'credit' | 'mixed',
        register_id: int,
        amount_paid: float,
        payment_details: [{method, amount, reference}],  (for mixed payments)
        notes: str
    }

    Returns:
        201 with the sale returned by ``process_sale``; 400 when it raises
        ``ValueError``; 500 for any other exception.
    """
    data = request.get_json() or {}
    conn = get_tenant_conn(g.tenant_id)

    try:
        sale = process_sale(conn, data)
        conn.commit()
        return jsonify(sale), 201
    except ValueError as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        # Single exit point: the connection is released exactly once, even
        # when rollback itself fails.
        conn.close()
|
|
|
|
|
|
@pos_bp.route('/sales', methods=['GET'])
@require_auth('pos.view')
def list_sales():
    """List sales, newest first, with optional filters and pagination.

    Results are restricted to ``g.branch_id`` when it is set.

    Query params:
        date_from: YYYY-MM-DD (inclusive)
        date_to: YYYY-MM-DD (inclusive, the whole day is covered)
        employee_id: int
        customer_id: int
        status: completed | cancelled | returned
        register_id: int
        page: int (default 1; values below 1 are treated as 1)
        per_page: int (default 50, clamped to 1..200)

    Returns:
        JSON ``{data: [...], pagination: {page, per_page, total,
        total_pages}}``, or 400 when a numeric parameter is not an integer.
    """
    args = request.args

    # Parse every numeric parameter before touching the database so a bad
    # value yields a 400 instead of an unhandled error with an open connection.
    try:
        page = max(int(args.get('page', 1)), 1)
        per_page = min(max(int(args.get('per_page', 50)), 1), 200)
        id_filters = [
            (column, int(args[name]))
            for name, column in (
                ('employee_id', 's.employee_id'),
                ('customer_id', 's.customer_id'),
                ('register_id', 's.register_id'),
            )
            if args.get(name)
        ]
    except ValueError:
        return jsonify({'error': 'page, per_page and id filters must be integers'}), 400

    where_clauses = ["1=1"]
    params = []
    if args.get('date_from'):
        where_clauses.append("s.created_at >= %s")
        params.append(args['date_from'])
    if args.get('date_to'):
        where_clauses.append("s.created_at < %s::date + interval '1 day'")
        params.append(args['date_to'])
    for column, value in id_filters:
        # `column` comes from the fixed tuple above, never from the request.
        where_clauses.append(f"{column} = %s")
        params.append(value)
    if args.get('status'):
        where_clauses.append("s.status = %s")
        params.append(args['status'])

    # Default to current branch
    if g.branch_id:
        where_clauses.append("s.branch_id = %s")
        params.append(g.branch_id)

    where = " AND ".join(where_clauses)

    def money(value):
        # NULL and zero amounts are both reported as 0.
        return float(value) if value else 0

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute(f"SELECT count(*) FROM sales s WHERE {where}", params)
        total = cur.fetchone()[0]

        cur.execute(f"""
            SELECT s.id, s.branch_id, s.customer_id, s.employee_id, s.register_id,
                   s.sale_type, s.payment_method, s.subtotal, s.discount_total,
                   s.tax_total, s.total, s.amount_paid, s.change_given,
                   s.status, s.created_at,
                   e.name as employee_name,
                   c.name as customer_name
            FROM sales s
            LEFT JOIN employees e ON s.employee_id = e.id
            LEFT JOIN customers c ON s.customer_id = c.id
            WHERE {where}
            ORDER BY s.created_at DESC
            LIMIT %s OFFSET %s
        """, params + [per_page, (page - 1) * per_page])
        rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    sales = [
        {
            'id': r[0], 'branch_id': r[1], 'customer_id': r[2],
            'employee_id': r[3], 'register_id': r[4],
            'sale_type': r[5], 'payment_method': r[6],
            'subtotal': money(r[7]),
            'discount_total': money(r[8]),
            'tax_total': money(r[9]),
            'total': money(r[10]),
            'amount_paid': money(r[11]),
            'change_given': money(r[12]),
            'status': r[13], 'created_at': str(r[14]),
            'employee_name': r[15], 'customer_name': r[16],
        }
        for r in rows
    ]

    total_pages = (total + per_page - 1) // per_page
    return jsonify({
        'data': sales,
        'pagination': {'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages}
    })
|
|
|
|
|
|
@pos_bp.route('/sales/<int:sale_id>', methods=['GET'])
@require_auth('pos.view')
def get_sale(sale_id):
    """Get sale detail with its items and payments.

    Args:
        sale_id: Primary key of the sale (from the URL).

    Returns:
        JSON with every ``sales`` column plus ``employee_name``,
        ``customer_name``, ``items`` and ``payments``; 404 when the sale
        does not exist.
    """
    def money(value):
        return float(value) if value else 0

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT s.*, e.name as employee_name, c.name as customer_name
            FROM sales s
            LEFT JOIN employees e ON s.employee_id = e.id
            LEFT JOIN customers c ON s.customer_id = c.id
            WHERE s.id = %s
        """, (sale_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Sale not found'}), 404

        # Column names come from the cursor because the query uses s.*.
        cols = [desc[0] for desc in cur.description]
        sale = dict(zip(cols, row))

        # Convert Decimal fields
        for k in ('subtotal', 'discount_total', 'tax_total', 'total', 'amount_paid', 'change_given'):
            if sale.get(k) is not None:
                sale[k] = float(sale[k])
        if sale.get('created_at'):
            sale['created_at'] = str(sale['created_at'])

        # Get items
        cur.execute("""
            SELECT id, inventory_id, part_number, name, quantity, unit_price,
                   unit_cost, discount_pct, discount_amount, tax_rate, tax_amount, subtotal,
                   clave_prod_serv, clave_unidad
            FROM sale_items WHERE sale_id = %s ORDER BY id
        """, (sale_id,))
        sale['items'] = [
            {
                'id': r[0], 'inventory_id': r[1], 'part_number': r[2], 'name': r[3],
                'quantity': r[4], 'unit_price': money(r[5]),
                'unit_cost': money(r[6]),
                'discount_pct': money(r[7]),
                'discount_amount': money(r[8]),
                'tax_rate': money(r[9]),
                'tax_amount': money(r[10]),
                'subtotal': money(r[11]),
                'clave_prod_serv': r[12], 'clave_unidad': r[13],
            }
            for r in cur.fetchall()
        ]

        # Get payments
        cur.execute("""
            SELECT id, method, amount, reference, created_at
            FROM sale_payments WHERE sale_id = %s ORDER BY id
        """, (sale_id,))
        sale['payments'] = [
            {
                'id': r[0], 'method': r[1], 'amount': money(r[2]),
                'reference': r[3], 'created_at': str(r[4]) if r[4] else None,
            }
            for r in cur.fetchall()
        ]

        return jsonify(sale)
    finally:
        # Runs on every path (404, success, query failure) so the
        # connection is never left open.
        cur.close()
        conn.close()
|
|
|
|
|
|
@pos_bp.route('/sales/<int:sale_id>/cancel', methods=['PUT'])
@require_auth('pos.sell')
def api_cancel_sale(sale_id):
    """Cancel a sale. Requires mandatory reason.

    Body: {reason: str}

    Args:
        sale_id: Primary key of the sale (from the URL).

    Returns:
        The result of ``cancel_sale`` as JSON; 400 when it raises
        ``ValueError``; 500 for any other exception.
    """
    data = request.get_json() or {}
    # `reason` may be absent, null or a non-string; normalise to a stripped
    # string and let cancel_sale decide whether it is acceptable.
    reason = str(data.get('reason') or '').strip()

    conn = get_tenant_conn(g.tenant_id)
    try:
        result = cancel_sale(conn, sale_id, reason)
        conn.commit()
        return jsonify(result)
    except ValueError as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        conn.close()
|
|
|
|
|
|
@pos_bp.route('/sales/last', methods=['GET'])
@require_auth('pos.view')
def get_last_sale():
    """Get the most recent sale made by the current employee.

    The lookup is limited to ``g.employee_id`` and, when set, to
    ``g.branch_id``.

    Returns:
        JSON with the sale header, ``items`` and ``payments``; 404 when the
        employee has no sales.
    """
    def money(value):
        return float(value) if value else 0

    where_clauses = ["s.employee_id = %s"]
    params = [g.employee_id]
    if g.branch_id:
        where_clauses.append("s.branch_id = %s")
        params.append(g.branch_id)
    where = " AND ".join(where_clauses)

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute(f"""
            SELECT s.id, s.branch_id, s.customer_id, s.employee_id, s.register_id,
                   s.sale_type, s.payment_method, s.subtotal, s.discount_total,
                   s.tax_total, s.total, s.amount_paid, s.change_given,
                   s.status, s.created_at,
                   e.name as employee_name,
                   c.name as customer_name
            FROM sales s
            LEFT JOIN employees e ON s.employee_id = e.id
            LEFT JOIN customers c ON s.customer_id = c.id
            WHERE {where}
            ORDER BY s.created_at DESC
            LIMIT 1
        """, params)
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'No sales found'}), 404

        sale = {
            'id': row[0], 'branch_id': row[1], 'customer_id': row[2],
            'employee_id': row[3], 'register_id': row[4],
            'sale_type': row[5], 'payment_method': row[6],
            'subtotal': money(row[7]),
            'discount_total': money(row[8]),
            'tax_total': money(row[9]),
            'total': money(row[10]),
            'amount_paid': money(row[11]),
            'change_given': money(row[12]),
            'status': row[13], 'created_at': str(row[14]),
            'employee_name': row[15], 'customer_name': row[16],
        }

        # Get items
        cur.execute("""
            SELECT id, inventory_id, part_number, name, quantity, unit_price,
                   unit_cost, discount_pct, discount_amount, tax_rate, tax_amount, subtotal,
                   clave_prod_serv, clave_unidad
            FROM sale_items WHERE sale_id = %s ORDER BY id
        """, (sale['id'],))
        sale['items'] = [
            {
                'id': r[0], 'inventory_id': r[1], 'part_number': r[2], 'name': r[3],
                'quantity': r[4], 'unit_price': money(r[5]),
                'unit_cost': money(r[6]),
                'discount_pct': money(r[7]),
                'discount_amount': money(r[8]),
                'tax_rate': money(r[9]),
                'tax_amount': money(r[10]),
                'subtotal': money(r[11]),
                'clave_prod_serv': r[12], 'clave_unidad': r[13],
            }
            for r in cur.fetchall()
        ]

        # Get payments
        cur.execute("""
            SELECT id, method, amount, reference, created_at
            FROM sale_payments WHERE sale_id = %s ORDER BY id
        """, (sale['id'],))
        sale['payments'] = [
            {
                'id': r[0], 'method': r[1], 'amount': money(r[2]),
                'reference': r[3], 'created_at': str(r[4]) if r[4] else None,
            }
            for r in cur.fetchall()
        ]

        return jsonify(sale)
    finally:
        # Release the connection on every path, including query failures.
        cur.close()
        conn.close()
|
|
|
|
|
|
# ─── Quotations ──────────────────────────────────
|
|
|
|
@pos_bp.route('/quotations', methods=['POST'])
@require_auth('pos.sell')
def create_quotation():
    """Save a quotation from current cart.

    Body: {
        items: [{inventory_id, quantity, unit_price, discount_pct, tax_rate}],
        customer_id: int | null,
        valid_days: int (default 7),
        notes: str
    }

    Returns:
        201 with ``{id, total, valid_until, created_at, message}``; 400 when
        there are no items, ``valid_days`` is not usable, or an item fails
        validation; 500 for any other failure (the transaction is rolled
        back).
    """
    data = request.get_json() or {}
    items = data.get('items', [])
    if not items:
        return jsonify({'error': 'No items in quotation'}), 400

    # Validate the expiry before opening a connection so bad input cannot
    # leave one dangling.
    try:
        valid_days = int(data.get('valid_days', 7))
        valid_until = (date.today() + timedelta(days=valid_days)).isoformat()
    except (TypeError, ValueError, OverflowError):
        return jsonify({'error': 'valid_days must be an integer'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Enrich items with inventory data (price, tax, etc.)
        try:
            enriched = _enrich_items(cur, items, data.get('customer_id'))
        except ValueError as e:
            return jsonify({'error': str(e)}), 400

        # Calculate totals
        totals = calculate_totals(enriched)

        cur.execute("""
            INSERT INTO quotations
                (branch_id, customer_id, employee_id, subtotal,
                 tax_total, total, status, valid_until, notes)
            VALUES (%s,%s,%s,%s,%s,%s,'active',%s,%s)
            RETURNING id, created_at
        """, (
            g.branch_id, data.get('customer_id'), g.employee_id,
            totals['subtotal'], totals['tax_total'],
            totals['total'], valid_until, data.get('notes')
        ))
        quot_id, created_at = cur.fetchone()

        # Insert quotation items
        for item in totals['items']:
            # Line subtotal after discount; tax is not included here.
            line_subtotal = round(
                item['unit_price'] * item['quantity'] * (1 - item['discount_pct'] / 100), 2
            )
            cur.execute("""
                INSERT INTO quotation_items
                    (quotation_id, inventory_id, part_number, name, quantity,
                     unit_price, discount_pct, tax_rate, subtotal)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
            """, (
                quot_id, item['inventory_id'],
                item.get('part_number', ''), item.get('name', ''),
                item['quantity'], item['unit_price'], item['discount_pct'],
                item['tax_rate'], line_subtotal
            ))

        log_action(conn, 'QUOTATION_CREATE', 'quotation', quot_id,
                   new_value={'total': totals['total'], 'items_count': len(items)})

        conn.commit()
        return jsonify({
            'id': quot_id,
            'total': totals['total'],
            'valid_until': valid_until,
            'created_at': str(created_at),
            'message': 'Quotation created'
        }), 201
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
@pos_bp.route('/quotations', methods=['GET'])
@require_auth('pos.view')
def list_quotations():
    """List quotations, newest first, with optional filters and pagination.

    Results are restricted to ``g.branch_id`` when it is set.

    Query params:
        customer_id: int
        status: active | converted | expired | cancelled
        page: int (default 1; values below 1 are treated as 1)
        per_page: int (default 50, clamped to 1..200)

    Returns:
        JSON ``{data: [...], pagination: {page, per_page, total,
        total_pages}}``, or 400 when a numeric parameter is not an integer.
    """
    args = request.args

    # Parse numeric input first: a bad value must not reach the database or
    # leave a connection open.
    try:
        page = max(int(args.get('page', 1)), 1)
        per_page = min(max(int(args.get('per_page', 50)), 1), 200)
        customer_id = int(args['customer_id']) if args.get('customer_id') else None
    except ValueError:
        return jsonify({'error': 'page, per_page and customer_id must be integers'}), 400

    where_clauses = ["1=1"]
    params = []
    if customer_id is not None:
        where_clauses.append("q.customer_id = %s")
        params.append(customer_id)
    if args.get('status'):
        where_clauses.append("q.status = %s")
        params.append(args['status'])
    if g.branch_id:
        where_clauses.append("q.branch_id = %s")
        params.append(g.branch_id)
    where = " AND ".join(where_clauses)

    def money(value):
        return float(value) if value else 0

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute(f"SELECT count(*) FROM quotations q WHERE {where}", params)
        total = cur.fetchone()[0]

        cur.execute(f"""
            SELECT q.id, q.customer_id, q.employee_id, q.subtotal, q.tax_total,
                   q.total, q.status, q.valid_until, q.created_at,
                   c.name as customer_name, e.name as employee_name
            FROM quotations q
            LEFT JOIN customers c ON q.customer_id = c.id
            LEFT JOIN employees e ON q.employee_id = e.id
            WHERE {where}
            ORDER BY q.created_at DESC
            LIMIT %s OFFSET %s
        """, params + [per_page, (page - 1) * per_page])
        rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    quotations = [
        {
            'id': r[0], 'customer_id': r[1], 'employee_id': r[2],
            'subtotal': money(r[3]),
            'tax_total': money(r[4]),
            'total': money(r[5]),
            'status': r[6], 'valid_until': str(r[7]) if r[7] else None,
            'created_at': str(r[8]),
            'customer_name': r[9], 'employee_name': r[10],
        }
        for r in rows
    ]

    total_pages = (total + per_page - 1) // per_page
    return jsonify({
        'data': quotations,
        'pagination': {'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages}
    })
|
|
|
|
|
|
@pos_bp.route('/quotations/<int:quot_id>', methods=['GET'])
@require_auth('pos.view')
def get_quotation(quot_id):
    """Get quotation detail with items.

    Args:
        quot_id: Primary key of the quotation (from the URL).

    Returns:
        JSON with every ``quotations`` column plus ``customer_name``,
        ``employee_name`` and ``items``; 404 when it does not exist.
    """
    def money(value):
        return float(value) if value else 0

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT q.*, c.name as customer_name, e.name as employee_name
            FROM quotations q
            LEFT JOIN customers c ON q.customer_id = c.id
            LEFT JOIN employees e ON q.employee_id = e.id
            WHERE q.id = %s
        """, (quot_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Quotation not found'}), 404

        # Column names come from the cursor because the query uses q.*.
        cols = [desc[0] for desc in cur.description]
        quot = dict(zip(cols, row))
        for k in ('subtotal', 'tax_total', 'total'):
            if quot.get(k) is not None:
                quot[k] = float(quot[k])
        for k in ('created_at', 'valid_until'):
            if quot.get(k):
                quot[k] = str(quot[k])

        # Get items
        cur.execute("""
            SELECT id, inventory_id, part_number, name, quantity, unit_price,
                   discount_pct, tax_rate, subtotal
            FROM quotation_items WHERE quotation_id = %s ORDER BY id
        """, (quot_id,))
        quot['items'] = [
            {
                'id': r[0], 'inventory_id': r[1], 'part_number': r[2], 'name': r[3],
                'quantity': r[4], 'unit_price': money(r[5]),
                'discount_pct': money(r[6]),
                'tax_rate': money(r[7]),
                'subtotal': money(r[8]),
            }
            for r in cur.fetchall()
        ]

        return jsonify(quot)
    finally:
        # Release the connection on every path, including query failures.
        cur.close()
        conn.close()
|
|
|
|
|
|
@pos_bp.route('/quotations/<int:quot_id>/pdf', methods=['GET'])
@require_auth('pos.view')
def get_quotation_pdf(quot_id):
    """Get printable HTML for a quotation (browser print-to-PDF).

    Args:
        quot_id: Primary key of the quotation (from the URL).

    Returns:
        The HTML produced by ``generate_quote_html`` with a ``text/html``
        content type; a JSON 404 when the quotation does not exist.
    """
    from services.pdf_generator import generate_quote_html

    def money(value):
        return float(value) if value else 0

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Get quotation
        cur.execute("""
            SELECT q.id, q.subtotal, q.tax_total, q.total, q.valid_until,
                   q.created_at, q.notes, q.customer_id, q.employee_id,
                   e.name as employee_name
            FROM quotations q
            LEFT JOIN employees e ON q.employee_id = e.id
            WHERE q.id = %s
        """, (quot_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Quotation not found'}), 404

        cols = [desc[0] for desc in cur.description]
        quot = dict(zip(cols, row))
        for k in ('subtotal', 'tax_total', 'total'):
            if quot.get(k) is not None:
                quot[k] = float(quot[k])
        for k in ('created_at', 'valid_until'):
            if quot.get(k):
                quot[k] = str(quot[k])

        # Get items
        cur.execute("""
            SELECT part_number, name, quantity, unit_price, discount_pct, tax_rate, subtotal
            FROM quotation_items WHERE quotation_id = %s ORDER BY id
        """, (quot_id,))
        items = [
            {
                'part_number': r[0], 'name': r[1], 'quantity': r[2],
                'unit_price': money(r[3]),
                'discount_pct': money(r[4]),
                'tax_rate': money(r[5]),
                'subtotal': money(r[6]),
            }
            for r in cur.fetchall()
        ]

        # Get customer info
        customer_info = None
        if quot.get('customer_id'):
            cur.execute("""
                SELECT name, rfc, phone, email FROM customers WHERE id = %s
            """, (quot['customer_id'],))
            cust = cur.fetchone()
            if cust:
                customer_info = {'name': cust[0], 'rfc': cust[1], 'phone': cust[2], 'email': cust[3]}

        # Get business info from tenant config. Best effort: the document is
        # still rendered without it.
        business_info = None
        try:
            cur.execute("SELECT key, value FROM config WHERE key IN ('business_name','rfc','address','phone','email')")
            config_rows = cur.fetchall()
            if config_rows:
                business_info = {r[0]: r[1] for r in config_rows}
                # The generator receives the business name under 'name'.
                business_info['name'] = business_info.pop('business_name', '')
        except Exception:
            # config table may not exist; clear the failed transaction and
            # carry on without business details.
            business_info = None
            conn.rollback()
    finally:
        cur.close()
        conn.close()

    # Rendering happens after the connection has been released.
    page_html = generate_quote_html(quot, items, business_info, customer_info)
    return page_html, 200, {'Content-Type': 'text/html; charset=utf-8'}
|
|
|
|
|
|
@pos_bp.route('/quotations/<int:quot_id>/email', methods=['POST'])
@require_auth('pos.sell')
def email_quotation(quot_id):
    """Send a quotation as HTML email.

    Body: {email: str}

    Args:
        quot_id: Primary key of the quotation (from the URL).

    Returns:
        200 with a confirmation message; 400 for a missing or malformed
        address; 404 when the quotation does not exist; 503 when
        ``config.SMTP_USER`` is empty; 500 when sending fails.
    """
    import html
    import logging
    import smtplib
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart
    import config

    data = request.get_json() or {}
    email_to = str(data.get('email') or '').strip()
    # Embedded whitespace (including CR/LF) is never valid in an address and
    # would allow header injection, so it is rejected with the '@' check.
    if not email_to or '@' not in email_to or any(ch.isspace() for ch in email_to):
        return jsonify({'error': 'Valid email address required'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT q.id, q.subtotal, q.tax_total, q.total, q.valid_until,
                   q.notes, q.created_at, c.name as customer_name, e.name as employee_name
            FROM quotations q
            LEFT JOIN customers c ON q.customer_id = c.id
            LEFT JOIN employees e ON q.employee_id = e.id
            WHERE q.id = %s
        """, (quot_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Quotation not found'}), 404

        cur.execute("""
            SELECT part_number, name, quantity, unit_price, discount_pct, tax_rate, subtotal
            FROM quotation_items WHERE quotation_id = %s ORDER BY id
        """, (quot_id,))
        items = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    if not config.SMTP_USER:
        return jsonify({'error': 'SMTP not configured on server'}), 503

    q_id, subtotal, tax_total, total, valid_until, notes, created_at, cust_name, emp_name = row

    def text(value):
        # Database text (names, notes, part numbers) is user-entered, so it
        # is escaped before being placed into the HTML body.
        return html.escape('' if value is None else str(value))

    # Build HTML email
    items_html = ''.join(
        f'<tr><td>{text(it[0])}</td><td>{text(it[1])}</td><td style="text-align:center">{text(it[2])}</td>'
        f'<td style="text-align:right">${float(it[3] or 0):,.2f}</td>'
        f'<td style="text-align:right">${float(it[6] or 0):,.2f}</td></tr>'
        for it in items
    )
    notes_html = f'<p><em>Notas: {text(notes)}</em></p>' if notes else ''

    html_body = f"""
    <html><body style="font-family:Arial,sans-serif;color:#333;">
    <h2>Cotizacion #{q_id} - Nexus Autoparts</h2>
    <p><strong>Cliente:</strong> {text(cust_name or 'Publico general')}</p>
    <p><strong>Vendedor:</strong> {text(emp_name or '-')}</p>
    <p><strong>Fecha:</strong> {text(created_at)}</p>
    <p><strong>Vigencia:</strong> {text(valid_until or 'N/A')}</p>
    <table border="1" cellpadding="6" cellspacing="0" style="border-collapse:collapse;width:100%;">
    <tr style="background:#f5a623;color:#fff;">
    <th>No. Parte</th><th>Descripcion</th><th>Cant.</th><th>P. Unit.</th><th>Subtotal</th>
    </tr>
    {items_html}
    </table>
    <p style="text-align:right;margin-top:12px;">
    <strong>Subtotal:</strong> ${float(subtotal or 0):,.2f}<br>
    <strong>IVA:</strong> ${float(tax_total or 0):,.2f}<br>
    <strong style="font-size:1.2em;">Total: ${float(total or 0):,.2f}</strong>
    </p>
    {notes_html}
    <p style="color:#888;font-size:12px;">Este es un documento informativo, no tiene validez fiscal.</p>
    </body></html>
    """

    msg = MIMEMultipart('alternative')
    msg['Subject'] = f'Cotizacion #{q_id} - Nexus Autoparts'
    msg['From'] = config.SMTP_FROM
    msg['To'] = email_to
    msg.attach(MIMEText(html_body, 'html'))

    try:
        with smtplib.SMTP(config.SMTP_HOST, config.SMTP_PORT, timeout=15) as server:
            server.starttls()
            server.login(config.SMTP_USER, config.SMTP_PASS)
            server.sendmail(config.SMTP_FROM, [email_to], msg.as_string())
    except Exception as e:
        return jsonify({'error': f'Failed to send email: {str(e)}'}), 500

    # The message is already out: an audit failure is logged but must not be
    # reported to the client as a failed send (that would invite re-sends).
    # Other handlers in this file commit after log_action, so do it here too.
    audit_conn = None
    try:
        audit_conn = get_tenant_conn(g.tenant_id)
        log_action(audit_conn, 'QUOTATION_EMAIL', 'quotation', quot_id,
                   new_value={'email': email_to})
        audit_conn.commit()
    except Exception:
        logging.getLogger(__name__).exception(
            'Could not record QUOTATION_EMAIL audit entry for quotation %s', quot_id)
    finally:
        if audit_conn is not None:
            audit_conn.close()

    return jsonify({'message': f'Quotation #{q_id} sent to {email_to}'})
|
|
|
|
|
|
@pos_bp.route('/quotations/<int:quot_id>/convert', methods=['POST'])
@require_auth('pos.sell')
def convert_quotation(quot_id):
    """Convert a quotation to a sale. Uses current stock and prices from the quotation.

    Body: {
        register_id: int,
        payment_method: str,
        sale_type: str,
        amount_paid: float,
        payment_details: [{method, amount, reference}]
    }

    Args:
        quot_id: Primary key of the quotation (from the URL).

    Returns:
        201 with the sale returned by ``process_sale``; 404 when the
        quotation does not exist; 400 when it is not ``active`` or
        ``process_sale`` raises ``ValueError``; 500 for any other failure.
    """
    data = request.get_json() or {}
    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Get quotation
        cur.execute("SELECT id, customer_id, status FROM quotations WHERE id = %s", (quot_id,))
        quot = cur.fetchone()
        if not quot:
            return jsonify({'error': 'Quotation not found'}), 404
        if quot[2] != 'active':
            return jsonify({'error': f'Quotation is {quot[2]}, cannot convert'}), 400

        # Get quotation items, in the order they were quoted
        cur.execute("""
            SELECT inventory_id, quantity, unit_price, discount_pct, tax_rate
            FROM quotation_items WHERE quotation_id = %s ORDER BY id
        """, (quot_id,))
        items = [
            {
                'inventory_id': r[0], 'quantity': r[1], 'unit_price': float(r[2]),
                'discount_pct': float(r[3]) if r[3] else 0,
                # Only a missing rate falls back to 0.16; a quoted rate of 0
                # must be honoured so the sale matches the quotation.
                'tax_rate': 0.16 if r[4] is None else float(r[4]),
            }
            for r in cur.fetchall()
        ]

        # Build sale_data
        sale_data = {
            'items': items,
            'customer_id': quot[1],
            'payment_method': data.get('payment_method', 'efectivo'),
            'sale_type': data.get('sale_type', 'cash'),
            'register_id': data.get('register_id'),
            'amount_paid': data.get('amount_paid', 0),
            'payment_details': data.get('payment_details', []),
            'notes': f'Convertida de cotizacion #{quot_id}',
        }

        sale = process_sale(conn, sale_data)

        # Mark quotation as converted (same transaction as the sale)
        cur.execute("""
            UPDATE quotations SET status = 'converted', converted_sale_id = %s
            WHERE id = %s
        """, (sale['id'], quot_id))

        conn.commit()
        return jsonify(sale), 201
    except ValueError as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
@pos_bp.route('/quotations/<int:quot_id>/cancel', methods=['PUT'])
@require_auth('pos.sell')
def cancel_quotation(quot_id):
    """Cancel a quotation.

    Args:
        quot_id: Primary key of the quotation (from the URL).

    Returns:
        200 with a confirmation message; 404 when the quotation does not
        exist; 400 when its status is anything other than ``active``.
    """
    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("SELECT id, status FROM quotations WHERE id = %s", (quot_id,))
        quot = cur.fetchone()
        if not quot:
            return jsonify({'error': 'Quotation not found'}), 404
        if quot[1] != 'active':
            return jsonify({'error': f'Quotation is already {quot[1]}'}), 400

        cur.execute("UPDATE quotations SET status = 'cancelled' WHERE id = %s", (quot_id,))
        conn.commit()
        return jsonify({'message': 'Quotation cancelled'})
    except Exception:
        # Undo any partial work, then let the error propagate as before.
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
# ─── Layaways (Apartados) ────────────────────────
|
|
|
|
@pos_bp.route('/layaways', methods=['POST'])
|
|
@require_auth('pos.sell')
|
|
def create_layaway():
|
|
"""Create a layaway. Requires customer_id and partial payment.
|
|
|
|
Body: {
|
|
items: [{inventory_id, quantity, unit_price, discount_pct, tax_rate}],
|
|
customer_id: int (required),
|
|
initial_payment: float (required, > 0),
|
|
payment_method: str,
|
|
reference: str,
|
|
register_id: int,
|
|
expires_days: int (default 30),
|
|
notes: str
|
|
}
|
|
"""
|
|
data = request.get_json() or {}
|
|
items = data.get('items', [])
|
|
customer_id = data.get('customer_id')
|
|
initial_payment = float(data.get('initial_payment', 0))
|
|
register_id = data.get('register_id')
|
|
|
|
if not items:
|
|
return jsonify({'error': 'No items in layaway'}), 400
|
|
if not customer_id:
|
|
return jsonify({'error': 'customer_id required for layaway'}), 400
|
|
if initial_payment <= 0:
|
|
return jsonify({'error': 'Initial payment must be greater than 0'}), 400
|
|
|
|
conn = get_tenant_conn(g.tenant_id)
|
|
cur = conn.cursor()
|
|
|
|
# Enrich items with inventory data
|
|
try:
|
|
enriched = _enrich_items(cur, items, customer_id)
|
|
except ValueError as e:
|
|
cur.close(); conn.close()
|
|
return jsonify({'error': str(e)}), 400
|
|
|
|
# Calculate totals
|
|
totals = calculate_totals(enriched)
|
|
|
|
if initial_payment > totals['total']:
|
|
cur.close(); conn.close()
|
|
return jsonify({'error': 'Initial payment exceeds total'}), 400
|
|
|
|
expires_days = int(data.get('expires_days', 30))
|
|
expires_at = (date.today() + timedelta(days=expires_days)).isoformat()
|
|
|
|
try:
|
|
# Create layaway record
|
|
cur.execute("""
|
|
INSERT INTO layaways
|
|
(branch_id, customer_id, employee_id, total, amount_paid,
|
|
status, expires_at, notes)
|
|
VALUES (%s,%s,%s,%s,%s,'active',%s,%s)
|
|
RETURNING id, created_at
|
|
""", (
|
|
g.branch_id, customer_id, g.employee_id,
|
|
totals['total'], initial_payment, expires_at, data.get('notes')
|
|
))
|
|
layaway_id, created_at = cur.fetchone()
|
|
|
|
# Insert layaway items and reserve stock (table created by migration v1.1_pos_tables.sql)
|
|
from services.inventory_engine import record_operation
|
|
for item in totals['items']:
|
|
part_number = item.get('part_number', '')
|
|
name = item.get('name', '')
|
|
item_branch_id = item.get('branch_id', g.branch_id)
|
|
|
|
line_subtotal = round(
|
|
item['unit_price'] * item['quantity'] * (1 - item['discount_pct'] / 100), 2
|
|
)
|
|
|
|
cur.execute("""
|
|
INSERT INTO layaway_items
|
|
(layaway_id, inventory_id, part_number, name, quantity,
|
|
unit_price, discount_pct, tax_rate, subtotal)
|
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
|
|
""", (
|
|
layaway_id, item['inventory_id'], part_number, name,
|
|
item['quantity'], item['unit_price'], item['discount_pct'],
|
|
item['tax_rate'], line_subtotal
|
|
))
|
|
|
|
# Reserve stock immediately (negative quantity = stock deduction)
|
|
record_operation(
|
|
conn, item['inventory_id'], item_branch_id,
|
|
operation_type='LAYAWAY_RESERVE',
|
|
quantity=-item['quantity'],
|
|
notes=f'Apartado #{layaway_id} - reserva'
|
|
)
|
|
|
|
# Record initial payment
|
|
cur.execute("""
|
|
INSERT INTO layaway_payments
|
|
(layaway_id, amount, payment_method, reference, employee_id)
|
|
VALUES (%s,%s,%s,%s,%s)
|
|
""", (
|
|
layaway_id, initial_payment,
|
|
data.get('payment_method', 'efectivo'),
|
|
data.get('reference'), g.employee_id
|
|
))
|
|
|
|
# Record cash movement on register if cash payment
|
|
if register_id and data.get('payment_method', 'efectivo') == 'efectivo':
|
|
cur.execute("""
|
|
INSERT INTO cash_movements (register_id, type, amount, reason, employee_id)
|
|
VALUES (%s, 'in', %s, %s, %s)
|
|
""", (register_id, initial_payment, f'Apartado #{layaway_id} - anticipo', g.employee_id))
|
|
|
|
log_action(conn, 'LAYAWAY_CREATE', 'layaway', layaway_id,
|
|
new_value={'total': totals['total'], 'initial_payment': initial_payment,
|
|
'customer_id': customer_id})
|
|
|
|
conn.commit()
|
|
cur.close(); conn.close()
|
|
|
|
return jsonify({
|
|
'id': layaway_id,
|
|
'total': totals['total'],
|
|
'amount_paid': initial_payment,
|
|
'remaining': round(totals['total'] - initial_payment, 2),
|
|
'expires_at': expires_at,
|
|
'created_at': str(created_at),
|
|
'message': 'Layaway created'
|
|
}), 201
|
|
|
|
except Exception as e:
|
|
conn.rollback()
|
|
cur.close(); conn.close()
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@pos_bp.route('/layaways', methods=['GET'])
@require_auth('pos.view')
def list_layaways():
    """List layaways, newest first, with pagination.

    Query params:
        customer_id: optional integer; only layaways for this customer.
        status: optional string; exact match on the layaway status.
        page: 1-based page number (default 1; values below 1 become 1).
        per_page: page size (default 50, clamped to the range 1..200).

    When g.branch_id is set, results are limited to that branch.

    Returns:
        200 with {data: [...], pagination: {page, per_page, total, total_pages}},
        or 400 when page, per_page or customer_id is not an integer.
    """
    # Validate query parameters before opening a connection: a malformed value
    # becomes a 400 response and can no longer leak a connection.
    try:
        page = max(int(request.args.get('page', 1)), 1)
        per_page = min(max(int(request.args.get('per_page', 50)), 1), 200)
        raw_customer_id = request.args.get('customer_id')
        customer_id = int(raw_customer_id) if raw_customer_id else None
    except ValueError:
        return jsonify({'error': 'page, per_page and customer_id must be integers'}), 400
    status = request.args.get('status')

    # Only fixed SQL fragments are interpolated; every value is a bound parameter.
    where_clauses = ["1=1"]
    params = []
    if customer_id is not None:
        where_clauses.append("l.customer_id = %s")
        params.append(customer_id)
    if status:
        where_clauses.append("l.status = %s")
        params.append(status)
    if g.branch_id:
        where_clauses.append("l.branch_id = %s")
        params.append(g.branch_id)
    where = " AND ".join(where_clauses)

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute(f"SELECT count(*) FROM layaways l WHERE {where}", params)
        total = cur.fetchone()[0]

        cur.execute(f"""
            SELECT l.id, l.customer_id, l.employee_id, l.total, l.amount_paid,
                   l.status, l.expires_at, l.created_at,
                   c.name as customer_name, e.name as employee_name
            FROM layaways l
            LEFT JOIN customers c ON l.customer_id = c.id
            LEFT JOIN employees e ON l.employee_id = e.id
            WHERE {where}
            ORDER BY l.created_at DESC
            LIMIT %s OFFSET %s
        """, params + [per_page, (page - 1) * per_page])

        layaways = [
            {
                'id': r[0], 'customer_id': r[1], 'employee_id': r[2],
                'total': float(r[3]) if r[3] else 0,
                'amount_paid': float(r[4]) if r[4] else 0,
                'remaining': round(float(r[3] or 0) - float(r[4] or 0), 2),
                'status': r[5], 'expires_at': str(r[6]) if r[6] else None,
                'created_at': str(r[7]),
                'customer_name': r[8], 'employee_name': r[9],
            }
            for r in cur.fetchall()
        ]
    finally:
        # Release the cursor and connection even if a query raises.
        cur.close()
        conn.close()

    total_pages = (total + per_page - 1) // per_page
    return jsonify({
        'data': layaways,
        'pagination': {'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages}
    })
|
|
|
|
|
|
@pos_bp.route('/layaways/<int:layaway_id>', methods=['GET'])
@require_auth('pos.view')
def get_layaway(layaway_id):
    """Get one layaway with its items and payments.

    Args:
        layaway_id: primary key of the layaway (from the URL).

    Returns:
        200 with every column of the layaways row plus customer_name,
        employee_name, a computed `remaining` balance, `items` and `payments`;
        404 when no layaway has that id.
    """
    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT l.*, c.name as customer_name, e.name as employee_name
            FROM layaways l
            LEFT JOIN customers c ON l.customer_id = c.id
            LEFT JOIN employees e ON l.employee_id = e.id
            WHERE l.id = %s
        """, (layaway_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Layaway not found'}), 404

        # Column names come from the cursor because the query selects l.*.
        cols = [desc[0] for desc in cur.description]
        layaway = dict(zip(cols, row))
        for k in ('total', 'amount_paid'):
            if layaway.get(k) is not None:
                layaway[k] = float(layaway[k])
        # Treat a NULL total/amount_paid as 0 so the balance never raises.
        layaway['remaining'] = round(
            (layaway.get('total') or 0) - (layaway.get('amount_paid') or 0), 2
        )
        for k in ('created_at', 'expires_at'):
            if layaway.get(k):
                layaway[k] = str(layaway[k])

        # Line items, in insertion order.
        cur.execute("""
            SELECT id, inventory_id, part_number, name, quantity, unit_price,
                   discount_pct, tax_rate, subtotal
            FROM layaway_items WHERE layaway_id = %s ORDER BY id
        """, (layaway_id,))
        layaway['items'] = [
            {
                'id': r[0], 'inventory_id': r[1], 'part_number': r[2], 'name': r[3],
                'quantity': r[4], 'unit_price': float(r[5]) if r[5] else 0,
                'discount_pct': float(r[6]) if r[6] else 0,
                'tax_rate': float(r[7]) if r[7] else 0,
                'subtotal': float(r[8]) if r[8] else 0,
            }
            for r in cur.fetchall()
        ]

        # Payments, oldest first.
        cur.execute("""
            SELECT id, amount, payment_method, reference, employee_id, created_at
            FROM layaway_payments WHERE layaway_id = %s ORDER BY created_at
        """, (layaway_id,))
        layaway['payments'] = [
            {
                'id': r[0], 'amount': float(r[1]) if r[1] else 0,
                'payment_method': r[2], 'reference': r[3],
                'employee_id': r[4], 'created_at': str(r[5]) if r[5] else None,
            }
            for r in cur.fetchall()
        ]
    finally:
        # Release the cursor and connection on every path, including errors.
        cur.close()
        conn.close()

    return jsonify(layaway)
|
|
|
|
|
|
@pos_bp.route('/layaways/<int:layaway_id>/payment', methods=['POST'])
@require_auth('pos.sell')
def layaway_payment(layaway_id):
    """Add a payment to an active layaway.

    Body: {amount, payment_method, reference, register_id}

    The payment is rejected when the layaway is not 'active' or when the
    amount exceeds the remaining balance. When register_id is given and the
    payment method is 'efectivo', a cash movement is recorded as well.

    Returns:
        200 with {payment_id, amount, total_paid, remaining, fully_paid, message};
        400 for an invalid amount, a non-active layaway or an over-payment;
        404 when the layaway does not exist; 500 on any other failure.
    """
    import math

    data = request.get_json() or {}
    try:
        amount = float(data.get('amount', 0))
    except (TypeError, ValueError):
        return jsonify({'error': 'Amount must be a number'}), 400
    # NaN compares False against everything, so it must be rejected explicitly.
    if not math.isfinite(amount) or amount <= 0:
        return jsonify({'error': 'Amount must be greater than 0'}), 400

    payment_method = data.get('payment_method', 'efectivo')
    register_id = data.get('register_id')

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Lock the row so two concurrent payments cannot both read the same
        # amount_paid and overwrite each other's update.
        cur.execute(
            "SELECT total, amount_paid, status FROM layaways WHERE id = %s FOR UPDATE",
            (layaway_id,)
        )
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Layaway not found'}), 404

        total, paid, status = float(row[0] or 0), float(row[1] or 0), row[2]
        if status != 'active':
            return jsonify({'error': f'Layaway is {status}'}), 400

        remaining = round(total - paid, 2)
        if amount > remaining:
            return jsonify({'error': f'Payment ${amount:.2f} exceeds remaining ${remaining:.2f}'}), 400

        # Record payment
        cur.execute("""
            INSERT INTO layaway_payments
                (layaway_id, amount, payment_method, reference, employee_id)
            VALUES (%s,%s,%s,%s,%s)
            RETURNING id
        """, (
            layaway_id, amount, payment_method,
            data.get('reference'), g.employee_id
        ))
        payment_id = cur.fetchone()[0]

        # Update amount_paid
        new_paid = round(paid + amount, 2)
        cur.execute("UPDATE layaways SET amount_paid = %s WHERE id = %s", (new_paid, layaway_id))

        # Record cash movement if applicable
        if register_id and payment_method == 'efectivo':
            cur.execute("""
                INSERT INTO cash_movements (register_id, type, amount, reason, employee_id)
                VALUES (%s, 'in', %s, %s, %s)
            """, (register_id, amount, f'Apartado #{layaway_id} - abono', g.employee_id))

        conn.commit()

        new_remaining = round(total - new_paid, 2)
        return jsonify({
            'payment_id': payment_id,
            'amount': amount,
            'total_paid': new_paid,
            'remaining': new_remaining,
            'fully_paid': new_remaining <= 0,
            'message': 'Payment recorded'
        })

    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        # Single cleanup point for the success, early-return and error paths.
        cur.close()
        conn.close()
|
|
|
|
|
|
@pos_bp.route('/layaways/<int:layaway_id>/complete', methods=['POST'])
@require_auth('pos.sell')
def complete_layaway(layaway_id):
    """Convert an active layaway into a completed sale.

    Body: {register_id: int, payment_method: str, reference: str}

    If the layaway still has an outstanding balance, that balance is recorded
    as a final payment (payment_method defaults to 'efectivo') before the sale
    is created. Inventory is not deducted here: the sale and its items are
    inserted directly instead of going through process_sale(), because stock
    was already reserved by LAYAWAY_RESERVE operations at creation time.

    Returns:
        201 with {id, status, total, created_at} of the new sale;
        400 when the layaway is not active or a ValueError is raised;
        404 when the layaway does not exist; 500 on any other failure.
    """
    data = request.get_json() or {}
    register_id = data.get('register_id')

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Lock the row so two concurrent requests cannot both see an 'active'
        # layaway and create duplicate sales.
        cur.execute("""
            SELECT id, customer_id, total, amount_paid, status, branch_id
            FROM layaways WHERE id = %s
            FOR UPDATE
        """, (layaway_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Layaway not found'}), 404

        _, cust_id, total, paid, status, branch_id = row
        total, paid = float(total), float(paid)

        if status != 'active':
            return jsonify({'error': f'Layaway is {status}'}), 400

        remaining = round(total - paid, 2)

        # If there's a remaining balance, accept a final payment with the complete call
        if remaining > 0:
            final_method = data.get('payment_method', 'efectivo')
            cur.execute("""
                INSERT INTO layaway_payments
                    (layaway_id, amount, payment_method, reference, employee_id)
                VALUES (%s,%s,%s,%s,%s)
            """, (layaway_id, remaining, final_method,
                  data.get('reference', 'Pago final al completar'), g.employee_id))
            cur.execute("UPDATE layaways SET amount_paid = total WHERE id = %s", (layaway_id,))

            # Record cash movement for final payment
            if register_id and final_method == 'efectivo':
                cur.execute("""
                    INSERT INTO cash_movements (register_id, type, amount, reason, employee_id)
                    VALUES (%s, 'in', %s, %s, %s)
                """, (register_id, remaining, f'Apartado #{layaway_id} - pago final', g.employee_id))

        # Get layaway items
        cur.execute("""
            SELECT inventory_id, quantity, unit_price, discount_pct, tax_rate
            FROM layaway_items WHERE layaway_id = %s
        """, (layaway_id,))
        items = [
            {
                'inventory_id': r[0], 'quantity': r[1],
                'unit_price': float(r[2]) if r[2] else 0,
                'discount_pct': float(r[3]) if r[3] else 0,
                # Only a missing rate falls back to 0.16; a stored 0 (tax-exempt
                # line) must stay 0 rather than be taxed at 16%.
                'tax_rate': float(r[4]) if r[4] is not None else 0.16,
            }
            for r in cur.fetchall()
        ]

        totals_calc = calculate_totals(items)

        cur.execute("""
            INSERT INTO sales
                (branch_id, customer_id, employee_id, register_id, sale_type,
                 payment_method, subtotal, discount_total, tax_total, total,
                 amount_paid, change_given, metodo_pago_sat, forma_pago_sat,
                 status, notes)
            VALUES (%s,%s,%s,%s,'cash','efectivo',%s,%s,%s,%s,%s,0,'PUE','01','completed',%s)
            RETURNING id, created_at
        """, (
            branch_id, cust_id, g.employee_id, register_id,
            totals_calc['subtotal'], totals_calc['discount_total'],
            totals_calc['tax_total'], totals_calc['total'], total,
            f'Completado de apartado #{layaway_id}',
        ))
        sale_id, sale_created = cur.fetchone()

        # Create sale_items (no inventory deduction: stock is already reserved)
        for item in totals_calc['items']:
            cur.execute("SELECT part_number, name, cost FROM inventory WHERE id = %s",
                        (item['inventory_id'],))
            inv = cur.fetchone()
            cur.execute("""
                INSERT INTO sale_items
                    (sale_id, inventory_id, part_number, name, quantity,
                     unit_price, unit_cost, discount_pct, discount_amount,
                     tax_rate, tax_amount, subtotal)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
            """, (
                sale_id, item['inventory_id'],
                inv[0] if inv else '', inv[1] if inv else '',
                item['quantity'], item['unit_price'],
                float(inv[2]) if inv and inv[2] else 0,
                item['discount_pct'], item['discount_amount'],
                item['tax_rate'], item['tax_amount'], item['subtotal']
            ))

        # Record payment on register
        if register_id:
            cur.execute("""
                INSERT INTO sale_payments
                    (sale_id, register_id, method, amount, reference)
                VALUES (%s,%s,'efectivo',%s,%s)
            """, (sale_id, register_id, total, f'Apartado #{layaway_id} completado'))

        sale = {
            'id': sale_id, 'status': 'completed', 'total': totals_calc['total'],
            'created_at': str(sale_created),
        }

        # Mark layaway as completed
        cur.execute("""
            UPDATE layaways SET status = 'completed', converted_sale_id = %s
            WHERE id = %s
        """, (sale['id'], layaway_id))

        log_action(conn, 'LAYAWAY_COMPLETE', 'layaway', layaway_id,
                   new_value={'sale_id': sale['id'], 'total': total})

        conn.commit()
        return jsonify(sale), 201

    except ValueError as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        # Single cleanup point for the success, early-return and error paths.
        cur.close()
        conn.close()
|
|
|
|
|
|
@pos_bp.route('/layaways/<int:layaway_id>/cancel', methods=['PUT'])
@require_auth('pos.sell')
def cancel_layaway(layaway_id):
    """Cancel an active layaway and return its reserved stock.

    Body: {reason: str}  (required, at least 3 characters after trimming)

    For every layaway item a LAYAWAY_CANCEL inventory operation with a
    positive quantity is recorded, the layaway is marked 'cancelled' and the
    reason is appended to its notes. Money already paid is NOT refunded here;
    refunds must be handled separately.

    Returns:
        200 with {message, amount_paid, items_unreserved, note};
        400 when the reason is missing/too short or the layaway is not active;
        404 when the layaway does not exist; 500 on any other failure.
    """
    data = request.get_json() or {}
    # A missing, null or non-string reason counts as empty instead of crashing
    # on .strip().
    reason = data.get('reason')
    reason = reason.strip() if isinstance(reason, str) else ''
    if len(reason) < 3:
        return jsonify({'error': 'Reason is required (min 3 characters)'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        from services.inventory_engine import record_operation

        # Lock the row so a concurrent cancel/complete cannot process the same
        # active layaway twice (which would return the stock twice).
        cur.execute(
            "SELECT id, status, amount_paid, total, branch_id FROM layaways WHERE id = %s FOR UPDATE",
            (layaway_id,)
        )
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Layaway not found'}), 404
        if row[1] != 'active':
            return jsonify({'error': f'Layaway is already {row[1]}'}), 400

        amount_paid = float(row[2] or 0)
        layaway_branch = row[4] or g.branch_id

        # Reverse stock reservations (return reserved items to available stock)
        cur.execute("""
            SELECT inventory_id, quantity FROM layaway_items WHERE layaway_id = %s
        """, (layaway_id,))
        layaway_items = cur.fetchall()
        for inv_id, qty in layaway_items:
            # Positive quantity = return stock
            record_operation(
                conn, inv_id, layaway_branch,
                operation_type='LAYAWAY_CANCEL',
                quantity=qty,
                notes=f'Cancelacion apartado #{layaway_id}: {reason}'
            )

        cur.execute("""
            UPDATE layaways SET status = 'cancelled',
                notes = COALESCE(notes || ' | ', '') || %s
            WHERE id = %s
        """, (f"CANCELADO: {reason}", layaway_id))

        log_action(conn, 'LAYAWAY_CANCEL', 'layaway', layaway_id,
                   old_value={'status': 'active', 'amount_paid': amount_paid},
                   new_value={'status': 'cancelled', 'reason': reason,
                              'items_unreserved': len(layaway_items)})

        conn.commit()

        return jsonify({
            'message': 'Layaway cancelled',
            'amount_paid': amount_paid,
            'items_unreserved': len(layaway_items),
            'note': 'Stock reservations reversed. Refund of paid amount must be processed separately.'
        })

    except Exception as e:
        # Undo any partial stock reversal so inventory and layaway status
        # never disagree.
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
# ─── Returns / Warranty ───────────────────────────
|
|
|
|
@pos_bp.route('/returns', methods=['POST'])
@require_auth('pos.sell')
def create_return():
    """Process a product return against an existing sale.

    Body: {
        sale_id: int,
        items: [{sale_item_id: int, quantity: int, reason: str}],
        notes: str
    }

    Each requested item is validated against the original sale item and the
    quantity already returned (including earlier entries of this same request).
    A return record and its items are then created, the stock is put back via
    a RETURN inventory operation, the sale status becomes 'returned' or
    'partially_returned', and the refund total is added to the customer's
    credit_balance when the sale has a customer.

    Returns:
        201 with {id, sale_id, total_refund, items, sale_status, message};
        400 for invalid input or a sale that cannot take returns;
        404 when the sale does not exist; 500 on any other failure.
    """
    data = request.get_json() or {}
    sale_id = data.get('sale_id')
    items = data.get('items', [])
    notes = data.get('notes', '')

    if not sale_id:
        return jsonify({'error': 'sale_id is required'}), 400
    if not items:
        return jsonify({'error': 'At least one return item required'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()

    try:
        # Validate sale exists and is completed
        cur.execute("""
            SELECT id, customer_id, total, status, branch_id
            FROM sales WHERE id = %s
        """, (sale_id,))
        sale = cur.fetchone()
        if not sale:
            return jsonify({'error': 'Sale not found'}), 404
        if sale[3] not in ('completed', 'partially_returned'):
            return jsonify({'error': f'Cannot return items from a {sale[3]} sale'}), 400

        sale_customer_id = sale[1]
        sale_branch_id = sale[4] or g.branch_id

        # Validate each return item against original sale items
        total_refund = 0
        validated_items = []
        # Quantity already claimed per sale item earlier in THIS request, so
        # listing the same sale item twice cannot exceed what was sold.
        pending_qty = {}

        for ri in items:
            si_id = ri.get('sale_item_id')
            # A null quantity/reason becomes an ordinary validation error.
            ret_qty = int(ri.get('quantity') or 0)
            reason = str(ri.get('reason') or '').strip()

            if ret_qty <= 0:
                raise ValueError(f'Invalid return quantity for sale_item_id {si_id}')
            if not reason:
                raise ValueError(f'Reason required for sale_item_id {si_id}')

            cur.execute("""
                SELECT id, inventory_id, quantity, unit_price, discount_pct, tax_rate, subtotal
                FROM sale_items WHERE id = %s AND sale_id = %s
            """, (si_id, sale_id))
            si = cur.fetchone()
            if not si:
                raise ValueError(f'Sale item {si_id} not found in sale #{sale_id}')

            original_qty = si[2]

            # Check how much has already been returned for this sale_item
            cur.execute("""
                SELECT COALESCE(SUM(ri2.quantity), 0)
                FROM return_items ri2
                JOIN returns r ON ri2.return_id = r.id
                WHERE ri2.sale_item_id = %s AND r.status = 'completed'
            """, (si_id,))
            already_returned = cur.fetchone()[0]

            # Key on the database id so 5 and "5" count as the same item.
            remaining = original_qty - already_returned - pending_qty.get(si[0], 0)
            if ret_qty > remaining:
                raise ValueError(
                    f'Cannot return {ret_qty} of sale_item {si_id} — only {remaining} remaining'
                )
            pending_qty[si[0]] = pending_qty.get(si[0], 0) + ret_qty

            unit_price = float(si[3])
            discount_pct = float(si[4]) if si[4] else 0
            # Only a missing rate falls back to 0.16; a stored 0 (tax-exempt
            # line) must not inflate the refund by 16%.
            tax_rate = float(si[5]) if si[5] is not None else 0.16
            price_after_discount = unit_price * (1 - discount_pct / 100)
            refund_amount = round(ret_qty * price_after_discount * (1 + tax_rate), 2)
            total_refund += refund_amount

            validated_items.append({
                'sale_item_id': si_id,
                'inventory_id': si[1],
                'quantity': ret_qty,
                'unit_price': unit_price,
                'refund_amount': refund_amount,
                'reason': reason,
            })

        # Summing floats can leave binary noise (e.g. 0.30000000000000004).
        total_refund = round(total_refund, 2)

        # Create return record
        cur.execute("""
            INSERT INTO returns (sale_id, customer_id, employee_id, total_refund, reason, status)
            VALUES (%s, %s, %s, %s, %s, 'completed')
            RETURNING id
        """, (sale_id, sale_customer_id, g.employee_id, total_refund, notes or 'Devolucion'))

        return_id = cur.fetchone()[0]

        # Create return items and restore inventory
        from services.inventory_engine import record_operation

        for vi in validated_items:
            cur.execute("""
                INSERT INTO return_items
                    (return_id, sale_item_id, inventory_id, quantity, unit_price, refund_amount)
                VALUES (%s, %s, %s, %s, %s, %s)
            """, (return_id, vi['sale_item_id'], vi['inventory_id'],
                  vi['quantity'], vi['unit_price'], vi['refund_amount']))

            # Return stock to inventory
            record_operation(
                conn, vi['inventory_id'], sale_branch_id,
                operation_type='RETURN',
                quantity=vi['quantity'],
                notes=f'Devolucion #{return_id} de venta #{sale_id}: {vi["reason"]}'
            )

        # Update sale status if all items returned
        cur.execute("""
            SELECT COALESCE(SUM(ri2.quantity), 0), COALESCE(SUM(si2.quantity), 0)
            FROM sale_items si2
            LEFT JOIN (
                SELECT sale_item_id, SUM(quantity) as quantity
                FROM return_items ri3
                JOIN returns r2 ON ri3.return_id = r2.id
                WHERE r2.sale_id = %s AND r2.status = 'completed'
                GROUP BY sale_item_id
            ) ri2 ON ri2.sale_item_id = si2.id
            WHERE si2.sale_id = %s
        """, (sale_id, sale_id))
        returned_total, sold_total = cur.fetchone()
        new_status = 'returned' if returned_total >= sold_total else 'partially_returned'
        cur.execute("UPDATE sales SET status = %s WHERE id = %s", (new_status, sale_id))

        # Update customer credit if applicable
        if sale_customer_id:
            cur.execute("""
                UPDATE customers SET credit_balance = COALESCE(credit_balance, 0) + %s
                WHERE id = %s
            """, (total_refund, sale_customer_id))

        log_action(conn, 'RETURN_CREATE', 'return', return_id,
                   new_value={
                       'sale_id': sale_id,
                       'total_refund': total_refund,
                       'items_count': len(validated_items),
                       'sale_status': new_status
                   })

        conn.commit()

        return jsonify({
            'id': return_id,
            'sale_id': sale_id,
            'total_refund': total_refund,
            'items': validated_items,
            'sale_status': new_status,
            'message': f'Return #{return_id} created — ${total_refund:,.2f} refund'
        }), 201

    except ValueError as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        # Also covers the 404/400 early returns above, which previously left
        # the cursor and connection open.
        cur.close()
        conn.close()
|
|
|
|
|
|
@pos_bp.route('/returns', methods=['GET'])
@require_auth('pos.view')
def list_returns():
    """List returns, newest first, with pagination.

    Query params:
        sale_id: optional integer; only returns for this sale.
        customer_id: optional integer; only returns for this customer.
        page: 1-based page number (default 1; values below 1 become 1).
        per_page: page size (default 50, clamped to the range 1..200).

    Returns:
        200 with {data: [...], pagination: {page, per_page, total, total_pages}},
        or 400 when a numeric query parameter is not an integer.
    """
    # Validate query parameters before opening a connection: a malformed value
    # becomes a 400 response and can no longer leak a connection.
    try:
        page = max(int(request.args.get('page', 1)), 1)
        per_page = min(max(int(request.args.get('per_page', 50)), 1), 200)
        raw_sale_id = request.args.get('sale_id')
        raw_customer_id = request.args.get('customer_id')
        sale_id = int(raw_sale_id) if raw_sale_id else None
        customer_id = int(raw_customer_id) if raw_customer_id else None
    except ValueError:
        return jsonify({'error': 'page, per_page, sale_id and customer_id must be integers'}), 400

    # Only fixed SQL fragments are interpolated; every value is a bound parameter.
    where_clauses = ["1=1"]
    params = []
    if sale_id is not None:
        where_clauses.append("r.sale_id = %s")
        params.append(sale_id)
    if customer_id is not None:
        where_clauses.append("r.customer_id = %s")
        params.append(customer_id)
    where = " AND ".join(where_clauses)

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute(f"SELECT count(*) FROM returns r WHERE {where}", params)
        total = cur.fetchone()[0]

        cur.execute(f"""
            SELECT r.id, r.sale_id, r.customer_id, r.employee_id, r.total_refund,
                   r.reason, r.status, r.created_at,
                   e.name as employee_name, c.name as customer_name
            FROM returns r
            LEFT JOIN employees e ON r.employee_id = e.id
            LEFT JOIN customers c ON r.customer_id = c.id
            WHERE {where}
            ORDER BY r.created_at DESC
            LIMIT %s OFFSET %s
        """, params + [per_page, (page - 1) * per_page])

        returns = [
            {
                'id': row[0], 'sale_id': row[1], 'customer_id': row[2],
                'employee_id': row[3], 'total_refund': float(row[4]) if row[4] else 0,
                'reason': row[5], 'status': row[6], 'created_at': str(row[7]),
                'employee_name': row[8], 'customer_name': row[9],
            }
            for row in cur.fetchall()
        ]
    finally:
        # Release the cursor and connection even if a query raises.
        cur.close()
        conn.close()

    total_pages = (total + per_page - 1) // per_page
    return jsonify({
        'data': returns,
        'pagination': {'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages}
    })
|
|
|
|
|
|
# ─── Push Notifications ───────────────────────────
|
|
|
|
@pos_bp.route('/push/subscribe', methods=['POST'])
@require_auth('pos.view')
def push_subscribe():
    """Save a push subscription for the current employee.

    Body: {subscription: <PushSubscription JSON from browser>}

    Returns:
        200 with a confirmation message, or 400 when `subscription` is missing.
    """
    from services.push_service import save_subscription, ensure_push_table

    data = request.get_json() or {}
    subscription = data.get('subscription')
    if not subscription:
        return jsonify({'error': 'subscription required'}), 400

    conn = get_tenant_conn(g.tenant_id)
    try:
        ensure_push_table(conn)
        save_subscription(conn, g.employee_id, subscription)
    finally:
        # Close the connection even when the push service raises.
        conn.close()
    return jsonify({'message': 'Push subscription saved'})
|
|
|
|
|
|
@pos_bp.route('/push/vapid-key', methods=['GET'])
@require_auth('pos.view')
def push_vapid_key():
    """Get the VAPID public key the browser needs to create a push subscription.

    Returns:
        200 with {public_key}, or 503 when no public key is available.
    """
    from services.push_service import get_or_create_vapid_keys, ensure_push_table

    conn = get_tenant_conn(g.tenant_id)
    try:
        ensure_push_table(conn)
        # Only the second element of the returned pair (the public key) is used.
        _, public_key = get_or_create_vapid_keys(conn)
    finally:
        # Close the connection even when the push service raises.
        conn.close()

    if not public_key:
        return jsonify({'error': 'Push not available (pywebpush not installed)'}), 503
    return jsonify({'public_key': public_key})
|
|
|
|
|
|
@pos_bp.route('/push/test', methods=['POST'])
@require_auth('pos.view')
def push_test():
    """Send a test push notification to the current employee.

    Returns:
        200 when send_push reports success, otherwise 400.
    """
    from services.push_service import send_push, ensure_push_table

    conn = get_tenant_conn(g.tenant_id)
    try:
        ensure_push_table(conn)
        ok = send_push(conn, g.employee_id, 'Prueba Nexus POS',
                       'Las notificaciones push estan funcionando correctamente.', '/pos')
    finally:
        # Close the connection even when the push service raises.
        conn.close()

    if ok:
        return jsonify({'message': 'Test notification sent'})
    return jsonify({'error': 'No subscription found or push failed'}), 400
|
|
|
|
|
|
# ─── Thermal Printing ──────────────────────────────
|
|
|
|
@pos_bp.route('/sales/<int:sale_id>/print', methods=['POST'])
@require_auth('pos.sell')
def print_ticket(sale_id):
    """Generate a printable ticket for a sale.

    Body (optional): {printer_type: 'escpos_raw' | 'browser', width: 58 | 80}
        - escpos_raw (default): returns the bytes produced by generate_ticket
          as an application/octet-stream attachment named ticket_<sale_id>.bin.
        - browser: returns the ticket data as JSON for browser-side rendering.

    Returns:
        200 with the ticket in the requested form; 400 when `width` is not an
        integer; 404 when the sale does not exist.
    """
    from flask import Response
    from services.thermal_printer import generate_ticket

    body = request.get_json(silent=True) or {}
    printer_type = body.get('printer_type', 'escpos_raw')
    try:
        width = int(body.get('width', 80))
    except (TypeError, ValueError):
        return jsonify({'error': 'width must be an integer'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        # Fetch sale
        cur.execute("""
            SELECT s.*, e.name as employee_name, c.name as customer_name
            FROM sales s
            LEFT JOIN employees e ON s.employee_id = e.id
            LEFT JOIN customers c ON s.customer_id = c.id
            WHERE s.id = %s
        """, (sale_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'Sale not found'}), 404

        # Column names come from the cursor because the query selects s.*.
        cols = [desc[0] for desc in cur.description]
        sale = dict(zip(cols, row))
        for k in ('subtotal', 'discount_total', 'tax_total', 'total', 'amount_paid', 'change_given'):
            if sale.get(k) is not None:
                sale[k] = float(sale[k])

        # Fetch items
        cur.execute("""
            SELECT name, quantity, unit_price, subtotal
            FROM sale_items WHERE sale_id = %s ORDER BY id
        """, (sale_id,))
        items = [
            {
                'name': r[0], 'quantity': r[1],
                'unit_price': float(r[2]) if r[2] else 0,
                'subtotal': float(r[3]) if r[3] else 0,
            }
            for r in cur.fetchall()
        ]

        # Fetch business info from config
        business_info = {'name': 'NEXUS AUTOPARTS', 'rfc': '', 'address': ''}
        try:
            cur.execute("SELECT key, value FROM config WHERE key IN ('business_name','rfc','address')")
            for rw in cur.fetchall():
                if rw[0] == 'business_name':
                    business_info['name'] = rw[1]
                else:
                    business_info[rw[0]] = rw[1]
        except Exception:
            # Deliberate best effort: if the config lookup fails, the ticket
            # is still printed with the default business info above.
            pass
    finally:
        # Release the cursor and connection on every path, including errors.
        cur.close()
        conn.close()

    sale_data = {
        'folio': f'V-{sale["id"]}',
        'date': str(sale.get('created_at', '')),
        # The joined name columns are always present but NULL when there is no
        # match, so normalise None to '' as the defaults intend.
        'employee': sale.get('employee_name') or '',
        'customer': sale.get('customer_name') or '',
        'items': items,
        'subtotal': sale.get('subtotal', 0),
        'discount_total': sale.get('discount_total', 0),
        'tax_total': sale.get('tax_total', 0),
        'total': sale.get('total', 0),
        'payment_method': sale.get('payment_method', 'efectivo'),
        'amount_paid': sale.get('amount_paid'),
        'change_given': sale.get('change_given'),
    }

    if printer_type == 'browser':
        # Return the sale data as JSON for browser-side rendering
        return jsonify(sale_data)

    # Default: ESC/POS raw bytes
    raw = generate_ticket(sale_data, business_info, width=width)
    return Response(raw, mimetype='application/octet-stream',
                    headers={'Content-Disposition': f'attachment; filename=ticket_{sale_id}.bin'})
|