Files
stl-repo/app/routers/models.py
Consultoria AS 0764be4945 feat: galeria muestra foto impresa + estimacion por placa en detalle
- Galeria: cards muestran imagen de referencia si existe, fallback a thumbnail 3D
- Detalle: nueva seccion 'Placas de impresion' con estimacion individual por parte
- Endpoint /estimate ahora acepta file_id para calcular por archivo
- Visualizacion de placa 220x220mm con proporcion de pieza y totales acumulados
2026-05-01 08:23:03 +00:00

751 lines
31 KiB
Python

import os
import shutil
import hashlib
import json
import zipfile
import qrcode
import io
import tempfile
import urllib.parse
import httpx
from fastapi import APIRouter, Depends, UploadFile, File, Form, HTTPException, Query, Request, Body
from fastapi.responses import FileResponse, StreamingResponse, JSONResponse
from sqlalchemy.orm import Session
from typing import Optional, List
def _cache_headers(seconds: int):
    """Build response headers that allow public caching for ``seconds`` seconds."""
    directive = "public, max-age=" + str(seconds)
    return {"Cache-Control": directive}
from app.database import get_db
from app.models import Model3D, Tag, ModelFile, Rating, Comment, Collection
from app.schemas import (
Model3DResponse, Model3DUpdate, RatingResponse, CommentResponse,
CollectionResponse, CollectionCreate, CollectionDetailResponse
)
from app.parsers import parse_model_file, generate_thumbnail, generate_generic_thumbnail
# Router for every endpoint in this module; all paths live under /api/models.
router = APIRouter(prefix="/api/models", tags=["models"])

# Storage directories (relative to the working directory). They are created
# at import time so the handlers below can write into them directly.
UPLOAD_DIR = "uploads"
THUMBNAIL_DIR = "thumbnails"
IMAGES_DIR = "images"
for _storage_dir in (UPLOAD_DIR, THUMBNAIL_DIR, IMAGES_DIR):
    os.makedirs(_storage_dir, exist_ok=True)
def _file_hash(file_path: str) -> str:
    """Return the hex SHA-256 digest of the file at ``file_path``.

    The file is read in 8 KiB chunks so it is never loaded whole into memory.
    """
    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
def _get_or_create_tags(db: Session, tag_names: List[str]) -> List[Tag]:
    """Resolve tag names to ``Tag`` rows, creating the ones that do not exist.

    Names are stripped and lower-cased; blank names are ignored. Each distinct
    normalised name is returned once, in first-seen order, so inputs such as
    ``["PLA", "pla "]`` do not put the same ``Tag`` into the result twice
    (a duplicate in a relationship collection would be written twice).

    New tags are added to the session and flushed, but not committed; the
    caller owns the transaction.
    """
    tags: List[Tag] = []
    seen = set()
    for raw_name in tag_names:
        name = raw_name.strip().lower()
        if not name or name in seen:
            continue
        seen.add(name)
        tag = db.query(Tag).filter(Tag.name == name).first()
        if not tag:
            tag = Tag(name=name)
            db.add(tag)
            # Flush so the new row is visible to later queries in this session.
            db.flush()
        tags.append(tag)
    return tags
def _calc_avg_rating(model: Model3D) -> Optional[float]:
    """Return the model's mean star rating rounded to one decimal.

    Returns ``None`` when the model has no ratings.
    """
    ratings = model.ratings
    if ratings:
        star_total = sum(entry.stars for entry in ratings)
        return round(star_total / len(ratings), 1)
    return None
# --- Cost estimator helpers ---
def _estimate_print_time_seconds(volume_cm3: float) -> int:
    """Return a rough print-time estimate in whole seconds for a volume in cm3."""
    # 360 seconds per cm3, i.e. about one hour per 10 cm3 at standard speed.
    seconds_per_cm3 = 360
    return int(volume_cm3 * seconds_per_cm3)
