feat(phase-6): Complete testing and deployment setup
Testing: - Add pytest configuration (pytest.ini) - Add test fixtures (tests/conftest.py) - Add ContentGenerator tests (13 tests) - Add ContentScheduler tests (16 tests) - Add PublisherManager tests (16 tests) - All 45 tests passing Production Docker: - Add docker-compose.prod.yml with healthchecks, resource limits - Add Dockerfile.prod with multi-stage build, non-root user - Add nginx.prod.conf with SSL, rate limiting, security headers - Add .env.prod.example template Maintenance Scripts: - Add backup.sh for database and media backups - Add restore.sh for database restoration - Add cleanup.sh for log rotation and Docker cleanup - Add healthcheck.sh with Telegram alerts Documentation: - Add DEPLOY.md with complete deployment guide Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
1
tests/__init__.py
Normal file
1
tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for the social media automation system."""
|
||||
124
tests/conftest.py
Normal file
124
tests/conftest.py
Normal file
@@ -0,0 +1,124 @@
|
||||
"""
|
||||
Test fixtures and configuration.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, AsyncMock, patch
|
||||
from datetime import datetime, timedelta
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from app.core.database import Base
|
||||
|
||||
|
||||
# In-memory SQLite for testing
|
||||
TEST_DATABASE_URL = "sqlite:///:memory:"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_engine():
|
||||
"""Create an in-memory SQLite engine for testing."""
|
||||
engine = create_engine(
|
||||
TEST_DATABASE_URL,
|
||||
connect_args={"check_same_thread": False}
|
||||
)
|
||||
Base.metadata.create_all(bind=engine)
|
||||
yield engine
|
||||
Base.metadata.drop_all(bind=engine)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_session(test_engine):
|
||||
"""Create a test database session."""
|
||||
TestSessionLocal = sessionmaker(
|
||||
autocommit=False, autoflush=False, bind=test_engine
|
||||
)
|
||||
session = TestSessionLocal()
|
||||
yield session
|
||||
session.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_openai_client():
|
||||
"""Mock OpenAI client for DeepSeek API tests."""
|
||||
mock_client = MagicMock()
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.choices = [MagicMock()]
|
||||
mock_response.choices[0].message.content = "Generated test content #TechTip #AI"
|
||||
|
||||
mock_client.chat.completions.create.return_value = mock_response
|
||||
|
||||
return mock_client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_httpx_client():
|
||||
"""Mock httpx client for API calls."""
|
||||
mock_client = AsyncMock()
|
||||
mock_response = AsyncMock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.json.return_value = {"id": "123", "success": True}
|
||||
mock_client.get.return_value = mock_response
|
||||
mock_client.post.return_value = mock_response
|
||||
return mock_client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_product():
|
||||
"""Sample product data for testing."""
|
||||
return {
|
||||
"name": "Laptop HP Pavilion",
|
||||
"description": "Laptop potente para trabajo y gaming",
|
||||
"price": 15999.00,
|
||||
"category": "laptops",
|
||||
"specs": {
|
||||
"processor": "Intel Core i5",
|
||||
"ram": "16GB",
|
||||
"storage": "512GB SSD"
|
||||
},
|
||||
"highlights": ["Alta velocidad", "Diseño compacto", "Garantía 2 años"]
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_service():
|
||||
"""Sample service data for testing."""
|
||||
return {
|
||||
"name": "Automatización con IA",
|
||||
"description": "Automatiza tus procesos con inteligencia artificial",
|
||||
"category": "ai_automation",
|
||||
"target_sectors": ["retail", "manufactura", "servicios"],
|
||||
"benefits": ["Reduce costos", "Aumenta productividad", "24/7 operación"],
|
||||
"call_to_action": "Agenda una demo gratuita"
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_interaction():
|
||||
"""Sample interaction data for testing."""
|
||||
return {
|
||||
"content": "¿Qué procesador recomiendas para edición de video?",
|
||||
"type": "comment",
|
||||
"platform": "x",
|
||||
"author": "user123"
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_settings():
|
||||
"""Mock settings for testing."""
|
||||
with patch('app.core.config.settings') as mock:
|
||||
mock.DEEPSEEK_API_KEY = "test-api-key"
|
||||
mock.DEEPSEEK_BASE_URL = "https://api.deepseek.com"
|
||||
mock.BUSINESS_NAME = "Consultoría AS"
|
||||
mock.BUSINESS_LOCATION = "Tijuana, México"
|
||||
mock.BUSINESS_WEBSITE = "https://consultoria-as.com"
|
||||
mock.CONTENT_TONE = "Profesional pero accesible"
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fixed_datetime():
|
||||
"""Fixed datetime for consistent testing."""
|
||||
return datetime(2024, 6, 15, 10, 0, 0) # Saturday 10:00
|
||||
180
tests/test_content_generator.py
Normal file
180
tests/test_content_generator.py
Normal file
@@ -0,0 +1,180 @@
|
||||
"""
|
||||
Tests for ContentGenerator service.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch, AsyncMock
|
||||
|
||||
|
||||
class TestContentGenerator:
|
||||
"""Tests for the ContentGenerator class."""
|
||||
|
||||
@pytest.fixture
|
||||
def generator(self, mock_settings):
|
||||
"""Create a ContentGenerator instance with mocked client."""
|
||||
with patch('app.services.content_generator.OpenAI') as mock_openai:
|
||||
mock_client = MagicMock()
|
||||
mock_response = MagicMock()
|
||||
mock_response.choices = [MagicMock()]
|
||||
mock_response.choices[0].message.content = "Test content #AI #Tech"
|
||||
mock_client.chat.completions.create.return_value = mock_response
|
||||
mock_openai.return_value = mock_client
|
||||
|
||||
from app.services.content_generator import ContentGenerator
|
||||
gen = ContentGenerator()
|
||||
gen._client = mock_client
|
||||
yield gen
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_tip_tech(self, generator):
|
||||
"""Test generating a tech tip."""
|
||||
result = await generator.generate_tip_tech(
|
||||
category="seguridad",
|
||||
platform="x"
|
||||
)
|
||||
|
||||
assert result is not None
|
||||
assert len(result) > 0
|
||||
generator.client.chat.completions.create.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_tip_tech_with_template(self, generator):
|
||||
"""Test generating a tech tip with a template."""
|
||||
result = await generator.generate_tip_tech(
|
||||
category="productividad",
|
||||
platform="threads",
|
||||
template="Tip del día: {tip}"
|
||||
)
|
||||
|
||||
assert result is not None
|
||||
call_args = generator.client.chat.completions.create.call_args
|
||||
assert "template" in str(call_args).lower() or "TEMPLATE" in str(call_args)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_product_post(self, generator, sample_product):
|
||||
"""Test generating a product post."""
|
||||
result = await generator.generate_product_post(
|
||||
product=sample_product,
|
||||
platform="instagram"
|
||||
)
|
||||
|
||||
assert result is not None
|
||||
call_args = generator.client.chat.completions.create.call_args
|
||||
messages = call_args.kwargs.get('messages', call_args[1].get('messages', []))
|
||||
|
||||
# Verify product info was included in prompt
|
||||
user_message = messages[-1]['content']
|
||||
assert sample_product['name'] in user_message
|
||||
# Price is formatted with commas, check for the value
|
||||
assert "15,999" in user_message or "15999" in user_message
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_service_post(self, generator, sample_service):
|
||||
"""Test generating a service post."""
|
||||
result = await generator.generate_service_post(
|
||||
service=sample_service,
|
||||
platform="facebook"
|
||||
)
|
||||
|
||||
assert result is not None
|
||||
call_args = generator.client.chat.completions.create.call_args
|
||||
messages = call_args.kwargs.get('messages', call_args[1].get('messages', []))
|
||||
|
||||
user_message = messages[-1]['content']
|
||||
assert sample_service['name'] in user_message
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_thread(self, generator):
|
||||
"""Test generating a thread."""
|
||||
generator.client.chat.completions.create.return_value.choices[0].message.content = \
|
||||
"1/ Post uno\n2/ Post dos\n3/ Post tres"
|
||||
|
||||
result = await generator.generate_thread(
|
||||
topic="Cómo proteger tu contraseña",
|
||||
num_posts=3
|
||||
)
|
||||
|
||||
assert isinstance(result, list)
|
||||
assert len(result) == 3
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_response_suggestion(self, generator, sample_interaction):
|
||||
"""Test generating response suggestions."""
|
||||
generator.client.chat.completions.create.return_value.choices[0].message.content = \
|
||||
"1. Respuesta corta\n2. Respuesta media\n3. Respuesta larga"
|
||||
|
||||
result = await generator.generate_response_suggestion(
|
||||
interaction_content=sample_interaction['content'],
|
||||
interaction_type=sample_interaction['type']
|
||||
)
|
||||
|
||||
assert isinstance(result, list)
|
||||
assert len(result) <= 3
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_adapt_content_for_platform(self, generator):
|
||||
"""Test adapting content for different platforms."""
|
||||
original = "Este es un tip muy largo sobre seguridad informática con muchos detalles"
|
||||
|
||||
result = await generator.adapt_content_for_platform(
|
||||
content=original,
|
||||
target_platform="x"
|
||||
)
|
||||
|
||||
assert result is not None
|
||||
call_args = generator.client.chat.completions.create.call_args
|
||||
messages = call_args.kwargs.get('messages', call_args[1].get('messages', []))
|
||||
|
||||
user_message = messages[-1]['content']
|
||||
assert "280" in user_message # X character limit
|
||||
|
||||
def test_get_system_prompt(self, generator, mock_settings):
|
||||
"""Test that system prompt includes business info."""
|
||||
prompt = generator._get_system_prompt()
|
||||
|
||||
assert mock_settings.BUSINESS_NAME in prompt
|
||||
assert mock_settings.BUSINESS_LOCATION in prompt
|
||||
|
||||
def test_lazy_initialization_without_api_key(self):
|
||||
"""Test that client raises error without API key."""
|
||||
with patch('app.services.content_generator.settings') as mock:
|
||||
mock.DEEPSEEK_API_KEY = None
|
||||
|
||||
from app.services.content_generator import ContentGenerator
|
||||
gen = ContentGenerator()
|
||||
|
||||
with pytest.raises(ValueError, match="DEEPSEEK_API_KEY"):
|
||||
_ = gen.client
|
||||
|
||||
|
||||
class TestCharacterLimits:
|
||||
"""Tests for character limit handling."""
|
||||
|
||||
@pytest.mark.parametrize("platform,expected_limit", [
|
||||
("x", 280),
|
||||
("threads", 500),
|
||||
("instagram", 2200),
|
||||
("facebook", 500),
|
||||
])
|
||||
@pytest.mark.asyncio
|
||||
async def test_platform_character_limits(self, platform, expected_limit, mock_settings):
|
||||
"""Test that correct character limits are used per platform."""
|
||||
with patch('app.services.content_generator.OpenAI') as mock_openai:
|
||||
mock_client = MagicMock()
|
||||
mock_response = MagicMock()
|
||||
mock_response.choices = [MagicMock()]
|
||||
mock_response.choices[0].message.content = "Test"
|
||||
mock_client.chat.completions.create.return_value = mock_response
|
||||
mock_openai.return_value = mock_client
|
||||
|
||||
from app.services.content_generator import ContentGenerator
|
||||
gen = ContentGenerator()
|
||||
gen._client = mock_client
|
||||
|
||||
await gen.generate_tip_tech("test", platform)
|
||||
|
||||
call_args = mock_client.chat.completions.create.call_args
|
||||
messages = call_args.kwargs.get('messages', call_args[1].get('messages', []))
|
||||
user_message = messages[-1]['content']
|
||||
|
||||
assert str(expected_limit) in user_message
|
||||
278
tests/test_publisher_manager.py
Normal file
278
tests/test_publisher_manager.py
Normal file
@@ -0,0 +1,278 @@
|
||||
"""
|
||||
Tests for PublisherManager service.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, AsyncMock, patch
|
||||
|
||||
|
||||
class TestPublisherManager:
|
||||
"""Tests for the PublisherManager class."""
|
||||
|
||||
@pytest.fixture
|
||||
def mock_publishers(self):
|
||||
"""Create mock publishers."""
|
||||
x_publisher = MagicMock()
|
||||
x_publisher.char_limit = 280
|
||||
x_publisher.validate_content.return_value = True
|
||||
x_publisher.client = MagicMock()
|
||||
x_publisher.publish = AsyncMock(return_value=MagicMock(
|
||||
success=True, post_id="123", url="https://x.com/post/123"
|
||||
))
|
||||
|
||||
threads_publisher = MagicMock()
|
||||
threads_publisher.char_limit = 500
|
||||
threads_publisher.validate_content.return_value = True
|
||||
threads_publisher.access_token = "token"
|
||||
threads_publisher.user_id = "user123"
|
||||
threads_publisher.publish = AsyncMock(return_value=MagicMock(
|
||||
success=True, post_id="456", url="https://threads.net/post/456"
|
||||
))
|
||||
|
||||
fb_publisher = MagicMock()
|
||||
fb_publisher.char_limit = 63206
|
||||
fb_publisher.validate_content.return_value = True
|
||||
fb_publisher.access_token = "token"
|
||||
fb_publisher.page_id = "page123"
|
||||
fb_publisher.publish = AsyncMock(return_value=MagicMock(
|
||||
success=True, post_id="789"
|
||||
))
|
||||
|
||||
ig_publisher = MagicMock()
|
||||
ig_publisher.char_limit = 2200
|
||||
ig_publisher.validate_content.return_value = True
|
||||
ig_publisher.access_token = "token"
|
||||
ig_publisher.account_id = "acc123"
|
||||
ig_publisher.publish = AsyncMock(return_value=MagicMock(
|
||||
success=True, post_id="101"
|
||||
))
|
||||
|
||||
return {
|
||||
"x": x_publisher,
|
||||
"threads": threads_publisher,
|
||||
"facebook": fb_publisher,
|
||||
"instagram": ig_publisher
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def manager(self, mock_publishers):
|
||||
"""Create a PublisherManager with mocked publishers."""
|
||||
with patch('app.publishers.manager.XPublisher', return_value=mock_publishers["x"]), \
|
||||
patch('app.publishers.manager.ThreadsPublisher', return_value=mock_publishers["threads"]), \
|
||||
patch('app.publishers.manager.FacebookPublisher', return_value=mock_publishers["facebook"]), \
|
||||
patch('app.publishers.manager.InstagramPublisher', return_value=mock_publishers["instagram"]):
|
||||
|
||||
from app.publishers.manager import PublisherManager, Platform
|
||||
mgr = PublisherManager()
|
||||
# Override with mocks
|
||||
mgr._publishers = {
|
||||
Platform.X: mock_publishers["x"],
|
||||
Platform.THREADS: mock_publishers["threads"],
|
||||
Platform.FACEBOOK: mock_publishers["facebook"],
|
||||
Platform.INSTAGRAM: mock_publishers["instagram"],
|
||||
}
|
||||
return mgr
|
||||
|
||||
def test_init(self, manager):
|
||||
"""Test manager initialization."""
|
||||
assert manager._publishers is not None
|
||||
assert len(manager._publishers) == 4
|
||||
|
||||
def test_get_publisher(self, manager):
|
||||
"""Test getting a specific publisher."""
|
||||
from app.publishers.manager import Platform
|
||||
|
||||
publisher = manager.get_publisher(Platform.X)
|
||||
assert publisher is not None
|
||||
|
||||
def test_get_available_platforms(self, manager):
|
||||
"""Test getting available platforms."""
|
||||
available = manager.get_available_platforms()
|
||||
|
||||
assert isinstance(available, list)
|
||||
assert "x" in available
|
||||
assert "threads" in available
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_publish_single_platform(self, manager):
|
||||
"""Test publishing to a single platform."""
|
||||
from app.publishers.manager import Platform
|
||||
|
||||
result = await manager.publish(
|
||||
platform=Platform.X,
|
||||
content="Test post #Testing"
|
||||
)
|
||||
|
||||
assert result.success is True
|
||||
assert result.post_id == "123"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_publish_content_too_long(self, manager, mock_publishers):
|
||||
"""Test that too long content fails validation."""
|
||||
from app.publishers.manager import Platform
|
||||
|
||||
mock_publishers["x"].validate_content.return_value = False
|
||||
|
||||
result = await manager.publish(
|
||||
platform=Platform.X,
|
||||
content="x" * 300 # Exceeds 280 limit
|
||||
)
|
||||
|
||||
assert result.success is False
|
||||
assert "límite" in result.error_message.lower() or "excede" in result.error_message.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_publish_unsupported_platform(self, manager):
|
||||
"""Test publishing to unsupported platform."""
|
||||
result = await manager.publish(
|
||||
platform=MagicMock(value="unsupported"),
|
||||
content="Test"
|
||||
)
|
||||
|
||||
assert result.success is False
|
||||
assert "no soportada" in result.error_message.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_publish_to_multiple_parallel(self, manager):
|
||||
"""Test publishing to multiple platforms in parallel."""
|
||||
from app.publishers.manager import Platform
|
||||
|
||||
result = await manager.publish_to_multiple(
|
||||
platforms=[Platform.X, Platform.THREADS],
|
||||
content="Multi-platform test #Test",
|
||||
parallel=True
|
||||
)
|
||||
|
||||
assert result.success is True
|
||||
assert len(result.successful_platforms) >= 1
|
||||
assert "x" in result.results
|
||||
assert "threads" in result.results
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_publish_to_multiple_sequential(self, manager):
|
||||
"""Test publishing to multiple platforms sequentially."""
|
||||
from app.publishers.manager import Platform
|
||||
|
||||
result = await manager.publish_to_multiple(
|
||||
platforms=[Platform.X, Platform.FACEBOOK],
|
||||
content="Sequential test",
|
||||
parallel=False
|
||||
)
|
||||
|
||||
assert result.success is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_publish_to_multiple_with_dict_content(self, manager):
|
||||
"""Test publishing with platform-specific content."""
|
||||
from app.publishers.manager import Platform
|
||||
|
||||
content = {
|
||||
"x": "Short post for X #X",
|
||||
"threads": "Longer post for Threads with more details #Threads"
|
||||
}
|
||||
|
||||
result = await manager.publish_to_multiple(
|
||||
platforms=[Platform.X, Platform.THREADS],
|
||||
content=content
|
||||
)
|
||||
|
||||
assert result.success is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_publish_with_image_meta_platforms(self, manager, mock_publishers):
|
||||
"""Test that Meta platforms get public image URL."""
|
||||
from app.publishers.manager import Platform
|
||||
|
||||
with patch('app.publishers.manager.image_upload') as mock_upload:
|
||||
mock_upload.upload_from_path = AsyncMock(
|
||||
return_value="https://imgbb.com/image.jpg"
|
||||
)
|
||||
|
||||
result = await manager.publish(
|
||||
platform=Platform.THREADS,
|
||||
content="Post with image",
|
||||
image_path="/local/image.jpg"
|
||||
)
|
||||
|
||||
mock_upload.upload_from_path.assert_called_once_with("/local/image.jpg")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_test_connection(self, manager, mock_publishers):
|
||||
"""Test connection testing."""
|
||||
from app.publishers.manager import Platform
|
||||
|
||||
mock_publishers["x"].client.get_me.return_value = MagicMock(
|
||||
data=MagicMock(username="testuser", name="Test", id=123)
|
||||
)
|
||||
|
||||
result = await manager.test_connection(Platform.X)
|
||||
|
||||
assert result["platform"] == "x"
|
||||
assert result["configured"] is True
|
||||
assert result["connected"] is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_test_all_connections(self, manager, mock_publishers):
|
||||
"""Test testing all connections."""
|
||||
mock_publishers["x"].client.get_me.return_value = MagicMock(
|
||||
data=MagicMock(username="test", name="Test", id=123)
|
||||
)
|
||||
|
||||
results = await manager.test_all_connections()
|
||||
|
||||
assert len(results) == 4
|
||||
assert "x" in results
|
||||
assert "threads" in results
|
||||
|
||||
|
||||
class TestMultiPublishResult:
|
||||
"""Tests for MultiPublishResult dataclass."""
|
||||
|
||||
def test_successful_platforms(self):
|
||||
"""Test getting successful platforms."""
|
||||
from app.publishers.manager import MultiPublishResult
|
||||
from app.publishers.base import PublishResult
|
||||
|
||||
results = {
|
||||
"x": PublishResult(success=True, post_id="123"),
|
||||
"threads": PublishResult(success=False, error_message="Error"),
|
||||
"facebook": PublishResult(success=True, post_id="456")
|
||||
}
|
||||
|
||||
multi = MultiPublishResult(success=True, results=results, errors=[])
|
||||
|
||||
assert set(multi.successful_platforms) == {"x", "facebook"}
|
||||
|
||||
def test_failed_platforms(self):
|
||||
"""Test getting failed platforms."""
|
||||
from app.publishers.manager import MultiPublishResult
|
||||
from app.publishers.base import PublishResult
|
||||
|
||||
results = {
|
||||
"x": PublishResult(success=True, post_id="123"),
|
||||
"threads": PublishResult(success=False, error_message="Error"),
|
||||
}
|
||||
|
||||
multi = MultiPublishResult(success=True, results=results, errors=[])
|
||||
|
||||
assert multi.failed_platforms == ["threads"]
|
||||
|
||||
|
||||
class TestPlatformEnum:
|
||||
"""Tests for Platform enum."""
|
||||
|
||||
def test_platform_values(self):
|
||||
"""Test platform enum values."""
|
||||
from app.publishers.manager import Platform
|
||||
|
||||
assert Platform.X.value == "x"
|
||||
assert Platform.THREADS.value == "threads"
|
||||
assert Platform.FACEBOOK.value == "facebook"
|
||||
assert Platform.INSTAGRAM.value == "instagram"
|
||||
|
||||
def test_platform_is_string_enum(self):
|
||||
"""Test platform enum is string."""
|
||||
from app.publishers.manager import Platform
|
||||
|
||||
assert isinstance(Platform.X, str)
|
||||
assert Platform.X == "x"
|
||||
259
tests/test_scheduler.py
Normal file
259
tests/test_scheduler.py
Normal file
@@ -0,0 +1,259 @@
|
||||
"""
|
||||
Tests for ContentScheduler service.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
class TestContentScheduler:
|
||||
"""Tests for the ContentScheduler class."""
|
||||
|
||||
@pytest.fixture
|
||||
def mock_db_session(self):
|
||||
"""Create a mock database session."""
|
||||
mock_session = MagicMock()
|
||||
mock_query = MagicMock()
|
||||
mock_query.filter.return_value = mock_query
|
||||
mock_query.first.return_value = None # No existing posts
|
||||
mock_query.all.return_value = []
|
||||
mock_query.order_by.return_value = mock_query
|
||||
mock_session.query.return_value = mock_query
|
||||
return mock_session
|
||||
|
||||
@pytest.fixture
|
||||
def scheduler(self, mock_db_session):
|
||||
"""Create a ContentScheduler with mocked database."""
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
from app.services.scheduler import ContentScheduler
|
||||
return ContentScheduler()
|
||||
|
||||
def test_init(self, scheduler):
|
||||
"""Test scheduler initialization."""
|
||||
assert scheduler.posting_times is not None
|
||||
assert "x" in scheduler.posting_times
|
||||
assert "threads" in scheduler.posting_times
|
||||
|
||||
def test_get_next_available_slot_weekday(self, scheduler, mock_db_session, fixed_datetime):
|
||||
"""Test getting next slot on a weekday."""
|
||||
# Monday 10:00
|
||||
weekday = datetime(2024, 6, 17, 10, 0, 0)
|
||||
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
result = scheduler.get_next_available_slot("x", after=weekday)
|
||||
|
||||
assert result is not None
|
||||
assert result > weekday
|
||||
|
||||
def test_get_next_available_slot_weekend(self, scheduler, mock_db_session):
|
||||
"""Test getting next slot on a weekend."""
|
||||
# Saturday 10:00
|
||||
weekend = datetime(2024, 6, 15, 10, 0, 0)
|
||||
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
result = scheduler.get_next_available_slot("x", after=weekend)
|
||||
|
||||
assert result is not None
|
||||
|
||||
def test_get_next_available_slot_late_night(self, scheduler, mock_db_session):
|
||||
"""Test that late night moves to next day."""
|
||||
# 11 PM
|
||||
late_night = datetime(2024, 6, 17, 23, 0, 0)
|
||||
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
result = scheduler.get_next_available_slot("x", after=late_night)
|
||||
|
||||
# Should be next day
|
||||
assert result.date() > late_night.date()
|
||||
|
||||
def test_get_available_slots(self, scheduler, mock_db_session, fixed_datetime):
|
||||
"""Test getting all available slots."""
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
slots = scheduler.get_available_slots(
|
||||
platform="x",
|
||||
start_date=fixed_datetime,
|
||||
days=3
|
||||
)
|
||||
|
||||
assert len(slots) > 0
|
||||
for slot in slots:
|
||||
assert slot.platform == "x"
|
||||
assert slot.available is True
|
||||
|
||||
def test_schedule_post(self, scheduler, mock_db_session):
|
||||
"""Test scheduling a post."""
|
||||
mock_post = MagicMock()
|
||||
mock_post.id = 1
|
||||
mock_post.platforms = ["x"]
|
||||
mock_post.status = "draft"
|
||||
|
||||
mock_db_session.query.return_value.filter.return_value.first.return_value = mock_post
|
||||
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
result = scheduler.schedule_post(
|
||||
post_id=1,
|
||||
scheduled_at=datetime(2024, 6, 20, 12, 0, 0)
|
||||
)
|
||||
|
||||
assert result == datetime(2024, 6, 20, 12, 0, 0)
|
||||
assert mock_post.status == "scheduled"
|
||||
mock_db_session.commit.assert_called_once()
|
||||
|
||||
def test_schedule_post_auto_time(self, scheduler, mock_db_session):
|
||||
"""Test scheduling with auto-selected time."""
|
||||
mock_post = MagicMock()
|
||||
mock_post.id = 1
|
||||
mock_post.platforms = ["x"]
|
||||
|
||||
# First call returns the post, second returns None (no conflicts)
|
||||
mock_db_session.query.return_value.filter.return_value.first.side_effect = [
|
||||
mock_post, None, None, None, None, None, None
|
||||
]
|
||||
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
result = scheduler.schedule_post(post_id=1)
|
||||
|
||||
assert result is not None
|
||||
assert mock_post.scheduled_at is not None
|
||||
|
||||
def test_schedule_post_not_found(self, scheduler, mock_db_session):
|
||||
"""Test scheduling a non-existent post."""
|
||||
mock_db_session.query.return_value.filter.return_value.first.return_value = None
|
||||
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
with pytest.raises(ValueError, match="not found"):
|
||||
scheduler.schedule_post(post_id=999)
|
||||
|
||||
def test_reschedule_post(self, scheduler, mock_db_session):
|
||||
"""Test rescheduling a post."""
|
||||
mock_post = MagicMock()
|
||||
mock_post.id = 1
|
||||
mock_post.status = "scheduled"
|
||||
|
||||
mock_db_session.query.return_value.filter.return_value.first.return_value = mock_post
|
||||
|
||||
new_time = datetime(2024, 6, 25, 14, 0, 0)
|
||||
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
result = scheduler.reschedule_post(post_id=1, new_time=new_time)
|
||||
|
||||
assert result is True
|
||||
assert mock_post.scheduled_at == new_time
|
||||
mock_db_session.commit.assert_called_once()
|
||||
|
||||
def test_reschedule_published_post_fails(self, scheduler, mock_db_session):
|
||||
"""Test that published posts cannot be rescheduled."""
|
||||
mock_post = MagicMock()
|
||||
mock_post.status = "published"
|
||||
|
||||
mock_db_session.query.return_value.filter.return_value.first.return_value = mock_post
|
||||
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
result = scheduler.reschedule_post(
|
||||
post_id=1,
|
||||
new_time=datetime(2024, 6, 25, 14, 0, 0)
|
||||
)
|
||||
|
||||
assert result is False
|
||||
|
||||
def test_cancel_scheduled(self, scheduler, mock_db_session):
|
||||
"""Test canceling a scheduled post."""
|
||||
mock_post = MagicMock()
|
||||
mock_post.id = 1
|
||||
mock_post.status = "scheduled"
|
||||
mock_post.scheduled_at = datetime(2024, 6, 20, 12, 0, 0)
|
||||
|
||||
mock_db_session.query.return_value.filter.return_value.first.return_value = mock_post
|
||||
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
result = scheduler.cancel_scheduled(post_id=1)
|
||||
|
||||
assert result is True
|
||||
assert mock_post.status == "draft"
|
||||
assert mock_post.scheduled_at is None
|
||||
|
||||
def test_get_calendar(self, scheduler, mock_db_session):
|
||||
"""Test getting calendar view."""
|
||||
mock_posts = [
|
||||
MagicMock(
|
||||
id=1,
|
||||
content="Test post 1",
|
||||
platforms=["x"],
|
||||
status="scheduled",
|
||||
scheduled_at=datetime(2024, 6, 17, 12, 0, 0),
|
||||
content_type="tip"
|
||||
),
|
||||
MagicMock(
|
||||
id=2,
|
||||
content="Test post 2",
|
||||
platforms=["threads"],
|
||||
status="scheduled",
|
||||
scheduled_at=datetime(2024, 6, 17, 14, 0, 0),
|
||||
content_type="product"
|
||||
)
|
||||
]
|
||||
|
||||
mock_query = MagicMock()
|
||||
mock_query.filter.return_value = mock_query
|
||||
mock_query.order_by.return_value = mock_query
|
||||
mock_query.all.return_value = mock_posts
|
||||
mock_db_session.query.return_value = mock_query
|
||||
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
result = scheduler.get_calendar(
|
||||
start_date=datetime(2024, 6, 15),
|
||||
end_date=datetime(2024, 6, 20)
|
||||
)
|
||||
|
||||
assert "2024-06-17" in result
|
||||
assert len(result["2024-06-17"]) == 2
|
||||
|
||||
def test_auto_fill_calendar(self, scheduler, mock_db_session, fixed_datetime):
|
||||
"""Test auto-filling calendar with suggested slots."""
|
||||
with patch('app.services.scheduler.SessionLocal', return_value=mock_db_session):
|
||||
slots = scheduler.auto_fill_calendar(
|
||||
start_date=fixed_datetime,
|
||||
days=3,
|
||||
platforms=["x", "threads"]
|
||||
)
|
||||
|
||||
assert len(slots) > 0
|
||||
# Should be sorted by datetime
|
||||
for i in range(1, len(slots)):
|
||||
assert slots[i].datetime >= slots[i-1].datetime
|
||||
|
||||
|
||||
class TestOptimalTimes:
|
||||
"""Tests for optimal posting times configuration."""
|
||||
|
||||
def test_x_has_weekday_times(self):
|
||||
"""Test that X platform has weekday times defined."""
|
||||
from app.data.content_templates import OPTIMAL_POSTING_TIMES
|
||||
|
||||
assert "x" in OPTIMAL_POSTING_TIMES
|
||||
assert "weekday" in OPTIMAL_POSTING_TIMES["x"]
|
||||
assert len(OPTIMAL_POSTING_TIMES["x"]["weekday"]) > 0
|
||||
|
||||
def test_all_platforms_have_times(self):
|
||||
"""Test all platforms have posting times."""
|
||||
from app.data.content_templates import OPTIMAL_POSTING_TIMES
|
||||
|
||||
expected_platforms = ["x", "threads", "instagram", "facebook"]
|
||||
|
||||
for platform in expected_platforms:
|
||||
assert platform in OPTIMAL_POSTING_TIMES
|
||||
assert "weekday" in OPTIMAL_POSTING_TIMES[platform]
|
||||
assert "weekend" in OPTIMAL_POSTING_TIMES[platform]
|
||||
|
||||
def test_time_format(self):
|
||||
"""Test that times are in correct HH:MM format."""
|
||||
from app.data.content_templates import OPTIMAL_POSTING_TIMES
|
||||
|
||||
import re
|
||||
time_pattern = re.compile(r'^([01]?[0-9]|2[0-3]):[0-5][0-9]$')
|
||||
|
||||
for platform, times in OPTIMAL_POSTING_TIMES.items():
|
||||
for day_type in ["weekday", "weekend"]:
|
||||
for time_str in times.get(day_type, []):
|
||||
assert time_pattern.match(time_str), f"Invalid time format: {time_str}"
|
||||
Reference in New Issue
Block a user