- Corrige UNIQUE constraint que fallaba por duplicados → índice normal - Aumenta BATCH_SIZE a 10M + synchronous_commit=off para velocidad - Particionamiento completado: 2.16B filas en 16 particiones - vehicle_parts_old conservada como rollback (254 GB) - Minify script y Quart producción ya commiteados
233 lines
7.2 KiB
Python
233 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Partition vehicle_parts by HASH(part_id) into 16 partitions.
|
|
|
|
This is a HIGH-RISK operation on a 254 GB table. Run ONLY during maintenance window.
|
|
|
|
Strategy:
|
|
1. Create partitioned table vehicle_parts_new with 16 hash partitions.
|
|
2. Migrate data in batches, each covering BATCH_SIZE (currently 10M) consecutive id_vehicle_part values, then build the indexes.
|
|
3. Atomically swap: rename old -> _old, new -> vehicle_parts.
|
|
4. Validate that the old and new tables have the same row count.
|
|
5. Drop old table after validation (or keep for rollback).
|
|
|
|
Usage:
|
|
export MASTER_DB_URL="postgresql://postgres@/nexus_autoparts"
|
|
python3 partition_vehicle_parts.py --dry-run
|
|
python3 partition_vehicle_parts.py
|
|
|
|
Requires:
|
|
- PostgreSQL 11+ (partitioning support)
|
|
- ~300 GB free disk space during migration
|
|
- Maintenance window (table is locked briefly during swap)
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
|
|
import psycopg2
|
|
|
|
# Connection string for the primary database; override with MASTER_DB_URL.
DSN = os.getenv('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts')

# Width of each id_vehicle_part range copied by one INSERT ... SELECT.
# Ranges are by id value, so a batch holds fewer rows when ids are sparse.
BATCH_SIZE = 10 * 1000 * 1000

# Number of HASH(part_id) partitions (the MODULUS used in the DDL).
PARTITIONS = 16
|
|
|
|
|
|
def log(msg):
    """Print *msg* behind a local, second-resolution ISO-8601 timestamp.

    Each line is flushed immediately so progress stays visible when stdout
    is redirected to a file during long runs.
    """
    stamp = datetime.now().isoformat(timespec='seconds')
    line = f"[{stamp}] {msg}"
    print(line, flush=True)
|
|
|
|
|
|
def get_conn():
    """Open and return a new psycopg2 connection using the module-level DSN."""
    connection = psycopg2.connect(DSN)
    return connection
|
|
|
|
|
|
def table_exists(cur, name):
    """Report whether pg_tables has an entry whose tablename equals *name*.

    The name is passed as a bound parameter. The lookup is not restricted
    to any schema, so a same-named table in any schema counts.

    Returns:
        True if the query yields a row, False otherwise.
    """
    cur.execute("SELECT 1 FROM pg_tables WHERE tablename = %s", (name,))
    first = cur.fetchone()
    if first is None:
        return False
    return True
|
|
|
|
|
|
def get_row_count(cur, name):
    """Return COUNT(*) for table *name* (an exact, full count).

    *name* is formatted straight into the SQL text, so it must be a trusted
    identifier; this script only passes its own hard-coded table names.
    """
    query = "SELECT COUNT(*) FROM {}".format(name)
    cur.execute(query)
    result = cur.fetchone()
    return result[0]
|
|
|
|
|
|
def get_max_id(cur, name):
    """Return the largest id_vehicle_part in table *name*.

    MAX() yields NULL for an empty table. That result, and any other falsy
    value including 0, is reported as 0.
    """
    cur.execute("SELECT MAX(id_vehicle_part) FROM {}".format(name))
    highest = cur.fetchone()[0]
    return highest if highest else 0