# --- Process a single 3D file (STL or 3MF) into a model ---
def _process_single_file(db: Session, file_path: str, title: str, description: str,
                         author: str, license_str: str, tags_str: str, category: str,
                         part_name: Optional[str] = None, is_primary: bool = False) -> Model3D:
    """Register a 3D file that is already on disk as a new single-file model.

    The file is hashed, checked against existing models, parsed, given a
    thumbnail, and stored as one ``Model3D`` row plus one ``ModelFile`` row.
    ``tags_str`` is a comma-separated list of tag names. The session is
    committed before returning.

    Raises:
        HTTPException: 409 when a model with the same hash exists, 400 when
            the file cannot be parsed. In both cases ``file_path`` is deleted.
    """
    digest = _file_hash(file_path)
    duplicate = db.query(Model3D).filter(Model3D.file_hash == digest).first()
    if duplicate:
        os.remove(file_path)
        raise HTTPException(
            status_code=409,
            detail=f"File already exists as '{duplicate.title}' (ID: {duplicate.id})",
        )

    try:
        parsed = parse_model_file(file_path)
    except Exception as e:
        os.remove(file_path)
        raise HTTPException(status_code=400, detail=f"Failed to parse: {str(e)}")

    stored_name = os.path.basename(file_path)
    stem = os.path.splitext(stored_name)[0]
    thumbnail_path = os.path.join(THUMBNAIL_DIR, f"{stem}.webp")
    generate_thumbnail(parsed['vertices'], thumbnail_path)

    size_on_disk = os.path.getsize(file_path)
    db_model = Model3D(
        title=title,
        filename=stored_name,
        description=description,
        author=author,
        license=license_str,
        category=category,
        file_size=size_on_disk,
        file_hash=digest,
        width=parsed['width'],
        height=parsed['height'],
        depth=parsed['depth'],
        faces=parsed['faces'],
        thumbnail_path=thumbnail_path,
    )
    db.add(db_model)
    # Flush so db_model.id is available for the ModelFile row below.
    db.flush()

    # Anything that is not ".stl" is recorded as "3mf".
    if file_path.lower().endswith('.stl'):
        kind = 'stl'
    else:
        kind = '3mf'
    db.add(ModelFile(
        model_id=db_model.id,
        filename=stored_name,
        file_path=file_path,
        file_type=kind,
        part_name=part_name,
        is_primary=is_primary,
        file_size=size_on_disk,
        file_hash=digest,
    ))

    requested_tags = []
    for piece in (tags_str or '').split(','):
        cleaned = piece.strip()
        if cleaned:
            requested_tags.append(cleaned)
    db_model.tags = _get_or_create_tags(db, requested_tags)

    db.commit()
    db.refresh(db_model)
    return db_model
# =============================================================================
# STATIC ROUTES (no path parameters) - MUST come before dynamic routes
# =============================================================================
@router.get("/", response_model=List[Model3DResponse])
def list_models(
    search: Optional[str] = None, category: Optional[str] = None, tag: Optional[str] = None,
    min_width: Optional[float] = None, max_width: Optional[float] = None,
    min_height: Optional[float] = None, max_height: Optional[float] = None,
    min_depth: Optional[float] = None, max_depth: Optional[float] = None,
    min_faces: Optional[int] = None, max_faces: Optional[int] = None,
    date_from: Optional[str] = None, date_to: Optional[str] = None,
    sort_by: Optional[str] = Query("newest", enum=["newest", "oldest", "most_downloaded", "largest", "most_faces", "highest_rated"]),
    skip: int = Query(0, ge=0), limit: int = Query(24, ge=1, le=100),
    db: Session = Depends(get_db)
):
    """List models with optional filtering, sorting and pagination.

    ``search`` matches a substring of the title, ``category`` matches exactly,
    and ``tag`` matches a substring of a lower-cased tag name. The ``min_*`` /
    ``max_*`` pairs are inclusive bounds on width, height, depth and face
    count. ``date_from`` / ``date_to`` are passed to the database as given
    and compared with ``created_at``.

    Every sort mode returns the same JSON payload with a 60-second public
    cache header.
    """
    query = db.query(Model3D)
    if search:
        query = query.filter(Model3D.title.contains(search))
    if category:
        query = query.filter(Model3D.category == category)
    if tag:
        query = query.join(Model3D.tags).filter(Tag.name.contains(tag.lower()))

    # Compare numeric bounds against None, not truthiness, so that an explicit
    # 0 (for example max_width=0) is applied instead of silently dropped.
    numeric_bounds = (
        (Model3D.width, min_width, max_width),
        (Model3D.height, min_height, max_height),
        (Model3D.depth, min_depth, max_depth),
        (Model3D.faces, min_faces, max_faces),
    )
    for column, lower, upper in numeric_bounds:
        if lower is not None:
            query = query.filter(column >= lower)
        if upper is not None:
            query = query.filter(column <= upper)

    if date_from:
        query = query.filter(Model3D.created_at >= date_from)
    if date_to:
        query = query.filter(Model3D.created_at <= date_to)

    if sort_by == "highest_rated":
        # The average rating is computed in Python, so sorting and paging
        # happen in memory after loading every matching row.
        models = query.all()
        models.sort(key=lambda m: _calc_avg_rating(m) or 0, reverse=True)
        models = models[skip:skip + limit]
    else:
        orderings = {
            "newest": Model3D.created_at.desc(),
            "oldest": Model3D.created_at.asc(),
            "most_downloaded": Model3D.download_count.desc(),
            "largest": Model3D.file_size.desc(),
            "most_faces": Model3D.faces.desc(),
        }
        ordering = orderings.get(sort_by)
        # SQL expressions cannot be truth-tested, hence the explicit None check.
        if ordering is not None:
            query = query.order_by(ordering)
        models = query.offset(skip).limit(limit).all()

    payload = [Model3DResponse.model_validate(m).model_dump(mode='json') for m in models]
    return JSONResponse(content=payload, headers=_cache_headers(60))
@router.get("/tags", response_model=List[dict])
def list_tags(search: Optional[str] = None, db: Session = Depends(get_db)):
    """List tags alphabetically with the number of models attached to each.

    ``search`` optionally restricts the list to tags whose name contains the
    lower-cased search text. The response is publicly cacheable for 300 s.
    """
    tag_query = db.query(Tag)
    if search:
        needle = search.lower()
        tag_query = tag_query.filter(Tag.name.contains(needle))
    payload = []
    for tag in tag_query.order_by(Tag.name).all():
        payload.append({"id": tag.id, "name": tag.name, "count": len(tag.models)})
    return JSONResponse(content=payload, headers=_cache_headers(300))
