- Nombre de app: PrintForge - Logo generado: icono + version completa - Integracion en navbar y favicon de todas las paginas - QR code apunta a https://3d.consultoria-as.com - README actualizado con URL de produccion
740 lines
31 KiB
Python
740 lines
31 KiB
Python
import os
|
|
import shutil
|
|
import hashlib
|
|
import json
|
|
import zipfile
|
|
import qrcode
|
|
import io
|
|
import tempfile
|
|
import urllib.parse
|
|
import httpx
|
|
from fastapi import APIRouter, Depends, UploadFile, File, Form, HTTPException, Query, Request, Body
|
|
from fastapi.responses import FileResponse, StreamingResponse, JSONResponse
|
|
from sqlalchemy.orm import Session
|
|
from typing import Optional, List
|
|
|
|
def _cache_headers(seconds: int):
|
|
return {"Cache-Control": f"public, max-age={seconds}"}
|
|
|
|
from app.database import get_db
|
|
from app.models import Model3D, Tag, ModelFile, Rating, Comment, Collection
|
|
from app.schemas import (
|
|
Model3DResponse, Model3DUpdate, RatingResponse, CommentResponse,
|
|
CollectionResponse, CollectionCreate, CollectionDetailResponse
|
|
)
|
|
from app.parsers import parse_model_file, generate_thumbnail, generate_generic_thumbnail
|
|
|
|
router = APIRouter(prefix="/api/models", tags=["models"])
|
|
|
|
UPLOAD_DIR = "uploads"
|
|
THUMBNAIL_DIR = "thumbnails"
|
|
IMAGES_DIR = "images"
|
|
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
|
os.makedirs(THUMBNAIL_DIR, exist_ok=True)
|
|
os.makedirs(IMAGES_DIR, exist_ok=True)
|
|
|
|
|
|
def _file_hash(file_path: str) -> str:
|
|
h = hashlib.sha256()
|
|
with open(file_path, 'rb') as f:
|
|
while chunk := f.read(8192):
|
|
h.update(chunk)
|
|
return h.hexdigest()
|
|
|
|
|
|
def _get_or_create_tags(db: Session, tag_names: List[str]) -> List[Tag]:
|
|
tags = []
|
|
for name in tag_names:
|
|
name = name.strip().lower()
|
|
if not name:
|
|
continue
|
|
tag = db.query(Tag).filter(Tag.name == name).first()
|
|
if not tag:
|
|
tag = Tag(name=name)
|
|
db.add(tag)
|
|
db.flush()
|
|
tags.append(tag)
|
|
return tags
|
|
|
|
|
|
def _calc_avg_rating(model: Model3D) -> Optional[float]:
|
|
if not model.ratings:
|
|
return None
|
|
return round(sum(r.stars for r in model.ratings) / len(model.ratings), 1)
|
|
|
|
|
|
# --- Cost estimator helpers ---
|
|
def _estimate_print_time_seconds(volume_cm3: float) -> int:
|
|
# Rough estimate: ~1 hour per 10cm3 at standard speed
|
|
return int(volume_cm3 * 360)
|
|
|
|
|
|
# --- Process a single STL file into a model ---
|
|
def _process_single_file(db: Session, file_path: str, title: str, description: str,
|
|
author: str, license_str: str, tags_str: str, category: str,
|
|
part_name: Optional[str] = None, is_primary: bool = False) -> Model3D:
|
|
file_hash = _file_hash(file_path)
|
|
existing = db.query(Model3D).filter(Model3D.file_hash == file_hash).first()
|
|
if existing:
|
|
os.remove(file_path)
|
|
raise HTTPException(status_code=409,
|
|
detail=f"File already exists as '{existing.title}' (ID: {existing.id})")
|
|
|
|
try:
|
|
model_data = parse_model_file(file_path)
|
|
except Exception as e:
|
|
os.remove(file_path)
|
|
raise HTTPException(status_code=400, detail=f"Failed to parse: {str(e)}")
|
|
|
|
thumbnail_path = os.path.join(THUMBNAIL_DIR, f"{os.path.splitext(os.path.basename(file_path))[0]}.webp")
|
|
generate_thumbnail(model_data['vertices'], thumbnail_path)
|
|
|
|
db_model = Model3D(
|
|
title=title,
|
|
filename=os.path.basename(file_path),
|
|
description=description,
|
|
author=author,
|
|
license=license_str,
|
|
category=category,
|
|
file_size=os.path.getsize(file_path),
|
|
file_hash=file_hash,
|
|
width=model_data['width'],
|
|
height=model_data['height'],
|
|
depth=model_data['depth'],
|
|
faces=model_data['faces'],
|
|
thumbnail_path=thumbnail_path,
|
|
)
|
|
db.add(db_model)
|
|
db.flush()
|
|
|
|
db_file = ModelFile(
|
|
model_id=db_model.id,
|
|
filename=os.path.basename(file_path),
|
|
file_path=file_path,
|
|
file_type='stl' if file_path.lower().endswith('.stl') else '3mf',
|
|
part_name=part_name,
|
|
is_primary=is_primary,
|
|
file_size=os.path.getsize(file_path),
|
|
file_hash=file_hash,
|
|
)
|
|
db.add(db_file)
|
|
|
|
tag_names = [t.strip() for t in (tags_str or '').split(',') if t.strip()]
|
|
db_model.tags = _get_or_create_tags(db, tag_names)
|
|
db.commit()
|
|
db.refresh(db_model)
|
|
return db_model
|
|
|
|
|
|
# =============================================================================
|
|
# STATIC ROUTES (no path parameters) - MUST come before dynamic routes
|
|
# =============================================================================
|
|
|
|
@router.get("/", response_model=List[Model3DResponse])
|
|
def list_models(
|
|
search: Optional[str] = None, category: Optional[str] = None, tag: Optional[str] = None,
|
|
min_width: Optional[float] = None, max_width: Optional[float] = None,
|
|
min_height: Optional[float] = None, max_height: Optional[float] = None,
|
|
min_depth: Optional[float] = None, max_depth: Optional[float] = None,
|
|
min_faces: Optional[int] = None, max_faces: Optional[int] = None,
|
|
date_from: Optional[str] = None, date_to: Optional[str] = None,
|
|
sort_by: Optional[str] = Query("newest", enum=["newest", "oldest", "most_downloaded", "largest", "most_faces", "highest_rated"]),
|
|
skip: int = Query(0, ge=0), limit: int = Query(24, ge=1, le=100),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
query = db.query(Model3D)
|
|
if search: query = query.filter(Model3D.title.contains(search))
|
|
if category: query = query.filter(Model3D.category == category)
|
|
if tag: query = query.join(Model3D.tags).filter(Tag.name.contains(tag.lower()))
|
|
if min_width: query = query.filter(Model3D.width >= min_width)
|
|
if max_width: query = query.filter(Model3D.width <= max_width)
|
|
if min_height: query = query.filter(Model3D.height >= min_height)
|
|
if max_height: query = query.filter(Model3D.height <= max_height)
|
|
if min_depth: query = query.filter(Model3D.depth >= min_depth)
|
|
if max_depth: query = query.filter(Model3D.depth <= max_depth)
|
|
if min_faces: query = query.filter(Model3D.faces >= min_faces)
|
|
if max_faces: query = query.filter(Model3D.faces <= max_faces)
|
|
if date_from: query = query.filter(Model3D.created_at >= date_from)
|
|
if date_to: query = query.filter(Model3D.created_at <= date_to)
|
|
|
|
if sort_by == "newest": query = query.order_by(Model3D.created_at.desc())
|
|
elif sort_by == "oldest": query = query.order_by(Model3D.created_at.asc())
|
|
elif sort_by == "most_downloaded": query = query.order_by(Model3D.download_count.desc())
|
|
elif sort_by == "largest": query = query.order_by(Model3D.file_size.desc())
|
|
elif sort_by == "most_faces": query = query.order_by(Model3D.faces.desc())
|
|
elif sort_by == "highest_rated":
|
|
models = query.all()
|
|
models.sort(key=lambda m: _calc_avg_rating(m) or 0, reverse=True)
|
|
return models[skip:skip+limit]
|
|
|
|
models = query.offset(skip).limit(limit).all()
|
|
return JSONResponse(content=[Model3DResponse.model_validate(m).model_dump(mode='json') for m in models], headers=_cache_headers(60))
|
|
|
|
|
|
@router.get("/tags", response_model=List[dict])
|
|
def list_tags(search: Optional[str] = None, db: Session = Depends(get_db)):
|
|
query = db.query(Tag)
|
|
if search: query = query.filter(Tag.name.contains(search.lower()))
|
|
tags = query.order_by(Tag.name).all()
|
|
data = [{"id": t.id, "name": t.name, "count": len(t.models)} for t in tags]
|
|
return JSONResponse(content=data, headers=_cache_headers(300))
|
|
|
|
|
|
@router.get("/collections/all", response_model=List[CollectionResponse])
|
|
def list_collections(db: Session = Depends(get_db)):
|
|
collections = db.query(Collection).order_by(Collection.created_at.desc()).all()
|
|
result = []
|
|
for c in collections:
|
|
resp = CollectionResponse.model_validate(c)
|
|
resp.model_count = len(c.models)
|
|
result.append(resp)
|
|
return result
|
|
|
|
|
|
@router.post("/collections", response_model=CollectionResponse)
|
|
def create_collection(data: CollectionCreate, db: Session = Depends(get_db)):
|
|
coll = Collection(name=data.name, description=data.description)
|
|
db.add(coll)
|
|
db.commit()
|
|
db.refresh(coll)
|
|
resp = CollectionResponse.model_validate(coll)
|
|
resp.model_count = 0
|
|
return resp
|
|
|
|
|
|
@router.get("/system/backup")
|
|
def backup_all(db: Session = Depends(get_db)):
|
|
from datetime import datetime
|
|
import glob
|
|
zip_buffer = io.BytesIO()
|
|
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
|
|
db_path = "stl_repo.db"
|
|
if os.path.exists(db_path): zf.write(db_path, arcname="backup/stl_repo.db")
|
|
for f in glob.glob("uploads/*"):
|
|
if os.path.isfile(f): zf.write(f, arcname=f"backup/{f}")
|
|
for f in glob.glob("thumbnails/*"):
|
|
if os.path.isfile(f): zf.write(f, arcname=f"backup/{f}")
|
|
for f in glob.glob("images/*"):
|
|
if os.path.isfile(f): zf.write(f, arcname=f"backup/{f}")
|
|
models = db.query(Model3D).all()
|
|
metadata = []
|
|
for m in models:
|
|
metadata.append({
|
|
"id": m.id, "title": m.title, "filename": m.filename,
|
|
"description": m.description, "author": m.author, "license": m.license,
|
|
"category": m.category, "tags": [t.name for t in m.tags],
|
|
"faces": m.faces, "width": m.width, "height": m.height, "depth": m.depth,
|
|
"created_at": m.created_at.isoformat() if m.created_at else None,
|
|
"download_count": m.download_count,
|
|
"files": [{"filename": f.filename, "file_type": f.file_type, "part_name": f.part_name, "is_primary": f.is_primary} for f in m.files]
|
|
})
|
|
zf.writestr("backup/metadata.json", json.dumps(metadata, indent=2, default=str))
|
|
zip_buffer.seek(0)
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
return StreamingResponse(zip_buffer, media_type="application/zip",
|
|
headers={"Content-Disposition": f"attachment; filename=stl_repo_backup_{timestamp}.zip"})
|
|
|
|
|
|
# =============================================================================
|
|
# MODEL CREATION
|
|
# =============================================================================
|
|
|
|
@router.post("/", response_model=Model3DResponse)
|
|
async def create_model(
|
|
title: str = Form(...),
|
|
description: Optional[str] = Form(None),
|
|
author: Optional[str] = Form(None),
|
|
license: Optional[str] = Form(None),
|
|
tags: Optional[str] = Form(None),
|
|
category: Optional[str] = Form(None),
|
|
files: List[UploadFile] = File(default=[]),
|
|
images: List[UploadFile] = File(default=[]),
|
|
part_names: Optional[str] = Form(None),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
if not files:
|
|
raise HTTPException(status_code=400, detail="At least one 3D file is required")
|
|
|
|
part_names_map = {}
|
|
if part_names:
|
|
try:
|
|
part_names_map = json.loads(part_names)
|
|
except:
|
|
pass
|
|
|
|
saved_3d_files = []
|
|
total_faces = 0
|
|
|
|
for idx, file in enumerate(files):
|
|
if not file.filename:
|
|
continue
|
|
lower_name = file.filename.lower()
|
|
if not (lower_name.endswith('.stl') or lower_name.endswith('.3mf')):
|
|
for sf in saved_3d_files:
|
|
if os.path.exists(sf['path']): os.remove(sf['path'])
|
|
raise HTTPException(status_code=400, detail=f"Unsupported format: {file.filename}")
|
|
|
|
file_path = os.path.join(UPLOAD_DIR, file.filename)
|
|
with open(file_path, "wb") as buffer:
|
|
shutil.copyfileobj(file.file, buffer)
|
|
|
|
file_hash = _file_hash(file_path)
|
|
existing = db.query(Model3D).filter(Model3D.file_hash == file_hash).first()
|
|
if existing:
|
|
os.remove(file_path)
|
|
for sf in saved_3d_files:
|
|
if os.path.exists(sf['path']): os.remove(sf['path'])
|
|
raise HTTPException(status_code=409,
|
|
detail=f"File '{file.filename}' already exists as '{existing.title}' (ID: {existing.id})")
|
|
|
|
try:
|
|
model_data = parse_model_file(file_path)
|
|
except Exception as e:
|
|
os.remove(file_path)
|
|
for sf in saved_3d_files:
|
|
if os.path.exists(sf['path']): os.remove(sf['path'])
|
|
raise HTTPException(status_code=400, detail=f"Failed to parse {file.filename}: {str(e)}")
|
|
|
|
saved_3d_files.append({
|
|
'path': file_path, 'filename': file.filename, 'hash': file_hash,
|
|
'size': os.path.getsize(file_path), 'data': model_data,
|
|
'is_primary': idx == 0, 'part_name': part_names_map.get(str(idx), None),
|
|
})
|
|
if idx == 0:
|
|
total_faces += model_data['faces']
|
|
|
|
if not saved_3d_files:
|
|
raise HTTPException(status_code=400, detail="No valid 3D files provided")
|
|
|
|
primary = saved_3d_files[0]
|
|
thumbnail_path = os.path.join(THUMBNAIL_DIR, f"{os.path.splitext(primary['filename'])[0]}.webp")
|
|
generate_thumbnail(primary['data']['vertices'], thumbnail_path)
|
|
|
|
db_model = Model3D(
|
|
title=title, filename=primary['filename'], description=description,
|
|
author=author, license=license, category=category,
|
|
file_size=sum(f['size'] for f in saved_3d_files),
|
|
file_hash=primary['hash'], width=primary['data']['width'],
|
|
height=primary['data']['height'], depth=primary['data']['depth'],
|
|
faces=total_faces, thumbnail_path=thumbnail_path,
|
|
)
|
|
db.add(db_model)
|
|
db.flush()
|
|
|
|
for sf in saved_3d_files:
|
|
db_file = ModelFile(
|
|
model_id=db_model.id, filename=sf['filename'], file_path=sf['path'],
|
|
file_type='stl' if sf['filename'].lower().endswith('.stl') else '3mf',
|
|
part_name=sf['part_name'], is_primary=sf['is_primary'],
|
|
file_size=sf['size'], file_hash=sf['hash'],
|
|
)
|
|
db.add(db_file)
|
|
|
|
for img in images:
|
|
if not img.filename: continue
|
|
lower_name = img.filename.lower()
|
|
if not (lower_name.endswith('.jpg') or lower_name.endswith('.jpeg') or lower_name.endswith('.png')):
|
|
continue
|
|
img_path = os.path.join(IMAGES_DIR, img.filename)
|
|
with open(img_path, "wb") as buffer:
|
|
shutil.copyfileobj(img.file, buffer)
|
|
db.add(ModelFile(
|
|
model_id=db_model.id, filename=img.filename, file_path=img_path,
|
|
file_type='image', is_primary=False, file_size=os.path.getsize(img_path),
|
|
))
|
|
|
|
tag_names = [t.strip() for t in (tags or '').split(',') if t.strip()]
|
|
db_model.tags = _get_or_create_tags(db, tag_names)
|
|
db.commit()
|
|
db.refresh(db_model)
|
|
response_data = Model3DResponse.model_validate(db_model)
|
|
response_data.avg_rating = _calc_avg_rating(db_model)
|
|
return response_data
|
|
|
|
|
|
# =============================================================================
|
|
# IMPORT / BULK / BATCH
|
|
# =============================================================================
|
|
|
|
@router.post("/import-url", response_model=Model3DResponse)
|
|
async def import_from_url(
|
|
url: str = Form(...),
|
|
title: str = Form(...),
|
|
description: Optional[str] = Form(None),
|
|
author: Optional[str] = Form(None),
|
|
license: Optional[str] = Form(None),
|
|
tags: Optional[str] = Form(None),
|
|
category: Optional[str] = Form(None),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
MAX_SIZE = 50 * 1024 * 1024 # 50MB
|
|
try:
|
|
parsed = urllib.parse.urlparse(url)
|
|
if not parsed.scheme or not parsed.netloc:
|
|
raise HTTPException(status_code=400, detail="Invalid URL")
|
|
except Exception:
|
|
raise HTTPException(status_code=400, detail="Invalid URL")
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
|
|
r = await client.get(url)
|
|
r.raise_for_status()
|
|
if len(r.content) > MAX_SIZE:
|
|
raise HTTPException(status_code=400, detail="File too large (max 50MB)")
|
|
except httpx.RequestError as e:
|
|
raise HTTPException(status_code=400, detail=f"Download failed: {str(e)}")
|
|
|
|
filename = os.path.basename(parsed.path) or "downloaded.stl"
|
|
if not filename.lower().endswith(('.stl', '.3mf')):
|
|
filename += ".stl"
|
|
|
|
file_path = os.path.join(UPLOAD_DIR, filename)
|
|
with open(file_path, 'wb') as f:
|
|
f.write(r.content)
|
|
|
|
db_model = _process_single_file(db, file_path, title, description, author, license, tags, category, is_primary=True)
|
|
response_data = Model3DResponse.model_validate(db_model)
|
|
response_data.avg_rating = _calc_avg_rating(db_model)
|
|
return response_data
|
|
|
|
|
|
@router.post("/bulk-zip", response_model=List[Model3DResponse])
|
|
async def bulk_zip_upload(
|
|
zip_file: UploadFile = File(...),
|
|
description: Optional[str] = Form(None),
|
|
author: Optional[str] = Form(None),
|
|
license: Optional[str] = Form(None),
|
|
tags: Optional[str] = Form(None),
|
|
category: Optional[str] = Form(None),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
if not zip_file.filename or not zip_file.filename.lower().endswith('.zip'):
|
|
raise HTTPException(status_code=400, detail="Only ZIP files are allowed")
|
|
|
|
tmp_path = os.path.join(tempfile.gettempdir(), zip_file.filename)
|
|
with open(tmp_path, 'wb') as f:
|
|
shutil.copyfileobj(zip_file.file, f)
|
|
|
|
extract_dir = tempfile.mkdtemp()
|
|
try:
|
|
with zipfile.ZipFile(tmp_path, 'r') as zf:
|
|
zf.extractall(extract_dir)
|
|
except zipfile.BadZipFile:
|
|
os.remove(tmp_path)
|
|
raise HTTPException(status_code=400, detail="Invalid ZIP file")
|
|
|
|
results = []
|
|
for root, dirs, files in os.walk(extract_dir):
|
|
for fname in files:
|
|
lower = fname.lower()
|
|
if lower.endswith('.stl') or lower.endswith('.3mf'):
|
|
src = os.path.join(root, fname)
|
|
dst = os.path.join(UPLOAD_DIR, fname)
|
|
# avoid collisions
|
|
if os.path.exists(dst):
|
|
base, ext = os.path.splitext(fname)
|
|
fname = f"{base}_{hashlib.md5(src.encode()).hexdigest()[:6]}{ext}"
|
|
dst = os.path.join(UPLOAD_DIR, fname)
|
|
shutil.move(src, dst)
|
|
try:
|
|
db_model = _process_single_file(
|
|
db, dst, os.path.splitext(fname)[0],
|
|
description, author, license, tags, category, is_primary=True
|
|
)
|
|
response_data = Model3DResponse.model_validate(db_model)
|
|
response_data.avg_rating = _calc_avg_rating(db_model)
|
|
results.append(response_data)
|
|
except HTTPException:
|
|
pass # skip duplicates
|
|
|
|
os.remove(tmp_path)
|
|
shutil.rmtree(extract_dir, ignore_errors=True)
|
|
return results
|
|
|
|
|
|
@router.post("/batch-download")
|
|
def batch_download(
|
|
model_ids: List[int] = Body(...),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
if not model_ids:
|
|
raise HTTPException(status_code=400, detail="No model IDs provided")
|
|
|
|
zip_buffer = io.BytesIO()
|
|
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
|
|
for mid in model_ids:
|
|
model = db.query(Model3D).filter(Model3D.id == mid).first()
|
|
if not model:
|
|
continue
|
|
for mf in model.files:
|
|
if mf.file_type in ('stl', '3mf') and os.path.exists(mf.file_path):
|
|
zf.write(mf.file_path, arcname=f"{model.title.replace(' ', '_')}/{mf.filename}")
|
|
|
|
zip_buffer.seek(0)
|
|
return StreamingResponse(
|
|
zip_buffer, media_type="application/zip",
|
|
headers={"Content-Disposition": "attachment; filename=batch_download.zip"}
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# DYNAMIC MODEL ROUTES
|
|
# =============================================================================
|
|
|
|
@router.get("/{model_id}", response_model=Model3DResponse)
|
|
def get_model(model_id: int, db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
response_data = Model3DResponse.model_validate(model)
|
|
response_data.avg_rating = _calc_avg_rating(model)
|
|
return response_data
|
|
|
|
|
|
@router.put("/{model_id}", response_model=Model3DResponse)
|
|
def update_model(model_id: int, data: Model3DUpdate, db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
if data.title is not None: model.title = data.title
|
|
if data.description is not None: model.description = data.description
|
|
if data.author is not None: model.author = data.author
|
|
if data.license is not None: model.license = data.license
|
|
if data.category is not None: model.category = data.category
|
|
if data.tag_names is not None: model.tags = _get_or_create_tags(db, data.tag_names)
|
|
db.commit()
|
|
db.refresh(model)
|
|
response_data = Model3DResponse.model_validate(model)
|
|
response_data.avg_rating = _calc_avg_rating(model)
|
|
return response_data
|
|
|
|
|
|
@router.delete("/{model_id}")
|
|
def delete_model(model_id: int, db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
for mf in model.files:
|
|
if os.path.exists(mf.file_path): os.remove(mf.file_path)
|
|
if model.thumbnail_path and os.path.exists(model.thumbnail_path): os.remove(model.thumbnail_path)
|
|
db.delete(model)
|
|
db.commit()
|
|
return {"detail": "Model deleted"}
|
|
|
|
|
|
# --- Validation & Estimation ---
|
|
|
|
@router.get("/{model_id}/validate")
|
|
def validate_mesh(model_id: int, db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model:
|
|
raise HTTPException(status_code=404, detail="Model not found")
|
|
|
|
primary = next((f for f in model.files if f.is_primary and f.file_type in ('stl', '3mf')), None)
|
|
if not primary:
|
|
raise HTTPException(status_code=404, detail="No 3D file found")
|
|
|
|
try:
|
|
import trimesh
|
|
mesh = trimesh.load(primary.file_path, force='mesh')
|
|
volume = abs(float(mesh.volume)) if mesh.is_watertight else 0.0
|
|
area = float(mesh.area)
|
|
is_watertight = bool(mesh.is_watertight)
|
|
is_winding_consistent = bool(mesh.is_winding_consistent)
|
|
bounds = mesh.bounds.tolist()
|
|
euler = int(mesh.euler_number)
|
|
|
|
# Count holes via euler characteristic
|
|
holes = 0 if is_watertight else max(0, 2 - euler)
|
|
|
|
return {
|
|
"is_watertight": is_watertight,
|
|
"is_winding_consistent": is_winding_consistent,
|
|
"volume_cm3": round(volume / 1000, 3) if volume else 0,
|
|
"surface_area_cm2": round(area / 100, 2),
|
|
"bounds_mm": bounds,
|
|
"euler_number": euler,
|
|
"estimated_holes": holes,
|
|
"face_count": len(mesh.faces),
|
|
"vertex_count": len(mesh.vertices),
|
|
}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Validation failed: {str(e)}")
|
|
|
|
|
|
@router.get("/{model_id}/estimate")
|
|
def estimate_print(model_id: int, price_per_kg: float = Query(20.0), material_density: float = Query(1.24), db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model:
|
|
raise HTTPException(status_code=404, detail="Model not found")
|
|
|
|
primary = next((f for f in model.files if f.is_primary and f.file_type in ('stl', '3mf')), None)
|
|
if not primary:
|
|
raise HTTPException(status_code=404, detail="No 3D file found")
|
|
|
|
try:
|
|
import trimesh
|
|
mesh = trimesh.load(primary.file_path, force='mesh')
|
|
volume_cm3 = abs(float(mesh.volume)) / 1000.0
|
|
grams = volume_cm3 * material_density
|
|
cost = (grams / 1000.0) * price_per_kg
|
|
seconds = _estimate_print_time_seconds(volume_cm3)
|
|
hours = seconds // 3600
|
|
mins = (seconds % 3600) // 60
|
|
|
|
return {
|
|
"volume_cm3": round(volume_cm3, 2),
|
|
"grams": round(grams, 2),
|
|
"cost": round(cost, 2),
|
|
"estimated_time": f"{hours}h {mins}m",
|
|
"estimated_seconds": seconds,
|
|
"price_per_kg": price_per_kg,
|
|
"material_density": material_density,
|
|
}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Estimation failed: {str(e)}")
|
|
|
|
|
|
# --- Ratings ---
|
|
|
|
@router.post("/{model_id}/ratings", response_model=RatingResponse)
|
|
def create_rating(model_id: int, stars: int = Query(..., ge=1, le=5), db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
rating = Rating(model_id=model_id, stars=stars)
|
|
db.add(rating)
|
|
db.commit()
|
|
db.refresh(rating)
|
|
return rating
|
|
|
|
|
|
@router.get("/{model_id}/ratings", response_model=List[RatingResponse])
|
|
def list_ratings(model_id: int, db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
return db.query(Rating).filter(Rating.model_id == model_id).order_by(Rating.created_at.desc()).all()
|
|
|
|
|
|
# --- Comments ---
|
|
|
|
@router.post("/{model_id}/comments", response_model=CommentResponse)
|
|
def create_comment(model_id: int, text: str = Query(..., min_length=1), author_name: Optional[str] = Query(None), db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
comment = Comment(model_id=model_id, text=text, author_name=author_name)
|
|
db.add(comment)
|
|
db.commit()
|
|
db.refresh(comment)
|
|
return comment
|
|
|
|
|
|
@router.get("/{model_id}/comments", response_model=List[CommentResponse])
|
|
def list_comments(model_id: int, db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
return db.query(Comment).filter(Comment.model_id == model_id).order_by(Comment.created_at.desc()).all()
|
|
|
|
|
|
# --- Collections (dynamic) ---
|
|
|
|
@router.get("/collections/{collection_id}", response_model=CollectionDetailResponse)
|
|
def get_collection(collection_id: int, db: Session = Depends(get_db)):
|
|
coll = db.query(Collection).filter(Collection.id == collection_id).first()
|
|
if not coll: raise HTTPException(status_code=404, detail="Collection not found")
|
|
resp = CollectionDetailResponse.model_validate(coll)
|
|
resp.model_count = len(coll.models)
|
|
return resp
|
|
|
|
|
|
@router.post("/collections/{collection_id}/add/{model_id}")
|
|
def add_to_collection(collection_id: int, model_id: int, db: Session = Depends(get_db)):
|
|
coll = db.query(Collection).filter(Collection.id == collection_id).first()
|
|
if not coll: raise HTTPException(status_code=404, detail="Collection not found")
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
if model not in coll.models:
|
|
coll.models.append(model)
|
|
db.commit()
|
|
return {"detail": "Model added to collection"}
|
|
|
|
|
|
@router.delete("/collections/{collection_id}/remove/{model_id}")
|
|
def remove_from_collection(collection_id: int, model_id: int, db: Session = Depends(get_db)):
|
|
coll = db.query(Collection).filter(Collection.id == collection_id).first()
|
|
if not coll: raise HTTPException(status_code=404, detail="Collection not found")
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
if model in coll.models:
|
|
coll.models.remove(model)
|
|
db.commit()
|
|
return {"detail": "Model removed from collection"}
|
|
|
|
|
|
@router.delete("/collections/{collection_id}")
|
|
def delete_collection(collection_id: int, db: Session = Depends(get_db)):
|
|
coll = db.query(Collection).filter(Collection.id == collection_id).first()
|
|
if not coll: raise HTTPException(status_code=404, detail="Collection not found")
|
|
db.delete(coll)
|
|
db.commit()
|
|
return {"detail": "Collection deleted"}
|
|
|
|
|
|
# --- Downloads & QR ---
|
|
|
|
@router.get("/{model_id}/download")
|
|
def download_model(model_id: int, file_id: Optional[int] = None, db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
if file_id:
|
|
mf = db.query(ModelFile).filter(ModelFile.id == file_id, ModelFile.model_id == model_id).first()
|
|
if not mf: raise HTTPException(status_code=404, detail="File not found")
|
|
if not os.path.exists(mf.file_path): raise HTTPException(status_code=404, detail="File not found on disk")
|
|
model.download_count += 1; db.commit()
|
|
return FileResponse(path=mf.file_path, filename=mf.filename, media_type="application/octet-stream")
|
|
primary = next((f for f in model.files if f.is_primary and f.file_type in ('stl', '3mf')), None)
|
|
file_path = primary.file_path if primary else os.path.join(UPLOAD_DIR, model.filename)
|
|
if not os.path.exists(file_path): raise HTTPException(status_code=404, detail="File not found on disk")
|
|
model.download_count += 1; db.commit()
|
|
return FileResponse(path=file_path, filename=model.filename, media_type="application/octet-stream")
|
|
|
|
|
|
@router.get("/{model_id}/download-all")
|
|
def download_all(model_id: int, db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
model_files = [f for f in model.files if f.file_type in ('stl', '3mf')]
|
|
if not model_files: raise HTTPException(status_code=404, detail="No 3D files found")
|
|
zip_buffer = io.BytesIO()
|
|
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
|
|
for mf in model_files:
|
|
if os.path.exists(mf.file_path): zf.write(mf.file_path, arcname=mf.filename)
|
|
zip_buffer.seek(0)
|
|
model.download_count += 1; db.commit()
|
|
return StreamingResponse(zip_buffer, media_type="application/zip",
|
|
headers={"Content-Disposition": f"attachment; filename={model.title.replace(' ', '_')}_all_parts.zip"})
|
|
|
|
|
|
@router.get("/{model_id}/thumbnail")
|
|
def get_thumbnail(model_id: int, db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
# Try WEBP first, fallback to PNG
|
|
if model.thumbnail_path:
|
|
if os.path.exists(model.thumbnail_path):
|
|
media_type = "image/webp" if model.thumbnail_path.lower().endswith('.webp') else "image/png"
|
|
return FileResponse(path=model.thumbnail_path, media_type=media_type, headers=_cache_headers(86400))
|
|
# Fallback to PNG with same base name
|
|
png_path = os.path.splitext(model.thumbnail_path)[0] + '.png'
|
|
if os.path.exists(png_path):
|
|
return FileResponse(path=png_path, media_type="image/png", headers=_cache_headers(86400))
|
|
raise HTTPException(status_code=404, detail="Thumbnail not found")
|
|
|
|
|
|
@router.get("/{model_id}/qr")
|
|
def get_qr(model_id: int, db: Session = Depends(get_db)):
|
|
model = db.query(Model3D).filter(Model3D.id == model_id).first()
|
|
if not model: raise HTTPException(status_code=404, detail="Model not found")
|
|
url = f"https://3d.consultoria-as.com/model/{model_id}"
|
|
qr = qrcode.QRCode(version=1, box_size=10, border=2)
|
|
qr.add_data(url); qr.make(fit=True)
|
|
img = qr.make_image(fill_color="#0f172a", back_color="#ffffff")
|
|
buf = io.BytesIO(); img.save(buf, format='PNG'); buf.seek(0)
|
|
return StreamingResponse(buf, media_type="image/png")
|