Files
Autoparts-DB/pos/blueprints/erp_bp.py
consultoria-as 2cfe4b3913 feat(api): add BNPL, ERP, WhatsApp Cloud, Supplier Portal stubs
- bnpl_bp.py: APLAZO/Kueski/Clip application workflow (mock)
- erp_bp.py: Aspel/CONTPAQi/SAP/Odoo sync jobs (mock)
- whatsapp_cloud_bp.py: Meta Cloud API webhook, messages, templates
- supplier_portal_bp.py: demand by zone/branch and top-parts analytics
- app.py: register all new blueprints
2026-04-29 06:31:03 +00:00

80 lines
2.3 KiB
Python

"""ERP Sync Blueprint — Integration with Aspel, CONTPAQi, SAP, Odoo.
Stubs with architecture ready for real connectors.
"""
from flask import Blueprint, request, jsonify, g
from functools import wraps
import uuid
from datetime import datetime
# Blueprint that groups the ERP endpoints; every route registered on it is
# served under the /pos/api/erp URL prefix.
erp_bp = Blueprint('erp', __name__, url_prefix='/pos/api/erp')
from middleware import require_auth
# ─── Mock sync jobs ───
# In-memory job store: maps a job id (string) to that job's record dict.
# Records are inserted by start_sync and mutated in place by run_sync.
# Being a plain module-level dict, its contents are not persisted anywhere.
_mock_jobs = {}
@erp_bp.route('/providers', methods=['GET'])
@require_auth()
def list_providers():
    """Return the static catalogue of ERP providers known to this stub.

    The JSON response has a single ``providers`` key holding a list of
    entries with ``id``, ``name``, ``type`` and ``enabled``. Every provider
    is currently reported with ``enabled`` set to ``False``.
    """
    # (id, display name, integration type) for each provider, in response order.
    catalogue = (
        ('aspel_sae', 'Aspel SAE', 'file_exchange'),
        ('contpaqi', 'CONTPAQi', 'file_exchange'),
        ('sap_b1', 'SAP Business One', 'api'),
        ('odoo', 'Odoo', 'api'),
    )
    providers = [
        {'id': provider_id, 'name': label, 'type': kind, 'enabled': False}
        for provider_id, label, kind in catalogue
    ]
    return jsonify({'providers': providers})
@erp_bp.route('/sync', methods=['POST'])
@require_auth()
def start_sync():
    """Create a new mock sync job in the ``queued`` state and return it.

    Request JSON fields:
        provider: required; stored on the job exactly as supplied.
        sync_type: optional, defaults to ``'sales'``. The value is stored
            without validation (the original note lists sales, inventory,
            customers as the intended values).

    Returns:
        ``201`` with the new job record on success, or ``400`` with an
        ``error`` message when the body is not a JSON object or
        ``provider`` is missing/empty.
    """
    # silent=True makes a missing, mistyped or malformed body come back as
    # None instead of aborting the request, so it falls through to the JSON
    # error responses below.
    data = request.get_json(silent=True) or {}

    # Valid JSON that is not an object (e.g. a list or a string) has no
    # .get(); reject it explicitly rather than failing with a 500.
    if not isinstance(data, dict):
        return jsonify({'error': 'JSON body must be an object'}), 400

    provider = data.get('provider')
    sync_type = data.get('sync_type', 'sales')  # sales, inventory, customers
    if not provider:
        return jsonify({'error': 'provider is required'}), 400

    job_id = str(uuid.uuid4())
    _mock_jobs[job_id] = {
        'id': job_id,
        'provider': provider,
        'sync_type': sync_type,
        'status': 'queued',
        'records_synced': 0,
        'errors': [],
        # Naive UTC timestamp in ISO 8601 form (no offset suffix).
        'created_at': datetime.utcnow().isoformat(),
        'started_at': None,
        'finished_at': None,
    }
    return jsonify(_mock_jobs[job_id]), 201
@erp_bp.route('/sync/<job_id>', methods=['GET'])
@require_auth()
def get_sync_status(job_id):
    """Return the stored record for ``job_id``.

    Responds with the job record as JSON, or with a ``404`` and an
    ``error`` message when no record is stored under that id.
    """
    record = _mock_jobs.get(job_id)
    if record:
        return jsonify(record)
    return jsonify({'error': 'Job not found'}), 404
@erp_bp.route('/sync/<job_id>/run', methods=['POST'])
@require_auth()
def run_sync(job_id):
    """Simulate running the sync job identified by ``job_id``.

    This is a mock: the job is marked as running and then immediately
    marked completed with a fixed record count. Per the original note,
    production is meant to trigger a Celery task instead.

    Responds with the updated job record as JSON, or with a ``404`` and an
    ``error`` message when no record is stored under that id.
    """
    record = _mock_jobs.get(job_id)
    if not record:
        return jsonify({'error': 'Job not found'}), 404

    # Phase 1: mark the job as started.
    record.update(
        status='running',
        started_at=datetime.utcnow().isoformat(),
    )

    # Phase 2: mock completion with a hard-coded number of synced records.
    record.update(
        status='completed',
        records_synced=42,
        finished_at=datetime.utcnow().isoformat(),
    )
    return jsonify(record)