#!/usr/bin/env python3
"""
gen_catalog.py — query the live DB and emit per-object markdown stubs in
docs/auto-catalog/.

Connection: reads ~/.my.cnf (or any file the env var DB_DEFAULTS_FILE points
at) for [client] credentials. Database name is taken from .my.cnf
(`database = ...`) unless overridden by DB_NAME.

Writes:
  docs/auto-catalog/tables/<name>.md
  docs/auto-catalog/views/<name>.md
  docs/auto-catalog/procedures/<name>.md
  docs/auto-catalog/functions/<name>.md
  docs/auto-catalog/{tables,views,procedures,functions}/index.md

Idempotent: rewrites the four object directories under docs/auto-catalog/,
carrying over any hand-written Narrative sections found in the old pages.
"""
from __future__ import annotations

import collections
import configparser
import os
import re
import shutil
import sys
import tempfile
from pathlib import Path

import pymysql

ROOT = Path(__file__).resolve().parent.parent
OUT = ROOT / "docs" / "auto-catalog"

# Output sub-directories, one per object kind.
_SUBDIRS = ("tables", "views", "procedures", "functions")

# The Narrative section is the one place humans hand-edit auto-catalog pages
# (see narrate_routines.md for the playbook). collect_existing_narratives()
# snapshots these before the old pages are replaced so the regen preserves them.
NARRATIVE_HEADER = "## Narrative"
NARRATIVE_PLACEHOLDER_PREFIX = "_No human-written narrative"

# Match the header only when it is a whole line, so "### Narrative" or the
# same words in the middle of a sentence are not mistaken for the section.
_NARRATIVE_HEADER_RE = re.compile(
    rf"^{re.escape(NARRATIVE_HEADER)}[ \t]*$", re.MULTILINE
)


def collect_existing_narratives(root: Path) -> dict[str, str]:
    """Map "<sub>/<stem>" → preserved Narrative body.

    Scans every ``*.md`` page (except ``index.md``) in the four object
    directories under *root* and keeps the text that follows the Narrative
    header. Anything that still holds the auto-generated placeholder is
    skipped, so the regen rewrites it fresh in the new language/format.
    """
    preserved: dict[str, str] = {}
    for sub in _SUBDIRS:
        directory = root / sub
        if not directory.exists():
            continue
        for page in directory.glob("*.md"):
            if page.name == "index.md":
                continue
            content = page.read_text(encoding="utf-8")
            header = _NARRATIVE_HEADER_RE.search(content)
            if header is None:
                continue
            block = content[header.end():].lstrip("\n").rstrip()
            if not block or block.startswith(NARRATIVE_PLACEHOLDER_PREFIX):
                continue
            preserved[f"{sub}/{page.stem}"] = block
    return preserved


def _unquote(value: str | None) -> str | None:
    """Strip whitespace and one pair of matching surrounding quotes.

    MySQL option files allow values to be wrapped in single or double
    quotes; configparser hands the quotes back verbatim.
    """
    if value is None:
        return None
    value = value.strip()
    if len(value) >= 2 and value[0] == value[-1] and value[0] in "'\"":
        return value[1:-1]
    return value


def read_my_cnf() -> tuple[dict, str]:
    """Parse the [client] section of the MySQL defaults file.

    The file is ``$DB_DEFAULTS_FILE`` if set, else ``~/.my.cnf``.

    Returns:
        ``(connect_kwargs, db_name)`` — keyword arguments for
        ``pymysql.connect`` (without ``database``) and the schema name.

    Raises:
        RuntimeError: the file cannot be read, has no ``[client]`` section,
            or no database name is configured.
    """
    cnf_path = Path(os.environ.get("DB_DEFAULTS_FILE", "~/.my.cnf")).expanduser()
    # interpolation=None: a literal "%" in a password must not be treated as
    # configparser interpolation syntax.
    cp = configparser.ConfigParser(
        strict=False, allow_no_value=True, interpolation=None
    )
    if not cp.read(cnf_path):
        raise RuntimeError(f"cannot read MySQL defaults file {cnf_path}")
    if "client" not in cp:
        raise RuntimeError(f"[client] section not found in {cnf_path}")

    client = cp["client"]
    db = os.environ.get("DB_NAME") or (_unquote(client.get("database")) or "").strip()
    if not db:
        raise RuntimeError("DB name not specified (DB_NAME env or `database=` in .my.cnf)")

    connect_kwargs = {
        # `or` (not a .get fallback) so a key present without a value still
        # falls back to the default instead of yielding None.
        "host": _unquote(client.get("host")) or "127.0.0.1",
        "port": int(_unquote(client.get("port")) or "3306"),
        "user": _unquote(client.get("user")),
        "password": _unquote(client.get("password")),
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }
    return connect_kwargs, db


