From 00817cf65e90536218dcffd313d146b111ee03c1 Mon Sep 17 00:00:00 2001 From: qazal <77887910+Qazalin@users.noreply.github.com> Date: Tue, 17 Mar 2026 21:14:20 +0200 Subject: [PATCH] viz: all tests can run on the NULL device (#15328) * remove that * move to test_viz * get_cfg * do not use os.environ * hm * it's always on NULL * import renderer * no import * --- .github/workflows/test.yml | 2 +- extra/viz/cli.py | 5 +- test/null/test_viz.py | 189 +++++++++++++++++++++++++++++ test/testextra/test_cfg_viz.py | 214 --------------------------------- 4 files changed, 192 insertions(+), 218 deletions(-) delete mode 100644 test/testextra/test_cfg_viz.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0f774ff5b8..10be88f8e7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -735,7 +735,7 @@ jobs: python3 -c "from tinygrad import Device; assert Device.DEFAULT in ['AMD'], Device.DEFAULT" DEBUG=5 FORWARD_ONLY=1 python3 test/test_tiny.py TestTiny.test_plus - name: Run pytest (amd) - run: python -m pytest -n=auto test/backend/test_ops.py test/backend/test_dtype.py test/backend/test_dtype_alu.py test/backend/test_linearizer.py test/backend/test_randomness.py test/backend/test_jit.py test/backend/test_graph.py test/backend/test_multitensor.py test/device/test_hcq.py test/testextra/test_cfg_viz.py test/external/external_test_am.py --durations=20 + run: python -m pytest -n=auto test/backend/test_ops.py test/backend/test_dtype.py test/backend/test_dtype_alu.py test/backend/test_linearizer.py test/backend/test_randomness.py test/backend/test_jit.py test/backend/test_graph.py test/backend/test_multitensor.py test/device/test_hcq.py test/external/external_test_am.py --durations=20 - name: Run TRANSCENDENTAL math run: TRANSCENDENTAL=2 python -m pytest -n=auto test/backend/test_ops.py::TestOps::test_sin test/backend/test_ops.py::TestOps::test_cos test/backend/test_ops.py::TestOps::test_tan test/backend/test_ops.py::TestOps::test_exp test/backend/test_ops.py::TestOps::test_log --durations=20 - name: Run process replay tests diff --git a/extra/viz/cli.py b/extra/viz/cli.py index d82e21cb66..91548b0f1c 100755 --- a/extra/viz/cli.py +++ b/extra/viz/cli.py @@ -1,11 +1,9 @@ #!/usr/bin/env python3 -import os -os.environ["VIZ"] = "0" import argparse, pathlib, sys, struct, json from typing import Iterator from tinygrad.viz import serve as viz from tinygrad.uop.ops import RewriteTrace -from tinygrad.helpers import temp, ansistrip, colored, time_to_str, ansilen +from tinygrad.helpers import temp, ansistrip, colored, time_to_str, ansilen, Context # ** generic helpers @@ -61,6 +59,7 @@ def decode_profile(data:bytes) -> dict: return {"dur":total_dur, "peak":global_peak, "layout":layout, "markers":markers} if __name__ == "__main__": + Context(VIZ=0, TRACK_MATCH_STATS=0).__enter__() parser = argparse.ArgumentParser() g_mode = parser.add_argument_group("mode") g_mode.add_argument("--profile", action="store_true", help="View profile trace") diff --git a/test/null/test_viz.py b/test/null/test_viz.py index 10758ab5e1..d840a2ea9b 100644 --- a/test/null/test_viz.py +++ b/test/null/test_viz.py @@ -642,5 +642,194 @@ class TestVizMemoryLayout(BaseTestViz): users = profile["layout"][f"{a.device} Memory"]["events"].pop()["arg"]["users"] self.assertEqual(len(programs), len(set(users)), n) +from tinygrad.uop.ops import KernelInfo +from tinygrad.viz.serve import amdgpu_cfg +from tinygrad.renderer.amd.dsl import s +from tinygrad.runtime.autogen.amd.rdna3.ins import (s_add_u32, s_branch, s_cbranch_execz, s_cbranch_scc0, s_cbranch_scc1, s_cmp_eq_i32, + s_cmp_eq_u64, s_code_end, s_endpgm, s_mov_b32, s_nop) +from extra.gemm.amd_asm_matmul import Kernel +from tinygrad.renderer.cstyle import AMDHIPRenderer + +class TestCfg(unittest.TestCase): + def setUp(self): self.arch = "gfx1100" + + def get_cfg(self, name:str, k:Kernel): + insts = k.finalize() + def fxn(out:UOp) -> UOp: + lidx = UOp.special(1, "lidx0") + gidx = UOp.special(1, "gidx0") + sink = UOp.sink(out.base, lidx, gidx, arg=KernelInfo(name=name)) + return UOp(Ops.PROGRAM, src=(sink, UOp(Ops.DEVICE, arg="NULL"), UOp(Ops.LINEAR, src=tuple([UOp(Ops.INS, arg=x) for x in insts])))) + with Context(EMULATE="AMD"): + out = Tensor.custom_kernel(Tensor.empty(1, device="NULL"), fxn=fxn)[0] + # TODO: uncomment the better version once EMULATE works in Context + #prg = out.schedule()[-1].lower().prg.p + prg = get_program(out.schedule()[-1].ast, AMDHIPRenderer(self.arch)) + return amdgpu_cfg(prg.lib, self.arch) + + def test_simple(self): + k = Kernel(arch=self.arch) + k.label("entry") + k.emit(s_branch(), target="bb1") + k.label("bb1") + k.emit(s_endpgm()) + k.emit(s_code_end()) + cfg = self.get_cfg("simple", k)["data"] + self.assertEqual(len(cfg["blocks"]), 2) + + def test_diamond(self): + k = Kernel(arch=self.arch) + k.label("entry") + k.emit(s_mov_b32(s[0], 0)) + k.emit(s_mov_b32(s[1], 0)) + k.emit(s_cmp_eq_u64(s[0:1], 0)) + k.emit(s_cbranch_scc1(), target="if") + k.emit(s_branch(), target="else") + k.label("if") + k.emit(s_nop(1)) + k.emit(s_branch(), target="end") + k.label("else") + k.emit(s_nop(0)) + k.label("end") + k.emit(s_endpgm()) + k.emit(s_code_end()) + ret = self.get_cfg("diamond", k) + cfg = ret["data"] + self.assertEqual(len(cfg["blocks"]), 5) + edge_count = sum(len(v) for v in cfg["paths"].values()) + self.assertEqual(edge_count, 5) + references:dict[str, list[str]] = {} + for pc, tokens in cfg["pc_tokens"].items(): + for t in tokens: + for key in t["keys"]: references.setdefault(key, []).append(pc) + self.assertEqual(len(references["r0"]), 2) + insts = [cfg["pc_tokens"][pc][0]["st"] for pc in references["r0"]] + self.assertEqual(insts, ['s_mov_b32', 's_cmp_eq_u64']) + end_block = [" ".join(t["st"] for t in cfg["pc_tokens"][pc]) for pc in list(cfg["blocks"].values())[-1]] + code_line = ret["src"].splitlines()[-1] + self.assertEqual(len(end_block), 2) + for st in [end_block[-1], code_line]: + assert st.startswith("s_code_end") and st.endswith("x)"), st + + def test_loop(self): + k = Kernel(arch=self.arch) + k.label("entry") + k.emit(s_mov_b32(s[1], 4)) + k.label("loop") + k.emit(s_add_u32(s[1], s[1], -1)) + k.emit(s_cmp_eq_i32(s[1], 0)) + k.emit(s_cbranch_scc0(), target="loop") + k.emit(s_endpgm()) + k.emit(s_code_end()) + self.get_cfg("simple_loop", k) + + def test_loop_branch(self): + k = Kernel(arch=self.arch) + k.label("entry") + k.emit(s_mov_b32(s[1], 4)) + k.label("loop") + k.emit(s_add_u32(s[1], s[1], -1)) + k.emit(s_cmp_eq_i32(s[1], 2)) + k.emit(s_cbranch_scc1(), target="cond") + k.emit(s_branch(), target="cont") + k.label("cond") + k.emit(s_add_u32(s[1], s[1], -2)) + k.label("cont") + k.emit(s_cmp_eq_i32(s[1], 0)) + k.emit(s_cbranch_scc0(), target="loop") + k.emit(s_endpgm()) + k.emit(s_code_end()) + self.get_cfg("loop_if", k) + + def test_loop_break(self): + k = Kernel(arch=self.arch) + k.label("entry") + k.emit(s_mov_b32(s[1], 8)) + k.label("loop") + k.emit(s_add_u32(s[1], s[1], -1)) + k.emit(s_cmp_eq_i32(s[1], 5)) + k.emit(s_cbranch_scc1(), target="break") + k.emit(s_cmp_eq_i32(s[1], 0)) + k.emit(s_cbranch_scc0(), target="loop") + k.label("break") + k.emit(s_endpgm()) + k.emit(s_code_end()) + self.get_cfg("loop_break", k) + + def test_switch(self): + k = Kernel(arch=self.arch) + k.label("entry") + k.emit(s_cmp_eq_i32(s[0], 0)) + k.emit(s_cbranch_scc1(), target="case0") + k.emit(s_cmp_eq_i32(s[0], 1)) + k.emit(s_cbranch_scc1(), target="case1") + k.emit(s_branch(), target="case2") + k.label("case0") + k.emit(s_nop(0)) + k.emit(s_branch(), target="join") + k.label("case1") + k.emit(s_nop(1)) + k.emit(s_branch(), target="join") + k.label("case2") + k.emit(s_nop(2)) + k.emit(s_branch(), target="join") + k.label("join") + k.emit(s_endpgm()) + k.emit(s_code_end()) + self.get_cfg("switch_case", k) + + def test_ping_pong(self): + k = Kernel(arch=self.arch) + k.label("entry") + k.emit(s_cmp_eq_i32(s[0], 0)) + k.emit(s_cbranch_scc1(), target="ping") + k.emit(s_branch(), target="pong") + k.label("ping") + k.emit(s_cmp_eq_i32(s[1], 0)) + k.emit(s_cbranch_scc1(), target="pong") + k.emit(s_branch(), target="end") + k.label("pong") + k.emit(s_cmp_eq_i32(s[2], 0)) + k.emit(s_cbranch_scc1(), target="ping") + k.label("end") + k.emit(s_endpgm()) + k.emit(s_code_end()) + self.get_cfg("ping_pong", k) + + def test_colored_blocks(self): + N = 10 + k = Kernel(arch=self.arch) + k.label("entry") + k.emit(s_branch(), target="init0") + for i in range(N): + loop = f"loop{i}" + k.label(f"init{i}") + k.emit(s_mov_b32(s[1], i + 1)) + k.emit(s_branch(), target=loop) + k.label(loop) + k.emit(s_nop(i & 7)) + k.emit(s_add_u32(s[1], s[1], -1)) + k.emit(s_cmp_eq_i32(s[1], 0)) + k.emit(s_cbranch_scc0(), target=loop) + k.emit(s_branch(), target=f"init{i+1}" if i + 1 < N else "end") + k.label("end") + k.emit(s_endpgm()) + k.emit(s_code_end()) + self.get_cfg("test_colored_blocks", k) + + def test_jump_back_to_end(self): + k = Kernel(arch=self.arch) + k.label("entry") + k.emit(s_mov_b32(s[1], 2)) + k.emit(s_cbranch_execz(), target="loop") + k.label("end") + k.emit(s_endpgm()) + k.label("loop") + k.emit(s_add_u32(s[1], s[1], -1)) + k.emit(s_cmp_eq_i32(s[1], 0)) + k.emit(s_branch(), target="end") + k.emit(s_code_end()) + self.get_cfg("jump_back_to_end", k) + if __name__ == "__main__": unittest.main() diff --git a/test/testextra/test_cfg_viz.py b/test/testextra/test_cfg_viz.py deleted file mode 100644 index f014040cbc..0000000000 --- a/test/testextra/test_cfg_viz.py +++ /dev/null @@ -1,214 +0,0 @@ -# ruff: noqa: F405, F403 -# allow define from star imports - -import unittest - -from tinygrad import Device, Tensor -from tinygrad.uop.ops import UOp, Ops, KernelInfo -from tinygrad.viz.serve import amdgpu_cfg - -from tinygrad.runtime.autogen.amd.rdna3.ins import * -from tinygrad.renderer.amd.dsl import s - -# TODO: this belongs to the dsl infrastructure -from extra.gemm.amd_asm_matmul import Kernel - -def run_asm(name:str, k:Kernel): - insts = k.finalize() - def fxn(out:UOp) -> UOp: - lidx = UOp.special(1, "lidx0") - gidx = UOp.special(1, "gidx0") - sink = UOp.sink(out.base, lidx, gidx, arg=KernelInfo(name=name)) - return UOp(Ops.PROGRAM, src=(sink, UOp(Ops.DEVICE, arg="AMD"), UOp(Ops.LINEAR, src=tuple([UOp(Ops.INS, arg=x) for x in insts])))) - out = Tensor.custom_kernel(Tensor.empty(1), fxn=fxn)[0] - ei = out.schedule()[-1].lower() - ei.run() - return ei - -@unittest.skipUnless(Device.DEFAULT == "AMD", "only on AMD") -class TestCfg(unittest.TestCase): - def setUp(self): - self.arch = Device["AMD"].arch - if not any(self.arch.startswith(a) for a in {"gfx11", "gfx12"}): - self.skipTest(f"tests written for RDNA, got arch {self.arch}") - - def test_simple(self): - k = Kernel(arch=Device["AMD"].arch) - k.label("entry") - k.emit(s_branch(), target="bb1") - k.label("bb1") - k.emit(s_endpgm()) - k.emit(s_code_end()) - run_asm("simple", k) - - def test_diamond(self): - k = Kernel(arch=Device["AMD"].arch) - k.label("entry") - k.emit(s_mov_b32(s[0], 0)) - k.emit(s_mov_b32(s[1], 0)) - k.emit(s_cmp_eq_u64(s[0:1], 0)) - k.emit(s_cbranch_scc1(), target="if") - k.emit(s_branch(), target="else") - k.label("if") - k.emit(s_nop(1)) - k.emit(s_branch(), target="end") - k.label("else") - k.emit(s_nop(0)) - k.label("end") - k.emit(s_endpgm()) - k.emit(s_code_end()) - ei = run_asm("diamond", k) - ret = amdgpu_cfg(ei.prg.p.lib, self.arch) - cfg = ret["data"] - self.assertEqual(len(cfg["blocks"]), 5) - edge_count = sum(len(v) for v in cfg["paths"].values()) - self.assertEqual(edge_count, 5) - references:dict[str, list[str]] = {} - for pc, tokens in cfg["pc_tokens"].items(): - for t in tokens: - for key in t["keys"]: references.setdefault(key, []).append(pc) - self.assertEqual(len(references["r0"]), 2) - insts = [cfg["pc_tokens"][pc][0]["st"] for pc in references["r0"]] - self.assertEqual(insts, ['s_mov_b32', 's_cmp_eq_u64']) - end_block = [" ".join(t["st"] for t in cfg["pc_tokens"][pc]) for pc in list(cfg["blocks"].values())[-1]] - code_line = ret["src"].splitlines()[-1] - self.assertEqual(len(end_block), 2) - for st in [end_block[-1], code_line]: - assert st.startswith("s_code_end") and st.endswith("x)"), st - - def test_loop(self): - k = Kernel(arch=Device["AMD"].arch) - k.label("entry") - k.emit(s_mov_b32(s[1], 4)) - k.label("loop") - k.emit(s_add_u32(s[1], s[1], -1)) - k.emit(s_cmp_eq_i32(s[1], 0)) - k.emit(s_cbranch_scc0(), target="loop") - k.emit(s_endpgm()) - k.emit(s_code_end()) - run_asm("simple_loop", k) - - def test_loop_branch(self): - k = Kernel(arch=Device["AMD"].arch) - k.label("entry") - k.emit(s_mov_b32(s[1], 4)) - k.label("loop") - k.emit(s_add_u32(s[1], s[1], -1)) - k.emit(s_cmp_eq_i32(s[1], 2)) - k.emit(s_cbranch_scc1(), target="cond") - k.emit(s_branch(), target="cont") - k.label("cond") - k.emit(s_add_u32(s[1], s[1], -2)) - k.label("cont") - k.emit(s_cmp_eq_i32(s[1], 0)) - k.emit(s_cbranch_scc0(), target="loop") - k.emit(s_endpgm()) - k.emit(s_code_end()) - run_asm("loop_if", k) - - def test_loop_break(self): - k = Kernel(arch=Device["AMD"].arch) - k.label("entry") - k.emit(s_mov_b32(s[1], 8)) - k.label("loop") - k.emit(s_add_u32(s[1], s[1], -1)) - k.emit(s_cmp_eq_i32(s[1], 5)) - k.emit(s_cbranch_scc1(), target="break") - k.emit(s_cmp_eq_i32(s[1], 0)) - k.emit(s_cbranch_scc0(), target="loop") - k.label("break") - k.emit(s_endpgm()) - k.emit(s_code_end()) - run_asm("loop_break", k) - - def test_switch(self): - k = Kernel(arch=Device["AMD"].arch) - k.label("entry") - k.emit(s_cmp_eq_i32(s[0], 0)) - k.emit(s_cbranch_scc1(), target="case0") - k.emit(s_cmp_eq_i32(s[0], 1)) - k.emit(s_cbranch_scc1(), target="case1") - k.emit(s_branch(), target="case2") - k.label("case0") - k.emit(s_nop(0)) - k.emit(s_branch(), target="join") - k.label("case1") - k.emit(s_nop(1)) - k.emit(s_branch(), target="join") - k.label("case2") - k.emit(s_nop(2)) - k.emit(s_branch(), target="join") - k.label("join") - k.emit(s_endpgm()) - k.emit(s_code_end()) - run_asm("switch_case", k) - - def test_ping_pong(self): - k = Kernel(arch=Device["AMD"].arch) - k.label("entry") - k.emit(s_cmp_eq_i32(s[0], 0)) - k.emit(s_cbranch_scc1(), target="ping") - k.emit(s_branch(), target="pong") - k.label("ping") - k.emit(s_cmp_eq_i32(s[1], 0)) - k.emit(s_cbranch_scc1(), target="pong") - k.emit(s_branch(), target="end") - k.label("pong") - k.emit(s_cmp_eq_i32(s[2], 0)) - k.emit(s_cbranch_scc1(), target="ping") - k.label("end") - k.emit(s_endpgm()) - k.emit(s_code_end()) - run_asm("ping_pong", k) - - def test_colored_blocks(self): - N = 10 - k = Kernel(arch=Device["AMD"].arch) - k.label("entry") - k.emit(s_branch(), target="init0") - for i in range(N): - loop = f"loop{i}" - k.label(f"init{i}") - k.emit(s_mov_b32(s[1], i + 1)) - k.emit(s_branch(), target=loop) - k.label(loop) - k.emit(s_nop(i & 7)) - k.emit(s_add_u32(s[1], s[1], -1)) - k.emit(s_cmp_eq_i32(s[1], 0)) - k.emit(s_cbranch_scc0(), target=loop) - k.emit(s_branch(), target=f"init{i+1}" if i + 1 < N else "end") - k.label("end") - k.emit(s_endpgm()) - k.emit(s_code_end()) - run_asm("test_colored_blocks", k) - - def test_jump_back_to_end(self): - k = Kernel(arch=Device["AMD"].arch) - k.label("entry") - k.emit(s_mov_b32(s[1], 2)) - k.emit(s_cbranch_execz(), target="loop") - k.label("end") - k.emit(s_endpgm()) - k.label("loop") - k.emit(s_add_u32(s[1], s[1], -1)) - k.emit(s_cmp_eq_i32(s[1], 0)) - k.emit(s_branch(), target="end") - k.emit(s_code_end()) - run_asm("jump_back_to_end", k) - - def test_hit_count(self): - k = Kernel(arch=Device["AMD"].arch) - k.label("entry") - k.emit(s_mov_b32(s[1], 1)) - k.emit(s_branch(), target="alt") - k.label("continue") - k.emit(s_mov_b32(s[2], 2)) - k.emit(s_add_u32(s[1], s[1], s[2])) - k.label("alt") - k.emit(s_add_u32(s[1], s[1], -1)) - k.emit(s_endpgm()) - k.emit(s_code_end()) - run_asm("test_hit_count", k) - -if __name__ == "__main__": - unittest.main()