gen_catalog.py 14.7 KB
#!/usr/bin/env python3
"""
gen_catalog.py — query the live DB and emit per-object markdown stubs in docs/auto-catalog/.

Connection: reads ~/.my.cnf (or any file the env var DB_DEFAULTS_FILE points at)
            for [client] credentials. Database name is taken from .my.cnf
            (`database = ...`) unless overridden by DB_NAME.

Writes:
    docs/auto-catalog/tables/<table_name>.md
    docs/auto-catalog/views/<view_name>.md
    docs/auto-catalog/procedures/<proc_name>.md
    docs/auto-catalog/functions/<fn_name>.md
    docs/auto-catalog/{tables,views,procedures,functions}/index.md

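Idempotent: each run wipes and regenerates the tables/, views/, procedures/
and functions/ subdirectories of docs/auto-catalog/; nothing else there is touched.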
"""
from __future__ import annotations

import collections
import configparser
import os
import shutil
import sys
from pathlib import Path

import pymysql

# Two levels up from this file: the directory that contains this script's directory.
ROOT = Path(__file__).resolve().parent.parent
# Every generated markdown page is written somewhere below this directory.
OUT = ROOT.joinpath("docs", "auto-catalog")


def _unquote_option(value: str | None) -> str | None:
    """Strip whitespace and one pair of matching quotes from an option value.

    MySQL option files allow a value to be wrapped in single or double quotes
    (needed e.g. when it contains ``#``); configparser returns such values
    with the quotes still attached.
    """
    if value is None:
        return None
    value = value.strip()
    if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
        return value[1:-1]
    return value


def read_my_cnf() -> tuple[dict, str]:
    """Load connection settings from the ``[client]`` section of an option file.

    The file is ``~/.my.cnf`` unless the ``DB_DEFAULTS_FILE`` environment
    variable names another one.

    Returns:
        ``(connect_kwargs, db_name)``. ``connect_kwargs`` holds host, port,
        user, password, charset and cursorclass for ``pymysql.connect``;
        ``db_name`` comes from the ``DB_NAME`` environment variable or,
        failing that, from ``database=`` in the option file.

    Raises:
        RuntimeError: if the file cannot be read, has no ``[client]``
            section, or no database name can be determined.
    """
    cnf_path = Path(os.environ.get("DB_DEFAULTS_FILE", "~/.my.cnf")).expanduser()
    # interpolation=None: with the default BasicInterpolation a literal "%"
    # in a value (typically the password) raises when the value is read.
    cp = configparser.ConfigParser(strict=False, allow_no_value=True, interpolation=None)
    # read() silently skips files it cannot open and returns the ones it parsed;
    # report that case as what it is instead of as a missing section.
    if not cp.read(cnf_path):
        raise RuntimeError(f"cannot read MySQL option file {cnf_path}")
    if "client" not in cp:
        raise RuntimeError(f"[client] section not found in {cnf_path}")
    c = cp["client"]

    def opt(key: str) -> str | None:
        return _unquote_option(c.get(key))

    db = os.environ.get("DB_NAME") or (opt("database") or "")
    if not db:
        raise RuntimeError("DB name not specified (DB_NAME env or `database=` in .my.cnf)")
    return {
        # `or` also covers keys that are present but empty / value-less.
        "host": opt("host") or "127.0.0.1",
        "port": int(opt("port") or 3306),
        "user": opt("user"),
        "password": opt("password"),
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }, db


# Translation table: path separators, shell/Windows-reserved punctuation and
# parentheses are each replaced by an underscore.
_SAFE_FILENAME = str.maketrans(dict.fromkeys(r'/\:*?"<>|()', "_"))


def slug(name: str) -> str:
    """Turn an object name into a string usable as a file name.

    Characters listed in ``_SAFE_FILENAME`` become ``_`` and surrounding
    whitespace is trimmed; all other characters (CJK included) are kept as-is.
    """
    sanitized = name.translate(_SAFE_FILENAME)
    return sanitized.strip()


def fmt_bytes(n: int | None) -> str:
    """Format a byte count with one decimal and a binary-scaled unit (B..TB).

    ``None`` yields an empty string. Values of 1024 GB or more are all
    expressed in TB.
    """
    if n is None:
        return ""
    size = float(n)
    *smaller_units, largest_unit = ("B", "KB", "MB", "GB", "TB")
    for label in smaller_units:
        if size < 1024:
            return f"{size:.1f} {label}"
        size /= 1024
    return f"{size:.1f} {largest_unit}"


def write_index(path: Path, kind: str, items: list[dict], summary_keys: list[tuple[str, str]]) -> None:
    """Write the markdown index page that lists every object of one kind.

    Args:
        path: File to write; it is overwritten and encoded as UTF-8.
        kind: Object kind used (title-cased) in the page heading, e.g. ``"tables"``.
        items: One dict per object. ``"name"`` is required; the remaining
            keys are looked up through *summary_keys*.
        summary_keys: ``(item_key, column_header)`` pairs describing the
            columns that follow the name column.
    """

    def cell(value: object) -> str:
        # Only None means "no value": a legitimate 0 (e.g. the row count of an
        # empty table) must still be displayed. A raw newline would end the
        # table row and a bare pipe would split the cell, so neutralise both.
        if value is None:
            return ""
        return " ".join(str(value).splitlines()).replace("|", "\\|")

    headers = ["Name"] + [header for _, header in summary_keys]
    lines = [
        f"# {kind.title()} (auto-generated)\n",
        "> Regenerated by `scripts/gen_catalog.py`. Do not hand-edit.\n",
        "",
        f"**Total: {len(items)}**.",
        "",
        "| " + " | ".join(headers) + " |",
        "|" + "|".join(["---"] * len(headers)) + "|",
    ]
    for item in items:
        name = item["name"]
        # The link target uses slug() so it matches the per-object file name.
        row = [f"[`{name}`]({slug(name)}.md)"]
        row.extend(item.get(key) for key, _ in summary_keys)
        lines.append("| " + " | ".join(cell(value) for value in row) + " |")
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def _table_cell(value: object) -> str:
    """Render *value* for a markdown table cell.

    ``None`` becomes an empty cell, line breaks are collapsed to spaces (a raw
    newline would terminate the table row) and pipes are backslash-escaped.
    """
    if value is None:
        return ""
    return " ".join(str(value).splitlines()).replace("|", "\\|")


def _table_page(t: dict, cols: list[dict], idxs: list[dict]) -> str:
    """Build the markdown page for one base table from its metadata rows."""
    name = t["TABLE_NAME"]
    rows = t.get("TABLE_ROWS")
    comment = (t.get("TABLE_COMMENT") or "").strip()

    md = [f"# `{name}`\n"]
    if comment:
        md.append(f"> {comment}\n")
    md += [
        "",
        f"- **Rows (estimated):** {rows if rows is not None else ''}",
        f"- **Data size:** {fmt_bytes(t.get('DATA_LENGTH'))}",
        f"- **Engine:** {t.get('ENGINE') or ''}",
        f"- **Collation:** {t.get('TABLE_COLLATION') or ''}",
        f"- **Created:** {t.get('CREATE_TIME') or ''}",
        f"- **Updated:** {t.get('UPDATE_TIME') or ''}",
        "",
        "## Columns",
        "",
        "| # | Name | Type | Null | Key | Default | Extra | Comment |",
        "|---|---|---|---|---|---|---|---|",
    ]
    for c in cols:
        md.append(
            "| {pos} | `{cn}` | `{ct}` | {nu} | {kk} | {df} | {ex} | {cm} |".format(
                pos=c.get("ORDINAL_POSITION", ""),
                cn=c.get("COLUMN_NAME", ""),
                ct=c.get("COLUMN_TYPE", ""),
                nu=c.get("IS_NULLABLE", ""),
                kk=c.get("COLUMN_KEY", ""),
                # Free-text cells all get the same escaping.
                df=_table_cell(c.get("COLUMN_DEFAULT")),
                ex=_table_cell(c.get("EXTRA")),
                cm=_table_cell(c.get("COLUMN_COMMENT")),
            )
        )
    md.append("")

    if idxs:
        md += ["## Indexes", "", "| Name | Unique | Type | Columns |", "|---|---|---|---|"]
        # Group the per-column statistics rows by index, keeping query order.
        by_idx: dict[str, list[dict]] = {}
        for part in idxs:
            by_idx.setdefault(part["INDEX_NAME"], []).append(part)
        for idx_name, parts in by_idx.items():
            parts.sort(key=lambda p: int(p.get("SEQ_IN_INDEX") or 0))
            first = parts[0]
            unique = "✓" if int(first.get("NON_UNIQUE") or 0) == 0 else ""
            # Functional key parts (MySQL 8.0.13+) have a NULL COLUMN_NAME;
            # show a placeholder rather than the literal text "None".
            cols_in_idx = ", ".join(
                f"`{p['COLUMN_NAME']}`" if p.get("COLUMN_NAME") is not None else "_(expression)_"
                for p in parts
            )
            md.append(f"| `{idx_name}` | {unique} | {first.get('INDEX_TYPE') or ''} | {cols_in_idx} |")
        md.append("")

    md += [
        "## Narrative",
        "",
        "_No human-written narrative yet — when this table is exercised by a "
        "[vertical slice](../../slices/index.md), link from there back to this page._",
        "",
    ]
    return "\n".join(md)


def gen_tables(conn, db: str, out_dir: Path) -> list[dict]:
    """Write one markdown page per base table plus the tables ``index.md``.

    Args:
        conn: Open database connection. Result rows are accessed by column
            name, so its cursors must return dict-like rows.
        db: Schema name used to filter ``information_schema``.
        out_dir: Directory for the pages; created if missing.

    Returns:
        The ``information_schema.tables`` rows whose ``TABLE_TYPE`` is
        ``VIEW``, for the caller to hand to ``gen_views``.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    cur = conn.cursor()
    cur.execute("""
        SELECT TABLE_NAME, TABLE_TYPE, ENGINE, TABLE_ROWS, DATA_LENGTH,
               TABLE_COLLATION, CREATE_TIME, UPDATE_TIME, TABLE_COMMENT
        FROM information_schema.tables
        WHERE table_schema = %s
        ORDER BY TABLE_NAME
    """, (db,))
    tables_rows = cur.fetchall()

    cur.execute("""
        SELECT TABLE_NAME, ORDINAL_POSITION, COLUMN_NAME, COLUMN_TYPE,
               IS_NULLABLE, COLUMN_KEY, COLUMN_DEFAULT, EXTRA, COLUMN_COMMENT
        FROM information_schema.columns
        WHERE table_schema = %s
        ORDER BY TABLE_NAME, ORDINAL_POSITION
    """, (db,))
    cols_by_table = collections.defaultdict(list)
    for c in cur.fetchall():
        cols_by_table[c["TABLE_NAME"]].append(c)

    cur.execute("""
        SELECT TABLE_NAME, INDEX_NAME, NON_UNIQUE, SEQ_IN_INDEX, COLUMN_NAME, INDEX_TYPE
        FROM information_schema.statistics
        WHERE table_schema = %s
        ORDER BY TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX
    """, (db,))
    idx_by_table = collections.defaultdict(list)
    for i in cur.fetchall():
        idx_by_table[i["TABLE_NAME"]].append(i)

    base_tables = [t for t in tables_rows if t["TABLE_TYPE"] == "BASE TABLE"]
    views_meta = [t for t in tables_rows if t["TABLE_TYPE"] == "VIEW"]

    for t in base_tables:
        name = t["TABLE_NAME"]
        # .get() so a table without columns/indexes does not grow the defaultdicts.
        page = _table_page(t, cols_by_table.get(name, []), idx_by_table.get(name, []))
        (out_dir / f"{slug(name)}.md").write_text(page, encoding="utf-8")

    # The index lists the largest tables (by DATA_LENGTH) first.
    by_size = sorted(base_tables, key=lambda x: int(x.get("DATA_LENGTH") or 0), reverse=True)
    items = [
        {
            "name": t["TABLE_NAME"],
            "rows": t.get("TABLE_ROWS"),
            "size": fmt_bytes(t.get("DATA_LENGTH")),
            "comment": (t.get("TABLE_COMMENT") or "").strip()[:80],
        }
        for t in by_size
    ]
    write_index(
        out_dir / "index.md", "tables", items,
        [("rows", "Rows"), ("size", "Size"), ("comment", "Comment")],
    )

    return views_meta


def gen_views(conn, db: str, out_dir: Path, views_meta: list[dict]) -> None:
    """Write one markdown page per view plus the views ``index.md``.

    Args:
        conn: Open database connection whose cursors return dict-like rows.
        db: Schema name used to filter ``information_schema``.
        out_dir: Directory for the pages; created if missing.
        views_meta: ``information_schema.tables`` rows for the views, as
            returned by ``gen_tables``.
    """

    def shown_comment(meta: dict) -> str:
        # MySQL stores the literal "VIEW" in TABLE_COMMENT for views; treat it as no comment.
        text = (meta.get("TABLE_COMMENT") or "").strip()
        return "" if text.upper() == "VIEW" else text

    out_dir.mkdir(parents=True, exist_ok=True)
    cur = conn.cursor()
    cur.execute("""
        SELECT TABLE_NAME, VIEW_DEFINITION, IS_UPDATABLE, DEFINER
        FROM information_schema.views
        WHERE table_schema = %s
    """, (db,))
    details = {}
    for row in cur.fetchall():
        details[row["TABLE_NAME"]] = row

    cur.execute("""
        SELECT TABLE_NAME, ORDINAL_POSITION, COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE
        FROM information_schema.columns
        WHERE table_schema = %s
        ORDER BY TABLE_NAME, ORDINAL_POSITION
    """, (db,))
    columns = {}
    for col in cur.fetchall():
        columns.setdefault(col["TABLE_NAME"], []).append(col)

    for meta in views_meta:
        view_name = meta["TABLE_NAME"]
        info = details.get(view_name, {})

        page = [f"# `{view_name}` (view)\n"]
        note = shown_comment(meta)
        if note:
            page.append(f"> {note}\n")
        page.extend([
            "",
            f"- **Updatable:** {info.get('IS_UPDATABLE') or ''}",
            f"- **Definer:** `{info.get('DEFINER') or ''}`",
            "",
        ])

        view_cols = columns.get(view_name, [])
        if view_cols:
            page.extend(["## Columns", "", "| # | Name | Type | Null |", "|---|---|---|---|"])
            for col in view_cols:
                position = col.get("ORDINAL_POSITION") or ""
                col_name = col.get("COLUMN_NAME") or ""
                col_type = col.get("COLUMN_TYPE") or ""
                nullable = col.get("IS_NULLABLE") or ""
                page.append(f"| {position} | `{col_name}` | `{col_type}` | {nullable} |")
            page.append("")

        # The definition section is omitted when no definition text is available.
        sql = (info.get("VIEW_DEFINITION") or "").strip()
        if sql:
            page.extend(["## Definition", "", "```sql", sql, "```", ""])

        page.extend(["## Narrative", "", "_No human-written narrative yet._", ""])
        (out_dir / f"{slug(view_name)}.md").write_text("\n".join(page), encoding="utf-8")

    ordered = sorted(views_meta, key=lambda meta: meta["TABLE_NAME"])
    entries = [
        {"name": meta["TABLE_NAME"], "comment": shown_comment(meta)[:80]}
        for meta in ordered
    ]
    write_index(out_dir / "index.md", "views", entries, [("comment", "Comment")])


def _routine_page(r: dict, params: list[dict]) -> str:
    """Build the markdown page for one routine; *params* holds only real parameters."""
    name = r["ROUTINE_NAME"]
    rtype = r["ROUTINE_TYPE"]

    md = [f"# `{name}` ({rtype.lower()})\n"]
    comment = (r.get("ROUTINE_COMMENT") or "").strip()
    if comment:
        md.append(f"> {comment}\n")
    md += ["", f"- **Type:** {rtype}"]
    if rtype == "FUNCTION":
        md.append(f"- **Returns:** `{r.get('RETURN_TYPE') or ''}`")
    md += [
        f"- **Deterministic:** {r.get('IS_DETERMINISTIC') or ''}",
        f"- **SQL data access:** {r.get('SQL_DATA_ACCESS') or ''}",
        f"- **Created:** {r.get('CREATED') or ''}",
        f"- **Last altered:** {r.get('LAST_ALTERED') or ''}",
        "",
        "## Parameters",
        "",
    ]
    if params:
        md += ["| # | Mode | Name | Type |", "|---|---|---|---|"]
        md += [
            f"| {p.get('ORDINAL_POSITION') or ''} | {p.get('PARAMETER_MODE') or ''} | `{p.get('PARAMETER_NAME') or ''}` | `{p.get('DTD_IDENTIFIER') or ''}` |"
            for p in params
        ]
    else:
        md.append("_No parameters._")
    md += [
        "",
        "## Body",
        "",
        # Double-backtick delimiters: the command itself contains backticks
        # around the routine name, which would end a single-backtick code span.
        "_Body is not pre-cached. To inspect: ``mysql --defaults-file=~/.my.cnf "
        f"-e 'SHOW CREATE {rtype} `{name}`'``._",
        "",
        "## Narrative",
        "",
        "_No human-written narrative yet._",
        "",
    ]
    return "\n".join(md)


def gen_routines(conn, db: str, procs_dir: Path, fns_dir: Path) -> None:
    """Write one markdown page per stored procedure/function plus both indexes.

    Args:
        conn: Open database connection whose cursors return dict-like rows.
        db: Schema name used to filter ``information_schema``.
        procs_dir: Directory for procedure pages; created if missing.
        fns_dir: Directory for function pages (any routine whose type is not
            ``PROCEDURE`` is written here); created if missing.
    """
    procs_dir.mkdir(parents=True, exist_ok=True)
    fns_dir.mkdir(parents=True, exist_ok=True)
    cur = conn.cursor()
    cur.execute("""
        SELECT ROUTINE_NAME, ROUTINE_TYPE, DTD_IDENTIFIER AS RETURN_TYPE,
               IS_DETERMINISTIC, SQL_DATA_ACCESS, CREATED, LAST_ALTERED,
               ROUTINE_COMMENT
        FROM information_schema.routines
        WHERE routine_schema = %s
        ORDER BY ROUTINE_NAME
    """, (db,))
    routines = cur.fetchall()

    cur.execute("""
        SELECT ROUTINE_TYPE, SPECIFIC_NAME, ORDINAL_POSITION, PARAMETER_MODE,
               PARAMETER_NAME, DTD_IDENTIFIER
        FROM information_schema.parameters
        WHERE specific_schema = %s
        ORDER BY SPECIFIC_NAME, ORDINAL_POSITION
    """, (db,))
    # Keyed by (type, name) because a procedure and a function may share a name.
    p_by_routine = collections.defaultdict(list)
    for p in cur.fetchall():
        p_by_routine[(p["ROUTINE_TYPE"], p["SPECIFIC_NAME"])].append(p)

    for r in routines:
        name = r["ROUTINE_NAME"]
        rtype = r["ROUTINE_TYPE"]
        out_dir = procs_dir if rtype == "PROCEDURE" else fns_dir
        prms = sorted(
            p_by_routine.get((rtype, name), []),
            key=lambda x: int(x.get("ORDINAL_POSITION") or 0),
        )
        if rtype == "FUNCTION":
            # Functions list a position-0 row for the return type with no name;
            # it is rendered via RETURN_TYPE instead. Drop it *before* deciding
            # whether there are parameters, so a zero-argument function gets
            # "_No parameters._" rather than an empty table.
            prms = [
                p for p in prms
                if p.get("PARAMETER_NAME") is not None
                and int(p.get("ORDINAL_POSITION") or 0) != 0
            ]
        (out_dir / f"{slug(name)}.md").write_text(_routine_page(r, prms), encoding="utf-8")

    procs = [r for r in routines if r["ROUTINE_TYPE"] == "PROCEDURE"]
    fns = [r for r in routines if r["ROUTINE_TYPE"] == "FUNCTION"]

    for target_dir, kind, group in ((procs_dir, "procedures", procs), (fns_dir, "functions", fns)):
        write_index(
            target_dir / "index.md", kind,
            [{"name": r["ROUTINE_NAME"], "comment": (r.get("ROUTINE_COMMENT") or "").strip()[:80]}
             for r in sorted(group, key=lambda x: x["ROUTINE_NAME"])],
            [("comment", "Comment")],
        )


def main() -> int:
    """Regenerate the catalog from the live database and return exit status 0.

    Reads the connection settings, connects, removes the four output
    subdirectories and rebuilds them; the connection is closed even when
    generation fails.
    """
    connect_kwargs, db = read_my_cnf()
    host, port, user = connect_kwargs["host"], connect_kwargs["port"], connect_kwargs["user"]
    print(f"Connecting to {host}:{port} db={db} as {user}")
    conn = pymysql.connect(database=db, **connect_kwargs)
    try:
        tables_dir, views_dir, procs_dir, fns_dir = (
            OUT / sub for sub in ("tables", "views", "procedures", "functions")
        )
        # Start from empty directories so pages of dropped objects do not linger.
        for stale in (tables_dir, views_dir, procs_dir, fns_dir):
            if stale.exists():
                shutil.rmtree(stale)
        print(f"Writing to {OUT}")
        views_meta = gen_tables(conn, db, tables_dir)
        gen_views(conn, db, views_dir, views_meta)
        gen_routines(conn, db, procs_dir, fns_dir)
        print("Done.")
    finally:
        conn.close()
    return 0


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())