_SAFE_FILENAME = str.maketrans({c: "_" for c in r'/\:*?"<>|()'})


def slug(name: str) -> str:
    """Filename-safe slug.

    Routine names with parens or path separators get sanitized; everything
    else (including CJK) passes through — modern filesystems handle UTF-8.
    """
    return name.translate(_SAFE_FILENAME).strip()


def fmt_bytes(n: int | None) -> str:
    """Format a byte count with one decimal in 1024-based units (B … TB).

    ``None`` renders as an empty string.
    """
    if n is None:
        return ""
    size = float(n)
    for unit in ("B", "KB", "MB", "GB"):
        if size < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} TB"


def _cell(value: object) -> str:
    """Render *value* as the content of a single markdown table cell.

    ``None`` becomes empty; pipes are escaped and line breaks collapsed to
    spaces, since either would break the table row.
    """
    if value is None:
        return ""
    text = str(value).replace("|", "\\|")
    return text.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")


def _narrative_section(narratives: dict[str, str], key: str, placeholder: str) -> list[str]:
    """Return the trailing Narrative section lines for one page.

    Uses the preserved hand-written text stored under *key* when there is
    one, otherwise *placeholder*.
    """
    return [NARRATIVE_HEADER, "", narratives.get(key) or placeholder, ""]


