#!/usr/bin/env python3
"""Partition vehicle_parts by HASH(part_id) into 16 partitions.

This is a HIGH-RISK operation on a 254 GB table.
Run ONLY during maintenance window.

Strategy:
  1. Create partitioned table vehicle_parts_new with 16 hash partitions.
  2. Migrate data in id_vehicle_part ranges of BATCH_SIZE ids
     (currently 10M ids per batch). All batches run in one transaction
     that is committed once the copy has finished.
  3. Create indexes on the new table.
  4. Atomically swap: rename old -> _old, new -> vehicle_parts.
  5. Validate row counts (the swap is reverted on mismatch).
  6. Drop old table after validation (or keep for rollback).

Usage:
  export MASTER_DB_URL="postgresql://postgres@/nexus_autoparts"
  python3 partition_vehicle_parts.py --dry-run
  python3 partition_vehicle_parts.py
  python3 partition_vehicle_parts.py --skip-swap   # migrate only, no swap
  python3 partition_vehicle_parts.py --skip-drop   # keep vehicle_parts_old

Requires:
  - PostgreSQL 11+ (partitioning support)
  - ~300 GB free disk space during migration
  - Maintenance window (table is locked briefly during swap)
"""

import argparse
import os
import sys
import time
from datetime import datetime

import psycopg2

DSN = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts')
# Width of the id_vehicle_part range copied by one INSERT ... SELECT.
BATCH_SIZE = 10_000_000
# Number of hash partitions created under vehicle_parts_new.
PARTITIONS = 16


def log(msg: str) -> None:
    """Print *msg* to stdout prefixed with a second-resolution timestamp."""
    print(f"[{datetime.now().isoformat(timespec='seconds')}] {msg}", flush=True)


def get_conn():
    """Open a new psycopg2 connection to the database named by DSN."""
    return psycopg2.connect(DSN)


def table_exists(cur, name: str) -> bool:
    """Return True if pg_tables lists a table called *name*.

    NOTE(review): the lookup is not restricted to a schema, so a table of
    the same name in any schema counts as existing.
    """
    cur.execute("SELECT 1 FROM pg_tables WHERE tablename = %s", (name,))
    return cur.fetchone() is not None


def get_row_count(cur, name: str) -> int:
    """Return COUNT(*) of table *name*.

    *name* is interpolated into the SQL text unescaped; pass only trusted,
    hard-coded identifiers.
    """
    cur.execute(f"SELECT COUNT(*) FROM {name}")
    return cur.fetchone()[0]


def get_max_id(cur, name: str) -> int:
    """Return MAX(id_vehicle_part) of table *name*, or 0 if it has no rows.

    *name* is interpolated into the SQL text unescaped; pass only trusted,
    hard-coded identifiers.
    """
    cur.execute(f"SELECT MAX(id_vehicle_part) FROM {name}")
    row = cur.fetchone()
    return row[0] or 0


def create_partitioned_table(cur) -> None:
    """Create vehicle_parts_new and its PARTITIONS hash partitions.

    The partitions are named vehicle_parts_p0 .. vehicle_parts_p{N-1}.
    Nothing is committed here; the caller owns the transaction.
    """
    log("Creating vehicle_parts_new (partitioned)...")
    cur.execute("""
        CREATE TABLE vehicle_parts_new (
            id_vehicle_part BIGSERIAL,
            part_id INTEGER NOT NULL,
            model_year_engine_id INTEGER NOT NULL,
            created_at TIMESTAMPTZ DEFAULT NOW()
        ) PARTITION BY HASH (part_id);
    """)
    for i in range(PARTITIONS):
        cur.execute(f"""
            CREATE TABLE vehicle_parts_p{i}
            PARTITION OF vehicle_parts_new
            FOR VALUES WITH (MODULUS {PARTITIONS}, REMAINDER {i});
        """)
    log(f"Created {PARTITIONS} partitions.")


def migrate_data(cur, dry_run: bool = False) -> None:
    """Copy every row of vehicle_parts into vehicle_parts_new.

    Rows are copied in consecutive id_vehicle_part ranges of BATCH_SIZE ids,
    starting at id 1 and ending at the source's current maximum id. Nothing
    is committed here; the caller owns the transaction.

    After the copy, the new table's id sequence is advanced to the highest
    migrated id. Without that, default-generated ids on the new table would
    start again from 1 and collide with the copied rows.

    Args:
        cur: open database cursor.
        dry_run: when true, only log the ranges that would be copied.
    """
    max_id = get_max_id(cur, 'vehicle_parts')
    log(f"Max id_vehicle_part in source: {max_id}")
    if max_id == 0:
        log("Source table appears empty. Nothing to migrate.")
        return

    total_inserted = 0
    t0 = time.monotonic()  # monotonic: unaffected by wall-clock adjustments
    batch_starts = range(1, max_id + 1, BATCH_SIZE)
    for batch_num, start in enumerate(batch_starts, start=1):
        end = start + BATCH_SIZE - 1
        if dry_run:
            log(f"[DRY-RUN] Would migrate id_vehicle_part {start}..{end}")
            continue

        cur.execute("""
            INSERT INTO vehicle_parts_new
                (id_vehicle_part, part_id, model_year_engine_id, created_at)
            SELECT id_vehicle_part, part_id, model_year_engine_id, created_at
            FROM vehicle_parts
            WHERE id_vehicle_part BETWEEN %s AND %s
            ON CONFLICT DO NOTHING;
        """, (start, end))
        total_inserted += cur.rowcount

        # Progress line every 10th batch to keep the log readable.
        if batch_num % 10 == 0:
            elapsed = time.monotonic() - t0
            rate = total_inserted / elapsed if elapsed > 0 else 0
            log(f"  Batch {batch_num}: {start}..{end} | inserted={total_inserted} | {rate:.0f} rows/s")

    if dry_run:
        return

    # The rows above were inserted with explicit ids, which does not touch
    # the BIGSERIAL sequence. Move it past the copied ids.
    cur.execute(
        "SELECT setval("
        "pg_get_serial_sequence('vehicle_parts_new', 'id_vehicle_part'), %s);",
        (max_id,),
    )
    log(f"Migration complete. Total inserted: {total_inserted}")