@router.get("/collections/all", response_model=List[CollectionResponse])
def list_collections(db: Session = Depends(get_db)):
    """Return every collection, newest first, each with its model count."""

    def _with_count(collection):
        # model_count is filled from the loaded relationship.
        item = CollectionResponse.model_validate(collection)
        item.model_count = len(collection.models)
        return item

    newest_first = db.query(Collection).order_by(Collection.created_at.desc())
    return [_with_count(collection) for collection in newest_first.all()]
@router.post("/collections", response_model=CollectionResponse)
def create_collection(data: CollectionCreate, db: Session = Depends(get_db)):
    """Create a collection from ``data.name`` and ``data.description``."""
    new_collection = Collection(description=data.description, name=data.name)
    db.add(new_collection)
    db.commit()
    db.refresh(new_collection)

    created = CollectionResponse.model_validate(new_collection)
    # A collection that was just created has no models yet.
    created.model_count = 0
    return created
@router.get("/system/backup")
def backup_all(db: Session = Depends(get_db)):
    """Stream a ZIP backup of the database file, stored files and metadata.

    The archive holds, under ``backup/``: the ``stl_repo.db`` file if it
    exists in the working directory, every regular file directly inside the
    upload, thumbnail and image directories, and ``metadata.json`` describing
    every model. The archive is built in memory before it is sent.
    """
    from datetime import datetime
    import glob

    def _describe(m):
        # Plain-dict snapshot of one model for metadata.json.
        return {
            "id": m.id, "title": m.title, "filename": m.filename,
            "description": m.description, "author": m.author, "license": m.license,
            "category": m.category, "tags": [t.name for t in m.tags],
            "faces": m.faces, "width": m.width, "height": m.height, "depth": m.depth,
            "created_at": m.created_at.isoformat() if m.created_at else None,
            "download_count": m.download_count,
            "files": [{"filename": f.filename, "file_type": f.file_type, "part_name": f.part_name, "is_primary": f.is_primary} for f in m.files],
        }

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        db_path = "stl_repo.db"
        if os.path.exists(db_path):
            zf.write(db_path, arcname="backup/stl_repo.db")

        # Use the module's storage constants rather than repeating the literal
        # directory names, so the backup follows any change to them.
        for directory in (UPLOAD_DIR, THUMBNAIL_DIR, IMAGES_DIR):
            for path in glob.glob(os.path.join(directory, "*")):
                if os.path.isfile(path):
                    zf.write(path, arcname=f"backup/{directory}/{os.path.basename(path)}")

        metadata = [_describe(m) for m in db.query(Model3D).all()]
        zf.writestr("backup/metadata.json", json.dumps(metadata, indent=2, default=str))

    zip_buffer.seek(0)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return StreamingResponse(
        zip_buffer, media_type="application/zip",
        headers={"Content-Disposition": f"attachment; filename=stl_repo_backup_{timestamp}.zip"},
    )
# =============================================================================
# MODEL CREATION
# =============================================================================
def _safe_upload_path(directory: str, client_name: str) -> str:
    """Return a collision-free path inside ``directory`` for a client-supplied name."""
    # Keep only the last path component (POSIX or Windows separators) so a
    # name such as "../../x.stl" cannot escape the storage directory.
    name = os.path.basename(client_name.replace("\\", "/")) or "upload"
    candidate = os.path.join(directory, name)
    if os.path.exists(candidate):
        # Never overwrite a file that may belong to another model.
        stem, ext = os.path.splitext(name)
        candidate = os.path.join(directory, f"{stem}_{os.urandom(4).hex()}{ext}")
    return candidate


def _remove_files(paths) -> None:
    """Delete each existing file in ``paths``; used to roll back a failed upload."""
    for path in paths:
        if os.path.exists(path):
            os.remove(path)


