| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
"""export_record_service.py — CRUD operations for export records."""
import logging
from pathlib import Path

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.exceptions import RecordNotFoundError
from app.models.export_record import ExportRecord
class ExportRecordService:
    """CRUD operations on ``ExportRecord`` rows, always scoped to one user.

    Every method runs against the ``AsyncSession`` passed to the constructor,
    and the write methods commit their own changes.
    """

    def __init__(self, db: AsyncSession) -> None:
        self.db = db

    async def _commit(self) -> None:
        """Commit the session; roll back and re-raise if the commit fails."""
        try:
            await self.db.commit()
        except Exception:
            # A session whose flush/commit failed stays in a failed
            # transaction until it is rolled back, so reset it before
            # handing the error to the caller.
            await self.db.rollback()
            raise

    # ------------------------------------------------------------------ #
    # Create
    # ------------------------------------------------------------------ #
    async def create_record(
        self,
        *,
        user_id: str,
        file_name: str,
        file_path: str,
        file_size: int,
        download_url: str,
        document_id: str | None,
        style_id: str,
    ) -> ExportRecord:
        """Insert a new export record and return it.

        All arguments are keyword-only and are stored on the new row
        unchanged. The row is committed and then refreshed from the database
        before being returned.
        """
        record = ExportRecord(
            user_id=user_id,
            file_name=file_name,
            file_path=file_path,
            file_size=file_size,
            download_url=download_url,
            document_id=document_id,
            style_id=style_id,
        )
        self.db.add(record)
        await self._commit()
        await self.db.refresh(record)
        return record

    # ------------------------------------------------------------------ #
    # List
    # ------------------------------------------------------------------ #
    async def list_records(
        self,
        user_id: str,
        page: int = 1,
        page_size: int = 20,
        sort_order: str = "desc",
    ) -> tuple[list[ExportRecord], int]:
        """Return one page of a user's export records plus the total count.

        Args:
            user_id: Only records with this ``user_id`` are considered.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Maximum rows per page; negative values are treated
                as 0 (an empty page).
            sort_order: ``"asc"`` (case-insensitive) sorts oldest first by
                ``created_at``; any other value sorts newest first.

        Returns:
            ``(records, total)`` where ``total`` counts every record of the
            user, independent of paging.
        """
        owned_by_user = ExportRecord.user_id == user_id

        # Total number of matching rows.
        count_q = (
            select(func.count()).select_from(ExportRecord).where(owned_by_user)
        )
        total: int = (await self.db.execute(count_q)).scalar_one()

        # Clamp paging input so the query never carries a negative
        # OFFSET or LIMIT.
        page = max(page, 1)
        page_size = max(page_size, 0)
        offset = (page - 1) * page_size

        # ``id`` is a tie-breaker: without it, rows sharing a ``created_at``
        # value have no defined order and may repeat or vanish across pages.
        if sort_order.lower() == "asc":
            ordering = (ExportRecord.created_at.asc(), ExportRecord.id.asc())
        else:
            ordering = (ExportRecord.created_at.desc(), ExportRecord.id.desc())

        page_q = (
            select(ExportRecord)
            .where(owned_by_user)
            .order_by(*ordering)
            .offset(offset)
            .limit(page_size)
        )
        result = await self.db.execute(page_q)
        records = list(result.scalars().all())
        return records, total

    # ------------------------------------------------------------------ #
    # Fetch one (ownership is enforced)
    # ------------------------------------------------------------------ #
    async def get_record(self, record_id: str, user_id: str) -> ExportRecord:
        """Return the record with ``record_id`` that belongs to ``user_id``.

        Raises:
            RecordNotFoundError: No row matches both the id and the user.
        """
        result = await self.db.execute(
            select(ExportRecord).where(
                ExportRecord.id == record_id,
                ExportRecord.user_id == user_id,
            )
        )
        record = result.scalar_one_or_none()
        if record is None:
            raise RecordNotFoundError(record_id)
        return record

    # ------------------------------------------------------------------ #
    # Hard delete
    # ------------------------------------------------------------------ #
    async def delete_record(self, record_id: str, user_id: str) -> None:
        """Delete a user's export record and then its file on disk.

        The database row is deleted and committed first, so a failed commit
        leaves both the row and the file untouched. File removal afterwards
        is best-effort: a missing file is ignored and any other ``OSError``
        is logged without being raised.

        Raises:
            RecordNotFoundError: Propagated from ``get_record`` when the
                record does not exist for this user.
        """
        record = await self.get_record(record_id, user_id)
        # Read the path now; the instance should not be touched once its
        # row has been deleted and the session committed.
        file_path = record.file_path

        await self.db.delete(record)
        await self._commit()

        if not file_path:
            return
        try:
            Path(file_path).unlink(missing_ok=True)
        except OSError:
            logging.getLogger(__name__).warning(
                "Failed to remove export file %s for record %s",
                file_path,
                record_id,
                exc_info=True,
            )
|