amd timeline semaphores (#4416)

* amd timeline semaphores

* v2

* fixes

* reset signals

* fix

* rollover test

* small fixes

* linter

* copyin
nimlgen, 2024-05-07 11:17:32 +03:00 (committed by GitHub)
parent 17faae091b, commit e3bb85fd0e
2 changed files with 155 additions and 123 deletions
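
For orientation, a minimal host-side model of the pattern this commit switches to: instead of a binary completion signal that is re-armed before every submit and cleared by the GPU, one timeline signal carries a monotonically increasing counter, and every wait asks whether the counter has reached at least a given value. The class below is illustrative only and is not the tinygrad API; `next_fence` is a name invented for the sketch.

```python
# Minimal host-side model of a timeline semaphore: one value, waits are ">=".
class TimelineSemaphore:
    def __init__(self):
        self.value = 0       # last value the "GPU" side has written
        self.next_fence = 1  # next value the host will ask the GPU to write

    def signal(self, value: int) -> None:
        self.value = max(self.value, value)

    def reached(self, fence: int) -> bool:
        return self.value >= fence  # a later signal also satisfies earlier waits

sem = TimelineSemaphore()
fence, sem.next_fence = sem.next_fence, sem.next_fence + 1  # host picks a fence per submit
sem.signal(fence)          # GPU side: work for this submit finished
assert sem.reached(fence)  # host (or another queue) can now proceed
```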


@@ -6,9 +6,10 @@ from tinygrad.runtime.ops_amd import AMDDevice, HWCopyQueue, HWPM4Queue
def _time_queue(q, d):
st = time.perf_counter()
q.signal(d.completion_signal)
q.signal(d.timeline_signal, d.timeline_value)
q.submit(d)
d._wait_signal(d.completion_signal)
d._wait_signal(d.timeline_signal, d.timeline_value)
d.timeline_value += 1
return time.perf_counter() - st
class TestHCQ(unittest.TestCase):
@@ -31,64 +32,80 @@ class TestHCQ(unittest.TestCase):
def setUp(self):
TestHCQ.a.lazydata.buffer.copyin(memoryview(bytearray(struct.pack("ff", 0, 1))))
TestHCQ.b.lazydata.buffer.copyin(memoryview(bytearray(struct.pack("ff", 0, 0))))
TestHCQ.d0.synchronize() # wait for copyins to complete
def test_run_1000_times_one_submit(self):
temp_signal, temp_value = TestHCQ.d0._get_signal(value=0), 0
q = TestHCQ.compute_queue()
for _ in range(1000):
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(temp_signal, temp_value + 1).wait(temp_signal, temp_value + 1)
temp_value += 1
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr+len(TestHCQ.addr), TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(TestHCQ.d0.completion_signal)
q.signal(temp_signal, temp_value + 1).wait(temp_signal, temp_value + 1)
temp_value += 1
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
assert (val:=TestHCQ.a.lazydata.buffer.as_buffer().cast("f")[0]) == 2000.0, f"got val {val}"
def test_run_1000_times(self):
temp_signal = TestHCQ.d0._get_signal(value=0)
q = TestHCQ.compute_queue()
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(temp_signal, 2).wait(temp_signal, 2)
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr+len(TestHCQ.addr), TestHCQ.runner.global_size,
TestHCQ.runner.local_size).signal(TestHCQ.d0.completion_signal)
TestHCQ.runner.local_size)
for _ in range(1000):
temp_signal.value = 1
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal)
TestHCQ.d0.completion_signal.value = 1
# confirm signal was reset
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal, timeout=50)
TestHCQ.compute_queue().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
assert (val:=TestHCQ.a.lazydata.buffer.as_buffer().cast("f")[0]) == 2000.0, f"got val {val}"
def test_run_to_3(self):
temp_signal = TestHCQ.d0._get_signal(value=0)
q = TestHCQ.compute_queue()
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(temp_signal, 1).wait(temp_signal, 1)
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr+len(TestHCQ.addr), TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size).signal(TestHCQ.d0.completion_signal)
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal)
q.signal(temp_signal, 2).wait(temp_signal, 2)
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]) == 3.0, f"got val {val}"
def test_wait_signal(self):
TestHCQ.d0.completion_signal.value = 1
TestHCQ.compute_queue().wait(TestHCQ.d0.completion_signal).signal(TestHCQ.d0.completion_signal).submit(TestHCQ.d0)
temp_signal = TestHCQ.d0._get_signal(value=0)
TestHCQ.compute_queue().wait(temp_signal, value=1).signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal, timeout=50)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=50)
# clean up
TestHCQ.d0.completion_signal.value = 0
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal, timeout=1000, skip_check=True)
temp_signal.value = 1
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=100)
TestHCQ.d0.timeline_value += 1
def test_wait_copy_signal(self):
TestHCQ.d0.completion_signal.value = 1
HWCopyQueue().wait(TestHCQ.d0.completion_signal).signal(TestHCQ.d0.completion_signal).submit(TestHCQ.d0)
temp_signal = TestHCQ.d0._get_signal(value=0)
HWCopyQueue().wait(temp_signal, value=1).signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal, timeout=50)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=50)
# clean up
TestHCQ.d0.completion_signal.value = 0
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal, timeout=1000, skip_check=True)
temp_signal.value = 1
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=100)
TestHCQ.d0.timeline_value += 1
def test_run_normal(self):
q = TestHCQ.compute_queue()
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(TestHCQ.d0.completion_signal)
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]) == 1.0, f"got val {val}"
def test_submit_empty_queues(self):
@@ -96,46 +113,53 @@ class TestHCQ(unittest.TestCase):
HWCopyQueue().submit(TestHCQ.d0)
def test_signal_timeout(self):
TestHCQ.d0.completion_signal.value = 1
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal, timeout=50)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=50)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value + 122, timeout=50)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1, timeout=50)
def test_signal(self):
TestHCQ.compute_queue().signal(TestHCQ.d0.completion_signal).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal)
new_timeline_value = TestHCQ.d0.timeline_value + 0xff
TestHCQ.compute_queue().signal(TestHCQ.d0.timeline_signal, new_timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, new_timeline_value)
TestHCQ.d0.timeline_value = new_timeline_value + 1 # update to not break runtime
def test_copy_signal(self):
HWCopyQueue().signal(TestHCQ.d0.completion_signal).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal)
new_timeline_value = TestHCQ.d0.timeline_value + 0xff
HWCopyQueue().signal(TestHCQ.d0.timeline_signal, new_timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, new_timeline_value)
TestHCQ.d0.timeline_value = new_timeline_value + 1 # update to not break runtime
def test_run_signal(self):
q = TestHCQ.compute_queue()
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(TestHCQ.d0.completion_signal)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]) == 1.0, f"got val {val}"
def test_copy_1000_times(self):
q = HWCopyQueue()
q.copy(TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr, 8)
q.copy(TestHCQ.b.lazydata.buffer._buf.va_addr, TestHCQ.a.lazydata.buffer._buf.va_addr, 8)
q.signal(TestHCQ.d0.completion_signal)
for i in range(1000):
for _ in range(1000):
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal)
TestHCQ.d0.completion_signal.value = 1
# confirm signal was reset
HWCopyQueue().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
# confirm the signal didn't exceed the put value
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal, timeout=50)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value + 1, timeout=50)
assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]) == 0.0, f"got val {val}"
def test_copy(self):
q = HWCopyQueue()
q.copy(TestHCQ.b.lazydata.buffer._buf.va_addr, TestHCQ.a.lazydata.buffer._buf.va_addr, 8)
q.signal(TestHCQ.d0.completion_signal)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]) == 1.0, f"got val {val}"
def test_copy_bandwidth(self):
@@ -166,27 +190,45 @@ class TestHCQ(unittest.TestCase):
q = TestHCQ.compute_queue()
qc = HWCopyQueue()
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size) # b = [1, 2]
q.signal(sig:=AMDDevice._get_signal(10))
qc.wait(sig)
q.signal(sig:=AMDDevice._get_signal(value=0), value=1)
qc.wait(sig, value=1)
qc.copy(TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr, 8)
qc.signal(TestHCQ.d0.completion_signal)
sig.value = 1
qc.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
qc.submit(TestHCQ.d0)
time.sleep(0.02) # give it time for the wait to fail
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
assert (val:=TestHCQ.a.lazydata.buffer.as_buffer().cast("f")[0]) == 1.0, f"got val {val}"
def test_cross_device_signal(self):
d1 = Device["AMD:1"]
q1 = TestHCQ.compute_queue()
q2 = TestHCQ.compute_queue()
q1.signal(TestHCQ.d0.completion_signal)
q2.wait(TestHCQ.d0.completion_signal)
q1.signal(sig:=AMDDevice._get_signal(value=0), value=0xfff)
q2.wait(sig, value=0xfff)
q2.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
q2.submit(TestHCQ.d0)
q1.signal(d1.timeline_signal, d1.timeline_value)
q1.submit(d1)
TestHCQ.d0._wait_signal(TestHCQ.d0.completion_signal)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
d1._wait_signal(d1.timeline_signal, d1.timeline_value)
d1.timeline_value += 1
def test_timeline_signal_rollover(self):
TestHCQ.d0.timeline_value = (1 << 32) - 20 # close value to reset
TestHCQ.compute_queue().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1)
for _ in range(40):
q = TestHCQ.compute_queue()
q.wait(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1)
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]) == 1.0, f"got val {val}"
if __name__ == "__main__":
unittest.main()
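
The tests above all follow the same submit/wait/advance shape after the change. A compact restatement of that pattern (the helper is illustrative and not part of the test file; `dev` stands for `TestHCQ.d0`):

```python
# The per-test pattern the diff repeats: fence the queue with the current
# timeline value, wait for it on the host, then advance the counter so the
# next submit gets a fresh, larger value.
def submit_and_sync(q, dev):
    q.signal(dev.timeline_signal, dev.timeline_value)
    q.submit(dev)
    dev._wait_signal(dev.timeline_signal, dev.timeline_value)
    dev.timeline_value += 1
```

test_timeline_signal_rollover primes the counter at `(1 << 32) - 20` and runs 40 iterations, which pushes it across the 32-bit boundary and exercises the shadow-signal swap shown later in `AMDDevice.synchronize()`.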


@@ -1,6 +1,6 @@
from __future__ import annotations
from typing import Tuple, List, Any
import os, fcntl, ctypes, functools, re, pathlib, mmap, struct, errno, subprocess
from typing import Tuple, List, Any, cast
import os, fcntl, ctypes, functools, re, pathlib, mmap, struct, errno, subprocess, time
from tinygrad.device import Compiled, LRUAllocator, Compiler, CompilerOptions
from tinygrad.buffer import BufferOptions
from tinygrad.helpers import getenv, from_mv, init_c_struct_t, to_mv, round_up, DEBUG
@@ -105,6 +105,9 @@ regBIF_BX_PF1_GPU_HDP_FLUSH_DONE = 0x0107
CACHE_FLUSH_AND_INV_TS_EVENT = 0x14
CS_PARTIAL_FLUSH = 0x7
WAIT_REG_MEM_FUNCTION_EQ = 3 # ==
WAIT_REG_MEM_FUNCTION_GEQ = 5 # >=
COMPUTE_SHADER_EN = 1
FORCE_START_AT_000 = 1 << 2
CS_W32_EN = 1 << 15
@@ -157,8 +160,8 @@ class HWPM4Queue:
def hdp_flush(self):
self.q += [amd_gpu.PACKET3(amd_gpu.PACKET3_WAIT_REG_MEM, 5),
amd_gpu.WAIT_REG_MEM_MEM_SPACE(0) | amd_gpu.WAIT_REG_MEM_OPERATION(1) | amd_gpu.WAIT_REG_MEM_FUNCTION(3) | amd_gpu.WAIT_REG_MEM_ENGINE(0),
regBIF_BX_PF1_GPU_HDP_FLUSH_REQ, regBIF_BX_PF1_GPU_HDP_FLUSH_DONE, 0x0, 0x0, 0x20]
amd_gpu.WAIT_REG_MEM_MEM_SPACE(0) | amd_gpu.WAIT_REG_MEM_OPERATION(1) | amd_gpu.WAIT_REG_MEM_FUNCTION(WAIT_REG_MEM_FUNCTION_EQ) | \
amd_gpu.WAIT_REG_MEM_ENGINE(0), regBIF_BX_PF1_GPU_HDP_FLUSH_REQ, regBIF_BX_PF1_GPU_HDP_FLUSH_DONE, 0x0, 0x0, 0x20]
def invalidate_cache(self):
# overkill?
@@ -206,16 +209,13 @@ class HWPM4Queue:
self.q += [amd_gpu.PACKET3(amd_gpu.PACKET3_DISPATCH_DIRECT, 3),
global_size[0],global_size[1],global_size[2], CS_W32_EN | FORCE_START_AT_000 | COMPUTE_SHADER_EN]
# have to self wait since flush doesn't work
self.signal(sig:=AMDDevice._get_signal())
self.wait(sig)
return self
def wait(self, signal:hsa.amd_signal_t, value=0):
addr = ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET
self.q += [amd_gpu.PACKET3(amd_gpu.PACKET3_WAIT_REG_MEM, 5),
amd_gpu.WAIT_REG_MEM_MEM_SPACE(1) | amd_gpu.WAIT_REG_MEM_OPERATION(0) | amd_gpu.WAIT_REG_MEM_FUNCTION(3) | amd_gpu.WAIT_REG_MEM_ENGINE(0),
addr&0xFFFFFFFF, addr>>32, value, 0xffffffff, 4]
amd_gpu.WAIT_REG_MEM_MEM_SPACE(1) | amd_gpu.WAIT_REG_MEM_OPERATION(0) | amd_gpu.WAIT_REG_MEM_FUNCTION(WAIT_REG_MEM_FUNCTION_GEQ) | \
amd_gpu.WAIT_REG_MEM_ENGINE(0), addr&0xFFFFFFFF, addr>>32, value, 0xffffffff, 4]
return self
def timestamp(self, addr):
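
The PM4 wait just above is emitted with the new WAIT_REG_MEM_FUNCTION_GEQ constant and a 0xffffffff mask, i.e. a 32-bit "greater or equal" poll on the signal word. A rough software model of that compare, and of why a 64-bit host counter eventually needs the rollover handling shown at the end of this file's diff (illustrative only, not the packet encoding itself):

```python
# Rough model of the compare encoded by the WAIT_REG_MEM packet above
# (function=GEQ, mask=0xffffffff): only the low 32 bits of the signal
# participate in the poll.
def wait_reg_mem_geq(mem_word: int, ref_value: int, mask: int = 0xffffffff) -> bool:
    return (mem_word & mask) >= (ref_value & mask)

assert wait_reg_mem_geq(10, 3)                 # a later signal satisfies earlier waits
assert not wait_reg_mem_geq(2, (1 << 32) - 5)  # near the 32-bit limit the compare breaks down
```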
@@ -228,8 +228,6 @@ class HWPM4Queue:
return self
def signal(self, signal:hsa.amd_signal_t, value=0):
#assert signal.value == 0, f"entering signal without it being set to 0, but {signal.value}"
signal.value = 1
# NOTE: this needs an EOP buffer on the queue or it will NULL pointer
addr = ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET
self.q += [amd_gpu.PACKET3(amd_gpu.PACKET3_RELEASE_MEM, 6),
@@ -306,18 +304,16 @@ class HWCopyQueue:
return self
def signal(self, signal:hsa.amd_signal_t, value=0):
#assert signal.value == 0
signal.value = 1
self.q.append(sdma_pkts.fence(op=amd_gpu.SDMA_OP_FENCE, mtype=3, addr=ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET, data=value))
if signal.event_mailbox_ptr != 0:
self.q.append(sdma_pkts.fence(op=amd_gpu.SDMA_OP_FENCE, mtype=3, addr=signal.event_mailbox_ptr, data=signal.event_id))
self.q.append(sdma_pkts.trap(op=amd_gpu.SDMA_OP_TRAP, int_ctx=signal.event_id))
return self
def wait(self, signal:hsa.amd_signal_t):
self.q.append(sdma_pkts.poll_regmem(op=amd_gpu.SDMA_OP_POLL_REGMEM, mem_poll=1, func=0x3,
def wait(self, signal:hsa.amd_signal_t, value=0):
self.q.append(sdma_pkts.poll_regmem(op=amd_gpu.SDMA_OP_POLL_REGMEM, mem_poll=1, func=WAIT_REG_MEM_FUNCTION_GEQ,
addr=ctypes.addressof(signal) + SIGNAL_VALUE_OFFSET,
value=0, mask=0xffffffff, interval=0x04, retry_count=0xfff))
value=value, mask=0xffffffff, interval=0x04, retry_count=0xfff))
return self
class AMDProgram:
@@ -339,8 +335,6 @@ class AMDProgram:
for _, sh_type, sh_flags, sh_addr, sh_offset, sh_size, _, _, _ in sections:
if sh_type == SHT_PROGBITS and sh_flags & SHF_ALLOC: lib_gpu_view[sh_addr:sh_addr+sh_size] = self.lib[sh_offset:sh_offset+sh_size]
self.device._submit_cache_inv(gli=2)
entry_point = min(sh[3] for sh in sections if sh[1] == SHT_PROGBITS and sh[2] & SHF_ALLOC)
self.handle = self.lib_gpu.va_addr + entry_point
self.group_segment_size = lib_gpu_view.cast("I")[entry_point//4]
@@ -349,6 +343,8 @@ class AMDProgram:
assert self.private_segment_size <= self.device.max_private_segment_size, \
f"{self.private_segment_size=} > {self.device.max_private_segment_size=}"
HWPM4Queue().invalidate_cache().submit(self.device)
# NOTE: no programs are ever freed
def __del__(self):
if hasattr(self, 'lib_gpu'): self.device._gpu_free(self.lib_gpu)
@@ -367,24 +363,25 @@ class AMDProgram:
for i in range(len(vals)): args_st.__setattr__(f'v{i}', vals[i])
q = HWPM4Queue()
if wait: q.timestamp(ctypes.addressof(self.device.completion_signal) + getattr(hsa.amd_signal_t, 'start_ts').offset)
q.wait(self.device.timeline_signal, self.device.timeline_value - 1)
if wait: q.timestamp(ctypes.addressof(self.device.timeline_signal) + getattr(hsa.amd_signal_t, 'start_ts').offset)
q.exec(self, self.device.kernargs_ptr, global_size, local_size)
if wait:
q.timestamp(ctypes.addressof(self.device.completion_signal) + getattr(hsa.amd_signal_t, 'end_ts').offset)
q.signal(self.device.completion_signal)
q.submit(self.device)
if wait: q.timestamp(ctypes.addressof(self.device.timeline_signal) + getattr(hsa.amd_signal_t, 'end_ts').offset)
q.signal(self.device.timeline_signal, self.device.timeline_value).submit(self.device)
self.device.timeline_value += 1
self.device.kernargs_ptr += self.kernargs_segment_size
if wait:
self.device._wait_signal(self.device.completion_signal)
#assert (wp:=self.device.amd_aql_queue.write_dispatch_id) == (rp:=self.device.amd_aql_queue.read_dispatch_id), f"didn't run {wp} != {rp}"
return (self.device.completion_signal.end_ts-self.device.completion_signal.start_ts)/1e8
self.device._wait_signal(self.device.timeline_signal, self.device.timeline_value - 1)
return (self.device.timeline_signal.end_ts - self.device.timeline_signal.start_ts) / 1e8
class AMDAllocator(LRUAllocator):
def __init__(self, device:AMDDevice):
self.device = device
# NOTE: KFD_IOC_ALLOC_MEM_FLAGS_GTT doesn't work here for readinto
self.b = [self.device._gpu_alloc(SDMA_MAX_COPY_SIZE*4, kfd.KFD_IOC_ALLOC_MEM_FLAGS_USERPTR, public=True) for _ in range(2)]
self.b = [self.device._gpu_alloc(SDMA_MAX_COPY_SIZE, kfd.KFD_IOC_ALLOC_MEM_FLAGS_USERPTR, public=True) for _ in range(16)]
self.b_timeline = [0] * len(self.b)
self.b_next = 0
super().__init__()
def _alloc(self, size:int, options:BufferOptions):
@@ -420,25 +417,34 @@ class AMDAllocator(LRUAllocator):
def copyin(self, dest, src: memoryview):
for i in range(0, src.nbytes, self.b[0].size):
ctypes.memmove(self.b[1].va_addr, from_mv(src[i:]), lsize:=min(self.b[0].size, src.nbytes-i))
if i != 0: self.device._wait_signal(self.device.signal_sdma)
self.b = self.b[::-1]
self.device._submit_sdma(dest.va_addr+i, self.b[0].va_addr, lsize, completion_signal=self.device.signal_sdma)
self.device._wait_signal(self.device.signal_sdma)
self.b_next = (self.b_next + 1) % len(self.b)
AMDDevice._wait_signal(self.device.timeline_signal, self.b_timeline[self.b_next])
ctypes.memmove(self.b[self.b_next].va_addr, from_mv(src[i:]), lsize:=min(self.b[self.b_next].size, src.nbytes-i))
HWCopyQueue().wait(self.device.timeline_signal, self.device.timeline_value - 1) \
.copy(dest.va_addr+i, self.b[self.b_next].va_addr, lsize) \
.signal(self.device.timeline_signal, self.device.timeline_value).submit(self.device)
self.b_timeline[self.b_next] = self.device.timeline_value
self.device.timeline_value += 1
def copyout(self, dest:memoryview, src):
self.device.synchronize()
for i in range(0, dest.nbytes, self.b[0].size):
self.device._submit_sdma(self.b[0].va_addr, src.va_addr+i, lsize:=min(self.b[0].size, dest.nbytes-i), completion_signal=self.device.signal_sdma)
self.device._wait_signal(self.device.signal_sdma)
HWCopyQueue().wait(self.device.timeline_signal, self.device.timeline_value - 1) \
.copy(self.b[0].va_addr, src.va_addr+i, lsize:=min(self.b[0].size, dest.nbytes-i)) \
.signal(self.device.timeline_signal, self.device.timeline_value).submit(self.device)
AMDDevice._wait_signal(self.device.timeline_signal, self.device.timeline_value)
self.device.timeline_value += 1
ctypes.memmove(from_mv(dest[i:]), self.b[0].va_addr, lsize)
def transfer(self, dest, src, sz:int, src_dev:AMDDevice, dest_dev:AMDDevice):
dest_dev._gpu_map(src)
q = HWPM4Queue().signal(sig := AMDDevice._get_signal())
HWCopyQueue().wait(sig).copy(dest.va_addr, src.va_addr, sz).signal(sigc := AMDDevice._get_signal()).submit(dest_dev)
HWPM4Queue().wait(sigc).submit(dest_dev)
q.wait(sigc).submit(src_dev)
src_dev._gpu_map(dest)
HWCopyQueue().wait(src_dev.timeline_signal, src_dev.timeline_value - 1) \
.wait(dest_dev.timeline_signal, dest_dev.timeline_value - 1) \
.copy(dest.va_addr, src.va_addr, sz) \
.signal(src_dev.timeline_signal, src_dev.timeline_value).submit(src_dev)
HWPM4Queue().wait(src_dev.timeline_signal, src_dev.timeline_value).submit(dest_dev)
src_dev.timeline_value += 1
MAP_FIXED, MAP_NORESERVE = 0x10, 0x400
class AMDDevice(Compiled):
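
The reworked copyin() above rotates through a small pool of host-visible staging buffers, tagging each one with the timeline value of the transfer that last read from it, so a slot is only reused once that fence has passed. A self-contained model of just that rotation (`wait_fence` stands in for `AMDDevice._wait_signal` and is an assumption of the sketch):

```python
# Model of the staging-buffer rotation in copyin(): each slot remembers the
# fence (timeline value) of the last transfer that used it and is handed out
# again only once that fence has passed. Illustrative only.
class StagingRing:
    def __init__(self, num_slots: int):
        self.slot_fence = [0] * num_slots  # mirrors self.b_timeline in the diff
        self.next = 0

    def acquire(self, wait_fence) -> int:
        self.next = (self.next + 1) % len(self.slot_fence)
        wait_fence(self.slot_fence[self.next])  # blocks only if the slot is still in flight
        return self.next

    def retire(self, slot: int, fence_value: int) -> None:
        self.slot_fence[slot] = fence_value  # reusable once the timeline reaches this value
```

In the diff the pool has 16 slots (`self.b`) and the fence wait is on the device's timeline signal, so the host memcpy into one slot can overlap the SDMA copies still draining earlier slots.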
@@ -480,14 +486,14 @@ class AMDDevice(Compiled):
kio.free_memory_of_gpu(self.kfd, handle=mem.handle)
@classmethod
def _get_signal(self, num=None, sync_event=None) -> hsa.amd_signal_t:
def _get_signal(self, num=None, sync_event=None, value=0) -> hsa.amd_signal_t:
if num is None:
num = AMDDevice.signal_number
AMDDevice.signal_number += 1
if AMDDevice.signal_number == SIGNAL_COUNT: AMDDevice.signal_number = 16
#print("signal", num)
ret = hsa.amd_signal_t.from_address(AMDDevice.signals_page.va_addr + SIGNAL_SIZE*num)
ret.value = 0
ret.value = value
ret.kind = hsa.AMD_SIGNAL_KIND_USER
if sync_event is not None:
ret.event_mailbox_ptr = AMDDevice.event_page.va_addr + sync_event.event_slot_index*8
@@ -495,16 +501,15 @@ class AMDDevice(Compiled):
return ret
@classmethod
def _wait_signal(self, signal:hsa.amd_signal_t, timeout=10000, skip_check=False):
def _wait_signal(self, signal:hsa.amd_signal_t, value=0, timeout=10000):
assert signal.event_id != 0, "can't wait on this signal"
evt_arr = (kfd.struct_kfd_event_data * 1)()
evt_arr[0].event_id = signal.event_id
ret = kio.wait_events(AMDDevice.kfd, events_ptr=ctypes.addressof(evt_arr), num_events=1, wait_for_all=1, timeout=timeout)
if ret.wait_result != 0: raise RuntimeError(f"wait_result: {ret.wait_result}, {timeout} ms TIMEOUT!")
evt_arr = (kfd.struct_kfd_event_data)(event_id=signal.event_id)
#val = signal.value
#while val != 0: val = signal.value
assert skip_check or signal.value == 0, f"not set to 0, but {signal.value}"
start_time = time.time() * 1000
while (time.time() * 1000 - start_time) < timeout:
if signal.value >= value: return
kio.wait_events(AMDDevice.kfd, events_ptr=ctypes.addressof(evt_arr), num_events=1, wait_for_all=1, timeout=100)
raise RuntimeError(f"wait_signal: not set to {value}, but {signal.value}, {timeout} ms TIMEOUT!")
def __init__(self, device:str=""):
if AMDDevice.kfd == -1:
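
The new _wait_signal above is a bounded poll: it checks `signal.value >= value`, parking on the signal's KFD event for up to 100 ms per iteration, and raises once the overall timeout elapses. Its shape, with the event wait replaced by a plain sleep so the sketch stands alone (`read_value` is an assumed accessor, not part of the runtime):

```python
import time

# Shape of the host-side wait loop in _wait_signal; the KFD event wait is
# replaced by a sleep so this sketch is self-contained.
def wait_for_at_least(read_value, target: int, timeout_ms: int = 10000) -> None:
    deadline = time.monotonic() + timeout_ms / 1000
    while time.monotonic() < deadline:
        if read_value() >= target:
            return
        time.sleep(0.1)  # the real code parks on the signal's KFD event for up to 100 ms
    raise RuntimeError(f"wait_signal: value never reached {target} within {timeout_ms} ms")
```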
@@ -527,8 +532,9 @@ class AMDDevice(Compiled):
self._gpu_map(AMDDevice.event_page)
sync_event = kio.create_event(AMDDevice.kfd, auto_reset=1)
self.completion_signal = AMDDevice._get_signal(self.device_id*2, sync_event=sync_event)
self.signal_sdma = AMDDevice._get_signal(self.device_id*2+1, sync_event=kio.create_event(AMDDevice.kfd, auto_reset=1))
self.timeline_value: int = 1
self.timeline_signal = AMDDevice._get_signal(self.device_id*2, sync_event=sync_event)
self._shadow_timeline_signal = AMDDevice._get_signal(self.device_id*2+1, sync_event=kio.create_event(AMDDevice.kfd, auto_reset=1))
self.kernargs = self._gpu_alloc(0x1000000, kfd.KFD_IOC_ALLOC_MEM_FLAGS_VRAM)
self.kernargs_ptr = self.kernargs.va_addr
@@ -576,28 +582,12 @@ class AMDDevice(Compiled):
super().__init__(device, AMDAllocator(self), AMDCompiler(self.arch), functools.partial(AMDProgram, self))
def _submit_sdma(self, dest, src, copy_size, wait_signals=None, completion_signal=None):
q = HWCopyQueue()
if wait_signals is not None:
# NOTE: we check only low 32 bits to be zeroed, we don't use higher values for signals
for sig in wait_signals: q.wait(ctypes.addressof(sig) + getattr(hsa.amd_signal_t, 'value').offset)
if completion_signal is not None: q.timestamp(ctypes.addressof(completion_signal) + getattr(hsa.amd_signal_t, 'start_ts').offset)
q.copy(dest, src, copy_size)
if completion_signal is not None: q.timestamp(ctypes.addressof(completion_signal) + getattr(hsa.amd_signal_t, 'end_ts').offset)
if completion_signal is not None: q.signal(completion_signal)
q.submit(self)
def _submit_cache_inv(self, addr=0x0, sz=(1 << 64)-1, gli=0, glv=0, glk=0, gl1=0, gl2=0):
HWPM4Queue().invalidate_cache().signal(self.completion_signal).submit(self)
self._wait_signal(self.completion_signal)
assert (wp:=(self.pm4_write_pointer[0]%(self.pm4_ring.size//4))) == (rp:=self.pm4_read_pointer[0]), \
f"didn't run {wp} != {rp} len {self.pm4_ring.size//4}"
def synchronize(self):
HWPM4Queue().signal(self.completion_signal).submit(self)
self._wait_signal(self.completion_signal)
assert (wp:=(self.pm4_write_pointer[0]%(self.pm4_ring.size//4))) == (rp:=self.pm4_read_pointer[0]), \
f"didn't run {wp} != {rp} len {self.pm4_ring.size//4}"
AMDDevice._wait_signal(self.timeline_signal, self.timeline_value - 1)
# reset kernargs
self.kernargs_ptr = self.kernargs.va_addr
if self.timeline_value > (1 << 31):
self.timeline_signal, self._shadow_timeline_signal = self._shadow_timeline_signal, self.timeline_signal
self.timeline_signal.value, self.timeline_value = 0, 1
cast(AMDAllocator, self.allocator).b_timeline = [0] * len(cast(AMDAllocator, self.allocator).b)
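
synchronize() now also handles counter rollover: since the GPU-side compare only looks at 32 bits of the signal, once the 64-bit host counter passes 2**31 the device swaps in the pre-allocated shadow signal and restarts the count, resetting the allocator's staging fences with it. Restated as a standalone function over the same fields (field names follow the diff; the function itself is illustrative only):

```python
# Restatement of the rollover branch at the end of synchronize().
def maybe_rollover(dev) -> None:
    if dev.timeline_value > (1 << 31):  # retire the counter long before the 32-bit compare wraps
        dev.timeline_signal, dev._shadow_timeline_signal = dev._shadow_timeline_signal, dev.timeline_signal
        dev.timeline_signal.value, dev.timeline_value = 0, 1
        dev.allocator.b_timeline = [0] * len(dev.allocator.b)  # staging-buffer fences restart too
```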