"""ERP Sync Engine: Aspel SAE, Contpaqi, SAP B1, Odoo integration stubs.

This module provides the architecture for bidirectional sync between Nexus POS
and external ERP systems.

Actual implementations require:
- Aspel SAE: ODBC connection or REST API via Aspel NOI/SAE middleware
- Contpaqi: Contpaqi SDK or REST API via Facturama/Contpaqi middleware
- SAP B1: Service Layer REST API
- Odoo: XML-RPC or JSON-RPC API

Tables (erp_sync_configs, erp_sync_logs, erp_sync_queue) created in v2.6.

NOTE(review): every helper takes ``conn`` and uses ``%s`` placeholders together
with ``RETURNING`` / ``::jsonb``, so a PostgreSQL DB-API connection (e.g.
psycopg2) is presumably expected -- confirm against the caller.
"""

import json
from contextlib import closing
from datetime import datetime

# Maps an ERP type identifier (e.g. 'odoo') to the handler class that syncs it.
ERP_HANDLERS = {}


def register_erp_handler(erp_type, handler_class):
    """Register *handler_class* as the sync handler for *erp_type*.

    A later registration for the same *erp_type* replaces the earlier one.
    """
    ERP_HANDLERS[erp_type] = handler_class


def get_erp_handler(erp_type):
    """Return the handler class registered for *erp_type*, or ``None``."""
    return ERP_HANDLERS.get(erp_type)


def _serialize_details(details):
    """Return *details* in a form that can be bound to a jsonb parameter.

    Plain ``dict`` / ``list`` values are converted to JSON text, because
    DB-API drivers such as psycopg2 do not adapt them by default.  ``None``,
    strings and any driver-specific wrapper objects are passed through
    unchanged so existing callers keep working.
    """
    if isinstance(details, (dict, list)):
        return json.dumps(details)
    return details


# ─── ERP Sync Configuration ─────────────────────────────

def create_sync_config(conn, tenant_id, erp_type, api_endpoint, api_username,
                       api_password, database_name=None, company_code=None,
                       sync_direction='bidirectional', sync_inventory=False,
                       sync_sales=False, sync_customers=False,
                       sync_frequency_minutes=60):
    """Insert a row into ``erp_sync_configs`` and return its new id.

    The insert is committed on *conn* before returning.  The cursor is always
    closed, even when the statement fails.

    NOTE(review): *api_password* is written to the ``api_password_encrypted``
    column exactly as received -- no encryption happens in this function.
    Confirm that callers encrypt the value before passing it in.
    """
    params = (
        tenant_id, erp_type, api_endpoint, api_username, api_password,
        database_name, company_code, sync_direction,
        sync_inventory, sync_sales, sync_customers, sync_frequency_minutes,
    )
    with closing(conn.cursor()) as cur:
        cur.execute("""
            INSERT INTO erp_sync_configs
                (tenant_id, erp_type, api_endpoint, api_username, api_password_encrypted,
                 database_name, company_code, sync_direction,
                 sync_inventory, sync_sales, sync_customers, sync_frequency_minutes)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            RETURNING id
        """, params)
        config_id = cur.fetchone()[0]
        conn.commit()
    return config_id


def get_sync_config(conn, tenant_id, erp_type):
    """Return the sync config for (*tenant_id*, *erp_type*) as a dict.

    Returns ``None`` when no matching row exists.  The password column is not
    selected.  ``last_sync_at`` and ``created_at`` are converted with
    ``str()``; ``last_sync_at`` stays ``None`` when it is unset.
    """
    with closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT id, tenant_id, erp_type, is_active, api_endpoint, api_username,
                   database_name, company_code, sync_direction,
                   sync_inventory, sync_sales, sync_customers, sync_frequency_minutes,
                   last_sync_at, last_sync_error, created_at
            FROM erp_sync_configs
            WHERE tenant_id = %s AND erp_type = %s
        """, (tenant_id, erp_type))
        row = cur.fetchone()

    if not row:
        return None

    return {
        'id': row[0],
        'tenant_id': row[1],
        'erp_type': row[2],
        'is_active': row[3],
        'api_endpoint': row[4],
        'api_username': row[5],
        'database_name': row[6],
        'company_code': row[7],
        'sync_direction': row[8],
        'sync_inventory': row[9],
        'sync_sales': row[10],
        'sync_customers': row[11],
        'sync_frequency_minutes': row[12],
        'last_sync_at': str(row[13]) if row[13] else None,
        'last_sync_error': row[14],
        'created_at': str(row[15]),
    }


# ─── Sync Queue ─────────────────────────────

def queue_for_sync(conn, config_id, entity_type, entity_id, action='update', priority=5):
    """Add an entity to the sync queue.

    Returns the id of the new queue row, or ``None`` when the insert was
    skipped by ``ON CONFLICT DO NOTHING``.  The transaction is committed in
    both cases.
    """
    with closing(conn.cursor()) as cur:
        cur.execute("""
            INSERT INTO erp_sync_queue (config_id, entity_type, entity_id, action, priority)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT DO NOTHING
            RETURNING id
        """, (config_id, entity_type, entity_id, action, priority))
        row = cur.fetchone()
        conn.commit()
    return row[0] if row else None


def get_pending_queue(conn, config_id, limit=100):
    """Return up to *limit* queue items for *config_id* awaiting processing.

    Items with status ``'pending'`` or ``'failed'`` are returned, ordered by
    ascending priority value and then by creation time.  Each item is a dict;
    ``created_at`` is converted with ``str()``.
    """
    with closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT id, entity_type, entity_id, action, priority, retry_count, created_at
            FROM erp_sync_queue
            WHERE config_id = %s AND status IN ('pending', 'failed')
            ORDER BY priority ASC, created_at ASC
            LIMIT %s
        """, (config_id, limit))
        rows = cur.fetchall()

    return [
        {
            'id': r[0],
            'entity_type': r[1],
            'entity_id': r[2],
            'action': r[3],
            'priority': r[4],
            'retry_count': r[5],
            'created_at': str(r[6]),
        }
        for r in rows
    ]