@router.post("/", response_model=Model3DResponse)
async def create_model(
    title: str = Form(...),
    description: Optional[str] = Form(None),
    author: Optional[str] = Form(None),
    license: Optional[str] = Form(None),
    tags: Optional[str] = Form(None),
    category: Optional[str] = Form(None),
    files: List[UploadFile] = File(default=[]),
    images: List[UploadFile] = File(default=[]),
    part_names: Optional[str] = Form(None),
    db: Session = Depends(get_db)
):
    """Create a model from one or more uploaded 3D files plus optional images.

    ``files`` must contain at least one ``.stl`` / ``.3mf`` file. The first
    accepted file becomes the primary part and supplies the thumbnail,
    dimensions and face count. ``part_names`` is an optional JSON object
    mapping the upload index (as a string) to a part name. ``images`` accepts
    ``.jpg`` / ``.jpeg`` / ``.png`` and silently skips anything else.

    Uploads are written under a sanitised name that never overwrites an
    existing file, so the stored ``filename`` may carry a random suffix.
    If anything fails, every file written by this request is removed.

    Raises:
        HTTPException: 400 for no files, an unsupported or unparsable file;
            409 when a file's hash matches an existing model.
    """
    if not files:
        raise HTTPException(status_code=400, detail="At least one 3D file is required")

    part_names_map = {}
    if part_names:
        try:
            decoded = json.loads(part_names)
        except ValueError:
            decoded = None  # malformed JSON: continue without part names
        # Only a JSON object can be looked up by index key below.
        if isinstance(decoded, dict):
            part_names_map = decoded

    written_paths = []  # every file this request put on disk, for rollback
    try:
        saved_3d_files = []
        for idx, file in enumerate(files):
            if not file.filename:
                continue
            if not file.filename.lower().endswith(('.stl', '.3mf')):
                raise HTTPException(status_code=400, detail=f"Unsupported format: {file.filename}")

            file_path = _safe_upload_path(UPLOAD_DIR, file.filename)
            written_paths.append(file_path)
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            file_hash = _file_hash(file_path)
            existing = db.query(Model3D).filter(Model3D.file_hash == file_hash).first()
            if existing:
                raise HTTPException(
                    status_code=409,
                    detail=f"File '{file.filename}' already exists as '{existing.title}' (ID: {existing.id})")
            try:
                model_data = parse_model_file(file_path)
            except Exception as e:
                raise HTTPException(status_code=400, detail=f"Failed to parse {file.filename}: {str(e)}") from e

            saved_3d_files.append({
                'path': file_path, 'filename': os.path.basename(file_path), 'hash': file_hash,
                'size': os.path.getsize(file_path), 'data': model_data,
                # The first accepted file is primary, even if earlier upload
                # entries were skipped for having no filename.
                'is_primary': not saved_3d_files,
                'part_name': part_names_map.get(str(idx), None),
            })

        if not saved_3d_files:
            raise HTTPException(status_code=400, detail="No valid 3D files provided")

        primary = saved_3d_files[0]
        thumbnail_path = _safe_upload_path(
            THUMBNAIL_DIR, f"{os.path.splitext(primary['filename'])[0]}.webp")
        generate_thumbnail(primary['data']['vertices'], thumbnail_path)
        written_paths.append(thumbnail_path)

        db_model = Model3D(
            title=title, filename=primary['filename'], description=description,
            author=author, license=license, category=category,
            file_size=sum(f['size'] for f in saved_3d_files),
            file_hash=primary['hash'], width=primary['data']['width'],
            height=primary['data']['height'], depth=primary['data']['depth'],
            # Only the primary part's faces are counted, as before.
            faces=primary['data']['faces'], thumbnail_path=thumbnail_path,
        )
        db.add(db_model)
        # Flush so db_model.id is available for the ModelFile rows below.
        db.flush()

        for sf in saved_3d_files:
            db.add(ModelFile(
                model_id=db_model.id, filename=sf['filename'], file_path=sf['path'],
                file_type='stl' if sf['filename'].lower().endswith('.stl') else '3mf',
                part_name=sf['part_name'], is_primary=sf['is_primary'],
                file_size=sf['size'], file_hash=sf['hash'],
            ))

        for img in images:
            if not img.filename:
                continue
            if not img.filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                continue
            img_path = _safe_upload_path(IMAGES_DIR, img.filename)
            written_paths.append(img_path)
            with open(img_path, "wb") as buffer:
                shutil.copyfileobj(img.file, buffer)
            db.add(ModelFile(
                model_id=db_model.id, filename=os.path.basename(img_path), file_path=img_path,
                file_type='image', is_primary=False, file_size=os.path.getsize(img_path),
            ))

        tag_names = [t.strip() for t in (tags or '').split(',') if t.strip()]
        db_model.tags = _get_or_create_tags(db, tag_names)
        db.commit()
    except Exception:
        # Leave neither partial rows nor orphaned files behind.
        db.rollback()
        _remove_files(written_paths)
        raise

    db.refresh(db_model)
    response_data = Model3DResponse.model_validate(db_model)
    response_data.avg_rating = _calc_avg_rating(db_model)
    return response_data
