Files
Autoparts-DB/pos/services/peer_service.py
consultoria-as e3c85fd647 FASE 7b: DB Performance — Pooling, Stock Summary, N+1 fix
Cambios implementados:

1. Connection pooling (tenant_db.py):
   - psycopg2.pool.ThreadedConnectionPool para master y tenants
   - Wrapper _PooledConnection que devuelve al pool en .close()
   - Cero cambios en blueprints (backward compatible)

2. Tabla inventory_stock_summary + triggers (v3.2):
   - O(1) stock lookup en vez de SUM() sobre historial completo
   - Trigger AFTER INSERT en inventory_operations recalcula stock
   - Poblada inicialmente en ambos tenants
   - Refactor en 6 archivos de servicios para usar la nueva tabla

3. Fix N+1 en process_sale (pos_engine.py):
   - Precarga retail_price en bulk query FOR UPDATE
   - Elimina SELECT individual por item en loop

4. Índices críticos:
   - idx_parts_name_part + pattern_ops (master)
   - idx_inv_ops_inventory_branch_created (tenants)
   - idx_wi_part_stock_positive (master, ya existía desde Fase 1)

Tests: 73/73 pasando (compat + fase3 + fase5 + fase6)
Migración: v3.2_db_performance.sql
2026-04-27 07:34:31 +00:00

237 lines
8.2 KiB
Python