def create_indexes(cur) -> None:
    """Create the three secondary indexes on vehicle_parts_new.

    Nothing is committed here; the caller owns the transaction.
    """
    log("Creating indexes on vehicle_parts_new...")
    cur.execute("""
        CREATE INDEX idx_vp_new_part ON vehicle_parts_new(part_id);
    """)
    cur.execute("""
        CREATE INDEX idx_vp_new_mye ON vehicle_parts_new(model_year_engine_id);
    """)
    cur.execute("""
        CREATE INDEX idx_vp_new_mye_part
        ON vehicle_parts_new(model_year_engine_id, part_id);
    """)
    log("Indexes created.")


def swap_tables(cur) -> None:
    """Rename vehicle_parts -> vehicle_parts_old and _new -> vehicle_parts.

    The old table is locked in ACCESS EXCLUSIVE mode first. Both renames
    take effect together when the caller commits the transaction.
    """
    log("Swapping tables (exclusive lock)...")
    # Brief exclusive lock on the old table
    cur.execute("LOCK TABLE vehicle_parts IN ACCESS EXCLUSIVE MODE;")
    cur.execute("ALTER TABLE vehicle_parts RENAME TO vehicle_parts_old;")
    cur.execute("ALTER TABLE vehicle_parts_new RENAME TO vehicle_parts;")
    log("Swap complete.")


def validate(cur) -> bool:
    """Compare row counts of vehicle_parts_old and vehicle_parts.

    Must only be called after the swap, because it reads vehicle_parts_old.

    Returns:
        True when both tables hold the same number of rows, else False.
    """
    old_count = get_row_count(cur, 'vehicle_parts_old')
    new_count = get_row_count(cur, 'vehicle_parts')
    log(f"Validation: old={old_count} | new={new_count}")
    if old_count != new_count:
        log(f"WARNING: Row count mismatch! diff={old_count - new_count}")
        return False
    log("Validation PASSED.")
    return True


def _run_migration(cur, conn, args) -> None:
    """Run the migration steps selected by *args* on an open connection.

    Exits the process with status 1 when vehicle_parts is missing or when
    post-swap validation fails (after reverting the swap).
    """
    # Check prerequisites
    if not table_exists(cur, 'vehicle_parts'):
        log("ERROR: vehicle_parts does not exist.")
        sys.exit(1)

    if table_exists(cur, 'vehicle_parts_new'):
        log("WARNING: vehicle_parts_new already exists. Dropping...")
        if not args.dry_run:
            cur.execute("DROP TABLE IF EXISTS vehicle_parts_new CASCADE;")
            conn.commit()

    if args.dry_run:
        # A dry run only reports the id ranges; it creates, swaps,
        # validates and drops nothing.
        migrate_data(cur, dry_run=True)
        return

    # Step 1: Create partitioned table
    create_partitioned_table(cur)
    conn.commit()

    # Step 2: Migrate data
    migrate_data(cur)
    conn.commit()

    # Step 3: Create indexes
    create_indexes(cur)
    conn.commit()

    # Step 4: Swap
    if args.skip_swap:
        # Validation and drop both need vehicle_parts_old, which only
        # exists after a swap, so stop here.
        log("Swap skipped (--skip-swap); migrated data is in vehicle_parts_new.")
        return
    swap_tables(cur)
    conn.commit()

    # Step 5: Validate
    if not validate(cur):
        log("VALIDATION FAILED. Rolling back swap...")
        cur.execute("ALTER TABLE vehicle_parts RENAME TO vehicle_parts_new;")
        cur.execute("ALTER TABLE vehicle_parts_old RENAME TO vehicle_parts;")
        conn.commit()
        log("Rollback complete.")
        sys.exit(1)

    # Step 6: Drop (or keep) the old table
    if args.skip_drop:
        log("Old table kept as vehicle_parts_old.")
    else:
        log("Dropping old table...")
        cur.execute("DROP TABLE vehicle_parts_old CASCADE;")
        conn.commit()
        log("Old table dropped.")


def main() -> None:
    """Parse the command line and run the partitioning migration.

    On any exception the open transaction is rolled back, the error is
    logged, and the exception is re-raised. The connection is always closed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be done without executing')
    parser.add_argument('--skip-swap', action='store_true',
                        help='Create new table and migrate, but do not swap')
    parser.add_argument('--skip-drop', action='store_true',
                        help='Keep old table after swap (for rollback)')
    args = parser.parse_args()

    if args.dry_run:
        log("=== DRY RUN ===")

    log("Connecting to database...")
    conn = get_conn()
    try:
        conn.autocommit = False
        # Cursor setup lives inside the try so the connection is closed
        # even if creating the cursor or a SET statement fails.
        with conn.cursor() as cur:
            cur.execute("SET synchronous_commit = off;")
            cur.execute("SET work_mem = '256MB';")
            _run_migration(cur, conn, args)
        log("Done.")
    except Exception as exc:
        conn.rollback()
        log(f"ERROR: {exc}")
        raise
    finally:
        conn.close()


if __name__ == '__main__':
    main()