# =============================================================================
# IMPORT / BULK / BATCH
# =============================================================================
@router.post("/import-url", response_model=Model3DResponse)
async def import_from_url(
    url: str = Form(...),
    title: str = Form(...),
    description: Optional[str] = Form(None),
    author: Optional[str] = Form(None),
    license: Optional[str] = Form(None),
    tags: Optional[str] = Form(None),
    category: Optional[str] = Form(None),
    db: Session = Depends(get_db)
):
    """Download a 3D file from ``url`` and register it as a new model.

    Only ``http`` / ``https`` URLs are accepted. The body is streamed and the
    download is aborted once it exceeds 50 MB. The stored file name is the
    last component of the URL path (``downloaded.stl`` if empty, ``.stl``
    appended if it has neither a ``.stl`` nor ``.3mf`` suffix) and never
    overwrites an existing upload.

    Raises:
        HTTPException: 400 for an invalid URL, a failed download, a non-2xx
            upstream status or an oversized file; plus whatever
            ``_process_single_file`` raises.
    """
    # NOTE(security): this fetches a caller-supplied URL from the server, so
    # it can reach hosts on the internal network (SSRF). Consider a host
    # allow-list if the endpoint is exposed to untrusted users.
    MAX_SIZE = 50 * 1024 * 1024  # 50MB
    try:
        parsed = urllib.parse.urlparse(url)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid URL")
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise HTTPException(status_code=400, detail="Invalid URL")

    payload = bytearray()
    try:
        async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
            async with client.stream("GET", url) as r:
                r.raise_for_status()
                # Stream so an oversized body is rejected before it is fully
                # held in memory.
                async for chunk in r.aiter_bytes():
                    payload.extend(chunk)
                    if len(payload) > MAX_SIZE:
                        raise HTTPException(status_code=400, detail="File too large (max 50MB)")
    except httpx.HTTPError as e:
        # HTTPError covers both transport errors (RequestError) and the
        # HTTPStatusError raised by raise_for_status().
        raise HTTPException(status_code=400, detail=f"Download failed: {str(e)}")

    filename = os.path.basename(parsed.path) or "downloaded.stl"
    if not filename.lower().endswith(('.stl', '.3mf')):
        filename += ".stl"
    file_path = os.path.join(UPLOAD_DIR, filename)
    if os.path.exists(file_path):
        # Do not overwrite an existing upload: _process_single_file deletes
        # file_path on a duplicate or parse error, which would otherwise
        # remove another model's file.
        stem, ext = os.path.splitext(filename)
        file_path = os.path.join(UPLOAD_DIR, f"{stem}_{os.urandom(4).hex()}{ext}")
    with open(file_path, 'wb') as f:
        f.write(payload)

    db_model = _process_single_file(db, file_path, title, description, author, license, tags, category, is_primary=True)
    response_data = Model3DResponse.model_validate(db_model)
    response_data.avg_rating = _calc_avg_rating(db_model)
    return response_data
@router.post("/bulk-zip", response_model=List[Model3DResponse])
async def bulk_zip_upload(
    zip_file: UploadFile = File(...),
    description: Optional[str] = Form(None),
    author: Optional[str] = Form(None),
    license: Optional[str] = Form(None),
    tags: Optional[str] = Form(None),
    category: Optional[str] = Form(None),
    db: Session = Depends(get_db)
):
    """Create one model per ``.stl`` / ``.3mf`` file found in an uploaded ZIP.

    Each model is titled after its file name without the extension; the
    other form fields are applied to every model. Files that
    ``_process_single_file`` rejects (duplicates, parse errors) are skipped.
    The temporary archive and extraction directory are always removed.

    Raises:
        HTTPException: 400 when the upload is not named ``*.zip`` or is not a
            valid ZIP archive.
    """
    if not zip_file.filename or not zip_file.filename.lower().endswith('.zip'):
        raise HTTPException(status_code=400, detail="Only ZIP files are allowed")

    # Let mkstemp choose the temp name: the client-supplied filename must not
    # decide where the archive is written.
    fd, tmp_path = tempfile.mkstemp(suffix=".zip")
    extract_dir = tempfile.mkdtemp()
    try:
        with os.fdopen(fd, 'wb') as f:
            shutil.copyfileobj(zip_file.file, f)
        try:
            with zipfile.ZipFile(tmp_path, 'r') as zf:
                zf.extractall(extract_dir)
        except zipfile.BadZipFile:
            raise HTTPException(status_code=400, detail="Invalid ZIP file")

        results = []
        for root, _dirs, names in os.walk(extract_dir):
            for fname in names:
                if not fname.lower().endswith(('.stl', '.3mf')):
                    continue
                src = os.path.join(root, fname)
                dst = os.path.join(UPLOAD_DIR, fname)
                # avoid collisions
                if os.path.exists(dst):
                    base, ext = os.path.splitext(fname)
                    fname = f"{base}_{hashlib.md5(src.encode()).hexdigest()[:6]}{ext}"
                    dst = os.path.join(UPLOAD_DIR, fname)
                shutil.move(src, dst)
                try:
                    db_model = _process_single_file(
                        db, dst, os.path.splitext(fname)[0],
                        description, author, license, tags, category, is_primary=True
                    )
                except HTTPException:
                    continue  # duplicate or unparsable file: skip it, keep going
                response_data = Model3DResponse.model_validate(db_model)
                response_data.avg_rating = _calc_avg_rating(db_model)
                results.append(response_data)
        return results
    finally:
        # Runs on success, on a bad archive and on unexpected errors alike.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        shutil.rmtree(extract_dir, ignore_errors=True)
@router.post("/batch-download")
def batch_download(
    model_ids: List[int] = Body(...),
    db: Session = Depends(get_db)
):
    """Return a ZIP with the STL/3MF files of the requested models.

    Each model's files go in a folder named after its title with spaces
    replaced by underscores. Unknown IDs and files missing on disk are
    skipped.

    Raises:
        HTTPException: 400 when ``model_ids`` is empty.
    """
    if not model_ids:
        raise HTTPException(status_code=400, detail="No model IDs provided")

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as bundle:
        for requested_id in model_ids:
            model = db.query(Model3D).filter(Model3D.id == requested_id).first()
            if model:
                for part in model.files:
                    if part.file_type not in ('stl', '3mf'):
                        continue
                    if not os.path.exists(part.file_path):
                        continue
                    bundle.write(part.file_path, arcname=f"{model.title.replace(' ', '_')}/{part.filename}")
    archive.seek(0)

    disposition = {"Content-Disposition": "attachment; filename=batch_download.zip"}
    return StreamingResponse(archive, media_type="application/zip", headers=disposition)
