"""export_service.py — convert Markdown content to a .docx file and return a temporary download link."""

import io
import json
import logging
import secrets
import time
import unicodedata
import urllib.parse
from pathlib import Path
from typing import Optional

import mistune
from docx import Document
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.shared import Pt, RGBColor
from lxml import etree

from app.config import settings
from app.core.exceptions import ExportError

logger = logging.getLogger(__name__)


# ------------------------------------------------------------------ #
# Style file loading
# ------------------------------------------------------------------ #

def load_style_file(style_id: Optional[str] = None) -> dict:
    """Load a style JSON file and return its parsed content.

    Args:
        style_id: Identifier of the style to load. Only ``None`` is supported
            for now; it selects ``settings.default_style_file``.

    Returns:
        The object produced by ``json.load`` for the style file.

    Raises:
        ExportError: If ``style_id`` is not ``None``, if the default style
            file does not exist, or if it cannot be read, decoded or parsed.
    """
    if style_id is not None:
        # Phase 1 placeholder: the path is meant to come from the database
        # later (the original note mentions style_repo.get_file_path).
        raise ExportError(f"样式 ID 暂不支持: {style_id}(阶段 1 功能)")

    path = Path(settings.default_style_file)
    if not path.exists():
        raise ExportError(f"默认样式文件不存在: {path}")

    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError, so a
        # badly encoded file is reported as ExportError as well.
        raise ExportError(f"样式文件解析失败: {exc}") from exc


def build_style_map(style_data: dict) -> dict[str, dict]:
    """Index the style entries by both ``style_id`` and ``name``.

    Args:
        style_data: Parsed style file; entries are read from its ``"styles"``
            list.

    Returns:
        A mapping in which each entry is reachable through its ``style_id``
        and through its ``name`` (whichever of the two are non-empty). Later
        entries overwrite earlier ones that share a key.
    """
    mapping: dict[str, dict] = {}
    for entry in style_data.get("styles") or []:
        for field in ("style_id", "name"):
            key = entry.get(field)
            if key:
                mapping[key] = entry
    return mapping


# ------------------------------------------------------------------ #
# JSON <-> lxml conversion
# ------------------------------------------------------------------ #

def dict_to_element(d: dict) -> etree._Element:
    """Rebuild an lxml element from its dictionary form.

    The dictionary layout read here is: ``"@tag"`` (required), ``"@attrib"``
    (attribute mapping), ``"#text"``, ``"#tail"`` and ``"@children"`` (a
    mapping whose values are a child dictionary or a list of them). The
    original note describes this as the inverse of ``element_to_dict()`` in
    styles.py, which is not visible from this module.

    Args:
        d: Dictionary describing one element and, recursively, its children.

    Returns:
        The reconstructed element.

    Raises:
        KeyError: If ``"@tag"`` is missing.
    """
    elem = etree.Element(d["@tag"], attrib=dict(d.get("@attrib", {})))
    if d.get("#text"):
        elem.text = d["#text"]
    if d.get("#tail"):
        elem.tail = d["#tail"]

    for child_val in d.get("@children", {}).values():
        # Several children with the same tag are stored as a list.
        items = child_val if isinstance(child_val, list) else [child_val]
        for item in items:
            if isinstance(item, dict):
                elem.append(dict_to_element(item))
    return elem


def inject_styles_from_json(doc: Document, style_data: dict) -> None:
    """Inject every ``full_xml_definition`` into the document's styles part.

    Upsert semantics: when the rebuilt element carries a ``w:styleId``, any
    existing ``w:style`` child with the same id is removed before the new one
    is appended. Entries without ``full_xml_definition`` are skipped, and an
    entry that cannot be rebuilt is logged and skipped so that one bad entry
    does not abort the whole injection.

    Args:
        doc: python-docx document whose styles element is modified in place.
        style_data: Parsed style file; entries are read from ``"styles"``.
    """
    styles_element = doc.styles.element
    style_tag = qn("w:style")
    style_id_key = qn("w:styleId")

    for style_entry in style_data.get("styles") or []:
        xml_def = style_entry.get("full_xml_definition")
        if not xml_def:
            continue

        try:
            new_elem = dict_to_element(xml_def)
        except Exception:
            logger.warning(
                "Skipping style %r: full_xml_definition could not be rebuilt",
                style_entry.get("style_id"),
                exc_info=True,
            )
            continue

        new_style_id = new_elem.get(style_id_key)
        if new_style_id:
            # Compare attribute values directly instead of interpolating the
            # id into a path expression, which breaks on ids containing quotes.
            for existing in styles_element.findall(style_tag):
                if existing.get(style_id_key) == new_style_id:
                    styles_element.remove(existing)
        styles_element.append(new_elem)


