Catalogo de Servicios (builder): codigo + documentacion extensiva

Builder multi-proveedor de servicios (tour / A&B / transportacion).
Python stdlib + SQLite + vanilla JS SPA. Hereda filosofia del Hub.

Secretos y datos (catalogo.db, secret.key, uploads/) excluidos via .gitignore.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
consultoria-as
2026-06-09 21:00:50 -07:00
commit 38e9e4b91c
8 changed files with 1814 additions and 0 deletions

597
server.py Normal file
View File

@@ -0,0 +1,597 @@
"""
Catálogo (borrador) — Servidor local
Builder de catálogo multi-proveedor de servicios (tours / A&B / transportación).
Base: filosofía del Art4Hotel Hub (servicio = fuente única de verdad).
Zero dependencias externas (stdlib only).
Uso:
python3 server.py
http://localhost:4402
"""
import http.server, json, sqlite3, os, urllib.parse, re, mimetypes
import hashlib, hmac, base64, secrets, time, http.cookies
from datetime import datetime
from pathlib import Path
PORT = 4403
BASE = Path(__file__).parent
DB_PATH = BASE / "catalogo.db"
STATIC_DIR = BASE
UPLOADS_DIR = BASE / "uploads"
UPLOADS_DIR.mkdir(exist_ok=True)
MAX_UPLOAD = 25 * 1024 * 1024 # 25MB
# ── Autenticación ──
SESSION_HOURS = 24 * 14
SECRET_FILE = BASE / "secret.key"
def _load_secret():
if SECRET_FILE.exists():
return SECRET_FILE.read_bytes()
s = secrets.token_bytes(32)
SECRET_FILE.write_bytes(s)
try: os.chmod(SECRET_FILE, 0o600)
except Exception: pass
return s
SECRET = _load_secret()
def hash_password(password, salt=None):
if salt is None: salt = secrets.token_hex(16)
h = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 200000).hex()
return salt + '$' + h
def verify_password(password, stored):
try:
salt, _ = stored.split('$', 1)
return hmac.compare_digest(hash_password(password, salt), stored)
except Exception:
return False
def make_session(username):
exp = int(time.time()) + SESSION_HOURS * 3600
payload = f"{username}|{exp}"
sig = hmac.new(SECRET, payload.encode(), hashlib.sha256).hexdigest()
return base64.urlsafe_b64encode(payload.encode()).decode() + '.' + sig
def check_session(token):
try:
b64, sig = token.split('.', 1)
payload = base64.urlsafe_b64decode(b64.encode()).decode()
expect = hmac.new(SECRET, payload.encode(), hashlib.sha256).hexdigest()
if not hmac.compare_digest(sig, expect): return None
username, exp = payload.split('|')
if int(exp) < time.time(): return None
return username
except Exception:
return None
def get_db():
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def init_db():
conn = get_db(); c = conn.cursor()
# ── Usuarios (auth) ──
c.execute("""
CREATE TABLE IF NOT EXISTS usuarios (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
pass_hash TEXT NOT NULL,
nombre TEXT DEFAULT '',
activo INTEGER DEFAULT 1,
created_at TEXT DEFAULT (datetime('now','localtime'))
)
""")
# ── Proveedores (touroperadores / quien ofrece el servicio) ──
c.execute("""
CREATE TABLE IF NOT EXISTS proveedores (
id INTEGER PRIMARY KEY AUTOINCREMENT,
nombre TEXT NOT NULL,
tipo_principal TEXT DEFAULT 'tour',
contacto TEXT DEFAULT '',
telefono TEXT DEFAULT '',
email TEXT DEFAULT '',
sitio_web TEXT DEFAULT '',
comision_default REAL DEFAULT 0,
notas TEXT DEFAULT '',
activo INTEGER DEFAULT 1,
created_at TEXT DEFAULT (datetime('now','localtime')),
updated_at TEXT DEFAULT (datetime('now','localtime'))
)
""")
# ── Servicios (FUENTE ÚNICA DE VERDAD) ──
# tipo: tour | ayb | transportacion
# unidad: por_persona | por_grupo | por_vehiculo | por_evento
# atributos: JSON con extras específicos por tipo (flexible, no rompe esquema)
c.execute("""
CREATE TABLE IF NOT EXISTS servicios (
id INTEGER PRIMARY KEY AUTOINCREMENT,
proveedor_id INTEGER,
codigo TEXT DEFAULT '',
tipo TEXT DEFAULT 'tour',
categoria TEXT DEFAULT '',
nombre TEXT NOT NULL,
descripcion TEXT DEFAULT '',
terminos TEXT DEFAULT '',
horarios TEXT DEFAULT '',
capacidad_min INTEGER DEFAULT 0,
capacidad_max INTEGER DEFAULT 0,
unidad TEXT DEFAULT 'por_persona',
precio_neto REAL DEFAULT 0,
precio_publico REAL DEFAULT 0,
moneda TEXT DEFAULT 'MXN',
tarifas_adicionales TEXT DEFAULT '',
restricciones TEXT DEFAULT '',
atributos TEXT DEFAULT '',
mostrar_en_web INTEGER DEFAULT 0,
activo INTEGER DEFAULT 1,
notas TEXT DEFAULT '',
created_at TEXT DEFAULT (datetime('now','localtime')),
updated_at TEXT DEFAULT (datetime('now','localtime')),
FOREIGN KEY (proveedor_id) REFERENCES proveedores(id) ON DELETE SET NULL
)
""")
# ── migración: modo de disponibilidad + ubicación + check-in (reserva-lista) ──
scols={r[1] for r in c.execute("PRAGMA table_info(servicios)")}
for col in ['modo_disponibilidad','ubicacion','mapa_url','checkin','anticipacion','menu_detalle']:
if col not in scols: c.execute(f"ALTER TABLE servicios ADD COLUMN {col} TEXT DEFAULT ''")
if 'incluye_alimentos' not in scols: c.execute("ALTER TABLE servicios ADD COLUMN incluye_alimentos INTEGER DEFAULT 0")
conn.commit(); conn.close()
# Config para el CRUD genérico (mismo patrón que el Hub)
TABLES = {
'proveedores': {
'fields': ['nombre','tipo_principal','contacto','telefono','email','sitio_web',
'comision_default','notas','activo'],
'int_fields': ['activo'],
'float_fields': ['comision_default'],
},
'servicios': {
'fields': ['proveedor_id','codigo','tipo','categoria','nombre','descripcion','terminos',
'horarios','modo_disponibilidad','ubicacion','mapa_url','checkin','anticipacion',
'incluye_alimentos','menu_detalle',
'capacidad_min','capacidad_max','unidad','precio_neto','precio_publico',
'moneda','tarifas_adicionales','restricciones','atributos','mostrar_en_web',
'activo','notas'],
'int_fields': ['proveedor_id','capacidad_min','capacidad_max','mostrar_en_web','activo','incluye_alimentos'],
'float_fields': ['precio_neto','precio_publico'],
'nullable_fields': ['proveedor_id'],
},
}
LOGIN_PAGE = """<!DOCTYPE html>
<html lang="es"><head>
<meta charset="UTF-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>Catálogo — Acceso</title>
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&display=swap" rel="stylesheet">
<style>
*{box-sizing:border-box;margin:0;padding:0}
body{font-family:'Inter',system-ui,sans-serif;background:#f7f7f8;min-height:100vh;display:flex;align-items:center;justify-content:center;padding:20px;color:#16181d;-webkit-font-smoothing:antialiased}
.card{background:#fff;border:1px solid #e8e9ec;border-radius:14px;box-shadow:0 1px 2px rgba(20,24,28,.04),0 12px 32px rgba(20,24,28,.08);width:100%;max-width:380px;padding:38px 34px;text-align:center}
.logo{font-size:20px;font-weight:700;letter-spacing:-.01em;margin-bottom:4px;display:flex;align-items:center;justify-content:center;gap:8px}
.logo .dot{width:7px;height:7px;border-radius:2px;background:#1f4b54;display:inline-block}
.tag{font-size:10px;color:#8b9097;letter-spacing:.14em;text-transform:uppercase;font-weight:500;margin-bottom:26px}
h1{font-size:20px;color:#16181d;font-weight:700;letter-spacing:-.01em;margin-bottom:6px}
.sub{font-size:13px;color:#8b9097;margin-bottom:22px}
label{display:block;text-align:left;font-size:11px;color:#3d424b;letter-spacing:.01em;font-weight:600;margin:12px 0 5px}
input{width:100%;padding:11px 13px;border:1px solid #d9dbe0;border-radius:8px;font-family:inherit;font-size:15px;background:#fff;outline:none}
input:focus{border-color:#1f4b54;box-shadow:0 0 0 3px #eef3f3}
button{width:100%;margin-top:22px;padding:12px;background:#16181d;color:#fff;border:none;border-radius:8px;font-family:inherit;font-size:14px;font-weight:600;letter-spacing:.01em;cursor:pointer;transition:.12s}
button:hover{background:#000}
.err{color:#b3322b;font-size:12px;margin-top:14px;min-height:16px}
</style></head><body>
<div class="card">
<div class="logo"><span class="dot"></span>Catálogo</div>
<div class="tag">builder</div>
<h1 id="title">Iniciar sesión</h1>
<div class="sub" id="subtitle">Acceso al builder de catálogo</div>
<form id="f">
<div id="nombre-fg" style="display:none"><label>Tu nombre</label><input id="nombre" autocomplete="name"></div>
<label>Usuario</label><input id="username" autocomplete="username" autocapitalize="none">
<label>Contraseña</label><input id="password" type="password" autocomplete="current-password">
<button type="submit" id="btn">Entrar</button>
<div class="err" id="err"></div>
</form>
</div>
<script>
let setup=false;
fetch('/api/needs-setup').then(r=>r.json()).then(d=>{
if(d.needs_setup){
setup=true;
document.getElementById('title').textContent='Crear cuenta de administrador';
document.getElementById('subtitle').textContent='Primera vez — define tu usuario y contraseña';
document.getElementById('nombre-fg').style.display='block';
document.getElementById('btn').textContent='Crear cuenta';
document.getElementById('password').setAttribute('autocomplete','new-password');
}
});
document.getElementById('f').onsubmit=async e=>{
e.preventDefault();
const err=document.getElementById('err'); err.textContent='';
const body={username:document.getElementById('username').value.trim().toLowerCase(),
password:document.getElementById('password').value,
nombre:document.getElementById('nombre').value.trim()};
try{
const r=await fetch(setup?'/api/setup':'/api/login',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify(body)});
const d=await r.json();
if(d.ok){location.href='/';}
else{err.textContent=d.error||'Error';}
}catch(_){err.textContent='Error de conexión';}
};
</script></body></html>"""
class CatalogoHandler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *a, **kw):
super().__init__(*a, directory=str(STATIC_DIR), **kw)
# ══════ ROUTING ══════
def do_GET(self):
p = urllib.parse.urlparse(self.path)
if p.path == "/api/needs-setup": return self.handle_needs_setup()
if not self.current_user():
if p.path.startswith("/api/") or p.path.startswith("/uploads/"):
return self.json_ok({"error": "no autenticado"}, 401)
return self.serve_login_page()
if p.path in ("", "/"): self.path = "/index.html"; return super().do_GET()
if p.path.startswith("/api/"): return self.api_get(p.path)
if p.path.startswith("/uploads/"): return self.serve_upload(p.path)
return super().do_GET()
def do_POST(self):
if self.path == "/api/login": return self.handle_login()
if self.path == "/api/setup": return self.handle_setup()
if self.path == "/api/logout": return self.handle_logout()
if not self.current_user(): return self.json_ok({"error": "no autenticado"}, 401)
if self.path.startswith("/api/upload/"): return self.handle_upload()
if self.path.startswith("/api/"): return self.api_write("POST", self.path)
self.send_error(404)
def do_PUT(self):
if not self.current_user(): return self.json_ok({"error": "no autenticado"}, 401)
if self.path.startswith("/api/"): return self.api_write("PUT", self.path)
self.send_error(404)
def do_DELETE(self):
if not self.current_user(): return self.json_ok({"error": "no autenticado"}, 401)
if self.path.startswith("/api/files/"): return self.delete_file()
if self.path.startswith("/api/"): return self.api_delete(self.path)
self.send_error(404)
def do_OPTIONS(self):
self.send_response(200)
for h, v in [("Access-Control-Allow-Origin","*"),
("Access-Control-Allow-Methods","GET,POST,PUT,DELETE,OPTIONS"),
("Access-Control-Allow-Headers","Content-Type")]:
self.send_header(h, v)
self.end_headers()
# ══════ HELPERS ══════
def json_ok(self, data, status=200, cookie=None):
self.send_response(status)
self.send_header("Content-Type","application/json")
self.send_header("Access-Control-Allow-Origin","*")
if cookie: self.send_header("Set-Cookie", cookie)
self.end_headers()
self.wfile.write(json.dumps(data, ensure_ascii=False, default=str).encode())
def body(self):
n = int(self.headers.get("Content-Length", 0))
return json.loads(self.rfile.read(n)) if n else {}
# ══════ AUTENTICACIÓN ══════
def current_user(self):
cookie = self.headers.get('Cookie', '')
if not cookie: return None
try:
c = http.cookies.SimpleCookie(cookie)
if 'cat_session' in c:
return check_session(c['cat_session'].value)
except Exception:
return None
return None
def _session_cookie(self, token):
return f"cat_session={token}; HttpOnly; Path=/; Max-Age={SESSION_HOURS*3600}; SameSite=Lax"
def handle_needs_setup(self):
conn = get_db()
n = conn.execute("SELECT COUNT(*) FROM usuarios").fetchone()[0]
conn.close()
self.json_ok({"needs_setup": n == 0})
def handle_setup(self):
conn = get_db()
n = conn.execute("SELECT COUNT(*) FROM usuarios").fetchone()[0]
if n > 0:
conn.close(); return self.json_ok({"error": "Ya hay usuarios configurados"}, 403)
data = self.body()
u = (data.get('username') or '').strip().lower()
p = data.get('password') or ''
nombre = (data.get('nombre') or '').strip()
if not u or len(p) < 6:
conn.close(); return self.json_ok({"error": "Usuario requerido y contraseña de 6+ caracteres"}, 400)
conn.execute("INSERT INTO usuarios (username,pass_hash,nombre) VALUES (?,?,?)",
(u, hash_password(p), nombre or u))
conn.commit(); conn.close()
self.json_ok({"ok": True}, cookie=self._session_cookie(make_session(u)))
def handle_login(self):
data = self.body()
u = (data.get('username') or '').strip().lower()
p = data.get('password') or ''
conn = get_db()
row = conn.execute("SELECT * FROM usuarios WHERE username=? AND activo=1", (u,)).fetchone()
conn.close()
if not row or not verify_password(p, row['pass_hash']):
return self.json_ok({"error": "Usuario o contraseña incorrectos"}, 401)
self.json_ok({"ok": True, "nombre": row['nombre'] or u},
cookie=self._session_cookie(make_session(u)))
def handle_logout(self):
self.json_ok({"ok": True}, cookie="cat_session=; Path=/; Max-Age=0")
def serve_login_page(self):
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.end_headers()
self.wfile.write(LOGIN_PAGE.encode())
# ══════ UPLOADS ══════
def handle_upload(self):
"""POST /api/upload/{entidad}?tipo=foto|doc&label=..."""
parts = self.path.split("/")
entidad = urllib.parse.unquote("/".join(parts[3:])).split("?")[0]
params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
tipo = params.get("tipo", ["foto"])[0]
label = params.get("label", [""])[0].strip()
content_type = self.headers.get("Content-Type", "")
content_length = int(self.headers.get("Content-Length", 0))
if content_length > MAX_UPLOAD:
return self.json_ok({"error": "Archivo muy grande (max 25MB)"}, 413)
if "multipart/form-data" not in content_type:
return self.json_ok({"error": "Usar multipart/form-data"}, 400)
boundary = content_type.split("boundary=")[1].strip()
raw = self.rfile.read(content_length)
files_saved = []
idx = 0
for part in raw.split(f"--{boundary}".encode()):
if b"filename=" not in part: continue
header_end = part.find(b"\r\n\r\n")
if header_end == -1: continue
header = part[:header_end].decode("utf-8", errors="replace")
file_data = part[header_end+4:]
if file_data.endswith(b"\r\n"): file_data = file_data[:-2]
if file_data.endswith(b"--\r\n"): file_data = file_data[:-4]
if file_data.endswith(b"--"): file_data = file_data[:-2]
fn_match = re.search(r'filename="([^"]+)"', header)
if not fn_match or not file_data: continue
orig_name = fn_match.group(1)
ext = ('.' + orig_name.rsplit('.', 1)[1]) if '.' in orig_name else ''
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
if label:
desc = label + (f"_{idx+1}" if idx > 0 else "")
safe_desc = re.sub(r'[^\w\-. ]', '', desc).strip().replace(' ', '_')
final_name = f"{tipo}_{ts}_{safe_desc}{ext}"
else:
safe_name = re.sub(r'[^\w\-._]', '_', orig_name)
final_name = f"{tipo}_{ts}_{safe_name}"
idx += 1
ent_dir = UPLOADS_DIR / re.sub(r'[^\w\-.]', '_', entidad)
ent_dir.mkdir(exist_ok=True)
(ent_dir / final_name).write_bytes(file_data)
files_saved.append({
"name": final_name, "original": orig_name, "tipo": tipo,
"size": len(file_data), "url": f"/uploads/{ent_dir.name}/{final_name}"
})
if files_saved:
self.json_ok({"files": files_saved}, 201)
else:
self.json_ok({"error": "No se encontraron archivos"}, 400)
def serve_upload(self, path):
clean = path.replace("..", "").lstrip("/")
filepath = STATIC_DIR / clean
if not filepath.exists() or not filepath.is_file():
return self.send_error(404)
try:
filepath.resolve().relative_to(UPLOADS_DIR.resolve())
except ValueError:
return self.send_error(403)
mime, _ = mimetypes.guess_type(str(filepath))
if not mime: mime = "application/octet-stream"
data = filepath.read_bytes()
self.send_response(200)
self.send_header("Content-Type", mime)
self.send_header("Content-Length", len(data))
self.send_header("Content-Disposition", f'inline; filename="{filepath.name}"')
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(data)
def delete_file(self):
parts = self.path.split("/")
if len(parts) < 5: return self.send_error(400)
entidad = parts[3]
filename = urllib.parse.unquote(parts[4])
filepath = UPLOADS_DIR / re.sub(r'[^\w\-.]', '_', entidad) / filename
if filepath.exists():
filepath.unlink(); self.json_ok({"ok": True})
else:
self.send_error(404)
# ══════ API GET ══════
def api_get(self, path):
conn = get_db()
try:
if path.startswith("/api/files/"):
entidad = urllib.parse.unquote(path[11:])
ent_dir = UPLOADS_DIR / re.sub(r'[^\w\-.]', '_', entidad)
files = []
if ent_dir.exists():
for f in sorted(ent_dir.iterdir()):
if f.is_file():
ext = f.suffix.lower()
files.append({
"name": f.name, "size": f.stat().st_size,
"url": f"/uploads/{ent_dir.name}/{f.name}",
"is_image": ext in ('.jpg','.jpeg','.png','.gif','.webp','.bmp'),
"ext": ext,
})
return self.json_ok(files)
if path == "/api/file-counts":
IMG_EXT = ('.jpg','.jpeg','.png','.gif','.webp','.bmp')
DOC_PREFIXES = ('doc','terminos','contrato','factura','menu')
PHOTO_PREFIXES = ('foto_','mockup')
result = {}
if UPLOADS_DIR.exists():
for d in UPLOADS_DIR.iterdir():
if not d.is_dir(): continue
files = [f for f in d.iterdir() if f.is_file()]
if not files: continue
imgs = sorted([f for f in files if f.suffix.lower() in IMG_EXT], key=lambda f: f.name)
photo = next((f for f in imgs if f.name.lower().startswith(PHOTO_PREFIXES)), None)
if not photo:
photo = next((f for f in imgs if not f.name.lower().startswith(DOC_PREFIXES)), None)
result[d.name] = {
"count": len(files),
"first_image": f"/uploads/{d.name}/{photo.name}" if photo else None
}
return self.json_ok(result)
if path == "/api/dashboard":
return self.json_ok(self.dashboard(conn))
if path == "/api/proveedores":
rows = conn.execute("""
SELECT p.*, (SELECT COUNT(*) FROM servicios s WHERE s.proveedor_id=p.id) AS servicios_count
FROM proveedores p ORDER BY p.nombre
""").fetchall()
return self.json_ok([dict(r) for r in rows])
if path == "/api/servicios":
rows = conn.execute("""
SELECT s.*, p.nombre AS proveedor_nombre
FROM servicios s LEFT JOIN proveedores p ON p.id=s.proveedor_id
ORDER BY s.id DESC
""").fetchall()
return self.json_ok([dict(r) for r in rows])
# Fallback genérico para cualquier tabla registrada
table = path.split("/")[2] if len(path.split("/")) >= 3 else None
if table in TABLES:
rows = conn.execute(f"SELECT * FROM {table} ORDER BY id DESC").fetchall()
return self.json_ok([dict(r) for r in rows])
self.send_error(404)
finally:
conn.close()
# ══════ API WRITE (CRUD genérico) ══════
def api_write(self, method, path):
parts = path.split("/")
table = parts[2] if len(parts) >= 3 else None
item_id = int(parts[3]) if len(parts) >= 4 and parts[3].isdigit() else None
data = self.body()
if table not in TABLES:
return self.send_error(404)
conn = get_db()
try:
cfg = TABLES[table]
if method == "POST":
cols, vals = [], []
for f in cfg['fields']:
if f in data:
v = data[f]
if f in cfg.get('int_fields',[]): v = int(v or 0)
if f in cfg.get('float_fields',[]): v = float(v or 0)
if f in cfg.get('nullable_fields',[]) and (v == 0 or v == '' or v is None): v = None
cols.append(f); vals.append(v)
placeholders = ','.join(['?']*len(cols))
cur = conn.execute(f"INSERT INTO {table} ({','.join(cols)}) VALUES ({placeholders})", vals)
conn.commit()
return self.json_ok({"id": cur.lastrowid}, 201)
elif method == "PUT" and item_id:
sets, vals = [], []
for f in cfg['fields']:
if f in data:
v = data[f]
if f in cfg.get('int_fields',[]): v = int(v or 0)
if f in cfg.get('float_fields',[]): v = float(v or 0)
if f in cfg.get('nullable_fields',[]) and (v == 0 or v == '' or v is None): v = None
sets.append(f"{f}=?"); vals.append(v)
if sets:
cols_in_table = {r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()}
if 'updated_at' in cols_in_table:
sets.append("updated_at=datetime('now','localtime')")
conn.execute(f"UPDATE {table} SET {','.join(sets)} WHERE id=?", vals+[item_id])
conn.commit()
return self.json_ok({"ok": True})
self.send_error(400)
finally:
conn.close()
def api_delete(self, path):
parts = path.split("/")
table = parts[2] if len(parts) >= 3 else None
item_id = int(parts[3]) if len(parts) >= 4 and parts[3].isdigit() else None
if table not in TABLES or not item_id:
return self.send_error(404)
conn = get_db()
try:
conn.execute(f"DELETE FROM {table} WHERE id=?", (item_id,))
conn.commit()
self.json_ok({"ok": True})
finally:
conn.close()
# ══════ DASHBOARD ══════
def dashboard(self, conn):
por_tipo = {r['tipo']: r['n'] for r in conn.execute(
"SELECT tipo, COUNT(*) n FROM servicios WHERE activo=1 GROUP BY tipo")}
por_prov = {}
for r in conn.execute("""
SELECT COALESCE(p.nombre,'(sin proveedor)') prov, COUNT(*) n
FROM servicios s LEFT JOIN proveedores p ON p.id=s.proveedor_id
WHERE s.activo=1 GROUP BY prov ORDER BY n DESC"""):
por_prov[r['prov']] = r['n']
total_serv = conn.execute("SELECT COUNT(*) FROM servicios WHERE activo=1").fetchone()[0]
total_prov = conn.execute("SELECT COUNT(*) FROM proveedores WHERE activo=1").fetchone()[0]
en_web = conn.execute("SELECT COUNT(*) FROM servicios WHERE activo=1 AND mostrar_en_web=1").fetchone()[0]
# margen promedio (donde hay precios)
mrow = conn.execute("""
SELECT AVG((precio_publico-precio_neto)*100.0/precio_publico)
FROM servicios WHERE activo=1 AND precio_publico>0 AND precio_neto>0""").fetchone()[0]
return {
'por_tipo': por_tipo, 'por_proveedor': por_prov,
'total_servicios': total_serv, 'total_proveedores': total_prov,
'en_web': en_web, 'margen_promedio': round(mrow or 0, 1),
}
def log_message(self, fmt, *args):
if "/api/" in str(args): super().log_message(fmt, *args)
if __name__ == "__main__":
init_db()
print(f"\n Catálogo (borrador) — http://localhost:{PORT}")
print(f" DB: {DB_PATH.name}")
print(f" Ctrl+C para detener\n")
srv = http.server.HTTPServer(("", PORT), CatalogoHandler)
try: srv.serve_forever()
except KeyboardInterrupt: print("\nDetenido."); srv.server_close()