#!/usr/bin/env python3
"""
gen_catalog.py — turn recon/*.tsv into per-object markdown stubs in docs/auto-catalog/.

Reads:
  ../recon/tables.tsv
  ../recon/columns.tsv
  ../recon/indexes.tsv
  ../recon/routines.tsv
  ../recon/routine_params.tsv
  ../recon/views.tsv

Writes:
  docs/auto-catalog/tables/<name>.md
  docs/auto-catalog/views/<name>.md
  docs/auto-catalog/procedures/<name>.md
  docs/auto-catalog/functions/<name>.md
  docs/auto-catalog/{tables,views,procedures,functions}/index.md  (sortable lists)

Re-runnable: every generated page, including the four index.md files, is
overwritten on each run. Pages for objects that no longer appear in recon are
not deleted.
"""
from __future__ import annotations

import csv
import collections
import sys
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent
RECON = ROOT.parent / "recon"
OUT = ROOT / "docs" / "auto-catalog"


def read_tsv(path: Path) -> list[dict[str, str]]:
    """Parse an unquoted, tab-separated dump into a list of row dicts.

    The first line supplies the column names. Fields that are missing or hold
    the literal text ``NULL`` become ``""``; fields beyond the header width are
    dropped so every key in the result is a real column name.
    """
    # Read as bytes, split *only* on LF, then strip CR/NUL. Python's text-mode
    # `open()` does universal-newline translation that treats a bare CR as a line
    # break — but procs in xly were authored on Windows and routine comments
    # contain bare CRs from CRLF endings, splitting one logical row across many.
    # csv.QUOTE_NONE: MySQL's TSV is unquoted; a stray `"` would otherwise cause
    # the reader to swallow multiple lines into one field.
    raw = path.read_bytes().decode("utf-8", errors="replace")
    lines = (
        # A CR directly before the LF is just a CRLF line ending: drop it, so the
        # last field (and the last header name) does not gain a trailing space.
        # CRs inside the line are embedded data and are flattened to spaces.
        ln.rstrip("\r").replace("\x00", "").replace("\r", " ")
        for ln in raw.split("\n")
    )
    reader = csv.DictReader(lines, delimiter="\t", quoting=csv.QUOTE_NONE)
    return [
        {
            k: ("" if v in ("NULL", None) else v)
            for k, v in row.items()
            # DictReader files overflow fields under the key None (as a list).
            if k is not None
        }
        for row in reader
    ]


_SAFE_FILENAME = str.maketrans({c: "_" for c in r'/\:*?"<>|()'})


def slug(name: str) -> str:
    """Filename-safe slug.

    Routine names with parens or path separators get sanitized; everything else
    (including CJK) passes through — modern filesystems handle UTF-8.
    """
    return name.translate(_SAFE_FILENAME).strip()


def fmt_bytes(n_str: str) -> str:
    """Format a byte count given as a decimal string, e.g. ``"2048"`` -> ``"2.0 KB"``.

    An empty string yields ``""``; a non-integer string is returned unchanged.
    """
    if not n_str:
        return ""
    try:
        n = int(n_str)
    except ValueError:
        return n_str
    for unit in ("B", "KB", "MB", "GB"):
        if n < 1024:
            return f"{n:.1f} {unit}"
        n /= 1024
    return f"{n:.1f} TB"


def _md_cell(text: str) -> str:
    """Escape ``|`` so *text* cannot end a markdown table cell early."""
    return text.replace("|", "\\|")


def _to_int(value: str | None, default: int = 0) -> int:
    """Parse *value* as an int, returning *default* for empty or non-numeric text."""
    try:
        return int(value) if value else default
    except ValueError:
        return default


def _group_by_table(rows: list[dict[str, str]]) -> dict[str, list[dict[str, str]]]:
    """Group recon rows by their ``TABLE_NAME`` column, preserving file order."""
    grouped = collections.defaultdict(list)
    for row in rows:
        grouped[row["TABLE_NAME"]].append(row)
    return grouped


def _view_comment(view: dict[str, str]) -> str:
    """Return the view's comment, or ``""`` when it is only MySQL's placeholder."""
    # MySQL fills information_schema.TABLES.TABLE_COMMENT with the literal "VIEW"
    # for views; treat that as no comment.
    text = (view.get("TABLE_COMMENT") or "").strip()
    return "" if text == "VIEW" else text


