amd aql queue (#11708)

* amd aql queue

* xcc

* fiz

* aql better

* llvm

* no for aql

* wrap

* is_sql

* am support

* complete

* fix

* mypy

* minor

nimlgen, 2025-08-24 19:53:00 +03:00 (committed by GitHub)
parent 1fa09d9ede, commit bba088ef11


@@ -4,10 +4,11 @@ import os, ctypes, ctypes.util, struct, hashlib, functools, importlib, mmap, err
 assert sys.platform != 'win32'
 from dataclasses import dataclass
 from tinygrad.runtime.support.hcq import HCQCompiled, HCQAllocator, HCQBuffer, HWQueue, CLikeArgsState, HCQSignal, HCQProgram, FileIOInterface
-from tinygrad.runtime.support.hcq import MMIOInterface
+from tinygrad.runtime.support.hcq import MMIOInterface, BumpAllocator
 from tinygrad.uop.ops import sint
 from tinygrad.device import Compiled, DMAFdRef, BufferSpec
 from tinygrad.helpers import getenv, to_mv, round_up, data64_le, all_same, flatten, DEBUG, AMD_LLVM, PROFILE, ProfileEvent, suppress_finalizing
+from tinygrad.helpers import lo32, hi32
 from tinygrad.renderer.cstyle import AMDRenderer
 from tinygrad.renderer.llvmir import AMDLLVMRenderer
 from tinygrad.runtime.autogen import kfd, hsa, pci, sqtt
@@ -24,6 +25,8 @@ EVENT_INDEX_PARTIAL_FLUSH = 4 # based on a comment in nvd.h
 WAIT_REG_MEM_FUNCTION_EQ = 3 # ==
 WAIT_REG_MEM_FUNCTION_NEQ = 4 # !=
 WAIT_REG_MEM_FUNCTION_GEQ = 5 # >=
+AQL_HDR = (1 << hsa.HSA_PACKET_HEADER_BARRIER) | (hsa.HSA_FENCE_SCOPE_SYSTEM << hsa.HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE) \
+          | (hsa.HSA_FENCE_SCOPE_SYSTEM << hsa.HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE)
 
 class AMDSignal(HCQSignal):
   def __init__(self, *args, **kwargs): super().__init__(*args, **{**kwargs, 'timestamp_divider': 100})
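
For reference, AQL_HDR above packs the standard 16-bit HSA packet header. A minimal standalone sketch, with the bit positions written out as plain ints (values taken from the public HSA header layout, not from this diff):

# Minimal sketch, assuming standard HSA header bit positions (hsa.h).
HSA_PACKET_HEADER_TYPE = 0                    # bits 0-7: packet type
HSA_PACKET_HEADER_BARRIER = 8                 # bit 8: barrier
HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE = 9   # bits 9-10
HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE = 11  # bits 11-12
HSA_FENCE_SCOPE_SYSTEM = 2
HSA_PACKET_TYPE_KERNEL_DISPATCH = 2

AQL_HDR = (1 << HSA_PACKET_HEADER_BARRIER) | (HSA_FENCE_SCOPE_SYSTEM << HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE) \
          | (HSA_FENCE_SCOPE_SYSTEM << HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE)
assert AQL_HDR == 0x1500
# A kernel dispatch packet then ORs in the type field, as exec() does below:
assert AQL_HDR | (HSA_PACKET_TYPE_KERNEL_DISPATCH << HSA_PACKET_HEADER_TYPE) == 0x1502

The barrier bit plus system-scope acquire/release fences make every packet emitted with this header a full ordering point on the queue.
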
@@ -284,7 +287,7 @@ class AMDComputeQueue(HWQueue):
   def wait(self, signal:AMDSignal, value:sint=0):
     self.wait_reg_mem(mem=signal.value_addr, value=value, mask=0xffffffff)
-    if self.dev.xccs > 1: self.xcc_barrier()
+    if self.dev.xccs > 1 and not self.dev.is_aql: self.xcc_barrier()
     return self
 
   def timestamp(self, signal:AMDSignal):
@@ -329,6 +332,41 @@ class AMDComputeQueue(HWQueue):
     dev.compute_queue.put_value += len(cmds)
     dev.compute_queue.signal_doorbell(dev)
 
+class AMDComputeAQLQueue(AMDComputeQueue):
+  def exec(self, prg:AMDProgram, args_state:CLikeArgsState, global_size:tuple[sint, ...], local_size:tuple[sint, ...]):
+    self.bind_args_state(args_state)
+
+    self._q.append(pkt:=hsa.hsa_kernel_dispatch_packet_t(header=AQL_HDR | (hsa.HSA_PACKET_TYPE_KERNEL_DISPATCH << hsa.HSA_PACKET_HEADER_TYPE),
+      setup=3<<hsa.HSA_KERNEL_DISPATCH_PACKET_SETUP_DIMENSIONS, private_segment_size=prg.private_segment_size,
+      group_segment_size=prg.group_segment_size, kernel_object=prg.aql_prog_addr, kernarg_address=args_state.buf.va_addr))
+
+    self.bind_sints_to_mem(*local_size, mem=(pkt_view:=MMIOInterface(addr=ctypes.addressof(pkt), nbytes=ctypes.sizeof(pkt))), fmt='H', offset=4)
+    self.bind_sints_to_mem(*[l * g for l,g in zip(local_size, global_size)], mem=pkt_view, fmt='I', offset=12)
+
+  def bind(self, dev:AMDDevice): pass # not supported
+
+  def _submit(self, dev:AMDDevice):
+    pm4_batch:list[int] = []
+    aql_bytes = bytes()
+
+    def flush_pm4_batch():
+      nonlocal pm4_batch
+      if not pm4_batch: return bytes()
+      dev.pm4_ibs.cpu_view().view(off:=dev.pm4_ib_alloc.alloc(len(pm4_batch) * 4), fmt='I')[:len(pm4_batch)] = array.array('I', pm4_batch)
+      pkt = [AQL_HDR | (hsa.HSA_PACKET_TYPE_VENDOR_SPECIFIC << hsa.HSA_PACKET_HEADER_TYPE) | (1 << 16),
+             self.pm4.PACKET3(self.pm4.PACKET3_INDIRECT_BUFFER, 2), *data64_le(dev.pm4_ibs.va_addr+off), len(pm4_batch)|self.pm4.INDIRECT_BUFFER_VALID, 10]
+      pm4_batch.clear()
+      return bytes(array.array('I', pkt + [0] * 10))
+
+    for cmd in self._q:
+      if isinstance(cmd, hsa.hsa_kernel_dispatch_packet_t): aql_bytes += flush_pm4_batch() + bytes(cmd)
+      else: pm4_batch.append(cmd)
+    aql_bytes += flush_pm4_batch()
+
+    assert len(aql_bytes) < dev.compute_queue.ring.nbytes, "submit is too large for the queue"
+    cp_bytes = min(len(aql_bytes), (dev.compute_queue.ring.nbytes - (dev.compute_queue.put_value * 64) % dev.compute_queue.ring.nbytes))
+    dev.compute_queue.ring.view(offset=(dev.compute_queue.put_value * 64) % dev.compute_queue.ring.nbytes, fmt='B')[:cp_bytes] = aql_bytes[:cp_bytes]
+    if (tail_bytes:=(len(aql_bytes) - cp_bytes)) > 0: dev.compute_queue.ring.view(offset=0, fmt='B')[:tail_bytes] = aql_bytes[cp_bytes:]
+
+    dev.compute_queue.put_value += len(aql_bytes) // 64
+    dev.compute_queue.signal_doorbell(dev, doorbell_value=dev.compute_queue.put_value-1)
+
 class AMDCopyQueue(HWQueue):
   def __init__(self, dev, max_copy_size=0x40000000):
     self.dev, self.sdma, self.internal_cmd_sizes, self.max_copy_size = dev, dev.sdma, [], max_copy_size
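
Two details of exec above are worth pinning down: the packet is exactly one 64-byte AQL slot (which is why _submit counts put_value in len(aql_bytes) // 64 units), and the two bind_sints_to_mem calls patch the launch dimensions in place, since the three uint16 workgroup_size fields start at byte offset 4 and the three uint32 grid_size fields at byte offset 12. A self-contained sketch with a stand-in struct (field layout per the public HSA definition; the real code uses the autogenerated hsa.hsa_kernel_dispatch_packet_t):

import ctypes

# Stand-in mirroring the HSA kernel dispatch packet layout (64 bytes total).
class kernel_dispatch_packet(ctypes.Structure):
  _fields_ = [("header", ctypes.c_uint16), ("setup", ctypes.c_uint16),
              ("workgroup_size_x", ctypes.c_uint16), ("workgroup_size_y", ctypes.c_uint16),
              ("workgroup_size_z", ctypes.c_uint16), ("reserved0", ctypes.c_uint16),
              ("grid_size_x", ctypes.c_uint32), ("grid_size_y", ctypes.c_uint32), ("grid_size_z", ctypes.c_uint32),
              ("private_segment_size", ctypes.c_uint32), ("group_segment_size", ctypes.c_uint32),
              ("kernel_object", ctypes.c_uint64), ("kernarg_address", ctypes.c_uint64),
              ("reserved2", ctypes.c_uint64), ("completion_signal", ctypes.c_uint64)]

assert ctypes.sizeof(kernel_dispatch_packet) == 64          # one AQL ring slot
assert kernel_dispatch_packet.workgroup_size_x.offset == 4  # fmt='H', offset=4 above
assert kernel_dispatch_packet.grid_size_x.offset == 12      # fmt='I', offset=12 above

Note that HSA grid_size is in work-items, not workgroups, which is why the second call writes l * g for each dimension rather than global_size directly.
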
@@ -456,6 +494,7 @@ class AMDProgram(HCQProgram):
     self.rsrc1: int = code.compute_pgm_rsrc1 | ((1 << 20) if (11,0,0) <= self.dev.target < (12,0,0) else 0)
     self.rsrc2: int = code.compute_pgm_rsrc2 | (lds_size << 15)
     self.rsrc3: int = image[rodata_entry+44:rodata_entry+48].cast("I")[0] # NOTE: kernel descriptor, not in amd_kernel_code_t struct
+    self.aql_prog_addr: int = self.lib_gpu.va_addr + rodata_entry
     self.prog_addr: int = self.lib_gpu.va_addr + rodata_entry + code.kernel_code_entry_byte_offset
 
     # Some programs use hsa_kernel_dispatch_packet_t to read workgroup sizes during execution.
     # The packet is represented as a pointer and set up in SGPRs. Space for the packet is allocated as part of the kernel arguments.
@@ -505,7 +544,7 @@ class AMDQueueDesc:
     return cls(ring=queues[0].ring, put_value=queues[0].put_value, doorbells=flatten(q.doorbells for q in queues),
                read_ptrs=flatten(q.read_ptrs for q in queues), write_ptrs=flatten(q.write_ptrs for q in queues))
 
-  def signal_doorbell(self, dev):
+  def signal_doorbell(self, dev, doorbell_value:int|None=None):
     for write_ptr in self.write_ptrs: write_ptr[0] = self.put_value
 
     # Ensure all prior writes are visible to the GPU.
@@ -513,7 +552,7 @@ class AMDQueueDesc:
     # Flush hdp if queue is in dev mem.
     if dev.is_am() and not dev.is_usb(): dev.iface.dev_impl.gmc.flush_hdp()
-    for doorbell in self.doorbells: doorbell[0] = self.put_value
+    for doorbell in self.doorbells: doorbell[0] = self.put_value if doorbell_value is None else doorbell_value
 
 class KFDIface:
   kfd:FileIOInterface|None = None
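
The doorbell_value override exists because the AQL submit path above rings the doorbell with put_value - 1, the ID of the last packet written, while the PM4 path keeps ringing it with the new write pointer. A toy sketch of the two conventions (the last-packet-ID rule for AQL is an assumption inferred from how this diff uses it):

# Toy model only: a doorbell is just a mapped slot the packet processor watches.
def ring_pm4(doorbell, put_value): doorbell[0] = put_value       # next free slot index
def ring_aql(doorbell, put_value): doorbell[0] = put_value - 1   # ID of the last valid packet

db = [0]
ring_aql(db, put_value=3)  # packets 0, 1 and 2 are now visible to the packet processor
assert db[0] == 2
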
@@ -668,6 +707,7 @@ class PCIIface(PCIIfaceBase):
   def create_queue(self, queue_type, ring, gart, rptr, wptr, eop_buffer=None, cwsr_buffer=None, ctl_stack_size=0, ctx_save_restore_size=0, xcc_id=0):
     assert cwsr_buffer is None, "no cwsr buffer for am"
+    assert queue_type != kfd.KFD_IOC_QUEUE_TYPE_COMPUTE_AQL, "no AQL queues for am"
 
     if queue_type == kfd.KFD_IOC_QUEUE_TYPE_SDMA:
       self.dev_impl.sdma.setup_ring(ring_addr=ring.va_addr, ring_size=ring.size, rptr_addr=gart.va_addr+rptr, wptr_addr=gart.va_addr+wptr,
@@ -767,7 +807,13 @@ class AMDDevice(HCQCompiled):
     nbio_pad = (0,) if self.target[0] == 9 else ()
     self.nbio = AMDIP(nbio_name, self.iface.ip_versions[am.NBIF_HWIP], {i:nbio_pad+x for i,x in self.iface.ip_offsets[am.NBIF_HWIP].items()})
 
-    self.compute_queue = self.create_queue(kfd.KFD_IOC_QUEUE_TYPE_COMPUTE, 0x2000 if self.is_usb() else (16 << 20), eop_buffer_size=0x1000,
+    self.is_aql = getenv("AMD_AQL", 0)
+    if self.is_aql:
+      self.pm4_ibs = self.iface.alloc(0x2000 if self.is_usb() else (16 << 20), uncached=True, cpu_access=True)
+      self.pm4_ib_alloc = BumpAllocator(self.pm4_ibs.size, wrap=True)
+
+    self.compute_queue = self.create_queue(kfd.KFD_IOC_QUEUE_TYPE_COMPUTE_AQL if self.is_aql else kfd.KFD_IOC_QUEUE_TYPE_COMPUTE,
+      0x2000 if self.is_usb() else (16 << 20), eop_buffer_size=0x1000,
       ctx_save_restore_size=0 if self.is_am() else wg_data_size + ctl_stack_size, ctl_stack_size=ctl_stack_size, debug_memory_size=debug_memory_size)
 
     max_copy_size = 0x40000000 if self.iface.ip_versions[am.SDMA0_HWIP][0] >= 5 else 0x400000
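
With AMD_AQL set, PM4 commands can no longer go straight into the ring, so they are staged in the pm4_ibs arena and referenced via indirect-buffer packets; the arena is managed by the BumpAllocator imported from tinygrad.runtime.support.hcq at the top of this diff. A minimal sketch of the wrap=True behavior the call sites rely on (the interface is assumed from usage here: a constructor taking a size, and alloc(nbytes) returning an offset):

# Minimal sketch of a wrapping bump allocator, assuming the interface used in
# this diff: BumpAllocator(size, wrap=True) and .alloc(nbytes) -> offset.
class WrappingBumpAllocator:
  def __init__(self, size:int, wrap:bool=True): self.size, self.wrap, self.ptr = size, wrap, 0
  def alloc(self, nbytes:int) -> int:
    assert nbytes <= self.size, "allocation larger than arena"
    if self.ptr + nbytes > self.size:  # no room at the tail...
      if not self.wrap: raise RuntimeError("out of space")
      self.ptr = 0                     # ...restart from the base (old IBs assumed already consumed)
    off, self.ptr = self.ptr, self.ptr + nbytes
    return off

a = WrappingBumpAllocator(0x1000)
assert a.alloc(0xc00) == 0 and a.alloc(0x800) == 0  # second alloc wraps back to offset 0

The AQL path itself stays opt-in: something like AMD_AQL=1 python3 test/test_tiny.py (an illustrative invocation, not from this diff) exercises it, since is_aql is read from the environment above.
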
@@ -775,7 +821,8 @@ class AMDDevice(HCQCompiled):
     super().__init__(device, AMDAllocator(self), AMDLLVMRenderer(self.arch) if AMD_LLVM else AMDRenderer(self.arch),
                      AMDLLVMCompiler(self.arch) if AMD_LLVM else HIPCompiler(self.arch), functools.partial(AMDProgram, self),
-                     AMDSignal, functools.partial(AMDComputeQueue, self), functools.partial(AMDCopyQueue, self, max_copy_size=max_copy_size),
+                     AMDSignal, functools.partial(AMDComputeAQLQueue if self.is_aql else AMDComputeQueue, self),
+                     functools.partial(AMDCopyQueue, self, max_copy_size=max_copy_size),
                      kernargs_size=(8 << 10) if self.is_usb() else (16 << 20), sigalloc_size=0x100 if self.is_usb() else 0x1000)
 
     # Scratch setup
@@ -784,10 +831,10 @@ class AMDDevice(HCQCompiled):
     # XCC setup
     self.xcc_sync: tuple[AMDSignal, AMDSignal]|None = None
-    if self.xccs > 1:
+    if self.xccs > 1 and not self.is_aql:
       self.xcc_sync_area = self.allocator.alloc(0x1000, BufferSpec(nolru=True, cpu_access=True))
       self.xcc_sync = (AMDSignal(base_buf=self.xcc_sync_area), AMDSignal(base_buf=self.xcc_sync_area.offset(256)))
-      AMDComputeQueue(self).xcc_config().submit(self)
+      cast(AMDComputeQueue, self.hw_compute_queue_t()).xcc_config().submit(self)
 
     # SQTT is disabled by default because of runtime overhead and big file sizes (~200mb to Tensor.full() two 4096x4096 tensors and matmul them)
     self.sqtt_enabled = PROFILE and bool(getenv("SQTT", 0))
@@ -802,17 +849,25 @@ class AMDDevice(HCQCompiled):
       self.sqtt_buffers = [self.allocator.alloc(SQTT_BUFFER_SIZE*1024*1024, BufferSpec(cpu_access=True, nolru=True)) for _ in range(SQTT_NUM)]
       self.sqtt_itrace_se_mask = getenv("SQTT_ITRACE_SE_MASK", 2) # -1 enable all, 0 disable all, >0 bitmask for where to enable instruction tracing
       self.cmd_id = 0
-      AMDComputeQueue(self).sqtt_start(self.sqtt_buffers, self.sqtt_itrace_se_mask).submit(self)
+      cast(AMDComputeQueue, self.hw_compute_queue_t()).sqtt_start(self.sqtt_buffers, self.sqtt_itrace_se_mask).submit(self)
 
   def create_queue(self, queue_type, ring_size, ctx_save_restore_size=0, eop_buffer_size=0, ctl_stack_size=0, debug_memory_size=0):
     ring = self.iface.alloc(ring_size, uncached=True, cpu_access=True)
     gart = self.iface.alloc(0x100, uncached=True, cpu_access=True)
 
+    if queue_type == kfd.KFD_IOC_QUEUE_TYPE_COMPUTE_AQL:
+      aql_desc = hsa.amd_queue_t(queue_properties=hsa.AMD_QUEUE_PROPERTIES_IS_PTR64 | hsa.AMD_QUEUE_PROPERTIES_ENABLE_PROFILING,
+        read_dispatch_id_field_base_byte_offset=getattr(hsa.amd_queue_t, 'read_dispatch_id').offset,
+        max_cu_id=self.max_cu_id, max_wave_id=self.max_wave_id)
+      gart.cpu_view().view(fmt='B')[:ctypes.sizeof(aql_desc)] = bytes(aql_desc)
+      self.aql_desc = hsa.amd_queue_t.from_address(gart.va_addr)
+
     cwsr_buffer_size = round_up((ctx_save_restore_size + debug_memory_size) * self.iface.props.get('num_xcc', 1), mmap.PAGESIZE)
     cwsr_buffer = self.iface.alloc(cwsr_buffer_size) if ctx_save_restore_size else None
     eop_buffer = self.iface.alloc(eop_buffer_size) if eop_buffer_size else None
 
-    return AMDQueueDesc.multi(*(self.iface.create_queue(queue_type, ring, gart, rptr=0, wptr=0x10, eop_buffer=eop_buffer, cwsr_buffer=cwsr_buffer,
+    return AMDQueueDesc.multi(*(self.iface.create_queue(queue_type, ring, gart, rptr=getattr(hsa.amd_queue_t, 'read_dispatch_id').offset,
+                                  wptr=getattr(hsa.amd_queue_t, 'write_dispatch_id').offset, eop_buffer=eop_buffer, cwsr_buffer=cwsr_buffer,
                                   xcc_id=xcc_id, ctx_save_restore_size=ctx_save_restore_size, ctl_stack_size=ctl_stack_size)
                                 for xcc_id in range(self.xccs if queue_type == kfd.KFD_IOC_QUEUE_TYPE_COMPUTE else 1)))
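
The rptr/wptr change works because ctypes exposes each field's byte offset on the struct class itself, so the queue's read/write pointers can be aimed directly at the read_dispatch_id and write_dispatch_id slots of the amd_queue_t that now lives in the GART page. A toy sketch of the mechanism (field positions here are made up; the real offsets come from the autogenerated hsa.amd_queue_t):

import ctypes

# Toy stand-in: only the mechanism matters, not these field positions.
class toy_queue(ctypes.Structure):
  _fields_ = [("hsa_queue", ctypes.c_byte * 64), ("write_dispatch_id", ctypes.c_uint64),
              ("pad", ctypes.c_byte * 56), ("read_dispatch_id", ctypes.c_uint64)]

# getattr(cls, 'field').offset == cls.field.offset: the field's byte offset inside the struct.
assert getattr(toy_queue, 'write_dispatch_id').offset == 64
assert toy_queue.read_dispatch_id.offset == 128
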
@@ -832,8 +887,16 @@ class AMDDevice(HCQCompiled):
     self.tmpring_size = waves << 12 | wavesize
     self.max_private_segment_size = required
 
+    if hasattr(self, 'aql_desc'):
+      self.aql_desc.scratch_backing_memory_location = self.scratch.va_addr
+      self.aql_desc.scratch_backing_memory_byte_size = self.scratch.size
+      self.aql_desc.scratch_wave64_lane_byte_size = self.max_private_segment_size * (self.aql_desc.max_wave_id + 1) // 64
+      self.aql_desc.scratch_resource_descriptor[:] = [lo32(self.scratch.va_addr), hi32(self.scratch.va_addr) | (1 << 30), lo32(self.scratch.size),
+        0x20814fac] # FORMAT=BUF_FORMAT_32_UINT,OOB_SELECT=2,ADD_TID_ENABLE=1,TYPE=SQ_RSRC_BUF,SQ_SELs
+      self.aql_desc.compute_tmpring_size = self.tmpring_size
+
   def invalidate_caches(self):
-    AMDComputeQueue(self).memory_barrier().signal(self.timeline_signal, self.next_timeline()).submit(self)
+    self.hw_compute_queue_t().memory_barrier().signal(self.timeline_signal, self.next_timeline()).submit(self)
     self.synchronize()
 
   def on_device_hang(self): self.iface.on_device_hang()
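
A small sketch of the dword packing in the scratch resource descriptor above: lo32/hi32 (imported from tinygrad.helpers in this diff) split the 64-bit scratch VA across dwords 0 and 1, dword 1 additionally carries a V# flag bit (the 1 << 30, left as in the diff), and dword 3's 0x20814fac encodes the format/type fields per the inline comment:

# Minimal sketch, assuming lo32/hi32 are plain 32-bit splits of a 64-bit int.
def lo32(x:int) -> int: return x & 0xffffffff
def hi32(x:int) -> int: return (x >> 32) & 0xffffffff

va = 0x7f1234568000  # example scratch base address (made up)
v0, v1 = lo32(va), hi32(va) | (1 << 30)
assert (v1 & ~(1 << 30)) << 32 | v0 == va  # the descriptor still round-trips the address
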
@@ -842,7 +905,8 @@ class AMDDevice(HCQCompiled):
     if self.sqtt_enabled:
       wptrs_buf = self.allocator.alloc(round_up(len(self.sqtt_buffers), 0x1000), BufferSpec(cpu_access=True, nolru=True))
       wptrs = to_mv(wptrs_buf.va_addr, wptrs_buf.size)
-      AMDComputeQueue(self).sqtt_stop(len(self.sqtt_buffers), wptrs_buf).signal(self.timeline_signal, self.next_timeline()).submit(self)
+      cast(AMDComputeQueue, self.hw_compute_queue_t()).sqtt_stop(len(self.sqtt_buffers), wptrs_buf) \
+        .signal(self.timeline_signal, self.next_timeline()).submit(self)
       self.synchronize()
       if DEBUG>=2: print('Saving SQTT in profile...')
       for i,buf0 in enumerate(self.sqtt_buffers):