| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488 |
- """export_service.py — Convert Markdown content into a Word (.docx) document, store it under ``settings.temp_dir`` and return a download link."""
import io
import json
import logging
import secrets
import unicodedata
from collections.abc import Iterable
from pathlib import Path
from typing import Optional
from urllib.parse import quote

import mistune
from docx import Document
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.shared import Pt, RGBColor
from lxml import etree

from app.config import settings
from app.core.exceptions import ExportError
- # ------------------------------------------------------------------ #
- # 样式文件加载
- # ------------------------------------------------------------------ #
def load_style_file(style_id: Optional[str] = None) -> dict:
    """Load a style JSON file and return its parsed top-level object.

    Args:
        style_id: Identifier of a stored style. Only ``None`` is supported
            for now, which selects ``settings.default_style_file``.

    Returns:
        The parsed JSON document as a dict.

    Raises:
        ExportError: If ``style_id`` is given, the default style file does
            not exist, the file cannot be read or decoded, its content is
            not valid JSON, or its top level is not a JSON object.
    """
    if style_id is not None:
        # TODO: phase 1 placeholder - resolve the path from the database
        # (e.g. a style repository lookup by style_id) once that exists.
        raise ExportError(f"样式 ID 暂不支持: {style_id}(阶段 1 功能)")

    path = Path(settings.default_style_file)
    if not path.exists():
        raise ExportError(f"默认样式文件不存在: {path}")

    try:
        with open(path, encoding="utf-8") as f:
            style_data = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised for a file that is not valid UTF-8.
        raise ExportError(f"样式文件解析失败: {exc}") from exc

    # Callers use dict methods on the result, so reject any other JSON type
    # here with a clear error instead of failing later with AttributeError.
    if not isinstance(style_data, dict):
        raise ExportError(
            f"样式文件解析失败: 顶层结构应为 JSON 对象,实际为 {type(style_data).__name__}"
        )
    return style_data
def build_style_map(style_data: dict) -> dict[str, dict]:
    """Index the style entries by both ``style_id`` and ``name``.

    Every entry of ``style_data["styles"]`` is registered under each of the
    two fields that holds a truthy value, so a lookup by either key returns
    the same entry object. When two entries share a key, the later one wins.
    """
    lookup: dict[str, dict] = {}
    for entry in style_data.get("styles", []):
        # style_id is registered before name, matching the lookup priority
        # that results when an entry's id equals another entry's name.
        for field in ("style_id", "name"):
            key = entry.get(field)
            if key:
                lookup[key] = entry
    return lookup
- # ------------------------------------------------------------------ #
- # JSON ↔ lxml 互转
- # ------------------------------------------------------------------ #
def dict_to_element(d: dict) -> etree._Element:
    """Rebuild an lxml element from its dictionary form.

    The dictionary layout read here is:

    * ``"@tag"``      - element tag (required)
    * ``"@attrib"``   - mapping of attribute names to values
    * ``"#text"``     - element text, applied only when truthy
    * ``"#tail"``     - element tail, applied only when truthy
    * ``"@children"`` - mapping whose values are either one child dictionary
      or a list of them (used when several children share a tag)

    Children that are not dictionaries are ignored.
    """
    node = etree.Element(d["@tag"], attrib=dict(d.get("@attrib", {}).items()))

    text = d.get("#text")
    if text:
        node.text = text
    tail = d.get("#tail")
    if tail:
        node.tail = tail

    # The mapping key only repeats each child's own "@tag", so just the
    # values are needed to rebuild the subtree.
    for group in d.get("@children", {}).values():
        members = group if isinstance(group, list) else [group]
        for member in members:
            if not isinstance(member, dict):
                continue
            node.append(dict_to_element(member))
    return node
