diff --git a/pos/migrations/runner.py b/pos/migrations/runner.py index 2e96434..c2cefaf 100755 --- a/pos/migrations/runner.py +++ b/pos/migrations/runner.py @@ -32,6 +32,7 @@ MIGRATIONS = { 'v2.9': 'v2.9_logistics.sql', 'v3.0': 'v3.0_public_api.sql', 'v3.1': 'v3.1_inventory_vehicle_compat.sql', + 'v3.2': 'v3.2_db_performance.sql', } diff --git a/pos/migrations/v3.2_db_performance.sql b/pos/migrations/v3.2_db_performance.sql new file mode 100644 index 0000000..0a15948 --- /dev/null +++ b/pos/migrations/v3.2_db_performance.sql @@ -0,0 +1,62 @@ +-- FASE 2: DB Performance Optimizations +-- Applies to: master DB (nexus_autoparts) and all tenant DBs + +-- ─── MASTER DB ───────────────────────────────── + +-- Index for parts name lookups (used by get_part_types, shop supplies) +CREATE INDEX IF NOT EXISTS idx_parts_name_part ON parts(name_part); +CREATE INDEX IF NOT EXISTS idx_parts_name_part_pattern ON parts(name_part text_pattern_ops); + +-- Partial index for warehouse_inventory stock lookups (already created in Fase 1, +-- kept here for completeness on fresh installs) +CREATE INDEX IF NOT EXISTS idx_wi_part_stock_positive +ON warehouse_inventory(part_id) WHERE stock_quantity > 0; + +-- ─── TENANT DB (run on each tenant) ──────────── + +-- O(1) stock summary table with trigger-based updates +CREATE TABLE IF NOT EXISTS inventory_stock_summary ( + inventory_id INT PRIMARY KEY REFERENCES inventory(id) ON DELETE CASCADE, + branch_id INT REFERENCES branches(id), + stock INT NOT NULL DEFAULT 0, + last_updated TIMESTAMPTZ DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_iss_branch ON inventory_stock_summary(branch_id); + +-- Trigger function: recalculates stock for an item after every operation insert +CREATE OR REPLACE FUNCTION update_stock_summary() +RETURNS TRIGGER AS $$ +BEGIN + INSERT INTO inventory_stock_summary (inventory_id, branch_id, stock) + SELECT i.id, i.branch_id, COALESCE(SUM(io.quantity), 0) + FROM inventory i + LEFT JOIN inventory_operations io ON io.inventory_id = i.id + WHERE i.id = NEW.inventory_id + GROUP BY i.id, i.branch_id + ON CONFLICT (inventory_id) DO UPDATE SET + stock = EXCLUDED.stock, + branch_id = EXCLUDED.branch_id, + last_updated = NOW(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS trg_update_stock_summary ON inventory_operations; +CREATE TRIGGER trg_update_stock_summary +AFTER INSERT ON inventory_operations +FOR EACH ROW EXECUTE FUNCTION update_stock_summary(); + +-- Composite index for inventory_operations (used by stock + history queries) +CREATE INDEX IF NOT EXISTS idx_inv_ops_inventory_branch_created +ON inventory_operations(inventory_id, branch_id, created_at DESC); + +-- Initial population of summary table (idempotent) +INSERT INTO inventory_stock_summary (inventory_id, branch_id, stock) +SELECT i.id, i.branch_id, COALESCE(SUM(io.quantity), 0) +FROM inventory i +LEFT JOIN inventory_operations io ON io.inventory_id = i.id +GROUP BY i.id, i.branch_id +ON CONFLICT (inventory_id) DO UPDATE SET + stock = EXCLUDED.stock, + branch_id = EXCLUDED.branch_id, + last_updated = NOW(); diff --git a/pos/services/ai_chat.py b/pos/services/ai_chat.py index 6b319f5..8474132 100644 --- a/pos/services/ai_chat.py +++ b/pos/services/ai_chat.py @@ -140,7 +140,7 @@ def get_inventory_context(tenant_conn, branch_id=None): cur.execute(f""" SELECT COUNT(*) FROM inventory i WHERE {where} - AND COALESCE((SELECT SUM(quantity) FROM inventory_operations WHERE inventory_id = i.id), 0) <= 3 + AND COALESCE((SELECT stock FROM inventory_stock_summary WHERE inventory_id = i.id), 0) <= 3 """, params) low_stock = cur.fetchone()[0] or 0 diff --git a/pos/services/catalog_service.py b/pos/services/catalog_service.py index 8297cca..aa81eb5 100644 --- a/pos/services/catalog_service.py +++ b/pos/services/catalog_service.py @@ -1411,12 +1411,11 @@ def _get_local_stock_bulk(tenant_conn, branch_id, oem_numbers, catalog_part_ids) cur.execute(f""" SELECT i.id, i.part_number, i.catalog_part_id, i.price_1, i.price_2, i.price_3, i.cost, i.tax_rate, - COALESCE(SUM(io.quantity), 0) AS stock, + COALESCE(s.stock, 0) AS stock, i.image_url FROM inventory i - LEFT JOIN inventory_operations io ON io.inventory_id = i.id + LEFT JOIN inventory_stock_summary s ON s.inventory_id = i.id WHERE ({where}) AND i.is_active = true{branch_filter} - GROUP BY i.id """, params) result = {} @@ -1453,13 +1452,12 @@ def _get_local_stock_single(tenant_conn, branch_id, oem_part_number, catalog_par cur.execute(f""" SELECT i.id, i.price_1, i.price_2, i.price_3, i.cost, i.tax_rate, i.location, i.unit, i.barcode, - COALESCE(SUM(io.quantity), 0) AS stock, + COALESCE(s.stock, 0) AS stock, i.image_url FROM inventory i - LEFT JOIN inventory_operations io ON io.inventory_id = i.id + LEFT JOIN inventory_stock_summary s ON s.inventory_id = i.id WHERE (i.part_number = %s OR i.catalog_part_id = %s) AND i.is_active = true{branch_filter} - GROUP BY i.id LIMIT 1 """, params) diff --git a/pos/services/inventory_engine.py b/pos/services/inventory_engine.py index 82475a5..b518276 100644 --- a/pos/services/inventory_engine.py +++ b/pos/services/inventory_engine.py @@ -25,15 +25,33 @@ def _safe_g(attr, default=None): def get_stock(conn, inventory_id, branch_id=None): """Get current stock for an inventory item. Optionally filter by branch. - Uses Redis cache first, falls back to PostgreSQL SUM query. + Uses Redis cache first, then inventory_stock_summary, falls back to + PostgreSQL SUM query. """ # Try Redis first cached = get_cached_stock(inventory_id, branch_id) if cached is not None: return cached - # Fallback to PostgreSQL + # Use inventory_stock_summary (O(1) lookup) cur = conn.cursor() + if branch_id: + cur.execute( + "SELECT stock FROM inventory_stock_summary WHERE inventory_id = %s AND branch_id = %s", + (inventory_id, branch_id) + ) + else: + cur.execute( + "SELECT stock FROM inventory_stock_summary WHERE inventory_id = %s", + (inventory_id,) + ) + row = cur.fetchone() + if row is not None: + cur.close() + set_cached_stock(inventory_id, row[0], branch_id) + return row[0] + + # Fallback to PostgreSQL SUM (legacy, should not reach here if trigger works) if branch_id: cur.execute( "SELECT COALESCE(SUM(quantity), 0) FROM inventory_operations WHERE inventory_id = %s AND branch_id = %s", @@ -55,21 +73,17 @@ def get_stock(conn, inventory_id, branch_id=None): def get_stock_bulk(conn, branch_id=None): """Get stock for all items. Returns dict {inventory_id: stock_quantity}. - Uses PostgreSQL directly (bulk operation, Redis wouldn't help much here - unless we pre-populated all keys). + Uses inventory_stock_summary for O(1) bulk lookup. """ cur = conn.cursor() if branch_id: cur.execute(""" - SELECT inventory_id, COALESCE(SUM(quantity), 0) - FROM inventory_operations WHERE branch_id = %s - GROUP BY inventory_id + SELECT inventory_id, stock + FROM inventory_stock_summary WHERE branch_id = %s """, (branch_id,)) else: cur.execute(""" - SELECT inventory_id, COALESCE(SUM(quantity), 0) - FROM inventory_operations - GROUP BY inventory_id + SELECT inventory_id, stock FROM inventory_stock_summary """) stock_map = {r[0]: r[1] for r in cur.fetchall()} cur.close() diff --git a/pos/services/inventory_vehicle_compat.py b/pos/services/inventory_vehicle_compat.py index 9653557..dd04e99 100644 --- a/pos/services/inventory_vehicle_compat.py +++ b/pos/services/inventory_vehicle_compat.py @@ -194,11 +194,7 @@ def get_inventory_by_vehicle(tenant_conn, master_conn, mye_id, branch_id=None): i.image_url, i.description, COALESCE(s.stock, 0) as stock FROM inventory i JOIN inventory_vehicle_compat ivc ON ivc.inventory_id = i.id - LEFT JOIN ( - SELECT inventory_id, SUM(quantity) as stock - FROM inventory_operations - GROUP BY inventory_id - ) s ON s.inventory_id = i.id + LEFT JOIN inventory_stock_summary s ON s.inventory_id = i.id WHERE ivc.model_year_engine_id = %s {branch_filter} AND i.is_active = true ORDER BY i.name diff --git a/pos/services/peer_service.py b/pos/services/peer_service.py index 104b253..a5ff52b 100644 --- a/pos/services/peer_service.py +++ b/pos/services/peer_service.py @@ -93,11 +93,7 @@ def get_local_inventory(tenant_conn, query: str = None, limit: int = 50) -> list COALESCE(s.stock, 0) AS stock, i.unit, i.catalog_part_id FROM inventory i - LEFT JOIN ( - SELECT inventory_id, SUM(quantity) AS stock - FROM inventory_operations - GROUP BY inventory_id - ) s ON s.inventory_id = i.id + LEFT JOIN inventory_stock_summary s ON s.inventory_id = i.id WHERE {where} ORDER BY i.name LIMIT %s diff --git a/pos/services/pos_engine.py b/pos/services/pos_engine.py index 66c4b6a..94b593e 100644 --- a/pos/services/pos_engine.py +++ b/pos/services/pos_engine.py @@ -224,7 +224,7 @@ def process_sale(conn, sale_data): # Lock inventory rows to prevent race conditions on concurrent sales cur.execute(""" SELECT id, part_number, name, cost, price_1, price_2, price_3, - tax_rate, branch_id + tax_rate, branch_id, retail_price FROM inventory WHERE id = ANY(%s) AND is_active = true ORDER BY id @@ -327,10 +327,9 @@ def process_sale(conn, sale_data): # Create sale items (batch insert) and deduct inventory sale_items_data = [] for item in totals['items']: - # Fetch retail_price for savings calculation - cur.execute("SELECT retail_price FROM inventory WHERE id = %s", (item['inventory_id'],)) - rp_row = cur.fetchone() - retail_price = rp_row[0] if rp_row else None + # retail_price from preloaded bulk query (index 9) + inv = inv_rows.get(item['inventory_id']) + retail_price = inv[9] if inv else None sale_items_data.append(( sale_id, item['inventory_id'], item['part_number'], item['name'], diff --git a/pos/tenant_db.py b/pos/tenant_db.py index a99fb7e..7d31bf5 100644 --- a/pos/tenant_db.py +++ b/pos/tenant_db.py @@ -1,20 +1,85 @@ # /home/Autopartes/pos/tenant_db.py -"""Tenant DB connection manager. Gets a psycopg2 connection for a specific tenant.""" +"""Tenant DB connection manager with pooling. + +Uses psycopg2.pool.ThreadedConnectionPool for both master and tenant DBs. +Connections are returned to the pool on .close() via a thin wrapper — +this keeps the rest of the codebase unchanged. +""" import psycopg2 +from psycopg2 import pool from config import MASTER_DB_URL, TENANT_DB_URL_TEMPLATE +# ─── Pools ───────────────────────────────────── +_master_pool = None +_tenant_pools = {} + + +def _get_master_pool(): + """Lazy-initialize master DB connection pool.""" + global _master_pool + if _master_pool is None: + _master_pool = pool.ThreadedConnectionPool( + minconn=2, maxconn=10, dsn=MASTER_DB_URL + ) + return _master_pool + + +def _get_tenant_pool(db_name): + """Lazy-initialize tenant DB connection pool by db_name.""" + global _tenant_pools + if db_name not in _tenant_pools: + dsn = TENANT_DB_URL_TEMPLATE.format(db_name=db_name) + _tenant_pools[db_name] = pool.ThreadedConnectionPool( + minconn=2, maxconn=10, dsn=dsn + ) + return _tenant_pools[db_name] + + +class _PooledConnection: + """Thin wrapper that delegates all attribute access to the real + psycopg2 connection, but returns it to the pool on .close(). + """ + __slots__ = ('_conn', '_pool') + + def __init__(self, conn, db_pool): + self._conn = conn + self._pool = db_pool + + def __getattr__(self, name): + return getattr(self._conn, name) + + def close(self): + try: + self._pool.putconn(self._conn) + except Exception: + # If pool is already closed, fall back to real close + self._conn.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + +# ─── Public API ──────────────────────────────── + def get_master_conn(): - """Get connection to nexus_master DB.""" - return psycopg2.connect(MASTER_DB_URL) + """Get a pooled connection to the master DB.""" + p = _get_master_pool() + return _PooledConnection(p.getconn(), p) def get_tenant_conn(tenant_id): - """Get connection to a tenant's DB by looking up db_name in nexus_master.""" + """Get a pooled connection to a tenant's DB.""" master = get_master_conn() cur = master.cursor() - cur.execute("SELECT db_name FROM tenants WHERE id = %s AND is_active = true", (tenant_id,)) + cur.execute( + "SELECT db_name FROM tenants WHERE id = %s AND is_active = true", + (tenant_id,) + ) row = cur.fetchone() cur.close() master.close() @@ -23,9 +88,11 @@ def get_tenant_conn(tenant_id): raise ValueError(f"Tenant {tenant_id} not found or inactive") db_name = row[0] - return psycopg2.connect(TENANT_DB_URL_TEMPLATE.format(db_name=db_name)) + p = _get_tenant_pool(db_name) + return _PooledConnection(p.getconn(), p) def get_tenant_conn_by_dbname(db_name): - """Get connection to a tenant DB directly by name.""" - return psycopg2.connect(TENANT_DB_URL_TEMPLATE.format(db_name=db_name)) + """Get a pooled connection to a tenant DB directly by name.""" + p = _get_tenant_pool(db_name) + return _PooledConnection(p.getconn(), p)