| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- from pathlib import Path
- from fastapi import APIRouter
- from fastapi.responses import FileResponse
- from app.config import settings
- from app.core.exceptions import DocumentNotFoundError, ExportError
- from app.schemas.export import ExportDocRequest, ExportDocResponse
- from app.services.export_service import export_doc
- router = APIRouter(tags=["Export"])
- def _ok(data: dict) -> dict:
- return {"code": 0, "message": "Success", "data": data}
- # ------------------------------------------------------------------ #
- # POST /export/doc 导出 .doc 并返回下载链接
- # ------------------------------------------------------------------ #
@router.post("/export/doc", summary="导出 .doc 文件")
async def export_document(body: ExportDocRequest) -> dict:
    """Export the submitted content as a ``.doc`` file and return download info.

    When ``body.document_id`` is truthy, the stored document is first updated
    with ``body.content``. That update is best-effort: any failure is logged
    and never blocks the export itself.

    Args:
        body: Export request; ``document_id``, ``file_name``, ``content`` and
            ``style_id`` are read from it.

    Returns:
        The success envelope produced by ``_ok`` wrapping the alias-keyed dump
        of an ``ExportDocResponse``.

    Raises:
        Exceptions raised by ``export_doc`` or by building the response are
        not caught here and propagate to the caller.
    """
    import logging

    logger = logging.getLogger(__name__)

    # If a document id was supplied, open a DB session on demand and sync the
    # draft content before exporting.
    if body.document_id:
        try:
            # Imported inside the try so that an import failure is handled
            # like any other sync failure instead of breaking the endpoint.
            from app.core.database import AsyncSessionLocal
            from app.services.document_service import DocumentService
            from app.schemas.document import UpdateDocumentRequest

            async with AsyncSessionLocal() as db:
                svc = DocumentService(db)
                await svc.update_document(
                    body.document_id,
                    UpdateDocumentRequest(content=body.content),
                )
        except DocumentNotFoundError:
            # A missing document must not block the export.
            logger.warning(
                "Draft sync skipped: document %s not found",
                body.document_id,
            )
        except Exception:
            # An unavailable DB (or any other sync failure) must not block the
            # export either, but it is recorded so the failure is visible.
            logger.exception(
                "Draft sync failed for document %s; continuing with export",
                body.document_id,
            )

    result = await export_doc(
        file_name=body.file_name,
        content=body.content,
        style_id=body.style_id,
    )
    resp = ExportDocResponse(
        download_url=result["download_url"],
        file_name=result["file_name"],
        expires_at=result["expires_at"],
        style_id=result["style_id"],
    )
    return _ok(resp.model_dump(by_alias=True))
- # ------------------------------------------------------------------ #
- # GET /files/{filename} 临时文件下载端点
- # ------------------------------------------------------------------ #
@router.get("/files/{filename}", summary="下载导出文件", include_in_schema=False)
async def download_file(filename: str) -> FileResponse:
    """Serve an exported file from ``settings.temp_dir`` as a download.

    The path parameter is URL-decoded once more and reduced to its final path
    component before being joined onto ``settings.temp_dir``.

    Args:
        filename: File name taken from the URL path; may still be
            percent-encoded.

    Returns:
        A ``FileResponse`` for the file, with media type
        ``application/msword`` and the sanitized name as download filename.

    Raises:
        ExportError: If the sanitized name is empty or does not refer to an
            existing regular file under ``settings.temp_dir``.
    """
    from urllib.parse import unquote

    # The path parameter is not always decoded automatically, so decode it
    # once by hand to make sure non-ASCII (e.g. Chinese) names are restored.
    decoded = unquote(filename)

    # Keep only the final path component so directory parts are discarded.
    safe = Path(decoded).name
    file_path = Path(settings.temp_dir) / safe

    # is_file() rather than exists(): names such as "" or ".." resolve to a
    # directory, which exists but cannot be served as a file.
    if not safe or not file_path.is_file():
        raise ExportError(f"文件不存在或已过期: {safe}")

    return FileResponse(
        path=str(file_path),
        filename=safe,
        media_type="application/msword",
    )
|