storage_monitor.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. """storage_monitor.py — 磁盘配额检查与后台定时任务。"""
  2. import asyncio
  3. import logging
  4. import shutil
  5. from pathlib import Path
  6. from app.config import settings
  7. logger = logging.getLogger(__name__)
  8. # ------------------------------------------------------------------ #
  9. # 磁盘统计工具
  10. # ------------------------------------------------------------------ #
  11. def _dir_size(path: Path) -> int:
  12. """递归计算目录占用字节数;目录不存在返回 0。"""
  13. if not path.exists():
  14. return 0
  15. return sum(f.stat().st_size for f in path.rglob("*") if f.is_file())
  16. def get_storage_info() -> dict:
  17. """
  18. 扫描 tmp/ 目录,返回存储统计信息:
  19. {
  20. disk_total_bytes, disk_used_bytes,
  21. tmp_total_bytes, quota_bytes, quota_exceeded,
  22. active_users, per_user_quota_bytes,
  23. users: [{ user_id, used_bytes, quota_exceeded }]
  24. }
  25. """
  26. tmp_path = Path(settings.temp_dir)
  27. disk = shutil.disk_usage(tmp_path if tmp_path.exists() else ".")
  28. disk_total = disk.total
  29. disk_used = disk.used
  30. quota_bytes = int(disk_total * settings.disk_quota_ratio)
  31. tmp_total = _dir_size(tmp_path)
  32. quota_exceeded = tmp_total > quota_bytes
  33. # 遍历用户子目录(tmp/{user_id}/)
  34. users: list[dict] = []
  35. if tmp_path.exists():
  36. for user_dir in sorted(tmp_path.iterdir()):
  37. if not user_dir.is_dir():
  38. continue
  39. used = _dir_size(user_dir)
  40. if used > 0:
  41. users.append({"user_id": user_dir.name, "used_bytes": used})
  42. active_users = len(users)
  43. per_user_quota = (quota_bytes // active_users) if active_users > 0 else quota_bytes
  44. for u in users:
  45. u["quota_exceeded"] = u["used_bytes"] > per_user_quota
  46. return {
  47. "disk_total_bytes": disk_total,
  48. "disk_used_bytes": disk_used,
  49. "tmp_total_bytes": tmp_total,
  50. "quota_bytes": quota_bytes,
  51. "quota_exceeded": quota_exceeded,
  52. "active_users": active_users,
  53. "per_user_quota_bytes": per_user_quota,
  54. "users": users,
  55. }
  56. # ------------------------------------------------------------------ #
  57. # 配额检查(导出后调用 / 定时任务共用)
  58. # ------------------------------------------------------------------ #
  59. def check_quota(user_id: str) -> str | None:
  60. """
  61. 检查存储配额,返回用户侧 warning 文本;无超限时返回 None。
  62. 同时将管理员级别超限情况写入日志。
  63. """
  64. info = get_storage_info()
  65. # 全局超限 → 记录管理员日志
  66. if info["quota_exceeded"]:
  67. logger.warning(
  68. "[存储告警] tmp/ 总占用 %d bytes 超过配额 %d bytes(磁盘 %.0f%%)",
  69. info["tmp_total_bytes"],
  70. info["quota_bytes"],
  71. settings.disk_quota_ratio * 100,
  72. )
  73. # 检查当前用户是否超个人配额
  74. user_entry = next((u for u in info["users"] if u["user_id"] == user_id), None)
  75. if user_entry and user_entry["quota_exceeded"]:
  76. logger.warning(
  77. "[存储告警] 用户 %s 占用 %d bytes 超过均分配额 %d bytes",
  78. user_id,
  79. user_entry["used_bytes"],
  80. info["per_user_quota_bytes"],
  81. )
  82. return "您的存储空间已超出限额,请删除旧文件释放空间"
  83. return None
  84. # ------------------------------------------------------------------ #
  85. # 后台定时任务
  86. # ------------------------------------------------------------------ #
  87. async def _periodic_check(interval_seconds: int = 1800) -> None:
  88. """每 interval_seconds 秒(默认 30 分钟)执行一次全量磁盘检查。"""
  89. while True:
  90. await asyncio.sleep(interval_seconds)
  91. try:
  92. info = get_storage_info()
  93. if info["quota_exceeded"]:
  94. logger.warning(
  95. "[定时检查] tmp/ 总占用 %d bytes 超过配额 %d bytes",
  96. info["tmp_total_bytes"],
  97. info["quota_bytes"],
  98. )
  99. for u in info["users"]:
  100. if u["quota_exceeded"]:
  101. logger.warning(
  102. "[定时检查] 用户 %s 占用 %d bytes 超过均分配额 %d bytes",
  103. u["user_id"],
  104. u["used_bytes"],
  105. info["per_user_quota_bytes"],
  106. )
  107. except Exception:
  108. logger.exception("[定时检查] 磁盘检查异常")
  109. def start_background_monitor() -> asyncio.Task:
  110. """在当前事件循环中启动后台定时任务,返回 Task 对象。"""
  111. return asyncio.create_task(_periodic_check())