def write_index(path: Path, kind: str, items: list[dict[str, str]],
                summary_keys: list[tuple[str, str]]) -> None:
    """Write an ``index.md`` table that links to every generated page of one kind.

    items: list of dicts with 'name' + extra columns.
    summary_keys: list of (key, header) pairs selecting the extra columns, in order.
    """
    headers = ["Name"] + [h for _, h in summary_keys]
    lines = [
        f"# {kind.title()} (auto-generated)\n",
        "> Regenerated by `scripts/gen_catalog.py`. Do not hand-edit.\n",
        "",
        f"**Total: {len(items)}**.",
        "",
        "| " + " | ".join(headers) + " |",
        "|" + "|".join(["---"] * len(headers)) + "|",
    ]
    for item in items:
        name = item["name"]
        link = f"[`{name}`]({slug(name)}.md)"
        # Free-text cells (comments) may contain `|`, which would split the row.
        cells = [link] + [_md_cell(item.get(k, "")) for k, _ in summary_keys]
        lines.append("| " + " | ".join(cells) + " |")
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def _render_table_page(t: dict[str, str], cols: list[dict[str, str]],
                       idxs: list[dict[str, str]]) -> str:
    """Build the markdown page for one base table from its recon rows."""
    name = t["TABLE_NAME"]
    comment = t.get("TABLE_COMMENT", "").strip()

    md = [f"# `{name}`\n"]
    if comment:
        md.append(f"> {comment}\n")
    md.append("")
    md.append(f"- **Rows (estimated):** {t.get('TABLE_ROWS', '')}")
    md.append(f"- **Data size:** {fmt_bytes(t.get('DATA_LENGTH', ''))}")
    md.append(f"- **Engine:** {t.get('ENGINE', '')}")
    md.append(f"- **Collation:** {t.get('TABLE_COLLATION', '')}")
    md.append(f"- **Created:** {t.get('CREATE_TIME', '')}")
    md.append(f"- **Updated:** {t.get('UPDATE_TIME', '')}")
    md.append("")

    # columns
    md.append("## Columns")
    md.append("")
    md.append("| # | Name | Type | Null | Key | Default | Extra | Comment |")
    md.append("|---|---|---|---|---|---|---|---|")
    for c in cols:
        md.append(
            "| {pos} | `{col}` | `{ctype}` | {nullable} | {key} | {default} | {extra} | {comment} |".format(
                pos=c.get("ORDINAL_POSITION", ""),
                col=c.get("COLUMN_NAME", ""),
                # enum/set types and defaults can contain `|` just like comments.
                ctype=_md_cell(c.get("COLUMN_TYPE", "")),
                nullable=c.get("IS_NULLABLE", ""),
                key=c.get("COLUMN_KEY", ""),
                default=_md_cell(c.get("COLUMN_DEFAULT") or ""),
                extra=c.get("EXTRA", ""),
                comment=_md_cell(c.get("COLUMN_COMMENT", "")),
            )
        )
    md.append("")

    # indexes (grouped by name, ordered by SEQ_IN_INDEX)
    if idxs:
        md.append("## Indexes")
        md.append("")
        by_idx: dict[str, list[dict[str, str]]] = {}
        for i in idxs:
            by_idx.setdefault(i["INDEX_NAME"], []).append(i)
        md.append("| Name | Unique | Type | Columns |")
        md.append("|---|---|---|---|")
        for idx_name, members in by_idx.items():
            parts = sorted(members, key=lambda x: _to_int(x.get("SEQ_IN_INDEX")))
            unique = "✓" if parts[0].get("NON_UNIQUE") == "0" else ""
            cols_in_idx = ", ".join(f"`{p['COLUMN_NAME']}`" for p in parts)
            md.append(f"| `{idx_name}` | {unique} | {parts[0].get('INDEX_TYPE', '')} | {cols_in_idx} |")
        md.append("")

    md.append("## Narrative")
    md.append("")
    md.append("_No human-written narrative yet — when this table is exercised by a "
              "[vertical slice](../../slices/index.md), link from there back to this page._")
    md.append("")
    return "\n".join(md)