"""
Peer-to-peer inventory service for multi-instance Nexus deployments.
Each Nexus instance is autonomous (own DB, own POS) but can see inventory
from other instances on the network. The marketplace fans out to all peers
and merges results so users see stock from the whole Nexus network.
Architecture:
- peers.json: config file listing known peer instances (name + URL)
- /pos/api/peer/inventory: public endpoint each instance exposes (no auth)
- search_all_peers(): fan-out query to all enabled peers + local DB
For the demo (LAN), peers are static IPs in peers.json.
For production (clients on own networks), this will evolve into a central
hub model where each instance reports to a cloud server.
"""
import json
import os
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional
# ─── Config ──────────────────────────────────────────────────────────────
_CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'peers.json')
_config_cache = None
def _load_config():
"""Load peers.json, cached in memory after first read."""
global _config_cache
if _config_cache is not None:
return _config_cache
try:
with open(_CONFIG_PATH, 'r') as f:
_config_cache = json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e:
print(f'[peer] Warning: could not load {_CONFIG_PATH}: {e}')
_config_cache = {'instance_name': 'Unknown', 'peers': [], 'peer_timeout_seconds': 3}
return _config_cache
def reload_config():
"""Force-reload peers.json (call after editing the file)."""
global _config_cache
_config_cache = None
return _load_config()
def get_instance_name() -> str:
return _load_config().get('instance_name', 'Unknown')
def get_instance_id() -> str:
return _load_config().get('instance_id', 'unknown')
def get_peers() -> list[dict]:
"""Return list of enabled peers: [{name, url, enabled}]"""
cfg = _load_config()
return [p for p in cfg.get('peers', []) if p.get('enabled', True)]
def get_timeout() -> int:
return _load_config().get('peer_timeout_seconds', 3)
# ─── Local inventory query (what WE expose to peers) ─────────────────────
def get_local_inventory(tenant_conn, query: str = None, limit: int = 50) -> list[dict]:
"""Query this instance's inventory for the peer endpoint.
Returns parts WITH stock > 0, with enough detail for the marketplace
to render results (part number, name, brand, price, stock hint).
No exact stock numbers — just 'En stock' (per business decision).
"""
cur = tenant_conn.cursor()
# Build WHERE clause
clauses = ["COALESCE(s.stock, 0) > 0", "i.is_active = TRUE"]
params = []
if query:
clauses.append("(i.part_number ILIKE %s OR i.name ILIKE %s OR i.brand ILIKE %s)")
like = f'%{query}%'
params.extend([like, like, like])
where = " AND ".join(clauses)
cur.execute(f"""
SELECT i.id, i.part_number, i.name, i.brand, i.price_1,
COALESCE(s.stock, 0) AS stock,
i.unit, i.catalog_part_id
FROM inventory i
LEFT JOIN inventory_stock_summary s ON s.inventory_id = i.id
WHERE {where}
ORDER BY i.name
LIMIT %s
""", params + [limit])
rows = cur.fetchall()
cur.close()
return [
{
'id': r[0],
'part_number': r[1],
'name': r[2],
'brand': r[3] or '',
'price': float(r[4]) if r[4] else None,
'stock_hint': 'En stock' if r[5] > 0 else 'Agotado',
'unit': r[6] or 'PZA',
'catalog_part_id': r[7],
}
for r in rows
]
# ─── Peer fan-out query ──────────────────────────────────────────────────
def _query_one_peer(peer: dict, query: str, limit: int) -> dict:
"""Send a search request to one peer instance. Returns results or error."""
url = peer['url'].rstrip('/') + '/pos/api/peer/inventory'
params = {'limit': limit}
if query:
params['q'] = query
try:
resp = requests.get(url, params=params, timeout=get_timeout())
if resp.status_code == 200:
data = resp.json()
# Tag each result with the source instance name
items = data.get('data', [])
for item in items:
item['source_instance'] = peer['name']
item['source_url'] = peer['url']
return {'ok': True, 'name': peer['name'], 'data': items}
else:
return {'ok': False, 'name': peer['name'], 'error': f'HTTP {resp.status_code}'}
except requests.exceptions.Timeout:
return {'ok': False, 'name': peer['name'], 'error': 'timeout'}
except requests.exceptions.ConnectionError:
return {'ok': False, 'name': peer['name'], 'error': 'offline'}
except Exception as e:
return {'ok': False, 'name': peer['name'], 'error': str(e)[:100]}
def search_all_peers(tenant_conn, query: str = None, limit: int = 50) -> dict:
"""Search local inventory + all enabled peers in parallel.
Returns:
{
"local": { "name": "...", "data": [...] },
"peers": [
{"name": "Refac B", "data": [...], "ok": True},
{"name": "Refac C", "data": [...], "ok": True},
...
],
"merged": [...], # all results combined, local first
"total": N,
"errors": [...] # peers that failed
}
"""
peers = get_peers()
# Local results
local_data = get_local_inventory(tenant_conn, query=query, limit=limit)
for item in local_data:
item['source_instance'] = get_instance_name()
item['source_url'] = 'local'
# Fan-out to peers in parallel
peer_results = []
errors = []
if peers:
with ThreadPoolExecutor(max_workers=min(len(peers), 5)) as executor:
futures = {
executor.submit(_query_one_peer, p, query, limit): p
for p in peers
}
for future in as_completed(futures):
result = future.result()
if result['ok']:
peer_results.append(result)
else:
errors.append(result)
print(f'[peer] {result["name"]}: {result["error"]}')
# Merge: local first, then peers (sorted by name within each source)
merged = list(local_data)
for pr in peer_results:
merged.extend(pr.get('data', []))
return {
'local': {
'name': get_instance_name(),
'data': local_data,
'count': len(local_data),
},
'peers': peer_results,
'merged': merged,
'total': len(merged),
'errors': errors,
}
# ─── Health check for the peer network ───────────────────────────────────
def check_peer_health() -> list[dict]:
"""Ping all peers and return status. Useful for the admin dashboard."""
peers = get_peers()
results = []
def _ping(peer):
try:
url = peer['url'].rstrip('/') + '/pos/api/peer/health'
resp = requests.get(url, timeout=get_timeout())
if resp.status_code == 200:
data = resp.json()
return {
'name': peer['name'],
'url': peer['url'],
'status': 'online',
'instance_name': data.get('instance_name'),
'inventory_count': data.get('inventory_count'),
}
return {'name': peer['name'], 'url': peer['url'], 'status': f'error:{resp.status_code}'}
except Exception as e:
return {'name': peer['name'], 'url': peer['url'], 'status': f'offline:{str(e)[:50]}'}
if peers:
with ThreadPoolExecutor(max_workers=min(len(peers), 5)) as executor:
results = list(executor.map(_ping, peers))
return results