"""storage_monitor.py — 磁盘配额检查与后台定时任务。""" import asyncio import logging import shutil from pathlib import Path from app.config import settings logger = logging.getLogger(__name__) # ------------------------------------------------------------------ # # 磁盘统计工具 # ------------------------------------------------------------------ # def _dir_size(path: Path) -> int: """递归计算目录占用字节数;目录不存在返回 0。""" if not path.exists(): return 0 return sum(f.stat().st_size for f in path.rglob("*") if f.is_file()) def get_storage_info() -> dict: """ 扫描 tmp/ 目录,返回存储统计信息: { disk_total_bytes, disk_used_bytes, tmp_total_bytes, quota_bytes, quota_exceeded, active_users, per_user_quota_bytes, users: [{ user_id, used_bytes, quota_exceeded }] } """ tmp_path = Path(settings.temp_dir) disk = shutil.disk_usage(tmp_path if tmp_path.exists() else ".") disk_total = disk.total disk_used = disk.used quota_bytes = int(disk_total * settings.disk_quota_ratio) tmp_total = _dir_size(tmp_path) quota_exceeded = tmp_total > quota_bytes # 遍历用户子目录(tmp/{user_id}/) users: list[dict] = [] if tmp_path.exists(): for user_dir in sorted(tmp_path.iterdir()): if not user_dir.is_dir(): continue used = _dir_size(user_dir) if used > 0: users.append({"user_id": user_dir.name, "used_bytes": used}) active_users = len(users) per_user_quota = (quota_bytes // active_users) if active_users > 0 else quota_bytes for u in users: u["quota_exceeded"] = u["used_bytes"] > per_user_quota return { "disk_total_bytes": disk_total, "disk_used_bytes": disk_used, "tmp_total_bytes": tmp_total, "quota_bytes": quota_bytes, "quota_exceeded": quota_exceeded, "active_users": active_users, "per_user_quota_bytes": per_user_quota, "users": users, } # ------------------------------------------------------------------ # # 配额检查(导出后调用 / 定时任务共用) # ------------------------------------------------------------------ # def check_quota(user_id: str) -> str | None: """ 检查存储配额,返回用户侧 warning 文本;无超限时返回 None。 同时将管理员级别超限情况写入日志。 """ info = get_storage_info() # 全局超限 → 记录管理员日志 if info["quota_exceeded"]: logger.warning( "[存储告警] tmp/ 总占用 %d bytes 超过配额 %d bytes(磁盘 %.0f%%)", info["tmp_total_bytes"], info["quota_bytes"], settings.disk_quota_ratio * 100, ) # 检查当前用户是否超个人配额 user_entry = next((u for u in info["users"] if u["user_id"] == user_id), None) if user_entry and user_entry["quota_exceeded"]: logger.warning( "[存储告警] 用户 %s 占用 %d bytes 超过均分配额 %d bytes", user_id, user_entry["used_bytes"], info["per_user_quota_bytes"], ) return "您的存储空间已超出限额,请删除旧文件释放空间" return None # ------------------------------------------------------------------ # # 后台定时任务 # ------------------------------------------------------------------ # async def _periodic_check(interval_seconds: int = 1800) -> None: """每 interval_seconds 秒(默认 30 分钟)执行一次全量磁盘检查。""" while True: await asyncio.sleep(interval_seconds) try: info = get_storage_info() if info["quota_exceeded"]: logger.warning( "[定时检查] tmp/ 总占用 %d bytes 超过配额 %d bytes", info["tmp_total_bytes"], info["quota_bytes"], ) for u in info["users"]: if u["quota_exceeded"]: logger.warning( "[定时检查] 用户 %s 占用 %d bytes 超过均分配额 %d bytes", u["user_id"], u["used_bytes"], info["per_user_quota_bytes"], ) except Exception: logger.exception("[定时检查] 磁盘检查异常") def start_background_monitor() -> asyncio.Task: """在当前事件循环中启动后台定时任务,返回 Task 对象。""" return asyncio.create_task(_periodic_check())