def inject_styles_from_json(doc: Document, style_data: dict) -> None:
    """Upsert each style's ``full_xml_definition`` into the document styles.

    For every entry of ``style_data["styles"]`` that carries a
    ``full_xml_definition``, the definition is rebuilt as an XML element and
    appended to the document's ``<w:styles>`` node. Existing ``<w:style>``
    children with the same ``w:styleId`` are removed first, so the injected
    definition replaces them. Entries without a definition are skipped.

    Args:
        doc: The python-docx document whose style part is modified in place.
        style_data: Parsed style file content.
    """
    logger = logging.getLogger(__name__)
    styles_element = doc.styles.element  # the <w:styles> root node
    # Loop-invariant qualified names, resolved once.
    style_tag = qn("w:style")
    style_id_attr = qn("w:styleId")

    for style_entry in style_data.get("styles", []):
        xml_def = style_entry.get("full_xml_definition")
        if not xml_def:
            continue

        try:
            new_elem = dict_to_element(xml_def)
        except Exception:
            # Best effort: one malformed definition must not abort the whole
            # export, but leave a trace so the bad entry can be found.
            logger.warning(
                "样式定义解析失败,已跳过: %s",
                style_entry.get("style_id") or style_entry.get("name"),
                exc_info=True,
            )
            continue

        new_style_id = new_elem.get(style_id_attr)
        if new_style_id:
            # Compare attribute values directly instead of building a path
            # predicate from the ID: an ID containing a quote would break the
            # predicate. Only direct children are inspected because only
            # those can be removed from styles_element.
            for existing in styles_element.findall(style_tag):
                if existing.get(style_id_attr) == new_style_id:
                    styles_element.remove(existing)
        styles_element.append(new_elem)
- # ------------------------------------------------------------------ #
- # 样式 ID 查找辅助
- # ------------------------------------------------------------------ #
def _resolve_style_id(style_map: dict, *lookup_keys: str) -> Optional[str]:
    """Return the ``style_id`` of the first candidate key that resolves.

    The candidates are tried in the order given. A candidate resolves when
    ``style_map`` holds a truthy entry for it and that entry has a truthy
    ``style_id``. ``None`` is returned when no candidate resolves.
    """
    for candidate in lookup_keys:
        matched = style_map.get(candidate)
        if not matched:
            continue
        resolved = matched.get("style_id")
        if resolved:
            return resolved
    return None
