"""export_record_service.py — 导出记录 CRUD。""" from pathlib import Path from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.exceptions import RecordNotFoundError from app.models.export_record import ExportRecord class ExportRecordService: def __init__(self, db: AsyncSession) -> None: self.db = db # ------------------------------------------------------------------ # # 写入 # ------------------------------------------------------------------ # async def create_record( self, *, user_id: str, file_name: str, file_path: str, file_size: int, download_url: str, document_id: str | None, style_id: str, ) -> ExportRecord: record = ExportRecord( user_id=user_id, file_name=file_name, file_path=file_path, file_size=file_size, download_url=download_url, document_id=document_id, style_id=style_id, ) self.db.add(record) await self.db.commit() await self.db.refresh(record) return record # ------------------------------------------------------------------ # # 查询列表 # ------------------------------------------------------------------ # async def list_records( self, user_id: str, page: int = 1, page_size: int = 20, sort_order: str = "desc", ) -> tuple[list[ExportRecord], int]: """返回 (records, total)。""" base_q = select(ExportRecord).where(ExportRecord.user_id == user_id) # 总数 count_q = select(func.count()).select_from(base_q.subquery()) total: int = (await self.db.execute(count_q)).scalar_one() # 分页 + 排序 order_col = ( ExportRecord.created_at.desc() if sort_order.lower() != "asc" else ExportRecord.created_at.asc() ) offset = (page - 1) * page_size result = await self.db.execute( base_q.order_by(order_col).offset(offset).limit(page_size) ) records = list(result.scalars().all()) return records, total # ------------------------------------------------------------------ # # 查询单条(校验归属) # ------------------------------------------------------------------ # async def get_record(self, record_id: str, user_id: str) -> ExportRecord: result = await self.db.execute( select(ExportRecord).where( ExportRecord.id == record_id, ExportRecord.user_id == user_id, ) ) record = result.scalar_one_or_none() if record is None: raise RecordNotFoundError(record_id) return record # ------------------------------------------------------------------ # # 硬删除 # ------------------------------------------------------------------ # async def delete_record(self, record_id: str, user_id: str) -> None: """删除数据库记录并同步删除磁盘文件。""" record = await self.get_record(record_id, user_id) # 先删磁盘文件,文件不存在时静默忽略 try: Path(record.file_path).unlink(missing_ok=True) except OSError: pass await self.db.delete(record) await self.db.commit()