# ------------------------------------------------------------------ #
# Style id lookup helper
# ------------------------------------------------------------------ #

def _resolve_style_id(style_map: dict, *lookup_keys: str) -> Optional[str]:
    """Return the ``style_id`` of the first candidate key found in the map.

    Used to translate a semantic name such as "Normal" or "Heading 1" into
    the ``style_id`` stored in the style JSON.

    Args:
        style_map: Mapping built by ``build_style_map``.
        *lookup_keys: Candidate keys, tried in order.

    Returns:
        The first non-empty ``style_id`` found, or ``None``.
    """
    for key in lookup_keys:
        entry = style_map.get(key)
        if entry and entry.get("style_id"):
            return entry["style_id"]
    return None


# ------------------------------------------------------------------ #
# Markdown -> python-docx renderer
# ------------------------------------------------------------------ #

class DocxRenderer(mistune.BaseRenderer):
    """Render a mistune token stream into a python-docx ``Document``.

    Every handler returns an empty string; the result of rendering is the
    content accumulated in ``self.doc``.
    """

    # English names of the built-in heading styles, used as candidate keys
    # when looking a heading level up in the style map.
    _HEADING_ALIASES = {
        level: [f"Heading {level}", f"heading {level}"] for level in range(1, 7)
    }

    def __init__(self, style_map: dict, style_data: dict) -> None:
        """Create the document, inject the styles and cache the Normal id.

        Args:
            style_map: Mapping built by ``build_style_map``.
            style_data: Parsed style file passed to ``inject_styles_from_json``.
        """
        super().__init__()
        self.style_map = style_map
        self.doc = Document()
        # Apply all formatting at once through the full style definitions.
        inject_styles_from_json(self.doc, style_data)
        # Cached because every body paragraph uses it.
        self._normal_id: Optional[str] = _resolve_style_id(
            style_map, "Normal", "1"
        )

    # ------ block-level elements ------ #

    def heading(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render a heading with the mapped style, else python-docx's heading."""
        level = token["attrs"]["level"]
        text = self._extract_text(token.get("children", []))

        aliases = self._HEADING_ALIASES.get(
            level, [f"Heading {level}", f"heading {level}"]
        )
        heading_style_id = _resolve_style_id(self.style_map, *aliases)
        if not heading_style_id:
            self.doc.add_heading(text, level=level)
            return ""

        # Reference the injected style by id; colour and size come from it.
        # If the style cannot be applied the paragraph keeps its default style.
        para = self.doc.add_paragraph(text)
        self._apply_style(para, heading_style_id)
        return ""

    def paragraph(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render a body paragraph; inline formatting is applied per run."""
        p = self.doc.add_paragraph()
        if self._normal_id:
            self._apply_style(p, self._normal_id)
        self._render_inline_children(p, token.get("children", []))
        return ""

    def blank_line(self, token: dict, state: mistune.core.BlockState) -> str:
        """Emit nothing for blank lines so no empty paragraphs are created."""
        return ""

    def thematic_break(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render ``---`` as an empty paragraph with a bottom border."""
        p = self.doc.add_paragraph()
        pPr = p._p.get_or_add_pPr()
        pBdr = OxmlElement("w:pBdr")
        bottom = OxmlElement("w:bottom")
        for attr, value in (
            ("w:val", "single"),
            ("w:sz", "6"),
            ("w:space", "1"),
            ("w:color", "auto"),
        ):
            bottom.set(qn(attr), value)
        pBdr.append(bottom)
        pPr.append(pBdr)
        return ""

    def block_quote(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render each child of a block quote as one plain-text quote paragraph.

        The mapped quote style is used when it can be resolved and applied;
        otherwise the document's "Quote" style is used.
        """
        # Resolved once: the result does not depend on the child.
        quote_id = _resolve_style_id(self.style_map, "Quote", "Quote Char")

        for child in token.get("children", []):
            if child.get("type") == "blank_line":
                # Consistent with blank_line(): no empty paragraphs.
                continue
            # Children that only carry "raw" (no inline children) are read
            # from the token itself so their text is not lost.
            text = self._extract_text(child.get("children") or [child])
            if quote_id:
                p = self.doc.add_paragraph(text)
                if not self._apply_style(p, quote_id):
                    p.style = "Quote"
            else:
                p = self.doc.add_paragraph(style="Quote")
                p.add_run(text)
        return ""

    def block_code(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render a code block as a "No Spacing" paragraph in Courier New 10pt.

        The formatting is hard-coded (dark grey 0x333333); the original note
        says style libraries usually have no code-block style.
        """
        code = token.get("raw", "")
        if code.endswith("\n"):
            # A trailing newline would otherwise show up as an extra blank
            # line at the end of the code paragraph.
            code = code[:-1]
        p = self.doc.add_paragraph(style="No Spacing")
        run = p.add_run(code)
        run.font.name = "Courier New"
        run.font.size = Pt(10)
        run.font.color.rgb = RGBColor(0x33, 0x33, 0x33)
        return ""

    def list(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render a list; nesting is tracked by ``_render_list_items``.

        The list handled here is always rendered as the outermost level
        (level 1): nested lists are reached through recursion in
        ``_render_list_items``, not through this handler. The token's own
        ``attrs["depth"]`` is deliberately not used, because it reflects the
        parser's nesting counter (0 for a top-level list in mistune 3) rather
        than the 1-based level the style selection expects.
        """
        ordered = token["attrs"].get("ordered", False)
        self._render_list_items(token.get("children", []), ordered, 1)
        return ""

    def _render_list_items(self, items: list, ordered: bool, depth: int) -> None:
        """Render list items recursively as plain-text list paragraphs.

        Args:
            items: ``list_item`` tokens of one list.
            ordered: Whether the list is numbered.
            depth: 1 for the outermost level; any other value selects the
                second-level style ("List Number 2" / "List Bullet 2").
        """
        base = "List Number" if ordered else "List Bullet"
        style = base if depth == 1 else f"{base} 2"

        for item in items:
            for child in item.get("children", []):
                ctype = child.get("type")
                if ctype == "list":
                    self._render_list_items(
                        child.get("children", []),
                        child["attrs"].get("ordered", False),
                        depth + 1,
                    )
                elif ctype == "blank_line":
                    # Consistent with blank_line(): no empty list paragraphs.
                    continue
                else:
                    text = self._extract_text(child.get("children", [child]))
                    self.doc.add_paragraph(text, style=style)

    def table(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render a table with a bold header row and the "Table Grid" style.

        Expected token layout: ``table_head`` holds the header cells directly,
        ``table_body`` holds ``table_row`` tokens that hold the cells. Nothing
        is rendered when there are no header cells. Body cells beyond the
        header's column count are ignored.
        """
        children = token.get("children", [])
        if not children:
            return ""

        head_token = next((c for c in children if c["type"] == "table_head"), None)
        body_token = next((c for c in children if c["type"] == "table_body"), None)

        head_cells = head_token.get("children", []) if head_token else []
        cols = len(head_cells)
        if cols == 0:
            return ""

        body_rows = []
        if body_token:
            body_rows = [
                row.get("children", [])
                for row in body_token.get("children", [])
                if row["type"] == "table_row"
            ]

        tbl = self.doc.add_table(rows=1 + len(body_rows), cols=cols)
        tbl.style = "Table Grid"

        # Fetch rows and cells once instead of re-indexing them per cell.
        docx_rows = [row for row in tbl.rows]

        # Header row, bold.
        for cell, cell_token in zip(docx_rows[0].cells, head_cells):
            cell.text = self._extract_text(cell_token.get("children", []))
            for para in cell.paragraphs:
                for run in para.runs:
                    run.bold = True

        # Data rows; zip() stops at the table's column count.
        for docx_row, row_cells in zip(docx_rows[1:], body_rows):
            for cell, cell_token in zip(docx_row.cells, row_cells):
                cell.text = self._extract_text(cell_token.get("children", []))
        return ""

    # ------ inline formatting ------ #

    def _render_inline_children(
        self,
        paragraph,
        children: list,
        *,
        bold: Optional[bool] = None,
        italic: Optional[bool] = None,
        strike: Optional[bool] = None,
    ) -> None:
        """Append runs for inline tokens, carrying formatting into nested tokens.

        Args:
            paragraph: python-docx paragraph that receives the runs.
            children: Inline tokens to render.
            bold: Formatting inherited from an enclosing ``strong`` token.
            italic: Formatting inherited from an enclosing ``emphasis`` token.
            strike: Formatting inherited from an enclosing ``strikethrough``.
        """
        inherited = {"bold": bold, "italic": italic, "strike": strike}
        # Token type -> the formatting flag it switches on for its children.
        wrappers = {"strong": "bold", "emphasis": "italic", "strikethrough": "strike"}

        for child in children:
            ctype = child.get("type", "")
            raw = child.get("raw", "")
            sub = child.get("children") or []

            if ctype == "text":
                self._add_run(paragraph, raw, **inherited)
            elif ctype in wrappers:
                # Recurse so that combinations such as bold + italic survive.
                self._render_inline_children(
                    paragraph, sub, **{**inherited, wrappers[ctype]: True}
                )
            elif ctype == "codespan":
                run = self._add_run(paragraph, raw, **inherited)
                run.font.name = "Courier New"
                run.font.size = Pt(10)
            elif ctype == "linebreak":
                paragraph.add_run().add_break()
            elif ctype in ("softbreak", "softlinebreak"):
                # "softbreak" is the spelling mistune 3 is understood to emit;
                # the legacy "softlinebreak" spelling is still accepted.
                paragraph.add_run(" ")
            elif sub:
                # Unknown type: fall back to its children ...
                self._render_inline_children(paragraph, sub, **inherited)
            elif raw:
                # ... or to its raw text.
                self._add_run(paragraph, raw, **inherited)

    @staticmethod
    def _add_run(paragraph, text: str, bold=None, italic=None, strike=None):
        """Add a run with the given text and switch on the requested formats."""
        run = paragraph.add_run(text)
        if bold:
            run.bold = True
        if italic:
            run.italic = True
        if strike:
            run.font.strike = True
        return run

    # ------ utilities ------ #

    def _apply_style(self, paragraph, style_id: str) -> bool:
        """Try to assign the style with the given id; return whether it worked.

        Failures are logged at debug level and otherwise ignored so that a
        missing or broken style never interrupts rendering.
        """
        try:
            paragraph.style = self._get_style_by_id(style_id)
        except Exception:
            logger.debug("Could not apply style %r", style_id, exc_info=True)
            return False
        return True

    def _get_style_by_id(self, style_id: str):
        """Return the document style whose ``style_id`` matches.

        Raises:
            KeyError: If no style in ``self.doc.styles`` has that id.
        """
        for style in self.doc.styles:
            if style.style_id == style_id:
                return style
        raise KeyError(f"样式 ID 不存在: {style_id}")

    @staticmethod
    def _extract_text(children: list) -> str:
        """Recursively collect plain text from tokens.

        Used where inline formatting is not kept (headings, list items,
        quotes, table cells). Soft breaks become a space and hard breaks a
        newline so that words on adjacent source lines are not glued together.
        Non-dict items are ignored.
        """
        parts = []
        for child in children:
            if not isinstance(child, dict):
                continue
            ctype = child.get("type")
            if ctype in ("softbreak", "softlinebreak"):
                parts.append(" ")
                continue
            if ctype == "linebreak":
                parts.append("\n")
                continue
            raw = child.get("raw", "")
            sub = child.get("children")
            if raw:
                parts.append(raw)
            if sub:
                parts.append(DocxRenderer._extract_text(sub))
        return "".join(parts)

    def render_token(self, token: dict, state: mistune.core.BlockState) -> str:
        """Dispatch a token to the method named after its type.

        Tokens without a matching callable fall back to rendering their
        children, if any.
        """
        handler = getattr(self, token["type"], None)
        if callable(handler):
            return handler(token, state)

        for child in token.get("children") or []:
            self.render_token(child, state)
        return ""

    def render_children(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render every child of a token."""
        for child in token.get("children", []):
            self.render_token(child, state)
        return ""

    def __call__(self, tokens: list, state: mistune.core.BlockState) -> str:
        """Renderer entry point: render the top-level tokens in order."""
        for token in tokens:
            self.render_token(token, state)
        return ""


# ------------------------------------------------------------------ #
# Public interface
# ------------------------------------------------------------------ #

def markdown_to_docx_bytes(content: str, style_map: dict, style_data: dict) -> bytes:
    """Convert a Markdown string to .docx bytes entirely in memory.

    Args:
        content: Markdown source.
        style_map: Mapping built by ``build_style_map``.
        style_data: Parsed style file.

    Returns:
        The serialized document.
    """
    renderer = DocxRenderer(style_map=style_map, style_data=style_data)
    # Plugins are enabled explicitly; per the original note, tables and
    # strikethrough are not parsed by default.
    md = mistune.create_markdown(
        renderer=renderer,
        plugins=["table", "strikethrough", "url"],
    )
    md(content)

    buf = io.BytesIO()
    renderer.doc.save(buf)
    return buf.getvalue()


def _safe_filename(name: str) -> str:
    """Return a file-system-safe version of a file name.

    The name is NFKC-normalized first (so full-width variants of the illegal
    characters are caught too), surrounding whitespace is stripped, and each
    of ``\\ / : * ? " < > |`` as well as every control character is replaced
    with ``_``. An empty result becomes ``"document"``.
    """
    illegal = r'\/:*?"<>|'
    name = unicodedata.normalize("NFKC", name).strip()
    cleaned = "".join(
        "_" if ch in illegal or unicodedata.category(ch) == "Cc" else ch
        for ch in name
    )
    return cleaned or "document"


async def export_doc(
    file_name: str,
    content: str,
    style_id: Optional[str] = None,
) -> dict:
    """Export Markdown as a document file and return its download information.

    Steps: load the style file, convert the Markdown, write the bytes to
    ``settings.temp_dir`` under a sanitized name with a random suffix, and
    build the download URL from ``settings.base_url``.

    Args:
        file_name: Requested base name for the exported file.
        content: Markdown source.
        style_id: Style to use; ``None`` selects the default style.

    Returns:
        A dict with ``download_url``, ``file_name``, ``expires_at`` (epoch
        milliseconds, now + ``settings.export_link_expires`` seconds) and
        ``style_id`` (``"default"`` when none was given).

    Raises:
        ExportError: If the style cannot be loaded, the conversion fails, or
            the file cannot be written.
    """
    # 1. Load the style file and build the lookup map.
    style_data = load_style_file(style_id)
    style_map = build_style_map(style_data)
    actual_style_id = style_id or "default"

    # 2. Markdown -> docx bytes.
    try:
        doc_bytes = markdown_to_docx_bytes(content, style_map, style_data)
    except Exception as exc:
        raise ExportError(f"Markdown 转换失败: {exc}") from exc

    # 3. Write to the temporary directory.
    safe_name = _safe_filename(file_name)
    token = secrets.token_urlsafe(8)
    # NOTE(review): the content is OOXML (.docx) but the suffix is ".doc".
    # Kept unchanged because the file-serving side is not visible from here —
    # confirm whether ".docx" is intended.
    final_name = f"{safe_name}_{token}.doc"

    tmp_dir = Path(settings.temp_dir)
    file_path = tmp_dir / final_name
    try:
        # Directory creation is inside the try so that its OSError is
        # reported as ExportError too.
        tmp_dir.mkdir(parents=True, exist_ok=True)
        file_path.write_bytes(doc_bytes)
    except OSError as exc:
        raise ExportError(f"文件写入失败: {exc}") from exc

    # 4. Build the download link.
    expires_at_ms = int((time.time() + settings.export_link_expires) * 1000)
    encoded_name = urllib.parse.quote(final_name, safe="-._~")
    download_url = f"{settings.base_url.rstrip('/')}/api/v1/files/{encoded_name}"

    return {
        "download_url": download_url,
        "file_name": final_name,
        "expires_at": expires_at_ms,
        "style_id": actual_style_id,
    }