def write_index(path: Path, kind: str, items: list[dict], summary_keys: list[tuple[str, str]]) -> None:
    """Write an ``index.md`` table linking to every page of one object kind.

    Args:
        path: destination file.
        kind: object kind used for the page title (e.g. ``"tables"``).
        items: one dict per object; must contain ``"name"`` plus the keys
            named in *summary_keys*.
        summary_keys: ``(item_key, column_header)`` pairs for the extra
            columns shown after the name.
    """
    headers = ["Name"] + [header for _, header in summary_keys]
    lines = [
        f"# {kind.title()} (auto-generated)\n",
        "> Regenerated by `scripts/gen_catalog.py`. Do not hand-edit.\n",
        "",
        f"**Total: {len(items)}**.",
        "",
        "| " + " | ".join(headers) + " |",
        "|" + "|".join(["---"] * len(headers)) + "|",
    ]
    for item in items:
        name = item["name"]
        link = f"[`{name}`]({slug(name)}.md)"
        row = [_cell(link)] + [_cell(item.get(key) or "") for key, _ in summary_keys]
        lines.append("| " + " | ".join(row) + " |")
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def gen_tables(conn, db: str, out_dir: Path, narratives: dict[str, str]) -> list[dict]:
    """Write one page per base table plus ``index.md`` into *out_dir*.

    Args:
        conn: open DB connection whose cursors return dict rows.
        db: schema name to catalogue.
        out_dir: directory for the table pages (created if missing).
        narratives: preserved Narrative bodies keyed ``"tables/<slug>"``.

    Returns:
        The ``information_schema.tables`` rows whose type is ``VIEW``, for
        ``gen_views`` to consume.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT TABLE_NAME, TABLE_TYPE, ENGINE, TABLE_COLLATION, TABLE_COMMENT
            FROM information_schema.tables
            WHERE table_schema = %s
            ORDER BY TABLE_NAME
            """,
            (db,),
        )
        tables_rows = cur.fetchall()

        cur.execute(
            """
            SELECT TABLE_NAME, ORDINAL_POSITION, COLUMN_NAME, COLUMN_TYPE,
                   IS_NULLABLE, COLUMN_KEY, COLUMN_DEFAULT, EXTRA, COLUMN_COMMENT
            FROM information_schema.columns
            WHERE table_schema = %s
            ORDER BY TABLE_NAME, ORDINAL_POSITION
            """,
            (db,),
        )
        cols_by_table = collections.defaultdict(list)
        for col in cur.fetchall():
            cols_by_table[col["TABLE_NAME"]].append(col)

        cur.execute(
            """
            SELECT TABLE_NAME, INDEX_NAME, NON_UNIQUE, SEQ_IN_INDEX, COLUMN_NAME, INDEX_TYPE
            FROM information_schema.statistics
            WHERE table_schema = %s
            ORDER BY TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX
            """,
            (db,),
        )
        idx_by_table = collections.defaultdict(list)
        for part in cur.fetchall():
            idx_by_table[part["TABLE_NAME"]].append(part)

    base_tables = [t for t in tables_rows if t["TABLE_TYPE"] == "BASE TABLE"]
    views_meta = [t for t in tables_rows if t["TABLE_TYPE"] == "VIEW"]

    for t in base_tables:
        name = t["TABLE_NAME"]
        comment = (t.get("TABLE_COMMENT") or "").strip()

        md = [f"# `{name}`\n"]
        if comment:
            md.append(f"> {comment}\n")
        md += [
            "",
            f"- **Engine:** {t.get('ENGINE') or ''}",
            f"- **Collation:** {t.get('TABLE_COLLATION') or ''}",
            "",
            "## Columns",
            "",
            "| # | Name | Type | Null | Key | Default | Extra | Comment |",
            "|---|---|---|---|---|---|---|---|",
        ]
        for col in cols_by_table.get(name, []):
            md.append(
                "| {pos} | `{cn}` | `{ct}` | {nu} | {kk} | {df} | {ex} | {cm} |".format(
                    pos=col.get("ORDINAL_POSITION", ""),
                    cn=col.get("COLUMN_NAME", ""),
                    ct=col.get("COLUMN_TYPE", ""),
                    nu=col.get("IS_NULLABLE", ""),
                    kk=col.get("COLUMN_KEY", ""),
                    df=_cell(col.get("COLUMN_DEFAULT")),
                    ex=_cell(col.get("EXTRA")),
                    cm=_cell(col.get("COLUMN_COMMENT")),
                )
            )
        md.append("")

        idx_rows = idx_by_table.get(name, [])
        if idx_rows:
            # Group the per-column statistics rows by index, keeping the
            # order in which the indexes were returned.
            by_idx: dict[str, list[dict]] = {}
            for part in idx_rows:
                by_idx.setdefault(part["INDEX_NAME"], []).append(part)

            md += [
                "## Indexes",
                "",
                "| Name | Unique | Type | Columns |",
                "|---|---|---|---|",
            ]
            for idx_name, parts in by_idx.items():
                parts = sorted(parts, key=lambda x: int(x.get("SEQ_IN_INDEX") or 0))
                first = parts[0]
                unique = "✓" if int(first.get("NON_UNIQUE") or 0) == 0 else ""
                cols_in_idx = ", ".join(f"`{p['COLUMN_NAME']}`" for p in parts)
                md.append(
                    f"| `{idx_name}` | {unique} | {first.get('INDEX_TYPE') or ''} | {cols_in_idx} |"
                )
            md.append("")

        md += _narrative_section(
            narratives,
            f"tables/{slug(name)}",
            "_No human-written narrative yet — when this table is exercised by a "
            "[vertical slice](../../slices/index.md), link from there back to this page._",
        )
        (out_dir / f"{slug(name)}.md").write_text("\n".join(md), encoding="utf-8")

    items = [
        {
            "name": t["TABLE_NAME"],
            "comment": (t.get("TABLE_COMMENT") or "").strip()[:80],
        }
        for t in sorted(base_tables, key=lambda x: x["TABLE_NAME"])
    ]
    write_index(out_dir / "index.md", "tables", items, [("comment", "Comment")])
    return views_meta


