export_service.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488
  1. """export_service.py — 将 Markdown 内容转换为 .docx 文件并返回可下载的临时链接。"""
  2. import io
  3. import json
  4. import secrets
  5. import unicodedata
  6. from pathlib import Path
  7. from typing import Optional
  8. import mistune
  9. from docx import Document
  10. from docx.oxml import OxmlElement
  11. from docx.oxml.ns import qn
  12. from docx.shared import Pt, RGBColor
  13. from lxml import etree
  14. from app.config import settings
  15. from app.core.exceptions import ExportError
  16. # ------------------------------------------------------------------ #
  17. # 样式文件加载
  18. # ------------------------------------------------------------------ #
  19. def load_style_file(style_id: Optional[str] = None) -> dict:
  20. """加载样式 JSON 文件并返回解析后的字典;style_id=None 时加载默认样式文件。"""
  21. if style_id is None:
  22. path = Path(settings.default_style_file)
  23. if not path.exists():
  24. raise ExportError(f"默认样式文件不存在: {path}")
  25. else:
  26. # 阶段 1 占位:届时改为从数据库查路径
  27. # path = style_repo.get_file_path(style_id)
  28. raise ExportError(f"样式 ID 暂不支持: {style_id}(阶段 1 功能)")
  29. try:
  30. with open(path, encoding="utf-8") as f:
  31. return json.load(f)
  32. except (OSError, json.JSONDecodeError) as exc:
  33. raise ExportError(f"样式文件解析失败: {exc}") from exc
  34. def build_style_map(style_data: dict) -> dict[str, dict]:
  35. """将样式列表转为双键映射(style_id 和 name 均可命中)。"""
  36. mapping: dict[str, dict] = {}
  37. for s in style_data.get("styles", []):
  38. if s.get("style_id"):
  39. mapping[s["style_id"]] = s
  40. if s.get("name"):
  41. mapping[s["name"]] = s
  42. return mapping
  43. # ------------------------------------------------------------------ #
  44. # JSON ↔ lxml 互转
  45. # ------------------------------------------------------------------ #
  46. def dict_to_element(d: dict) -> etree._Element:
  47. """将 element_to_dict() 产生的字典还原为 lxml Element(styles.py 的逆操作)。"""
  48. tag = d["@tag"]
  49. attrib = {k: v for k, v in d.get("@attrib", {}).items()}
  50. elem = etree.Element(tag, attrib=attrib)
  51. if d.get("#text"):
  52. elem.text = d["#text"]
  53. if d.get("#tail"):
  54. elem.tail = d["#tail"]
  55. for child_tag, child_val in d.get("@children", {}).items():
  56. # 同标签多子节点时存为列表
  57. items = child_val if isinstance(child_val, list) else [child_val]
  58. for item in items:
  59. if isinstance(item, dict):
  60. child_elem = dict_to_element(item)
  61. elem.append(child_elem)
  62. return elem
  63. def inject_styles_from_json(doc: Document, style_data: dict) -> None:
  64. """将 JSON 中所有样式的 full_xml_definition 以 upsert 方式注入到文档的 <w:styles> 节点,无定义的条目跳过。"""
  65. styles_element = doc.styles.element # <w:styles> 根节点
  66. for style_entry in style_data.get("styles", []):
  67. xml_def = style_entry.get("full_xml_definition")
  68. if not xml_def:
  69. continue
  70. # 从字典重建 lxml element
  71. try:
  72. new_elem = dict_to_element(xml_def)
  73. except Exception:
  74. continue # 单条解析失败不中断整体
  75. # 取出 w:styleId 属性,用于查找并移除旧节点
  76. style_id_key = qn("w:styleId")
  77. new_style_id = new_elem.get(style_id_key)
  78. if new_style_id:
  79. existing = styles_element.find(
  80. f'.//{qn("w:style")}[@{qn("w:styleId")}="{new_style_id}"]'
  81. )
  82. if existing is not None:
  83. styles_element.remove(existing)
  84. styles_element.append(new_elem)
  85. # ------------------------------------------------------------------ #
  86. # 样式 ID 查找辅助
  87. # ------------------------------------------------------------------ #
  88. def _resolve_style_id(style_map: dict, *lookup_keys: str) -> Optional[str]:
  89. """按优先级依次查找多个候选 key,返回第一个命中条目的 style_id,用于将语义名称映射到 JSON 中实际的 w:styleId。"""
  90. for key in lookup_keys:
  91. entry = style_map.get(key)
  92. if entry and entry.get("style_id"):
  93. return entry["style_id"]
  94. return None
  95. # ------------------------------------------------------------------ #
  96. # Markdown → python-docx 渲染器
  97. # ------------------------------------------------------------------ #
  98. class DocxRenderer(mistune.BaseRenderer):
  99. """将 mistune AST token 流渲染到 python-docx Document 对象。"""
  100. # Word 内置标题样式的英文名(用于查找 style_map 时的候选 key)
  101. _HEADING_ALIASES = {
  102. 1: ["Heading 1", "heading 1"],
  103. 2: ["Heading 2", "heading 2"],
  104. 3: ["Heading 3", "heading 3"],
  105. 4: ["Heading 4", "heading 4"],
  106. 5: ["Heading 5", "heading 5"],
  107. 6: ["Heading 6", "heading 6"],
  108. }
  109. def __init__(self, style_map: dict, style_data: dict) -> None:
  110. """初始化渲染器,注入完整样式定义并缓存常用样式 ID。"""
  111. super().__init__()
  112. self.style_map = style_map
  113. self.doc = Document()
  114. # 用 full_xml_definition 整体替换文档样式表,一次性应用所有格式
  115. inject_styles_from_json(self.doc, style_data)
  116. # 缓存 Normal 的 style_id,段落渲染时使用
  117. self._normal_id: Optional[str] = _resolve_style_id(
  118. style_map, "Normal", "1"
  119. )
  120. # ------ 块级元素 ------ #
  121. def heading(self, token: dict, state: mistune.core.BlockState) -> str:
  122. # # ## ### → 从 style_map 解析对应样式 ID;找不到则回退到 python-docx 内置标题
  123. level = token["attrs"]["level"]
  124. children = token.get("children", [])
  125. text = self._extract_text(children)
  126. aliases = self._HEADING_ALIASES.get(level, [f"Heading {level}", f"heading {level}"])
  127. heading_style_id = _resolve_style_id(self.style_map, *aliases)
  128. if heading_style_id:
  129. # 用 w:styleId 直接引用已注入的样式,不再手动设颜色/字号
  130. para = self.doc.add_paragraph(text)
  131. try:
  132. para.style = self._get_style_by_id(heading_style_id)
  133. except KeyError:
  134. pass # 样式注入失败时保持默认样式,不中断渲染
  135. else:
  136. para = self.doc.add_heading(text, level=level)
  137. return ""
  138. def paragraph(self, token: dict, state: mistune.core.BlockState) -> str:
  139. # 普通段落,内联样式由 _render_inline_children 处理
  140. children = token.get("children", [])
  141. p = self.doc.add_paragraph()
  142. if self._normal_id:
  143. try:
  144. p.style = self._get_style_by_id(self._normal_id)
  145. except Exception:
  146. pass
  147. self._render_inline_children(p, children)
  148. return ""
  149. def blank_line(self, token: dict, state: mistune.core.BlockState) -> str:
  150. # 空行不插入任何内容,避免多余空段落
  151. return ""
  152. def thematic_break(self, token: dict, state: mistune.core.BlockState) -> str:
  153. # --- 分隔线:通过 XML 段落底部边框实现
  154. p = self.doc.add_paragraph()
  155. pPr = p._p.get_or_add_pPr()
  156. pBdr = OxmlElement("w:pBdr")
  157. bottom = OxmlElement("w:bottom")
  158. bottom.set(qn("w:val"), "single")
  159. bottom.set(qn("w:sz"), "6")
  160. bottom.set(qn("w:space"), "1")
  161. bottom.set(qn("w:color"), "auto")
  162. pBdr.append(bottom)
  163. pPr.append(pBdr)
  164. return ""
  165. def block_quote(self, token: dict, state: mistune.core.BlockState) -> str:
  166. # > 引用块 → Quote 样式(缩进+斜体)
  167. children = token.get("children", [])
  168. for child in children:
  169. text = self._extract_text(child.get("children", []))
  170. quote_id = _resolve_style_id(self.style_map, "Quote", "Quote Char")
  171. if quote_id:
  172. p = self.doc.add_paragraph(text)
  173. try:
  174. p.style = self._get_style_by_id(quote_id)
  175. except Exception:
  176. p.style = "Quote"
  177. else:
  178. p = self.doc.add_paragraph(style="Quote")
  179. p.add_run(text)
  180. return ""
  181. def block_code(self, token: dict, state: mistune.core.BlockState) -> str:
  182. # 代码块 → No Spacing 样式,Courier New 10pt 深灰色(样式库通常无代码块样式,保持硬编码)
  183. code = token.get("raw", "")
  184. p = self.doc.add_paragraph(style="No Spacing")
  185. run = p.add_run(code)
  186. run.font.name = "Courier New"
  187. run.font.size = Pt(10)
  188. run.font.color.rgb = RGBColor(0x33, 0x33, 0x33)
  189. return ""
  190. def list(self, token: dict, state: mistune.core.BlockState) -> str:
  191. # 列表入口,判断有序/无序和嵌套深度,委托 _render_list_items 处理
  192. ordered = token["attrs"].get("ordered", False)
  193. depth = token["attrs"].get("depth", 1)
  194. self._render_list_items(token.get("children", []), ordered, depth)
  195. return ""
  196. def _render_list_items(self, items: list, ordered: bool, depth: int) -> None:
  197. # 递归处理列表项;嵌套子列表 depth+1,对应 List Bullet/Number 2 样式
  198. for item in items:
  199. children = item.get("children", [])
  200. for child in children:
  201. if child["type"] == "list":
  202. sub_ordered = child["attrs"].get("ordered", False)
  203. self._render_list_items(
  204. child.get("children", []), sub_ordered, depth + 1
  205. )
  206. else:
  207. text = self._extract_text(child.get("children", [child]))
  208. if ordered:
  209. style = "List Number" if depth == 1 else "List Number 2"
  210. else:
  211. style = "List Bullet" if depth == 1 else "List Bullet 2"
  212. self.doc.add_paragraph(text, style=style)
  213. def table(self, token: dict, state: mistune.core.BlockState) -> str:
  214. # 表格渲染:table_head 直接含 table_cell;table_body 含 table_row → table_cell
  215. children = token.get("children", [])
  216. if not children:
  217. return ""
  218. head_token = next((c for c in children if c["type"] == "table_head"), None)
  219. body_token = next((c for c in children if c["type"] == "table_body"), None)
  220. head_cells = head_token.get("children", []) if head_token else []
  221. body_rows: list[list[dict]] = []
  222. if body_token:
  223. for row in body_token.get("children", []):
  224. if row["type"] == "table_row":
  225. body_rows.append(row.get("children", []))
  226. cols = len(head_cells)
  227. if cols == 0:
  228. return ""
  229. tbl = self.doc.add_table(rows=1 + len(body_rows), cols=cols)
  230. tbl.style = "Table Grid"
  231. # 表头行加粗
  232. for c, cell_token in enumerate(head_cells):
  233. text = self._extract_text(cell_token.get("children", []))
  234. cell = tbl.rows[0].cells[c]
  235. cell.text = text
  236. for para in cell.paragraphs:
  237. for run in para.runs:
  238. run.bold = True
  239. # 数据行
  240. for r, row_cells in enumerate(body_rows):
  241. for c, cell_token in enumerate(row_cells):
  242. if c >= cols:
  243. break
  244. text = self._extract_text(cell_token.get("children", []))
  245. tbl.rows[r + 1].cells[c].text = text
  246. return ""
  247. # ------ 内联样式 ------ #
  248. def _render_inline_children(self, paragraph, children: list) -> None:
  249. # 遍历子节点,按类型设置 run 样式:粗体/斜体/删除线/行内代码/换行
  250. for child in children:
  251. ctype = child.get("type", "")
  252. raw = child.get("raw", "")
  253. if ctype == "text":
  254. paragraph.add_run(raw)
  255. elif ctype == "strong":
  256. run = paragraph.add_run(self._extract_text(child.get("children", [])))
  257. run.bold = True
  258. elif ctype == "emphasis":
  259. run = paragraph.add_run(self._extract_text(child.get("children", [])))
  260. run.italic = True
  261. elif ctype == "strikethrough":
  262. run = paragraph.add_run(self._extract_text(child.get("children", [])))
  263. run.font.strike = True
  264. elif ctype == "codespan":
  265. run = paragraph.add_run(raw)
  266. run.font.name = "Courier New"
  267. run.font.size = Pt(10)
  268. elif ctype == "linebreak":
  269. paragraph.add_run().add_break()
  270. elif ctype == "softlinebreak":
  271. paragraph.add_run(" ")
  272. else:
  273. # 未知类型:递归子节点或直接输出文本兜底
  274. sub = child.get("children")
  275. if sub:
  276. self._render_inline_children(paragraph, sub)
  277. elif raw:
  278. paragraph.add_run(raw)
  279. # ------ 工具方法 ------ #
  280. def _get_style_by_id(self, style_id: str):
  281. """通过 w:styleId 从文档样式集中查找 python-docx Style 对象。"""
  282. for style in self.doc.styles:
  283. if style.style_id == style_id:
  284. return style
  285. raise KeyError(f"样式 ID 不存在: {style_id}")
  286. @staticmethod
  287. def _extract_text(children: list) -> str:
  288. # 递归提取纯文本,用于标题/列表/表格等不需要内联样式的场景
  289. parts: list[str] = []
  290. for child in children:
  291. if isinstance(child, dict):
  292. raw = child.get("raw", "")
  293. sub = child.get("children")
  294. if raw:
  295. parts.append(raw)
  296. if sub:
  297. parts.append(DocxRenderer._extract_text(sub))
  298. return "".join(parts)
  299. def render_token(self, token: dict, state: mistune.core.BlockState) -> str:
  300. # mistune 分发入口:按 token type 找对应方法,找不到则递归子节点兜底
  301. ttype = token["type"]
  302. func = getattr(self, ttype, None)
  303. if func:
  304. return func(token, state)
  305. children = token.get("children")
  306. if children:
  307. for child in children:
  308. self.render_token(child, state)
  309. return ""
  310. def render_children(self, token: dict, state: mistune.core.BlockState) -> str:
  311. # 遍历并渲染一个 token 的所有子节点
  312. for child in token.get("children", []):
  313. self.render_token(child, state)
  314. return ""
  315. def __call__(self, tokens: list, state: mistune.core.BlockState) -> str:
  316. # 渲染器被调用时的入口,遍历顶层 token 列表
  317. for token in tokens:
  318. self.render_token(token, state)
  319. return ""
  320. # ------------------------------------------------------------------ #
  321. # 公共接口
  322. # ------------------------------------------------------------------ #
  323. def markdown_to_docx_bytes(content: str, style_map: dict, style_data: dict) -> bytes:
  324. """Markdown 字符串转 .docx 字节流,全程内存操作不落盘。"""
  325. renderer = DocxRenderer(style_map=style_map, style_data=style_data)
  326. # 必须显式启用插件:mistune 默认不解析表格和删除线
  327. md = mistune.create_markdown(
  328. renderer=renderer,
  329. plugins=["table", "strikethrough", "url"],
  330. )
  331. md(content)
  332. buf = io.BytesIO()
  333. renderer.doc.save(buf)
  334. return buf.getvalue()
  335. def _safe_filename(name: str) -> str:
  336. """过滤文件名非法字符,规范化全角字符,返回安全的文件名。"""
  337. name = unicodedata.normalize("NFKC", name)
  338. illegal = r'\/:*?"<>|'
  339. for ch in illegal:
  340. name = name.replace(ch, "_")
  341. return name.strip() or "document"
  342. async def export_doc(
  343. user_id: str,
  344. file_name: str,
  345. content: str,
  346. style_id: Optional[str] = None,
  347. document_id: Optional[str] = None,
  348. ) -> dict:
  349. """导出入口:加载样式、生成 .docx,按用户/日期分区写入,写入 export_records 记录,返回 { record_id, download_url, file_name, style_id, warning }。"""
  350. from datetime import date
  351. # 1. 加载样式文件,构建映射
  352. style_data = load_style_file(style_id)
  353. style_map = build_style_map(style_data)
  354. actual_style_id = style_id or "default"
  355. # 2. Markdown → docx 字节流
  356. try:
  357. doc_bytes = markdown_to_docx_bytes(content, style_map, style_data)
  358. except Exception as exc:
  359. raise ExportError(f"Markdown 转换失败: {exc}") from exc
  360. # 3. 按 ./tmp/{user_id}/{YYYY-MM-DD}/ 分区写入
  361. safe_name = _safe_filename(file_name)
  362. token = secrets.token_urlsafe(8)
  363. final_name = f"{safe_name}_{token}.doc"
  364. today = date.today().strftime("%Y-%m-%d")
  365. user_dir = Path(settings.temp_dir) / user_id / today
  366. user_dir.mkdir(parents=True, exist_ok=True)
  367. file_path = user_dir / final_name
  368. try:
  369. file_path.write_bytes(doc_bytes)
  370. except OSError as exc:
  371. raise ExportError(f"文件写入失败: {exc}") from exc
  372. file_size = file_path.stat().st_size
  373. # 4. 生成永久下载链接(无过期时间)
  374. download_url = (
  375. f"{settings.base_url.rstrip('/')}/api/v1/export/records"
  376. # 占位,record_id 写入 DB 后拼接
  377. )
  378. # 5. 写入 export_records 数据库记录
  379. from app.core.database import AsyncSessionLocal
  380. from app.services.export_record_service import ExportRecordService
  381. async with AsyncSessionLocal() as db:
  382. svc = ExportRecordService(db)
  383. record = await svc.create_record(
  384. user_id=user_id,
  385. file_name=final_name,
  386. file_path=str(file_path),
  387. file_size=file_size,
  388. download_url="", # 先占位,下面用 record_id 补全
  389. document_id=document_id,
  390. style_id=actual_style_id,
  391. )
  392. # 用真实 record_id 拼接永久链接并回写(含 userId,前端直接使用无需再拼)
  393. record_id = record.id
  394. download_url = (
  395. f"{settings.base_url.rstrip('/')}/api/v1/export/records/{record_id}/download"
  396. f"?userId={user_id}"
  397. )
  398. record.download_url = download_url
  399. await db.commit()
  400. # 6. 导出后检查磁盘配额
  401. from app.services.storage_monitor import check_quota
  402. warning = check_quota(user_id)
  403. return {
  404. "record_id": record_id,
  405. "download_url": download_url,
  406. "file_name": final_name,
  407. "style_id": actual_style_id,
  408. "warning": warning,
  409. }