#!/usr/bin/env python3
r"""
Migrate a Punto Zero (MySQL) backup into a Nexus PostgreSQL tenant database.

Usage:
    MYSQL_HOST=127.0.0.1 MYSQL_PORT=3307 MYSQL_DB=datos1 \
    TENANT_DB_URL=postgresql://postgres@localhost/tenant_refaccionaria_la_casita \
    BRANCH_ID=1 \
    python3 scripts/migrate_pz_to_tenant.py

Requires pymysql and psycopg2. Install with:
    pip3 install --target /tmp/pylibs pymysql psycopg2-binary

When run as a script, the destination tables are truncated and reloaded inside
a single PostgreSQL transaction that is committed only after every table has
been loaded, so a failure part-way through does not leave the tenant empty or
half-populated.
"""

import os
import sys
from contextlib import closing

# Libraries are installed outside the system site-packages because the
# project's venv has no pip.
sys.path.insert(0, "/tmp/pylibs")

import pymysql
import psycopg2
from psycopg2.extras import execute_values

MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1")
MYSQL_PORT = int(os.getenv("MYSQL_PORT", "3307"))
MYSQL_DB = os.getenv("MYSQL_DB", "datos1")
MYSQL_USER = os.getenv("MYSQL_USER", "root")
MYSQL_PASS = os.getenv("MYSQL_PASS", "")
PG_URL = os.getenv(
    "TENANT_DB_URL",
    "postgresql://postgres@localhost/tenant_refaccionaria_la_casita",
)
BRANCH_ID = int(os.getenv("BRANCH_ID", "1"))


def mysql_conn():
    """Open a connection to the source MySQL database using the MYSQL_* settings."""
    return pymysql.connect(
        host=MYSQL_HOST,
        port=MYSQL_PORT,
        user=MYSQL_USER,
        password=MYSQL_PASS,
        db=MYSQL_DB,
        charset="latin1",
        cursorclass=pymysql.cursors.Cursor,
    )


def pg_conn():
    """Open a connection to the destination PostgreSQL tenant at PG_URL."""
    return psycopg2.connect(PG_URL)


def clean(s) -> str:
    """Return ``str(s)`` with surrounding whitespace removed; ``None`` becomes ``""``."""
    if s is None:
        return ""
    return str(s).strip()


def _customer_row(r, branch_id):
    """Map one ``clientes`` row to a ``customers`` insert tuple.

    Returns ``None`` when the row has no usable name and must be skipped.
    Column positions follow the SELECT in ``migrate_customers``.
    """
    name = clean(r[1])
    if not name:
        return None

    # Domicilio, Colonia, Ciudad, Estado -> one comma-separated address.
    address_parts = (clean(r[3]), clean(r[4]), clean(r[6]), clean(r[7]))
    address = ", ".join(part for part in address_parts if part) or None

    # Only tiers 1-3 are accepted; anything else falls back to tier 1.
    price_tier = int(r[11] if r[11] is not None else 1)
    if price_tier not in (1, 2, 3):
        price_tier = 1

    return (
        branch_id,
        name,
        clean(r[2]) or None,  # rfc
        None,  # razon_social
        None,  # regimen_fiscal
        None,  # uso_cfdi
        clean(r[5]) or None,  # cp
        clean(r[9]) or None,  # email
        clean(r[8]) or None,  # phone
        address,
        price_tier,
        float(r[10] or 0),  # credit_limit
        0.0,  # credit_balance
        int(r[13] or 0) == 0,  # is_active: active unless Suspendido is set
        None,  # vehicle_info
        float(r[12] or 0),  # max_discount_pct
    )


def _inventory_row(r, brand_map, branch_id):
    """Map one ``productos`` row to an ``inventory`` insert tuple.

    Returns ``None`` when the row lacks a SKU or a name and must be skipped.
    Column positions follow the SELECT in ``migrate_inventory``.
    """
    sku = clean(r[0])
    name = clean(r[1])
    if not sku or not name:
        return None

    iva = r[7]
    # NOTE(review): a falsy IVA (NULL *or* 0) falls back to 16%. If the source
    # holds genuinely zero-rated products this loses that information --
    # confirm against the Punto Zero data before changing.
    tax_rate = float(iva) / 100.0 if iva else 0.16

    return (
        branch_id,
        sku,  # part_number
        None,  # barcode
        name,
        None,  # description (detailed description deliberately omitted at the user's request)
        None,  # category_id
        brand_map.get(r[8]) or None,  # brand
        None,  # vehicle_compatibility
        "PZA",  # unit (UnidadMedida is selected but not used)
        float(r[3] or 0),  # cost
        float(r[4] or 0),  # price_1
        float(r[5] or 0),  # price_2
        float(r[6] or 0),  # price_3
        tax_rate,
        0,  # min_stock
        0,  # max_stock
        clean(r[12]) or None,  # location
        None,  # image_url
        # is_active: must be listed (EnLista == 1) and not blocked (Bloqueado == 0).
        int(r[14] or 0) == 1 and int(r[15] or 0) == 0,
        None,  # catalog_part_id
    )


