| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- """storage_monitor.py — 磁盘配额检查与后台定时任务。"""
- import asyncio
- import logging
- import shutil
- from pathlib import Path
- from app.config import settings
- logger = logging.getLogger(__name__)
- # ------------------------------------------------------------------ #
- # 磁盘统计工具
- # ------------------------------------------------------------------ #
- def _dir_size(path: Path) -> int:
- """递归计算目录占用字节数;目录不存在返回 0。"""
- if not path.exists():
- return 0
- return sum(f.stat().st_size for f in path.rglob("*") if f.is_file())
- def get_storage_info() -> dict:
- """
- 扫描 tmp/ 目录,返回存储统计信息:
- {
- disk_total_bytes, disk_used_bytes,
- tmp_total_bytes, quota_bytes, quota_exceeded,
- active_users, per_user_quota_bytes,
- users: [{ user_id, used_bytes, quota_exceeded }]
- }
- """
- tmp_path = Path(settings.temp_dir)
- disk = shutil.disk_usage(tmp_path if tmp_path.exists() else ".")
- disk_total = disk.total
- disk_used = disk.used
- quota_bytes = int(disk_total * settings.disk_quota_ratio)
- tmp_total = _dir_size(tmp_path)
- quota_exceeded = tmp_total > quota_bytes
- # 遍历用户子目录(tmp/{user_id}/)
- users: list[dict] = []
- if tmp_path.exists():
- for user_dir in sorted(tmp_path.iterdir()):
- if not user_dir.is_dir():
- continue
- used = _dir_size(user_dir)
- if used > 0:
- users.append({"user_id": user_dir.name, "used_bytes": used})
- active_users = len(users)
- per_user_quota = (quota_bytes // active_users) if active_users > 0 else quota_bytes
- for u in users:
- u["quota_exceeded"] = u["used_bytes"] > per_user_quota
- return {
- "disk_total_bytes": disk_total,
- "disk_used_bytes": disk_used,
- "tmp_total_bytes": tmp_total,
- "quota_bytes": quota_bytes,
- "quota_exceeded": quota_exceeded,
- "active_users": active_users,
- "per_user_quota_bytes": per_user_quota,
- "users": users,
- }
- # ------------------------------------------------------------------ #
- # 配额检查(导出后调用 / 定时任务共用)
- # ------------------------------------------------------------------ #
- def check_quota(user_id: str) -> str | None:
- """
- 检查存储配额,返回用户侧 warning 文本;无超限时返回 None。
- 同时将管理员级别超限情况写入日志。
- """
- info = get_storage_info()
- # 全局超限 → 记录管理员日志
- if info["quota_exceeded"]:
- logger.warning(
- "[存储告警] tmp/ 总占用 %d bytes 超过配额 %d bytes(磁盘 %.0f%%)",
- info["tmp_total_bytes"],
- info["quota_bytes"],
- settings.disk_quota_ratio * 100,
- )
- # 检查当前用户是否超个人配额
- user_entry = next((u for u in info["users"] if u["user_id"] == user_id), None)
- if user_entry and user_entry["quota_exceeded"]:
- logger.warning(
- "[存储告警] 用户 %s 占用 %d bytes 超过均分配额 %d bytes",
- user_id,
- user_entry["used_bytes"],
- info["per_user_quota_bytes"],
- )
- return "您的存储空间已超出限额,请删除旧文件释放空间"
- return None
- # ------------------------------------------------------------------ #
- # 后台定时任务
- # ------------------------------------------------------------------ #
- async def _periodic_check(interval_seconds: int = 1800) -> None:
- """每 interval_seconds 秒(默认 30 分钟)执行一次全量磁盘检查。"""
- while True:
- await asyncio.sleep(interval_seconds)
- try:
- info = get_storage_info()
- if info["quota_exceeded"]:
- logger.warning(
- "[定时检查] tmp/ 总占用 %d bytes 超过配额 %d bytes",
- info["tmp_total_bytes"],
- info["quota_bytes"],
- )
- for u in info["users"]:
- if u["quota_exceeded"]:
- logger.warning(
- "[定时检查] 用户 %s 占用 %d bytes 超过均分配额 %d bytes",
- u["user_id"],
- u["used_bytes"],
- info["per_user_quota_bytes"],
- )
- except Exception:
- logger.exception("[定时检查] 磁盘检查异常")
- def start_background_monitor() -> asyncio.Task:
- """在当前事件循环中启动后台定时任务,返回 Task 对象。"""
- return asyncio.create_task(_periodic_check())
|