"""Webhook dispatch service for dropshipping and external integrations. Sends POST requests to configured target URLs with retry logic. Can be called synchronously or enqueued via Celery. """ import json import logging import requests import threading from typing import Optional logger = logging.getLogger(__name__) def _send_post(url: str, payload: dict, headers: Optional[dict] = None, timeout: int = 10): """Send a POST request and return (success, status_code, response_text).""" default_headers = {"Content-Type": "application/json"} if headers: default_headers.update(headers) try: resp = requests.post(url, json=payload, headers=default_headers, timeout=timeout) success = 200 <= resp.status_code < 300 if not success: logger.warning("Webhook %s returned %s: %s", url, resp.status_code, resp.text[:200]) return success, resp.status_code, resp.text except requests.exceptions.Timeout: logger.warning("Webhook %s timed out after %ss", url, timeout) return False, 0, "timeout" except Exception as e: logger.warning("Webhook %s failed: %s", url, e) return False, 0, str(e) def dispatch_webhook_sync(target_url: str, event_type: str, payload: dict, secret: Optional[str] = None): """Send webhook synchronously (use inside Celery tasks for async).""" full_payload = { "event": event_type, "data": payload, } headers = {} if secret: headers["X-Webhook-Secret"] = secret success, status, body = _send_post(target_url, full_payload, headers=headers) return {"success": success, "status": status, "body": body[:500]} def dispatch_webhooks_bulk(target_urls: list[str], event_type: str, payload: dict, secret: Optional[str] = None): """Dispatch to multiple URLs concurrently using threads.""" results = [] threads = [] def _send(url): result = dispatch_webhook_sync(url, event_type, payload, secret=secret) results.append({"url": url, **result}) for url in target_urls: t = threading.Thread(target=_send, args=(url,)) t.start() threads.append(t) for t in threads: t.join(timeout=15) return results