|
|
|
|
|
|
def create_partitioned_table(cur):
    """Create vehicle_parts_new, hash-partitioned on part_id, plus its partitions.

    Issues one CREATE TABLE for the parent and one per partition, named
    vehicle_parts_p0 .. vehicle_parts_p{PARTITIONS-1}. No commit is issued
    here; the caller owns the transaction.
    """
    log("Creating vehicle_parts_new (partitioned)...")

    # BIGSERIAL gives the new table its own sequence. NOTE(review): rows
    # copied with explicit id_vehicle_part values do not advance it.
    parent_ddl = """
        CREATE TABLE vehicle_parts_new (
            id_vehicle_part BIGSERIAL,
            part_id INTEGER NOT NULL,
            model_year_engine_id INTEGER NOT NULL,
            created_at TIMESTAMPTZ DEFAULT NOW()
        ) PARTITION BY HASH (part_id);
    """
    cur.execute(parent_ddl)

    partition_ddl = """
        CREATE TABLE vehicle_parts_p{idx}
            PARTITION OF vehicle_parts_new
            FOR VALUES WITH (MODULUS {modulus}, REMAINDER {idx});
    """
    for idx in range(PARTITIONS):
        cur.execute(partition_ddl.format(idx=idx, modulus=PARTITIONS))

    log(f"Created {PARTITIONS} partitions.")
|
|
|
|
|
|
def migrate_data(cur, dry_run=False):
    """Copy all rows of vehicle_parts into vehicle_parts_new in id ranges.

    The copy walks id_vehicle_part from the source MIN() to the MAX() read
    when the function starts. It uses ranges of BATCH_SIZE consecutive id
    values, so a range holds fewer rows when ids are sparse. Rows with a
    NULL id_vehicle_part, or inserted after MAX() was read, are not copied;
    a row-count comparison of the two tables would expose them.

    Each batch is committed on ``cur.connection``, so no single transaction
    spans the whole multi-billion-row copy, and a failure loses only the
    in-flight batch. A failed run therefore leaves vehicle_parts_new
    partially filled. main() drops and recreates that table on the next run.

    After the copy, the sequence behind vehicle_parts_new.id_vehicle_part
    (BIGSERIAL) is set to the source MAX(). The INSERT supplies explicit ids
    and never calls nextval(), so without this step inserts after the swap
    would reissue existing ids. The partitioned table has no unique index to
    reject such duplicates.

    Args:
        cur: psycopg2 cursor; autocommit is expected to be off (as main()
            sets it).
        dry_run: if True, only log the id ranges that would be copied; no
            writes or commits are issued.
    """
    max_id = get_max_id(cur, 'vehicle_parts')
    log(f"Max id_vehicle_part in source: {max_id}")

    # Start at the real minimum instead of assuming ids begin at 1; a fixed
    # start of 1 silently skipped any ids <= 0.
    cur.execute("SELECT MIN(id_vehicle_part) FROM vehicle_parts")
    min_id = cur.fetchone()[0]
    if min_id is None:
        log("Source table appears empty. Nothing to migrate.")
        return

    conn = cur.connection
    start = min_id
    total_inserted = 0
    batch_num = 0
    t0 = time.time()

    while start <= max_id:
        end = start + BATCH_SIZE - 1
        if dry_run:
            log(f"[DRY-RUN] Would migrate id_vehicle_part {start}..{end}")
        else:
            # ON CONFLICT DO NOTHING is inert here: vehicle_parts_new has no
            # unique index for a row to conflict with.
            cur.execute("""
                INSERT INTO vehicle_parts_new (id_vehicle_part, part_id, model_year_engine_id, created_at)
                SELECT id_vehicle_part, part_id, model_year_engine_id, created_at
                FROM vehicle_parts
                WHERE id_vehicle_part BETWEEN %s AND %s
                ON CONFLICT DO NOTHING;
            """, (start, end))
            total_inserted += cur.rowcount
            # Commit per batch so the copy is never one giant transaction.
            conn.commit()
            batch_num += 1
            if batch_num % 10 == 0:
                elapsed = time.time() - t0
                rate = total_inserted / elapsed if elapsed > 0 else 0
                log(f" Batch {batch_num}: {start}..{end} | inserted={total_inserted} | {rate:.0f} rows/s")
        start = end + 1

    if dry_run:
        return

    # BIGSERIAL sequences cannot be set below 1, so skip the sync for
    # all-non-positive id ranges.
    # NOTE(review): if the old table's sequence had advanced past MAX()
    # (e.g. trailing deletes), ids in that gap can be issued again.
    if max_id >= 1:
        cur.execute(
            "SELECT setval(pg_get_serial_sequence('vehicle_parts_new', 'id_vehicle_part'), %s)",
            (max_id,),
        )
        conn.commit()
        log(f"Sequence for vehicle_parts_new.id_vehicle_part set to {max_id}.")

    log(f"Migration complete. Total inserted: {total_inserted}")