def gen_views(conn, db: str, out_dir: Path, views_meta: list[dict], narratives: dict[str, str]) -> None:
    """Write one page per view plus ``index.md`` into *out_dir*.

    Args:
        conn: open DB connection whose cursors return dict rows.
        db: schema name to catalogue.
        out_dir: directory for the view pages (created if missing).
        views_meta: the ``VIEW`` rows returned by ``gen_tables``.
        narratives: preserved Narrative bodies keyed ``"views/<slug>"``.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT TABLE_NAME, VIEW_DEFINITION, IS_UPDATABLE, DEFINER
            FROM information_schema.views
            WHERE table_schema = %s
            """,
            (db,),
        )
        def_by_name = {v["TABLE_NAME"]: v for v in cur.fetchall()}

        cur.execute(
            """
            SELECT TABLE_NAME, ORDINAL_POSITION, COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE
            FROM information_schema.columns
            WHERE table_schema = %s
            ORDER BY TABLE_NAME, ORDINAL_POSITION
            """,
            (db,),
        )
        cols_by_view = collections.defaultdict(list)
        for col in cur.fetchall():
            cols_by_view[col["TABLE_NAME"]].append(col)

    # MySQL fully-qualifies every identifier in VIEW_DEFINITION as
    # `<schema>`.`table`.`column`. The schema name is environment-specific —
    # strip it so the rendered SQL is portable across deployments.
    schema_prefix = f"`{db}`."

    def index_comment(view: dict) -> str:
        """Index-table comment for a view; the literal "VIEW" is dropped."""
        text = (view.get("TABLE_COMMENT") or "").strip()
        return "" if text.upper() == "VIEW" else text[:80]

    for v in views_meta:
        name = v["TABLE_NAME"]
        details = def_by_name.get(name, {})
        cols = cols_by_view.get(name, [])

        md = [f"# `{name}` (view)\n"]
        comment = (v.get("TABLE_COMMENT") or "").strip()
        # MySQL stores the literal "VIEW" in TABLE_COMMENT for views; ignore it.
        if comment and comment.upper() != "VIEW":
            md.append(f"> {comment}\n")
        md += ["", f"- **Updatable:** {details.get('IS_UPDATABLE') or ''}", ""]

        if cols:
            md += ["## Columns", "", "| # | Name | Type | Null |", "|---|---|---|---|"]
            for col in cols:
                md.append(
                    f"| {col.get('ORDINAL_POSITION') or ''} "
                    f"| `{col.get('COLUMN_NAME') or ''}` "
                    f"| `{col.get('COLUMN_TYPE') or ''}` "
                    f"| {col.get('IS_NULLABLE') or ''} |"
                )
            md.append("")

        defn = (details.get("VIEW_DEFINITION") or "").strip()
        if defn:
            md += ["## Definition", "", "```sql", defn.replace(schema_prefix, ""), "```", ""]

        md += _narrative_section(
            narratives, f"views/{slug(name)}", "_No human-written narrative yet._"
        )
        (out_dir / f"{slug(name)}.md").write_text("\n".join(md), encoding="utf-8")

    items = [
        {"name": v["TABLE_NAME"], "comment": index_comment(v)}
        for v in sorted(views_meta, key=lambda x: x["TABLE_NAME"])
    ]
    write_index(out_dir / "index.md", "views", items, [("comment", "Comment")])


