"""Service Order Engine: workshop Kanban management. States: received -> diagnosis -> waiting_parts -> repair -> quality_check -> ready -> delivered """ import contextlib from datetime import datetime from services import inventory_engine VALID_TRANSITIONS = { 'received': ['diagnosis', 'cancelled'], 'diagnosis': ['waiting_parts', 'repair', 'cancelled'], 'waiting_parts': ['repair', 'cancelled'], 'repair': ['quality_check', 'cancelled'], 'quality_check': ['ready', 'repair', 'cancelled'], 'ready': ['delivered', 'cancelled'], 'delivered': [], 'cancelled': [], } def _generate_order_number(conn): """Generate SO-YYYY-NNNN order number.""" cur = conn.cursor() year = datetime.utcnow().year prefix = f"SO-{year}-" cur.execute(""" SELECT order_number FROM service_orders WHERE order_number LIKE %s ORDER BY order_number DESC LIMIT 1 """, (f"{prefix}%",)) row = cur.fetchone() last_num = 0 if row and row[0]: with contextlib.suppress(ValueError): last_num = int(row[0].split('-')[-1]) new_num = last_num + 1 cur.close() return f"{prefix}{new_num:04d}" def create_service_order(conn, data): """Create a new service order. data: { customer_id, vehicle_id, branch_id, priority, reception_notes, estimated_cost, estimated_completion, employee_id, mileage_in, fuel_level, created_by } """ cur = conn.cursor() order_number = _generate_order_number(conn) cur.execute(""" INSERT INTO service_orders (tenant_id, branch_id, customer_id, vehicle_id, order_number, status, priority, reception_notes, estimated_cost, estimated_completion, employee_id, mileage_in, fuel_level, created_by) VALUES (%s, %s, %s, %s, %s, 'received', %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id """, ( data.get('tenant_id'), data.get('branch_id'), data.get('customer_id'), data.get('vehicle_id'), order_number, data.get('priority', 'normal'), data.get('reception_notes'), data.get('estimated_cost'), data.get('estimated_completion'), data.get('employee_id'), data.get('mileage_in'), data.get('fuel_level'), data.get('created_by'), )) so_id = cur.fetchone()[0] # Log initial status cur.execute(""" INSERT INTO service_order_status_history (service_order_id, new_status, changed_by, notes) VALUES (%s, 'received', %s, 'Orden creada') """, (so_id, data.get('created_by'))) conn.commit() cur.close() return {'service_order_id': so_id, 'order_number': order_number} def get_service_order(conn, so_id): cur = conn.cursor() cur.execute(""" SELECT so.id, so.order_number, so.status, so.priority, so.customer_id, c.name as customer_name, c.phone as customer_phone, so.vehicle_id, fv.plate as vehicle_plate, fv.make as vehicle_make, fv.model as vehicle_model, so.branch_id, so.reception_notes, so.diagnosis_notes, so.repair_notes, so.delivery_notes, so.estimated_cost, so.final_cost, so.estimated_completion, so.actual_completion, so.delivered_at, so.mileage_in, so.mileage_out, so.fuel_level, so.employee_id, e.name as employee_name, so.created_by, so.created_at, so.updated_at FROM service_orders so LEFT JOIN customers c ON so.customer_id = c.id LEFT JOIN fleet_vehicles fv ON so.vehicle_id = fv.id LEFT JOIN employees e ON so.employee_id = e.id WHERE so.id = %s """, (so_id,)) row = cur.fetchone() if not row: cur.close() return None so = { 'id': row[0], 'order_number': row[1], 'status': row[2], 'priority': row[3], 'customer_id': row[4], 'customer_name': row[5], 'customer_phone': row[6], 'vehicle_id': row[7], 'vehicle_plate': row[8], 'vehicle_make': row[9], 'vehicle_model': row[10], 'branch_id': row[11], 'reception_notes': row[12], 'diagnosis_notes': row[13], 'repair_notes': row[14], 'delivery_notes': row[15], 'estimated_cost': float(row[16]) if row[16] else None, 'final_cost': float(row[17]) if row[17] else None, 'estimated_completion': str(row[18]) if row[18] else None, 'actual_completion': str(row[19]) if row[19] else None, 'delivered_at': str(row[20]) if row[20] else None, 'mileage_in': row[21], 'mileage_out': row[22], 'fuel_level': row[23], 'employee_id': row[24], 'employee_name': row[25], 'created_by': row[26], 'created_at': str(row[27]), 'updated_at': str(row[28]), } # Items cur.execute(""" SELECT id, inventory_id, part_number, name, quantity, unit_cost, unit_price, status, notes FROM service_order_items WHERE service_order_id = %s ORDER BY id """, (so_id,)) so['items'] = [] for r in cur.fetchall(): so['items'].append({ 'id': r[0], 'inventory_id': r[1], 'part_number': r[2], 'name': r[3], 'quantity': float(r[4]) if r[4] else 0, 'unit_cost': float(r[5]) if r[5] else None, 'unit_price': float(r[6]) if r[6] else None, 'status': r[7], 'notes': r[8], }) # Labor cur.execute(""" SELECT id, description, hours, hourly_rate, total_cost, employee_id, status FROM service_order_labor WHERE service_order_id = %s ORDER BY id """, (so_id,)) so['labor'] = [] for r in cur.fetchall(): so['labor'].append({ 'id': r[0], 'description': r[1], 'hours': float(r[2]) if r[2] else 0, 'hourly_rate': float(r[3]) if r[3] else 0, 'total_cost': float(r[4]) if r[4] else 0, 'employee_id': r[5], 'status': r[6], }) # Status history cur.execute(""" SELECT id, old_status, new_status, changed_by, notes, created_at FROM service_order_status_history WHERE service_order_id = %s ORDER BY created_at """, (so_id,)) so['status_history'] = [] for r in cur.fetchall(): so['status_history'].append({ 'id': r[0], 'old_status': r[1], 'new_status': r[2], 'changed_by': r[3], 'notes': r[4], 'created_at': str(r[5]), }) cur.close() return so def list_service_orders(conn, status=None, branch_id=None, customer_id=None, priority=None, employee_id=None, page=1, per_page=50): cur = conn.cursor() where_clauses = [] params = [] if status: where_clauses.append("so.status = %s") params.append(status) if branch_id: where_clauses.append("so.branch_id = %s") params.append(branch_id) if customer_id: where_clauses.append("so.customer_id = %s") params.append(customer_id) if priority: where_clauses.append("so.priority = %s") params.append(priority) if employee_id: where_clauses.append("so.employee_id = %s") params.append(employee_id) where = " AND ".join(where_clauses) if where_clauses else "true" cur.execute(f""" SELECT count(*) FROM service_orders so WHERE {where} """, params) total = cur.fetchone()[0] cur.execute(f""" SELECT so.id, so.order_number, so.status, so.priority, so.customer_id, c.name as customer_name, so.vehicle_id, fv.plate as vehicle_plate, so.estimated_cost, so.estimated_completion, so.created_at FROM service_orders so LEFT JOIN customers c ON so.customer_id = c.id LEFT JOIN fleet_vehicles fv ON so.vehicle_id = fv.id WHERE {where} ORDER BY CASE so.priority WHEN 'urgent' THEN 1 WHEN 'high' THEN 2 WHEN 'normal' THEN 3 WHEN 'low' THEN 4 END, so.created_at DESC LIMIT %s OFFSET %s """, params + [per_page, (page - 1) * per_page]) orders = [] for r in cur.fetchall(): orders.append({ 'id': r[0], 'order_number': r[1], 'status': r[2], 'priority': r[3], 'customer_id': r[4], 'customer_name': r[5], 'vehicle_id': r[6], 'vehicle_plate': r[7], 'estimated_cost': float(r[8]) if r[8] else None, 'estimated_completion': str(r[9]) if r[9] else None, 'created_at': str(r[10]), }) cur.close() total_pages = (total + per_page - 1) // per_page return { 'data': orders, 'pagination': {'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages} } def update_status(conn, so_id, new_status, changed_by=None, notes=None): """Update service order status with validation.""" cur = conn.cursor() cur.execute("SELECT status FROM service_orders WHERE id = %s", (so_id,)) row = cur.fetchone() if not row: cur.close() raise ValueError("Service order not found") old_status = row[0] if new_status not in VALID_TRANSITIONS.get(old_status, []): cur.close() raise ValueError(f"Invalid transition: {old_status} -> {new_status}") # Update status extra_sets = [] extra_vals = [] if new_status == 'ready': extra_sets.append("actual_completion = NOW()") if new_status == 'delivered': extra_sets.append("delivered_at = NOW()") extra_sets.append("delivered_by = %s") extra_vals.append(changed_by) set_clause = ", ".join(["status = %s"] + extra_sets) cur.execute(f""" UPDATE service_orders SET {set_clause} WHERE id = %s """, [new_status] + extra_vals + [so_id]) # Log history cur.execute(""" INSERT INTO service_order_status_history (service_order_id, old_status, new_status, changed_by, notes) VALUES (%s, %s, %s, %s, %s) """, (so_id, old_status, new_status, changed_by, notes)) conn.commit() cur.close() return {'old_status': old_status, 'new_status': new_status} def add_item(conn, so_id, item_data): """Add a part/item to the service order.""" cur = conn.cursor() cur.execute(""" INSERT INTO service_order_items (service_order_id, inventory_id, part_number, name, quantity, unit_cost, unit_price, status, notes) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id """, ( so_id, item_data.get('inventory_id'), item_data.get('part_number'), item_data.get('name'), item_data.get('quantity', 1), item_data.get('unit_cost'), item_data.get('unit_price'), item_data.get('status', 'pending'), item_data.get('notes'), )) item_id = cur.fetchone()[0] conn.commit() cur.close() return item_id def update_item(conn, item_id, data): cur = conn.cursor() allowed = ['part_number', 'name', 'quantity', 'unit_cost', 'unit_price', 'status', 'notes'] sets = [] vals = [] for field in allowed: if field in data: sets.append(f"{field} = %s") vals.append(data[field]) if not sets: cur.close() return False vals.append(item_id) cur.execute(f"UPDATE service_order_items SET {', '.join(sets)} WHERE id = %s", vals) conn.commit() cur.close() return True def remove_item(conn, item_id): cur = conn.cursor() cur.execute("DELETE FROM service_order_items WHERE id = %s", (item_id,)) conn.commit() cur.close() return True def add_labor(conn, so_id, labor_data): cur = conn.cursor() total_cost = labor_data.get('hours', 0) * labor_data.get('hourly_rate', 0) cur.execute(""" INSERT INTO service_order_labor (service_order_id, description, hours, hourly_rate, total_cost, employee_id, status) VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id """, ( so_id, labor_data['description'], labor_data.get('hours', 0), labor_data.get('hourly_rate', 0), total_cost, labor_data.get('employee_id'), labor_data.get('status', 'pending'), )) labor_id = cur.fetchone()[0] conn.commit() cur.close() return labor_id def update_labor(conn, labor_id, data): cur = conn.cursor() allowed = ['description', 'hours', 'hourly_rate', 'employee_id', 'status'] sets = [] vals = [] for field in allowed: if field in data: sets.append(f"{field} = %s") vals.append(data[field]) if not sets: cur.close() return False # Recalculate total_cost if hours or rate changed cur.execute("SELECT hours, hourly_rate FROM service_order_labor WHERE id = %s", (labor_id,)) row = cur.fetchone() hours = data.get('hours', row[0]) if row else data.get('hours', 0) rate = data.get('hourly_rate', row[1]) if row else data.get('hourly_rate', 0) sets.append("total_cost = %s") vals.append((hours or 0) * (rate or 0)) vals.append(labor_id) cur.execute(f"UPDATE service_order_labor SET {', '.join(sets)} WHERE id = %s", vals) conn.commit() cur.close() return True def remove_labor(conn, labor_id): cur = conn.cursor() cur.execute("DELETE FROM service_order_labor WHERE id = %s", (labor_id,)) conn.commit() cur.close() return True def update_service_order(conn, so_id, data): """Update general service order fields.""" cur = conn.cursor() allowed = ['priority', 'reception_notes', 'diagnosis_notes', 'repair_notes', 'delivery_notes', 'estimated_cost', 'estimated_completion', 'employee_id', 'mileage_out', 'fuel_level', 'final_cost'] sets = [] vals = [] for field in allowed: if field in data: sets.append(f"{field} = %s") vals.append(data[field]) if not sets: cur.close() return False vals.append(so_id) cur.execute(f"UPDATE service_orders SET {', '.join(sets)} WHERE id = %s", vals) conn.commit() cur.close() return True def get_kanban_summary(conn, branch_id=None): """Get counts per status for Kanban board.""" cur = conn.cursor() params = [] branch_filter = "" if branch_id: branch_filter = "AND branch_id = %s" params.append(branch_id) cur.execute(f""" SELECT status, COUNT(*) as cnt FROM service_orders WHERE status != 'cancelled' {branch_filter} GROUP BY status """, params) summary = {status: 0 for status in VALID_TRANSITIONS if status != 'cancelled'} for r in cur.fetchall(): summary[r[0]] = r[1] # Overdue orders (estimated_completion passed and not ready/delivered) cur.execute(f""" SELECT count(*) FROM service_orders WHERE estimated_completion < NOW() AND status NOT IN ('ready', 'delivered', 'cancelled') {branch_filter} """, params) overdue = cur.fetchone()[0] cur.close() summary['overdue'] = overdue return summary # ─── Workshop inventory integration ───────────────────────────────────────── def reserve_item(conn, so_item_id, branch_id, employee_id=None): """Reserve inventory for a service order item. Records a negative SO_RESERVE operation and updates reserved_quantity. Raises ValueError if stock is insufficient. """ cur = conn.cursor() cur.execute( """ SELECT soi.service_order_id, soi.inventory_id, soi.quantity, soi.status, so.order_number FROM service_order_items soi JOIN service_orders so ON so.id = soi.service_order_id WHERE soi.id = %s """, (so_item_id,), ) row = cur.fetchone() if not row: cur.close() raise ValueError("Service order item not found") so_id, inventory_id, quantity, status, order_number = row if status == "cancelled": cur.close() raise ValueError("Cannot reserve a cancelled item") if not inventory_id: cur.close() raise ValueError("Item has no inventory linked") qty = int(quantity) available = inventory_engine.get_stock(conn, inventory_id, branch_id) if available < qty: cur.close() raise ValueError(f"Insufficient stock. Available: {available}, requested: {qty}") inventory_engine.record_operation( conn, inventory_id, branch_id, "SO_RESERVE", -qty, reference_id=so_id, reference_type="service_order_item", notes=f"Reserva orden {order_number}", employee_id=employee_id, ) cur.execute( "UPDATE service_order_items SET reserved_quantity = %s WHERE id = %s", (qty, so_item_id), ) conn.commit() cur.close() return {"reserved": qty} def release_item(conn, so_item_id, employee_id=None): """Release a previous reservation for a service order item. Records a positive SO_RELEASE operation and resets reserved_quantity. """ cur = conn.cursor() cur.execute( """ SELECT soi.service_order_id, soi.inventory_id, soi.reserved_quantity, so.branch_id, so.order_number FROM service_order_items soi JOIN service_orders so ON so.id = soi.service_order_id WHERE soi.id = %s """, (so_item_id,), ) row = cur.fetchone() if not row: cur.close() raise ValueError("Service order item not found") so_id, inventory_id, reserved_qty, branch_id, order_number = row if not inventory_id or not reserved_qty: cur.close() return {"released": 0} qty = int(reserved_qty) inventory_engine.record_operation( conn, inventory_id, branch_id, "SO_RELEASE", qty, reference_id=so_id, reference_type="service_order_item", notes=f"Liberacion reserva orden {order_number}", employee_id=employee_id, ) cur.execute( "UPDATE service_order_items SET reserved_quantity = 0 WHERE id = %s", (so_item_id,), ) conn.commit() cur.close() return {"released": qty} def _consume_item_inventory(conn, so_item, sale_id, order_number, branch_id, employee_id=None): """Release reservation and record final SALE for a service order item.""" inventory_id = so_item.get("inventory_id") reserved_qty = so_item.get("reserved_quantity", 0) qty = int(so_item.get("quantity", 0)) if not inventory_id or qty <= 0: return if reserved_qty: inventory_engine.record_operation( conn, inventory_id, branch_id, "SO_RELEASE", int(reserved_qty), reference_id=so_item.get("service_order_id"), reference_type="service_order", notes=f"Liberacion para venta orden {order_number}", employee_id=employee_id, ) inventory_engine.record_operation( conn, inventory_id, branch_id, "SALE", -qty, reference_id=sale_id, reference_type="sale", notes=f"Venta desde orden {order_number}", employee_id=employee_id, ) def convert_to_sale(conn, so_id, sale_data, employee_id=None): """Convert a service order into a POS sale. sale_data keys: payment_method: 'efectivo' | 'transferencia' | 'tarjeta' | 'mixto' sale_type: 'cash' | 'credit' | 'mixed' register_id: int (optional) amount_paid: float (optional) payment_details: list (optional) notes: str (optional) Returns dict with sale_id, total, items_count. """ cur = conn.cursor() so = get_service_order(conn, so_id) if not so: cur.close() raise ValueError("Service order not found") if so["status"] == "cancelled": cur.close() raise ValueError("Cannot convert a cancelled service order") if so.get("sale_id"): cur.close() raise ValueError("Service order already converted to sale") branch_id = so["branch_id"] customer_id = so["customer_id"] # Build sale items from SO parts and labor sale_items = [] for item in so.get("items", []): if item.get("status") == "cancelled": continue qty = int(item.get("quantity", 1)) unit_price = float(item.get("unit_price") or 0) unit_cost = float(item.get("unit_cost") or 0) sale_items.append( { "inventory_id": item.get("inventory_id"), "part_number": item.get("part_number") or "PART", "name": item.get("name") or "Refaccion", "quantity": qty, "unit_price": unit_price, "unit_cost": unit_cost, "tax_rate": 0.16, } ) for labor in so.get("labor", []): if labor.get("status") == "cancelled": continue sale_items.append( { "inventory_id": None, "part_number": "SERV", "name": labor.get("description") or "Mano de obra", "quantity": 1, "unit_price": float(labor.get("total_cost") or 0), "unit_cost": 0, "tax_rate": 0.16, } ) if not sale_items: cur.close() raise ValueError("No items or labor to invoice") # Calculate totals subtotal = 0.0 tax_total = 0.0 for item in sale_items: item_subtotal = item["quantity"] * item["unit_price"] item_tax = item_subtotal * item["tax_rate"] item["subtotal"] = item_subtotal item["tax_amount"] = item_tax subtotal += item_subtotal tax_total += item_tax total = subtotal + tax_total payment_method = sale_data.get("payment_method", "efectivo") sale_type = sale_data.get("sale_type", "cash") register_id = sale_data.get("register_id") amount_paid = float(sale_data.get("amount_paid", total if sale_type == "cash" else 0)) change_given = max(amount_paid - total, 0) if sale_type == "cash" and payment_method == "efectivo" else 0 notes = sale_data.get("notes") or f"Orden de servicio {so['order_number']}" metodo_pago_sat = "PPD" if sale_type == "credit" else "PUE" forma_pago_map = {"efectivo": "01", "transferencia": "03", "tarjeta": "04", "mixto": "99"} forma_pago_sat = forma_pago_map.get(payment_method, "99") cur.execute( """ INSERT INTO sales (branch_id, customer_id, employee_id, register_id, sale_type, payment_method, subtotal, discount_total, tax_total, total, amount_paid, change_given, metodo_pago_sat, forma_pago_sat, status, notes) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'completed', %s) RETURNING id, created_at """, ( branch_id, customer_id, employee_id, register_id, sale_type, payment_method, subtotal, 0, tax_total, total, amount_paid, change_given, metodo_pago_sat, forma_pago_sat, notes, ), ) sale_id, _created_at = cur.fetchone() # Insert sale_items for item in sale_items: cur.execute( """ INSERT INTO sale_items (sale_id, inventory_id, part_number, name, quantity, unit_price, unit_cost, tax_rate, tax_amount, subtotal) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( sale_id, item["inventory_id"], item["part_number"], item["name"], item["quantity"], item["unit_price"], item["unit_cost"], item["tax_rate"], item["tax_amount"], item["subtotal"], ), ) # Consume inventory for parts for item in so.get("items", []): if item.get("status") == "cancelled": continue _consume_item_inventory( conn, item, sale_id, so["order_number"], branch_id, employee_id ) # Link order to sale cur.execute("UPDATE service_orders SET sale_id = %s WHERE id = %s", (sale_id, so_id)) conn.commit() cur.close() return {"sale_id": sale_id, "total": total, "items_count": len(sale_items)} def assign_mechanic(conn, so_id, employee_id): """Assign a mechanic/technician to a service order.""" cur = conn.cursor() cur.execute("SELECT id FROM service_orders WHERE id = %s", (so_id,)) if not cur.fetchone(): cur.close() raise ValueError("Service order not found") cur.execute( "UPDATE service_orders SET employee_id = %s WHERE id = %s", (employee_id, so_id), ) conn.commit() cur.close() return {"employee_id": employee_id} # ─── Service catalog (reusable labor concepts) ─────────────────────────────── def list_service_catalog(conn, active_only=True): """List reusable labor/service concepts.""" cur = conn.cursor() where = "WHERE is_active = true" if active_only else "" cur.execute( f""" SELECT id, tenant_id, name, description, suggested_hours, suggested_rate, is_active, created_at, updated_at FROM service_catalog {where} ORDER BY name """ ) items = [] for r in cur.fetchall(): items.append( { "id": r[0], "tenant_id": r[1], "name": r[2], "description": r[3], "suggested_hours": float(r[4]) if r[4] else 0, "suggested_rate": float(r[5]) if r[5] else 0, "is_active": r[6], "created_at": str(r[7]) if r[7] else None, "updated_at": str(r[8]) if r[8] else None, } ) cur.close() return items def create_service_catalog_item(conn, tenant_id, data): """Create a reusable labor concept.""" cur = conn.cursor() cur.execute( """ INSERT INTO service_catalog (tenant_id, name, description, suggested_hours, suggested_rate, is_active) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id """, ( tenant_id, data.get("name"), data.get("description"), data.get("suggested_hours", 0), data.get("suggested_rate", 0), data.get("is_active", True), ), ) item_id = cur.fetchone()[0] conn.commit() cur.close() return {"id": item_id} def update_service_catalog_item(conn, item_id, data): """Update a reusable labor concept.""" cur = conn.cursor() allowed = ["name", "description", "suggested_hours", "suggested_rate", "is_active"] sets = [] vals = [] for field in allowed: if field in data: sets.append(f"{field} = %s") vals.append(data[field]) if not sets: cur.close() return False vals.append(item_id) cur.execute(f"UPDATE service_catalog SET {', '.join(sets)} WHERE id = %s", vals) conn.commit() cur.close() return True def delete_service_catalog_item(conn, item_id): """Soft-delete a reusable labor concept by setting is_active = false.""" cur = conn.cursor() cur.execute( "UPDATE service_catalog SET is_active = false WHERE id = %s", (item_id,) ) conn.commit() cur.close() return True