Mirror of https://github.com/tinygrad/tinygrad.git (synced 2026-01-10 07:28:15 -05:00)
cleanup lin tests without Kernel (#12041)
* cleanup lin tests without Kernel
* no kernel.py there
* remove that test
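Most of the hunks below make the same switch: helper_linearizer_opt used to return a list of Kernel objects and tests pulled uops out of get_program(k.ast, k.opts, k.applied_opts); it now returns the realized AST, and the Opt list goes straight to get_program. A minimal sketch of the new pattern, assuming a working default device (the tensor expression is illustrative; the calls and keywords are taken from the hunks below):

from tinygrad import Tensor
from tinygrad.engine.realize import get_program
from test.test_linearizer import helper_linearizer_opt

r = (Tensor.rand(64, 64) @ Tensor.rand(64, 64)).relu()
# old: k = helper_linearizer_opt(r)[-1]; uops = get_program(k.ast, k.opts, k.applied_opts).uops
ast = helper_linearizer_opt(r)         # runs the baseline and hand-coded variants, returns the AST
uops = get_program(ast, opts=[]).uops  # lower with an explicit (here empty) Opt list
print(len(uops))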
@@ -1,60 +0,0 @@
import unittest
from tinygrad import Tensor, Device
from tinygrad.helpers import prod
from tinygrad.uop.ops import AxisType
from tinygrad.codegen.opt.heuristic import hand_coded_optimizations

# TODO: remove this
from tinygrad.codegen.opt.kernel import Kernel
from test.test_linearizer import push_views, helper_linearizer_opt

class TestHandCodedOpts(unittest.TestCase):
  def test_masked_upcast(self):
    layer_1 = Tensor.cat(*[Tensor.empty(5) for _ in range(4)])
    layer_2 = Tensor.cat(layer_1.unsqueeze(0), Tensor.empty(6, 20))

    s = layer_2.schedule()[-1]
    k = Kernel(push_views(s.ast))
    k.apply_opts(hand_coded_optimizations(k))
    assert len(k.bufs) == 6 # make sure all ops are done in one kernel
    # masked upcast should upcast masked axis of size 7
    # masked upcast should not upcast large (20) last axis
    # float4/other hcopt shouldn't upcast last axis, since we already have 7 upcast, and the last axis is not very contiguous
    assert k.upcasted == 1 and k.full_shape[-1] == 7

  @unittest.skipIf(Device.DEFAULT in {"METAL", "WEBGPU"}, "METAL/WEBGPU split this kernel since it has 37 buffers")
  def test_masked_upcast_wino(self):
    monster = Tensor.stack(*[Tensor.stack(*[Tensor.empty(16) for _ in range(6)]) for _ in range(6)])

    s = monster.schedule()[-1]
    k = Kernel(push_views(s.ast))
    k.apply_opts(hand_coded_optimizations(k))
    assert len(k.bufs) == 37 # make sure all ops are done in one kernel
    # should upcast the two Tensor.stacks
    assert k.upcasted >= 2 and k.full_shape[k.shape_len-k.upcasted:k.shape_len].count(6) == 2

  def test_masked_upcast_many(self):
    layer_1 = Tensor.cat(Tensor.rand(3, 4), Tensor.rand(4, 4))
    layer_2 = Tensor.cat(layer_1.unsqueeze(0), Tensor.rand(6, 7, 4))
    layer_3 = Tensor.cat(layer_2.unsqueeze(0), Tensor.rand(6, 7, 7, 4))

    k = helper_linearizer_opt(layer_3)[-1]
    assert len(k.bufs) == 5 # make sure all ops are done in one kernel
    # check that we don't do too many upcasts
    assert prod(k.full_shape[k.shape_len-k.upcasted:k.shape_len]) <= 49

  @unittest.skipUnless(Device[Device.DEFAULT].renderer.has_local, "test requires locals")
  def test_matvec(self):
    N = 128
    a = Tensor.rand(1, N).realize()
    b = Tensor.rand(N, N).realize()
    c = a @ b

    k = helper_linearizer_opt(c)[-1]

    assert k.group_for_reduces == 1
    assert k.axis_types.count(AxisType.LOCAL) == 1
    assert k.upcasted == 1

if __name__ == '__main__':
  unittest.main()
@@ -152,8 +152,9 @@ class TestTensorCores(unittest.TestCase):
    tc = Device[Device.DEFAULT].renderer.tensor_cores[0]
    x, y = Tensor.rand(128, 128, dtype=tc.dtype_in), Tensor.rand(128, 128, dtype=tc.dtype_in)
    r = x.matmul(y, dtype=tc.dtype_out)
    k = helper_linearizer_opt(r, [[Opt(OptOps.UNROLL, 0, 4)]], apply_tc=True, atol=3e-2, rtol=1e-3)[-1]
    for u in get_program(k.ast, k.opts, k.applied_opts).uops:
    opts = [Opt(OptOps.UNROLL, 0, 4)]
    ast = helper_linearizer_opt(r, [opts], apply_tc=True, atol=3e-2, rtol=1e-3)
    for u in get_program(ast, opts=opts).uops:
      if u.op is Ops.WMMA:
        assert u.src[-1].src[0].op != Ops.STORE

@@ -164,8 +165,9 @@ class TestTensorCores(unittest.TestCase):
    tc = [tc for tc in Device[Device.DEFAULT].renderer.tensor_cores if tc.dtype_in != tc.dtype_out][0]
    x, y = Tensor.rand(128, 128, dtype=tc.dtype_in), Tensor.rand(128, 128, dtype=tc.dtype_in)
    r = x.matmul(y, dtype=tc.dtype_out)
    k = helper_linearizer_opt(r, [[Opt(OptOps.UNROLL, 0, 4)]], apply_tc=True, atol=3e-2, rtol=1e-3)[-1]
    for u in get_program(k.ast, k.opts, k.applied_opts).uops:
    opts = [Opt(OptOps.UNROLL, 0, 4)]
    ast = helper_linearizer_opt(r, [opts], apply_tc=True, atol=3e-2, rtol=1e-3)
    for u in get_program(ast, opts=opts).uops:
      if u.op is Ops.WMMA:
        #assert u.src[-1].dtype == dtypes.float.vec(prod(tc.thread_local_sizes[2]))
        assert u.src[-1].src[0].op != Ops.STORE
@@ -178,8 +180,9 @@ class TestTensorCores(unittest.TestCase):
    tc = [tc for tc in Device[Device.DEFAULT].renderer.tensor_cores if tc.dtype_in != tc.dtype_out][0]
    x, y = Tensor.rand(128, 128, dtype=tc.dtype_in), Tensor.rand(128, 128, dtype=tc.dtype_in)
    r = x.matmul(y, dtype=tc.dtype_out).relu()
    k = helper_linearizer_opt(r, [[Opt(OptOps.UNROLL, 0, 4)]], apply_tc=True, atol=3e-2, rtol=1e-3)[-1]
    for u in get_program(k.ast, k.opts, k.applied_opts).uops:
    opts = [Opt(OptOps.UNROLL, 0, 4)]
    ast = helper_linearizer_opt(r, [opts], apply_tc=True, atol=3e-2, rtol=1e-3)
    for u in get_program(ast, opts=opts).uops:
      if u.op is Ops.WMMA:
        #assert u.src[-1].dtype == dtypes.float.vec(prod(tc.thread_local_sizes[2]))
        assert u.src[-1].src[0].op != Ops.STORE

@@ -10,14 +10,10 @@ from tinygrad.shape.shapetracker import ShapeTracker
from tinygrad.shape.view import View
from tinygrad.tensor import Tensor, _to_np_dtype
from tinygrad.engine.realize import run_schedule, lower_schedule, CompiledRunner, get_program
from tinygrad.codegen.opt.heuristic import hand_coded_optimizations
from tinygrad.helpers import Context, getenv, flatten, dedup, TC_SELECT, TC_OPT
from tinygrad.dtype import DType, dtypes, PtrDType, AddrSpace
from tinygrad.codegen import apply_rewrites, rewrites_for_views

# TODO: remove this
from tinygrad.codegen.opt.kernel import Kernel

class TestLinearizer(unittest.TestCase):
  def test_arg_dedup(self):
    # NOTE: this realize exists because Tensor.numpy calls .contiguous() internally
@@ -71,24 +67,24 @@ class TestLinearizer(unittest.TestCase):
  def test_two_nested_range(self):
    a = Tensor.randn(2, ).realize()
    out = a.reshape(2, 1).expand(2, 3).sum()
    lin = helper_linearizer_opt(out, wanna_output=[np.broadcast_to(a.numpy().reshape(2, 1), (2, 3)).sum()])[0]
    uops = get_program(lin.ast, lin.opts, []).uops
    ast = helper_linearizer_opt(out, wanna_output=[np.broadcast_to(a.numpy().reshape(2, 1), (2, 3)).sum()])
    uops = get_program(ast, opts=[]).uops
    ranges = [i for i,u in enumerate(uops) if u.op is Ops.RANGE]
    assert len(ranges) == 1 # NOTE: it collapses now

  def test_three_nested_range(self):
    a = Tensor.randn(2, ).realize()
    out = a.reshape(2, 1).expand(2, 3).expand(2, 2, 3).sum()
    lin = helper_linearizer_opt(out, wanna_output=[np.broadcast_to(np.broadcast_to(a.numpy().reshape(2, 1), (2, 3)), (2, 2, 3)).sum()])[0]
    uops = get_program(lin.ast, lin.opts, []).uops
    ast = helper_linearizer_opt(out, wanna_output=[np.broadcast_to(np.broadcast_to(a.numpy().reshape(2, 1), (2, 3)), (2, 2, 3)).sum()])
    uops = get_program(ast, opts=[]).uops
    ranges = [i for i,u in enumerate(uops) if u.op is Ops.RANGE]
    assert len(ranges) == 1 # NOTE: it collapses now

  def test_two_nested_range_alt_indexing(self):
    a = Tensor([2, 2]).realize()
    out = a.reshape(2, 1).pad(((1, 1), (1, 1)), value=2).sum()
    lin = helper_linearizer_opt(out, wanna_output=[24])[0]
    uops = get_program(lin.ast, lin.opts, []).uops
    ast = helper_linearizer_opt(out, wanna_output=[24])
    uops = get_program(ast, opts=[]).uops
    ranges = [i for i,u in enumerate(uops) if u.op is Ops.RANGE]
    # RANGE -> ALU -> RANGE -> ALU + LOAD -> STORE
    assert any(x.op in GroupOp.ALU for x in uops[ranges[0]:ranges[1]])
@@ -99,8 +95,8 @@ class TestLinearizer(unittest.TestCase):
    a = Tensor.randn(4, 1).realize()
    b = Tensor.randn(1, 1).realize()
    out = (a + b[0]).sum() + b[0]
    lin = helper_linearizer_opt(out, wanna_output=[(a.numpy()+b.numpy()[0]).sum()+b.numpy()])[0]
    uops = get_program(lin.ast, lin.opts, []).uops
    ast = helper_linearizer_opt(out, wanna_output=[(a.numpy()+b.numpy()[0]).sum()+b.numpy()])
    uops = get_program(ast, opts=[]).uops
    ranges = [i for i,u in enumerate(uops) if u.op is Ops.RANGE]
    # LOAD -> RANGE -> LOAD -> STORE
    assert len([x for x in uops[:ranges[0]] if x.op is Ops.LOAD]) == 1
@@ -109,8 +105,8 @@ class TestLinearizer(unittest.TestCase):
    a = Tensor.randn(2, ).realize()
    b = Tensor.randn(1, 1).realize()
    out = (a.reshape(2, 1).expand(2, 3) + b[0]).sum() + b[0]
    lin = helper_linearizer_opt(out, wanna_output=[(np.broadcast_to(a.numpy().reshape(2, 1), (2, 3)) + b.numpy()[0]).sum() + b.numpy()])[0]
    uops = get_program(lin.ast, lin.opts, []).uops
    ast = helper_linearizer_opt(out, wanna_output=[(np.broadcast_to(a.numpy().reshape(2, 1), (2, 3)) + b.numpy()[0]).sum() + b.numpy()])
    uops = get_program(ast, opts=[]).uops
    ranges = [i for i,u in enumerate(uops) if u.op is Ops.RANGE]
    assert len(ranges) == 1 # NOTE: it collapses now

@@ -221,9 +217,9 @@ class TestLinearizer(unittest.TestCase):
    x, y = Tensor.rand(128, 128), Tensor.rand(128, 128)
    r = (x@y).relu()
    opt = [Opt(OptOps.UNROLL, 0, 4), Opt(OptOps.UPCAST, 0, 4)]
    k = helper_linearizer_opt(r, [opt])[-1]
    ast = helper_linearizer_opt(r, [opt])
    # the uops graph is DEFINE_REG -> 4x STORE 0.0 -> RANGE -> 4x ALU -> 4x STORE -> ENDRANGE
    uops = get_program(k.ast, opts=opt).uops
    uops = get_program(ast, opts=opt).uops
    begin_range = [i for i, x in enumerate(uops) if x.op is Ops.RANGE][-1]
    end_range = [i for i, x in enumerate(uops) if x.op is Ops.ENDRANGE][0]
    for i,u in enumerate(uops): print(i, u.op, [uops.index(s) for s in u.src], u.arg, u.dtype)
@@ -313,8 +309,8 @@ class TestLinearizer(unittest.TestCase):
  def test_default_global_reversed(self):
    # shrink so that the dims do not collapse
    t = Tensor.ones(5, 6, 7).contiguous().realize().shrink(((0, 4), (0, 5), (0, 6)))
    k = helper_linearizer_opt(t+1)[0]
    uops = get_program(k.ast, k.opts, k.applied_opts).uops
    ast = helper_linearizer_opt(t+1)
    uops = get_program(ast, opts=[]).uops
    idxs = dedup([uop for uop in uops if uop.op is Ops.SPECIAL])
    idxs = sorted(idxs, key=lambda uop: uop.arg)
    assert (idxs[0].arg, idxs[0].src[0].arg) == ('gidx0', 6), idxs[0]
@@ -351,8 +347,8 @@ class TestLinearizer(unittest.TestCase):

  def test_phi_simplification(self):
    def helper(t, max_ops=0):
      k = helper_linearizer_opt(t)[-1]
      uops = get_program(k.ast).uops
      ast = helper_linearizer_opt(t)
      uops = get_program(ast).uops
      # ignore kernel optimized IF statements for now
      if if_op:=next((u for u in uops if u.op is Ops.IF), None):
        uops = uops[:uops.index(if_op)]
@@ -384,8 +380,8 @@ class TestLinearizer(unittest.TestCase):
    x, y = Tensor.randn(64,64), Tensor.randn(64,64)
    out = x.matmul(y)
    with Context(TC=0):
      k = helper_linearizer_opt(out)[-1]
      uops = get_program(k.ast, k.opts, k.applied_opts).uops
      ast = helper_linearizer_opt(out)
      uops = get_program(ast).uops
    # check that the float4 cast collapses
    store_vals = [u.src[1] for u in uops if u.op is Ops.STORE and u.src[0].dtype.addrspace != AddrSpace.REG]
    for val in store_vals:
@@ -395,8 +391,8 @@ class TestLinearizer(unittest.TestCase):
  def test_grouped_store_values(self):
    x = Tensor.randn((4,3,6,6)).realize()
    out = x.flip((0,1)).contiguous()
    k = helper_linearizer_opt(out)[-1]
    store_val = [u.src[1] for u in get_program(k.ast, k.opts, k.applied_opts).uops if u.op is Ops.STORE][0]
    ast = helper_linearizer_opt(out)
    store_val = [u.src[1] for u in get_program(ast).uops if u.op is Ops.STORE][0]
    assert store_val.dtype == dtypes.float.vec(4) and store_val.op is not Ops.VECTORIZE

  @unittest.skipUnless(Device[Device.DEFAULT].renderer.has_local, "test requires locals")
@@ -407,9 +403,9 @@ class TestLinearizer(unittest.TestCase):
    out = x@y
    opt = [Opt(OptOps.LOCAL, 0, 4), Opt(OptOps.GROUPTOP, 0, 8),
           Opt(OptOps.UNROLL, 0, 4), Opt(OptOps.UPCAST, 0, 4), Opt(OptOps.UPCAST, 1, 2)] # upcast accs in both reduces
    k = helper_linearizer_opt(out, opts=[opt])[-1]
    ast = helper_linearizer_opt(out, opts=[opt])
    def get_recursive(uop): return set.union(set(uop.src), [uop], *[get_recursive(v) for v in uop.src])
    uops = get_program(k.ast, opts=opt).uops
    uops = get_program(ast, opts=opt).uops
    local_stores = [u for u in uops if u.op is Ops.STORE and any(x.op is Ops.DEFINE_LOCAL for x in get_recursive(u.src[0]))]
    global_stores = [u for u in uops if u.op is Ops.STORE and any(x.op is Ops.DEFINE_GLOBAL for x in get_recursive(u.src[0]))]
    barrier = [u for u in uops if u.op is Ops.BARRIER][0]
@@ -428,8 +424,8 @@ class TestLinearizer(unittest.TestCase):
  def test_grouped_store_local_only(self):
    x, y = Tensor.rand(1,128), Tensor.rand(128, 128)
    r = (x@y).relu()
    k = helper_linearizer_opt(r)[-1]
    uops = get_program(k.ast).uops
    ast = helper_linearizer_opt(r)
    uops = get_program(ast).uops
    stores = [u for u in uops if u.op is Ops.STORE and u.src[0].dtype.addrspace != AddrSpace.REG]

    # the float4 value stores directly in lds and we skip upcast
@@ -496,7 +492,8 @@ def helper_linearizer_ast(ast:UOp, inputs:list[Tensor], *args, **kwargs):

def helper_linearizer_opt(r:Tensor|list[Tensor], *args, **kwargs):
  realized_ast, real_bufs = helper_realized_ast(r)
  return _helper_linearizer_opt_ast(realized_ast, real_bufs, *args, **kwargs)
  _helper_linearizer_opt_ast(realized_ast, real_bufs, *args, **kwargs)
  return realized_ast

def copyout_outputs(outbufs:list[Buffer]) -> list[np.ndarray]:
  return [np.frombuffer(x.as_buffer(), _to_np_dtype(x.dtype)) for x in outbufs]
@@ -505,8 +502,7 @@ def reset_bufs(bufs:list[Buffer]):
  for buf in bufs: buf.copyin(np.zeros((buf.size, ), dtype=_to_np_dtype(buf.dtype)).data) # Zero to check that all values are filled

def _helper_linearizer_opt_ast(realized_ast:UOp, real_bufs:list[Buffer], opts=[],
                               apply_tc=False, atol=1e-4, rtol=1e-4, color_sizes=[], wanna_output=[]) -> list[Kernel]:
  lins: list[Kernel] = []
                               apply_tc=False, atol=1e-4, rtol=1e-4, color_sizes=[], wanna_output=[]):
  outbufs = [real_bufs[x.src[0].base.arg] for x in realized_ast.src]
  device = real_bufs[0].device
  wanna_output = [np.array(x).flatten() for x in wanna_output]
@@ -514,17 +510,12 @@ def _helper_linearizer_opt_ast(realized_ast:UOp, real_bufs:list[Buffer], opts=[]
  def get_prg(opts): return CompiledRunner(replace(get_program(realized_ast, opts=opts), device=device))

  def check_opt(opts):
    k = Kernel(realized_ast)
    lins.append(k)
    k.apply_opts(opts)
    prg = get_prg(opts=opts)
    reset_bufs(outbufs)
    prg.exec(real_bufs)
    for x,want in zip(copyout_outputs(outbufs), wanna_output): np.testing.assert_allclose(x, want, atol=atol, rtol=rtol)

  # Get baseline if it is not provided, which is not optimized at all.
  k = Kernel(realized_ast)
  lins.append(k)
  prg = get_prg(opts=())
  prg.exec(real_bufs)
  if len(wanna_output) == 0: wanna_output = copyout_outputs(outbufs)
@@ -532,16 +523,12 @@ def _helper_linearizer_opt_ast(realized_ast:UOp, real_bufs:list[Buffer], opts=[]
  for buf,want in zip(copyout_outputs(outbufs), wanna_output): np.testing.assert_allclose(buf, want, atol=atol, rtol=rtol)

  # Check correctness of handcoded optimizations.
  k = Kernel(realized_ast)
  k.apply_opts(hand_coded_optimizations(k))
  lins.append(k)
  prg = get_prg(opts=None)
  reset_bufs(outbufs)
  prg.exec(real_bufs)
  for buf,want in zip(copyout_outputs(outbufs), wanna_output): np.testing.assert_allclose(buf, want, atol=atol, rtol=rtol)
  for x in opts: # Check custom transformations if any.
    check_opt(([Opt(OptOps.TC, 0, (TC_SELECT.value, TC_OPT.value, 1))] if apply_tc else [])+x)
  return lins

if __name__ == '__main__':
  unittest.main()
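With the hunks above, _helper_linearizer_opt_ast no longer constructs Kernel objects or collects them in lins; every variant is compiled through get_program plus CompiledRunner, and helper_linearizer_opt hands the realized AST back to the caller. A condensed sketch of that flow, assuming replace is dataclasses.replace and that helper_realized_ast is importable from test.test_linearizer (both inferred, not shown in the hunks):

from dataclasses import replace
from tinygrad import Tensor
from tinygrad.engine.realize import CompiledRunner, get_program
from test.test_linearizer import helper_realized_ast

realized_ast, real_bufs = helper_realized_ast(Tensor.rand(8, 8) + 1)
device = real_bufs[0].device

def get_prg(opts):
  # same one-liner as the helper above: lower the AST, retarget the program to this device, compile
  return CompiledRunner(replace(get_program(realized_ast, opts=opts), device=device))

get_prg(opts=()).exec(real_bufs)    # baseline, no optimizations
get_prg(opts=None).exec(real_bufs)  # hand-coded optimizations (opts=None, per the hunk above)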
@@ -1,56 +0,0 @@
|
||||
import unittest
|
||||
from tinygrad import Tensor, Device
|
||||
from tinygrad.codegen.opt.kernel import Kernel
|
||||
from tinygrad.device import Buffer
|
||||
from tinygrad.codegen.opt.search import get_test_global_size, bufs_from_lin
|
||||
from tinygrad.helpers import GlobalCounters
|
||||
from extra.optimization.helpers import time_linearizer
|
||||
from test.test_linearizer import push_views
|
||||
|
||||
class TestSearchUtil(unittest.TestCase):
|
||||
def test_get_test_global_size(self):
|
||||
self.assertEqual(get_test_global_size([256, 256, 256], 65536, {}), ([256, 16, 16], 256.0))
|
||||
self.assertEqual(get_test_global_size([65536, 1, 1], 256, {}), ([256, 1, 1], 256.0))
|
||||
self.assertEqual(get_test_global_size([77, 1, 1], 16, {}), ([9, 1, 1], 77/9))
|
||||
|
||||
def test_bufs_from_lin(self):
|
||||
a = Tensor([1,2,3,4]).realize()
|
||||
si = (a+1).schedule()[0]
|
||||
rawbufs = bufs_from_lin(Kernel(si.ast))
|
||||
assert len(rawbufs) == 2
|
||||
assert all(r is not None for r in rawbufs)
|
||||
assert all(isinstance(r, Buffer) for r in rawbufs)
|
||||
assert all(r.size > 0 for r in rawbufs)
|
||||
|
||||
def test_bufs_from_lin_alt(self):
|
||||
a = Tensor.randn(4, 4).realize()
|
||||
b = a+a[0]
|
||||
si = b.schedule()[0]
|
||||
rawbufs = bufs_from_lin(Kernel(push_views(si.ast)))
|
||||
assert len(rawbufs) == 2
|
||||
assert all(r is not None for r in rawbufs)
|
||||
assert all(isinstance(r, Buffer) for r in rawbufs)
|
||||
assert all(r.size > 0 for r in rawbufs)
|
||||
|
||||
class TestTimeLinearizer(unittest.TestCase):
|
||||
@unittest.skipIf(Device.DEFAULT == "WEBGPU", "WebGPU timestamps are low precision, tm is 0")
|
||||
def test_reasonable_time(self):
|
||||
a = Tensor([1,2,3,4]).realize()
|
||||
si = (a+1).schedule()[0]
|
||||
# create fresh empty buffers
|
||||
rawbufs = [Buffer(b.device, b.size, b.dtype).allocate() for b in si.bufs]
|
||||
tm = time_linearizer(Kernel(push_views(si.ast)), rawbufs, allow_test_size=False, cnt=10, disable_cache=True)
|
||||
assert tm > 0 and tm != float('inf')
|
||||
|
||||
# Ensure that the kernel count is not incremented by time_linearizer when clearing l2
|
||||
def test_kernel_count(self):
|
||||
ast = Tensor.zeros(16).contiguous().kernelize().uop.src[1].arg.ast
|
||||
lin = Kernel(push_views(ast))
|
||||
bufs = bufs_from_lin(lin)
|
||||
|
||||
kernel_count = GlobalCounters.kernel_count
|
||||
time_linearizer(lin, bufs, allow_test_size=False, cnt=2, disable_cache=True, clear_l2=True)
|
||||
assert GlobalCounters.kernel_count == kernel_count, "kernel count was incremented by time_linearizer"
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
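For reference, the get_test_global_size assertions in the removed file all encode the same rule: the global size is scaled down until its product fits under the given limit, and the scale factor (original product divided by the reduced product) is returned alongside it. A quick arithmetic check of the three removed cases in plain Python; this paraphrases what the assertions verify, not the implementation of get_test_global_size:

from math import prod

cases = [(([256, 256, 256], 65536), ([256, 16, 16], 256.0)),
         (([65536, 1, 1], 256), ([256, 1, 1], 256.0)),
         (([77, 1, 1], 16), ([9, 1, 1], 77/9))]
for (global_size, limit), (reduced, factor) in cases:
  assert prod(reduced) <= limit                       # the test size fits under the limit
  assert prod(global_size) / prod(reduced) == factor  # the returned factor is the shrink ratio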