def mark_queue_item(conn, queue_id, status, error=None):
    """Record the outcome of processing queue item *queue_id*.

    ``'completed'`` stamps ``processed_at`` and clears ``last_error``.
    ``'failed'`` increments ``retry_count`` and stores *error*.
    Any other *status* issues no UPDATE (the commit still happens), which
    matches the historical behaviour of this function.
    """
    with closing(conn.cursor()) as cur:
        if status == 'completed':
            cur.execute("""
                UPDATE erp_sync_queue
                SET status = %s, processed_at = NOW(), last_error = NULL
                WHERE id = %s
            """, (status, queue_id))
        elif status == 'failed':
            cur.execute("""
                UPDATE erp_sync_queue
                SET status = %s, retry_count = retry_count + 1, last_error = %s
                WHERE id = %s
            """, (status, error, queue_id))
        conn.commit()


# ─── Sync Logs ─────────────────────────────

def log_sync_run(conn, config_id, sync_type, direction, status,
                 records_processed=0, records_failed=0,
                 error_message=None, details=None):
    """Insert a row into ``erp_sync_logs`` and return its id.

    *details* may be ``None``, JSON text, or a ``dict`` / ``list`` (which is
    serialized to JSON text before being bound).  The insert is committed.
    """
    params = (
        config_id, sync_type, direction, status,
        records_processed, records_failed, error_message,
        _serialize_details(details),
    )
    with closing(conn.cursor()) as cur:
        cur.execute("""
            INSERT INTO erp_sync_logs
                (config_id, sync_type, direction, status,
                 records_processed, records_failed, error_message, details)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            RETURNING id
        """, params)
        log_id = cur.fetchone()[0]
        conn.commit()
    return log_id


def complete_sync_run(conn, log_id, status, records_processed, records_failed,
                      error_message=None, details=None):
    """Finalize sync log *log_id* with its outcome and set ``completed_at``.

    *details* is merged into the stored ``details`` value with the jsonb
    ``||`` operator; a missing value on either side is treated as ``{}``.
    A ``dict`` / ``list`` is serialized to JSON text first, and the parameter
    is cast explicitly so the driver never has to adapt a Python dict.
    """
    params = (
        status, records_processed, records_failed, error_message,
        _serialize_details(details), log_id,
    )
    with closing(conn.cursor()) as cur:
        cur.execute("""
            UPDATE erp_sync_logs
            SET status = %s, records_processed = %s, records_failed = %s,
                error_message = %s,
                details = COALESCE(details, '{}'::jsonb) || COALESCE(%s::jsonb, '{}'::jsonb),
                completed_at = NOW()
            WHERE id = %s
        """, params)
        conn.commit()


# ─── Generic Sync Runner ─────────────────────────────

def run_sync(conn, config_id, sync_type='full'):
    """Run a sync job for a configured ERP.

    This is the main entry point. It looks up the config, instantiates the
    appropriate handler, and executes the sync.

    Returns whatever the handler's ``sync`` method returns.  When no handler
    is registered for the config's ERP type, a failed log row is written and
    a ``{'status': 'failed', 'error': ...}`` dict is returned.

    Raises:
        ValueError: if no config row with id *config_id* exists.
    """
    with closing(conn.cursor()) as cur:
        cur.execute("SELECT erp_type, tenant_id FROM erp_sync_configs WHERE id = %s", (config_id,))
        row = cur.fetchone()

    if not row:
        raise ValueError(f"Sync config {config_id} not found")

    erp_type, tenant_id = row
    handler_class = get_erp_handler(erp_type)
    if not handler_class:
        log_sync_run(conn, config_id, sync_type, 'to_erp', 'failed',
                     error_message=f"No handler registered for ERP type: {erp_type}")
        return {'status': 'failed', 'error': f'No handler for {erp_type}'}

    handler = handler_class(conn, config_id, tenant_id)
    return handler.sync(sync_type)


