Files
Consultoría AS 049d2133f9 Implementación inicial del sistema de automatización de redes sociales
- Estructura completa del proyecto con FastAPI
- Modelos de base de datos (productos, servicios, posts, calendario, interacciones)
- Publishers para X, Threads, Instagram, Facebook
- Generador de contenido con DeepSeek API
- Worker de Celery con tareas programadas
- Dashboard básico con templates HTML
- Docker Compose para despliegue
- Documentación completa

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 01:11:44 +00:00

79 lines
2.2 KiB
Python

"""
Configuración de seguridad y autenticación.
"""
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.core.config import settings
# Configuración de hashing de contraseñas
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Configuración de JWT
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 horas
# Security scheme
security = HTTPBearer()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verificar contraseña contra hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Generar hash de contraseña."""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Crear token JWT."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def decode_token(token: str) -> dict:
"""Decodificar y validar token JWT."""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido o expirado",
headers={"WWW-Authenticate": "Bearer"},
)
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
"""Obtener usuario actual desde el token."""
token = credentials.credentials
payload = decode_token(token)
username = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido"
)
return {"username": username}