Initial commit: Full Crawl API implementation
Some checks failed
CI / Test (push) Has been cancelled
Deploy / Deploy to Staging (push) Has been cancelled
CI / Build & Push (push) Has been cancelled
Deploy / Deploy to Production (push) Has been cancelled

This commit is contained in:
2026-04-29 07:03:48 +00:00
commit 62994d4f3d
92 changed files with 6176 additions and 0 deletions

26
crates/worker/Cargo.toml Normal file
View File

@@ -0,0 +1,26 @@
[package]
name = "worker"
version = "0.1.0"
edition = "2021"
[dependencies]
shared = { path = "../shared" }
db = { path = "../db" }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
redis = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["json", "env-filter"] }
chrono = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
aws-config = { workspace = true }
aws-sdk-s3 = { workspace = true }
config = { workspace = true }
tokio-util = { workspace = true }
futures = { workspace = true }
uuid = { workspace = true }
reqwest = { workspace = true }
sentry = "0.36"
sqlx = { workspace = true }

230
crates/worker/src/main.rs Normal file
View File

@@ -0,0 +1,230 @@
use chrono::Utc;
use db::connection::create_pool;
use redis::AsyncCommands;
use shared::{
config::AppConfig,
queue::{Job, JobResult, QUEUE_NAME, RESULT_PREFIX},
};
use std::time::{Duration, Instant};
use tokio::process::Command;
use tokio::time::sleep;
use tracing::{info_span, Instrument};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let sentry_dsn = std::env::var("SENTRY_DSN").ok();
let _guard = sentry_dsn.map(|dsn| {
sentry::init((dsn, sentry::ClientOptions {
release: sentry::release_name!(),
..Default::default()
}))
});
let json_logging = std::env::var("JSON_LOGGING").unwrap_or_else(|_| "false".to_string()) == "true";
if json_logging {
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| "worker=debug".into()))
.with(
tracing_subscriber::fmt::layer()
.json()
.with_current_span(true)
.with_span_list(true)
.with_target(true),
)
.init();
} else {
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| "worker=debug".into()))
.with(tracing_subscriber::fmt::layer())
.init();
}
let config = AppConfig::from_env()?;
let db = create_pool(&config.database_url).await?;
let redis_client = redis::Client::open(config.redis_url.clone())?;
let mut redis_conn = redis_client.get_multiplexed_tokio_connection().await?;
tracing::info!("Worker started. Waiting for jobs...");
loop {
let job_json: Option<(String, String)> = redis::cmd("BLPOP")
.arg(QUEUE_NAME)
.arg(5)
.query_async(&mut redis_conn)
.await?;
if let Some((_, json)) = job_json {
let job: Job = match serde_json::from_str(&json) {
Ok(j) => j,
Err(e) => {
tracing::error!("Failed to deserialize job: {}", e);
continue;
}
};
let span = info_span!(
"process_job",
job_id = %job.id,
user_id = %job.user_id,
endpoint = %job.endpoint,
url = %job.url,
);
process_single_job(&config, &db, &mut redis_conn, &job)
.instrument(span)
.await;
}
}
}
async fn process_single_job(
config: &AppConfig,
db: &sqlx::PgPool,
redis_conn: &mut redis::aio::MultiplexedConnection,
job: &Job,
) {
tracing::info!("Processing job {}: {} {}", job.id, job.endpoint, job.url);
let start = Instant::now();
let result = process_job_with_retry(config, job).await;
let duration = start.elapsed().as_millis() as i64;
let job_result = match result {
Ok(data) => JobResult {
id: job.id,
success: true,
data: Some(data),
error: None,
duration_ms: duration,
},
Err(e) => {
tracing::error!("Job {} failed after retries: {}", job.id, e);
JobResult {
id: job.id,
success: false,
data: None,
error: Some(e.clone()),
duration_ms: duration,
}
}
};
let result_json = serde_json::to_string(&job_result).unwrap();
let result_key = format!("{}{}", RESULT_PREFIX, job.id);
let _: () = redis_conn.set_ex(&result_key, result_json, 300).await.unwrap_or(());
let status = if job_result.success { "success" } else { "error" };
let _ = db::repos::usage_logs::create(
db,
job.user_id,
job.api_key_id,
&job.endpoint,
&job.url,
status,
1,
duration,
)
.await;
if let Some(webhook_url) = &job.webhook_url {
let _ = send_webhook(webhook_url, &job_result).await;
}
if !job_result.success {
let dlq_key = format!("crawlapi:dlq:{}", job.id);
let dlq_data = serde_json::json!({
"job": job,
"error": job_result.error,
"failed_at": Utc::now().to_rfc3339(),
});
let _: () = redis_conn.set_ex(dlq_key, dlq_data.to_string(), 86400).await.unwrap_or(());
tracing::warn!("Job {} moved to DLQ", job.id);
}
tracing::info!("Job {} completed in {}ms", job.id, duration);
}
async fn process_job_with_retry(config: &AppConfig, job: &Job) -> Result<serde_json::Value, String> {
let max_retries = 3;
let mut last_error = String::new();
for attempt in 0..max_retries {
if attempt > 0 {
let backoff = Duration::from_secs(2_u64.pow(attempt as u32));
tracing::info!(
"Retrying job {} (attempt {}/{}), waiting {:?}",
job.id,
attempt + 1,
max_retries,
backoff
);
sleep(backoff).await;
}
match process_job(config, job).await {
Ok(data) => return Ok(data),
Err(e) => {
last_error = e;
tracing::warn!(
"Job {} attempt {}/{} failed: {}",
job.id,
attempt + 1,
max_retries,
last_error
);
}
}
}
Err(format!("Failed after {} retries: {}", max_retries, last_error))
}
async fn process_job(config: &AppConfig, job: &Job) -> Result<serde_json::Value, String> {
let script_path = &config.playwright_script_path;
let mut cmd = Command::new("node");
cmd.arg(script_path)
.arg(&job.endpoint)
.arg(serde_json::to_string(&job.url).unwrap())
.arg(serde_json::to_string(&job.options).unwrap())
.env("OUTPUT_DIR", "/tmp/crawlapi")
.env("BROWSER_POOL_SIZE", std::env::var("BROWSER_POOL_SIZE").unwrap_or_else(|_| "5".to_string()))
.env("MAX_PAGES_PER_BROWSER", std::env::var("MAX_PAGES_PER_BROWSER").unwrap_or_else(|_| "10".to_string()));
if let Ok(proxy_url) = std::env::var("PROXY_URL") {
cmd.env("PROXY_URL", proxy_url);
}
if let Ok(captcha_key) = std::env::var("CAPTCHA_API_KEY") {
cmd.env("CAPTCHA_API_KEY", captcha_key);
}
let output = cmd.output()
.await
.map_err(|e| format!("Failed to execute browser: {}", e))?;
if !output.status.success() {
return Err(format!("Browser error: {}", String::from_utf8_lossy(&output.stderr)));
}
let stdout = String::from_utf8_lossy(&output.stdout);
let result: serde_json::Value = serde_json::from_str(&stdout)
.map_err(|e| format!("Invalid JSON from browser: {} | output: {}", e, stdout))?;
Ok(result)
}
async fn send_webhook(url: &str, result: &JobResult) -> Result<(), reqwest::Error> {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?;
let _ = client
.post(url)
.json(result)
.send()
.await?;
Ok(())
}