feat: implementar 12 mejoras, tests, docs y optimizaciones

- Fase A: license templates, search history, cost estimator
- Fase B: import URL, bulk ZIP, batch download
- Fase C: comparison mode, mesh validation, measurement tool
- Fase D: cross-section clipping, overhang heatmap, layer animation
- Refactor Pydantic/SQLAlchemy warnings
- 24 tests pytest
- README actualizado
- WebP thumbnails, lazy loading, cache headers
This commit is contained in:
Consultoria AS
2026-04-27 09:14:58 +00:00
commit 14b307110d
31 changed files with 5386 additions and 0 deletions

0
app/__init__.py Normal file
View File

20
app/database.py Normal file
View File

@@ -0,0 +1,20 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
import os
DATABASE_URL = "sqlite:///./stl_repo.db"
engine = create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

38
app/main.py Normal file
View File

@@ -0,0 +1,38 @@
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from app.database import engine, Base
from app.routers import models
from app.migrate import run_migrations
import os
# Create tables and run migrations
Base.metadata.create_all(bind=engine)
run_migrations()
app = FastAPI(title="STL Repository", version="2.1.0")
app.include_router(models.router)
# Serve static files
app.mount("/static", StaticFiles(directory="static"), name="static")
# Serve uploads, thumbnails and images directly
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")
app.mount("/thumbnails", StaticFiles(directory="thumbnails"), name="thumbnails")
app.mount("/images", StaticFiles(directory="images"), name="images")
@app.get("/")
def root():
return FileResponse("static/index.html")
@app.get("/upload")
def upload_page():
return FileResponse("static/upload.html")
@app.get("/model/{model_id}")
def detail_page(model_id: int):
return FileResponse("static/detail.html")

59
app/migrate.py Normal file
View File

@@ -0,0 +1,59 @@
import os
from sqlalchemy import text
from app.database import engine, SessionLocal
from app.models import Base, Tag, ModelFile
def run_migrations():
"""Run manual migrations for SQLite."""
Base.metadata.create_all(bind=engine)
db = SessionLocal()
try:
# Check if we need to migrate old tags (CSV string) to new tag structure
result = db.execute(text("SELECT name FROM sqlite_master WHERE type='table' AND name='tags'"))
if result.fetchone():
# Check if there are models with old-style tags (comma separated) and no model_tags entries
from app.models import Model3D
models = db.query(Model3D).all()
for model in models:
# Migrate tags if model has tags string but no tag relationships
if not model.tags and model.__dict__.get('tags_col'):
tag_names = [t.strip().lower() for t in model.tags_col.split(',') if t.strip()]
for name in tag_names:
tag = db.query(Tag).filter(Tag.name == name).first()
if not tag:
tag = Tag(name=name)
db.add(tag)
db.flush()
if tag not in model.tags:
model.tags.append(tag)
# Check if we need to migrate files to model_files
result = db.execute(text("SELECT name FROM sqlite_master WHERE type='table' AND name='model_files'"))
if result.fetchone():
from app.models import Model3D
models = db.query(Model3D).all()
for model in models:
# Check if model already has file records
if not model.files:
file_path = os.path.join("uploads", model.filename)
if os.path.exists(file_path):
mf = ModelFile(
model_id=model.id,
filename=model.filename,
file_path=file_path,
file_type='stl',
is_primary=True,
file_size=model.file_size,
file_hash=model.file_hash,
)
db.add(mf)
db.commit()
print("Migrations completed successfully")
except Exception as e:
print(f"Migration error: {e}")
db.rollback()
finally:
db.close()

105
app/models.py Normal file
View File

@@ -0,0 +1,105 @@
from sqlalchemy import Column, Integer, String, Float, DateTime, Text, Boolean, Table, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime, timezone
from app.database import Base
# Many-to-many association tables
model_tags = Table(
'model_tags',
Base.metadata,
Column('model_id', Integer, ForeignKey('models.id'), primary_key=True),
Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
)
collection_models = Table(
'collection_models',
Base.metadata,
Column('collection_id', Integer, ForeignKey('collections.id'), primary_key=True),
Column('model_id', Integer, ForeignKey('models.id'), primary_key=True)
)
class Tag(Base):
__tablename__ = "tags"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, unique=True, nullable=False, index=True)
models = relationship("Model3D", secondary=model_tags, back_populates="tags")
class Model3D(Base):
__tablename__ = "models"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, nullable=False)
filename = Column(String, nullable=False)
description = Column(Text, nullable=True)
author = Column(String, nullable=True)
license = Column(String, nullable=True)
category = Column(String, nullable=True)
file_size = Column(Integer, nullable=True)
file_hash = Column(String, nullable=True, index=True)
width = Column(Float, nullable=True)
height = Column(Float, nullable=True)
depth = Column(Float, nullable=True)
faces = Column(Integer, nullable=True)
created_at = Column(DateTime, default=datetime.now(timezone.utc))
thumbnail_path = Column(String, nullable=True)
download_count = Column(Integer, default=0)
tags = relationship("Tag", secondary=model_tags, back_populates="models")
files = relationship("ModelFile", back_populates="model", cascade="all, delete-orphan")
ratings = relationship("Rating", back_populates="model", cascade="all, delete-orphan")
comments = relationship("Comment", back_populates="model", cascade="all, delete-orphan")
collections = relationship("Collection", secondary=collection_models, back_populates="models")
class ModelFile(Base):
__tablename__ = "model_files"
id = Column(Integer, primary_key=True, index=True)
model_id = Column(Integer, ForeignKey("models.id"), nullable=False)
filename = Column(String, nullable=False)
file_path = Column(String, nullable=False)
file_type = Column(String, nullable=False, default='stl')
part_name = Column(String, nullable=True)
is_primary = Column(Boolean, default=False)
file_size = Column(Integer, nullable=True)
file_hash = Column(String, nullable=True)
model = relationship("Model3D", back_populates="files")
class Rating(Base):
__tablename__ = "ratings"
id = Column(Integer, primary_key=True, index=True)
model_id = Column(Integer, ForeignKey("models.id"), nullable=False)
stars = Column(Integer, nullable=False) # 1-5
created_at = Column(DateTime, default=datetime.now(timezone.utc))
model = relationship("Model3D", back_populates="ratings")
class Collection(Base):
__tablename__ = "collections"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
description = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.now(timezone.utc))
models = relationship("Model3D", secondary=collection_models, back_populates="collections")
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True, index=True)
model_id = Column(Integer, ForeignKey("models.id"), nullable=False)
author_name = Column(String, nullable=True)
text = Column(Text, nullable=False)
created_at = Column(DateTime, default=datetime.now(timezone.utc))
model = relationship("Model3D", back_populates="comments")