# =============================================================================
# DYNAMIC MODEL ROUTES
# =============================================================================
@router.get("/{model_id}", response_model=Model3DResponse)
def get_model(model_id: int, db: Session = Depends(get_db)):
    """Return one model with its average rating, or 404 if it does not exist."""
    found = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not found:
        raise HTTPException(status_code=404, detail="Model not found")
    payload = Model3DResponse.model_validate(found)
    payload.avg_rating = _calc_avg_rating(found)
    return payload
@router.put("/{model_id}", response_model=Model3DResponse)
def update_model(model_id: int, data: Model3DUpdate, db: Session = Depends(get_db)):
    """Update a model's metadata; fields left as ``None`` are not changed.

    ``data.tag_names``, when given, replaces the model's whole tag list.
    Responds 404 when the model does not exist.
    """
    model = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not model:
        raise HTTPException(status_code=404, detail="Model not found")

    # These fields have the same name on the update schema and on the model.
    for field in ("title", "description", "author", "license", "category"):
        new_value = getattr(data, field)
        if new_value is not None:
            setattr(model, field, new_value)
    if data.tag_names is not None:
        model.tags = _get_or_create_tags(db, data.tag_names)

    db.commit()
    db.refresh(model)
    payload = Model3DResponse.model_validate(model)
    payload.avg_rating = _calc_avg_rating(model)
    return payload
@router.delete("/{model_id}")
def delete_model(model_id: int, db: Session = Depends(get_db)):
    """Delete a model, its stored files and its thumbnail; 404 if unknown."""
    model = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not model:
        raise HTTPException(status_code=404, detail="Model not found")

    # Remove the files from disk first, then the database row.
    for stored in model.files:
        if os.path.exists(stored.file_path):
            os.remove(stored.file_path)
    thumbnail = model.thumbnail_path
    if thumbnail and os.path.exists(thumbnail):
        os.remove(thumbnail)

    db.delete(model)
    db.commit()
    return {"detail": "Model deleted"}