def gen_routines(conn, db: str, procs_dir: Path, fns_dir: Path, narratives: dict[str, str]) -> None:
    """Write one page per stored procedure / function plus both indexes.

    Args:
        conn: open DB connection whose cursors return dict rows.
        db: schema name to catalogue.
        procs_dir: directory for procedure pages (created if missing).
        fns_dir: directory for function pages (created if missing).
        narratives: preserved Narrative bodies keyed
            ``"procedures/<slug>"`` / ``"functions/<slug>"``.
    """
    procs_dir.mkdir(parents=True, exist_ok=True)
    fns_dir.mkdir(parents=True, exist_ok=True)

    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT ROUTINE_NAME, ROUTINE_TYPE, DTD_IDENTIFIER AS RETURN_TYPE,
                   IS_DETERMINISTIC, SQL_DATA_ACCESS, ROUTINE_COMMENT
            FROM information_schema.routines
            WHERE routine_schema = %s
            ORDER BY ROUTINE_NAME
            """,
            (db,),
        )
        routines = cur.fetchall()

        cur.execute(
            """
            SELECT ROUTINE_TYPE, SPECIFIC_NAME, ORDINAL_POSITION, PARAMETER_MODE,
                   PARAMETER_NAME, DTD_IDENTIFIER
            FROM information_schema.parameters
            WHERE specific_schema = %s
            ORDER BY SPECIFIC_NAME, ORDINAL_POSITION
            """,
            (db,),
        )
        p_by_routine = collections.defaultdict(list)
        for p in cur.fetchall():
            p_by_routine[(p["ROUTINE_TYPE"], p["SPECIFIC_NAME"])].append(p)

    for r in routines:
        name = r["ROUTINE_NAME"]
        rtype = r["ROUTINE_TYPE"]
        is_function = rtype == "FUNCTION"
        if rtype == "PROCEDURE":
            out_dir, kind_sub = procs_dir, "procedures"
        else:
            out_dir, kind_sub = fns_dir, "functions"

        # Functions list a position-0 row for the return type with no name —
        # drop it; the return type is rendered via RETURN_TYPE below. The
        # filter runs before the emptiness check so a function without real
        # parameters gets "_No parameters._" rather than an empty table.
        prms = [
            p
            for p in sorted(
                p_by_routine.get((rtype, name), []),
                key=lambda x: int(x.get("ORDINAL_POSITION") or 0),
            )
            if not (
                is_function
                and (
                    p.get("PARAMETER_NAME") is None
                    or int(p.get("ORDINAL_POSITION") or 0) == 0
                )
            )
        ]

        md = [f"# `{name}` ({rtype.lower()})\n"]
        comment = (r.get("ROUTINE_COMMENT") or "").strip()
        if comment:
            md.append(f"> {comment}\n")
        md += ["", f"- **Type:** {rtype}"]
        if is_function:
            md.append(f"- **Returns:** `{r.get('RETURN_TYPE') or ''}`")
        md += [
            f"- **Deterministic:** {r.get('IS_DETERMINISTIC') or ''}",
            f"- **SQL data access:** {r.get('SQL_DATA_ACCESS') or ''}",
            "",
            "## Parameters",
            "",
        ]
        if prms:
            md += ["| # | Mode | Name | Type |", "|---|---|---|---|"]
            for p in prms:
                md.append(
                    f"| {p.get('ORDINAL_POSITION') or ''} "
                    f"| {p.get('PARAMETER_MODE') or ''} "
                    f"| `{p.get('PARAMETER_NAME') or ''}` "
                    f"| `{p.get('DTD_IDENTIFIER') or ''}` |"
                )
        else:
            md.append("_No parameters._")
        md += [
            "",
            "## Body",
            "",
            # Double-backtick code span: the command itself contains
            # backticks, which would terminate a single-backtick span early.
            "_Body is not pre-cached. To inspect: ``mysql --defaults-file=~/.my.cnf "
            f"-e 'SHOW CREATE {rtype} `{name}`'``._",
            "",
        ]
        md += _narrative_section(
            narratives, f"{kind_sub}/{slug(name)}", "_No human-written narrative yet._"
        )
        (out_dir / f"{slug(name)}.md").write_text("\n".join(md), encoding="utf-8")

    def index_items(routine_type: str) -> list[dict]:
        """Index rows (name + truncated comment) for one routine type."""
        selected = [r for r in routines if r["ROUTINE_TYPE"] == routine_type]
        return [
            {
                "name": r["ROUTINE_NAME"],
                "comment": (r.get("ROUTINE_COMMENT") or "").strip()[:80],
            }
            for r in sorted(selected, key=lambda x: x["ROUTINE_NAME"])
        ]

    write_index(
        procs_dir / "index.md",
        "procedures",
        index_items("PROCEDURE"),
        [("comment", "Comment")],
    )
    write_index(
        fns_dir / "index.md",
        "functions",
        index_items("FUNCTION"),
        [("comment", "Comment")],
    )


def main() -> int:
    """Regenerate the whole auto-catalog from the live database.

    Returns:
        Process exit code (0 on success; failures propagate as exceptions).
    """
    cnf, db = read_my_cnf()
    print(f"Connecting to {cnf['host']}:{cnf['port']} db={db} as {cnf['user']}")
    conn = pymysql.connect(database=db, **cnf)
    try:
        # Snapshot hand-written Narrative sections so the regen preserves them.
        narratives = collect_existing_narratives(OUT)
        if narratives:
            print(f"Preserving {len(narratives)} hand-written narrative(s).")

        print(f"Writing to {OUT}")
        OUT.mkdir(parents=True, exist_ok=True)
        # Generate into a staging directory and only then replace the old
        # output dirs: if a query fails half-way, the existing pages (and
        # the narratives inside them) are left untouched.
        with tempfile.TemporaryDirectory(prefix=".staging-", dir=OUT) as tmp:
            staging = Path(tmp)
            views_meta = gen_tables(conn, db, staging / "tables", narratives)
            gen_views(conn, db, staging / "views", views_meta, narratives)
            gen_routines(
                conn, db, staging / "procedures", staging / "functions", narratives
            )

            # Replace each output dir wholesale to keep it strictly in sync
            # with the live DB.
            for sub in _SUBDIRS:
                target = OUT / sub
                if target.exists():
                    shutil.rmtree(target)
                shutil.move(str(staging / sub), str(target))
        print("Done.")
    finally:
        conn.close()
    return 0


if __name__ == "__main__":
    sys.exit(main())