#!/usr/bin/env python3 """ gen_catalog.py — query the live DB and emit per-object markdown stubs in docs/auto-catalog/. Connection: reads ~/.my.cnf (or any file the env var DB_DEFAULTS_FILE points at) for [client] credentials. Database name is taken from .my.cnf (`database = ...`) unless overridden by DB_NAME. Writes: docs/auto-catalog/tables/.md docs/auto-catalog/views/.md docs/auto-catalog/procedures/.md docs/auto-catalog/functions/.md docs/auto-catalog/{tables,views,procedures,functions}/index.md Idempotent: rewrites everything under docs/auto-catalog/. """ from __future__ import annotations import collections import configparser import os import shutil import sys from pathlib import Path import pymysql ROOT = Path(__file__).resolve().parent.parent OUT = ROOT / "docs" / "auto-catalog" # 说明 section is the only hand-edited part of the auto-catalog. We snapshot # any non-placeholder content before the rmtree and re-inject it during write, # so the regeneration preserves human prose. Mirrors en/scripts/gen_catalog.py. NARRATIVE_HEADER = "## 说明" NARRATIVE_PLACEHOLDER_PREFIX = "_暂无人工说明" def collect_existing_narratives(root: Path) -> dict[str, str]: """Map "/" → preserved 说明 body. Files that still hold the auto-generated placeholder are skipped so the regen rewrites them fresh.""" preserved: dict[str, str] = {} for sub in ("tables", "views", "procedures", "functions"): d = root / sub if not d.exists(): continue for f in d.glob("*.md"): if f.name == "index.md": continue content = f.read_text(encoding="utf-8") idx = content.find(NARRATIVE_HEADER) if idx < 0: continue block = content[idx + len(NARRATIVE_HEADER):].lstrip("\n").rstrip() if not block or block.startswith(NARRATIVE_PLACEHOLDER_PREFIX): continue preserved[f"{sub}/{f.stem}"] = block return preserved def read_my_cnf() -> tuple[dict, str]: """Parse ~/.my.cnf [client] section. Returns (connect_kwargs, db_name).""" cnf_path = Path(os.environ.get("DB_DEFAULTS_FILE", "~/.my.cnf")).expanduser() cp = configparser.ConfigParser(strict=False, allow_no_value=True) cp.read(cnf_path) if "client" not in cp: raise RuntimeError(f"[client] section not found in {cnf_path}") c = cp["client"] db = os.environ.get("DB_NAME") or (c.get("database") or "").strip() if not db: raise RuntimeError("DB name not specified (DB_NAME env or `database=` in .my.cnf)") return { "host": c.get("host", "127.0.0.1"), "port": int(c.get("port", "3306")), "user": c.get("user"), "password": c.get("password"), "charset": "utf8mb4", "cursorclass": pymysql.cursors.DictCursor, }, db _SAFE_FILENAME = str.maketrans({c: "_" for c in r'/\:*?"<>|()'}) def slug(name: str) -> str: """Filename-safe slug. Routine names with parens or path separators get sanitized; everything else (including CJK) passes through — modern filesystems handle UTF-8.""" return name.translate(_SAFE_FILENAME).strip() def fmt_bytes(n: int | None) -> str: if n is None: return "" n = float(n) for unit in ("B", "KB", "MB", "GB"): if n < 1024: return f"{n:.1f} {unit}" n /= 1024 return f"{n:.1f} TB" def write_index(path: Path, kind: str, items: list[dict], summary_keys: list[tuple[str, str]]) -> None: kind_titles = { "tables": "数据表", "views": "视图", "procedures": "存储过程", "functions": "函数", } lines = [f"# {kind_titles.get(kind, kind.title())}(自动生成)\n"] lines.append("> 由 `scripts/gen_catalog.py` 重新生成。请勿手工编辑。\n") lines.append("") lines.append(f"**总数:{len(items)}**。") lines.append("") header_map = { "Name": "名称", "Rows": "行数", "Size": "大小", "Comment": "备注", } headers = ["名称"] + [header_map.get(h, h) for _, h in summary_keys] lines.append("| " + " | ".join(headers) + " |") lines.append("|" + "|".join(["---"] * len(headers)) + "|") for item in items: name = item["name"] link = f"[`{name}`]({slug(name)}.md)" row = [link] + [str(item.get(k, "") or "") for k, _ in summary_keys] # escape pipe chars in cell content row = [c.replace("|", "\\|") for c in row] lines.append("| " + " | ".join(row) + " |") path.write_text("\n".join(lines) + "\n", encoding="utf-8") def gen_tables(conn, db: str, out_dir: Path, narratives: dict[str, str]) -> list[dict]: out_dir.mkdir(parents=True, exist_ok=True) cur = conn.cursor() cur.execute(""" SELECT TABLE_NAME, TABLE_TYPE, ENGINE, TABLE_ROWS, DATA_LENGTH, TABLE_COLLATION, CREATE_TIME, UPDATE_TIME, TABLE_COMMENT FROM information_schema.tables WHERE table_schema = %s ORDER BY TABLE_NAME """, (db,)) tables_rows = cur.fetchall() cur.execute(""" SELECT TABLE_NAME, ORDINAL_POSITION, COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE, COLUMN_KEY, COLUMN_DEFAULT, EXTRA, COLUMN_COMMENT FROM information_schema.columns WHERE table_schema = %s ORDER BY TABLE_NAME, ORDINAL_POSITION """, (db,)) cols_by_table = collections.defaultdict(list) for c in cur.fetchall(): cols_by_table[c["TABLE_NAME"]].append(c) cur.execute(""" SELECT TABLE_NAME, INDEX_NAME, NON_UNIQUE, SEQ_IN_INDEX, COLUMN_NAME, INDEX_TYPE FROM information_schema.statistics WHERE table_schema = %s ORDER BY TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX """, (db,)) idx_by_table = collections.defaultdict(list) for i in cur.fetchall(): idx_by_table[i["TABLE_NAME"]].append(i) base_tables = [t for t in tables_rows if t["TABLE_TYPE"] == "BASE TABLE"] views_meta = [t for t in tables_rows if t["TABLE_TYPE"] == "VIEW"] for t in base_tables: name = t["TABLE_NAME"] rows = t.get("TABLE_ROWS") size = fmt_bytes(t.get("DATA_LENGTH")) comment = (t.get("TABLE_COMMENT") or "").strip() cols = cols_by_table.get(name, []) idxs = idx_by_table.get(name, []) md = [f"# `{name}`\n"] if comment: md.append(f"> {comment}\n") md.append("") md.append(f"- **预估行数:** {rows if rows is not None else ''}") md.append(f"- **数据大小:** {size}") md.append(f"- **引擎:** {t.get('ENGINE') or ''}") md.append(f"- **排序规则:** {t.get('TABLE_COLLATION') or ''}") md.append(f"- **创建时间:** {t.get('CREATE_TIME') or ''}") md.append(f"- **更新时间:** {t.get('UPDATE_TIME') or ''}") md.append("") md.append("## 字段") md.append("") md.append("| # | 名称 | 类型 | 可空 | 键 | 默认值 | 额外属性 | 备注 |") md.append("|---|---|---|---|---|---|---|---|") for c in cols: md.append("| {pos} | `{cn}` | `{ct}` | {nu} | {kk} | {df} | {ex} | {cm} |".format( pos=c.get("ORDINAL_POSITION", ""), cn=c.get("COLUMN_NAME", ""), ct=c.get("COLUMN_TYPE", ""), nu=c.get("IS_NULLABLE", ""), kk=c.get("COLUMN_KEY", ""), df=("" if c.get("COLUMN_DEFAULT") is None else c.get("COLUMN_DEFAULT")).replace("|", "\\|") if isinstance(c.get("COLUMN_DEFAULT"), str) else (c.get("COLUMN_DEFAULT") or ""), ex=(c.get("EXTRA") or "").replace("|", "\\|"), cm=(c.get("COLUMN_COMMENT") or "").replace("|", "\\|").replace("\n", " "), )) md.append("") if idxs: md.append("## 索引") md.append("") by_idx = collections.OrderedDict() for i in idxs: by_idx.setdefault(i["INDEX_NAME"], []).append(i) md.append("| 名称 | 唯一 | 类型 | 字段 |") md.append("|---|---|---|---|") for idx_name, parts in by_idx.items(): parts.sort(key=lambda x: int(x.get("SEQ_IN_INDEX") or 0)) unique = "✓" if int(parts[0].get("NON_UNIQUE") or 0) == 0 else "" cols_in_idx = ", ".join(f"`{p['COLUMN_NAME']}`" for p in parts) md.append(f"| `{idx_name}` | {unique} | {parts[0].get('INDEX_TYPE') or ''} | {cols_in_idx} |") md.append("") md.append("## 说明") md.append("") preserved = narratives.get(f"tables/{slug(name)}") if preserved: md.append(preserved) else: md.append("_暂无人工说明;当某个[垂直切片](../../slices/index.md)使用这张表时,请从切片反向链接到本页。_") md.append("") (out_dir / f"{slug(name)}.md").write_text("\n".join(md), encoding="utf-8") items = [] for t in sorted(base_tables, key=lambda x: int(x.get("DATA_LENGTH") or 0), reverse=True): items.append({ "name": t["TABLE_NAME"], "rows": t.get("TABLE_ROWS"), "size": fmt_bytes(t.get("DATA_LENGTH")), "comment": (t.get("TABLE_COMMENT") or "").strip()[:80], }) write_index( out_dir / "index.md", "tables", items, [("rows", "Rows"), ("size", "Size"), ("comment", "Comment")], ) return views_meta def gen_views(conn, db: str, out_dir: Path, views_meta: list[dict], narratives: dict[str, str]) -> None: out_dir.mkdir(parents=True, exist_ok=True) cur = conn.cursor() cur.execute(""" SELECT TABLE_NAME, VIEW_DEFINITION, IS_UPDATABLE, DEFINER FROM information_schema.views WHERE table_schema = %s """, (db,)) def_by_name = {v["TABLE_NAME"]: v for v in cur.fetchall()} cur.execute(""" SELECT TABLE_NAME, ORDINAL_POSITION, COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE FROM information_schema.columns WHERE table_schema = %s ORDER BY TABLE_NAME, ORDINAL_POSITION """, (db,)) cols_by_view = collections.defaultdict(list) for c in cur.fetchall(): cols_by_view[c["TABLE_NAME"]].append(c) for v in views_meta: name = v["TABLE_NAME"] d = def_by_name.get(name, {}) cols = cols_by_view.get(name, []) md = [f"# `{name}`(视图)\n"] comment = (v.get("TABLE_COMMENT") or "").strip() # MySQL stores the literal "VIEW" in TABLE_COMMENT for views; ignore it. if comment and comment.upper() != "VIEW": md.append(f"> {comment}\n") md.append("") md.append(f"- **是否可更新:** {d.get('IS_UPDATABLE') or ''}") md.append(f"- **定义者:** `{d.get('DEFINER') or ''}`") md.append("") if cols: md.append("## 字段") md.append("") md.append("| # | 名称 | 类型 | 可空 |") md.append("|---|---|---|---|") for c in cols: md.append(f"| {c.get('ORDINAL_POSITION') or ''} | `{c.get('COLUMN_NAME') or ''}` | `{c.get('COLUMN_TYPE') or ''}` | {c.get('IS_NULLABLE') or ''} |") md.append("") defn = (d.get("VIEW_DEFINITION") or "").strip() if defn: md.append("## 定义") md.append("") md.append("```sql") md.append(defn) md.append("```") md.append("") md.append("## 说明") md.append("") preserved = narratives.get(f"views/{slug(name)}") if preserved: md.append(preserved) else: md.append("_暂无人工说明。_") md.append("") (out_dir / f"{slug(name)}.md").write_text("\n".join(md), encoding="utf-8") def _vc(v): c = (v.get("TABLE_COMMENT") or "").strip() return "" if c.upper() == "VIEW" else c[:80] items = [{"name": v["TABLE_NAME"], "comment": _vc(v)} for v in sorted(views_meta, key=lambda x: x["TABLE_NAME"])] write_index(out_dir / "index.md", "views", items, [("comment", "Comment")]) def gen_routines(conn, db: str, procs_dir: Path, fns_dir: Path, narratives: dict[str, str]) -> None: procs_dir.mkdir(parents=True, exist_ok=True) fns_dir.mkdir(parents=True, exist_ok=True) cur = conn.cursor() cur.execute(""" SELECT ROUTINE_NAME, ROUTINE_TYPE, DTD_IDENTIFIER AS RETURN_TYPE, IS_DETERMINISTIC, SQL_DATA_ACCESS, CREATED, LAST_ALTERED, ROUTINE_COMMENT FROM information_schema.routines WHERE routine_schema = %s ORDER BY ROUTINE_NAME """, (db,)) routines = cur.fetchall() cur.execute(""" SELECT ROUTINE_TYPE, SPECIFIC_NAME, ORDINAL_POSITION, PARAMETER_MODE, PARAMETER_NAME, DTD_IDENTIFIER FROM information_schema.parameters WHERE specific_schema = %s ORDER BY SPECIFIC_NAME, ORDINAL_POSITION """, (db,)) p_by_routine = collections.defaultdict(list) for p in cur.fetchall(): p_by_routine[(p["ROUTINE_TYPE"], p["SPECIFIC_NAME"])].append(p) for r in routines: name = r["ROUTINE_NAME"] rtype = r["ROUTINE_TYPE"] out_dir = procs_dir if rtype == "PROCEDURE" else fns_dir prms = sorted( p_by_routine.get((rtype, name), []), key=lambda x: int(x.get("ORDINAL_POSITION") or 0), ) rtype_label = "存储过程" if rtype == "PROCEDURE" else "函数" md = [f"# `{name}`({rtype_label})\n"] comment = (r.get("ROUTINE_COMMENT") or "").strip() if comment: md.append(f"> {comment}\n") md.append("") md.append(f"- **类型:** {rtype}") if rtype == "FUNCTION": md.append(f"- **返回值:** `{r.get('RETURN_TYPE') or ''}`") md.append(f"- **确定性:** {r.get('IS_DETERMINISTIC') or ''}") md.append(f"- **SQL 数据访问:** {r.get('SQL_DATA_ACCESS') or ''}") md.append(f"- **创建时间:** {r.get('CREATED') or ''}") md.append(f"- **最后修改:** {r.get('LAST_ALTERED') or ''}") md.append("") md.append("## 参数") md.append("") if prms: md.append("| # | 模式 | 名称 | 类型 |") md.append("|---|---|---|---|") for p in prms: # Functions list a position-0 row for the return type with no name — # skip it; we render the return type via RETURN_TYPE above. if rtype == "FUNCTION" and (p.get("PARAMETER_NAME") is None or int(p.get("ORDINAL_POSITION") or 0) == 0): continue md.append(f"| {p.get('ORDINAL_POSITION') or ''} | {p.get('PARAMETER_MODE') or ''} | `{p.get('PARAMETER_NAME') or ''}` | `{p.get('DTD_IDENTIFIER') or ''}` |") else: md.append("_无参数。_") md.append("") md.append("## 主体") md.append("") md.append("_主体未预缓存。如需查看:`mysql --defaults-file=~/.my.cnf " f"-e 'SHOW CREATE {rtype} `{name}`'`。_") md.append("") md.append("## 说明") md.append("") kind_sub = "procedures" if rtype == "PROCEDURE" else "functions" preserved = narratives.get(f"{kind_sub}/{slug(name)}") if preserved: md.append(preserved) else: md.append("_暂无人工说明。_") md.append("") (out_dir / f"{slug(name)}.md").write_text("\n".join(md), encoding="utf-8") procs = [r for r in routines if r["ROUTINE_TYPE"] == "PROCEDURE"] fns = [r for r in routines if r["ROUTINE_TYPE"] == "FUNCTION"] write_index( procs_dir / "index.md", "procedures", [{"name": r["ROUTINE_NAME"], "comment": (r.get("ROUTINE_COMMENT") or "").strip()[:80]} for r in sorted(procs, key=lambda x: x["ROUTINE_NAME"])], [("comment", "Comment")], ) write_index( fns_dir / "index.md", "functions", [{"name": r["ROUTINE_NAME"], "comment": (r.get("ROUTINE_COMMENT") or "").strip()[:80]} for r in sorted(fns, key=lambda x: x["ROUTINE_NAME"])], [("comment", "Comment")], ) def main() -> int: cnf, db = read_my_cnf() print(f"Connecting to {cnf['host']}:{cnf['port']} db={db} as {cnf['user']}") conn = pymysql.connect(database=db, **cnf) try: # 先快照已存在的人工说明,确保重生成时保留。 narratives = collect_existing_narratives(OUT) if narratives: print(f"Preserving {len(narratives)} hand-written narrative(s).") # Wipe and rewrite each output dir to keep it strictly in sync with the live DB. for sub in ("tables", "views", "procedures", "functions"): d = OUT / sub if d.exists(): shutil.rmtree(d) print(f"Writing to {OUT}") views_meta = gen_tables(conn, db, OUT / "tables", narratives) gen_views(conn, db, OUT / "views", views_meta, narratives) gen_routines(conn, db, OUT / "procedures", OUT / "functions", narratives) print("Done.") finally: conn.close() return 0 if __name__ == "__main__": sys.exit(main())