def migrate_customers(mysql, pg, *, commit=True):
    """Copy rows from MySQL ``clientes`` into PostgreSQL ``customers``.

    Rows whose name is empty after trimming are skipped. Every inserted row is
    assigned to ``BRANCH_ID``.

    Args:
        mysql: Open connection to the source MySQL database.
        pg: Open connection to the destination PostgreSQL database.
        commit: Commit ``pg`` after the insert (default). Pass ``False`` to let
            the caller group this load into a larger transaction.
    """
    # closing() guarantees the cursor is released even if the query raises.
    with closing(mysql.cursor()) as cur:
        cur.execute(
            """
            SELECT Clave, Nombre, RFC, Domicilio, Colonia, CP, Ciudad, Estado,
                   Telefono1, Email, LimiteCredito, Precio, Desc1, Suspendido
            FROM clientes
            """
        )
        mapped = (_customer_row(r, BRANCH_ID) for r in cur.fetchall())
        rows = [row for row in mapped if row is not None]

    if not rows:
        print("No hay clientes para migrar.")
        return

    with closing(pg.cursor()) as pgcur:
        execute_values(
            pgcur,
            """
            INSERT INTO customers (
                branch_id, name, rfc, razon_social, regimen_fiscal, uso_cfdi,
                cp, email, phone, address, price_tier, credit_limit,
                credit_balance, is_active, vehicle_info, max_discount_pct
            ) VALUES %s
            """,
            rows,
        )
    if commit:
        pg.commit()
    print(f"Migrados {len(rows)} clientes.")


def migrate_inventory(mysql, pg, *, commit=True):
    """Copy rows from MySQL ``productos`` into PostgreSQL ``inventory``.

    Brand ids are resolved to names through the ``marcas`` table. Rows without
    a SKU or a name are skipped. Every inserted row is assigned to
    ``BRANCH_ID``.

    Args:
        mysql: Open connection to the source MySQL database.
        pg: Open connection to the destination PostgreSQL database.
        commit: Commit ``pg`` after the insert (default). Pass ``False`` to let
            the caller group this load into a larger transaction.
    """
    with closing(mysql.cursor()) as cur:
        cur.execute("SELECT Id, Marca FROM marcas")
        brand_map = {row[0]: clean(row[1]) for row in cur.fetchall()}

        cur.execute(
            """
            SELECT p.Clave, p.Descrip, p.DescripDetallada, p.Costo,
                   p.Precio1, p.Precio2, p.Precio3, p.IVA, p.Marca,
                   p.Linea, p.SubLinea, p.UnidadMedida, p.Ubicacion,
                   p.ClaveSAT, p.EnLista, p.Bloqueado
            FROM productos p
            """
        )
        mapped = (_inventory_row(r, brand_map, BRANCH_ID) for r in cur.fetchall())
        rows = [row for row in mapped if row is not None]

    if not rows:
        print("No hay productos para migrar.")
        return

    with closing(pg.cursor()) as pgcur:
        execute_values(
            pgcur,
            """
            INSERT INTO inventory (
                branch_id, part_number, barcode, name, description, category_id,
                brand, vehicle_compatibility, unit, cost, price_1, price_2, price_3,
                tax_rate, min_stock, max_stock, location, image_url, is_active,
                catalog_part_id
            ) VALUES %s
            """,
            rows,
        )
    if commit:
        pg.commit()
    print(f"Migrados {len(rows)} productos.")


def main():
    """Truncate the destination tables and reload them from the MySQL backup.

    The truncate and both loads share one PostgreSQL transaction that is
    committed only at the end. If anything raises, the connection is closed
    without committing, which discards the uncommitted truncate and inserts,
    and the exception propagates to the caller.
    """
    with closing(mysql_conn()) as mysql, closing(pg_conn()) as pg:
        # The tenant is expected to be clean; truncate to start from scratch.
        with closing(pg.cursor()) as pgcur:
            pgcur.execute(
                "TRUNCATE TABLE customers, inventory, inventory_stock RESTART IDENTITY CASCADE"
            )
        print("Tablas destino truncadas.")

        migrate_customers(mysql, pg, commit=False)
        migrate_inventory(mysql, pg, commit=False)

        # Single commit: truncate + customers + inventory become visible together.
        pg.commit()

    print("Migración completada.")


if __name__ == "__main__":
    main()