def gen_tables(out_dir: Path) -> list[dict[str, str]]:
    """Write one page per base table plus ``index.md`` into *out_dir*.

    Reads tables.tsv, columns.tsv and indexes.tsv from RECON. Returns the rows
    of tables.tsv whose TABLE_TYPE is ``VIEW`` so the caller can hand them to
    gen_views().
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    tables = read_tsv(RECON / "tables.tsv")
    cols_by_table = _group_by_table(read_tsv(RECON / "columns.tsv"))
    idx_by_table = _group_by_table(read_tsv(RECON / "indexes.tsv"))

    base_tables = [t for t in tables if t["TABLE_TYPE"] == "BASE TABLE"]
    views = [t for t in tables if t["TABLE_TYPE"] == "VIEW"]

    for t in base_tables:
        name = t["TABLE_NAME"]
        page = _render_table_page(t, cols_by_table.get(name, []), idx_by_table.get(name, []))
        (out_dir / f"{slug(name)}.md").write_text(page, encoding="utf-8")

    # Index page — list all base tables, sortable by size desc.
    # _to_int keeps the sort from crashing on a value fmt_bytes would tolerate.
    by_size = sorted(base_tables, key=lambda x: _to_int(x.get("DATA_LENGTH")), reverse=True)
    items = [
        {
            "name": t["TABLE_NAME"],
            "rows": t.get("TABLE_ROWS", ""),
            "size": fmt_bytes(t.get("DATA_LENGTH", "")),
            "comment": (t.get("TABLE_COMMENT") or "").strip()[:80],
        }
        for t in by_size
    ]
    write_index(
        out_dir / "index.md",
        "tables",
        items,
        [("rows", "Rows"), ("size", "Size"), ("comment", "Comment")],
    )
    return views  # pass through for view generation


def gen_views(out_dir: Path, views_meta: list[dict[str, str]]) -> None:
    """Write one page per view plus ``index.md`` into *out_dir*.

    views_meta: the VIEW rows of tables.tsv, as returned by gen_tables().
    Definitions come from views.tsv and column lists from columns.tsv.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    def_by_name = {v["TABLE_NAME"]: v for v in read_tsv(RECON / "views.tsv")}
    cols_by_view = _group_by_table(read_tsv(RECON / "columns.tsv"))

    for v in views_meta:
        name = v["TABLE_NAME"]
        d = def_by_name.get(name, {})
        cols = cols_by_view.get(name, [])

        md = [f"# `{name}` (view)\n"]
        # Same placeholder filtering as the index, so pages do not show "> VIEW".
        comment = _view_comment(v)
        if comment:
            md.append(f"> {comment}\n")
        md.append("")
        md.append(f"- **Updatable:** {d.get('IS_UPDATABLE', '')}")
        md.append(f"- **Definer:** `{d.get('DEFINER', '')}`")
        md.append("")

        if cols:
            md.append("## Columns")
            md.append("")
            md.append("| # | Name | Type | Null |")
            md.append("|---|---|---|---|")
            for c in cols:
                md.append(
                    f"| {c.get('ORDINAL_POSITION', '')} | `{c.get('COLUMN_NAME', '')}` "
                    f"| `{_md_cell(c.get('COLUMN_TYPE', ''))}` | {c.get('IS_NULLABLE', '')} |"
                )
            md.append("")

        defn = d.get("VIEW_DEFINITION", "")
        if defn:
            md.append("## Definition")
            md.append("")
            md.append("```sql")
            md.append(defn)
            md.append("```")
            md.append("")

        md.append("## Narrative")
        md.append("")
        md.append("_No human-written narrative yet._")
        md.append("")
        (out_dir / f"{slug(name)}.md").write_text("\n".join(md), encoding="utf-8")

    items = [
        {"name": v["TABLE_NAME"], "comment": _view_comment(v)[:80]}
        for v in sorted(views_meta, key=lambda x: x["TABLE_NAME"])
    ]
    write_index(out_dir / "index.md", "views", items, [("comment", "Comment")])


def _is_return_row(param: dict[str, str]) -> bool:
    """True for the unnamed ordinal-0 row that describes a function's return type."""
    return param.get("ORDINAL_POSITION") == "0" and not param.get("PARAMETER_NAME")


