- bnpl_bp.py: APLAZO/Kueski/Clip application workflow (mock) - erp_bp.py: Aspel/CONTPAQi/SAP/Odoo sync jobs (mock) - whatsapp_cloud_bp.py: Meta Cloud API webhook, messages, templates - supplier_portal_bp.py: demand by zone/branch and top-parts analytics - app.py: register all new blueprints
80 lines
2.3 KiB
Python
80 lines
2.3 KiB
Python
"""ERP Sync Blueprint — Integration with Aspel, CONTPAQi, SAP, Odoo.

Stubs with architecture ready for real connectors.
"""
|
|
from flask import Blueprint, request, jsonify, g
|
|
from functools import wraps
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
# Blueprint that groups the ERP sync endpoints; every route registered on it
# is served under the '/pos/api/erp' URL prefix.
erp_bp = Blueprint('erp', __name__, url_prefix='/pos/api/erp')
|
|
|
|
|
|
from middleware import require_auth
|
|
|
|
|
|
# ─── Mock sync jobs ───
|
|
# In-memory store of sync jobs, keyed by job id (a UUID string) and holding
# the job record dict. Being a module-level dict, it is not persisted and is
# not shared between processes.
_mock_jobs = {}
|
|
|
|
|
|
@erp_bp.route('/providers', methods=['GET'])
@require_auth()
def list_providers():
    """Return the static catalogue of ERP providers.

    The response is a JSON object with a single ``providers`` list. Each
    entry carries ``id``, ``name``, ``type`` and ``enabled``; every provider
    is currently reported with ``enabled`` set to False.
    """
    # (id, display name, integration type) for each known provider.
    catalogue = (
        ('aspel_sae', 'Aspel SAE', 'file_exchange'),
        ('contpaqi', 'CONTPAQi', 'file_exchange'),
        ('sap_b1', 'SAP Business One', 'api'),
        ('odoo', 'Odoo', 'api'),
    )
    providers = [
        {'id': ident, 'name': label, 'type': kind, 'enabled': False}
        for ident, label, kind in catalogue
    ]
    return jsonify({'providers': providers})
|
|
|
|
|
|
@erp_bp.route('/sync', methods=['POST'])
@require_auth()
def start_sync():
    """Create a mock sync job in the ``queued`` state and return it.

    Expects a JSON object body with:
        provider: required; stored on the job as given (it is not checked
            against the provider catalogue).
        sync_type: optional, defaults to 'sales'
            (sales, inventory, customers).

    Returns:
        201 with the new job record on success, or 400 with an ``error``
        message when the body is not a JSON object or ``provider`` is
        missing/empty.
    """
    data = request.get_json() or {}
    # Valid JSON may still be a list, string or number. Those have no
    # ``.get`` and would otherwise escape as an unhandled AttributeError (500).
    if not isinstance(data, dict):
        return jsonify({'error': 'request body must be a JSON object'}), 400

    provider = data.get('provider')
    sync_type = data.get('sync_type', 'sales')  # sales, inventory, customers
    if not provider:
        return jsonify({'error': 'provider is required'}), 400

    job_id = str(uuid.uuid4())
    job = {
        'id': job_id,
        'provider': provider,
        'sync_type': sync_type,
        'status': 'queued',
        'records_synced': 0,
        'errors': [],
        'created_at': datetime.utcnow().isoformat(),
        # Filled in when the job is executed.
        'started_at': None,
        'finished_at': None,
    }
    _mock_jobs[job_id] = job
    return jsonify(job), 201
|
|
|
|
|
|
@erp_bp.route('/sync/<job_id>', methods=['GET'])
@require_auth()
def get_sync_status(job_id):
    """Return the stored record for ``job_id``, or a 404 JSON error.

    Args:
        job_id: Job identifier taken from the URL path.
    """
    record = _mock_jobs.get(job_id)
    if record:
        return jsonify(record)
    return jsonify({'error': 'Job not found'}), 404
|
|
|
|
|
|
@erp_bp.route('/sync/<job_id>/run', methods=['POST'])
@require_auth()
def run_sync(job_id):
    """Mock-execute a sync job (in production this triggers a Celery task).

    The stored job record is mutated in place: it is marked running, then
    immediately marked completed with a fixed count of 42 synced records.

    Args:
        job_id: Job identifier taken from the URL path.

    Returns:
        The updated job record as JSON, or 404 with an ``error`` message when
        the job is unknown.
    """
    record = _mock_jobs.get(job_id)
    if not record:
        return jsonify({'error': 'Job not found'}), 404

    # Start phase.
    record.update(
        status='running',
        started_at=datetime.utcnow().isoformat(),
    )
    # Mock completion: no real work happens between the two phases.
    record.update(
        status='completed',
        records_synced=42,
        finished_at=datetime.utcnow().isoformat(),
    )
    return jsonify(record)
|