Cambios implementados: 1. Connection pooling (tenant_db.py): - psycopg2.pool.ThreadedConnectionPool para master y tenants - Wrapper _PooledConnection que devuelve al pool en .close() - Cero cambios en blueprints (backward compatible) 2. Tabla inventory_stock_summary + triggers (v3.2): - O(1) stock lookup en vez de SUM() sobre historial completo - Trigger AFTER INSERT en inventory_operations recalcula stock - Poblada inicialmente en ambos tenants - Refactor en 6 archivos de servicios para usar la nueva tabla 3. Fix N+1 en process_sale (pos_engine.py): - Precarga retail_price en bulk query FOR UPDATE - Elimina SELECT individual por item en loop 4. Índices críticos: - idx_parts_name_part + pattern_ops (master) - idx_inv_ops_inventory_branch_created (tenants) - idx_wi_part_stock_positive (master, ya existía desde Fase 1) Tests: 73/73 pasando (compat + fase3 + fase5 + fase6) Migración: v3.2_db_performance.sql
237 lines
8.2 KiB
Python
237 lines
8.2 KiB
Python
"""
|
|
Peer-to-peer inventory service for multi-instance Nexus deployments.
|
|
|
|
Each Nexus instance is autonomous (own DB, own POS) but can see inventory
|
|
from other instances on the network. The marketplace fans out to all peers
|
|
and merges results so users see stock from the whole Nexus network.
|
|
|
|
Architecture:
|
|
- peers.json: config file listing known peer instances (name + URL)
|
|
- /pos/api/peer/inventory: public endpoint each instance exposes (no auth)
|
|
- search_all_peers(): fan-out query to all enabled peers + local DB
|
|
|
|
For the demo (LAN), peers are static IPs in peers.json.
|
|
For production (clients on own networks), this will evolve into a central
|
|
hub model where each instance reports to a cloud server.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import requests
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from typing import Optional
|
|
|
|
# ─── Config ──────────────────────────────────────────────────────────────
|
|
|
|
_CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'peers.json')
|
|
_config_cache = None
|
|
|
|
|
|
def _load_config():
|
|
"""Load peers.json, cached in memory after first read."""
|
|
global _config_cache
|
|
if _config_cache is not None:
|
|
return _config_cache
|
|
try:
|
|
with open(_CONFIG_PATH, 'r') as f:
|
|
_config_cache = json.load(f)
|
|
except (FileNotFoundError, json.JSONDecodeError) as e:
|
|
print(f'[peer] Warning: could not load {_CONFIG_PATH}: {e}')
|
|
_config_cache = {'instance_name': 'Unknown', 'peers': [], 'peer_timeout_seconds': 3}
|
|
return _config_cache
|
|
|
|
|
|
def reload_config():
|
|
"""Force-reload peers.json (call after editing the file)."""
|
|
global _config_cache
|
|
_config_cache = None
|
|
return _load_config()
|
|
|
|
|
|
def get_instance_name() -> str:
|
|
return _load_config().get('instance_name', 'Unknown')
|
|
|
|
|
|
def get_instance_id() -> str:
|
|
return _load_config().get('instance_id', 'unknown')
|
|
|
|
|
|
def get_peers() -> list[dict]:
|
|
"""Return list of enabled peers: [{name, url, enabled}]"""
|
|
cfg = _load_config()
|
|
return [p for p in cfg.get('peers', []) if p.get('enabled', True)]
|
|
|
|
|
|
def get_timeout() -> int:
|
|
return _load_config().get('peer_timeout_seconds', 3)
|
|
|
|
|
|
# ─── Local inventory query (what WE expose to peers) ─────────────────────
|
|
|
|
def get_local_inventory(tenant_conn, query: str = None, limit: int = 50) -> list[dict]:
|
|
"""Query this instance's inventory for the peer endpoint.
|
|
|
|
Returns parts WITH stock > 0, with enough detail for the marketplace
|
|
to render results (part number, name, brand, price, stock hint).
|
|
No exact stock numbers — just 'En stock' (per business decision).
|
|
"""
|
|
cur = tenant_conn.cursor()
|
|
|
|
# Build WHERE clause
|
|
clauses = ["COALESCE(s.stock, 0) > 0", "i.is_active = TRUE"]
|
|
params = []
|
|
|
|
if query:
|
|
clauses.append("(i.part_number ILIKE %s OR i.name ILIKE %s OR i.brand ILIKE %s)")
|
|
like = f'%{query}%'
|
|
params.extend([like, like, like])
|
|
|
|
where = " AND ".join(clauses)
|
|
|
|
cur.execute(f"""
|
|
SELECT i.id, i.part_number, i.name, i.brand, i.price_1,
|
|
COALESCE(s.stock, 0) AS stock,
|
|
i.unit, i.catalog_part_id
|
|
FROM inventory i
|
|
LEFT JOIN inventory_stock_summary s ON s.inventory_id = i.id
|
|
WHERE {where}
|
|
ORDER BY i.name
|
|
LIMIT %s
|
|
""", params + [limit])
|
|
|
|
rows = cur.fetchall()
|
|
cur.close()
|
|
|
|
return [
|
|
{
|
|
'id': r[0],
|
|
'part_number': r[1],
|
|
'name': r[2],
|
|
'brand': r[3] or '',
|
|
'price': float(r[4]) if r[4] else None,
|
|
'stock_hint': 'En stock' if r[5] > 0 else 'Agotado',
|
|
'unit': r[6] or 'PZA',
|
|
'catalog_part_id': r[7],
|
|
}
|
|
for r in rows
|
|
]
|
|
|
|
|
|
# ─── Peer fan-out query ──────────────────────────────────────────────────
|
|
|
|
def _query_one_peer(peer: dict, query: str, limit: int) -> dict:
|
|
"""Send a search request to one peer instance. Returns results or error."""
|
|
url = peer['url'].rstrip('/') + '/pos/api/peer/inventory'
|
|
params = {'limit': limit}
|
|
if query:
|
|
params['q'] = query
|
|
try:
|
|
resp = requests.get(url, params=params, timeout=get_timeout())
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
# Tag each result with the source instance name
|
|
items = data.get('data', [])
|
|
for item in items:
|
|
item['source_instance'] = peer['name']
|
|
item['source_url'] = peer['url']
|
|
return {'ok': True, 'name': peer['name'], 'data': items}
|
|
else:
|
|
return {'ok': False, 'name': peer['name'], 'error': f'HTTP {resp.status_code}'}
|
|
except requests.exceptions.Timeout:
|
|
return {'ok': False, 'name': peer['name'], 'error': 'timeout'}
|
|
except requests.exceptions.ConnectionError:
|
|
return {'ok': False, 'name': peer['name'], 'error': 'offline'}
|
|
except Exception as e:
|
|
return {'ok': False, 'name': peer['name'], 'error': str(e)[:100]}
|
|
|
|
|
|
def search_all_peers(tenant_conn, query: str = None, limit: int = 50) -> dict:
|
|
"""Search local inventory + all enabled peers in parallel.
|
|
|
|
Returns:
|
|
{
|
|
"local": { "name": "...", "data": [...] },
|
|
"peers": [
|
|
{"name": "Refac B", "data": [...], "ok": True},
|
|
{"name": "Refac C", "data": [...], "ok": True},
|
|
...
|
|
],
|
|
"merged": [...], # all results combined, local first
|
|
"total": N,
|
|
"errors": [...] # peers that failed
|
|
}
|
|
"""
|
|
peers = get_peers()
|
|
|
|
# Local results
|
|
local_data = get_local_inventory(tenant_conn, query=query, limit=limit)
|
|
for item in local_data:
|
|
item['source_instance'] = get_instance_name()
|
|
item['source_url'] = 'local'
|
|
|
|
# Fan-out to peers in parallel
|
|
peer_results = []
|
|
errors = []
|
|
|
|
if peers:
|
|
with ThreadPoolExecutor(max_workers=min(len(peers), 5)) as executor:
|
|
futures = {
|
|
executor.submit(_query_one_peer, p, query, limit): p
|
|
for p in peers
|
|
}
|
|
for future in as_completed(futures):
|
|
result = future.result()
|
|
if result['ok']:
|
|
peer_results.append(result)
|
|
else:
|
|
errors.append(result)
|
|
print(f'[peer] {result["name"]}: {result["error"]}')
|
|
|
|
# Merge: local first, then peers (sorted by name within each source)
|
|
merged = list(local_data)
|
|
for pr in peer_results:
|
|
merged.extend(pr.get('data', []))
|
|
|
|
return {
|
|
'local': {
|
|
'name': get_instance_name(),
|
|
'data': local_data,
|
|
'count': len(local_data),
|
|
},
|
|
'peers': peer_results,
|
|
'merged': merged,
|
|
'total': len(merged),
|
|
'errors': errors,
|
|
}
|
|
|
|
|
|
# ─── Health check for the peer network ───────────────────────────────────
|
|
|
|
def check_peer_health() -> list[dict]:
|
|
"""Ping all peers and return status. Useful for the admin dashboard."""
|
|
peers = get_peers()
|
|
results = []
|
|
|
|
def _ping(peer):
|
|
try:
|
|
url = peer['url'].rstrip('/') + '/pos/api/peer/health'
|
|
resp = requests.get(url, timeout=get_timeout())
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
return {
|
|
'name': peer['name'],
|
|
'url': peer['url'],
|
|
'status': 'online',
|
|
'instance_name': data.get('instance_name'),
|
|
'inventory_count': data.get('inventory_count'),
|
|
}
|
|
return {'name': peer['name'], 'url': peer['url'], 'status': f'error:{resp.status_code}'}
|
|
except Exception as e:
|
|
return {'name': peer['name'], 'url': peer['url'], 'status': f'offline:{str(e)[:50]}'}
|
|
|
|
if peers:
|
|
with ThreadPoolExecutor(max_workers=min(len(peers), 5)) as executor:
|
|
results = list(executor.map(_ping, peers))
|
|
|
|
return results
|