def gen_routines(procs_dir: Path, fns_dir: Path) -> None:
    """Write one page per stored routine plus an ``index.md`` in each directory.

    Routines whose ROUTINE_TYPE is ``PROCEDURE`` go to *procs_dir*; all others
    go to *fns_dir*. Parameters are matched to routines by
    (ROUTINE_TYPE, SPECIFIC_NAME) against (ROUTINE_TYPE, ROUTINE_NAME).
    """
    procs_dir.mkdir(parents=True, exist_ok=True)
    fns_dir.mkdir(parents=True, exist_ok=True)
    routines = read_tsv(RECON / "routines.tsv")
    params = read_tsv(RECON / "routine_params.tsv")

    p_by_routine = collections.defaultdict(list)
    for p in params:
        # The return-type pseudo-parameter is already shown under "Returns";
        # listing it would add an empty-named row and hide "_No parameters._".
        if _is_return_row(p):
            continue
        p_by_routine[(p["ROUTINE_TYPE"], p["SPECIFIC_NAME"])].append(p)

    for r in routines:
        name = r["ROUTINE_NAME"]
        rtype = r["ROUTINE_TYPE"]
        out_dir = procs_dir if rtype == "PROCEDURE" else fns_dir
        prms = sorted(p_by_routine.get((rtype, name), []),
                      key=lambda x: _to_int(x.get("ORDINAL_POSITION")))

        md = [f"# `{name}` ({rtype.lower()})\n"]
        comment = (r.get("ROUTINE_COMMENT") or "").strip()
        if comment:
            md.append(f"> {comment}\n")
        md.append("")
        md.append(f"- **Type:** {rtype}")
        if rtype == "FUNCTION":
            md.append(f"- **Returns:** `{r.get('RETURN_TYPE', '')}`")
        md.append(f"- **Deterministic:** {r.get('IS_DETERMINISTIC', '')}")
        md.append(f"- **SQL data access:** {r.get('SQL_DATA_ACCESS', '')}")
        md.append(f"- **Created:** {r.get('CREATED', '')}")
        md.append(f"- **Last altered:** {r.get('LAST_ALTERED', '')}")
        md.append("")

        md.append("## Parameters")
        md.append("")
        if prms:
            md.append("| # | Mode | Name | Type |")
            md.append("|---|---|---|---|")
            for p in prms:
                md.append(
                    f"| {p.get('ORDINAL_POSITION', '')} | {p.get('PARAMETER_MODE', '')} "
                    f"| `{p.get('PARAMETER_NAME', '')}` | `{_md_cell(p.get('DTD_IDENTIFIER', ''))}` |"
                )
        else:
            md.append("_No parameters._")
        md.append("")

        md.append("## Body")
        md.append("")
        md.append("_Body is not pre-cached. To inspect: `mysql --defaults-file=~/.my.cnf "
                  f"-e 'SHOW CREATE {rtype} `{name}`'`._")
        md.append("")
        md.append("## Narrative")
        md.append("")
        md.append("_No human-written narrative yet._")
        md.append("")
        (out_dir / f"{slug(name)}.md").write_text("\n".join(md), encoding="utf-8")

    procs = [r for r in routines if r["ROUTINE_TYPE"] == "PROCEDURE"]
    fns = [r for r in routines if r["ROUTINE_TYPE"] == "FUNCTION"]
    write_index(
        procs_dir / "index.md",
        "procedures",
        [{"name": r["ROUTINE_NAME"], "comment": (r.get("ROUTINE_COMMENT") or "").strip()[:80]}
         for r in sorted(procs, key=lambda x: x["ROUTINE_NAME"])],
        [("comment", "Comment")],
    )
    write_index(
        fns_dir / "index.md",
        "functions",
        [{"name": r["ROUTINE_NAME"], "comment": (r.get("ROUTINE_COMMENT") or "").strip()[:80]}
         for r in sorted(fns, key=lambda x: x["ROUTINE_NAME"])],
        [("comment", "Comment")],
    )


def main() -> int:
    """Generate the whole catalog; return 0 on success, 1 if RECON is missing."""
    if not RECON.exists():
        print(f"recon dir not found at {RECON}", file=sys.stderr)
        return 1
    print(f"Reading from {RECON}")
    print(f"Writing to {OUT}")
    views = gen_tables(OUT / "tables")
    gen_views(OUT / "views", views)
    gen_routines(OUT / "procedures", OUT / "functions")
    print("Done.")
    return 0


if __name__ == "__main__":
    sys.exit(main())