#!/usr/bin/env python3
"""Bulk-sync parts from PostgreSQL master DB into Meilisearch.

Usage:
    python3 scripts/sync_meilisearch.py [--clear] [--batch-size N] [--index-batch N]

Requires environment variables:
    MASTER_DB_URL=postgresql://user:pass@localhost/nexus_autoparts
"""

import os
import sys
import argparse

# Make the sibling ``pos`` package root importable so ``services`` resolves
# when this script is run directly from the ``scripts`` directory.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'pos'))

import psycopg2
from services.meili_search import ensure_index, index_parts_bulk, clear_index, health_check


def _positive_int(value):
    """Parse *value* as an int for argparse, rejecting values below 1."""
    number = int(value)
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value!r}")
    return number


def fetch_parts(conn, batch_size=5000):
    """Yield parts from PostgreSQL as dicts.

    Rows are read through a named cursor in chunks of ``batch_size`` so the
    whole ``parts`` table is never fetched in a single call.

    Args:
        conn: Open psycopg2 connection to the master database.
        batch_size: Number of rows requested per ``fetchmany`` call.

    Yields:
        One dict per row with the keys ``id_part``, ``oem_part_number``,
        ``name_part``, ``name_es``, ``description``, ``description_es``,
        ``image_url`` and ``group_id``.
    """
    # The ``with`` block closes the cursor even when the consumer stops
    # iterating early or an error interrupts the generator.
    with conn.cursor(name='parts_cursor') as cur:
        cur.execute("""
            SELECT id_part, oem_part_number, name_part, name_es,
                   description, description_es, image_url, group_id
            FROM parts
            ORDER BY id_part
        """)
        while True:
            rows = cur.fetchmany(batch_size)
            if not rows:
                break
            for row in rows:
                yield {
                    'id_part': row[0],
                    'oem_part_number': row[1],
                    'name_part': row[2],
                    # Fall back to the primary name when no Spanish name is stored.
                    'name_es': row[3] or row[2],
                    # Empty/NULL text columns are normalised to empty strings.
                    'description': row[4] or '',
                    'description_es': row[5] or '',
                    'image_url': row[6] or '',
                    'group_id': row[7],
                }


def main():
    """Parse CLI options and push every row of ``parts`` into Meilisearch.

    Exits with status 1 when the Meilisearch health check fails or when
    ``MASTER_DB_URL`` is not set.
    """
    parser = argparse.ArgumentParser(description='Sync parts to Meilisearch')
    parser.add_argument('--clear', action='store_true', help='Clear index before sync')
    # Reject non-positive batch sizes up front instead of handing them to
    # the database driver or the indexing helper.
    parser.add_argument('--batch-size', type=_positive_int, default=5000,
                        help='PostgreSQL fetch batch size')
    parser.add_argument('--index-batch', type=_positive_int, default=1000,
                        help='Meilisearch upload batch size')
    args = parser.parse_args()

    print("Meilisearch Sync")
    print("=" * 50)

    if not health_check():
        print("ERROR: Meilisearch is not reachable.")
        print(f" URL: {os.environ.get('MEILI_URL', 'http://localhost:7700')}")
        sys.exit(1)

    master_db_url = os.environ.get('MASTER_DB_URL')
    if not master_db_url:
        print("ERROR: MASTER_DB_URL environment variable is required.")
        sys.exit(1)

    ensure_index()

    if args.clear:
        print("Clearing existing index...")
        clear_index()

    print("Connecting to PostgreSQL...")
    conn = psycopg2.connect(master_db_url)
    try:
        # Count total
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM parts")
            total_rows = cur.fetchone()[0]
        print(f"Parts to index: {total_rows}")

        print("Indexing...")
        parts = fetch_parts(conn, args.batch_size)
        try:
            indexed = index_parts_bulk(parts, batch_size=args.index_batch)
        finally:
            # Finalise the generator first so its named cursor is closed
            # while the connection is still open.
            parts.close()
    finally:
        conn.close()

    print(f"Done. Indexed {indexed} documents.")


if __name__ == '__main__':
    main()