# ─── Base ERP Handler Class ─────────────────────────────

class BaseERPHandler:
    """Base class for ERP-specific sync handlers."""

    def __init__(self, conn, config_id, tenant_id):
        self.conn = conn
        self.config_id = config_id
        self.tenant_id = tenant_id

    def sync(self, sync_type):
        raise NotImplementedError

    def sync_inventory_to_erp(self):
        raise NotImplementedError

    def sync_inventory_from_erp(self):
        raise NotImplementedError

    def sync_sales_to_erp(self):
        raise NotImplementedError

    def sync_customers_to_erp(self):
        raise NotImplementedError

    def sync_customers_from_erp(self):
        raise NotImplementedError

    def _record_not_implemented(self, sync_type, error_message, stub_message, result_error):
        """Log a sync run that fails immediately because the handler is a stub.

        Opens a 'running' bidirectional log entry, completes it as 'failed'
        with *error_message* and ``{'message': stub_message}`` as details, and
        returns the failure dict handed back to the caller of ``sync``.
        """
        log_id = log_sync_run(self.conn, self.config_id, sync_type, 'bidirectional', 'running')
        complete_sync_run(
            self.conn, log_id, 'failed', 0, 0,
            error_message=error_message,
            details={'message': stub_message},
        )
        return {'status': 'failed', 'error': result_error}


# ─── Aspel SAE Stub ─────────────────────────────

class AspelSAEHandler(BaseERPHandler):
    """Stub for Aspel SAE sync.

    Aspel SAE is a desktop ERP. Integration options:
    1. ODBC direct to the SQL Server / Interbase database
    2. Aspel REST API (via Aspel NOI middleware)
    3. File-based sync (CSV/XML export/import)

    Recommended: Option 2 (REST API) if available, else Option 3.
    """

    def sync(self, sync_type):
        # Stub: would call Aspel API here
        return self._record_not_implemented(
            sync_type,
            'Aspel SAE handler not implemented. Requires Aspel REST API or ODBC connection.',
            'Stub implementation. Activate when Aspel credentials are available.',
            'Aspel SAE handler not implemented',
        )


# ─── Contpaqi Stub ─────────────────────────────

class ContpaqiHandler(BaseERPHandler):
    """Stub for Contpaqi sync.

    Contpaqi integration options:
    1. Contpaqi SDK (COM/ActiveX on Windows)
    2. Contpaqi REST API (via middleware like Facturama)
    3. Direct database access to Firebird/Interbase

    Recommended: Option 2 for cloud deployments.
    """

    def sync(self, sync_type):
        return self._record_not_implemented(
            sync_type,
            'Contpaqi handler not implemented. Requires Contpaqi SDK or REST middleware.',
            'Stub implementation. Activate when Contpaqi credentials are available.',
            'Contpaqi handler not implemented',
        )


# ─── SAP B1 Stub ─────────────────────────────

class SAPB1Handler(BaseERPHandler):
    """Stub for SAP Business One sync.

    SAP B1 provides a Service Layer REST API (OData-based).
    Requires: Service Layer URL, Company DB, username, password.
    """

    def sync(self, sync_type):
        return self._record_not_implemented(
            sync_type,
            'SAP B1 handler not implemented. Requires Service Layer endpoint.',
            'Stub implementation. Activate when SAP B1 Service Layer is available.',
            'SAP B1 handler not implemented',
        )


# ─── Odoo Stub ─────────────────────────────

class OdooHandler(BaseERPHandler):
    """Stub for Odoo sync.

    Odoo provides XML-RPC and JSON-RPC APIs.
    Requires: URL, database name, username, API key.
    """

    def sync(self, sync_type):
        return self._record_not_implemented(
            sync_type,
            'Odoo handler not implemented. Requires Odoo URL and API credentials.',
            'Stub implementation. Activate when Odoo credentials are available.',
            'Odoo handler not implemented',
        )


# Register handlers
register_erp_handler('aspel_sae', AspelSAEHandler)
register_erp_handler('contpaqi', ContpaqiHandler)
register_erp_handler('sap_b1', SAPB1Handler)
register_erp_handler('odoo', OdooHandler)