|
|
|
|
|
|
def create_indexes(cur):
    """Build the three secondary indexes on vehicle_parts_new.

    The indexes are created on the partitioned parent; no commit is issued
    here.

    NOTE(review): idx_vp_new_mye (model_year_engine_id) duplicates the
    leading column of idx_vp_new_mye_part and may be redundant. Confirm
    against the query workload before removing it.
    """
    log("Creating indexes on vehicle_parts_new...")
    index_ddl = (
        "CREATE INDEX idx_vp_new_part ON vehicle_parts_new(part_id);",
        "CREATE INDEX idx_vp_new_mye ON vehicle_parts_new(model_year_engine_id);",
        "CREATE INDEX idx_vp_new_mye_part ON vehicle_parts_new(model_year_engine_id, part_id);",
    )
    for statement in index_ddl:
        cur.execute(statement)
    log("Indexes created.")
|
|
|
|
|
|
def swap_tables(cur):
    """Rename vehicle_parts -> vehicle_parts_old and vehicle_parts_new -> vehicle_parts.

    No commit is issued here. With autocommit off (as main() sets it), the
    ACCESS EXCLUSIVE lock on the original table is held until the caller
    commits, and both renames become visible together.

    NOTE(review): PostgreSQL tracks dependents by OID. Views and foreign
    keys that referenced the original table keep pointing at it after the
    rename, i.e. at vehicle_parts_old. Partition, index and sequence names
    of the new table are left as created.
    """
    log("Swapping tables (exclusive lock)...")
    statements = [
        # Blocks all readers and writers of the original table until commit.
        "LOCK TABLE vehicle_parts IN ACCESS EXCLUSIVE MODE;",
        "ALTER TABLE vehicle_parts RENAME TO vehicle_parts_old;",
        "ALTER TABLE vehicle_parts_new RENAME TO vehicle_parts;",
    ]
    for stmt in statements:
        cur.execute(stmt)
    log("Swap complete.")
|
|
|
|
|
|
def validate(cur):
    """Compare the row counts of vehicle_parts_old and vehicle_parts.

    Both counts are full COUNT(*) scans, taken in that order.

    Returns:
        True when the counts match. Otherwise False, after logging the
        difference (old minus new).
    """
    old_count, new_count = (
        get_row_count(cur, table) for table in ('vehicle_parts_old', 'vehicle_parts')
    )
    log(f"Validation: old={old_count} | new={new_count}")

    if old_count == new_count:
        log("Validation PASSED.")
        return True

    log(f"WARNING: Row count mismatch! diff={old_count - new_count}")
    return False
