make disk tensor tests process safe (#14584)

This commit is contained in:
George Hotz
2026-02-06 15:39:55 +08:00
committed by GitHub
parent cf73d7e2a7
commit 3c26ce29b2

View File

@@ -4,9 +4,17 @@ from tinygrad import Tensor, Device, dtypes
from tinygrad.device import is_dtype_supported
from tinygrad.dtype import DType, DTYPES_DICT
from tinygrad.nn.state import safe_load, safe_save, get_state_dict, torch_load
from tinygrad.helpers import Timing, fetch, temp, OSX
from tinygrad.helpers import Timing, fetch, OSX
from test.helpers import slow
class TempDirTestCase(unittest.TestCase):
  """TestCase base that owns a per-test temporary directory.

  Each test gets its own private directory (created in setUp, removed in
  tearDown), so file-based tests do not collide when run in parallel
  processes. Subclasses may read `self.temp_dir` directly; `tmp(name)` is
  the convenience helper for building file paths inside it.
  """
  def setUp(self):
    # Fresh, uniquely-named directory for this test instance.
    self.temp_dir = tempfile.TemporaryDirectory()
  def tearDown(self):
    # Recursively delete the directory and everything the test wrote into it.
    self.temp_dir.cleanup()
  def tmp(self, name:str) -> str:
    """Return a POSIX-style path for `name` inside this test's temp dir."""
    base = pathlib.Path(self.temp_dir.name)
    return (base / name).as_posix()
def compare_weights_both(url):
import torch
fn = fetch(url)
@@ -84,7 +92,7 @@ class TestRawDiskBuffer(unittest.TestCase):
pathlib.Path(tmp).unlink()
@unittest.skipUnless(is_dtype_supported(dtypes.uint8), "need uint8")
class TestSafetensors(unittest.TestCase):
class TestSafetensors(TempDirTestCase):
def test_real_safetensors(self):
import torch
from safetensors.torch import save_file
@@ -95,19 +103,19 @@ class TestSafetensors(unittest.TestCase):
"weight3": torch.arange(0, 17, dtype=torch.int32).reshape(17,1,1),
"weight4": torch.arange(0, 2, dtype=torch.uint8),
}
save_file(tensors, temp("real.safetensors"))
save_file(tensors, self.tmp("real.safetensors"))
ret = safe_load(temp("real.safetensors"))
ret = safe_load(self.tmp("real.safetensors"))
for k,v in tensors.items(): np.testing.assert_array_equal(ret[k].numpy(), v.numpy())
safe_save(ret, temp("real.safetensors_alt"))
with open(temp("real.safetensors"), "rb") as f:
with open(temp("real.safetensors_alt"), "rb") as g:
safe_save(ret, self.tmp("real.safetensors_alt"))
with open(self.tmp("real.safetensors"), "rb") as f:
with open(self.tmp("real.safetensors_alt"), "rb") as g:
assert f.read() == g.read()
ret2 = safe_load(temp("real.safetensors_alt"))
ret2 = safe_load(self.tmp("real.safetensors_alt"))
for k,v in tensors.items(): np.testing.assert_array_equal(ret2[k].numpy(), v.numpy())
def test_real_safetensors_open(self):
fn = temp("real_safe")
fn = self.tmp("real_safe")
state_dict = {"tmp": Tensor.rand(10,10)}
safe_save(state_dict, fn)
import os
@@ -123,15 +131,15 @@ class TestSafetensors(unittest.TestCase):
from extra.models.efficientnet import EfficientNet
model = EfficientNet(0)
state_dict = get_state_dict(model)
safe_save(state_dict, temp("eff0"))
state_dict_loaded = safe_load(temp("eff0"))
safe_save(state_dict, self.tmp("eff0"))
state_dict_loaded = safe_load(self.tmp("eff0"))
assert sorted(state_dict_loaded.keys()) == sorted(state_dict.keys())
for k,v in state_dict.items():
np.testing.assert_array_equal(v.numpy(), state_dict_loaded[k].numpy())
# load with the real safetensors
from safetensors import safe_open
with safe_open(temp("eff0"), framework="pt", device="cpu") as f:
with safe_open(self.tmp("eff0"), framework="pt", device="cpu") as f:
assert sorted(f.keys()) == sorted(state_dict.keys())
for k in f.keys():
np.testing.assert_array_equal(f.get_tensor(k).numpy(), state_dict[k].numpy())
@@ -155,9 +163,9 @@ class TestSafetensors(unittest.TestCase):
def test_metadata(self):
metadata = {"hello": "world"}
safe_save({}, temp('metadata.safetensors'), metadata)
safe_save({}, self.tmp('metadata.safetensors'), metadata)
import struct
with open(temp('metadata.safetensors'), 'rb') as f:
with open(self.tmp('metadata.safetensors'), 'rb') as f:
dat = f.read()
sz = struct.unpack(">Q", dat[0:8])[0]
import json
@@ -167,7 +175,7 @@ class TestSafetensors(unittest.TestCase):
for dtype in DTYPES_DICT.values():
if dtype in [dtypes.bfloat16]: continue # not supported in numpy
if not is_dtype_supported(dtype): continue
path = temp(f"ones.{dtype}.safetensors")
path = self.tmp(f"ones.{dtype}.safetensors")
ones = Tensor(np.random.rand(10,10), dtype=dtype)
safe_save(get_state_dict(ones), path)
np.testing.assert_equal(ones.numpy(), list(safe_load(path).values())[0].numpy())
@@ -189,9 +197,9 @@ class TestSafetensors(unittest.TestCase):
"weight_I16": torch.tensor([127, 64], dtype=torch.short),
"weight_BF16": torch.randn((2, 2), dtype=torch.bfloat16),
}
save_file(tensors, temp("dtypes.safetensors"))
save_file(tensors, self.tmp("dtypes.safetensors"))
loaded = safe_load(temp("dtypes.safetensors"))
loaded = safe_load(self.tmp("dtypes.safetensors"))
for k,v in loaded.items():
if v.dtype != dtypes.bfloat16:
assert v.numpy().dtype == tensors[k].numpy().dtype
@@ -203,57 +211,52 @@ class TestSafetensors(unittest.TestCase):
"weight_U32": np.array([1, 2, 3], dtype=np.uint32),
"weight_U64": np.array([1, 2, 3], dtype=np.uint64),
}
np_save_file(tensors, temp("dtypes.safetensors"))
np_save_file(tensors, self.tmp("dtypes.safetensors"))
loaded = safe_load(temp("dtypes.safetensors"))
loaded = safe_load(self.tmp("dtypes.safetensors"))
for k,v in loaded.items():
assert v.numpy().dtype == tensors[k].dtype
np.testing.assert_allclose(v.numpy(), tensors[k])
def helper_test_disk_tensor(tmp, fn, data, np_fxn, tinygrad_fxn=None):
  """Check that an in-place op behaves the same on a disk Tensor and a numpy array.

  Builds a disk-backed Tensor at tmp(fn) from `data`, applies `tinygrad_fxn`
  to it and `np_fxn` to an equivalent numpy array, then asserts the results
  match elementwise.

  tmp: callable mapping a bare filename to a path inside the test's temp dir
       (e.g. TempDirTestCase.tmp), so each test process uses its own files.
  fn: bare filename for the backing file on disk.
  data: nested-list payload used to build both the Tensor and the array.
  np_fxn: mutating function applied to the numpy array.
  tinygrad_fxn: mutating function applied to the disk Tensor; defaults to np_fxn.
  """
  if tinygrad_fxn is None: tinygrad_fxn = np_fxn
  # Remove any stale backing file so the disk Tensor starts from `data` alone.
  pathlib.Path(tmp(fn)).unlink(missing_ok=True)
  tinygrad_tensor = Tensor(data, device="CPU").to(f"disk:{tmp(fn)}")
  numpy_arr = np.array(data)
  tinygrad_fxn(tinygrad_tensor)
  np_fxn(numpy_arr)
  np.testing.assert_allclose(tinygrad_tensor.numpy(), numpy_arr)
