gen_catalog.py 12.3 KB
#!/usr/bin/env python3
"""
gen_catalog.py — turn recon/*.tsv into per-object markdown stubs in docs/auto-catalog/.

Reads:
    ../recon/tables.tsv
    ../recon/columns.tsv
    ../recon/indexes.tsv
    ../recon/routines.tsv
    ../recon/routine_params.tsv
    ../recon/views.tsv

Writes:
    docs/auto-catalog/tables/<table_name>.md
    docs/auto-catalog/views/<view_name>.md
    docs/auto-catalog/procedures/<proc_name>.md
    docs/auto-catalog/functions/<fn_name>.md
    docs/auto-catalog/{tables,views,procedures,functions}/index.md  (sortable lists)

Idempotent: rewrites everything under docs/auto-catalog/ except the four leaf
sub-directories' index.md headers (those are regenerated too).
"""
from __future__ import annotations

import csv
import collections
import sys
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent
RECON = ROOT.parent / "recon"
OUT = ROOT / "docs" / "auto-catalog"


def read_tsv(path: Path) -> list[dict[str, str]]:
    # Read as bytes, split *only* on LF, then strip CR/NUL. Python's text-mode
    # `open()` does universal-newline translation that treats a bare CR as a line
    # break — but procs in xly were authored on Windows and routine comments
    # contain bare CRs from CRLF endings, splitting one logical row across many.
    # csv.QUOTE_NONE: MySQL's TSV is unquoted; a stray `"` would otherwise cause
    # the reader to swallow multiple lines into one field.
    raw = path.read_bytes().decode("utf-8", errors="replace")
    lines = (ln.replace("\x00", "").replace("\r", " ") for ln in raw.split("\n"))
    reader = csv.DictReader(lines, delimiter="\t", quoting=csv.QUOTE_NONE)
    return [{k: ("" if v in ("NULL", None) else v) for k, v in row.items()} for row in reader]


_SAFE_FILENAME = str.maketrans({c: "_" for c in r'/\:*?"<>|()'})

def slug(name: str) -> str:
    """Filename-safe slug. Routine names with parens or path separators get sanitized;
    everything else (including CJK) passes through — modern filesystems handle UTF-8.
    """
    return name.translate(_SAFE_FILENAME).strip()


def fmt_bytes(n_str: str) -> str:
    if not n_str:
        return ""
    try:
        n = int(n_str)
    except ValueError:
        return n_str
    for unit in ("B", "KB", "MB", "GB"):
        if n < 1024:
            return f"{n:.1f} {unit}"
        n /= 1024
    return f"{n:.1f} TB"