# --- Validation & Estimation ---
@router.get("/{model_id}/validate")
def validate_mesh(model_id: int, db: Session = Depends(get_db)):
    """Report mesh-health figures for a model's primary STL/3MF file.

    The mesh is loaded with ``trimesh``. Volume is reported only for
    watertight meshes (0 otherwise); the hole estimate is derived from the
    Euler number. Volume and area are divided by 1000 and 100 for the
    ``*_cm3`` / ``*_cm2`` keys, which assumes the mesh is modelled in
    millimetres.

    Raises:
        HTTPException: 404 when the model or a primary 3D file is missing,
            500 when loading or analysing the mesh fails.
    """
    model = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not model:
        raise HTTPException(status_code=404, detail="Model not found")

    primary = None
    for candidate in model.files:
        if candidate.is_primary and candidate.file_type in ('stl', '3mf'):
            primary = candidate
            break
    if not primary:
        raise HTTPException(status_code=404, detail="No 3D file found")

    try:
        import trimesh
        mesh = trimesh.load(primary.file_path, force='mesh')
        watertight = bool(mesh.is_watertight)
        volume = abs(float(mesh.volume)) if watertight else 0.0
        area = float(mesh.area)
        winding_ok = bool(mesh.is_winding_consistent)
        bounds = mesh.bounds.tolist()
        euler = int(mesh.euler_number)
        # Count holes via euler characteristic
        if watertight:
            holes = 0
        else:
            holes = max(0, 2 - euler)
        report = {
            "is_watertight": watertight,
            "is_winding_consistent": winding_ok,
            "volume_cm3": round(volume / 1000, 3) if volume else 0,
            "surface_area_cm2": round(area / 100, 2),
            "bounds_mm": bounds,
            "euler_number": euler,
            "estimated_holes": holes,
            "face_count": len(mesh.faces),
            "vertex_count": len(mesh.vertices),
        }
        return report
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Validation failed: {str(e)}")
@router.get("/{model_id}/estimate")
def estimate_print(model_id: int, file_id: Optional[int] = Query(None), price_per_kg: float = Query(20.0), material_density: float = Query(1.24), db: Session = Depends(get_db)):
    """Estimate material, cost and print time for one 3D file of a model.

    With ``file_id`` the estimate is for that STL/3MF file of the model;
    without it, for the primary file. Mass is volume (cm3) times
    ``material_density``; cost is mass in kilograms times ``price_per_kg``.
    Dividing the mesh volume by 1000 to get cm3 assumes the mesh is modelled
    in millimetres, as the ``*_mm`` keys of the response imply.

    Raises:
        HTTPException: 404 when the model or the requested file is missing,
            500 when loading or measuring the mesh fails.
    """
    model = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not model:
        raise HTTPException(status_code=404, detail="Model not found")

    # Test against None, not truthiness: an explicit file_id (even 0) must
    # select that file or give 404, never fall back to the primary file.
    if file_id is not None:
        target = next((f for f in model.files if f.id == file_id and f.file_type in ('stl', '3mf')), None)
    else:
        target = next((f for f in model.files if f.is_primary and f.file_type in ('stl', '3mf')), None)
    if not target:
        raise HTTPException(status_code=404, detail="No 3D file found")

    try:
        import trimesh
        mesh = trimesh.load(target.file_path, force='mesh')
        volume_cm3 = abs(float(mesh.volume)) / 1000.0
        grams = volume_cm3 * material_density
        cost = (grams / 1000.0) * price_per_kg
        seconds = _estimate_print_time_seconds(volume_cm3)
        hours, remainder = divmod(seconds, 3600)
        mins = remainder // 60
        # bounds is a (min corner, max corner) pair; the difference is the size.
        bounds = mesh.bounds
        dims = bounds[1] - bounds[0]
        return {
            "file_id": target.id,
            "part_name": target.part_name or target.filename,
            "volume_cm3": round(volume_cm3, 2),
            "grams": round(grams, 2),
            "cost": round(cost, 2),
            "estimated_time": f"{hours}h {mins}m",
            "estimated_seconds": seconds,
            "price_per_kg": price_per_kg,
            "material_density": material_density,
            "width_mm": round(float(dims[0]), 1),
            "depth_mm": round(float(dims[1]), 1),
            "height_mm": round(float(dims[2]), 1),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Estimation failed: {str(e)}")
# --- Ratings ---
@router.post("/{model_id}/ratings", response_model=RatingResponse)
def create_rating(model_id: int, stars: int = Query(..., ge=1, le=5), db: Session = Depends(get_db)):
    """Add a 1-5 star rating to a model; 404 if the model does not exist."""
    target = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Model not found")

    new_rating = Rating(stars=stars, model_id=model_id)
    db.add(new_rating)
    db.commit()
    db.refresh(new_rating)
    return new_rating
@router.get("/{model_id}/ratings", response_model=List[RatingResponse])
def list_ratings(model_id: int, db: Session = Depends(get_db)):
    """Return a model's ratings, newest first; 404 if the model does not exist."""
    target = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Model not found")

    ratings = db.query(Rating).filter(Rating.model_id == model_id)
    return ratings.order_by(Rating.created_at.desc()).all()
# --- Comments ---
@router.post("/{model_id}/comments", response_model=CommentResponse)
def create_comment(model_id: int, text: str = Query(..., min_length=1), author_name: Optional[str] = Query(None), db: Session = Depends(get_db)):
    """Add a comment to a model; 404 if the model does not exist.

    ``text`` and the optional ``author_name`` are query parameters.
    """
    target = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Model not found")

    new_comment = Comment(author_name=author_name, text=text, model_id=model_id)
    db.add(new_comment)
    db.commit()
    db.refresh(new_comment)
    return new_comment
@router.get("/{model_id}/comments", response_model=List[CommentResponse])
def list_comments(model_id: int, db: Session = Depends(get_db)):
    """Return a model's comments, newest first; 404 if the model does not exist."""
    target = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Model not found")

    comments = db.query(Comment).filter(Comment.model_id == model_id)
    return comments.order_by(Comment.created_at.desc()).all()
# --- Collections (dynamic) ---
@router.get("/collections/{collection_id}", response_model=CollectionDetailResponse)
def get_collection(collection_id: int, db: Session = Depends(get_db)):
    """Return one collection in detail with its model count; 404 if unknown."""
    collection = db.query(Collection).filter(Collection.id == collection_id).first()
    if not collection:
        raise HTTPException(status_code=404, detail="Collection not found")

    detail = CollectionDetailResponse.model_validate(collection)
    detail.model_count = len(collection.models)
    return detail
@router.post("/collections/{collection_id}/add/{model_id}")
def add_to_collection(collection_id: int, model_id: int, db: Session = Depends(get_db)):
    """Add a model to a collection; a model already in it is left as is.

    Responds 404 when either the collection or the model does not exist.
    """
    collection = db.query(Collection).filter(Collection.id == collection_id).first()
    if not collection:
        raise HTTPException(status_code=404, detail="Collection not found")
    model = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not model:
        raise HTTPException(status_code=404, detail="Model not found")

    already_member = model in collection.models
    if not already_member:
        collection.models.append(model)
        db.commit()
    return {"detail": "Model added to collection"}
@router.delete("/collections/{collection_id}/remove/{model_id}")
def remove_from_collection(collection_id: int, model_id: int, db: Session = Depends(get_db)):
    """Remove a model from a collection; a model not in it is ignored.

    Responds 404 when either the collection or the model does not exist.
    """
    collection = db.query(Collection).filter(Collection.id == collection_id).first()
    if not collection:
        raise HTTPException(status_code=404, detail="Collection not found")
    model = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not model:
        raise HTTPException(status_code=404, detail="Model not found")

    is_member = model in collection.models
    if is_member:
        collection.models.remove(model)
        db.commit()
    return {"detail": "Model removed from collection"}
@router.delete("/collections/{collection_id}")
def delete_collection(collection_id: int, db: Session = Depends(get_db)):
    """Delete a collection; 404 if it does not exist."""
    collection = db.query(Collection).filter(Collection.id == collection_id).first()
    if not collection:
        raise HTTPException(status_code=404, detail="Collection not found")

    db.delete(collection)
    db.commit()
    return {"detail": "Collection deleted"}
# --- Downloads & QR ---
@router.get("/{model_id}/download")
def download_model(model_id: int, file_id: Optional[int] = None, db: Session = Depends(get_db)):
    """Send one file of a model as a download and count the download.

    With ``file_id`` that file of the model is sent. Without it the primary
    STL/3MF file is sent, falling back to ``model.filename`` inside the
    upload directory when no primary file row exists.

    Raises:
        HTTPException: 404 when the model, the file row or the file on disk
            is missing.
    """
    model = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not model:
        raise HTTPException(status_code=404, detail="Model not found")

    # Test against None, not truthiness: an explicit file_id (even 0) must
    # select that file or give 404, never fall back to the primary file.
    if file_id is not None:
        mf = db.query(ModelFile).filter(ModelFile.id == file_id, ModelFile.model_id == model_id).first()
        if not mf:
            raise HTTPException(status_code=404, detail="File not found")
        # Read what the response needs before commit() expires the objects.
        file_path, download_name = mf.file_path, mf.filename
    else:
        primary = next((f for f in model.files if f.is_primary and f.file_type in ('stl', '3mf')), None)
        file_path = primary.file_path if primary else os.path.join(UPLOAD_DIR, model.filename)
        download_name = model.filename

    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found on disk")
    model.download_count += 1
    db.commit()
    return FileResponse(path=file_path, filename=download_name, media_type="application/octet-stream")
@router.get("/{model_id}/download-all")
def download_all(model_id: int, db: Session = Depends(get_db)):
    """Send every STL/3MF file of a model as one ZIP and count the download.

    The archive is named after the model title with spaces replaced by
    underscores. Files missing on disk are left out of the archive.

    Raises:
        HTTPException: 404 when the model does not exist or has no 3D files.
    """
    model = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not model:
        raise HTTPException(status_code=404, detail="Model not found")
    model_files = [f for f in model.files if f.file_type in ('stl', '3mf')]
    if not model_files:
        raise HTTPException(status_code=404, detail="No 3D files found")

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        for mf in model_files:
            if os.path.exists(mf.file_path):
                zf.write(mf.file_path, arcname=mf.filename)
    zip_buffer.seek(0)

    archive_name = f"{model.title.replace(' ', '_')}_all_parts.zip"
    # Header values must be latin-1 and must not contain quotes, semicolons
    # or line breaks. A title with such characters is sent percent-encoded
    # in the RFC 6266 "filename*" form; plain names keep the previous format.
    encoded_name = urllib.parse.quote(archive_name, safe="")
    if encoded_name == archive_name:
        disposition = f"attachment; filename={archive_name}"
    else:
        disposition = f"attachment; filename*=utf-8''{encoded_name}"

    model.download_count += 1
    db.commit()
    return StreamingResponse(zip_buffer, media_type="application/zip",
                             headers={"Content-Disposition": disposition})
@router.get("/{model_id}/thumbnail")
def get_thumbnail(model_id: int, db: Session = Depends(get_db)):
    """Serve a model's thumbnail image, cacheable for one day.

    The stored thumbnail path is served if the file exists; otherwise a
    ``.png`` file with the same base name is tried.

    Raises:
        HTTPException: 404 when the model or any thumbnail file is missing.
    """
    model = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not model:
        raise HTTPException(status_code=404, detail="Model not found")

    stored = model.thumbnail_path
    if stored:
        one_day = 86400
        if os.path.exists(stored):
            if stored.lower().endswith('.webp'):
                kind = "image/webp"
            else:
                kind = "image/png"
            return FileResponse(path=stored, media_type=kind, headers=_cache_headers(one_day))
        # Fallback to PNG with same base name
        png_fallback = os.path.splitext(stored)[0] + '.png'
        if os.path.exists(png_fallback):
            return FileResponse(path=png_fallback, media_type="image/png", headers=_cache_headers(one_day))
    raise HTTPException(status_code=404, detail="Thumbnail not found")
@router.get("/{model_id}/qr")
def get_qr(model_id: int, db: Session = Depends(get_db)):
    """Return a PNG QR code that encodes the model's public page URL.

    Responds 404 when the model does not exist.
    """
    model = db.query(Model3D).filter(Model3D.id == model_id).first()
    if not model:
        raise HTTPException(status_code=404, detail="Model not found")

    page_url = f"https://3d.consultoria-as.com/model/{model_id}"
    code = qrcode.QRCode(version=1, box_size=10, border=2)
    code.add_data(page_url)
    code.make(fit=True)
    picture = code.make_image(fill_color="#0f172a", back_color="#ffffff")

    png_stream = io.BytesIO()
    picture.save(png_stream, format='PNG')
    png_stream.seek(0)
    return StreamingResponse(png_stream, media_type="image/png")