208
app/parsers.py Normal file
View File

@@ -0,0 +1,208 @@
import struct
import numpy as np
from PIL import Image, ImageDraw
import os
import zipfile
import xml.etree.ElementTree as ET
def parse_stl_file(file_path: str):
"""Parse an STL file (binary or ASCII) and return mesh data + metadata."""
with open(file_path, 'rb') as f:
header = f.read(80)
is_binary = False
if not header.startswith(b'solid'):
is_binary = True
else:
with open(file_path, 'rb') as f:
f.read(80)
tri_count_bytes = f.read(4)
if len(tri_count_bytes) == 4:
tri_count = struct.unpack('<I', tri_count_bytes)[0]
file_size = os.path.getsize(file_path)
expected = 80 + 4 + tri_count * 50
if file_size == expected:
is_binary = True
if is_binary:
return _parse_binary(file_path)
else:
return _parse_ascii(file_path)
def _parse_binary(file_path: str):
with open(file_path, 'rb') as f:
f.read(80)
tri_count = struct.unpack('<I', f.read(4))[0]
vertices = []
for _ in range(tri_count):
f.read(12) # normal
v1 = struct.unpack('<3f', f.read(12))
v2 = struct.unpack('<3f', f.read(12))
v3 = struct.unpack('<3f', f.read(12))
f.read(2)
vertices.extend([v1, v2, v3])
vertices = np.array(vertices, dtype=np.float32)
return _compute_metadata(vertices, tri_count)
def _parse_ascii(file_path: str):
vertices = []
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
line = line.strip().lower()
if line.startswith('vertex'):
parts = line.split()
if len(parts) >= 4:
v = [float(parts[1]), float(parts[2]), float(parts[3])]
vertices.append(v)
vertices = np.array(vertices, dtype=np.float32)
tri_count = len(vertices) // 3
return _compute_metadata(vertices, tri_count)
def _compute_metadata(vertices: np.ndarray, tri_count: int):
if len(vertices) == 0:
return {
'vertices': vertices,
'faces': 0,
'width': 0.0,
'height': 0.0,
'depth': 0.0,
}
min_v = vertices.min(axis=0)
max_v = vertices.max(axis=0)
dims = max_v - min_v
return {
'vertices': vertices,
'faces': tri_count,
'width': float(dims[0]),
'height': float(dims[1]),
'depth': float(dims[2]),
}
def parse_3mf_file(file_path: str):
"""Parse a 3MF file (zip with XML) and return mesh data + metadata."""
vertices = []
tri_count = 0
try:
with zipfile.ZipFile(file_path, 'r') as zf:
model_path = None
for name in zf.namelist():
if name.endswith('3dmodel.model') or name.endswith('.model'):
model_path = name
break
if not model_path:
raise ValueError("No 3D model found in 3MF archive")
with zf.open(model_path) as mf:
tree = ET.parse(mf)
root = tree.getroot()
# Find namespace
ns = {'m': root.tag.split('}')[0].strip('{') if '}' in root.tag else ''}
if ns['m']:
ns = {'m': ns['m']}
mesh = root.find('.//m:mesh', ns)
else:
mesh = root.find('.//mesh')
if mesh is None:
raise ValueError("No mesh found in 3MF model")
verts_elem = mesh.find('m:vertices', ns) if ns else mesh.find('vertices')
if verts_elem is not None:
tag = 'm:vertex' if ns else 'vertex'
for v in verts_elem.findall(tag, ns) if ns else verts_elem.findall(tag):
x = float(v.get('x', 0))
y = float(v.get('y', 0))
z = float(v.get('z', 0))
vertices.append([x, y, z])
tris_elem = mesh.find('m:triangles', ns) if ns else mesh.find('triangles')
if tris_elem is not None:
tag = 'm:triangle' if ns else 'triangle'
tri_verts = []
for t in tris_elem.findall(tag, ns) if ns else tris_elem.findall(tag):
v1 = int(t.get('v1', 0))
v2 = int(t.get('v2', 0))
v3 = int(t.get('v3', 0))
tri_verts.extend([vertices[v1], vertices[v2], vertices[v3]])
vertices = tri_verts
tri_count = len(tris_verts) // 3
else:
vertices = []
except Exception as e:
raise ValueError(f"Failed to parse 3MF: {str(e)}")
vertices = np.array(vertices, dtype=np.float32)
return _compute_metadata(vertices, tri_count)
def parse_model_file(file_path: str):
"""Auto-detect format and parse."""
lower = file_path.lower()
if lower.endswith('.stl'):
return parse_stl_file(file_path)
elif lower.endswith('.3mf'):
return parse_3mf_file(file_path)
else:
raise ValueError("Unsupported file format. Only STL and 3MF are supported.")
def generate_thumbnail(vertices: np.ndarray, output_path: str, size: int = 256):
"""Generate a simple orthographic thumbnail from vertices."""
if len(vertices) == 0:
img = Image.new('RGB', (size, size), color=(30, 30, 30))
img.save(output_path)
return
min_v = vertices.min(axis=0)
max_v = vertices.max(axis=0)
dims = max_v - min_v
scale = max(dims[0], dims[1])
if scale == 0:
scale = 1.0
margin = 20
img_size = size - 2 * margin
img = Image.new('RGB', (size, size), color=(30, 30, 30))
draw = ImageDraw.Draw(img)
for i in range(0, len(vertices), 3):
tri = vertices[i:i+3]
pts = []
for v in tri:
x = margin + int(((v[0] - min_v[0]) / scale) * img_size)
y = margin + int(((1.0 - (v[1] - min_v[1]) / scale)) * img_size)
pts.append((x, y))
if len(pts) == 3:
z_avg = sum(v[2] for v in tri) / 3.0
z_norm = (z_avg - min_v[2]) / (dims[2] if dims[2] > 0 else 1)
brightness = int(80 + z_norm * 120)
color = (brightness, brightness, int(brightness * 1.1))
draw.polygon(pts, fill=color, outline=(50, 50, 60))
fmt = 'WEBP' if output_path.lower().endswith('.webp') else 'PNG'
img.save(output_path, fmt)
def generate_generic_thumbnail(output_path: str, size: int = 256, label: str = "3D"):
"""Generate a generic thumbnail for unsupported previews."""
img = Image.new('RGB', (size, size), color=(30, 30, 30))
draw = ImageDraw.Draw(img)
draw.rectangle([40, 40, size-40, size-40], outline=(6, 182, 212), width=3)
try:
from PIL import ImageFont
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 36)
except:
font = ImageFont.load_default()
bbox = draw.textbbox((0,0), label, font=font)
text_w = bbox[2] - bbox[0]
text_h = bbox[3] - bbox[1]
draw.text(((size - text_w) // 2, (size - text_h) // 2), label, fill=(6, 182, 212), font=font)
img.save(output_path, 'PNG')

0
app/routers/__init__.py Normal file
View File

739
app/routers/models.py Normal file
View File

@@ -0,0 +1,739 @@
import os
import shutil
import hashlib
import json
import zipfile
import qrcode
import io
import tempfile
import urllib.parse
import httpx
from fastapi import APIRouter, Depends, UploadFile, File, Form, HTTPException, Query, Request, Body
from fastapi.responses import FileResponse, StreamingResponse, JSONResponse
from sqlalchemy.orm import Session
from typing import Optional, List
def _cache_headers(seconds: int):
return {"Cache-Control": f"public, max-age={seconds}"}
from app.database import get_db
from app.models import Model3D, Tag, ModelFile, Rating, Comment, Collection
from app.schemas import (
Model3DResponse, Model3DUpdate, RatingResponse, CommentResponse,
CollectionResponse, CollectionCreate, CollectionDetailResponse
)
from app.parsers import parse_model_file, generate_thumbnail, generate_generic_thumbnail
router = APIRouter(prefix="/api/models", tags=["models"])
UPLOAD_DIR = "uploads"
THUMBNAIL_DIR = "thumbnails"
IMAGES_DIR = "images"
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(THUMBNAIL_DIR, exist_ok=True)
os.makedirs(IMAGES_DIR, exist_ok=True)
def _file_hash(file_path: str) -> str:
h = hashlib.sha256()
with open(file_path, 'rb') as f:
while chunk := f.read(8192):
h.update(chunk)
return h.hexdigest()
def _get_or_create_tags(db: Session, tag_names: List[str]) -> List[Tag]:
tags = []
for name in tag_names:
name = name.strip().lower()
if not name:
continue
tag = db.query(Tag).filter(Tag.name == name).first()
if not tag:
tag = Tag(name=name)
db.add(tag)
db.flush()
tags.append(tag)
return tags
def _calc_avg_rating(model: Model3D) -> Optional[float]:
if not model.ratings:
return None
return round(sum(r.stars for r in model.ratings) / len(model.ratings), 1)
# --- Cost estimator helpers ---
def _estimate_print_time_seconds(volume_cm3: float) -> int:
# Rough estimate: ~1 hour per 10cm3 at standard speed
return int(volume_cm3 * 360)
# --- Process a single STL file into a model ---
def _process_single_file(db: Session, file_path: str, title: str, description: str,
author: str, license_str: str, tags_str: str, category: str,
part_name: Optional[str] = None, is_primary: bool = False) -> Model3D:
file_hash = _file_hash(file_path)
existing = db.query(Model3D).filter(Model3D.file_hash == file_hash).first()
if existing:
os.remove(file_path)
raise HTTPException(status_code=409,
detail=f"File already exists as '{existing.title}' (ID: {existing.id})")
try:
model_data = parse_model_file(file_path)
except Exception as e:
os.remove(file_path)
raise HTTPException(status_code=400, detail=f"Failed to parse: {str(e)}")
thumbnail_path = os.path.join(THUMBNAIL_DIR, f"{os.path.splitext(os.path.basename(file_path))[0]}.webp")
generate_thumbnail(model_data['vertices'], thumbnail_path)
db_model = Model3D(
title=title,
filename=os.path.basename(file_path),
description=description,
author=author,
license=license_str,
category=category,
file_size=os.path.getsize(file_path),
file_hash=file_hash,
width=model_data['width'],
height=model_data['height'],
depth=model_data['depth'],
faces=model_data['faces'],
thumbnail_path=thumbnail_path,
)
db.add(db_model)
db.flush()
db_file = ModelFile(
model_id=db_model.id,
filename=os.path.basename(file_path),
file_path=file_path,
file_type='stl' if file_path.lower().endswith('.stl') else '3mf',
part_name=part_name,
is_primary=is_primary,
file_size=os.path.getsize(file_path),
file_hash=file_hash,
)
db.add(db_file)
tag_names = [t.strip() for t in (tags_str or '').split(',') if t.strip()]
db_model.tags = _get_or_create_tags(db, tag_names)
db.commit()
db.refresh(db_model)
return db_model
# =============================================================================
# STATIC ROUTES (no path parameters) - MUST come before dynamic routes
# =============================================================================
@router.get("/", response_model=List[Model3DResponse])
def list_models(
search: Optional[str] = None, category: Optional[str] = None, tag: Optional[str] = None,
min_width: Optional[float] = None, max_width: Optional[float] = None,
min_height: Optional[float] = None, max_height: Optional[float] = None,
min_depth: Optional[float] = None, max_depth: Optional[float] = None,
min_faces: Optional[int] = None, max_faces: Optional[int] = None,
date_from: Optional[str] = None, date_to: Optional[str] = None,
sort_by: Optional[str] = Query("newest", enum=["newest", "oldest", "most_downloaded", "largest", "most_faces", "highest_rated"]),
skip: int = Query(0, ge=0), limit: int = Query(24, ge=1, le=100),
db: Session = Depends(get_db)
):
query = db.query(Model3D)
if search: query = query.filter(Model3D.title.contains(search))
if category: query = query.filter(Model3D.category == category)
if tag: query = query.join(Model3D.tags).filter(Tag.name.contains(tag.lower()))
if min_width: query = query.filter(Model3D.width >= min_width)
if max_width: query = query.filter(Model3D.width <= max_width)
if min_height: query = query.filter(Model3D.height >= min_height)
if max_height: query = query.filter(Model3D.height <= max_height)
if min_depth: query = query.filter(Model3D.depth >= min_depth)
if max_depth: query = query.filter(Model3D.depth <= max_depth)
if min_faces: query = query.filter(Model3D.faces >= min_faces)
if max_faces: query = query.filter(Model3D.faces <= max_faces)
if date_from: query = query.filter(Model3D.created_at >= date_from)
if date_to: query = query.filter(Model3D.created_at <= date_to)
if sort_by == "newest": query = query.order_by(Model3D.created_at.desc())
elif sort_by == "oldest": query = query.order_by(Model3D.created_at.asc())
elif sort_by == "most_downloaded": query = query.order_by(Model3D.download_count.desc())
elif sort_by == "largest": query = query.order_by(Model3D.file_size.desc())
elif sort_by == "most_faces": query = query.order_by(Model3D.faces.desc())
elif sort_by == "highest_rated":
models = query.all()
models.sort(key=lambda m: _calc_avg_rating(m) or 0, reverse=True)
return models[skip:skip+limit]
models = query.offset(skip).limit(limit).all()
return JSONResponse(content=[Model3DResponse.model_validate(m).model_dump(mode='json') for m in models], headers=_cache_headers(60))
@router.get("/tags", response_model=List[dict])
def list_tags(search: Optional[str] = None, db: Session = Depends(get_db)):
query = db.query(Tag)
if search: query = query.filter(Tag.name.contains(search.lower()))
tags = query.order_by(Tag.name).all()
data = [{"id": t.id, "name": t.name, "count": len(t.models)} for t in tags]
return JSONResponse(content=data, headers=_cache_headers(300))
@router.get("/collections/all", response_model=List[CollectionResponse])
def list_collections(db: Session = Depends(get_db)):
collections = db.query(Collection).order_by(Collection.created_at.desc()).all()
result = []
for c in collections:
resp = CollectionResponse.model_validate(c)
resp.model_count = len(c.models)
result.append(resp)
return result
@router.post("/collections", response_model=CollectionResponse)
def create_collection(data: CollectionCreate, db: Session = Depends(get_db)):
coll = Collection(name=data.name, description=data.description)
db.add(coll)
db.commit()
db.refresh(coll)
resp = CollectionResponse.model_validate(coll)
resp.model_count = 0
return resp
@router.get("/system/backup")
def backup_all(db: Session = Depends(get_db)):
from datetime import datetime
import glob
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
db_path = "stl_repo.db"
if os.path.exists(db_path): zf.write(db_path, arcname="backup/stl_repo.db")
for f in glob.glob("uploads/*"):
if os.path.isfile(f): zf.write(f, arcname=f"backup/{f}")
for f in glob.glob("thumbnails/*"):
if os.path.isfile(f): zf.write(f, arcname=f"backup/{f}")
for f in glob.glob("images/*"):
if os.path.isfile(f): zf.write(f, arcname=f"backup/{f}")
models = db.query(Model3D).all()
metadata = []
for m in models:
metadata.append({
"id": m.id, "title": m.title, "filename": m.filename,
"description": m.description, "author": m.author, "license": m.license,
"category": m.category, "tags": [t.name for t in m.tags],
"faces": m.faces, "width": m.width, "height": m.height, "depth": m.depth,
"created_at": m.created_at.isoformat() if m.created_at else None,
"download_count": m.download_count,
"files": [{"filename": f.filename, "file_type": f.file_type, "part_name": f.part_name, "is_primary": f.is_primary} for f in m.files]
})
zf.writestr("backup/metadata.json", json.dumps(metadata, indent=2, default=str))
zip_buffer.seek(0)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
return StreamingResponse(zip_buffer, media_type="application/zip",
headers={"Content-Disposition": f"attachment; filename=stl_repo_backup_{timestamp}.zip"})
# =============================================================================
# MODEL CREATION
# =============================================================================
@router.post("/", response_model=Model3DResponse)
async def create_model(
title: str = Form(...),
description: Optional[str] = Form(None),
author: Optional[str] = Form(None),
license: Optional[str] = Form(None),
tags: Optional[str] = Form(None),
category: Optional[str] = Form(None),
files: List[UploadFile] = File(default=[]),
images: List[UploadFile] = File(default=[]),
part_names: Optional[str] = Form(None),
db: Session = Depends(get_db)
):
if not files:
raise HTTPException(status_code=400, detail="At least one 3D file is required")
part_names_map = {}
if part_names:
try:
part_names_map = json.loads(part_names)
except:
pass
saved_3d_files = []
total_faces = 0
for idx, file in enumerate(files):
if not file.filename:
continue
lower_name = file.filename.lower()
if not (lower_name.endswith('.stl') or lower_name.endswith('.3mf')):
for sf in saved_3d_files:
if os.path.exists(sf['path']): os.remove(sf['path'])
raise HTTPException(status_code=400, detail=f"Unsupported format: {file.filename}")
file_path = os.path.join(UPLOAD_DIR, file.filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
file_hash = _file_hash(file_path)
existing = db.query(Model3D).filter(Model3D.file_hash == file_hash).first()
if existing:
os.remove(file_path)
for sf in saved_3d_files:
if os.path.exists(sf['path']): os.remove(sf['path'])
raise HTTPException(status_code=409,
detail=f"File '{file.filename}' already exists as '{existing.title}' (ID: {existing.id})")
try:
model_data = parse_model_file(file_path)
except Exception as e:
os.remove(file_path)
for sf in saved_3d_files:
if os.path.exists(sf['path']): os.remove(sf['path'])
raise HTTPException(status_code=400, detail=f"Failed to parse {file.filename}: {str(e)}")
saved_3d_files.append({
'path': file_path, 'filename': file.filename, 'hash': file_hash,
'size': os.path.getsize(file_path), 'data': model_data,
'is_primary': idx == 0, 'part_name': part_names_map.get(str(idx), None),
})
if idx == 0:
total_faces += model_data['faces']
if not saved_3d_files:
raise HTTPException(status_code=400, detail="No valid 3D files provided")
primary = saved_3d_files[0]
thumbnail_path = os.path.join(THUMBNAIL_DIR, f"{os.path.splitext(primary['filename'])[0]}.webp")
generate_thumbnail(primary['data']['vertices'], thumbnail_path)
db_model = Model3D(
title=title, filename=primary['filename'], description=description,
author=author, license=license, category=category,
file_size=sum(f['size'] for f in saved_3d_files),
file_hash=primary['hash'], width=primary['data']['width'],
height=primary['data']['height'], depth=primary['data']['depth'],
faces=total_faces, thumbnail_path=thumbnail_path,
)
db.add(db_model)
db.flush()
for sf in saved_3d_files:
db_file = ModelFile(
model_id=db_model.id, filename=sf['filename'], file_path=sf['path'],
file_type='stl' if sf['filename'].lower().endswith('.stl') else '3mf',
part_name=sf['part_name'], is_primary=sf['is_primary'],
file_size=sf['size'], file_hash=sf['hash'],
)
db.add(db_file)
for img in images:
if not img.filename: continue
lower_name = img.filename.lower()
if not (lower_name.endswith('.jpg') or lower_name.endswith('.jpeg') or lower_name.endswith('.png')):
continue
img_path = os.path.join(IMAGES_DIR, img.filename)
with open(img_path, "wb") as buffer:
shutil.copyfileobj(img.file, buffer)
db.add(ModelFile(
model_id=db_model.id, filename=img.filename, file_path=img_path,
file_type='image', is_primary=False, file_size=os.path.getsize(img_path),
))
tag_names = [t.strip() for t in (tags or '').split(',') if t.strip()]
db_model.tags = _get_or_create_tags(db, tag_names)
db.commit()
db.refresh(db_model)
response_data = Model3DResponse.model_validate(db_model)
response_data.avg_rating = _calc_avg_rating(db_model)
return response_data
# =============================================================================
# IMPORT / BULK / BATCH
# =============================================================================
@router.post("/import-url", response_model=Model3DResponse)
async def import_from_url(
url: str = Form(...),
title: str = Form(...),
description: Optional[str] = Form(None),
author: Optional[str] = Form(None),
license: Optional[str] = Form(None),
tags: Optional[str] = Form(None),
category: Optional[str] = Form(None),
db: Session = Depends(get_db)
):
MAX_SIZE = 50 * 1024 * 1024 # 50MB
try:
parsed = urllib.parse.urlparse(url)
if not parsed.scheme or not parsed.netloc:
raise HTTPException(status_code=400, detail="Invalid URL")
except Exception:
raise HTTPException(status_code=400, detail="Invalid URL")
try:
async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
r = await client.get(url)
r.raise_for_status()
if len(r.content) > MAX_SIZE:
raise HTTPException(status_code=400, detail="File too large (max 50MB)")
except httpx.RequestError as e:
raise HTTPException(status_code=400, detail=f"Download failed: {str(e)}")
filename = os.path.basename(parsed.path) or "downloaded.stl"
if not filename.lower().endswith(('.stl', '.3mf')):
filename += ".stl"
file_path = os.path.join(UPLOAD_DIR, filename)
with open(file_path, 'wb') as f:
f.write(r.content)
db_model = _process_single_file(db, file_path, title, description, author, license, tags, category, is_primary=True)
response_data = Model3DResponse.model_validate(db_model)
response_data.avg_rating = _calc_avg_rating(db_model)
return response_data
@router.post("/bulk-zip", response_model=List[Model3DResponse])
async def bulk_zip_upload(
zip_file: UploadFile = File(...),
description: Optional[str] = Form(None),
author: Optional[str] = Form(None),
license: Optional[str] = Form(None),
tags: Optional[str] = Form(None),
category: Optional[str] = Form(None),
db: Session = Depends(get_db)
):
if not zip_file.filename or not zip_file.filename.lower().endswith('.zip'):
raise HTTPException(status_code=400, detail="Only ZIP files are allowed")
tmp_path = os.path.join(tempfile.gettempdir(), zip_file.filename)
with open(tmp_path, 'wb') as f:
shutil.copyfileobj(zip_file.file, f)
extract_dir = tempfile.mkdtemp()
try:
with zipfile.ZipFile(tmp_path, 'r') as zf:
zf.extractall(extract_dir)
except zipfile.BadZipFile:
os.remove(tmp_path)
raise HTTPException(status_code=400, detail="Invalid ZIP file")
results = []
for root, dirs, files in os.walk(extract_dir):
for fname in files:
lower = fname.lower()
if lower.endswith('.stl') or lower.endswith('.3mf'):
src = os.path.join(root, fname)
dst = os.path.join(UPLOAD_DIR, fname)
# avoid collisions
if os.path.exists(dst):
base, ext = os.path.splitext(fname)
fname = f"{base}_{hashlib.md5(src.encode()).hexdigest()[:6]}{ext}"
dst = os.path.join(UPLOAD_DIR, fname)
shutil.move(src, dst)
try:
db_model = _process_single_file(
db, dst, os.path.splitext(fname)[0],
description, author, license, tags, category, is_primary=True
)
response_data = Model3DResponse.model_validate(db_model)
response_data.avg_rating = _calc_avg_rating(db_model)
results.append(response_data)
except HTTPException:
pass # skip duplicates
os.remove(tmp_path)
shutil.rmtree(extract_dir, ignore_errors=True)
return results
@router.post("/batch-download")
def batch_download(
model_ids: List[int] = Body(...),
db: Session = Depends(get_db)
):
if not model_ids:
raise HTTPException(status_code=400, detail="No model IDs provided")
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
for mid in model_ids:
model = db.query(Model3D).filter(Model3D.id == mid).first()
if not model:
continue
for mf in model.files:
if mf.file_type in ('stl', '3mf') and os.path.exists(mf.file_path):
zf.write(mf.file_path, arcname=f"{model.title.replace(' ', '_')}/{mf.filename}")
zip_buffer.seek(0)
return StreamingResponse(
zip_buffer, media_type="application/zip",
headers={"Content-Disposition": "attachment; filename=batch_download.zip"}
)
# =============================================================================
# DYNAMIC MODEL ROUTES
# =============================================================================
@router.get("/{model_id}", response_model=Model3DResponse)
def get_model(model_id: int, db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
response_data = Model3DResponse.model_validate(model)
response_data.avg_rating = _calc_avg_rating(model)
return response_data
@router.put("/{model_id}", response_model=Model3DResponse)
def update_model(model_id: int, data: Model3DUpdate, db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
if data.title is not None: model.title = data.title
if data.description is not None: model.description = data.description
if data.author is not None: model.author = data.author
if data.license is not None: model.license = data.license
if data.category is not None: model.category = data.category
if data.tag_names is not None: model.tags = _get_or_create_tags(db, data.tag_names)
db.commit()
db.refresh(model)
response_data = Model3DResponse.model_validate(model)
response_data.avg_rating = _calc_avg_rating(model)
return response_data
@router.delete("/{model_id}")
def delete_model(model_id: int, db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
for mf in model.files:
if os.path.exists(mf.file_path): os.remove(mf.file_path)
if model.thumbnail_path and os.path.exists(model.thumbnail_path): os.remove(model.thumbnail_path)
db.delete(model)
db.commit()
return {"detail": "Model deleted"}
# --- Validation & Estimation ---
@router.get("/{model_id}/validate")
def validate_mesh(model_id: int, db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model:
raise HTTPException(status_code=404, detail="Model not found")
primary = next((f for f in model.files if f.is_primary and f.file_type in ('stl', '3mf')), None)
if not primary:
raise HTTPException(status_code=404, detail="No 3D file found")
try:
import trimesh
mesh = trimesh.load(primary.file_path, force='mesh')
volume = abs(float(mesh.volume)) if mesh.is_watertight else 0.0
area = float(mesh.area)
is_watertight = bool(mesh.is_watertight)
is_winding_consistent = bool(mesh.is_winding_consistent)
bounds = mesh.bounds.tolist()
euler = int(mesh.euler_number)
# Count holes via euler characteristic
holes = 0 if is_watertight else max(0, 2 - euler)
return {
"is_watertight": is_watertight,
"is_winding_consistent": is_winding_consistent,
"volume_cm3": round(volume / 1000, 3) if volume else 0,
"surface_area_cm2": round(area / 100, 2),
"bounds_mm": bounds,
"euler_number": euler,
"estimated_holes": holes,
"face_count": len(mesh.faces),
"vertex_count": len(mesh.vertices),
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Validation failed: {str(e)}")
@router.get("/{model_id}/estimate")
def estimate_print(model_id: int, price_per_kg: float = Query(20.0), material_density: float = Query(1.24), db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model:
raise HTTPException(status_code=404, detail="Model not found")
primary = next((f for f in model.files if f.is_primary and f.file_type in ('stl', '3mf')), None)
if not primary:
raise HTTPException(status_code=404, detail="No 3D file found")
try:
import trimesh
mesh = trimesh.load(primary.file_path, force='mesh')
volume_cm3 = abs(float(mesh.volume)) / 1000.0
grams = volume_cm3 * material_density
cost = (grams / 1000.0) * price_per_kg
seconds = _estimate_print_time_seconds(volume_cm3)
hours = seconds // 3600
mins = (seconds % 3600) // 60
return {
"volume_cm3": round(volume_cm3, 2),
"grams": round(grams, 2),
"cost": round(cost, 2),
"estimated_time": f"{hours}h {mins}m",
"estimated_seconds": seconds,
"price_per_kg": price_per_kg,
"material_density": material_density,
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Estimation failed: {str(e)}")
# --- Ratings ---
@router.post("/{model_id}/ratings", response_model=RatingResponse)
def create_rating(model_id: int, stars: int = Query(..., ge=1, le=5), db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
rating = Rating(model_id=model_id, stars=stars)
db.add(rating)
db.commit()
db.refresh(rating)
return rating
@router.get("/{model_id}/ratings", response_model=List[RatingResponse])
def list_ratings(model_id: int, db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
return db.query(Rating).filter(Rating.model_id == model_id).order_by(Rating.created_at.desc()).all()
# --- Comments ---
@router.post("/{model_id}/comments", response_model=CommentResponse)
def create_comment(model_id: int, text: str = Query(..., min_length=1), author_name: Optional[str] = Query(None), db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
comment = Comment(model_id=model_id, text=text, author_name=author_name)
db.add(comment)
db.commit()
db.refresh(comment)
return comment
@router.get("/{model_id}/comments", response_model=List[CommentResponse])
def list_comments(model_id: int, db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
return db.query(Comment).filter(Comment.model_id == model_id).order_by(Comment.created_at.desc()).all()
# --- Collections (dynamic) ---
@router.get("/collections/{collection_id}", response_model=CollectionDetailResponse)
def get_collection(collection_id: int, db: Session = Depends(get_db)):
coll = db.query(Collection).filter(Collection.id == collection_id).first()
if not coll: raise HTTPException(status_code=404, detail="Collection not found")
resp = CollectionDetailResponse.model_validate(coll)
resp.model_count = len(coll.models)
return resp
@router.post("/collections/{collection_id}/add/{model_id}")
def add_to_collection(collection_id: int, model_id: int, db: Session = Depends(get_db)):
coll = db.query(Collection).filter(Collection.id == collection_id).first()
if not coll: raise HTTPException(status_code=404, detail="Collection not found")
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
if model not in coll.models:
coll.models.append(model)
db.commit()
return {"detail": "Model added to collection"}
@router.delete("/collections/{collection_id}/remove/{model_id}")
def remove_from_collection(collection_id: int, model_id: int, db: Session = Depends(get_db)):
coll = db.query(Collection).filter(Collection.id == collection_id).first()
if not coll: raise HTTPException(status_code=404, detail="Collection not found")
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
if model in coll.models:
coll.models.remove(model)
db.commit()
return {"detail": "Model removed from collection"}
@router.delete("/collections/{collection_id}")
def delete_collection(collection_id: int, db: Session = Depends(get_db)):
coll = db.query(Collection).filter(Collection.id == collection_id).first()
if not coll: raise HTTPException(status_code=404, detail="Collection not found")
db.delete(coll)
db.commit()
return {"detail": "Collection deleted"}
# --- Downloads & QR ---
@router.get("/{model_id}/download")
def download_model(model_id: int, file_id: Optional[int] = None, db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
if file_id:
mf = db.query(ModelFile).filter(ModelFile.id == file_id, ModelFile.model_id == model_id).first()
if not mf: raise HTTPException(status_code=404, detail="File not found")
if not os.path.exists(mf.file_path): raise HTTPException(status_code=404, detail="File not found on disk")
model.download_count += 1; db.commit()
return FileResponse(path=mf.file_path, filename=mf.filename, media_type="application/octet-stream")
primary = next((f for f in model.files if f.is_primary and f.file_type in ('stl', '3mf')), None)
file_path = primary.file_path if primary else os.path.join(UPLOAD_DIR, model.filename)
if not os.path.exists(file_path): raise HTTPException(status_code=404, detail="File not found on disk")
model.download_count += 1; db.commit()
return FileResponse(path=file_path, filename=model.filename, media_type="application/octet-stream")
@router.get("/{model_id}/download-all")
def download_all(model_id: int, db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
model_files = [f for f in model.files if f.file_type in ('stl', '3mf')]
if not model_files: raise HTTPException(status_code=404, detail="No 3D files found")
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
for mf in model_files:
if os.path.exists(mf.file_path): zf.write(mf.file_path, arcname=mf.filename)
zip_buffer.seek(0)
model.download_count += 1; db.commit()
return StreamingResponse(zip_buffer, media_type="application/zip",
headers={"Content-Disposition": f"attachment; filename={model.title.replace(' ', '_')}_all_parts.zip"})
@router.get("/{model_id}/thumbnail")
def get_thumbnail(model_id: int, db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
# Try WEBP first, fallback to PNG
if model.thumbnail_path:
if os.path.exists(model.thumbnail_path):
media_type = "image/webp" if model.thumbnail_path.lower().endswith('.webp') else "image/png"
return FileResponse(path=model.thumbnail_path, media_type=media_type, headers=_cache_headers(86400))
# Fallback to PNG with same base name
png_path = os.path.splitext(model.thumbnail_path)[0] + '.png'
if os.path.exists(png_path):
return FileResponse(path=png_path, media_type="image/png", headers=_cache_headers(86400))
raise HTTPException(status_code=404, detail="Thumbnail not found")
@router.get("/{model_id}/qr")
def get_qr(model_id: int, db: Session = Depends(get_db)):
model = db.query(Model3D).filter(Model3D.id == model_id).first()
if not model: raise HTTPException(status_code=404, detail="Model not found")
url = f"http://192.168.10.104:8000/model/{model_id}"
qr = qrcode.QRCode(version=1, box_size=10, border=2)
qr.add_data(url); qr.make(fit=True)
img = qr.make_image(fill_color="#0f172a", back_color="#ffffff")
buf = io.BytesIO(); img.save(buf, format='PNG'); buf.seek(0)
return StreamingResponse(buf, media_type="image/png")

108
app/schemas.py Normal file
View File

@@ -0,0 +1,108 @@
from pydantic import BaseModel, ConfigDict
from typing import Optional, List
from datetime import datetime
class TagBase(BaseModel):
name: str
class TagCreate(TagBase):
pass
class TagResponse(TagBase):
id: int
model_config = ConfigDict(from_attributes=True)
class ModelFileBase(BaseModel):
filename: str
file_type: str = 'stl'
part_name: Optional[str] = None
is_primary: bool = False
file_size: Optional[int] = None
class ModelFileResponse(ModelFileBase):
id: int
file_path: str
model_config = ConfigDict(from_attributes=True)
class RatingResponse(BaseModel):
id: int
stars: int
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class CommentResponse(BaseModel):
id: int
author_name: Optional[str]
text: str
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class CollectionBase(BaseModel):
name: str
description: Optional[str] = None
class CollectionCreate(CollectionBase):
pass
class CollectionResponse(CollectionBase):
id: int
created_at: datetime
model_count: int = 0
model_config = ConfigDict(from_attributes=True)
class CollectionDetailResponse(CollectionResponse):
models: List['Model3DResponse'] = []
model_config = ConfigDict(from_attributes=True)
class Model3DBase(BaseModel):
title: str
description: Optional[str] = None
author: Optional[str] = None
license: Optional[str] = None
category: Optional[str] = None
class Model3DCreate(Model3DBase):
tag_names: Optional[List[str]] = []
class Model3DUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
author: Optional[str] = None
license: Optional[str] = None
category: Optional[str] = None
tag_names: Optional[List[str]] = None
class Model3DResponse(Model3DBase):
id: int
filename: str
file_size: Optional[int]
file_hash: Optional[str]
width: Optional[float]
height: Optional[float]
depth: Optional[float]
faces: Optional[int]
created_at: datetime
thumbnail_path: Optional[str]
download_count: int
avg_rating: Optional[float] = None
tags: List[TagResponse] = []
files: List[ModelFileResponse] = []
ratings: List[RatingResponse] = []
comments: List[CommentResponse] = []
collections: List[CollectionResponse] = []
model_config = ConfigDict(from_attributes=True)

127
app/stl_parser.py Normal file
View File

@@ -0,0 +1,127 @@
import struct
import numpy as np
from PIL import Image, ImageDraw
import os
def parse_stl(file_path: str):
"""Parse an STL file (binary or ASCII) and return mesh data + metadata."""
with open(file_path, 'rb') as f:
header = f.read(80)
is_binary = False
if not header.startswith(b'solid'):
is_binary = True
else:
# Some binary files also start with 'solid', check further
with open(file_path, 'rb') as f:
f.read(80)
tri_count_bytes = f.read(4)
if len(tri_count_bytes) == 4:
tri_count = struct.unpack('<I', tri_count_bytes)[0]
file_size = os.path.getsize(file_path)
expected = 80 + 4 + tri_count * 50
if file_size == expected:
is_binary = True
if is_binary:
return _parse_binary(file_path)
else:
return _parse_ascii(file_path)
def _parse_binary(file_path: str):
with open(file_path, 'rb') as f:
f.read(80) # skip header
tri_count = struct.unpack('<I', f.read(4))[0]
vertices = []
for _ in range(tri_count):
f.read(12) # normal
v1 = struct.unpack('<3f', f.read(12))
v2 = struct.unpack('<3f', f.read(12))
v3 = struct.unpack('<3f', f.read(12))
f.read(2) # attribute byte count
vertices.extend([v1, v2, v3])
vertices = np.array(vertices, dtype=np.float32)
return _compute_metadata(vertices, tri_count)
def _parse_ascii(file_path: str):
vertices = []
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
line = line.strip().lower()
if line.startswith('vertex'):
parts = line.split()
if len(parts) >= 4:
v = [float(parts[1]), float(parts[2]), float(parts[3])]
vertices.append(v)
vertices = np.array(vertices, dtype=np.float32)
tri_count = len(vertices) // 3
return _compute_metadata(vertices, tri_count)
def _compute_metadata(vertices: np.ndarray, tri_count: int):
if len(vertices) == 0:
return {
'vertices': vertices,
'faces': 0,
'width': 0.0,
'height': 0.0,
'depth': 0.0,
}
min_v = vertices.min(axis=0)
max_v = vertices.max(axis=0)
dims = max_v - min_v
return {
'vertices': vertices,
'faces': tri_count,
'width': float(dims[0]),
'height': float(dims[1]),
'depth': float(dims[2]),
}
def generate_thumbnail(vertices: np.ndarray, output_path: str, size: int = 256):
"""Generate a simple orthographic thumbnail from vertices."""
if len(vertices) == 0:
img = Image.new('RGB', (size, size), color=(30, 30, 30))
img.save(output_path)
return
# Project to XY plane, normalize to image coords
min_v = vertices.min(axis=0)
max_v = vertices.max(axis=0)
dims = max_v - min_v
scale = max(dims[0], dims[1])
if scale == 0:
scale = 1.0
margin = 20
img_size = size - 2 * margin
img = Image.new('RGB', (size, size), color=(30, 30, 30))
draw = ImageDraw.Draw(img)
# Draw triangles
for i in range(0, len(vertices), 3):
tri = vertices[i:i+3]
pts = []
for v in tri:
x = margin + int(((v[0] - min_v[0]) / scale) * img_size)
y = margin + int(((1.0 - (v[1] - min_v[1]) / scale)) * img_size)
pts.append((x, y))
if len(pts) == 3:
# Simple shading based on Z
z_avg = sum(v[2] for v in tri) / 3.0
z_norm = (z_avg - min_v[2]) / (dims[2] if dims[2] > 0 else 1)
brightness = int(80 + z_norm * 120)
color = (brightness, brightness, int(brightness * 1.1))
draw.polygon(pts, fill=color, outline=(50, 50, 60))
img.save(output_path, 'PNG')