add HWCommandQueue base class for hcq devices (#5303)

* add HWCommandQueue as base queue for hcq devices

* try this

* fixes

* comments

* linter

* linter2

* linter

* linter

* fixed

* revert this
nimlgen authored on 2024-07-11 16:19:13 +03:00, committed by GitHub
parent dc3ea78560
commit bd77efda2f
3 changed files with 171 additions and 143 deletions

tinygrad/device.py

@@ -3,7 +3,7 @@ import multiprocessing
from dataclasses import dataclass
from collections import defaultdict
from typing import List, Optional, Dict, Tuple, Any, cast, Protocol
import importlib, inspect, functools, pathlib, os, ctypes, atexit, time, contextlib
import importlib, inspect, functools, pathlib, os, ctypes, atexit, time, contextlib, array
from tinygrad.helpers import getenv, diskcache_get, diskcache_put, DEBUG, GlobalCounters, flat_mv, from_mv, ProfileLogger, PROFILE
from tinygrad.dtype import DType, ImageDType
from tinygrad.renderer import Renderer
@@ -186,6 +186,84 @@ class Compiled:
# **************** for HCQ Compatible Devices ****************
def hcq_command(func):
"""
Decorator for HWCommandQueue commands.
Enables command indexing and stores metadata for command updates.
Usage:
@hcq_command
def command_method(self, ...): ...
"""
def __wrapper(self, *args, **kwargs):
self.cmds_offset.append(len(self.q))
func(self, *args, **kwargs)
self.cmds_len.append(len(self.q) - self.cmds_offset[-1])
self.cmds_meta.append(func.__name__)
return self
return __wrapper
class HWCommandQueue:
def __init__(self): self.q, self.binded_device, self.cmds_offset, self.cmds_len, self.cmds_meta = [], None, [], [], []
def __len__(self): return len(self.cmds_offset)
def _patch(self, cmd_idx, offset, data): self.q[(st:=self.cmds_offset[cmd_idx]+offset):st+len(data)] = array.array('I', data)
@hcq_command
def signal(self, signal, value): self._signal(signal, value)
def _signal(self, signal, value): raise NotImplementedError("backend should overload this function")
@hcq_command
def wait(self, signal, value): self._wait(signal, value)
def _wait(self, signal, value): raise NotImplementedError("backend should overload this function")
@hcq_command
def timestamp(self, signal): self._timestamp(signal)
def _timestamp(self, signal): raise NotImplementedError("backend should overload this function")
def update_signal(self, cmd_idx, signal=None, value=None):
if self.cmds_meta[cmd_idx] != "signal": raise RuntimeError("called update_signal not on a signal command")
self._update_signal(cmd_idx, signal, value)
return self
def _update_signal(self, cmd_idx, signal, value): raise NotImplementedError("backend should overload this function")
def update_wait(self, cmd_idx, signal=None, value=None):
if self.cmds_meta[cmd_idx] != "wait": raise RuntimeError("called update_wait not on a wait command")
self._update_wait(cmd_idx, signal, value)
return self
def _update_wait(self, cmd_idx, signal, value): raise NotImplementedError("backend should overload this function")
def submit(self, device:HCQCompatCompiled):
self._submit(device)
return self
def _submit(self, device:HCQCompatCompiled): raise NotImplementedError("backend should overload this function")
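# Illustrative sketch, not part of this diff: what hcq_command records. ToyQueue
# and its fake dwords are hypothetical stand-ins; real backends emit GPU packets.
class ToyQueue(HWCommandQueue):
  def _signal(self, signal, value): self.q += [0xCAFE, signal, value]  # 3 dwords
q = ToyQueue().signal(signal=7, value=1).signal(signal=8, value=2)
assert len(q) == 2 and q.cmds_offset == [0, 3]  # each command starts where the previous ended
assert q.cmds_len == [3, 3] and q.cmds_meta == ["signal", "signal"]  # checked by update_*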
class HWComputeQueue(HWCommandQueue):
@hcq_command
def memory_barrier(self): self._memory_barrier()
def _memory_barrier(self): pass
@hcq_command
def exec(self, prg, kernargs, global_size, local_size): self._exec(prg, kernargs, global_size, local_size)
def _exec(self, prg, kernargs, global_size, local_size): raise NotImplementedError("backend should overload this function")
def update_exec(self, cmd_idx, global_size, local_size):
if self.cmds_meta[cmd_idx] != "exec": raise RuntimeError("called update_exec not on an exec command")
self._update_exec(cmd_idx, global_size, local_size)
return self
def _update_exec(self, cmd_idx, global_size, local_size): raise NotImplementedError("backend should overload this function")
class HWCopyQueue(HWCommandQueue):
@hcq_command
def copy(self, dest, src, copy_size): self._copy(dest, src, copy_size)
def _copy(self, dest, src, copy_size): raise NotImplementedError("backend should overload this function")
def update_copy(self, cmd_idx, dest=None, src=None):
if self.cmds_meta[cmd_idx] != "copy": raise RuntimeError("called update_copy not on a copy command")
self._update_copy(cmd_idx, dest, src)
return self
def _update_copy(self, cmd_idx, dest, src): raise NotImplementedError("backend should overload this function")
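# Illustrative sketch, not part of this diff: how the update_* path resolves to
# _patch. Offsets are relative to the command's start, so the write lands at
# q[cmds_offset[cmd_idx]+offset : ...+len(data)]. ToyWaitQueue is hypothetical.
class ToyWaitQueue(HWCommandQueue):
  def _wait(self, signal, value): self.q += [0xBEEF, signal, value]
  def _update_wait(self, cmd_idx, signal, value):
    if value is not None: self._patch(cmd_idx, offset=2, data=[value])
q = ToyWaitQueue().wait(signal=0, value=10).wait(signal=0, value=20)
q.update_wait(1, value=99)  # rewrites q[cmds_offset[1] + 2], i.e. q[5]
assert q.q == [0xBEEF, 0, 10, 0xBEEF, 0, 99]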
@contextlib.contextmanager
def hcq_profile(dev, enabled, desc, queue_type=None, queue=None):
st, en = (dev._alloc_signal(), dev._alloc_signal()) if enabled else (None, None)
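# Usage, as at the call sites later in this commit: the context manager yields a
# pair of signals that bracket whatever is enqueued inside the block, e.g.
#   with hcq_profile(dev, queue=q, desc=prg.name, enabled=PROFILE) as (sig_st, sig_en):
#     q.exec(prg, kernargs, global_size, local_size)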

tinygrad/runtime/ops_amd.py

@@ -2,7 +2,8 @@ from __future__ import annotations
from typing import Tuple, List, Any
import os, fcntl, ctypes, ctypes.util, functools, re, pathlib, mmap, struct, errno, subprocess, time, array
from dataclasses import dataclass
from tinygrad.device import HCQCompatCompiled, HCQCompatAllocator, HCQCompatAllocRes, Compiler, CompileError, BufferOptions, hcq_profile
from tinygrad.device import HCQCompatCompiled, HCQCompatAllocator, HCQCompatAllocRes, HWComputeQueue, HWCopyQueue, hcq_profile, \
Compiler, CompileError, BufferOptions
from tinygrad.helpers import getenv, init_c_struct_t, to_mv, round_up, DEBUG, PROFILE, mv_address
from tinygrad.renderer.cstyle import AMDRenderer
from tinygrad.runtime.driver.hip_comgr import compile_hip
@@ -71,17 +72,9 @@ class AMDCompiler(Compiler):
try: return compile_hip(src, self.arch)
except RuntimeError as e: raise CompileError(e) from e
class HWQueue:
def __init__(self): self.q, self.cmd_offsets = [], [0]
def _mark_command_end(self):
self.cmd_offsets.append(len(self.q))
return self
def _patch(self, off, data): self.q[off:off+len(data)] = array.array('I', data)
def __len__(self): return len(self.cmd_offsets) - 1
class HWPM4Queue(HWQueue):
class AMDComputeQueue(HWComputeQueue):
def __init__(self):
self.binded_device, self.ptr_to_dispatch_packet = None, {}
self.ptr_to_dispatch_packet = {}
super().__init__()
def __del__(self):
@@ -97,14 +90,13 @@ class HWPM4Queue(HWQueue):
amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GLV_INV(glv) | amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GL1_INV(gl1) | \
amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GL2_INV(gl2) | amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GL2_WB(gl2)]
def memory_barrier(self):
def _memory_barrier(self):
self.q += [amd_gpu.PACKET3(amd_gpu.PACKET3_WAIT_REG_MEM, 5), amd_gpu.WAIT_REG_MEM_MEM_SPACE(0) | amd_gpu.WAIT_REG_MEM_OPERATION(1) | \
amd_gpu.WAIT_REG_MEM_FUNCTION(WAIT_REG_MEM_FUNCTION_EQ) | amd_gpu.WAIT_REG_MEM_ENGINE(0), nbioreg(regBIF_BX_PF1_GPU_HDP_FLUSH_REQ),
nbioreg(regBIF_BX_PF1_GPU_HDP_FLUSH_DONE), 0xffffffff, 0xffffffff, 0x20]
self._invalidate_cache()
return self._mark_command_end()
def exec(self, prg, kernargs, global_size:Tuple[int,int,int]=(1,1,1), local_size:Tuple[int,int,int]=(1,1,1)):
def _exec(self, prg, kernargs, global_size:Tuple[int,int,int]=(1,1,1), local_size:Tuple[int,int,int]=(1,1,1)):
self._invalidate_cache()
user_data = [*data64_le(kernargs)]
@@ -130,24 +122,19 @@ class HWPM4Queue(HWQueue):
self.q += [amd_gpu.PACKET3(amd_gpu.PACKET3_DISPATCH_DIRECT, 3), *global_size, CS_W32_EN | FORCE_START_AT_000 | COMPUTE_SHADER_EN]
self.q += [amd_gpu.PACKET3(amd_gpu.PACKET3_EVENT_WRITE, 0), amd_gpu.EVENT_TYPE(7) | amd_gpu.EVENT_INDEX(4)]
return self._mark_command_end()
def update_exec(self, cmd_idx, global_size, local_size):
# Patch the exec cmd with new launch dims
assert self.q[self.cmd_offsets[cmd_idx] + 60] == amd_gpu.PACKET3(amd_gpu.PACKET3_DISPATCH_DIRECT, 3), f"Command at index {cmd_idx} is not exec"
self.q[self.cmd_offsets[cmd_idx] + 52 : self.cmd_offsets[cmd_idx] + 55] = array.array('I', local_size)
self.q[self.cmd_offsets[cmd_idx] + 61 : self.cmd_offsets[cmd_idx] + 64] = array.array('I', global_size)
def _update_exec(self, cmd_idx, global_size, local_size):
self._patch(cmd_idx, offset=52, data=local_size)
self._patch(cmd_idx, offset=61, data=global_size)
if (dp:=self.ptr_to_dispatch_packet.get(cmd_idx)) is not None:
dp.workgroup_size_x, dp.workgroup_size_y, dp.workgroup_size_z = local_size[0], local_size[1], local_size[2]
dp.grid_size_x, dp.grid_size_y, dp.grid_size_z = global_size[0]*local_size[0], global_size[1]*local_size[1], global_size[2]*local_size[2]
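# Note: an exec recorded through an AQL dispatch packet must mirror the new dims
# into that packet too, and grid_size there is in threads, hence the
# global_size*local_size products above.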
def wait(self, signal:hsa.amd_signal_t, value=0):
def _wait(self, signal:hsa.amd_signal_t, value=0):
addr = ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET
self.q += [amd_gpu.PACKET3(amd_gpu.PACKET3_WAIT_REG_MEM, 5),
amd_gpu.WAIT_REG_MEM_MEM_SPACE(1) | amd_gpu.WAIT_REG_MEM_OPERATION(0) | amd_gpu.WAIT_REG_MEM_FUNCTION(WAIT_REG_MEM_FUNCTION_GEQ) | \
amd_gpu.WAIT_REG_MEM_ENGINE(0), *data64_le(addr), value, 0xffffffff, 4]
return self._mark_command_end()
def _release_mem(self, mem_event_type, mem_data_sel, mem_int_sel, address, value=0, cst=0, cache_flush=False):
cache_flush_flags = 0
@@ -164,34 +151,29 @@ class HWPM4Queue(HWQueue):
amd_gpu.PACKET3_RELEASE_MEM_DATA_SEL(mem_data_sel) | amd_gpu.PACKET3_RELEASE_MEM_INT_SEL(mem_int_sel) | amd_gpu.PACKET3_RELEASE_MEM_DST_SEL(0),
*data64_le(address), *data64_le(value), cst]
def timestamp(self, sig):
def _timestamp(self, signal):
self._release_mem(CACHE_FLUSH_AND_INV_TS_EVENT, mem_data_sel=3, mem_int_sel=0,
address=ctypes.addressof(sig) + getattr(hsa.amd_signal_t, 'start_ts').offset)
return self._mark_command_end()
address=ctypes.addressof(signal) + getattr(hsa.amd_signal_t, 'start_ts').offset)
def signal(self, signal:hsa.amd_signal_t, value=0):
def _signal(self, signal:hsa.amd_signal_t, value=0):
# NOTE: this needs an EOP buffer on the queue or it will NULL pointer
self._release_mem(CACHE_FLUSH_AND_INV_TS_EVENT, mem_data_sel=1, mem_int_sel=2, address=ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET,
value=value, cache_flush=True)
if signal.event_mailbox_ptr != 0:
self._release_mem(CACHE_FLUSH_AND_INV_TS_EVENT, mem_data_sel=1, mem_int_sel=2, address=signal.event_mailbox_ptr,
value=signal.event_id, cst=signal.event_id, cache_flush=True)
return self._mark_command_end()
def update_wait(self, cmd_idx, signal=None, value=None):
assert self.q[self.cmd_offsets[cmd_idx]] == amd_gpu.PACKET3(amd_gpu.PACKET3_WAIT_REG_MEM, 5), f"Command at index {cmd_idx} is not wait"
if signal is not None: self._patch(self.cmd_offsets[cmd_idx] + 2, [*data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET)])
if value is not None: self.q[self.cmd_offsets[cmd_idx] + 4] = value
return self
def _update_wait(self, cmd_idx, signal=None, value=None):
if signal is not None: self._patch(cmd_idx, offset=2, data=data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET))
if value is not None: self._patch(cmd_idx, offset=4, data=[value])
def update_signal(self, cmd_idx, signal=None, value=None):
assert self.q[self.cmd_offsets[cmd_idx]] == amd_gpu.PACKET3(amd_gpu.PACKET3_RELEASE_MEM, 6), f"Command at index {cmd_idx} is not signal"
if signal is not None:
self._patch(self.cmd_offsets[cmd_idx] + 3, [*data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET)])
if self.cmd_offsets[cmd_idx + 1] - self.cmd_offsets[cmd_idx] > 8: # has trap info
self._patch(self.cmd_offsets[cmd_idx] + 8 + 3, [*data64_le(signal.event_mailbox_ptr), *data64_le(signal.event_id), signal.event_id])
if value is not None: self._patch(self.cmd_offsets[cmd_idx] + 5, [*data64_le(value)])
return self
def _update_signal(self, cmd_idx, signal=None, value=None):
if signal is not None: self._patch(cmd_idx, offset=3, data=data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET))
if value is not None: self._patch(cmd_idx, offset=5, data=data64_le(value))
# Check if the signal command has mailptr part
if signal is not None and self.cmds_len[cmd_idx] > 8:
self._patch(cmd_idx, offset=11, data=[*data64_le(signal.event_mailbox_ptr), *data64_le(signal.event_id), signal.event_id])
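# Dword layout of one _release_mem packet (8 dwords), which makes these offsets
# concrete: [0] PACKET3(RELEASE_MEM, 6), [1] event/cache flags, [2] data/int/dst sel,
# [3:5] address (64-bit le), [5:7] value (64-bit le), [7] cst. A signal with a
# mailbox emits two such packets, so cmds_len[cmd_idx] > 8 detects the second one
# and offset=11 (8 + 3) lands on its address field.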
def bind(self, device: AMDDevice):
self.binded_device = device
@@ -203,7 +185,7 @@ class HWPM4Queue(HWQueue):
len(self.q) | amd_gpu.INDIRECT_BUFFER_VALID]
self.q = hw_view # type: ignore
def submit(self, device: AMDDevice):
def _submit(self, device):
cmds = self.indirect_cmd if device == self.binded_device else self.q
for i, value in enumerate(cmds): device.compute_queue.ring[(device.compute_queue.put_value + i) % len(device.compute_queue.ring)] = value
@@ -211,10 +193,9 @@ class HWPM4Queue(HWQueue):
device.compute_queue.put_value += len(cmds)
device.compute_queue.write_ptr[0] = device.compute_queue.put_value
device.compute_queue.doorbell[0] = device.compute_queue.put_value
return self
SDMA_MAX_COPY_SIZE = 0x400000
class HWCopyQueue(HWQueue):
class AMDCopyQueue(HWCopyQueue):
def __init__(self):
self.internal_cmd_sizes, self.copy_cmds_per_copy = [], {}
super().__init__()
@@ -223,13 +204,13 @@ class HWCopyQueue(HWQueue):
self.q += arr
self.internal_cmd_sizes.append(len(arr))
def copy(self, dest, src, copy_size):
def _copy(self, dest, src, copy_size):
# Invalidate cache inv
self._q([amd_gpu.SDMA_OP_GCR_REQ, 0, amd_gpu.SDMA_GCR_GLM_INV | amd_gpu.SDMA_GCR_GLK_INV | amd_gpu.SDMA_GCR_GLK_WB | amd_gpu.SDMA_GCR_GLV_INV | \
amd_gpu.SDMA_GCR_GL1_INV | amd_gpu.SDMA_GCR_GL2_WB | amd_gpu.SDMA_GCR_GL2_INV, 0, 0])
copied, copy_commands = 0, (copy_size + SDMA_MAX_COPY_SIZE - 1) // SDMA_MAX_COPY_SIZE
self.copy_cmds_per_copy[len(self)] = copy_commands
self.copy_cmds_per_copy[len(self) - 1] = copy_commands
for _ in range(copy_commands):
step_copy_size = min(copy_size - copied, SDMA_MAX_COPY_SIZE)
@@ -241,48 +222,33 @@ class HWCopyQueue(HWQueue):
# Invalidate cache wb
self._q([amd_gpu.SDMA_OP_GCR_REQ, 0, amd_gpu.SDMA_GCR_GLK_WB | amd_gpu.SDMA_GCR_GL2_WB, 0, 0])
return self._mark_command_end()
def update_copy(self, cmd_idx, dest=None, src=None):
def _update_copy(self, cmd_idx, dest=None, src=None):
for i in range(self.copy_cmds_per_copy[cmd_idx]):
if src is not None: self.q[(sigoff:=self.cmd_offsets[cmd_idx]+8+i*7):sigoff+2] = array.array('I', [*data64_le(src + SDMA_MAX_COPY_SIZE*i)])
if dest is not None: self.q[(sigoff:=self.cmd_offsets[cmd_idx]+10+i*7):sigoff+2] = array.array('I', [*data64_le(dest + SDMA_MAX_COPY_SIZE*i)])
return self
if src is not None: self._patch(cmd_idx, offset=8+i*7, data=[*data64_le(src + SDMA_MAX_COPY_SIZE*i)])
if dest is not None: self._patch(cmd_idx, offset=10+i*7, data=[*data64_le(dest + SDMA_MAX_COPY_SIZE*i)])
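# Worked example of the chunking above, with SDMA_MAX_COPY_SIZE == 0x400000:
#   copy_size = 0x900000  ->  (0x900000 + 0x3fffff) // 0x400000 == 3 copy packets
# _copy stores copy_cmds_per_copy[cmd_idx] = 3, and _update_copy re-points every
# chunk: packet i sits i*7 dwords further into the queue, with its source/dest
# advanced by SDMA_MAX_COPY_SIZE*i.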
def signal(self, signal: hsa.amd_signal_t, value=0):
def _signal(self, signal: hsa.amd_signal_t, value=0):
self._q([amd_gpu.SDMA_OP_FENCE | amd_gpu.SDMA_PKT_FENCE_HEADER_MTYPE(3), *data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET), value])
if signal.event_mailbox_ptr != 0:
self._q([amd_gpu.SDMA_OP_FENCE | amd_gpu.SDMA_PKT_FENCE_HEADER_MTYPE(3), *data64_le(signal.event_mailbox_ptr), signal.event_id])
self._q([amd_gpu.SDMA_OP_TRAP, amd_gpu.SDMA_PKT_TRAP_INT_CONTEXT_INT_CONTEXT(signal.event_id)])
return self._mark_command_end()
def wait(self, signal: hsa.amd_signal_t, value=0):
def _wait(self, signal: hsa.amd_signal_t, value=0):
self._q([amd_gpu.SDMA_OP_POLL_REGMEM | amd_gpu.SDMA_PKT_POLL_REGMEM_HEADER_FUNC(WAIT_REG_MEM_FUNCTION_GEQ) | \
amd_gpu.SDMA_PKT_POLL_REGMEM_HEADER_MEM_POLL(1), *data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET), value, 0xffffffff,
amd_gpu.SDMA_PKT_POLL_REGMEM_DW5_INTERVAL(0x04) | amd_gpu.SDMA_PKT_POLL_REGMEM_DW5_RETRY_COUNT(0xfff)])
return self._mark_command_end()
def _update_signal(self, cmd_idx, signal=None, value=None): return self._update_wait(cmd_idx, signal, value) # the same offsets and commands
def _update_wait(self, cmd_idx, signal=None, value=None):
if signal is not None: self._patch(cmd_idx, offset=1, data=data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET))
if value is not None: self._patch(cmd_idx, offset=3, data=[value])
def update_signal(self, cmd_idx, signal=None, value=None):
assert self.q[self.cmd_offsets[cmd_idx]] & 0xf == amd_gpu.SDMA_OP_FENCE, f"Command at index {cmd_idx} is not signal"
if signal is not None: self._patch(self.cmd_offsets[cmd_idx] + 1, [*data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET)])
if value is not None: self.q[self.cmd_offsets[cmd_idx] + 3] = value
return self
def update_wait(self, cmd_idx, signal=None, value=None):
assert self.q[self.cmd_offsets[cmd_idx]] & 0xf == amd_gpu.SDMA_OP_POLL_REGMEM, f"Command at index {cmd_idx} is not wait"
if signal is not None: self._patch(self.cmd_offsets[cmd_idx] + 1, [*data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET)])
if value is not None: self.q[self.cmd_offsets[cmd_idx] + 3] = value
return self
def timestamp(self, sig: hsa.amd_signal_t):
def _timestamp(self, signal:hsa.amd_signal_t):
self._q([amd_gpu.SDMA_OP_TIMESTAMP | amd_gpu.SDMA_PKT_TIMESTAMP_GET_HEADER_SUB_OP(amd_gpu.SDMA_SUBOP_TIMESTAMP_GET_GLOBAL),
*data64_le(ctypes.addressof(sig) + getattr(hsa.amd_signal_t, 'start_ts').offset)])
return self._mark_command_end()
*data64_le(ctypes.addressof(signal) + getattr(hsa.amd_signal_t, 'start_ts').offset)])
def submit(self, device: AMDDevice):
def _submit(self, device):
if device.sdma_queue.put_value - device.sdma_queue.read_ptr[0] > device.sdma_queue.ring.nbytes: raise RuntimeError("SDMA queue overrun")
tail_blit_dword = 0
@@ -304,7 +270,6 @@ class HWCopyQueue(HWQueue):
device.sdma_queue.write_ptr[0] = device.sdma_queue.put_value
device.sdma_queue.doorbell[0] = device.sdma_queue.put_value
return self
SHT_PROGBITS, SHF_ALLOC = 0x1, 0x2
class AMDProgram:
@@ -351,7 +316,7 @@ class AMDProgram:
self.prog_addr = self.lib_gpu.va_addr + entry_point + code.kernel_code_entry_byte_offset
HWPM4Queue().memory_barrier().submit(self.device)
AMDComputeQueue().memory_barrier().submit(self.device)
# NOTE: no programs are ever freed
def __del__(self):
@@ -371,7 +336,7 @@ class AMDProgram:
for i in range(len(args)): args_st.__setattr__(f'f{i}', args[i].va_addr)
for i in range(len(vals)): args_st.__setattr__(f'v{i}', vals[i])
q = HWPM4Queue().wait(self.device.timeline_signal, self.device.timeline_value - 1).memory_barrier()
q = AMDComputeQueue().wait(self.device.timeline_signal, self.device.timeline_value - 1).memory_barrier()
with hcq_profile(self.device, queue=q, desc=self.name, enabled=wait or PROFILE) as (sig_st, sig_en):
q.exec(self, self.device.kernargs_ptr, global_size, local_size)
@@ -520,8 +485,9 @@ class AMDDevice(HCQCompatCompiled):
self.compute_queue = self._alloc_queue(kfd.KFD_IOC_QUEUE_TYPE_COMPUTE, 0x100000, ctx_save_restore_size=0x2C02000, eop_buffer_size=0x1000)
self.sdma_queue = self._alloc_queue(kfd.KFD_IOC_QUEUE_TYPE_SDMA, 0x100000)
super().__init__(device, AMDAllocator(self), AMDRenderer(), AMDCompiler(self.arch), functools.partial(AMDProgram, self), HWPM4Queue, HWCopyQueue,
timeline_signals=[self._alloc_signal(sync_event=sync_event), self._alloc_signal(sync_event=kio.create_event(AMDDevice.kfd, auto_reset=1))])
timeline_signals=[self._alloc_signal(sync_event=sync_event), self._alloc_signal(sync_event=kio.create_event(AMDDevice.kfd, auto_reset=1))]
super().__init__(device, AMDAllocator(self), AMDRenderer(), AMDCompiler(self.arch), functools.partial(AMDProgram, self),
AMDComputeQueue, AMDCopyQueue, timeline_signals)
def _gpu2cpu_time(self, gpu_time, is_copy):
if is_copy: return self.copy_cpu_start_time + (gpu_time - self.copy_gpu_start_time) / 1e2

tinygrad/runtime/ops_nv.py

@@ -1,8 +1,9 @@
from __future__ import annotations
import os, ctypes, contextlib, pathlib, re, fcntl, functools, mmap, struct, tempfile, hashlib, subprocess, time, array
from typing import Tuple, List, Any
from typing import Tuple, List, Any, cast
from dataclasses import dataclass
from tinygrad.device import HCQCompatCompiled, HCQCompatAllocator, HCQCompatAllocRes, Compiler, CompileError, BufferOptions, hcq_profile
from tinygrad.device import HCQCompatCompiled, HCQCompatAllocator, HCQCompatAllocRes, HWCommandQueue, HWComputeQueue, HWCopyQueue, hcq_command, \
hcq_profile, Compiler, CompileError, BufferOptions
from tinygrad.helpers import getenv, from_mv, mv_address, init_c_struct_t, to_mv, round_up, to_char_p_p, DEBUG, prod, PROFILE
from tinygrad.renderer.cstyle import NVRenderer
from tinygrad.runtime.ops_cuda import check as cuda_check, _get_bytes, CUDACompiler, PTXCompiler, PTX
@@ -90,38 +91,26 @@ class NVPTXCompiler(NVCompiler):
raise CompileError(f"compile failed: {_get_bytes(handle, nvrtc.nvJitLinkGetErrorLog, nvrtc.nvJitLinkGetErrorLogSize, jitlink_check).decode()}")
return _get_bytes(handle, nvrtc.nvJitLinkGetLinkedCubin, nvrtc.nvJitLinkGetLinkedCubinSize, jitlink_check)
class HWQueue:
def __init__(self): self.q, self.binded_device, self.cmd_offsets = [], None, [0]
class NVCommandQueue(HWCommandQueue): # pylint: disable=abstract-method
def __del__(self):
if self.binded_device is not None:
self.binded_device.synchronize() # Synchronize to ensure the buffer is no longer in use.
self.binded_device._gpu_free(self.hw_page)
def _mark_command_end(self):
self.cmd_offsets.append(len(self.q))
return self
def __len__(self): return len(self.cmd_offsets) - 1
def memory_barrier(self): return self._mark_command_end()
def wait(self, signal, value=0):
def _wait(self, signal, value=0):
self.q += [nvmethod(0, nv_gpu.NVC56F_SEM_ADDR_LO, 5), *nvdata64_le(ctypes.addressof(from_mv(signal))), *nvdata64_le(value),
(3 << 0) | (1 << 24)] # ACQUIRE | PAYLOAD_SIZE_64BIT
return self._mark_command_end()
def timestamp(self, signal): return HWQueue.signal(self, signal, timestamp=True)
def signal(self, signal, value=0, timestamp=False):
def _signal(self, signal, value=0, timestamp=False):
self.q += [nvmethod(0, nv_gpu.NVC56F_SEM_ADDR_LO, 5), *nvdata64_le(ctypes.addressof(from_mv(signal))), *nvdata64_le(value),
(1 << 0) | (1 << 20) | (1 << 24) | ((1 << 25) if timestamp else 0)] # RELEASE | RELEASE_WFI | PAYLOAD_SIZE_64BIT | RELEASE_TIMESTAMP
self.q += [nvmethod(0, nv_gpu.NVC56F_NON_STALL_INTERRUPT, 1), 0x0]
return self._mark_command_end()
def _timestamp(self, signal): return NVCommandQueue._signal(self, signal, timestamp=True)
def update_signal(self, cmd_idx, signal=None, value=None): return self.update_wait(cmd_idx, signal, value) # the same offsets and commands
def update_wait(self, cmd_idx, signal=None, value=None):
if signal is not None: self.q[(sigoff:=self.cmd_offsets[cmd_idx]+1):sigoff+2] = array.array('I', [*nvdata64_le(mv_address(signal))])
if value is not None: self.q[(valoff:=self.cmd_offsets[cmd_idx]+3):valoff+2] = array.array('I', [*nvdata64_le(value)])
return self
def _update_signal(self, cmd_idx, signal=None, value=None): return self._update_wait(cmd_idx, signal, value) # the same offsets and commands
def _update_wait(self, cmd_idx, signal=None, value=None):
if signal is not None: self.q[(sigoff:=self.cmds_offset[cmd_idx]+1):sigoff+2] = array.array('I', nvdata64_le(mv_address(signal)))
if value is not None: self.q[(valoff:=self.cmds_offset[cmd_idx]+3):valoff+2] = array.array('I', nvdata64_le(value))
def bind(self, device: NVDevice):
self.binded_device = device
@@ -132,7 +121,7 @@ class HWQueue:
# From now on, the queue is on the device for faster submission.
self.q = hw_view # type: ignore
def _submit(self, dev, gpfifo:GPFifo):
def _submit_to_gpfifo(self, dev, gpfifo:GPFifo):
if len(self.q) == 0: return
if dev == self.binded_device: cmdq_addr = self.hw_page.va_addr
@@ -151,29 +140,31 @@ class HWQueue:
dev.gpu_mmio[0x90 // 4] = gpfifo.token
gpfifo.put_value += 1
class HWComputeQueue(HWQueue):
class NVComputeQueue(NVCommandQueue, HWComputeQueue):
def __init__(self):
self.cmd_idx_to_qmd, self.cmd_idx_to_global_dims, self.cmd_idx_to_local_dims = {}, {}, {}
super().__init__()
@hcq_command
def copy_from_cpu(self, gpuaddr, data):
self.q += [nvmethod(1, nv_gpu.NVC6C0_OFFSET_OUT_UPPER, 2), *nvdata64(gpuaddr)]
self.q += [nvmethod(1, nv_gpu.NVC6C0_LINE_LENGTH_IN, 2), len(data)*4, 0x1]
self.q += [nvmethod(1, nv_gpu.NVC6C0_LAUNCH_DMA, 1), 0x41]
self.q += [nvmethod(1, nv_gpu.NVC6C0_LOAD_INLINE_DATA, len(data), typ=6)] + list(data)
return self._mark_command_end()
def exec(self, prg, kernargs, global_size=(1,1,1), local_size=(1,1,1)):
def _exec(self, prg, kernargs, global_size, local_size):
cmd_idx = len(self) - 1
ctypes.memmove(qmd_addr:=(kernargs + round_up(prg.constbuf_0_size, 1 << 8)), ctypes.addressof(prg.qmd), 0x40 * 4)
self.cmd_idx_to_qmd[len(self)] = qmd = qmd_struct_t.from_address(qmd_addr) # Save qmd for later update
self.cmd_idx_to_global_dims[len(self)] = to_mv(qmd_addr + nv_gpu.NVC6C0_QMDV03_00_CTA_RASTER_WIDTH[1] // 8, 12).cast('I')
self.cmd_idx_to_local_dims[len(self)] = to_mv(qmd_addr + nv_gpu.NVC6C0_QMDV03_00_CTA_THREAD_DIMENSION0[1] // 8, 6).cast('H')
self.cmd_idx_to_qmd[cmd_idx] = qmd = qmd_struct_t.from_address(qmd_addr) # Save qmd for later update
self.cmd_idx_to_global_dims[cmd_idx] = to_mv(qmd_addr + nv_gpu.NVC6C0_QMDV03_00_CTA_RASTER_WIDTH[1] // 8, 12).cast('I')
self.cmd_idx_to_local_dims[cmd_idx] = to_mv(qmd_addr + nv_gpu.NVC6C0_QMDV03_00_CTA_THREAD_DIMENSION0[1] // 8, 6).cast('H')
qmd.cta_raster_width, qmd.cta_raster_height, qmd.cta_raster_depth = global_size
qmd.cta_thread_dimension0, qmd.cta_thread_dimension1, qmd.cta_thread_dimension2 = local_size
qmd.constant_buffer_addr_upper_0, qmd.constant_buffer_addr_lower_0 = nvdata64(kernargs)
if (prev_qmd:=self.cmd_idx_to_qmd.get(len(self) - 1)) is None:
if (prev_qmd:=self.cmd_idx_to_qmd.get(cmd_idx - 1)) is None:
self.q += [nvmethod(1, nv_gpu.NVC6C0_INVALIDATE_SHADER_CACHES_NO_WFI, 1), (1 << 12) | (1 << 4) | (1 << 0)]
self.q += [nvmethod(1, nv_gpu.NVC6C0_SEND_PCAS_A, 0x1), qmd_addr >> 8]
self.q += [nvmethod(1, nv_gpu.NVC6C0_SEND_SIGNALING_PCAS2_B, 0x1), 9]
@@ -182,52 +173,45 @@ class HWComputeQueue(HWQueue):
prev_qmd.dependent_qmd0_action = 1
prev_qmd.dependent_qmd0_prefetch = 1
prev_qmd.dependent_qmd0_enable = 1
return self._mark_command_end()
def update_exec(self, cmd_idx, global_size, local_size):
def _update_exec(self, cmd_idx, global_size, local_size):
# Patch the exec cmd with new launch dims
self.cmd_idx_to_global_dims[cmd_idx][:] = array.array('I', global_size)
self.cmd_idx_to_local_dims[cmd_idx][:] = array.array('H', local_size)
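# Index bookkeeping under hcq_command: the wrapper appends offset/len/meta before
# calling the underscored method, so while _exec runs len(self) - 1 is the exec's
# own index (cmd_idx above) and cmd_idx - 1 is the previous command; the same
# shift is why _signal below looks up the previous qmd at len(self) - 2.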
def signal(self, signal, value=0):
if (prev_qmd:=self.cmd_idx_to_qmd.get(len(self) - 1)) is None or prev_qmd.release0_enable == 1: return super().signal(signal, value)
def _signal(self, signal, value=0):
if (prev_qmd:=self.cmd_idx_to_qmd.get(len(self) - 2)) is None or prev_qmd.release0_enable == 1: return super()._signal(signal, value)
prev_qmd.release0_address_upper, prev_qmd.release0_address_lower = nvdata64(ctypes.addressof(from_mv(signal)))
prev_qmd.release0_payload_upper, prev_qmd.release0_payload_lower = nvdata64(value)
prev_qmd.release0_enable = 1
self.cmd_idx_to_qmd[len(self)] = prev_qmd # this command is embedded into qmd.
return self._mark_command_end()
self.cmd_idx_to_qmd[len(self) - 1] = prev_qmd # this command is embedded into qmd.
def update_signal(self, cmd_idx, signal=None, value=None):
if (qmd:=self.cmd_idx_to_qmd.get(cmd_idx)) is None: return super().update_signal(cmd_idx, signal, value)
def _update_signal(self, cmd_idx, signal=None, value=None):
if (qmd:=self.cmd_idx_to_qmd.get(cmd_idx)) is None: return super()._update_signal(cmd_idx, signal, value)
if signal is not None: qmd.release0_address_upper, qmd.release0_address_lower = nvdata64(ctypes.addressof(from_mv(signal)))
if value is not None: qmd.release0_payload_upper, qmd.release0_payload_lower = nvdata64(value)
return self
def submit(self, dev:NVDevice): self._submit(dev, dev.compute_gpfifo)
def _submit(self, device): self._submit_to_gpfifo(device, cast(NVDevice, device).compute_gpfifo)
class HWCopyQueue(HWQueue):
def copy(self, dest, src, copy_size):
class NVCopyQueue(NVCommandQueue, HWCopyQueue):
def _copy(self, dest, src, copy_size):
self.q += [nvmethod(4, nv_gpu.NVC6B5_OFFSET_IN_UPPER, 4), *nvdata64(src), *nvdata64(dest)]
self.q += [nvmethod(4, nv_gpu.NVC6B5_LINE_LENGTH_IN, 1), copy_size]
self.q += [nvmethod(4, nv_gpu.NVC6B5_LAUNCH_DMA, 1), 0x182] # TRANSFER_TYPE_NON_PIPELINED | DST_MEMORY_LAYOUT_PITCH | SRC_MEMORY_LAYOUT_PITCH
return self._mark_command_end()
def update_copy(self, cmd_idx, dest=None, src=None):
if dest is not None: self.q[(sigoff:=self.cmd_offsets[cmd_idx]+3):sigoff+2] = array.array('I', [*nvdata64(dest)])
if src is not None: self.q[(sigoff:=self.cmd_offsets[cmd_idx]+1):sigoff+2] = array.array('I', [*nvdata64(src)])
return self
def _update_copy(self, cmd_idx, dest=None, src=None):
if dest is not None: self._patch(cmd_idx, offset=3, data=nvdata64(dest))
if src is not None: self._patch(cmd_idx, offset=1, data=nvdata64(src))
def signal(self, signal, value=0):
def _signal(self, signal, value=0):
self.q += [nvmethod(4, nv_gpu.NVC6B5_SET_SEMAPHORE_A, 4), *nvdata64(ctypes.addressof(from_mv(signal))), value, 4]
self.q += [nvmethod(4, nv_gpu.NVC6B5_LAUNCH_DMA, 1), 0x14]
return self._mark_command_end()
def update_signal(self, cmd_idx, signal=None, value=None):
if signal is not None: self.q[(sigoff:=self.cmd_offsets[cmd_idx]+1):sigoff+2] = array.array('I', [*nvdata64(mv_address(signal))])
if value is not None: self.q[self.cmd_offsets[cmd_idx]+3] = value
return self
def _update_signal(self, cmd_idx, signal=None, value=None):
if signal is not None: self._patch(cmd_idx, offset=1, data=nvdata64(mv_address(signal)))
if value is not None: self._patch(cmd_idx, offset=3, data=[value])
def submit(self, dev:NVDevice): self._submit(dev, dev.dma_gpfifo)
def _submit(self, device): self._submit_to_gpfifo(device, cast(NVDevice, device).dma_gpfifo)
SHT_PROGBITS, SHT_NOBITS, SHF_ALLOC, SHF_EXECINSTR = 0x1, 0x8, 0x2, 0x4
class NVProgram:
@@ -306,12 +290,12 @@ class NVProgram:
elif self.rel_info[rel_i+2] == 0x38: self.program[self.rel_info[rel_i]//4 + 1] = (global_init_addr & 0xffffffff) # R_CUDA_ABS32_LO_32
else: raise RuntimeError(f"unknown reloc: {self.rel_info[rel_i+2]}")
HWComputeQueue().wait(self.device.timeline_signal, self.device.timeline_value - 1).submit(self.device)
NVComputeQueue().wait(self.device.timeline_signal, self.device.timeline_value - 1).submit(self.device)
for st in range(0, len(self.program), 4095):
HWComputeQueue().copy_from_cpu(self.lib_gpu.va_addr+st*4, self.program[st:st+4095]).submit(self.device)
NVComputeQueue().copy_from_cpu(self.lib_gpu.va_addr+st*4, self.program[st:st+4095]).submit(self.device)
if self.global_init is not None:
HWComputeQueue().copy_from_cpu(load_addr:=(self.lib_gpu.va_addr + off), self.global_init).submit(self.device)
NVComputeQueue().copy_from_cpu(load_addr:=(self.lib_gpu.va_addr + off), self.global_init).submit(self.device)
off += round_up(self.global_init.nbytes, 128)
if 4 in constant_buffers_data: # >= 12.4
# Constbuffer 4 contains a pointer to nv.global.init, load section and set up the pointer.
@@ -324,10 +308,10 @@ class NVProgram:
self.qmd.__setattr__(f'constant_buffer_size_shifted4_{i}', data.nbytes)
self.qmd.__setattr__(f'constant_buffer_valid_{i}', 1)
HWComputeQueue().copy_from_cpu(self.lib_gpu.va_addr + off, data).submit(self.device)
NVComputeQueue().copy_from_cpu(self.lib_gpu.va_addr + off, data).submit(self.device)
off += round_up(data.nbytes, 128)
HWComputeQueue().signal(self.device.timeline_signal, self.device.timeline_value).submit(self.device)
NVComputeQueue().signal(self.device.timeline_signal, self.device.timeline_value).submit(self.device)
self.device.timeline_value += 1
self.device.synchronize()
@@ -346,7 +330,7 @@ class NVProgram:
if MOCKGPU: self.constbuffer_0[0:2] = [len(args), len(vals)]
kernargs = [arg_half for arg in args for arg_half in nvdata64_le(arg.va_addr)] + list(vals)
q = HWComputeQueue().wait(self.device.timeline_signal, self.device.timeline_value - 1) \
q = NVComputeQueue().wait(self.device.timeline_signal, self.device.timeline_value - 1) \
.copy_from_cpu(self.device.kernargs_ptr, self.constbuffer_0 + kernargs)
with hcq_profile(self.device, queue=q, desc=self.name, enabled=wait or PROFILE) as (sig_st, sig_en):
@@ -561,7 +545,7 @@ class NVDevice(HCQCompatCompiled):
compiler_t = (PTXCompiler if PTX else CUDACompiler) if MOCKGPU else (NVPTXCompiler if PTX else NVCompiler)
super().__init__(device, NVAllocator(self), PTXRenderer(self.arch, device="NV") if PTX else NVRenderer(self.arch), compiler_t(self.arch),
functools.partial(NVProgram, self), HWComputeQueue, HWCopyQueue, timeline_signals=[self._alloc_signal(), self._alloc_signal()])
functools.partial(NVProgram, self), NVComputeQueue, NVCopyQueue, timeline_signals=[self._alloc_signal(), self._alloc_signal()])
self._cmdq_setup_compute_gpfifo()
self._cmdq_setup_dma_gpfifo()
@@ -624,7 +608,7 @@ class NVDevice(HCQCompatCompiled):
# Set windows addresses to not collide with other allocated buffers.
self.shared_mem_window, self.local_mem_window, self.slm_per_thread = 0xfe000000, 0xff000000, 0
queue = HWComputeQueue()
queue = NVComputeQueue()
queue.q += [nvmethod(1, nv_gpu.NVC6C0_SET_OBJECT, 1), self.compute_class]
queue.q += [nvmethod(1, nv_gpu.NVC6C0_SET_SHADER_LOCAL_MEMORY_WINDOW_A, 2), *nvdata64(self.local_mem_window)]
queue.q += [nvmethod(1, nv_gpu.NVC6C0_SET_SHADER_SHARED_MEMORY_WINDOW_A, 2), *nvdata64(self.shared_mem_window)]
@@ -634,7 +618,7 @@ class NVDevice(HCQCompatCompiled):
self.synchronize()
def _cmdq_setup_dma_gpfifo(self):
queue = HWCopyQueue()
queue = NVCopyQueue()
queue.q += [nvmethod(4, nv_gpu.NVC6C0_SET_OBJECT, 1), nv_gpu.AMPERE_DMA_COPY_B]
queue.signal(self.timeline_signal, self.timeline_value).submit(self)
self.timeline_value += 1
@@ -652,7 +636,7 @@ class NVDevice(HCQCompatCompiled):
bytes_per_tpc = round_up(bytes_per_warp * 48 * 2, 0x8000)
self.shader_local_mem = self._gpu_alloc(round_up(bytes_per_tpc * 64, 0x20000), huge_page=True, contig=True)
queue = HWComputeQueue()
queue = NVComputeQueue()
queue.q += [nvmethod(1, nv_gpu.NVC6C0_SET_SHADER_LOCAL_MEMORY_A, 2), *nvdata64(self.shader_local_mem.va_addr)]
queue.q += [nvmethod(1, nv_gpu.NVC6C0_SET_SHADER_LOCAL_MEMORY_NON_THROTTLED_A, 3), *nvdata64(bytes_per_tpc), 0x40]
queue.signal(self.timeline_signal, self.timeline_value).submit(self)
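# Putting it together, a sketch mirroring the call sites in this commit (dev, prg,
# global_size and local_size are stand-in names): every hcq_command returns self,
# so queues compose fluently and submit once.
q = NVComputeQueue().wait(dev.timeline_signal, dev.timeline_value - 1)
q.exec(prg, dev.kernargs_ptr, global_size, local_size)
q.signal(dev.timeline_signal, dev.timeline_value).submit(dev)
dev.timeline_value += 1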