class TestDiskTensor(unittest.TestCase):
class TestDiskTensor(TempDirTestCase):
def test_empty(self):
pathlib.Path(temp("dt_empty")).unlink(missing_ok=True)
Tensor.empty(100, 100, device=f"disk:{temp('dt_empty')}")
Tensor.empty(100, 100, device=f"disk:{self.tmp('dt_empty')}")
def test_simple_read(self):
fn = pathlib.Path(temp("dt_simple_read"))
fn.unlink(missing_ok=True)
fn = pathlib.Path(self.tmp("dt_simple_read"))
fn.write_bytes(bytes(range(256)))
t = Tensor.empty(16, 16, device=f"disk:{temp('dt_simple_read')}", dtype=dtypes.uint8)
t = Tensor.empty(16, 16, device=f"disk:{self.tmp('dt_simple_read')}", dtype=dtypes.uint8)
out = t[1].to(Device.DEFAULT).tolist()
assert out == list(range(16, 32))
def test_simple_read_bitcast(self):
fn = pathlib.Path(temp("dt_simple_read_bitcast"))
fn.unlink(missing_ok=True)
fn = pathlib.Path(self.tmp("dt_simple_read_bitcast"))
fn.write_bytes(bytes(range(256))*2)
t = Tensor.empty(16, 16*2, device=f"disk:{temp('dt_simple_read_bitcast')}", dtype=dtypes.uint8)
t = Tensor.empty(16, 16*2, device=f"disk:{self.tmp('dt_simple_read_bitcast')}", dtype=dtypes.uint8)
out = t[1].bitcast(dtypes.uint16).to(Device.DEFAULT).tolist()
tout = [(x//256, x%256) for x in out]
assert tout == list([(x+1,x) for x in range(32,64,2)])
def test_simple_read_bitcast_alt(self):
fn = pathlib.Path(temp("dt_simple_read_bitcast_alt"))
fn.unlink(missing_ok=True)
fn = pathlib.Path(self.tmp("dt_simple_read_bitcast_alt"))
fn.write_bytes(bytes(range(256))*2)
t = Tensor.empty(16, 16*2, device=f"disk:{temp('dt_simple_read_bitcast_alt')}", dtype=dtypes.uint8)
t = Tensor.empty(16, 16*2, device=f"disk:{self.tmp('dt_simple_read_bitcast_alt')}", dtype=dtypes.uint8)
out = t.bitcast(dtypes.uint16)[1].to(Device.DEFAULT).tolist()
tout = [(x//256, x%256) for x in out]
assert tout == list([(x+1,x) for x in range(32,64,2)])
def test_strided_read(self):
# test non-contiguous (strided) read - should read elements at indices 0, 2, 4
pathlib.Path(temp(fn:="dt_strided_read")).unlink(missing_ok=True)
dt = Tensor([0, 1, 2, 3, 4, 5]).to(f"disk:{temp(fn)}")
dt = Tensor([0, 1, 2, 3, 4, 5]).to(f"disk:{self.tmp('dt_strided_read')}")
result = dt[::2].tolist()
# TODO: dt[::2] selects indices 0, 2, 4, so result should be [0, 2, 4]
# self.assertEqual(result, [0, 2, 4])
@@ -261,43 +264,38 @@ class TestDiskTensor(unittest.TestCase):
def test_permuted_read(self):
# test non-contiguous (permuted) read - should read transposed
pathlib.Path(temp(fn:="dt_permuted_read")).unlink(missing_ok=True)
dt = Tensor([[0, 1, 2], [3, 4, 5]]).to(f"disk:{temp(fn)}")
dt = Tensor([[0, 1, 2], [3, 4, 5]]).to(f"disk:{self.tmp('dt_permuted_read')}")
result = dt.T.tolist()
# TODO: transpose should give [[0, 3], [1, 4], [2, 5]]
# self.assertEqual(result, [[0, 3], [1, 4], [2, 5]])
self.assertEqual(result, [[0, 1], [2, 3], [4, 5]]) # wrong!
def test_write_ones(self):
pathlib.Path(temp("dt_write_ones")).unlink(missing_ok=True)
out = Tensor.ones(10, 10, device="CPU").contiguous()
outdisk = out.to(f"disk:{temp('dt_write_ones')}")
outdisk = out.to(f"disk:{self.tmp('dt_write_ones')}")
print(outdisk)
outdisk.realize()
del out, outdisk
import struct
# test file
with open(temp("dt_write_ones"), "rb") as f:
with open(self.tmp("dt_write_ones"), "rb") as f:
assert f.read() == struct.pack('<f', 1.0) * 100 == b"\x00\x00\x80\x3F" * 100
# test load alt
reloaded = Tensor.empty(10, 10, device=f"disk:{temp('dt_write_ones')}")
reloaded = Tensor.empty(10, 10, device=f"disk:{self.tmp('dt_write_ones')}")
np.testing.assert_almost_equal(reloaded.numpy(), np.ones((10, 10)))
def test_simple_setitem(self):
pathlib.Path(temp(fn:="dt_simple_setitem")).unlink(missing_ok=True)
data = [[1],[2]]
src = Tensor(data)
dt = src.to(f"disk:{temp(fn)}")
dt = src.to(f"disk:{self.tmp('dt_simple_setitem')}")
dt[1] = [3]
self.assertEqual(dt.tolist(), [[1], [3]])
def test_strided_setitem(self):
# test non-contiguous (strided) setitem - should set elements at indices 0, 2, 4
pathlib.Path(temp(fn:="dt_strided_setitem")).unlink(missing_ok=True)
dt = Tensor([1, 2, 3, 4, 5, 6]).to(f"disk:{temp(fn)}")
dt = Tensor([1, 2, 3, 4, 5, 6]).to(f"disk:{self.tmp('dt_strided_setitem')}")
dt[::2] = Tensor([10, 20, 30])
# TODO: dt[::2] selects indices 0, 2, 4, so result should be [10, 2, 20, 4, 30, 6]
# self.assertEqual(dt.tolist(), [10, 2, 20, 4, 30, 6])
@@ -305,39 +303,35 @@ class TestDiskTensor(unittest.TestCase):
def test_assign_const_to_disk(self):
# assign from CONST (Tensor.full) to disk - source has no buffer, needs contiguous first
pathlib.Path(temp(fn:="dt_assign_const")).unlink(missing_ok=True)
dt = Tensor.empty(4, device=f"disk:{temp(fn)}", dtype=dtypes.int32)
dt = Tensor.empty(4, device=f"disk:{self.tmp('dt_assign_const')}", dtype=dtypes.int32)
dt.assign(Tensor.full((4,), 42, dtype=dtypes.int32)).realize()
np.testing.assert_array_equal(dt.numpy(), [42, 42, 42, 42])
def test_assign_slice_from_const(self):
# slice assign from CONST to disk - tests size calculation when no RANGE ops
pathlib.Path(temp(fn:="dt_slice_const")).unlink(missing_ok=True)
dt = Tensor([0, 1, 2, 3], dtype=dtypes.int32).to(f"disk:{temp(fn)}")
dt = Tensor([0, 1, 2, 3], dtype=dtypes.int32).to(f"disk:{self.tmp('dt_slice_const')}")
dt[1:3].assign(Tensor.full((2,), 99, dtype=dtypes.int32)).realize()
np.testing.assert_array_equal(dt.numpy(), [0, 99, 99, 3])
def test_disk_to_disk_copy(self):
# disk-to-disk copy needs to go through CPU
pathlib.Path(temp(fn1:="dt_d2d_src")).unlink(missing_ok=True)
pathlib.Path(temp(fn2:="dt_d2d_dst")).unlink(missing_ok=True)
src = Tensor([1, 2, 3, 4], dtype=dtypes.int32).to(f"disk:{temp(fn1)}")
dst = Tensor.empty(4, device=f"disk:{temp(fn2)}", dtype=dtypes.int32)
src = Tensor([1, 2, 3, 4], dtype=dtypes.int32).to(f"disk:{self.tmp('dt_d2d_src')}")
dst = Tensor.empty(4, device=f"disk:{self.tmp('dt_d2d_dst')}", dtype=dtypes.int32)
dst.assign(src.to("CPU")).realize()
np.testing.assert_array_equal(dst.numpy(), [1, 2, 3, 4])
def test_assign_slice(self):
def assign(x,s,y): x[s] = y
helper_test_disk_tensor("dt_assign_slice_1", [0,1,2,3], lambda x: assign(x, slice(0,2), [13, 12]))
helper_test_disk_tensor("dt_assign_slice_2", [[0,1,2,3],[4,5,6,7]], lambda x: assign(x, slice(0,1), [[13, 12, 11, 10]]))
helper_test_disk_tensor(self.tmp, "dt_assign_slice_1", [0,1,2,3], lambda x: assign(x, slice(0,2), [13, 12]))
helper_test_disk_tensor(self.tmp, "dt_assign_slice_2", [[0,1,2,3],[4,5,6,7]], lambda x: assign(x, slice(0,1), [[13, 12, 11, 10]]))
def test_reshape(self):
helper_test_disk_tensor("dt_reshape_1", [1,2,3,4,5], lambda x: x.reshape((1,5)))
helper_test_disk_tensor("dt_reshape_2", [1,2,3,4], lambda x: x.reshape((2,2)))
helper_test_disk_tensor(self.tmp, "dt_reshape_1", [1,2,3,4,5], lambda x: x.reshape((1,5)))
helper_test_disk_tensor(self.tmp, "dt_reshape_2", [1,2,3,4], lambda x: x.reshape((2,2)))
def test_assign_to_different_dtype(self):
# NOTE: this is similar to Y_train in fetch_cifar
t = Tensor.empty(10, device=f'disk:{temp("dt_assign_to_different_dtype")}', dtype=dtypes.int64)
t = Tensor.empty(10, device=f'disk:{self.tmp("dt_assign_to_different_dtype")}', dtype=dtypes.int64)
for i in range(5):
data = np.array([3, 3])
@@ -349,8 +343,7 @@ class TestDiskTensor(unittest.TestCase):
def test_assign_with_bitcast(self):
# bitcast assign is used in safe_save for writing header length
# bitcast on source side works, bitcast on target side raises
pathlib.Path(temp(fn:="dt_assign_bitcast")).unlink(missing_ok=True)
t = Tensor.empty(16, device=f"disk:{temp(fn)}", dtype=dtypes.uint8)
t = Tensor.empty(16, device=f"disk:{self.tmp('dt_assign_bitcast')}", dtype=dtypes.uint8)
# correct way: bitcast the source to match target dtype
t[0:8].assign(Tensor([12345], dtype=dtypes.int64, device="CPU").bitcast(dtypes.uint8))
val = int.from_bytes(t[0:8].data(), 'little')
@@ -361,8 +354,7 @@ class TestDiskTensor(unittest.TestCase):
def test_assign_to_bitcast_view(self):
# assign float values to a float32 view of a uint8 disk buffer (used by safe_save)
pathlib.Path(temp(fn:="dt_bitcast_view_assign")).unlink(missing_ok=True)
t = Tensor.empty(32, device=f"disk:{temp(fn)}", dtype=dtypes.uint8)
t = Tensor.empty(32, device=f"disk:{self.tmp('dt_bitcast_view_assign')}", dtype=dtypes.uint8)
# create float32 view of bytes 8-24 (4 floats)
float_view = t[8:24].bitcast(dtypes.float32)
float_view.assign(Tensor([1.0, 2.0, 3.0, 4.0], dtype=dtypes.float32, device="CPU"))
@@ -370,21 +362,20 @@ class TestDiskTensor(unittest.TestCase):
def test_assign_cross_device(self):
# disk assign allows cross-device (source on GPU/CPU, target on disk)
pathlib.Path(temp(fn:="dt_assign_cross")).unlink(missing_ok=True)
t = Tensor.empty(4, device=f"disk:{temp(fn)}", dtype=dtypes.float32)
t = Tensor.empty(4, device=f"disk:{self.tmp('dt_assign_cross')}", dtype=dtypes.float32)
src = Tensor([1.0, 2.0, 3.0, 4.0]) # on default device
t.assign(src)
np.testing.assert_array_equal(t.numpy(), [1.0, 2.0, 3.0, 4.0])
def test_bitcast(self):
with open(temp('dt_bitcast'), "wb") as f: f.write(bytes(range(10,20)))
t = Tensor.empty(5, dtype=dtypes.int16, device=f"disk:{temp('dt_bitcast')}")
with open(self.tmp('dt_bitcast'), "wb") as f: f.write(bytes(range(10,20)))
t = Tensor.empty(5, dtype=dtypes.int16, device=f"disk:{self.tmp('dt_bitcast')}")
ret = t.to("CPU").bitcast(dtypes.uint16) + 1
assert ret.tolist() == [2827, 3341, 3855, 4369, 4883]
def test_bitcast_view(self):
with open(temp('dt_bitcast_view'), "wb") as f: f.write(bytes(range(10, 24)))
t = Tensor.empty(3, dtype=dtypes.uint, device=f"disk:{temp('dt_bitcast_view')}").shrink([(0, 2)])
with open(self.tmp('dt_bitcast_view'), "wb") as f: f.write(bytes(range(10, 24)))
t = Tensor.empty(3, dtype=dtypes.uint, device=f"disk:{self.tmp('dt_bitcast_view')}").shrink([(0, 2)])
ret = t.bitcast(dtypes.uint16).to("CPU") + 1
assert ret.tolist() == [2827, 3341, 3855, 4369]
@@ -392,59 +383,55 @@ class TestDiskTensor(unittest.TestCase):
@unittest.skipUnless(is_dtype_supported(dtypes.bfloat16), "bfloat16 not supported")
def test_bf16_disk_write_read(self):
t = Tensor([10000, -1, -1000, -10000, 20], dtype=dtypes.float32)
t.to(f"disk:{temp('dt_bf16_disk_write_read_f32')}").realize()
t.to(f"disk:{self.tmp('dt_bf16_disk_write_read_f32')}").realize()
# hack to "cast" f32 -> bf16
with open(temp('dt_bf16_disk_write_read_f32'), "rb") as f: dat = f.read()
with open(self.tmp('dt_bf16_disk_write_read_f32'), "rb") as f: dat = f.read()
adat = b''.join([dat[i+2:i+4] for i in range(0, len(dat), 4)])
with open(temp('dt_bf16_disk_write_read_bf16'), "wb") as f: f.write(adat)
with open(self.tmp('dt_bf16_disk_write_read_bf16'), "wb") as f: f.write(adat)
t = Tensor.empty(5, dtype=dtypes.bfloat16, device=f"disk:{temp('dt_bf16_disk_write_read_bf16')}")
t = Tensor.empty(5, dtype=dtypes.bfloat16, device=f"disk:{self.tmp('dt_bf16_disk_write_read_bf16')}")
ct = t.to(Device.DEFAULT).cast(dtypes.float)
assert ct.numpy().tolist() == [9984., -1, -1000, -9984, 20]
def test_copy_from_disk(self):
fn = pathlib.Path(temp("dt_copy_from_disk"))
fn.unlink(missing_ok=True)
fn = pathlib.Path(self.tmp("dt_copy_from_disk"))
fn.write_bytes(bytes(range(256))*1024)
t = Tensor.empty(256*1024, device=f"disk:{temp('dt_copy_from_disk')}", dtype=dtypes.uint8)
t = Tensor.empty(256*1024, device=f"disk:{self.tmp('dt_copy_from_disk')}", dtype=dtypes.uint8)
on_dev = t.to(Device.DEFAULT).realize()
np.testing.assert_equal(on_dev.numpy(), t.numpy())
def test_copy_from_disk_offset(self):
fn = pathlib.Path(temp("dt_copy_from_disk_offset"))
fn.unlink(missing_ok=True)
fn = pathlib.Path(self.tmp("dt_copy_from_disk_offset"))
fn.write_bytes(bytes(range(256))*1024)
for off in [314, 991, 2048, 4096]:
t = Tensor.empty(256*1024, device=f"disk:{temp('dt_copy_from_disk_offset')}", dtype=dtypes.uint8)[off:]
t = Tensor.empty(256*1024, device=f"disk:{self.tmp('dt_copy_from_disk_offset')}", dtype=dtypes.uint8)[off:]
on_dev = t.to(Device.DEFAULT).realize()
np.testing.assert_equal(on_dev.numpy(), t.numpy())
@slow
def test_copy_from_disk_huge(self):
fn = pathlib.Path(temp("dt_copy_from_disk_huge"))
fn.unlink(missing_ok=True)
fn = pathlib.Path(self.tmp("dt_copy_from_disk_huge"))
fn.write_bytes(bytes(range(256))*1024*256)
for off in [0, 551]:
t = Tensor.empty(256*1024*256, device=f"disk:{temp('dt_copy_from_disk_huge')}", dtype=dtypes.uint8)[off:]
t = Tensor.empty(256*1024*256, device=f"disk:{self.tmp('dt_copy_from_disk_huge')}", dtype=dtypes.uint8)[off:]
on_dev = t.to(Device.DEFAULT).realize()
np.testing.assert_equal(on_dev.numpy(), t.numpy())
@unittest.skip("this allocates a lot of RAM")
@unittest.skipUnless(OSX, "seems to only be an issue on macOS with file size >2 GiB")
def test_copy_to_cpu_not_truncated(self):
with open((fn:=temp("dt_copy_to_cpu_not_truncated")), "wb") as f: f.write(b'\x01' * (size := int(2 * 1024**3)) + (test := b"test"))
fn = self.tmp("dt_copy_to_cpu_not_truncated")
with open(fn, "wb") as f: f.write(b'\x01' * (size := int(2 * 1024**3)) + (test := b"test"))
x = Tensor.empty(size + len(test), dtype=dtypes.uint8, device=f"disk:{fn}").to("CPU").realize()
assert x[size:].data().tobytes() == test
def test_disk_device_reuse(self):
from tinygrad.runtime.ops_disk import DiskDevice
fn = pathlib.Path(temp("dt_device_reuse"))
fn.unlink(missing_ok=True)
fn = pathlib.Path(self.tmp("dt_device_reuse"))
fn.write_bytes(bytes(range(256)))
# create first tensor and realize it
t1 = Tensor.empty(128, device=f"disk:{fn}", dtype=dtypes.uint8)
@@ -466,8 +453,7 @@ class TestDiskTensor(unittest.TestCase):
def test_disk_open_failure_state(self):
from tinygrad.runtime.ops_disk import DiskDevice
fn = pathlib.Path(temp("dt_open_failure"))
fn.unlink(missing_ok=True)
fn = pathlib.Path(self.tmp("dt_open_failure"))
fn.write_bytes(bytes(range(256)))
os.chmod(fn, 0o000)
try:
@@ -486,8 +472,7 @@ class TestDiskTensor(unittest.TestCase):
assert disk_device.size == 200
def test_disk_permission_error(self):
fn = pathlib.Path(temp("dt_permission"))
fn.unlink(missing_ok=True)
fn = pathlib.Path(self.tmp("dt_permission"))
fn.write_bytes(bytes(range(256)))
os.chmod(fn, 0o000)
try:
@@ -496,17 +481,14 @@ class TestDiskTensor(unittest.TestCase):
finally:
os.chmod(fn, 0o644)
class TestPathTensor(unittest.TestCase):
class TestPathTensor(TempDirTestCase):
def setUp(self):
self.temp_dir = tempfile.TemporaryDirectory()
super().setUp()
self.test_file = pathlib.Path(self.temp_dir.name) / "test_file.bin"
self.test_data = np.arange(100, dtype=np.uint8).tobytes()
with open(self.test_file, "wb") as f:
f.write(self.test_data)
def tearDown(self):
self.temp_dir.cleanup()
def test_path_tensor_no_device(self):
t = Tensor(self.test_file)
self.assertEqual(t.shape, (100,))
@@ -557,10 +539,10 @@ class TestPathTensor(unittest.TestCase):
os.chmod(test_file, 0o644)
assert Tensor(pathlib.Path(test_file)).tolist(), list(range(10))
class TestDiskTensorMovement(unittest.TestCase):
class TestDiskTensorMovement(TempDirTestCase):
def setUp(self):
self.fn = pathlib.Path(temp("custom_disk_range"))
self.fn.unlink(missing_ok=True)
super().setUp()
self.fn = pathlib.Path(self.tmp("custom_disk_range"))
Tensor.arange(100, dtype=dtypes.uint8).to(f"disk:{str(self.fn)}").realize()
def test_simple_read(self):