diff --git a/tinygrad/device.py b/tinygrad/device.py index dbeb65ade9..7bbc552ae2 100644 --- a/tinygrad/device.py +++ b/tinygrad/device.py @@ -3,7 +3,7 @@ import multiprocessing from dataclasses import dataclass from collections import defaultdict from typing import List, Optional, Dict, Tuple, Any, cast, Protocol -import importlib, inspect, functools, pathlib, os, ctypes, atexit, time, contextlib +import importlib, inspect, functools, pathlib, os, ctypes, atexit, time, contextlib, array from tinygrad.helpers import getenv, diskcache_get, diskcache_put, DEBUG, GlobalCounters, flat_mv, from_mv, ProfileLogger, PROFILE from tinygrad.dtype import DType, ImageDType from tinygrad.renderer import Renderer @@ -186,6 +186,84 @@ class Compiled: # **************** for HCQ Compatible Devices **************** +def hcq_command(func): + """ + Decorator for HWCommandQueue commands. + + Enables command indexing and stores metadata for command updates. + + Usage: + @hcq_command + def command_method(self, ...): ... + """ + def __wrapper(self, *args, **kwargs): + self.cmds_offset.append(len(self.q)) + func(self, *args, **kwargs) + self.cmds_len.append(len(self.q) - self.cmds_offset[-1]) + self.cmds_meta.append(func.__name__) + return self + return __wrapper + +class HWCommandQueue: + def __init__(self): self.q, self.binded_device, self.cmds_offset, self.cmds_len, self.cmds_meta = [], None, [], [], [] + def __len__(self): return len(self.cmds_offset) + def _patch(self, cmd_idx, offset, data): self.q[(st:=self.cmds_offset[cmd_idx]+offset):st+len(data)] = array.array('I', data) + + @hcq_command + def signal(self, signal, value): self._signal(signal, value) + def _signal(self, signal, value): raise NotImplementedError("backend should overload this function") + + @hcq_command + def wait(self, signal, value): self._wait(signal, value) + def _wait(self, signal, value): raise NotImplementedError("backend should overload this function") + + @hcq_command + def timestamp(self, signal): self._timestamp(signal) + def _timestamp(self, signal): raise NotImplementedError("backend should overload this function") + + def update_signal(self, cmd_idx, signal=None, value=None): + if self.cmds_meta[cmd_idx] != "signal": raise RuntimeError("called update_signal not on a signal command") + self._update_signal(cmd_idx, signal, value) + return self + def _update_signal(self, cmd_idx, signal, value): raise NotImplementedError("backend should overload this function") + + def update_wait(self, cmd_idx, signal=None, value=None): + if self.cmds_meta[cmd_idx] != "wait": raise RuntimeError("called update_wait not on a wait command") + self._update_wait(cmd_idx, signal, value) + return self + def _update_wait(self, cmd_idx, signal, value): raise NotImplementedError("backend should overload this function") + + def submit(self, device:HCQCompatCompiled): + self._submit(device) + return self + def _submit(self, device:HCQCompatCompiled): raise NotImplementedError("backend should overload this function") + +class HWComputeQueue(HWCommandQueue): + @hcq_command + def memory_barrier(self): self._memory_barrier() + def _memory_barrier(self): pass + + @hcq_command + def exec(self, prg, kernargs, global_size, local_size): self._exec(prg, kernargs, global_size, local_size) + def _exec(self, prg, kernargs, global_size, local_size): raise NotImplementedError("backend should overload this function") + + def update_exec(self, cmd_idx, global_size, local_size): + if self.cmds_meta[cmd_idx] != "exec": raise RuntimeError("called update_exec not on an exec command") + self._update_exec(cmd_idx, global_size, local_size) + return self + def _update_exec(self, cmd_idx, global_size, local_size): raise NotImplementedError("backend should overload this function") + +class HWCopyQueue(HWCommandQueue): + @hcq_command + def copy(self, dest, src, copy_size): self._copy(dest, src, copy_size) + def _copy(self, dest, src, copy_size): raise NotImplementedError("backend should overload this function") + + def update_copy(self, cmd_idx, dest=None, src=None): + if self.cmds_meta[cmd_idx] != "copy": raise RuntimeError("called update_copy not on an copy command") + self._update_copy(cmd_idx, dest, src) + return self + def _update_copy(self, cmd_idx, dest, src): raise NotImplementedError("backend should overload this function") + @contextlib.contextmanager def hcq_profile(dev, enabled, desc, queue_type=None, queue=None): st, en = (dev._alloc_signal(), dev._alloc_signal()) if enabled else (None, None) diff --git a/tinygrad/runtime/ops_amd.py b/tinygrad/runtime/ops_amd.py index 335543b071..a0c13a4b3c 100644 --- a/tinygrad/runtime/ops_amd.py +++ b/tinygrad/runtime/ops_amd.py @@ -2,7 +2,8 @@ from __future__ import annotations from typing import Tuple, List, Any import os, fcntl, ctypes, ctypes.util, functools, re, pathlib, mmap, struct, errno, subprocess, time, array from dataclasses import dataclass -from tinygrad.device import HCQCompatCompiled, HCQCompatAllocator, HCQCompatAllocRes, Compiler, CompileError, BufferOptions, hcq_profile +from tinygrad.device import HCQCompatCompiled, HCQCompatAllocator, HCQCompatAllocRes, HWComputeQueue, HWCopyQueue, hcq_profile, \ + Compiler, CompileError, BufferOptions from tinygrad.helpers import getenv, init_c_struct_t, to_mv, round_up, DEBUG, PROFILE, mv_address from tinygrad.renderer.cstyle import AMDRenderer from tinygrad.runtime.driver.hip_comgr import compile_hip @@ -71,17 +72,9 @@ class AMDCompiler(Compiler): try: return compile_hip(src, self.arch) except RuntimeError as e: raise CompileError(e) from e -class HWQueue: - def __init__(self): self.q, self.cmd_offsets = [], [0] - def _mark_command_end(self): - self.cmd_offsets.append(len(self.q)) - return self - def _patch(self, off, data): self.q[off:off+len(data)] = array.array('I', data) - def __len__(self): return len(self.cmd_offsets) - 1 - -class HWPM4Queue(HWQueue): +class AMDComputeQueue(HWComputeQueue): def __init__(self): - self.binded_device, self.ptr_to_dispatch_packet = None, {} + self.ptr_to_dispatch_packet = {} super().__init__() def __del__(self): @@ -97,14 +90,13 @@ class HWPM4Queue(HWQueue): amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GLV_INV(glv) | amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GL1_INV(gl1) | \ amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GL2_INV(gl2) | amd_gpu.PACKET3_ACQUIRE_MEM_GCR_CNTL_GL2_WB(gl2)] - def memory_barrier(self): + def _memory_barrier(self): self.q += [amd_gpu.PACKET3(amd_gpu.PACKET3_WAIT_REG_MEM, 5), amd_gpu.WAIT_REG_MEM_MEM_SPACE(0) | amd_gpu.WAIT_REG_MEM_OPERATION(1) | \ amd_gpu.WAIT_REG_MEM_FUNCTION(WAIT_REG_MEM_FUNCTION_EQ) | amd_gpu.WAIT_REG_MEM_ENGINE(0), nbioreg(regBIF_BX_PF1_GPU_HDP_FLUSH_REQ), nbioreg(regBIF_BX_PF1_GPU_HDP_FLUSH_DONE), 0xffffffff, 0xffffffff, 0x20] self._invalidate_cache() - return self._mark_command_end() - def exec(self, prg, kernargs, global_size:Tuple[int,int,int]=(1,1,1), local_size:Tuple[int,int,int]=(1,1,1)): + def _exec(self, prg, kernargs, global_size:Tuple[int,int,int]=(1,1,1), local_size:Tuple[int,int,int]=(1,1,1)): self._invalidate_cache() user_data = [*data64_le(kernargs)] @@ -130,24 +122,19 @@ class HWPM4Queue(HWQueue): self.q += [amd_gpu.PACKET3(amd_gpu.PACKET3_DISPATCH_DIRECT, 3), *global_size, CS_W32_EN | FORCE_START_AT_000 | COMPUTE_SHADER_EN] self.q += [amd_gpu.PACKET3(amd_gpu.PACKET3_EVENT_WRITE, 0), amd_gpu.EVENT_TYPE(7) | amd_gpu.EVENT_INDEX(4)] - return self._mark_command_end() - - def update_exec(self, cmd_idx, global_size, local_size): - # Patch the exec cmd with new launch dims - assert self.q[self.cmd_offsets[cmd_idx] + 60] == amd_gpu.PACKET3(amd_gpu.PACKET3_DISPATCH_DIRECT, 3), f"Command at index {cmd_idx} is not exec" - self.q[self.cmd_offsets[cmd_idx] + 52 : self.cmd_offsets[cmd_idx] + 55] = array.array('I', local_size) - self.q[self.cmd_offsets[cmd_idx] + 61 : self.cmd_offsets[cmd_idx] + 64] = array.array('I', global_size) + def _update_exec(self, cmd_idx, global_size, local_size): + self._patch(cmd_idx, offset=52, data=local_size) + self._patch(cmd_idx, offset=61, data=global_size) if (dp:=self.ptr_to_dispatch_packet.get(cmd_idx)) is not None: dp.workgroup_size_x, dp.workgroup_size_y, dp.workgroup_size_z = local_size[0], local_size[1], local_size[2] dp.grid_size_x, dp.grid_size_y, dp.grid_size_z = global_size[0]*local_size[0], global_size[1]*local_size[1], global_size[2]*local_size[2] - def wait(self, signal:hsa.amd_signal_t, value=0): + def _wait(self, signal:hsa.amd_signal_t, value=0): addr = ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET self.q += [amd_gpu.PACKET3(amd_gpu.PACKET3_WAIT_REG_MEM, 5), amd_gpu.WAIT_REG_MEM_MEM_SPACE(1) | amd_gpu.WAIT_REG_MEM_OPERATION(0) | amd_gpu.WAIT_REG_MEM_FUNCTION(WAIT_REG_MEM_FUNCTION_GEQ) | \ amd_gpu.WAIT_REG_MEM_ENGINE(0), *data64_le(addr), value, 0xffffffff, 4] - return self._mark_command_end() def _release_mem(self, mem_event_type, mem_data_sel, mem_int_sel, address, value=0, cst=0, cache_flush=False): cache_flush_flags = 0 @@ -164,34 +151,29 @@ class HWPM4Queue(HWQueue): amd_gpu.PACKET3_RELEASE_MEM_DATA_SEL(mem_data_sel) | amd_gpu.PACKET3_RELEASE_MEM_INT_SEL(mem_int_sel) | amd_gpu.PACKET3_RELEASE_MEM_DST_SEL(0), *data64_le(address), *data64_le(value), cst] - def timestamp(self, sig): + def _timestamp(self, signal): self._release_mem(CACHE_FLUSH_AND_INV_TS_EVENT, mem_data_sel=3, mem_int_sel=0, - address=ctypes.addressof(sig) + getattr(hsa.amd_signal_t, 'start_ts').offset) - return self._mark_command_end() + address=ctypes.addressof(signal) + getattr(hsa.amd_signal_t, 'start_ts').offset) - def signal(self, signal:hsa.amd_signal_t, value=0): + def _signal(self, signal:hsa.amd_signal_t, value=0): # NOTE: this needs an EOP buffer on the queue or it will NULL pointer self._release_mem(CACHE_FLUSH_AND_INV_TS_EVENT, mem_data_sel=1, mem_int_sel=2, address=ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET, value=value, cache_flush=True) if signal.event_mailbox_ptr != 0: self._release_mem(CACHE_FLUSH_AND_INV_TS_EVENT, mem_data_sel=1, mem_int_sel=2, address=signal.event_mailbox_ptr, value=signal.event_id, cst=signal.event_id, cache_flush=True) - return self._mark_command_end() - def update_wait(self, cmd_idx, signal=None, value=None): - assert self.q[self.cmd_offsets[cmd_idx]] == amd_gpu.PACKET3(amd_gpu.PACKET3_WAIT_REG_MEM, 5), f"Command at index {cmd_idx} is not wait" - if signal is not None: self._patch(self.cmd_offsets[cmd_idx] + 2, [*data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET)]) - if value is not None: self.q[self.cmd_offsets[cmd_idx] + 4] = value - return self + def _update_wait(self, cmd_idx, signal=None, value=None): + if signal is not None: self._patch(cmd_idx, offset=2, data=data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET)) + if value is not None: self._patch(cmd_idx, offset=4, data=[value]) - def update_signal(self, cmd_idx, signal=None, value=None): - assert self.q[self.cmd_offsets[cmd_idx]] == amd_gpu.PACKET3(amd_gpu.PACKET3_RELEASE_MEM, 6), f"Command at index {cmd_idx} is not signal" - if signal is not None: - self._patch(self.cmd_offsets[cmd_idx] + 3, [*data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET)]) - if self.cmd_offsets[cmd_idx + 1] - self.cmd_offsets[cmd_idx] > 8: # has trap info - self._patch(self.cmd_offsets[cmd_idx] + 8 + 3, [*data64_le(signal.event_mailbox_ptr), *data64_le(signal.event_id), signal.event_id]) - if value is not None: self._patch(self.cmd_offsets[cmd_idx] + 5, [*data64_le(value)]) - return self + def _update_signal(self, cmd_idx, signal=None, value=None): + if signal is not None: self._patch(cmd_idx, offset=3, data=data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET)) + if value is not None: self._patch(cmd_idx, offset=5, data=data64_le(value)) + + # Check if the signal command has mailptr part + if signal is not None and self.cmds_len[cmd_idx] > 8: + self._patch(cmd_idx, offset=11, data=[*data64_le(signal.event_mailbox_ptr), *data64_le(signal.event_id), signal.event_id]) def bind(self, device: AMDDevice): self.binded_device = device @@ -203,7 +185,7 @@ class HWPM4Queue(HWQueue): len(self.q) | amd_gpu.INDIRECT_BUFFER_VALID] self.q = hw_view # type: ignore - def submit(self, device: AMDDevice): + def _submit(self, device): cmds = self.indirect_cmd if device == self.binded_device else self.q for i, value in enumerate(cmds): device.compute_queue.ring[(device.compute_queue.put_value + i) % len(device.compute_queue.ring)] = value @@ -211,10 +193,9 @@ class HWPM4Queue(HWQueue): device.compute_queue.put_value += len(cmds) device.compute_queue.write_ptr[0] = device.compute_queue.put_value device.compute_queue.doorbell[0] = device.compute_queue.put_value - return self SDMA_MAX_COPY_SIZE = 0x400000 -class HWCopyQueue(HWQueue): +class AMDCopyQueue(HWCopyQueue): def __init__(self): self.internal_cmd_sizes, self.copy_cmds_per_copy = [], {} super().__init__() @@ -223,13 +204,13 @@ class HWCopyQueue(HWQueue): self.q += arr self.internal_cmd_sizes.append(len(arr)) - def copy(self, dest, src, copy_size): + def _copy(self, dest, src, copy_size): # Invalidate cache inv self._q([amd_gpu.SDMA_OP_GCR_REQ, 0, amd_gpu.SDMA_GCR_GLM_INV | amd_gpu.SDMA_GCR_GLK_INV | amd_gpu.SDMA_GCR_GLK_WB | amd_gpu.SDMA_GCR_GLV_INV | \ amd_gpu.SDMA_GCR_GL1_INV | amd_gpu.SDMA_GCR_GL2_WB | amd_gpu.SDMA_GCR_GL2_INV, 0, 0]) copied, copy_commands = 0, (copy_size + SDMA_MAX_COPY_SIZE - 1) // SDMA_MAX_COPY_SIZE - self.copy_cmds_per_copy[len(self)] = copy_commands + self.copy_cmds_per_copy[len(self) - 1] = copy_commands for _ in range(copy_commands): step_copy_size = min(copy_size - copied, SDMA_MAX_COPY_SIZE) @@ -241,48 +222,33 @@ class HWCopyQueue(HWQueue): # Invalidate cache wb self._q([amd_gpu.SDMA_OP_GCR_REQ, 0, amd_gpu.SDMA_GCR_GLK_WB | amd_gpu.SDMA_GCR_GL2_WB, 0, 0]) - return self._mark_command_end() - - def update_copy(self, cmd_idx, dest=None, src=None): + def _update_copy(self, cmd_idx, dest=None, src=None): for i in range(self.copy_cmds_per_copy[cmd_idx]): - if src is not None: self.q[(sigoff:=self.cmd_offsets[cmd_idx]+8+i*7):sigoff+2] = array.array('I', [*data64_le(src + SDMA_MAX_COPY_SIZE*i)]) - if dest is not None: self.q[(sigoff:=self.cmd_offsets[cmd_idx]+10+i*7):sigoff+2] = array.array('I', [*data64_le(dest + SDMA_MAX_COPY_SIZE*i)]) - return self + if src is not None: self._patch(cmd_idx, offset=8+i*7, data=[*data64_le(src + SDMA_MAX_COPY_SIZE*i)]) + if dest is not None: self._patch(cmd_idx, offset=10+i*7, data=[*data64_le(dest + SDMA_MAX_COPY_SIZE*i)]) - def signal(self, signal: hsa.amd_signal_t, value=0): + def _signal(self, signal: hsa.amd_signal_t, value=0): self._q([amd_gpu.SDMA_OP_FENCE | amd_gpu.SDMA_PKT_FENCE_HEADER_MTYPE(3), *data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET), value]) if signal.event_mailbox_ptr != 0: self._q([amd_gpu.SDMA_OP_FENCE | amd_gpu.SDMA_PKT_FENCE_HEADER_MTYPE(3), *data64_le(signal.event_mailbox_ptr), signal.event_id]) self._q([amd_gpu.SDMA_OP_TRAP, amd_gpu.SDMA_PKT_TRAP_INT_CONTEXT_INT_CONTEXT(signal.event_id)]) - return self._mark_command_end() - - def wait(self, signal: hsa.amd_signal_t, value=0): + def _wait(self, signal: hsa.amd_signal_t, value=0): self._q([amd_gpu.SDMA_OP_POLL_REGMEM | amd_gpu.SDMA_PKT_POLL_REGMEM_HEADER_FUNC(WAIT_REG_MEM_FUNCTION_GEQ) | \ amd_gpu.SDMA_PKT_POLL_REGMEM_HEADER_MEM_POLL(1), *data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET), value, 0xffffffff, amd_gpu.SDMA_PKT_POLL_REGMEM_DW5_INTERVAL(0x04) | amd_gpu.SDMA_PKT_POLL_REGMEM_DW5_RETRY_COUNT(0xfff)]) - return self._mark_command_end() + def _update_signal(self, cmd_idx, signal=None, value=None): return self._update_wait(cmd_idx, signal, value) # the same offsets and commands + def _update_wait(self, cmd_idx, signal=None, value=None): + if signal is not None: self._patch(cmd_idx, offset=1, data=data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET)) + if value is not None: self._patch(cmd_idx, offset=3, data=[value]) - def update_signal(self, cmd_idx, signal=None, value=None): - assert self.q[self.cmd_offsets[cmd_idx]] & 0xf == amd_gpu.SDMA_OP_FENCE, f"Command at index {cmd_idx} is not signal" - if signal is not None: self._patch(self.cmd_offsets[cmd_idx] + 1, [*data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET)]) - if value is not None: self.q[self.cmd_offsets[cmd_idx] + 3] = value - return self - - def update_wait(self, cmd_idx, signal=None, value=None): - assert self.q[self.cmd_offsets[cmd_idx]] & 0xf == amd_gpu.SDMA_OP_POLL_REGMEM, f"Command at index {cmd_idx} is not wait" - if signal is not None: self._patch(self.cmd_offsets[cmd_idx] + 1, [*data64_le(ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET)]) - if value is not None: self.q[self.cmd_offsets[cmd_idx] + 3] = value - return self - - def timestamp(self, sig: hsa.amd_signal_t): + def _timestamp(self, signal:hsa.amd_signal_t): self._q([amd_gpu.SDMA_OP_TIMESTAMP | amd_gpu.SDMA_PKT_TIMESTAMP_GET_HEADER_SUB_OP(amd_gpu.SDMA_SUBOP_TIMESTAMP_GET_GLOBAL), - *data64_le(ctypes.addressof(sig) + getattr(hsa.amd_signal_t, 'start_ts').offset)]) - return self._mark_command_end() + *data64_le(ctypes.addressof(signal) + getattr(hsa.amd_signal_t, 'start_ts').offset)]) - def submit(self, device: AMDDevice): + def _submit(self, device): if device.sdma_queue.put_value - device.sdma_queue.read_ptr[0] > device.sdma_queue.ring.nbytes: raise RuntimeError("SDMA queue overrun") tail_blit_dword = 0 @@ -304,7 +270,6 @@ class HWCopyQueue(HWQueue): device.sdma_queue.write_ptr[0] = device.sdma_queue.put_value device.sdma_queue.doorbell[0] = device.sdma_queue.put_value - return self SHT_PROGBITS, SHF_ALLOC = 0x1, 0x2 class AMDProgram: @@ -351,7 +316,7 @@ class AMDProgram: self.prog_addr = self.lib_gpu.va_addr + entry_point + code.kernel_code_entry_byte_offset - HWPM4Queue().memory_barrier().submit(self.device) + AMDComputeQueue().memory_barrier().submit(self.device) # NOTE: no programs are ever freed def __del__(self): @@ -371,7 +336,7 @@ class AMDProgram: for i in range(len(args)): args_st.__setattr__(f'f{i}', args[i].va_addr) for i in range(len(vals)): args_st.__setattr__(f'v{i}', vals[i]) - q = HWPM4Queue().wait(self.device.timeline_signal, self.device.timeline_value - 1).memory_barrier() + q = AMDComputeQueue().wait(self.device.timeline_signal, self.device.timeline_value - 1).memory_barrier() with hcq_profile(self.device, queue=q, desc=self.name, enabled=wait or PROFILE) as (sig_st, sig_en): q.exec(self, self.device.kernargs_ptr, global_size, local_size) @@ -520,8 +485,9 @@ class AMDDevice(HCQCompatCompiled): self.compute_queue = self._alloc_queue(kfd.KFD_IOC_QUEUE_TYPE_COMPUTE, 0x100000, ctx_save_restore_size=0x2C02000, eop_buffer_size=0x1000) self.sdma_queue = self._alloc_queue(kfd.KFD_IOC_QUEUE_TYPE_SDMA, 0x100000) - super().__init__(device, AMDAllocator(self), AMDRenderer(), AMDCompiler(self.arch), functools.partial(AMDProgram, self), HWPM4Queue, HWCopyQueue, - timeline_signals=[self._alloc_signal(sync_event=sync_event), self._alloc_signal(sync_event=kio.create_event(AMDDevice.kfd, auto_reset=1))]) + timeline_signals=[self._alloc_signal(sync_event=sync_event), self._alloc_signal(sync_event=kio.create_event(AMDDevice.kfd, auto_reset=1))] + super().__init__(device, AMDAllocator(self), AMDRenderer(), AMDCompiler(self.arch), functools.partial(AMDProgram, self), + AMDComputeQueue, AMDCopyQueue, timeline_signals) def _gpu2cpu_time(self, gpu_time, is_copy): if is_copy: return self.copy_cpu_start_time + (gpu_time - self.copy_gpu_start_time) / 1e2 diff --git a/tinygrad/runtime/ops_nv.py b/tinygrad/runtime/ops_nv.py index 7dfad7a3d7..cf90734496 100644 --- a/tinygrad/runtime/ops_nv.py +++ b/tinygrad/runtime/ops_nv.py @@ -1,8 +1,9 @@ from __future__ import annotations import os, ctypes, contextlib, pathlib, re, fcntl, functools, mmap, struct, tempfile, hashlib, subprocess, time, array -from typing import Tuple, List, Any +from typing import Tuple, List, Any, cast from dataclasses import dataclass -from tinygrad.device import HCQCompatCompiled, HCQCompatAllocator, HCQCompatAllocRes, Compiler, CompileError, BufferOptions, hcq_profile +from tinygrad.device import HCQCompatCompiled, HCQCompatAllocator, HCQCompatAllocRes, HWCommandQueue, HWComputeQueue, HWCopyQueue, hcq_command, \ + hcq_profile, Compiler, CompileError, BufferOptions from tinygrad.helpers import getenv, from_mv, mv_address, init_c_struct_t, to_mv, round_up, to_char_p_p, DEBUG, prod, PROFILE from tinygrad.renderer.cstyle import NVRenderer from tinygrad.runtime.ops_cuda import check as cuda_check, _get_bytes, CUDACompiler, PTXCompiler, PTX @@ -90,38 +91,26 @@ class NVPTXCompiler(NVCompiler): raise CompileError(f"compile failed: {_get_bytes(handle, nvrtc.nvJitLinkGetErrorLog, nvrtc.nvJitLinkGetErrorLogSize, jitlink_check).decode()}") return _get_bytes(handle, nvrtc.nvJitLinkGetLinkedCubin, nvrtc.nvJitLinkGetLinkedCubinSize, jitlink_check) -class HWQueue: - def __init__(self): self.q, self.binded_device, self.cmd_offsets = [], None, [0] +class NVCommandQueue(HWCommandQueue): # pylint: disable=abstract-method def __del__(self): if self.binded_device is not None: self.binded_device.synchronize() # Synchronize to ensure the buffer is no longer in use. self.binded_device._gpu_free(self.hw_page) - def _mark_command_end(self): - self.cmd_offsets.append(len(self.q)) - return self - def __len__(self): return len(self.cmd_offsets) - 1 - - def memory_barrier(self): return self._mark_command_end() - - def wait(self, signal, value=0): + def _wait(self, signal, value=0): self.q += [nvmethod(0, nv_gpu.NVC56F_SEM_ADDR_LO, 5), *nvdata64_le(ctypes.addressof(from_mv(signal))), *nvdata64_le(value), (3 << 0) | (1 << 24)] # ACQUIRE | PAYLOAD_SIZE_64BIT - return self._mark_command_end() - def timestamp(self, signal): return HWQueue.signal(self, signal, timestamp=True) - - def signal(self, signal, value=0, timestamp=False): + def _signal(self, signal, value=0, timestamp=False): self.q += [nvmethod(0, nv_gpu.NVC56F_SEM_ADDR_LO, 5), *nvdata64_le(ctypes.addressof(from_mv(signal))), *nvdata64_le(value), (1 << 0) | (1 << 20) | (1 << 24) | ((1 << 25) if timestamp else 0)] # RELEASE | RELEASE_WFI | PAYLOAD_SIZE_64BIT | RELEASE_TIMESTAMP self.q += [nvmethod(0, nv_gpu.NVC56F_NON_STALL_INTERRUPT, 1), 0x0] - return self._mark_command_end() + def _timestamp(self, signal): return NVCommandQueue._signal(self, signal, timestamp=True) - def update_signal(self, cmd_idx, signal=None, value=None): return self.update_wait(cmd_idx, signal, value) # the same offsets and commands - def update_wait(self, cmd_idx, signal=None, value=None): - if signal is not None: self.q[(sigoff:=self.cmd_offsets[cmd_idx]+1):sigoff+2] = array.array('I', [*nvdata64_le(mv_address(signal))]) - if value is not None: self.q[(valoff:=self.cmd_offsets[cmd_idx]+3):valoff+2] = array.array('I', [*nvdata64_le(value)]) - return self + def _update_signal(self, cmd_idx, signal=None, value=None): return self._update_wait(cmd_idx, signal, value) # the same offsets and commands + def _update_wait(self, cmd_idx, signal=None, value=None): + if signal is not None: self.q[(sigoff:=self.cmds_offset[cmd_idx]+1):sigoff+2] = array.array('I', nvdata64_le(mv_address(signal))) + if value is not None: self.q[(valoff:=self.cmds_offset[cmd_idx]+3):valoff+2] = array.array('I', nvdata64_le(value)) def bind(self, device: NVDevice): self.binded_device = device @@ -132,7 +121,7 @@ class HWQueue: # From now on, the queue is on the device for faster submission. self.q = hw_view # type: ignore - def _submit(self, dev, gpfifo:GPFifo): + def _submit_to_gpfifo(self, dev, gpfifo:GPFifo): if len(self.q) == 0: return if dev == self.binded_device: cmdq_addr = self.hw_page.va_addr @@ -151,29 +140,31 @@ class HWQueue: dev.gpu_mmio[0x90 // 4] = gpfifo.token gpfifo.put_value += 1 -class HWComputeQueue(HWQueue): +class NVComputeQueue(NVCommandQueue, HWComputeQueue): def __init__(self): self.cmd_idx_to_qmd, self.cmd_idx_to_global_dims, self.cmd_idx_to_local_dims = {}, {}, {} super().__init__() + @hcq_command def copy_from_cpu(self, gpuaddr, data): self.q += [nvmethod(1, nv_gpu.NVC6C0_OFFSET_OUT_UPPER, 2), *nvdata64(gpuaddr)] self.q += [nvmethod(1, nv_gpu.NVC6C0_LINE_LENGTH_IN, 2), len(data)*4, 0x1] self.q += [nvmethod(1, nv_gpu.NVC6C0_LAUNCH_DMA, 1), 0x41] self.q += [nvmethod(1, nv_gpu.NVC6C0_LOAD_INLINE_DATA, len(data), typ=6)] + list(data) - return self._mark_command_end() - def exec(self, prg, kernargs, global_size=(1,1,1), local_size=(1,1,1)): + def _exec(self, prg, kernargs, global_size, local_size): + cmd_idx = len(self) - 1 + ctypes.memmove(qmd_addr:=(kernargs + round_up(prg.constbuf_0_size, 1 << 8)), ctypes.addressof(prg.qmd), 0x40 * 4) - self.cmd_idx_to_qmd[len(self)] = qmd = qmd_struct_t.from_address(qmd_addr) # Save qmd for later update - self.cmd_idx_to_global_dims[len(self)] = to_mv(qmd_addr + nv_gpu.NVC6C0_QMDV03_00_CTA_RASTER_WIDTH[1] // 8, 12).cast('I') - self.cmd_idx_to_local_dims[len(self)] = to_mv(qmd_addr + nv_gpu.NVC6C0_QMDV03_00_CTA_THREAD_DIMENSION0[1] // 8, 6).cast('H') + self.cmd_idx_to_qmd[cmd_idx] = qmd = qmd_struct_t.from_address(qmd_addr) # Save qmd for later update + self.cmd_idx_to_global_dims[cmd_idx] = to_mv(qmd_addr + nv_gpu.NVC6C0_QMDV03_00_CTA_RASTER_WIDTH[1] // 8, 12).cast('I') + self.cmd_idx_to_local_dims[cmd_idx] = to_mv(qmd_addr + nv_gpu.NVC6C0_QMDV03_00_CTA_THREAD_DIMENSION0[1] // 8, 6).cast('H') qmd.cta_raster_width, qmd.cta_raster_height, qmd.cta_raster_depth = global_size qmd.cta_thread_dimension0, qmd.cta_thread_dimension1, qmd.cta_thread_dimension2 = local_size qmd.constant_buffer_addr_upper_0, qmd.constant_buffer_addr_lower_0 = nvdata64(kernargs) - if (prev_qmd:=self.cmd_idx_to_qmd.get(len(self) - 1)) is None: + if (prev_qmd:=self.cmd_idx_to_qmd.get(cmd_idx - 1)) is None: self.q += [nvmethod(1, nv_gpu.NVC6C0_INVALIDATE_SHADER_CACHES_NO_WFI, 1), (1 << 12) | (1 << 4) | (1 << 0)] self.q += [nvmethod(1, nv_gpu.NVC6C0_SEND_PCAS_A, 0x1), qmd_addr >> 8] self.q += [nvmethod(1, nv_gpu.NVC6C0_SEND_SIGNALING_PCAS2_B, 0x1), 9] @@ -182,52 +173,45 @@ class HWComputeQueue(HWQueue): prev_qmd.dependent_qmd0_action = 1 prev_qmd.dependent_qmd0_prefetch = 1 prev_qmd.dependent_qmd0_enable = 1 - return self._mark_command_end() - def update_exec(self, cmd_idx, global_size, local_size): + def _update_exec(self, cmd_idx, global_size, local_size): # Patch the exec cmd with new launch dims self.cmd_idx_to_global_dims[cmd_idx][:] = array.array('I', global_size) self.cmd_idx_to_local_dims[cmd_idx][:] = array.array('H', local_size) - def signal(self, signal, value=0): - if (prev_qmd:=self.cmd_idx_to_qmd.get(len(self) - 1)) is None or prev_qmd.release0_enable == 1: return super().signal(signal, value) + def _signal(self, signal, value=0): + if (prev_qmd:=self.cmd_idx_to_qmd.get(len(self) - 2)) is None or prev_qmd.release0_enable == 1: return super()._signal(signal, value) prev_qmd.release0_address_upper, prev_qmd.release0_address_lower = nvdata64(ctypes.addressof(from_mv(signal))) prev_qmd.release0_payload_upper, prev_qmd.release0_payload_lower = nvdata64(value) prev_qmd.release0_enable = 1 - self.cmd_idx_to_qmd[len(self)] = prev_qmd # this command is embedded into qmd. - return self._mark_command_end() + self.cmd_idx_to_qmd[len(self) - 1] = prev_qmd # this command is embedded into qmd. - def update_signal(self, cmd_idx, signal=None, value=None): - if (qmd:=self.cmd_idx_to_qmd.get(cmd_idx)) is None: return super().update_signal(cmd_idx, signal, value) + def _update_signal(self, cmd_idx, signal=None, value=None): + if (qmd:=self.cmd_idx_to_qmd.get(cmd_idx)) is None: return super()._update_signal(cmd_idx, signal, value) if signal is not None: qmd.release0_address_upper, qmd.release0_address_lower = nvdata64(ctypes.addressof(from_mv(signal))) if value is not None: qmd.release0_payload_upper, qmd.release0_payload_lower = nvdata64(value) - return self - def submit(self, dev:NVDevice): self._submit(dev, dev.compute_gpfifo) + def _submit(self, device): self._submit_to_gpfifo(device, cast(NVDevice, device).compute_gpfifo) -class HWCopyQueue(HWQueue): - def copy(self, dest, src, copy_size): +class NVCopyQueue(NVCommandQueue, HWCopyQueue): + def _copy(self, dest, src, copy_size): self.q += [nvmethod(4, nv_gpu.NVC6B5_OFFSET_IN_UPPER, 4), *nvdata64(src), *nvdata64(dest)] self.q += [nvmethod(4, nv_gpu.NVC6B5_LINE_LENGTH_IN, 1), copy_size] self.q += [nvmethod(4, nv_gpu.NVC6B5_LAUNCH_DMA, 1), 0x182] # TRANSFER_TYPE_NON_PIPELINED | DST_MEMORY_LAYOUT_PITCH | SRC_MEMORY_LAYOUT_PITCH - return self._mark_command_end() - def update_copy(self, cmd_idx, dest=None, src=None): - if dest is not None: self.q[(sigoff:=self.cmd_offsets[cmd_idx]+3):sigoff+2] = array.array('I', [*nvdata64(dest)]) - if src is not None: self.q[(sigoff:=self.cmd_offsets[cmd_idx]+1):sigoff+2] = array.array('I', [*nvdata64(src)]) - return self + def _update_copy(self, cmd_idx, dest=None, src=None): + if dest is not None: self._patch(cmd_idx, offset=3, data=nvdata64(dest)) + if src is not None: self._patch(cmd_idx, offset=1, data=nvdata64(src)) - def signal(self, signal, value=0): + def _signal(self, signal, value=0): self.q += [nvmethod(4, nv_gpu.NVC6B5_SET_SEMAPHORE_A, 4), *nvdata64(ctypes.addressof(from_mv(signal))), value, 4] self.q += [nvmethod(4, nv_gpu.NVC6B5_LAUNCH_DMA, 1), 0x14] - return self._mark_command_end() - def update_signal(self, cmd_idx, signal=None, value=None): - if signal is not None: self.q[(sigoff:=self.cmd_offsets[cmd_idx]+1):sigoff+2] = array.array('I', [*nvdata64(mv_address(signal))]) - if value is not None: self.q[self.cmd_offsets[cmd_idx]+3] = value - return self + def _update_signal(self, cmd_idx, signal=None, value=None): + if signal is not None: self._patch(cmd_idx, offset=1, data=nvdata64(mv_address(signal))) + if value is not None: self._patch(cmd_idx, offset=3, data=[value]) - def submit(self, dev:NVDevice): self._submit(dev, dev.dma_gpfifo) + def _submit(self, device): self._submit_to_gpfifo(device, cast(NVDevice, device).dma_gpfifo) SHT_PROGBITS, SHT_NOBITS, SHF_ALLOC, SHF_EXECINSTR = 0x1, 0x8, 0x2, 0x4 class NVProgram: @@ -306,12 +290,12 @@ class NVProgram: elif self.rel_info[rel_i+2] == 0x38: self.program[self.rel_info[rel_i]//4 + 1] = (global_init_addr & 0xffffffff) # R_CUDA_ABS32_LO_32 else: raise RuntimeError(f"unknown reloc: {self.rel_info[rel_i+2]}") - HWComputeQueue().wait(self.device.timeline_signal, self.device.timeline_value - 1).submit(self.device) + NVComputeQueue().wait(self.device.timeline_signal, self.device.timeline_value - 1).submit(self.device) for st in range(0, len(self.program), 4095): - HWComputeQueue().copy_from_cpu(self.lib_gpu.va_addr+st*4, self.program[st:st+4095]).submit(self.device) + NVComputeQueue().copy_from_cpu(self.lib_gpu.va_addr+st*4, self.program[st:st+4095]).submit(self.device) if self.global_init is not None: - HWComputeQueue().copy_from_cpu(load_addr:=(self.lib_gpu.va_addr + off), self.global_init).submit(self.device) + NVComputeQueue().copy_from_cpu(load_addr:=(self.lib_gpu.va_addr + off), self.global_init).submit(self.device) off += round_up(self.global_init.nbytes, 128) if 4 in constant_buffers_data: # >= 12.4 # Constbuffer 4 contains a pointer to nv.global.init, load section and set up the pointer. @@ -324,10 +308,10 @@ class NVProgram: self.qmd.__setattr__(f'constant_buffer_size_shifted4_{i}', data.nbytes) self.qmd.__setattr__(f'constant_buffer_valid_{i}', 1) - HWComputeQueue().copy_from_cpu(self.lib_gpu.va_addr + off, data).submit(self.device) + NVComputeQueue().copy_from_cpu(self.lib_gpu.va_addr + off, data).submit(self.device) off += round_up(data.nbytes, 128) - HWComputeQueue().signal(self.device.timeline_signal, self.device.timeline_value).submit(self.device) + NVComputeQueue().signal(self.device.timeline_signal, self.device.timeline_value).submit(self.device) self.device.timeline_value += 1 self.device.synchronize() @@ -346,7 +330,7 @@ class NVProgram: if MOCKGPU: self.constbuffer_0[0:2] = [len(args), len(vals)] kernargs = [arg_half for arg in args for arg_half in nvdata64_le(arg.va_addr)] + list(vals) - q = HWComputeQueue().wait(self.device.timeline_signal, self.device.timeline_value - 1) \ + q = NVComputeQueue().wait(self.device.timeline_signal, self.device.timeline_value - 1) \ .copy_from_cpu(self.device.kernargs_ptr, self.constbuffer_0 + kernargs) with hcq_profile(self.device, queue=q, desc=self.name, enabled=wait or PROFILE) as (sig_st, sig_en): @@ -561,7 +545,7 @@ class NVDevice(HCQCompatCompiled): compiler_t = (PTXCompiler if PTX else CUDACompiler) if MOCKGPU else (NVPTXCompiler if PTX else NVCompiler) super().__init__(device, NVAllocator(self), PTXRenderer(self.arch, device="NV") if PTX else NVRenderer(self.arch), compiler_t(self.arch), - functools.partial(NVProgram, self), HWComputeQueue, HWCopyQueue, timeline_signals=[self._alloc_signal(), self._alloc_signal()]) + functools.partial(NVProgram, self), NVComputeQueue, NVCopyQueue, timeline_signals=[self._alloc_signal(), self._alloc_signal()]) self._cmdq_setup_compute_gpfifo() self._cmdq_setup_dma_gpfifo() @@ -624,7 +608,7 @@ class NVDevice(HCQCompatCompiled): # Set windows addresses to not collide with other allocated buffers. self.shared_mem_window, self.local_mem_window, self.slm_per_thread = 0xfe000000, 0xff000000, 0 - queue = HWComputeQueue() + queue = NVComputeQueue() queue.q += [nvmethod(1, nv_gpu.NVC6C0_SET_OBJECT, 1), self.compute_class] queue.q += [nvmethod(1, nv_gpu.NVC6C0_SET_SHADER_LOCAL_MEMORY_WINDOW_A, 2), *nvdata64(self.local_mem_window)] queue.q += [nvmethod(1, nv_gpu.NVC6C0_SET_SHADER_SHARED_MEMORY_WINDOW_A, 2), *nvdata64(self.shared_mem_window)] @@ -634,7 +618,7 @@ class NVDevice(HCQCompatCompiled): self.synchronize() def _cmdq_setup_dma_gpfifo(self): - queue = HWCopyQueue() + queue = NVCopyQueue() queue.q += [nvmethod(4, nv_gpu.NVC6C0_SET_OBJECT, 1), nv_gpu.AMPERE_DMA_COPY_B] queue.signal(self.timeline_signal, self.timeline_value).submit(self) self.timeline_value += 1 @@ -652,7 +636,7 @@ class NVDevice(HCQCompatCompiled): bytes_per_tpc = round_up(bytes_per_warp * 48 * 2, 0x8000) self.shader_local_mem = self._gpu_alloc(round_up(bytes_per_tpc * 64, 0x20000), huge_page=True, contig=True) - queue = HWComputeQueue() + queue = NVComputeQueue() queue.q += [nvmethod(1, nv_gpu.NVC6C0_SET_SHADER_LOCAL_MEMORY_A, 2), *nvdata64(self.shader_local_mem.va_addr)] queue.q += [nvmethod(1, nv_gpu.NVC6C0_SET_SHADER_LOCAL_MEMORY_NON_THROTTLED_A, 3), *nvdata64(bytes_per_tpc), 0x40] queue.signal(self.timeline_signal, self.timeline_value).submit(self)