|
|
|
|
|
|
def main():
    """Parse CLI flags and run the partitioning migration end to end.

    Steps, each committed separately:
      1. Drop a stale vehicle_parts_new.
      2. Create the partitioned table.
      3. Copy the data.
      4. Build indexes.
      5. Swap the table names and validate row counts.
      6. Drop or keep vehicle_parts_old.

    Safety rules enforced here:
      * --dry-run only runs the read-only range preview. Nothing is
        created, renamed, validated or dropped.
      * Validation, the rename-back on validation failure, and the drop of
        vehicle_parts_old run only after a swap performed by this run.
        --skip-swap or a dry run can therefore never touch a
        vehicle_parts_old left by an earlier run.
      * A run that will swap aborts up front if vehicle_parts_old already
        exists, instead of failing at the rename after the whole copy.
      * vehicle_parts_old is dropped without CASCADE. Views and foreign keys
        that referenced the original table follow it through the rename,
        and CASCADE would drop them silently. If any exist, the DROP raises
        and the old table is kept.

    Exits with status 1 if vehicle_parts is missing, if vehicle_parts_old
    blocks the swap, or if validation fails (after renaming back).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be done without executing')
    parser.add_argument('--skip-swap', action='store_true',
                        help='Create new table and migrate, but do not swap')
    parser.add_argument('--skip-drop', action='store_true',
                        help='Keep old table after swap (for rollback)')
    args = parser.parse_args()

    if args.dry_run:
        log("=== DRY RUN ===")

    log("Connecting to database...")
    conn = get_conn()
    conn.autocommit = False
    cur = conn.cursor()

    try:
        # Session tuning for the bulk copy: asynchronous commits and a larger
        # work_mem. Kept inside the try so a failure still closes the connection.
        cur.execute("SET synchronous_commit = off;")
        cur.execute("SET work_mem = '256MB';")

        # Check prerequisites
        if not table_exists(cur, 'vehicle_parts'):
            log("ERROR: vehicle_parts does not exist.")
            sys.exit(1)

        # The swap renames vehicle_parts to vehicle_parts_old, which fails if
        # that name is taken (e.g. by a rollback copy kept with --skip-drop).
        # Detect that now rather than after the multi-hour copy.
        if not args.skip_swap and table_exists(cur, 'vehicle_parts_old'):
            if args.dry_run:
                log("WARNING: vehicle_parts_old already exists; a real run would abort before migrating.")
            else:
                log("ERROR: vehicle_parts_old already exists. Drop or rename it first, or use --skip-swap.")
                sys.exit(1)

        if table_exists(cur, 'vehicle_parts_new'):
            log("WARNING: vehicle_parts_new already exists. Dropping...")
            if not args.dry_run:
                cur.execute("DROP TABLE IF EXISTS vehicle_parts_new CASCADE;")
                conn.commit()

        if args.dry_run:
            # Only preview the copy ranges; every later step writes.
            migrate_data(cur, dry_run=True)
            log("Done.")
            return

        # Step 1: Create partitioned table
        create_partitioned_table(cur)
        conn.commit()

        # Step 2: Migrate data
        migrate_data(cur, dry_run=False)
        conn.commit()

        # Step 3: Create indexes
        create_indexes(cur)
        conn.commit()

        if args.skip_swap:
            log("Swap skipped (--skip-swap); vehicle_parts_new left in place, nothing validated or dropped.")
            log("Done.")
            return

        # Step 4: Swap. Re-enable synchronous commit so the irreversible
        # steps below are durable before they are reported as complete.
        cur.execute("SET synchronous_commit = on;")
        swap_tables(cur)
        conn.commit()

        # Step 5: Validate (only meaningful right after our own swap)
        if not validate(cur):
            log("VALIDATION FAILED. Rolling back swap...")
            cur.execute("ALTER TABLE vehicle_parts RENAME TO vehicle_parts_new;")
            cur.execute("ALTER TABLE vehicle_parts_old RENAME TO vehicle_parts;")
            conn.commit()
            log("Rollback complete.")
            sys.exit(1)

        if args.skip_drop:
            log("Old table kept as vehicle_parts_old.")
        else:
            log("Dropping old table...")
            # No CASCADE: dependents of the original table now hang off
            # vehicle_parts_old and must not be dropped silently.
            cur.execute("DROP TABLE vehicle_parts_old;")
            conn.commit()
            log("Old table dropped.")

        log("Done.")

    except Exception as exc:
        conn.rollback()
        log(f"ERROR: {exc}")
        raise
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the migration only when executed as a script, not on import.
    main()
|