viz: amdgpu arch cleanup (#14790)

* viz: amdgpu arch cleanup

* don't do that

* simpler sqttmap

* work

* self.arch
This commit is contained in:
qazal
2026-02-16 15:48:12 +08:00
committed by GitHub
parent 401095e3e7
commit ac62d28ddc
4 changed files with 13 additions and 16 deletions

View File

@@ -58,9 +58,8 @@ class TestSQTTMapBase(unittest.TestCase):
data = pickle.load(f)
sqtt_events = [e for e in data if type(e).__name__ == "ProfileSQTTEvent"]
kern_events = {e.tag:e for e in data if type(e).__name__ == "ProfileProgramEvent"}
dev = next((e for e in data if type(e).__name__ == "ProfileDeviceEvent" and e.device.startswith("AMD")), None)
if sqtt_events and kern_events and dev:
cls.examples[pkl_path.stem] = (sqtt_events, kern_events, dev.props["gfx_target_version"])
if sqtt_events and kern_events:
cls.examples[pkl_path.stem] = (sqtt_events, kern_events, cls.target)
def test_rocprof_inst_traces_match(self):
for name, (events, kern_events, target) in self.examples.items():

View File

@@ -28,8 +28,8 @@ def run_asm(name:str, k:Kernel):
@unittest.skipUnless(Device.DEFAULT == "AMD", "only on AMD")
class TestCfg(unittest.TestCase):
def setUp(self):
  """Record the AMD device arch on the test case and skip when it is not gfx11/gfx12."""
  self.arch = Device["AMD"].arch
  # str.startswith accepts a tuple of prefixes, so one call covers both supported families
  if not self.arch.startswith(("gfx11", "gfx12")):
    # the message must read self.arch: the local `arch` no longer exists after the rename,
    # so formatting `{arch}` would raise NameError instead of skipping the test
    self.skipTest(f"tests written for RDNA, got arch {self.arch}")
def test_simple(self):
@@ -58,7 +58,7 @@ class TestCfg(unittest.TestCase):
k.emit(s_endpgm())
k.emit(s_code_end())
ei = run_asm("diamond", k)
cfg = amdgpu_cfg(ei.prg.p.lib, Device[Device.DEFAULT].device_props()["gfx_target_version"])["data"]
cfg = amdgpu_cfg(ei.prg.p.lib, self.arch)["data"]
self.assertEqual(len(cfg["blocks"]), 5)
edge_count = sum(len(v) for v in cfg["paths"].values())
self.assertEqual(edge_count, 5)

View File

@@ -575,7 +575,7 @@ class InstructionInfo:
wave: int
inst: Inst
def map_insts(data:bytes, lib:bytes, target:int) -> Iterator[tuple[PacketType, InstructionInfo|None]]:
def map_insts(data:bytes, lib:bytes, target:str) -> Iterator[tuple[PacketType, InstructionInfo|None]]:
"""maps SQTT packets to instructions, yields (packet, instruction_info or None)"""
# map pcs to insts
from tinygrad.viz.serve import amd_decode

View File

@@ -173,7 +173,7 @@ def rel_ts(ts:int|Decimal, start_ts:int) -> int:
device_ts_diffs:dict[str, Decimal] = {}
def cpu_ts_diff(device:str) -> Decimal:
  """Return the entry stored in `device_ts_diffs` for `device`, or Decimal(0) when there is none."""
  no_offset = Decimal(0)
  return device_ts_diffs.get(device, no_offset)
amdgpu_targets:dict[str, int] = {}
amdgpu_targets:dict[str, str] = {}
DevEvent = ProfileRangeEvent|ProfileGraphEntry|ProfilePointEvent
def flatten_events(profile:list[ProfileEvent]) -> Generator[tuple[Decimal, Decimal, DevEvent], None, None]:
@@ -314,7 +314,7 @@ def load_counters(profile:list[ProfileEvent]) -> None:
steps.append(create_step("SQTT", ("/prg-sqtt", len(ctxs), len(steps)), ((k, tag), sqtt, prg_events[k])))
ctxs.append({"name":f"Exec {name}"+(f" n{run_number[k]}" if run_number[k] > 1 else ""), "steps":steps})
def sqtt_timeline(data:bytes, lib:bytes, target:int) -> list[ProfileEvent]:
def sqtt_timeline(data:bytes, lib:bytes, target:str) -> list[ProfileEvent]:
from tinygrad.renderer.amd.sqtt import map_insts, InstructionInfo, PacketType, INST, InstOp, VALUINST, IMMEDIATE, IMMEDIATE_MASK, VMEMEXEC, ALUEXEC
ret:list[ProfileEvent] = []
rows:dict[str, None] = {}
@@ -389,7 +389,7 @@ def get_profile(profile:list[ProfileEvent], sort_fn:Callable[[str], Any]=device_
device_ts_diffs[ev.device] = ev.tdiff
if (d:=ev.device.split(":")[0]) == "AMD":
device_decoders[d] = load_counters
amdgpu_targets[d] = unwrap(ev.props)["gfx_target_version"]
amdgpu_targets[d] = f"gfx{unwrap(ev.props)['gfx_target_version']//1000}"
# load device specific counters
for fxn in device_decoders.values(): fxn(profile)
# map events per device
@@ -436,7 +436,7 @@ def amd_readelf(lib:bytes) -> list[dict]:
return [{"label":f"{resource} Alloc", "value":val} for resource,val in [("VGPR", (vgpr_gran+1)*8-7), ("LDS",kd.group_segment_fixed_size),
("Scratch", kd.private_segment_fixed_size)] if val > 0]
def amd_decode(lib:bytes, target:int) -> dict[int, Any]: # Any is the Inst class from tinygrad.renderer.amd.dsl
def amd_decode(lib:bytes, target:str) -> dict[int, Any]: # Any is the Inst class from tinygrad.renderer.amd.dsl
from tinygrad.runtime.support.elf import elf_loader
from tinygrad.renderer.amd import detect_format
from tinygrad.renderer.amd.dsl import Inst
@@ -444,7 +444,7 @@ def amd_decode(lib:bytes, target:int) -> dict[int, Any]: # Any is the Inst class
text = next((sh for sh in sections if sh.name == ".text"), None)
assert text is not None, "no .text section found in ELF"
off, buf = text.header.sh_addr, text.content
arch = {11:"rdna3", 12:"rdna4"}.get(target//10000, "cdna")
arch = "rdna3" if target.startswith("gfx11") else "rdna4" if target.startswith("gfx12") else "cdna"
addr_table:dict[int, Inst] = {}
offset = 0
while offset < len(buf):
@@ -462,7 +462,7 @@ def parse_branch(inst) -> int|None:
return None
COND_TAKEN, COND_NOT_TAKEN, UNCOND = range(3)
def amdgpu_cfg(lib:bytes, target:int) -> dict:
def amdgpu_cfg(lib:bytes, target:str) -> dict:
# decode
pc_table = amd_decode(lib, target)
# get leaders
@@ -477,9 +477,7 @@ def amdgpu_cfg(lib:bytes, target:int) -> dict:
disasm = {pc:str(inst) for pc,inst in pc_table.items()}
asm_width = max(len(asm) for asm in disasm.values())
for pc, inst in pc_table.items():
# skip instructions only used for padding
if (asm:=disasm[pc]) == "s_code_end": continue
lines.append(f" {asm:<{asm_width}} // {pc:012X}")
lines.append(f" {disasm[pc]:<{asm_width}} // {pc:012X}")
if pc in leaders:
paths[curr:=pc] = {}
blocks[pc] = []