def write_index(path: Path, kind: str, items: list[dict[str, str]], summary_keys: list[tuple[str, str]]) -> None:
    """items: list of dicts with 'name' + extra columns. summary_keys: list of (key, header)."""
    lines = [f"# {kind.title()} (auto-generated)\n"]
    lines.append("> Regenerated by `scripts/gen_catalog.py`. Do not hand-edit.\n")
    lines.append("")
    lines.append(f"**Total: {len(items)}**.")
    lines.append("")
    headers = ["Name"] + [h for _, h in summary_keys]
    lines.append("| " + " | ".join(headers) + " |")
    lines.append("|" + "|".join(["---"] * len(headers)) + "|")
    for item in items:
        name = item["name"]
        link = f"[`{name}`]({slug(name)}.md)"
        row = [link] + [item.get(k, "") for k, _ in summary_keys]
        lines.append("| " + " | ".join(row) + " |")
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def gen_tables(out_dir: Path) -> None:
    out_dir.mkdir(parents=True, exist_ok=True)
    tables = read_tsv(RECON / "tables.tsv")
    columns = read_tsv(RECON / "columns.tsv")
    indexes = read_tsv(RECON / "indexes.tsv")

    cols_by_table = collections.defaultdict(list)
    for c in columns:
        cols_by_table[c["TABLE_NAME"]].append(c)

    idx_by_table = collections.defaultdict(list)
    for i in indexes:
        idx_by_table[i["TABLE_NAME"]].append(i)

    base_tables = [t for t in tables if t["TABLE_TYPE"] == "BASE TABLE"]
    views = [t for t in tables if t["TABLE_TYPE"] == "VIEW"]

    for t in base_tables:
        name = t["TABLE_NAME"]
        rows = t.get("TABLE_ROWS", "")
        size = fmt_bytes(t.get("DATA_LENGTH", ""))
        comment = t.get("TABLE_COMMENT", "").strip()
        cols = cols_by_table.get(name, [])
        idxs = idx_by_table.get(name, [])

        md = [f"# `{name}`\n"]
        if comment:
            md.append(f"> {comment}\n")
        md.append("")
        md.append(f"- **Rows (estimated):** {rows}")
        md.append(f"- **Data size:** {size}")
        md.append(f"- **Engine:** {t.get('ENGINE','')}")
        md.append(f"- **Collation:** {t.get('TABLE_COLLATION','')}")
        md.append(f"- **Created:** {t.get('CREATE_TIME','')}")
        md.append(f"- **Updated:** {t.get('UPDATE_TIME','')}")
        md.append("")

        # columns
        md.append("## Columns")
        md.append("")
        md.append("| # | Name | Type | Null | Key | Default | Extra | Comment |")
        md.append("|---|---|---|---|---|---|---|---|")
        for c in cols:
            md.append("| {ORDINAL_POSITION} | `{COLUMN_NAME}` | `{COLUMN_TYPE}` | {IS_NULLABLE} | {COLUMN_KEY} | {default} | {EXTRA} | {COLUMN_COMMENT} |".format(
                ORDINAL_POSITION=c.get("ORDINAL_POSITION", ""),
                COLUMN_NAME=c.get("COLUMN_NAME", ""),
                COLUMN_TYPE=c.get("COLUMN_TYPE", ""),
                IS_NULLABLE=c.get("IS_NULLABLE", ""),
                COLUMN_KEY=c.get("COLUMN_KEY", ""),
                default=(c.get("COLUMN_DEFAULT") or ""),
                EXTRA=c.get("EXTRA", ""),
                COLUMN_COMMENT=c.get("COLUMN_COMMENT", "").replace("|", "\\|"),
            ))
        md.append("")

        # indexes (grouped by name, ordered by SEQ_IN_INDEX)
        if idxs:
            md.append("## Indexes")
            md.append("")
            by_idx = collections.OrderedDict()
            for i in idxs:
                by_idx.setdefault(i["INDEX_NAME"], []).append(i)
            md.append("| Name | Unique | Type | Columns |")
            md.append("|---|---|---|---|")
            for idx_name, parts in by_idx.items():
                parts.sort(key=lambda x: int(x.get("SEQ_IN_INDEX", "0") or 0))
                unique = "✓" if parts[0].get("NON_UNIQUE") == "0" else ""
                cols_in_idx = ", ".join(f"`{p['COLUMN_NAME']}`" for p in parts)
                md.append(f"| `{idx_name}` | {unique} | {parts[0].get('INDEX_TYPE','')} | {cols_in_idx} |")
            md.append("")

        md.append("## Narrative")
        md.append("")
        md.append("_No human-written narrative yet — when this table is exercised by a "
                  "[vertical slice](../../slices/index.md), link from there back to this page._")
        md.append("")

        (out_dir / f"{slug(name)}.md").write_text("\n".join(md), encoding="utf-8")

    # Index page — list all base tables, sortable by size desc
    items = []
    for t in sorted(base_tables, key=lambda x: int(x.get("DATA_LENGTH") or 0), reverse=True):
        items.append({
            "name": t["TABLE_NAME"],
            "rows": t.get("TABLE_ROWS", ""),
            "size": fmt_bytes(t.get("DATA_LENGTH", "")),
            "comment": (t.get("TABLE_COMMENT") or "").strip()[:80],
        })
    write_index(
        out_dir / "index.md", "tables", items,
        [("rows", "Rows"), ("size", "Size"), ("comment", "Comment")],
    )

    return views  # pass through for view generation


def gen_views(out_dir: Path, views_meta: list[dict]) -> None:
    out_dir.mkdir(parents=True, exist_ok=True)
    view_defs = read_tsv(RECON / "views.tsv")
    def_by_name = {v["TABLE_NAME"]: v for v in view_defs}
    columns = read_tsv(RECON / "columns.tsv")
    cols_by_view = collections.defaultdict(list)
    for c in columns:
        cols_by_view[c["TABLE_NAME"]].append(c)

    for v in views_meta:
        name = v["TABLE_NAME"]
        d = def_by_name.get(name, {})
        cols = cols_by_view.get(name, [])

        md = [f"# `{name}` (view)\n"]
        comment = (v.get("TABLE_COMMENT") or "").strip()
        if comment:
            md.append(f"> {comment}\n")
        md.append("")
        md.append(f"- **Updatable:** {d.get('IS_UPDATABLE','')}")
        md.append(f"- **Definer:** `{d.get('DEFINER','')}`")
        md.append("")
        if cols:
            md.append("## Columns")
            md.append("")
            md.append("| # | Name | Type | Null |")
            md.append("|---|---|---|---|")
            for c in cols:
                md.append(f"| {c.get('ORDINAL_POSITION','')} | `{c.get('COLUMN_NAME','')}` | `{c.get('COLUMN_TYPE','')}` | {c.get('IS_NULLABLE','')} |")
            md.append("")
        defn = d.get("VIEW_DEFINITION", "")
        if defn:
            md.append("## Definition")
            md.append("")
            md.append("```sql")
            md.append(defn)
            md.append("```")
            md.append("")
        md.append("## Narrative")
        md.append("")
        md.append("_No human-written narrative yet._")
        md.append("")
        (out_dir / f"{slug(name)}.md").write_text("\n".join(md), encoding="utf-8")

    # MySQL fills information_schema.TABLES.TABLE_COMMENT with the literal "VIEW"
    # for views; treat that as no comment.
    def _vc(v):
        c = (v.get("TABLE_COMMENT") or "").strip()
        return "" if c == "VIEW" else c[:80]
    items = [{"name": v["TABLE_NAME"], "comment": _vc(v)} for v in sorted(views_meta, key=lambda x: x["TABLE_NAME"])]
    write_index(out_dir / "index.md", "views", items, [("comment", "Comment")])


