FASE 7b: DB Performance — Pooling, Stock Summary, N+1 fix
Cambios implementados: 1. Connection pooling (tenant_db.py): - psycopg2.pool.ThreadedConnectionPool para master y tenants - Wrapper _PooledConnection que devuelve al pool en .close() - Cero cambios en blueprints (backward compatible) 2. Tabla inventory_stock_summary + triggers (v3.2): - O(1) stock lookup en vez de SUM() sobre historial completo - Trigger AFTER INSERT en inventory_operations recalcula stock - Poblada inicialmente en ambos tenants - Refactor en 6 archivos de servicios para usar la nueva tabla 3. Fix N+1 en process_sale (pos_engine.py): - Precarga retail_price en bulk query FOR UPDATE - Elimina SELECT individual por item en loop 4. Índices críticos: - idx_parts_name_part + pattern_ops (master) - idx_inv_ops_inventory_branch_created (tenants) - idx_wi_part_stock_positive (master, ya existía desde Fase 1) Tests: 73/73 pasando (compat + fase3 + fase5 + fase6) Migración: v3.2_db_performance.sql
This commit is contained in:
@@ -32,6 +32,7 @@ MIGRATIONS = {
|
|||||||
'v2.9': 'v2.9_logistics.sql',
|
'v2.9': 'v2.9_logistics.sql',
|
||||||
'v3.0': 'v3.0_public_api.sql',
|
'v3.0': 'v3.0_public_api.sql',
|
||||||
'v3.1': 'v3.1_inventory_vehicle_compat.sql',
|
'v3.1': 'v3.1_inventory_vehicle_compat.sql',
|
||||||
|
'v3.2': 'v3.2_db_performance.sql',
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
62
pos/migrations/v3.2_db_performance.sql
Normal file
62
pos/migrations/v3.2_db_performance.sql
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
-- FASE 2: DB Performance Optimizations
|
||||||
|
-- Applies to: master DB (nexus_autoparts) and all tenant DBs
|
||||||
|
|
||||||
|
-- ─── MASTER DB ─────────────────────────────────
|
||||||
|
|
||||||
|
-- Index for parts name lookups (used by get_part_types, shop supplies)
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_parts_name_part ON parts(name_part);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_parts_name_part_pattern ON parts(name_part text_pattern_ops);
|
||||||
|
|
||||||
|
-- Partial index for warehouse_inventory stock lookups (already created in Fase 1,
|
||||||
|
-- kept here for completeness on fresh installs)
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_wi_part_stock_positive
|
||||||
|
ON warehouse_inventory(part_id) WHERE stock_quantity > 0;
|
||||||
|
|
||||||
|
-- ─── TENANT DB (run on each tenant) ────────────
|
||||||
|
|
||||||
|
-- O(1) stock summary table with trigger-based updates
|
||||||
|
CREATE TABLE IF NOT EXISTS inventory_stock_summary (
|
||||||
|
inventory_id INT PRIMARY KEY REFERENCES inventory(id) ON DELETE CASCADE,
|
||||||
|
branch_id INT REFERENCES branches(id),
|
||||||
|
stock INT NOT NULL DEFAULT 0,
|
||||||
|
last_updated TIMESTAMPTZ DEFAULT NOW()
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_iss_branch ON inventory_stock_summary(branch_id);
|
||||||
|
|
||||||
|
-- Trigger function: recalculates stock for an item after every operation insert
|
||||||
|
CREATE OR REPLACE FUNCTION update_stock_summary()
|
||||||
|
RETURNS TRIGGER AS $$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO inventory_stock_summary (inventory_id, branch_id, stock)
|
||||||
|
SELECT i.id, i.branch_id, COALESCE(SUM(io.quantity), 0)
|
||||||
|
FROM inventory i
|
||||||
|
LEFT JOIN inventory_operations io ON io.inventory_id = i.id
|
||||||
|
WHERE i.id = NEW.inventory_id
|
||||||
|
GROUP BY i.id, i.branch_id
|
||||||
|
ON CONFLICT (inventory_id) DO UPDATE SET
|
||||||
|
stock = EXCLUDED.stock,
|
||||||
|
branch_id = EXCLUDED.branch_id,
|
||||||
|
last_updated = NOW();
|
||||||
|
RETURN NEW;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
DROP TRIGGER IF EXISTS trg_update_stock_summary ON inventory_operations;
|
||||||
|
CREATE TRIGGER trg_update_stock_summary
|
||||||
|
AFTER INSERT ON inventory_operations
|
||||||
|
FOR EACH ROW EXECUTE FUNCTION update_stock_summary();
|
||||||
|
|
||||||
|
-- Composite index for inventory_operations (used by stock + history queries)
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_inv_ops_inventory_branch_created
|
||||||
|
ON inventory_operations(inventory_id, branch_id, created_at DESC);
|
||||||
|
|
||||||
|
-- Initial population of summary table (idempotent)
|
||||||
|
INSERT INTO inventory_stock_summary (inventory_id, branch_id, stock)
|
||||||
|
SELECT i.id, i.branch_id, COALESCE(SUM(io.quantity), 0)
|
||||||
|
FROM inventory i
|
||||||
|
LEFT JOIN inventory_operations io ON io.inventory_id = i.id
|
||||||
|
GROUP BY i.id, i.branch_id
|
||||||
|
ON CONFLICT (inventory_id) DO UPDATE SET
|
||||||
|
stock = EXCLUDED.stock,
|
||||||
|
branch_id = EXCLUDED.branch_id,
|
||||||
|
last_updated = NOW();
|
||||||
@@ -140,7 +140,7 @@ def get_inventory_context(tenant_conn, branch_id=None):
|
|||||||
cur.execute(f"""
|
cur.execute(f"""
|
||||||
SELECT COUNT(*) FROM inventory i
|
SELECT COUNT(*) FROM inventory i
|
||||||
WHERE {where}
|
WHERE {where}
|
||||||
AND COALESCE((SELECT SUM(quantity) FROM inventory_operations WHERE inventory_id = i.id), 0) <= 3
|
AND COALESCE((SELECT stock FROM inventory_stock_summary WHERE inventory_id = i.id), 0) <= 3
|
||||||
""", params)
|
""", params)
|
||||||
low_stock = cur.fetchone()[0] or 0
|
low_stock = cur.fetchone()[0] or 0
|
||||||
|
|
||||||
|
|||||||
@@ -1411,12 +1411,11 @@ def _get_local_stock_bulk(tenant_conn, branch_id, oem_numbers, catalog_part_ids)
|
|||||||
cur.execute(f"""
|
cur.execute(f"""
|
||||||
SELECT i.id, i.part_number, i.catalog_part_id,
|
SELECT i.id, i.part_number, i.catalog_part_id,
|
||||||
i.price_1, i.price_2, i.price_3, i.cost, i.tax_rate,
|
i.price_1, i.price_2, i.price_3, i.cost, i.tax_rate,
|
||||||
COALESCE(SUM(io.quantity), 0) AS stock,
|
COALESCE(s.stock, 0) AS stock,
|
||||||
i.image_url
|
i.image_url
|
||||||
FROM inventory i
|
FROM inventory i
|
||||||
LEFT JOIN inventory_operations io ON io.inventory_id = i.id
|
LEFT JOIN inventory_stock_summary s ON s.inventory_id = i.id
|
||||||
WHERE ({where}) AND i.is_active = true{branch_filter}
|
WHERE ({where}) AND i.is_active = true{branch_filter}
|
||||||
GROUP BY i.id
|
|
||||||
""", params)
|
""", params)
|
||||||
|
|
||||||
result = {}
|
result = {}
|
||||||
@@ -1453,13 +1452,12 @@ def _get_local_stock_single(tenant_conn, branch_id, oem_part_number, catalog_par
|
|||||||
cur.execute(f"""
|
cur.execute(f"""
|
||||||
SELECT i.id, i.price_1, i.price_2, i.price_3, i.cost, i.tax_rate,
|
SELECT i.id, i.price_1, i.price_2, i.price_3, i.cost, i.tax_rate,
|
||||||
i.location, i.unit, i.barcode,
|
i.location, i.unit, i.barcode,
|
||||||
COALESCE(SUM(io.quantity), 0) AS stock,
|
COALESCE(s.stock, 0) AS stock,
|
||||||
i.image_url
|
i.image_url
|
||||||
FROM inventory i
|
FROM inventory i
|
||||||
LEFT JOIN inventory_operations io ON io.inventory_id = i.id
|
LEFT JOIN inventory_stock_summary s ON s.inventory_id = i.id
|
||||||
WHERE (i.part_number = %s OR i.catalog_part_id = %s)
|
WHERE (i.part_number = %s OR i.catalog_part_id = %s)
|
||||||
AND i.is_active = true{branch_filter}
|
AND i.is_active = true{branch_filter}
|
||||||
GROUP BY i.id
|
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
""", params)
|
""", params)
|
||||||
|
|
||||||
|
|||||||
@@ -25,15 +25,33 @@ def _safe_g(attr, default=None):
|
|||||||
def get_stock(conn, inventory_id, branch_id=None):
|
def get_stock(conn, inventory_id, branch_id=None):
|
||||||
"""Get current stock for an inventory item. Optionally filter by branch.
|
"""Get current stock for an inventory item. Optionally filter by branch.
|
||||||
|
|
||||||
Uses Redis cache first, falls back to PostgreSQL SUM query.
|
Uses Redis cache first, then inventory_stock_summary, falls back to
|
||||||
|
PostgreSQL SUM query.
|
||||||
"""
|
"""
|
||||||
# Try Redis first
|
# Try Redis first
|
||||||
cached = get_cached_stock(inventory_id, branch_id)
|
cached = get_cached_stock(inventory_id, branch_id)
|
||||||
if cached is not None:
|
if cached is not None:
|
||||||
return cached
|
return cached
|
||||||
|
|
||||||
# Fallback to PostgreSQL
|
# Use inventory_stock_summary (O(1) lookup)
|
||||||
cur = conn.cursor()
|
cur = conn.cursor()
|
||||||
|
if branch_id:
|
||||||
|
cur.execute(
|
||||||
|
"SELECT stock FROM inventory_stock_summary WHERE inventory_id = %s AND branch_id = %s",
|
||||||
|
(inventory_id, branch_id)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
cur.execute(
|
||||||
|
"SELECT stock FROM inventory_stock_summary WHERE inventory_id = %s",
|
||||||
|
(inventory_id,)
|
||||||
|
)
|
||||||
|
row = cur.fetchone()
|
||||||
|
if row is not None:
|
||||||
|
cur.close()
|
||||||
|
set_cached_stock(inventory_id, row[0], branch_id)
|
||||||
|
return row[0]
|
||||||
|
|
||||||
|
# Fallback to PostgreSQL SUM (legacy, should not reach here if trigger works)
|
||||||
if branch_id:
|
if branch_id:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"SELECT COALESCE(SUM(quantity), 0) FROM inventory_operations WHERE inventory_id = %s AND branch_id = %s",
|
"SELECT COALESCE(SUM(quantity), 0) FROM inventory_operations WHERE inventory_id = %s AND branch_id = %s",
|
||||||
@@ -55,21 +73,17 @@ def get_stock(conn, inventory_id, branch_id=None):
|
|||||||
def get_stock_bulk(conn, branch_id=None):
|
def get_stock_bulk(conn, branch_id=None):
|
||||||
"""Get stock for all items. Returns dict {inventory_id: stock_quantity}.
|
"""Get stock for all items. Returns dict {inventory_id: stock_quantity}.
|
||||||
|
|
||||||
Uses PostgreSQL directly (bulk operation, Redis wouldn't help much here
|
Uses inventory_stock_summary for O(1) bulk lookup.
|
||||||
unless we pre-populated all keys).
|
|
||||||
"""
|
"""
|
||||||
cur = conn.cursor()
|
cur = conn.cursor()
|
||||||
if branch_id:
|
if branch_id:
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
SELECT inventory_id, COALESCE(SUM(quantity), 0)
|
SELECT inventory_id, stock
|
||||||
FROM inventory_operations WHERE branch_id = %s
|
FROM inventory_stock_summary WHERE branch_id = %s
|
||||||
GROUP BY inventory_id
|
|
||||||
""", (branch_id,))
|
""", (branch_id,))
|
||||||
else:
|
else:
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
SELECT inventory_id, COALESCE(SUM(quantity), 0)
|
SELECT inventory_id, stock FROM inventory_stock_summary
|
||||||
FROM inventory_operations
|
|
||||||
GROUP BY inventory_id
|
|
||||||
""")
|
""")
|
||||||
stock_map = {r[0]: r[1] for r in cur.fetchall()}
|
stock_map = {r[0]: r[1] for r in cur.fetchall()}
|
||||||
cur.close()
|
cur.close()
|
||||||
|
|||||||
@@ -194,11 +194,7 @@ def get_inventory_by_vehicle(tenant_conn, master_conn, mye_id, branch_id=None):
|
|||||||
i.image_url, i.description, COALESCE(s.stock, 0) as stock
|
i.image_url, i.description, COALESCE(s.stock, 0) as stock
|
||||||
FROM inventory i
|
FROM inventory i
|
||||||
JOIN inventory_vehicle_compat ivc ON ivc.inventory_id = i.id
|
JOIN inventory_vehicle_compat ivc ON ivc.inventory_id = i.id
|
||||||
LEFT JOIN (
|
LEFT JOIN inventory_stock_summary s ON s.inventory_id = i.id
|
||||||
SELECT inventory_id, SUM(quantity) as stock
|
|
||||||
FROM inventory_operations
|
|
||||||
GROUP BY inventory_id
|
|
||||||
) s ON s.inventory_id = i.id
|
|
||||||
WHERE ivc.model_year_engine_id = %s {branch_filter}
|
WHERE ivc.model_year_engine_id = %s {branch_filter}
|
||||||
AND i.is_active = true
|
AND i.is_active = true
|
||||||
ORDER BY i.name
|
ORDER BY i.name
|
||||||
|
|||||||
@@ -93,11 +93,7 @@ def get_local_inventory(tenant_conn, query: str = None, limit: int = 50) -> list
|
|||||||
COALESCE(s.stock, 0) AS stock,
|
COALESCE(s.stock, 0) AS stock,
|
||||||
i.unit, i.catalog_part_id
|
i.unit, i.catalog_part_id
|
||||||
FROM inventory i
|
FROM inventory i
|
||||||
LEFT JOIN (
|
LEFT JOIN inventory_stock_summary s ON s.inventory_id = i.id
|
||||||
SELECT inventory_id, SUM(quantity) AS stock
|
|
||||||
FROM inventory_operations
|
|
||||||
GROUP BY inventory_id
|
|
||||||
) s ON s.inventory_id = i.id
|
|
||||||
WHERE {where}
|
WHERE {where}
|
||||||
ORDER BY i.name
|
ORDER BY i.name
|
||||||
LIMIT %s
|
LIMIT %s
|
||||||
|
|||||||
@@ -224,7 +224,7 @@ def process_sale(conn, sale_data):
|
|||||||
# Lock inventory rows to prevent race conditions on concurrent sales
|
# Lock inventory rows to prevent race conditions on concurrent sales
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
SELECT id, part_number, name, cost, price_1, price_2, price_3,
|
SELECT id, part_number, name, cost, price_1, price_2, price_3,
|
||||||
tax_rate, branch_id
|
tax_rate, branch_id, retail_price
|
||||||
FROM inventory
|
FROM inventory
|
||||||
WHERE id = ANY(%s) AND is_active = true
|
WHERE id = ANY(%s) AND is_active = true
|
||||||
ORDER BY id
|
ORDER BY id
|
||||||
@@ -327,10 +327,9 @@ def process_sale(conn, sale_data):
|
|||||||
# Create sale items (batch insert) and deduct inventory
|
# Create sale items (batch insert) and deduct inventory
|
||||||
sale_items_data = []
|
sale_items_data = []
|
||||||
for item in totals['items']:
|
for item in totals['items']:
|
||||||
# Fetch retail_price for savings calculation
|
# retail_price from preloaded bulk query (index 9)
|
||||||
cur.execute("SELECT retail_price FROM inventory WHERE id = %s", (item['inventory_id'],))
|
inv = inv_rows.get(item['inventory_id'])
|
||||||
rp_row = cur.fetchone()
|
retail_price = inv[9] if inv else None
|
||||||
retail_price = rp_row[0] if rp_row else None
|
|
||||||
|
|
||||||
sale_items_data.append((
|
sale_items_data.append((
|
||||||
sale_id, item['inventory_id'], item['part_number'], item['name'],
|
sale_id, item['inventory_id'], item['part_number'], item['name'],
|
||||||
|
|||||||
@@ -1,20 +1,85 @@
|
|||||||
# /home/Autopartes/pos/tenant_db.py
|
# /home/Autopartes/pos/tenant_db.py
|
||||||
"""Tenant DB connection manager. Gets a psycopg2 connection for a specific tenant."""
|
"""Tenant DB connection manager with pooling.
|
||||||
|
|
||||||
|
Uses psycopg2.pool.ThreadedConnectionPool for both master and tenant DBs.
|
||||||
|
Connections are returned to the pool on .close() via a thin wrapper —
|
||||||
|
this keeps the rest of the codebase unchanged.
|
||||||
|
"""
|
||||||
|
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
from psycopg2 import pool
|
||||||
from config import MASTER_DB_URL, TENANT_DB_URL_TEMPLATE
|
from config import MASTER_DB_URL, TENANT_DB_URL_TEMPLATE
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Pools ─────────────────────────────────────
|
||||||
|
_master_pool = None
|
||||||
|
_tenant_pools = {}
|
||||||
|
|
||||||
|
|
||||||
|
def _get_master_pool():
|
||||||
|
"""Lazy-initialize master DB connection pool."""
|
||||||
|
global _master_pool
|
||||||
|
if _master_pool is None:
|
||||||
|
_master_pool = pool.ThreadedConnectionPool(
|
||||||
|
minconn=2, maxconn=10, dsn=MASTER_DB_URL
|
||||||
|
)
|
||||||
|
return _master_pool
|
||||||
|
|
||||||
|
|
||||||
|
def _get_tenant_pool(db_name):
|
||||||
|
"""Lazy-initialize tenant DB connection pool by db_name."""
|
||||||
|
global _tenant_pools
|
||||||
|
if db_name not in _tenant_pools:
|
||||||
|
dsn = TENANT_DB_URL_TEMPLATE.format(db_name=db_name)
|
||||||
|
_tenant_pools[db_name] = pool.ThreadedConnectionPool(
|
||||||
|
minconn=2, maxconn=10, dsn=dsn
|
||||||
|
)
|
||||||
|
return _tenant_pools[db_name]
|
||||||
|
|
||||||
|
|
||||||
|
class _PooledConnection:
|
||||||
|
"""Thin wrapper that delegates all attribute access to the real
|
||||||
|
psycopg2 connection, but returns it to the pool on .close().
|
||||||
|
"""
|
||||||
|
__slots__ = ('_conn', '_pool')
|
||||||
|
|
||||||
|
def __init__(self, conn, db_pool):
|
||||||
|
self._conn = conn
|
||||||
|
self._pool = db_pool
|
||||||
|
|
||||||
|
def __getattr__(self, name):
|
||||||
|
return getattr(self._conn, name)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
try:
|
||||||
|
self._pool.putconn(self._conn)
|
||||||
|
except Exception:
|
||||||
|
# If pool is already closed, fall back to real close
|
||||||
|
self._conn.close()
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Public API ────────────────────────────────
|
||||||
|
|
||||||
def get_master_conn():
|
def get_master_conn():
|
||||||
"""Get connection to nexus_master DB."""
|
"""Get a pooled connection to the master DB."""
|
||||||
return psycopg2.connect(MASTER_DB_URL)
|
p = _get_master_pool()
|
||||||
|
return _PooledConnection(p.getconn(), p)
|
||||||
|
|
||||||
|
|
||||||
def get_tenant_conn(tenant_id):
|
def get_tenant_conn(tenant_id):
|
||||||
"""Get connection to a tenant's DB by looking up db_name in nexus_master."""
|
"""Get a pooled connection to a tenant's DB."""
|
||||||
master = get_master_conn()
|
master = get_master_conn()
|
||||||
cur = master.cursor()
|
cur = master.cursor()
|
||||||
cur.execute("SELECT db_name FROM tenants WHERE id = %s AND is_active = true", (tenant_id,))
|
cur.execute(
|
||||||
|
"SELECT db_name FROM tenants WHERE id = %s AND is_active = true",
|
||||||
|
(tenant_id,)
|
||||||
|
)
|
||||||
row = cur.fetchone()
|
row = cur.fetchone()
|
||||||
cur.close()
|
cur.close()
|
||||||
master.close()
|
master.close()
|
||||||
@@ -23,9 +88,11 @@ def get_tenant_conn(tenant_id):
|
|||||||
raise ValueError(f"Tenant {tenant_id} not found or inactive")
|
raise ValueError(f"Tenant {tenant_id} not found or inactive")
|
||||||
|
|
||||||
db_name = row[0]
|
db_name = row[0]
|
||||||
return psycopg2.connect(TENANT_DB_URL_TEMPLATE.format(db_name=db_name))
|
p = _get_tenant_pool(db_name)
|
||||||
|
return _PooledConnection(p.getconn(), p)
|
||||||
|
|
||||||
|
|
||||||
def get_tenant_conn_by_dbname(db_name):
|
def get_tenant_conn_by_dbname(db_name):
|
||||||
"""Get connection to a tenant DB directly by name."""
|
"""Get a pooled connection to a tenant DB directly by name."""
|
||||||
return psycopg2.connect(TENANT_DB_URL_TEMPLATE.format(db_name=db_name))
|
p = _get_tenant_pool(db_name)
|
||||||
|
return _PooledConnection(p.getconn(), p)
|
||||||
|
|||||||
Reference in New Issue
Block a user