"""Public API Engine: API key management, rate limiting, request logging. Provides: - API key generation and validation (SHA-256 hashed) - Per-key rate limiting (requests per minute / day) - Request logging for analytics and abuse detection """ import hashlib import json import secrets import time from datetime import datetime, timedelta def generate_api_key(): """Generate a secure API key. Returns (full_key, key_hash, key_prefix).""" full_key = 'nx_' + secrets.token_urlsafe(32) key_hash = hashlib.sha256(full_key.encode()).hexdigest() key_prefix = full_key[:8] return full_key, key_hash, key_prefix def hash_api_key(full_key): return hashlib.sha256(full_key.encode()).hexdigest() def create_api_key(conn, tenant_id, name, scopes=None, rate_limit_rpm=60, rate_limit_rpd=10000, created_by=None, expires_at=None): """Create a new API key. Returns (key_id, full_key).""" full_key, key_hash, key_prefix = generate_api_key() cur = conn.cursor() cur.execute(""" INSERT INTO api_keys (tenant_id, name, key_hash, key_prefix, scopes, rate_limit_rpm, rate_limit_rpd, created_by, expires_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id """, (tenant_id, name, key_hash, key_prefix, json.dumps(scopes) if scopes else '["read"]', rate_limit_rpm, rate_limit_rpd, created_by, expires_at)) key_id = cur.fetchone()[0] conn.commit() cur.close() return key_id, full_key def validate_api_key(conn, full_key): """Validate an API key. Returns dict with key info or None.""" key_hash = hash_api_key(full_key) cur = conn.cursor() cur.execute(""" SELECT id, tenant_id, name, scopes, rate_limit_rpm, rate_limit_rpd, is_active, expires_at FROM api_keys WHERE key_hash = %s """, (key_hash,)) row = cur.fetchone() cur.close() if not row: return None key_id, tenant_id, name, scopes, rpm, rpd, is_active, expires = row if not is_active: return {'valid': False, 'reason': 'inactive'} if expires and datetime.utcnow() > expires: return {'valid': False, 'reason': 'expired'} return { 'valid': True, 'key_id': key_id, 'tenant_id': tenant_id, 'name': name, 'scopes': scopes, 'rate_limit_rpm': rpm, 'rate_limit_rpd': rpd, } def check_rate_limit(conn, key_id, rpm, rpd): """Check if API key is within rate limits. Returns (allowed, headers).""" cur = conn.cursor() now = datetime.utcnow() # Minute window minute_start = now.replace(second=0, microsecond=0) cur.execute(""" SELECT COALESCE(SUM(request_count), 0) FROM api_rate_limit_counters WHERE api_key_id = %s AND window_type = 'minute' AND window_start = %s """, (key_id, minute_start)) minute_count = cur.fetchone()[0] or 0 # Day window day_start = now.replace(hour=0, minute=0, second=0, microsecond=0) cur.execute(""" SELECT COALESCE(SUM(request_count), 0) FROM api_rate_limit_counters WHERE api_key_id = %s AND window_type = 'day' AND window_start = %s """, (key_id, day_start)) day_count = cur.fetchone()[0] or 0 allowed = minute_count < rpm and day_count < rpd headers = { 'X-RateLimit-Limit-Minute': str(rpm), 'X-RateLimit-Remaining-Minute': str(max(0, rpm - minute_count - 1)), 'X-RateLimit-Limit-Day': str(rpd), 'X-RateLimit-Remaining-Day': str(max(0, rpd - day_count - 1)), } cur.close() return allowed, headers def increment_rate_limit(conn, key_id): """Increment request counters for an API key.""" cur = conn.cursor() now = datetime.utcnow() minute_start = now.replace(second=0, microsecond=0) day_start = now.replace(hour=0, minute=0, second=0, microsecond=0) for window_type, window_start in [('minute', minute_start), ('day', day_start)]: cur.execute(""" INSERT INTO api_rate_limit_counters (api_key_id, window_start, window_type, request_count) VALUES (%s, %s, %s, 1) ON CONFLICT (api_key_id, window_start, window_type) DO UPDATE SET request_count = api_rate_limit_counters.request_count + 1 """, (key_id, window_start, window_type)) conn.commit() cur.close() def log_api_request(conn, key_id, tenant_id, method, path, status_code, response_time_ms, ip_address, user_agent): cur = conn.cursor() cur.execute(""" INSERT INTO api_request_logs (api_key_id, tenant_id, method, path, status_code, response_time_ms, ip_address, user_agent) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """, (key_id, tenant_id, method, path, status_code, response_time_ms, ip_address, user_agent)) # Update last_used_at if key_id: cur.execute(""" UPDATE api_keys SET last_used_at = NOW() WHERE id = %s """, (key_id,)) conn.commit() cur.close() def list_api_keys(conn, tenant_id): cur = conn.cursor() cur.execute(""" SELECT id, name, key_prefix, scopes, rate_limit_rpm, rate_limit_rpd, is_active, last_used_at, expires_at, created_at FROM api_keys WHERE tenant_id = %s ORDER BY created_at DESC """, (tenant_id,)) keys = [] for r in cur.fetchall(): keys.append({ 'id': r[0], 'name': r[1], 'key_prefix': r[2], 'scopes': r[3], 'rate_limit_rpm': r[4], 'rate_limit_rpd': r[5], 'is_active': r[6], 'last_used_at': str(r[7]) if r[7] else None, 'expires_at': str(r[8]) if r[8] else None, 'created_at': str(r[9]), }) cur.close() return keys def revoke_api_key(conn, key_id): cur = conn.cursor() cur.execute("UPDATE api_keys SET is_active = false WHERE id = %s", (key_id,)) conn.commit() cur.close() return True def delete_api_key(conn, key_id): cur = conn.cursor() cur.execute("DELETE FROM api_keys WHERE id = %s", (key_id,)) conn.commit() cur.close() return True