def gen_routines(procs_dir: Path, fns_dir: Path) -> None:
    procs_dir.mkdir(parents=True, exist_ok=True)
    fns_dir.mkdir(parents=True, exist_ok=True)
    routines = read_tsv(RECON / "routines.tsv")
    params = read_tsv(RECON / "routine_params.tsv")

    p_by_routine = collections.defaultdict(list)
    for p in params:
        p_by_routine[(p["ROUTINE_TYPE"], p["SPECIFIC_NAME"])].append(p)

    for r in routines:
        name = r["ROUTINE_NAME"]
        rtype = r["ROUTINE_TYPE"]
        out_dir = procs_dir if rtype == "PROCEDURE" else fns_dir
        prms = sorted(p_by_routine.get((rtype, name), []), key=lambda x: int(x.get("ORDINAL_POSITION", "0") or 0))

        md = [f"# `{name}` ({rtype.lower()})\n"]
        comment = (r.get("ROUTINE_COMMENT") or "").strip()
        if comment:
            md.append(f"> {comment}\n")
        md.append("")
        md.append(f"- **Type:** {rtype}")
        if rtype == "FUNCTION":
            md.append(f"- **Returns:** `{r.get('RETURN_TYPE','')}`")
        md.append(f"- **Deterministic:** {r.get('IS_DETERMINISTIC','')}")
        md.append(f"- **SQL data access:** {r.get('SQL_DATA_ACCESS','')}")
        md.append(f"- **Created:** {r.get('CREATED','')}")
        md.append(f"- **Last altered:** {r.get('LAST_ALTERED','')}")
        md.append("")

        md.append("## Parameters")
        md.append("")
        if prms:
            md.append("| # | Mode | Name | Type |")
            md.append("|---|---|---|---|")
            for p in prms:
                md.append(f"| {p.get('ORDINAL_POSITION','')} | {p.get('PARAMETER_MODE','')} | `{p.get('PARAMETER_NAME','')}` | `{p.get('DTD_IDENTIFIER','')}` |")
        else:
            md.append("_No parameters._")
        md.append("")

        md.append("## Body")
        md.append("")
        md.append("_Body is not pre-cached. To inspect: `mysql --defaults-file=~/.my.cnf "
                  f"-e 'SHOW CREATE {rtype} `{name}`'`._")
        md.append("")

        md.append("## Narrative")
        md.append("")
        md.append("_No human-written narrative yet._")
        md.append("")

        (out_dir / f"{slug(name)}.md").write_text("\n".join(md), encoding="utf-8")

    procs = [r for r in routines if r["ROUTINE_TYPE"] == "PROCEDURE"]
    fns = [r for r in routines if r["ROUTINE_TYPE"] == "FUNCTION"]

    write_index(
        procs_dir / "index.md", "procedures",
        [{"name": r["ROUTINE_NAME"], "comment": (r.get("ROUTINE_COMMENT") or "").strip()[:80]} for r in sorted(procs, key=lambda x: x["ROUTINE_NAME"])],
        [("comment", "Comment")],
    )
    write_index(
        fns_dir / "index.md", "functions",
        [{"name": r["ROUTINE_NAME"], "comment": (r.get("ROUTINE_COMMENT") or "").strip()[:80]} for r in sorted(fns, key=lambda x: x["ROUTINE_NAME"])],
        [("comment", "Comment")],
    )


def main() -> int:
    if not RECON.exists():
        print(f"recon dir not found at {RECON}", file=sys.stderr)
        return 1
    print(f"Reading from {RECON}")
    print(f"Writing to   {OUT}")
    views = gen_tables(OUT / "tables")
    gen_views(OUT / "views", views)
    gen_routines(OUT / "procedures", OUT / "functions")
    print("Done.")
    return 0


if __name__ == "__main__":
    sys.exit(main())