- # ------------------------------------------------------------------ #
- # Markdown → python-docx 渲染器
- # ------------------------------------------------------------------ #
class DocxRenderer(mistune.BaseRenderer):
    """Render a mistune token stream into a python-docx ``Document``.

    ``render_token`` dispatches on the token type, so each public method
    named after a token type (``heading``, ``paragraph``, ``list`` ...) is
    the handler for that type. Handlers append content to ``self.doc`` as a
    side effect and return an empty string: the rendered result is the
    document itself, not the return value.
    """

    # Candidate style_map keys per heading level (Word's built-in English
    # style names, in both capitalisations).
    _HEADING_ALIASES = {
        level: [f"Heading {level}", f"heading {level}"] for level in range(1, 7)
    }

    # Token types treated as a soft line break. "softbreak" is the name
    # mistune 3's renderers use; "softlinebreak" is the spelling this class
    # checked before and is kept so both are handled.
    _SOFT_BREAK_TYPES = frozenset({"softbreak", "softlinebreak"})

    def __init__(self, style_map: dict, style_data: dict) -> None:
        """Create an empty document and inject the JSON style definitions.

        Args:
            style_map: Style entries keyed by style ID and by name.
            style_data: Parsed style file; its definitions are injected into
                the new document's style part.
        """
        super().__init__()
        self.style_map = style_map
        self.doc = Document()
        # Apply all formatting in one go through the injected definitions.
        inject_styles_from_json(self.doc, style_data)
        # Cached style ID of "Normal", applied to every plain paragraph.
        self._normal_id: Optional[str] = _resolve_style_id(
            style_map, "Normal", "1"
        )

    # ------ block-level elements ------ #

    def heading(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render a heading with the mapped style, else a built-in heading."""
        level = token["attrs"]["level"]
        text = self._extract_text(token.get("children", []))
        aliases = self._HEADING_ALIASES.get(
            level, [f"Heading {level}", f"heading {level}"]
        )
        heading_style_id = _resolve_style_id(self.style_map, *aliases)
        if heading_style_id:
            # Reference the injected style by ID; colour and size come from
            # its definition. On failure the paragraph keeps its default
            # style so rendering continues.
            self._apply_style(self.doc.add_paragraph(text), heading_style_id)
        else:
            self.doc.add_heading(text, level=level)
        return ""

    def paragraph(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render a paragraph, keeping inline formatting of its children."""
        p = self.doc.add_paragraph()
        if self._normal_id:
            self._apply_style(p, self._normal_id)
        self._render_inline_children(p, token.get("children", []))
        return ""

    def blank_line(self, token: dict, state: mistune.core.BlockState) -> str:
        """Ignore blank lines so no empty paragraphs are produced."""
        return ""

    def thematic_break(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render ``---`` as an empty paragraph with a bottom border."""
        p = self.doc.add_paragraph()
        pPr = p._p.get_or_add_pPr()
        pBdr = OxmlElement("w:pBdr")
        bottom = OxmlElement("w:bottom")
        bottom.set(qn("w:val"), "single")
        bottom.set(qn("w:sz"), "6")
        bottom.set(qn("w:space"), "1")
        bottom.set(qn("w:color"), "auto")
        pBdr.append(bottom)
        pPr.append(pBdr)
        return ""

    def block_quote(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render each child block of a quote as one "Quote" paragraph."""
        # Loop-invariant: resolve the mapped quote style once.
        quote_id = _resolve_style_id(self.style_map, "Quote", "Quote Char")
        for child in token.get("children", []):
            if child.get("type") == "blank_line":
                # Consistent with blank_line(): no empty paragraphs.
                continue
            # Children without a "children" list (e.g. code blocks) carry
            # their text in "raw", so fall back to the child itself.
            text = self._extract_text(child.get("children", [child]))
            if quote_id:
                p = self.doc.add_paragraph(text)
                if not self._apply_style(p, quote_id):
                    p.style = "Quote"
            else:
                p = self.doc.add_paragraph(style="Quote")
                p.add_run(text)
        return ""

    def block_code(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render a code block as a "No Spacing" paragraph in Courier New.

        The formatting is hard-coded (10pt, dark grey) instead of being read
        from the style map.
        """
        # The raw code normally ends with a newline, which would otherwise
        # show up as an empty last line inside the paragraph.
        code = token.get("raw", "").rstrip("\n")
        p = self.doc.add_paragraph(style="No Spacing")
        run = p.add_run(code)
        run.font.name = "Courier New"
        run.font.size = Pt(10)
        run.font.color.rgb = RGBColor(0x33, 0x33, 0x33)
        return ""

    def list(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render a list; nested lists are handled by ``_render_list_items``."""
        ordered = token["attrs"].get("ordered", False)
        # Nesting is counted here, starting at 1, rather than taken from the
        # parser's "depth" attribute: nested lists are reached only through
        # _render_list_items, and a parser that numbers the top level 0
        # would otherwise select the second-level style for top-level lists.
        # NOTE(review): mistune 3 appears to report depth 0 at the top level.
        self._render_list_items(token.get("children", []), ordered, 1)
        return ""

    def _render_list_items(
        self, items: Iterable[dict], ordered: bool, depth: int
    ) -> None:
        """Add one paragraph per list-item block, recursing into sub-lists.

        Args:
            items: The list-item tokens of one list.
            ordered: Whether this list is numbered.
            depth: 1 for a top-level list; anything deeper uses the "... 2"
                style variant.
        """
        level_suffix = "" if depth <= 1 else " 2"
        style = ("List Number" if ordered else "List Bullet") + level_suffix
        for item in items:
            for child in item.get("children", []):
                child_type = child.get("type")
                if child_type == "list":
                    self._render_list_items(
                        child.get("children", []),
                        child["attrs"].get("ordered", False),
                        depth + 1,
                    )
                elif child_type == "blank_line":
                    # A blank line inside an item must not become an empty
                    # bullet.
                    continue
                else:
                    text = self._extract_text(child.get("children", [child]))
                    self.doc.add_paragraph(text, style=style)

    def table(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render a table with a bold header row using "Table Grid".

        ``table_head`` holds the header cells directly; ``table_body`` holds
        ``table_row`` tokens, each holding cells. Body cells beyond the
        header's column count are dropped. A table without header cells is
        skipped.
        """
        children = token.get("children", [])
        head_token = next((c for c in children if c["type"] == "table_head"), None)
        body_token = next((c for c in children if c["type"] == "table_body"), None)

        head_cells = head_token.get("children", []) if head_token else []
        cols = len(head_cells)
        if cols == 0:
            return ""

        body_rows = [
            row.get("children", [])
            for row in (body_token.get("children", []) if body_token else [])
            if row["type"] == "table_row"
        ]

        tbl = self.doc.add_table(rows=1 + len(body_rows), cols=cols)
        tbl.style = "Table Grid"
        rows = tbl.rows

        # Header row, in bold. ``cells`` is read once per row instead of
        # once per cell.
        for cell, cell_token in zip(rows[0].cells, head_cells):
            cell.text = self._extract_text(cell_token.get("children", []))
            for para in cell.paragraphs:
                for run in para.runs:
                    run.bold = True

        # Data rows; zip() stops at the table's column count.
        for r, row_tokens in enumerate(body_rows, start=1):
            for cell, cell_token in zip(rows[r].cells, row_tokens):
                cell.text = self._extract_text(cell_token.get("children", []))
        return ""

    # ------ inline elements ------ #

    def _render_inline_children(self, paragraph, children: Iterable[dict]) -> None:
        """Append one run per inline token to *paragraph*.

        Handles plain text, bold, italic, strikethrough, inline code and
        hard/soft line breaks. Any other token is rendered through its
        children or, failing that, its raw text.
        """
        for child in children:
            ctype = child.get("type", "")
            raw = child.get("raw", "")
            if ctype == "text":
                paragraph.add_run(raw)
            elif ctype == "strong":
                run = paragraph.add_run(self._extract_text(child.get("children", [])))
                run.bold = True
            elif ctype == "emphasis":
                run = paragraph.add_run(self._extract_text(child.get("children", [])))
                run.italic = True
            elif ctype == "strikethrough":
                run = paragraph.add_run(self._extract_text(child.get("children", [])))
                run.font.strike = True
            elif ctype == "codespan":
                run = paragraph.add_run(raw)
                run.font.name = "Courier New"
                run.font.size = Pt(10)
            elif ctype == "linebreak":
                paragraph.add_run().add_break()
            elif ctype in self._SOFT_BREAK_TYPES:
                # A soft break joins two source lines; without the space the
                # words on either side would run together.
                paragraph.add_run(" ")
            else:
                # Unknown type: recurse into children, else emit raw text.
                sub = child.get("children")
                if sub:
                    self._render_inline_children(paragraph, sub)
                elif raw:
                    paragraph.add_run(raw)

    # ------ helpers ------ #

    def _apply_style(self, paragraph, style_id: str) -> bool:
        """Assign the style with ``w:styleId`` *style_id* to *paragraph*.

        Returns:
            ``True`` when the style was applied, ``False`` when it could not
            be (unknown ID, or the assignment was rejected). The paragraph
            is left unchanged in that case.
        """
        try:
            paragraph.style = self._get_style_by_id(style_id)
        except Exception:
            # Styles come from injected external XML, so any failure here is
            # treated as "style unavailable" and rendering goes on. Logged at
            # debug level because it can repeat for every paragraph.
            logging.getLogger(__name__).debug(
                "样式应用失败,保持回退样式: %s", style_id, exc_info=True
            )
            return False
        return True

    def _get_style_by_id(self, style_id: str):
        """Return the document style whose ``style_id`` equals *style_id*.

        Raises:
            KeyError: If the document has no such style.
        """
        for style in self.doc.styles:
            if style.style_id == style_id:
                return style
        raise KeyError(f"样式 ID 不存在: {style_id}")

    @staticmethod
    def _extract_text(children: Iterable[dict]) -> str:
        """Recursively flatten tokens to plain text.

        Used where inline formatting is not kept (headings, list items,
        table cells, quotes). Soft breaks become a space and hard breaks a
        newline so adjacent words do not fuse; non-dict items are ignored.
        """
        parts = []
        for child in children:
            if not isinstance(child, dict):
                continue
            ctype = child.get("type")
            if ctype in DocxRenderer._SOFT_BREAK_TYPES:
                parts.append(" ")
                continue
            if ctype == "linebreak":
                parts.append("\n")
                continue
            raw = child.get("raw", "")
            if raw:
                parts.append(raw)
            sub = child.get("children")
            if sub:
                parts.append(DocxRenderer._extract_text(sub))
        return "".join(parts)

    def render_token(self, token: dict, state: mistune.core.BlockState) -> str:
        """Dispatch *token* to the method named after its type.

        Tokens without a matching handler are rendered through their
        children, if any.
        """
        handler = getattr(self, token["type"], None)
        # callable() guards against a token type that happens to match a
        # non-method attribute of the renderer.
        if callable(handler):
            return handler(token, state)
        for child in token.get("children") or []:
            self.render_token(child, state)
        return ""

    def render_children(self, token: dict, state: mistune.core.BlockState) -> str:
        """Render every child of *token*."""
        for child in token.get("children", []):
            self.render_token(child, state)
        return ""

    def __call__(self, tokens: Iterable[dict], state: mistune.core.BlockState) -> str:
        """Renderer entry point: render each top-level token in order."""
        for token in tokens:
            self.render_token(token, state)
        return ""
- # ------------------------------------------------------------------ #
- # 公共接口
- # ------------------------------------------------------------------ #
def markdown_to_docx_bytes(content: str, style_map: dict, style_data: dict) -> bytes:
    """Convert a Markdown string into the bytes of a .docx document.

    The document is built and serialised entirely in memory; nothing is
    written to disk.
    """
    docx_renderer = DocxRenderer(style_map=style_map, style_data=style_data)
    # The table and strikethrough plugins are enabled explicitly because
    # mistune does not parse those constructs by default.
    parse_markdown = mistune.create_markdown(
        renderer=docx_renderer,
        plugins=["table", "strikethrough", "url"],
    )
    parse_markdown(content)

    with io.BytesIO() as stream:
        docx_renderer.doc.save(stream)
        return stream.getvalue()
def _safe_filename(name: str, *, max_bytes: int = 200) -> str:
    """Return *name* made safe for use as a file name stem.

    Steps, in order:

    1. NFKC-normalise, so full-width variants (e.g. a full-width slash)
       collapse to their ASCII form before filtering.
    2. Strip surrounding whitespace.
    3. Replace the characters ``\\ / : * ? " < > |`` and all control
       characters (NUL, newline, tab ...) with ``_``.
    4. Truncate to at most *max_bytes* UTF-8 bytes without splitting a
       character, leaving room for the suffix the caller appends.

    Args:
        name: The requested file name.
        max_bytes: Upper bound for the UTF-8 encoded length of the result.

    Returns:
        The sanitised name, or ``"document"`` when nothing usable remains.
    """
    illegal = frozenset('\\/:*?"<>|')
    normalized = unicodedata.normalize("NFKC", name).strip()
    # Single pass instead of one str.replace() per illegal character.
    # Control characters are replaced too: an embedded NUL makes file
    # system calls raise ValueError, and newlines are invalid or misleading
    # in file names.
    replaced = "".join(
        "_" if ch in illegal or unicodedata.category(ch) == "Cc" else ch
        for ch in normalized
    )
    # Byte-based cut because file systems limit names in bytes; "ignore"
    # drops a character split by the cut (and unencodable lone surrogates).
    truncated = (
        replaced.encode("utf-8", errors="ignore")[:max_bytes]
        .decode("utf-8", errors="ignore")
    )
    return truncated.strip() or "document"
async def export_doc(
    user_id: str,
    file_name: str,
    content: str,
    style_id: Optional[str] = None,
    document_id: Optional[str] = None,
) -> dict:
    """Export Markdown *content* as a Word file and record the export.

    Loads the style file, renders the document, writes it to
    ``{settings.temp_dir}/{user_id}/{YYYY-MM-DD}/``, creates an export
    record and finally runs the storage quota check.

    Args:
        user_id: Owner of the export; also the first directory level.
        file_name: Requested file name; sanitised and suffixed with a random
            token before use.
        content: Markdown source.
        style_id: Style to use, or ``None`` for the default style.
        document_id: Optional source document ID stored on the record.

    Returns:
        A dict with the keys ``record_id``, ``download_url``, ``file_name``,
        ``style_id`` and ``warning`` (the result of the quota check).

    Raises:
        ExportError: If the style cannot be loaded, the Markdown conversion
            fails, ``user_id`` would place the file outside the temp
            directory, or the file cannot be written.
    """
    from datetime import date

    from app.core.database import AsyncSessionLocal
    from app.services.export_record_service import ExportRecordService
    from app.services.storage_monitor import check_quota

    # 1. Load the style file and build the lookup map.
    style_data = load_style_file(style_id)
    style_map = build_style_map(style_data)
    actual_style_id = style_id or "default"

    # 2. Markdown -> docx bytes.
    try:
        doc_bytes = markdown_to_docx_bytes(content, style_map, style_data)
    except Exception as exc:
        raise ExportError(f"Markdown 转换失败: {exc}") from exc

    # 3. Write below {temp_dir}/{user_id}/{YYYY-MM-DD}/.
    safe_name = _safe_filename(file_name)
    token = secrets.token_urlsafe(8)
    # NOTE(review): the payload is OOXML (.docx) but the name ends in ".doc".
    # Left unchanged because stored records or clients may rely on it -
    # confirm whether ".docx" was intended.
    final_name = f"{safe_name}_{token}.doc"
    today = date.today().strftime("%Y-%m-%d")
    base_dir = Path(settings.temp_dir)
    user_dir = base_dir / user_id / today

    # user_id becomes a path component: "..", or an absolute path (which
    # replaces the base when joined), must not escape the temp directory.
    try:
        inside_base = base_dir.resolve() in user_dir.resolve().parents
    except (OSError, ValueError):
        inside_base = False
    if not inside_base:
        raise ExportError(f"非法的用户 ID,导出目录越界: {user_id}")

    file_path = user_dir / final_name
    try:
        user_dir.mkdir(parents=True, exist_ok=True)
        file_path.write_bytes(doc_bytes)
    except OSError as exc:
        raise ExportError(f"文件写入失败: {exc}") from exc
    # write_bytes() wrote exactly these bytes, so no stat() call is needed.
    file_size = len(doc_bytes)

    # 4. Create the export record, then store the permanent download link
    #    (no expiry) built from the real record ID.
    committed = False
    try:
        async with AsyncSessionLocal() as db:
            svc = ExportRecordService(db)
            record = await svc.create_record(
                user_id=user_id,
                file_name=final_name,
                file_path=str(file_path),
                file_size=file_size,
                download_url="",  # placeholder, filled in below
                document_id=document_id,
                style_id=actual_style_id,
            )
            record_id = record.id
            # The link includes userId so the front end can use it as is;
            # the value is URL-encoded because it is caller-supplied.
            download_url = (
                f"{settings.base_url.rstrip('/')}/api/v1/export/records/{record_id}/download"
                f"?userId={quote(user_id, safe='')}"
            )
            record.download_url = download_url
            await db.commit()
            committed = True
    finally:
        if not committed:
            # Do not leave an orphaned file behind when the record could not
            # be stored. The flag (rather than an except clause) keeps the
            # file if a failure happens only after the commit succeeded.
            # NOTE(review): if create_record commits on its own and the
            # final commit fails, a row without a file remains - confirm.
            try:
                file_path.unlink(missing_ok=True)
            except OSError:
                pass  # cleanup is best effort; the original error propagates

    # 5. Check the disk quota after the export.
    warning = check_quota(user_id)

    return {
        "record_id": record_id,
        "download_url": download_url,
        "file_name": final_name,
        "style_id": actual_style_id,
        "warning": warning,
    }
|