""" Import/Export screen for the NEXUS AUTOPARTS console application. Provides a simple menu flow to import CSV files into the database or export data to JSON files. Uses the renderer's show_input and show_message dialogs for all user interaction. """ import csv import json import os from console.core.screens import Screen from console.core.keybindings import Key from console.config import APP_NAME, VERSION # Import type mapping: menu choice -> (label, table hint) _IMPORT_TYPES = { '1': ('Categorias', 'categories'), '2': ('Grupos', 'groups'), '3': ('Partes', 'parts'), '4': ('Fabricantes', 'manufacturers'), '5': ('Aftermarket', 'aftermarket'), '6': ('CrossRef', 'crossref'), '7': ('Fitment', 'fitment'), } # Export type mapping _EXPORT_TYPES = { '1': ('Categorias', 'categories'), '2': ('Grupos', 'groups'), '3': ('Partes', 'parts'), '4': ('Fabricantes', 'manufacturers'), '5': ('Cross-References', 'crossref'), } # Footer for the main menu _FOOTER_MENU = [ ("1-3", "Seleccionar"), ("ESC", "Atras"), ] class AdminImportScreen(Screen): """Import/Export data screen with simple menu flow.""" def __init__(self): super().__init__(name="admin_import", title="Importar / Exportar Datos") self._selected = 0 # ------------------------------------------------------------------ # Render # ------------------------------------------------------------------ def render(self, context, db, renderer): renderer.draw_header( f" {APP_NAME} v{VERSION}", " IMPORTAR / EXPORTAR DATOS ", ) menu_items = [ ("1", "Importar CSV"), ("2", "Exportar datos a JSON"), ("3", "Volver"), ] renderer.draw_menu( menu_items, selected_index=self._selected, title="IMPORTAR / EXPORTAR DATOS", ) renderer.draw_footer(_FOOTER_MENU) # ------------------------------------------------------------------ # Key handling # ------------------------------------------------------------------ def on_key(self, key, context, db, renderer, nav): # ESC or '3': go back if key == Key.ESCAPE or key == ord('3'): return "back" # Arrow navigation if key == Key.UP: if self._selected > 0: self._selected -= 1 return None if key == Key.DOWN: if self._selected < 2: self._selected += 1 return None # ENTER: activate selected if key == Key.ENTER: if self._selected == 0: self._do_import(db, renderer) elif self._selected == 1: self._do_export(db, renderer) else: return "back" return None # Direct number keys if key == ord('1'): self._do_import(db, renderer) return None if key == ord('2'): self._do_export(db, renderer) return None return None # ------------------------------------------------------------------ # Import flow # ------------------------------------------------------------------ def _do_import(self, db, renderer): """Run the CSV import flow using dialogs.""" # Ask for import type type_prompt = ( "Tipo de datos:\n" "1=Categorias 2=Grupos 3=Partes\n" "4=Fabricantes 5=Aftermarket\n" "6=CrossRef 7=Fitment" ) renderer.show_message(type_prompt, "info") type_choice = renderer.show_input("Tipo (1-7)", max_len=1) if type_choice is None or type_choice not in _IMPORT_TYPES: renderer.show_message("Tipo no valido o cancelado", "error") return type_label, type_key = _IMPORT_TYPES[type_choice] # Ask for file path file_path = renderer.show_input("Ruta del archivo CSV", max_len=60) if file_path is None or not file_path.strip(): renderer.show_message("Importacion cancelada", "info") return file_path = file_path.strip() if not os.path.isfile(file_path): renderer.show_message(f"Archivo no encontrado:\n{file_path}", "error") return # Confirm confirmed = renderer.show_message( f"Importar {type_label} desde:\n{file_path}", "confirm", ) if not confirmed: return # Process the CSV try: count = self._process_csv(db, type_key, file_path) renderer.show_message( f"Importacion completada\n{count} registros procesados", "info", ) except Exception as exc: renderer.show_message(f"Error en importacion:\n{exc}", "error") def _process_csv(self, db, type_key, file_path): """Read a CSV file and insert records into the database. Returns the number of records processed. """ count = 0 with open(file_path, newline='', encoding='utf-8') as fh: reader = csv.DictReader(fh) for row in reader: self._insert_row(db, type_key, row) count += 1 return count def _insert_row(self, db, type_key, row): """Insert a single CSV row into the appropriate table.""" if type_key == 'categories': db._execute( "INSERT INTO part_categories (name, name_es, slug, icon_name, display_order) " "VALUES (?, ?, ?, ?, ?)", ( row.get('name', ''), row.get('name_es', ''), row.get('slug', ''), row.get('icon_name', ''), int(row.get('display_order', 0) or 0), ), ) elif type_key == 'groups': db._execute( "INSERT INTO part_groups (category_id, name, name_es, slug, display_order) " "VALUES (?, ?, ?, ?, ?)", ( int(row.get('category_id', 0) or 0), row.get('name', ''), row.get('name_es', ''), row.get('slug', ''), int(row.get('display_order', 0) or 0), ), ) elif type_key == 'parts': db.create_part({ 'oem_part_number': row.get('oem_part_number', ''), 'name': row.get('name', ''), 'name_es': row.get('name_es', ''), 'group_id': int(row['group_id']) if row.get('group_id') else None, 'description': row.get('description', ''), 'description_es': row.get('description_es', ''), 'weight_kg': float(row['weight_kg']) if row.get('weight_kg') else None, 'material': row.get('material', ''), }) elif type_key == 'manufacturers': db.create_manufacturer({ 'name': row.get('name', ''), 'type': row.get('type', ''), 'quality_tier': row.get('quality_tier', ''), 'country': row.get('country', ''), 'logo_url': row.get('logo_url', ''), 'website': row.get('website', ''), }) elif type_key == 'aftermarket': db._execute( "INSERT INTO aftermarket_parts " "(oem_part_id, manufacturer_id, part_number, name, name_es, " " quality_tier, price_usd, warranty_months, in_stock) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", ( int(row.get('oem_part_id', 0) or 0), int(row.get('manufacturer_id', 0) or 0), row.get('part_number', ''), row.get('name', ''), row.get('name_es', ''), row.get('quality_tier', ''), float(row['price_usd']) if row.get('price_usd') else None, int(row['warranty_months']) if row.get('warranty_months') else None, int(row.get('in_stock', 1) or 1), ), ) elif type_key == 'crossref': db.create_crossref({ 'part_id': int(row.get('part_id', 0) or 0), 'cross_reference_number': row.get('cross_reference_number', ''), 'reference_type': row.get('reference_type', ''), 'source': row.get('source', ''), 'notes': row.get('notes', ''), }) elif type_key == 'fitment': db._execute( "INSERT INTO vehicle_parts " "(part_id, model_year_engine_id, quantity_required, position, fitment_notes) " "VALUES (?, ?, ?, ?, ?)", ( int(row.get('part_id', 0) or 0), int(row.get('model_year_engine_id', 0) or 0), int(row.get('quantity_required', 1) or 1), row.get('position', ''), row.get('fitment_notes', ''), ), ) # ------------------------------------------------------------------ # Export flow # ------------------------------------------------------------------ def _do_export(self, db, renderer): """Run the JSON export flow using dialogs.""" # Ask for export type type_prompt = ( "Tipo de datos:\n" "1=Categorias 2=Grupos 3=Partes\n" "4=Fabricantes 5=CrossRef" ) renderer.show_message(type_prompt, "info") type_choice = renderer.show_input("Tipo (1-5)", max_len=1) if type_choice is None or type_choice not in _EXPORT_TYPES: renderer.show_message("Tipo no valido o cancelado", "error") return type_label, type_key = _EXPORT_TYPES[type_choice] # Ask for output path default_name = f"{type_key}_export.json" out_path = renderer.show_input( f"Archivo de salida [{default_name}]", max_len=60 ) if out_path is None: renderer.show_message("Exportacion cancelada", "info") return if not out_path.strip(): out_path = default_name # Fetch data and write try: data = self._fetch_export_data(db, type_key) with open(out_path.strip(), 'w', encoding='utf-8') as fh: json.dump(data, fh, ensure_ascii=False, indent=2) renderer.show_message( f"Exportacion completada\n{len(data)} registros -> {out_path.strip()}", "info", ) except Exception as exc: renderer.show_message(f"Error en exportacion:\n{exc}", "error") def _fetch_export_data(self, db, type_key): """Fetch the data to export based on the type key.""" if type_key == 'categories': return db.get_categories() elif type_key == 'groups': # Export all groups across all categories categories = db.get_categories() groups = [] for cat in categories: groups.extend(db.get_groups(cat['id'])) return groups elif type_key == 'parts': return db.get_parts(page=1, per_page=100) elif type_key == 'manufacturers': return db.get_manufacturers() elif type_key == 'crossref': return db.get_crossrefs_paginated(page=1, per_page=100) return []