feat: complete session — catalog, marketplace, WhatsApp, peer-to-peer, install scripts
Major features: - Pixel-Perfect glassmorphism design (landing + POS + public catalog) - OEM/Local catalog toggle with Nexpart taxonomy (14 groups, 108 subgroups, 558 part types) - Marketplace B2B Phase 1 (bodegas, POs, status machine, WA+email notifications) - Peer-to-peer inventory (multi-instance, LAN discovery) - WhatsApp: photo→Vision AI, voice→Whisper, conversational quotations - Smart unified search (VIN/plate/part_number/keyword auto-detect) - Shop Supplies tab (vehicle-independent parts) - Chatbot AI fallback chain (5 models) + response cache - CSV inventory import tool + setup_instance.sh installer - Tablet-responsive CSS + sidebar toggle - Filters, export CSV, employee edit, business data save - Quotation system (WA→POS) with auto-print on confirmation - Live stats on landing page Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
240
pos/services/peer_service.py
Normal file
240
pos/services/peer_service.py
Normal file
@@ -0,0 +1,240 @@
|
||||
"""
|
||||
Peer-to-peer inventory service for multi-instance Nexus deployments.
|
||||
|
||||
Each Nexus instance is autonomous (own DB, own POS) but can see inventory
|
||||
from other instances on the network. The marketplace fans out to all peers
|
||||
and merges results so users see stock from the whole Nexus network.
|
||||
|
||||
Architecture:
|
||||
- peers.json: config file listing known peer instances (name + URL)
|
||||
- /pos/api/peer/inventory: public endpoint each instance exposes (no auth)
|
||||
- search_all_peers(): fan-out query to all enabled peers + local DB
|
||||
|
||||
For the demo (LAN), peers are static IPs in peers.json.
|
||||
For production (clients on own networks), this will evolve into a central
|
||||
hub model where each instance reports to a cloud server.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from typing import Optional
|
||||
|
||||
# ─── Config ──────────────────────────────────────────────────────────────
|
||||
|
||||
_CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'peers.json')
|
||||
_config_cache = None
|
||||
|
||||
|
||||
def _load_config():
|
||||
"""Load peers.json, cached in memory after first read."""
|
||||
global _config_cache
|
||||
if _config_cache is not None:
|
||||
return _config_cache
|
||||
try:
|
||||
with open(_CONFIG_PATH, 'r') as f:
|
||||
_config_cache = json.load(f)
|
||||
except (FileNotFoundError, json.JSONDecodeError) as e:
|
||||
print(f'[peer] Warning: could not load {_CONFIG_PATH}: {e}')
|
||||
_config_cache = {'instance_name': 'Unknown', 'peers': [], 'peer_timeout_seconds': 3}
|
||||
return _config_cache
|
||||
|
||||
|
||||
def reload_config():
|
||||
"""Force-reload peers.json (call after editing the file)."""
|
||||
global _config_cache
|
||||
_config_cache = None
|
||||
return _load_config()
|
||||
|
||||
|
||||
def get_instance_name() -> str:
|
||||
return _load_config().get('instance_name', 'Unknown')
|
||||
|
||||
|
||||
def get_instance_id() -> str:
|
||||
return _load_config().get('instance_id', 'unknown')
|
||||
|
||||
|
||||
def get_peers() -> list[dict]:
|
||||
"""Return list of enabled peers: [{name, url, enabled}]"""
|
||||
cfg = _load_config()
|
||||
return [p for p in cfg.get('peers', []) if p.get('enabled', True)]
|
||||
|
||||
|
||||
def get_timeout() -> int:
|
||||
return _load_config().get('peer_timeout_seconds', 3)
|
||||
|
||||
|
||||
# ─── Local inventory query (what WE expose to peers) ─────────────────────
|
||||
|
||||
def get_local_inventory(tenant_conn, query: str = None, limit: int = 50) -> list[dict]:
|
||||
"""Query this instance's inventory for the peer endpoint.
|
||||
|
||||
Returns parts WITH stock > 0, with enough detail for the marketplace
|
||||
to render results (part number, name, brand, price, stock hint).
|
||||
No exact stock numbers — just 'En stock' (per business decision).
|
||||
"""
|
||||
cur = tenant_conn.cursor()
|
||||
|
||||
# Build WHERE clause
|
||||
clauses = ["COALESCE(s.stock, 0) > 0", "i.is_active = TRUE"]
|
||||
params = []
|
||||
|
||||
if query:
|
||||
clauses.append("(i.part_number ILIKE %s OR i.name ILIKE %s OR i.brand ILIKE %s)")
|
||||
like = f'%{query}%'
|
||||
params.extend([like, like, like])
|
||||
|
||||
where = " AND ".join(clauses)
|
||||
|
||||
cur.execute(f"""
|
||||
SELECT i.id, i.part_number, i.name, i.brand, i.price_1,
|
||||
COALESCE(s.stock, 0) AS stock,
|
||||
i.unit, i.catalog_part_id
|
||||
FROM inventory i
|
||||
LEFT JOIN (
|
||||
SELECT inventory_id, SUM(quantity) AS stock
|
||||
FROM inventory_operations
|
||||
GROUP BY inventory_id
|
||||
) s ON s.inventory_id = i.id
|
||||
WHERE {where}
|
||||
ORDER BY i.name
|
||||
LIMIT %s
|
||||
""", params + [limit])
|
||||
|
||||
rows = cur.fetchall()
|
||||
cur.close()
|
||||
|
||||
return [
|
||||
{
|
||||
'id': r[0],
|
||||
'part_number': r[1],
|
||||
'name': r[2],
|
||||
'brand': r[3] or '',
|
||||
'price': float(r[4]) if r[4] else None,
|
||||
'stock_hint': 'En stock' if r[5] > 0 else 'Agotado',
|
||||
'unit': r[6] or 'PZA',
|
||||
'catalog_part_id': r[7],
|
||||
}
|
||||
for r in rows
|
||||
]
|
||||
|
||||
|
||||
# ─── Peer fan-out query ──────────────────────────────────────────────────
|
||||
|
||||
def _query_one_peer(peer: dict, query: str, limit: int) -> dict:
|
||||
"""Send a search request to one peer instance. Returns results or error."""
|
||||
url = peer['url'].rstrip('/') + '/pos/api/peer/inventory'
|
||||
params = {'limit': limit}
|
||||
if query:
|
||||
params['q'] = query
|
||||
try:
|
||||
resp = requests.get(url, params=params, timeout=get_timeout())
|
||||
if resp.status_code == 200:
|
||||
data = resp.json()
|
||||
# Tag each result with the source instance name
|
||||
items = data.get('data', [])
|
||||
for item in items:
|
||||
item['source_instance'] = peer['name']
|
||||
item['source_url'] = peer['url']
|
||||
return {'ok': True, 'name': peer['name'], 'data': items}
|
||||
else:
|
||||
return {'ok': False, 'name': peer['name'], 'error': f'HTTP {resp.status_code}'}
|
||||
except requests.exceptions.Timeout:
|
||||
return {'ok': False, 'name': peer['name'], 'error': 'timeout'}
|
||||
except requests.exceptions.ConnectionError:
|
||||
return {'ok': False, 'name': peer['name'], 'error': 'offline'}
|
||||
except Exception as e:
|
||||
return {'ok': False, 'name': peer['name'], 'error': str(e)[:100]}
|
||||
|
||||
|
||||
def search_all_peers(tenant_conn, query: str = None, limit: int = 50) -> dict:
|
||||
"""Search local inventory + all enabled peers in parallel.
|
||||
|
||||
Returns:
|
||||
{
|
||||
"local": { "name": "...", "data": [...] },
|
||||
"peers": [
|
||||
{"name": "Refac B", "data": [...], "ok": True},
|
||||
{"name": "Refac C", "data": [...], "ok": True},
|
||||
...
|
||||
],
|
||||
"merged": [...], # all results combined, local first
|
||||
"total": N,
|
||||
"errors": [...] # peers that failed
|
||||
}
|
||||
"""
|
||||
peers = get_peers()
|
||||
|
||||
# Local results
|
||||
local_data = get_local_inventory(tenant_conn, query=query, limit=limit)
|
||||
for item in local_data:
|
||||
item['source_instance'] = get_instance_name()
|
||||
item['source_url'] = 'local'
|
||||
|
||||
# Fan-out to peers in parallel
|
||||
peer_results = []
|
||||
errors = []
|
||||
|
||||
if peers:
|
||||
with ThreadPoolExecutor(max_workers=min(len(peers), 5)) as executor:
|
||||
futures = {
|
||||
executor.submit(_query_one_peer, p, query, limit): p
|
||||
for p in peers
|
||||
}
|
||||
for future in as_completed(futures):
|
||||
result = future.result()
|
||||
if result['ok']:
|
||||
peer_results.append(result)
|
||||
else:
|
||||
errors.append(result)
|
||||
print(f'[peer] {result["name"]}: {result["error"]}')
|
||||
|
||||
# Merge: local first, then peers (sorted by name within each source)
|
||||
merged = list(local_data)
|
||||
for pr in peer_results:
|
||||
merged.extend(pr.get('data', []))
|
||||
|
||||
return {
|
||||
'local': {
|
||||
'name': get_instance_name(),
|
||||
'data': local_data,
|
||||
'count': len(local_data),
|
||||
},
|
||||
'peers': peer_results,
|
||||
'merged': merged,
|
||||
'total': len(merged),
|
||||
'errors': errors,
|
||||
}
|
||||
|
||||
|
||||
# ─── Health check for the peer network ───────────────────────────────────
|
||||
|
||||
def check_peer_health() -> list[dict]:
|
||||
"""Ping all peers and return status. Useful for the admin dashboard."""
|
||||
peers = get_peers()
|
||||
results = []
|
||||
|
||||
def _ping(peer):
|
||||
try:
|
||||
url = peer['url'].rstrip('/') + '/pos/api/peer/health'
|
||||
resp = requests.get(url, timeout=get_timeout())
|
||||
if resp.status_code == 200:
|
||||
data = resp.json()
|
||||
return {
|
||||
'name': peer['name'],
|
||||
'url': peer['url'],
|
||||
'status': 'online',
|
||||
'instance_name': data.get('instance_name'),
|
||||
'inventory_count': data.get('inventory_count'),
|
||||
}
|
||||
return {'name': peer['name'], 'url': peer['url'], 'status': f'error:{resp.status_code}'}
|
||||
except Exception as e:
|
||||
return {'name': peer['name'], 'url': peer['url'], 'status': f'offline:{str(e)[:50]}'}
|
||||
|
||||
if peers:
|
||||
with ThreadPoolExecutor(max_workers=min(len(peers), 5)) as executor:
|
||||
results = list(executor.map(_ping, peers))
|
||||
|
||||
return results
|
||||
Reference in New Issue
Block a user