mirror of https://github.com/tinygrad/tinygrad.git
zero copy (#2531)

* zero copy
* zero copy test
* loads coder in milliseconds
* zero copy for cpu and torch
* src_from_buffer is None
* SLOW_METAL_COPY there
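In rough terms (the snippet below is a standalone sketch, not code from this commit), "zero copy" here means toCPU() can return a numpy view over memory the allocator already owns, instead of allocating a fresh array and copying device memory into it:

import numpy as np

backing = bytearray(4 * 4)                                    # stand-in for an allocator-owned buffer
view = np.frombuffer(memoryview(backing), dtype=np.float32)   # a view over the same bytes, no copy
view[:] = 1.0                                                 # writes land directly in `backing`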
test/test_zero_copy.py (new file, 28 lines)

@@ -0,0 +1,28 @@
+import unittest
+from tinygrad import Tensor, Device
+#from tinygrad.helpers import CI
+import time
+
+def time_tensor_numpy(out:Tensor):
+  times = []
+  for _ in range(5):
+    st = time.perf_counter()
+    out.lazydata.realized.toCPU()
+    et = time.perf_counter() - st
+    times.append(et)
+  return min(times)
+
+N = 4096
+class TestZeroCopy(unittest.TestCase):
+  @unittest.skipIf(Device.DEFAULT not in {"CLANG", "LLVM", "CPU", "TORCH", "METAL"}, "device isn't zero copy")
+  def test_zero_copy_from_default_to_cpu(self):
+    demo = Tensor.rand(1).realize()
+    t1 = time_tensor_numpy(demo)
+    out = Tensor.rand(N, N).realize()
+    t2 = time_tensor_numpy(out)
+    gbps = out.nbytes()*1e-9/max(t2-t1, 1e-10)
+    print(f"time(base): {t1*1e3:.2f} ms, time(copy): {t2*1e3:.2f} ms : copy speed {gbps:.2f} GB/s")
+    self.assertGreater(gbps, 1000)  # more than 1000 GB/s = no copy
+
+if __name__ == '__main__':
+  unittest.main(verbosity=2)
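A rough sanity check of the 1000 GB/s threshold (back-of-the-envelope numbers, not from the commit): a 4096x4096 float32 tensor is about 67 MB, so clearing 1000 GB/s means toCPU() returned in well under 100 microseconds, while an actual memcpy of that much data over host DRAM takes on the order of milliseconds. Only a no-copy view can pass.

nbytes = 4096 * 4096 * 4           # float32 elements from Tensor.rand(N, N) -> ~67 MB
t_budget = nbytes * 1e-9 / 1000    # time allowed at 1000 GB/s -> ~6.7e-5 s (67 us)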
@@ -37,22 +37,26 @@ class Buffer:
   def __init__(self, device:str, size:int, dtype:DType, opaque:Any=None):
     assert isinstance(dtype, DType)
     self.device, self.size, self.dtype = device, size, dtype
-    self._buf = opaque if opaque is not None else Device[self.device].allocator.alloc(size, dtype)
-    GlobalCounters.mem_used += self.size * self.dtype.itemsize
+    self.allocator = Device[self.device].allocator
+    self._buf = opaque if opaque is not None else self.allocator.alloc(size, dtype)
+    # TODO: mem_used for all devices
+    if self.device == Device.DEFAULT: GlobalCounters.mem_used += self.size * self.dtype.itemsize
   def __del__(self):
-    GlobalCounters.mem_used -= self.size * self.dtype.itemsize
-    Device[self.device].allocator.free(self._buf, self.size, self.dtype)
+    if self.device == Device.DEFAULT: GlobalCounters.mem_used -= self.size * self.dtype.itemsize
+    self.allocator.free(self._buf, self.size, self.dtype)
   def __repr__(self): return f"<buf device:{self.device} size:{self.size}>"
   def copyin(self, mv:memoryview):
     mv = mv.cast("B", shape=[self.size*self.dtype.itemsize])
     assert len(mv) == self.size*self.dtype.itemsize, f"size mismatch, {len(mv)=} != {self.dtype=} {self.size=}"
-    Device[self.device].allocator.copyin(self._buf, mv)
+    self.allocator.copyin(self._buf, mv)
     return self
   @staticmethod
   def fromCPU(device:str, x:np.ndarray): return Buffer(device, x.size, dtypes.from_np(x.dtype)).copyin(x.data)
   def toCPU(self) -> np.ndarray:
+    # zero copy with as_buffer
+    if hasattr(self.allocator, 'as_buffer'): return np.frombuffer(self.allocator.as_buffer(self._buf), dtype=np.dtype(self.dtype.np, metadata={"backing": self._buf}))
     ret = np.empty(self.size, self.dtype.np)
-    if self.size > 0: Device[self.device].allocator.copyout(ret.data.cast("B", shape=[self.size*self.dtype.itemsize]), self._buf)
+    if self.size > 0: self.allocator.copyout(ret.data.cast("B", shape=[self.size*self.dtype.itemsize]), self._buf)
     return ret

 # TODO: size, dest, src are the same type. can we enforce this?
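The metadata={"backing": self._buf} part of the new toCPU() path is easy to miss. A small standalone sketch (plain numpy, not tinygrad), assuming the intent is to keep the backing allocation alive for as long as the zero-copy view exists:

import numpy as np

buf = bytearray(8)                                     # stand-in for Buffer._buf
dt = np.dtype(np.float32, metadata={"backing": buf})   # the dtype object holds a reference to buf
arr = np.frombuffer(memoryview(buf), dtype=dt)         # zero-copy view over buf
del buf                                                # arr (via its dtype metadata) still references the backing memory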
@@ -251,6 +255,7 @@ def _get_optimized_linearizer(linearizer_opts:LinearizerOptions, ast:LazyOp) ->
 import ctypes
 class _MallocAllocator(LRUAllocator):
   def _alloc(self, size:int, dtype:DType): return (ctypes.c_uint8 * (size*dtype.itemsize))()
+  def as_buffer(self, src) -> memoryview: return memoryview(src)
   def copyin(self, dest, src:memoryview): ctypes.memmove(dest, from_mv(src), len(src))
   def copyout(self, dest:memoryview, src): ctypes.memmove(from_mv(dest), src, len(dest))
 MallocAllocator = _MallocAllocator()
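A memoryview over a ctypes array is itself zero copy, which is what makes the new as_buffer cheap. A quick standalone illustration (not tinygrad code):

import ctypes
import numpy as np

raw = (ctypes.c_uint8 * 16)()                   # the kind of allocation _alloc returns
mv = memoryview(raw)                            # what as_buffer hands back: a view, not a copy
np.frombuffer(mv, dtype=np.float32)[:] = 3.0    # writes through the view land in `raw`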
@@ -27,7 +27,6 @@ def run_schedule(schedule:List[ScheduleItem], disable_logging=False):
       # we don't have an output buffer, we have to create it, and create to max size if it has symbolic shape
       si.out.realized = si.out.output_buffer if si.out.output_buffer is not None else \
         Buffer(si.out.device, prod((s if isinstance(s, int) else s.max for s in si.out.shape)), si.out.dtype)
-        #Device[si.out.device].buffer(prod((s if isinstance(s, int) else s.max for s in si.out.shape)), si.out.dtype, **si.out._device_extra_args())
     # TODO: size 0 should be removed from the schedule
     if si.out.realized.size != 0:
       if si.ast.op in LoadOps:
@@ -56,19 +55,10 @@ def _realize_rand(buffer: Buffer, arg) -> None:

 # *** one op LoadOps ***

-#from tinygrad.runtime.lib import RawBufferMapped, RawBufferTransfer
-#from tinygrad.runtime.ops_disk import RawDiskBuffer
 def _realize_from(buffer: Buffer, src: Buffer) -> None:
   assert src.size == buffer.size, f"size mismatch on FROM {src.size=} != {buffer.size=}"
   if DEBUG >= 2: print(f"*** copy {buffer.device} <- {src.device} size {src.size:<16d} shape {buffer.size:5d} dtype {src.dtype}")
   buffer.copyin(src.toCPU().data)
-  # TODO: make this generic
-  #if isinstance(src.realized, RawDiskBuffer) and isinstance(buffer.realized, RawBufferMapped):
-  #  src.realized.readinto(buffer.realized._buffer())
-  #elif isinstance(src.realized, RawBufferTransfer) and isinstance(buffer.realized, RawBufferTransfer) and getenv("P2P", 0) >= 1:
-  #  buffer.realized._transfer(src.realized)
-  #else:
-  #buffer.realized._copyin(src.realized.toCPU())

 # *** n op LoadOps ***

@@ -41,6 +41,7 @@ numpy_fxn_for_op: Dict[Op, Callable] = {

 class NumpyAllocator(Allocator):
   def _alloc(self, size:int, dtype:DType): return np.empty(size, dtype.np)
+  def as_buffer(self, src:np.ndarray) -> memoryview: return np.require(src, requirements='C').data
   def copyin(self, dest:np.ndarray, src:memoryview): np.copyto(dest, np.frombuffer(src, dest.dtype).reshape(dest.shape))
   def copyout(self, dest:memoryview, src:np.ndarray): np.copyto(np.frombuffer(dest, src.dtype).reshape(src.shape), src)

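For NumpyAllocator, np.require(src, requirements='C') returns src unchanged when it is already C-contiguous, so .data is a memoryview over the original array's storage. A standalone illustration (not tinygrad code):

import numpy as np

a = np.zeros(4, dtype=np.float32)
mv = np.require(a, requirements='C').data       # same memory as `a` when already contiguous
np.frombuffer(mv, dtype=np.float32)[0] = 7.0    # a[0] is now 7.0 as well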
@@ -44,6 +44,7 @@ class DiskAllocator(Allocator):
       if os.path.getsize(self.device) < size * dtype.itemsize: os.ftruncate(f.fileno(), size * dtype.itemsize)
       buf = UnderlyingDiskBuffer(f, mmap.mmap(f.fileno(), size * dtype.itemsize))
     return DiskBuffer(buf, size, dtype)
+  def as_buffer(self, src:DiskBuffer): return src._buf()
   def copyin(self, dest:DiskBuffer, src:memoryview): dest._buf()[:] = src
   def copyout(self, dest:memoryview, src:DiskBuffer):
     if src.ud.fd is not None:
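For the disk backend, the mmap.mmap object already exposes the file's pages as a buffer, so as_buffer can hand it out directly. A standalone sketch using a temporary file (not tinygrad code; fd, path, and m are illustrative names):

import mmap, os, tempfile

fd, path = tempfile.mkstemp()
os.ftruncate(fd, 16)                    # back the mapping with 16 bytes of file
m = mmap.mmap(fd, 16)                   # the kind of object UnderlyingDiskBuffer wraps
memoryview(m)[0:4] = b"abcd"            # a view over the mapped pages, no extra copy
m.close(); os.close(fd); os.remove(path)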
@@ -56,12 +56,27 @@ class MetalAllocator(LRUAllocator):
     ret = self.device.device.newBufferWithLength_options_(size*dtype.itemsize, Metal.MTLResourceStorageModeShared)
     if ret is None: raise MemoryError(f"Metal OOM while allocating {size=} {dtype=}")
     return ret
+  def _async_copy(self, dest, src):
+    assert src.length() == dest.length(), f"length mismatch {src.length()=} {dest.length()=}"
+    command_buffer = self.device.mtl_queue.commandBuffer()
+    encoder = command_buffer.blitCommandEncoder()
+    encoder.copyFromBuffer_sourceOffset_toBuffer_destinationOffset_size_(src, 0, dest, 0, src.length())
+    encoder.endEncoding()
+    command_buffer.commit()
+    self.device.mtl_buffers_in_flight.append(command_buffer)
+  def _from_buffer(self, src:memoryview): return self.device.device.newBufferWithBytesNoCopy_length_options_deallocator_(src, len(src), Metal.MTLResourceStorageModeShared, None)
   def _free(self, opaque): opaque.release()
-  def _buffer(self, src):
+  def as_buffer(self, src) -> memoryview:
     self.device.synchronize()
     return src.contents().as_buffer(src.length())
-  def copyin(self, dest, src:memoryview): self._buffer(dest)[:] = src
-  def copyout(self, dest:memoryview, src): dest[:] = self._buffer(src)
+  def copyin(self, dest, src:memoryview):
+    src_from_buffer = None if getenv("SLOW_METAL_COPY") else self._from_buffer(src)
+    if src_from_buffer is None:
+      self.as_buffer(dest)[:] = src
+    else:
+      self.device.copies_in_flight.append(src)
+      self._async_copy(dest, src_from_buffer)
+  def copyout(self, dest:memoryview, src): dest[:] = self.as_buffer(src)

 class MetalDevice(Compiled):
   compiler_device = None
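The new copyin keeps the source memoryview in copies_in_flight because a no-copy Metal buffer aliases host memory: the Python-side buffer must stay alive until the blit completes. A rough sketch of that pattern in plain Python (not the Metal API; the names here are illustrative only):

in_flight = []                            # plays the role of MetalDevice.copies_in_flight

def async_copyin(dest: bytearray, src: memoryview):
  in_flight.append(src)                   # pin the source until synchronize() runs
  dest[:src.nbytes] = src.cast("B")       # stand-in for the asynchronous blit copy

def synchronize():
  in_flight.clear()                       # mirrors copies_in_flight.clear() once the GPU is done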
@@ -70,8 +85,10 @@ class MetalDevice(Compiled):
     if MetalDevice.compiler_device is None: MetalDevice.compiler_device = self.device
     self.mtl_queue = self.device.newCommandQueueWithMaxCommandBufferCount_(1024)
     self.mtl_buffers_in_flight: List[Any] = []
+    self.copies_in_flight: List[memoryview] = []
     from tinygrad.runtime.graph.metal import MetalGraph
     super().__init__(MetalAllocator(self), LinearizerOptions(device="METAL"), MetalRenderer, compile_metal, functools.partial(MetalProgram, self), functools.partial(MetalGraph, self))
   def synchronize(self):
     for cbuf in self.mtl_buffers_in_flight: cbuf.waitUntilCompleted()
+    self.copies_in_flight.clear()
     self.mtl_buffers_in_flight.clear()
@@ -43,6 +43,7 @@ torch_fxn_for_op: Dict[Op, Callable] = {

 class TorchAllocator(Allocator):
   def _alloc(self, size:int, dtype:DType): return torch.empty([size], device=device, dtype=inverse_type_map[dtype])
+  def as_buffer(self, src:torch.Tensor) -> memoryview: return np.require(src.numpy(), requirements='C').data
   def copyin(self, dest:torch.Tensor, src:memoryview): dest.copy_(torch.frombuffer(src, dtype=dest.dtype))
   def copyout(self, dest:memoryview, src:torch.Tensor): torch.frombuffer(dest, dtype=src.dtype).copy_(src.flatten())

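TorchAllocator takes the same route as the numpy backend: for a CPU tensor, src.numpy() shares storage with the tensor, so the returned memoryview aliases it rather than copying. A standalone illustration (not tinygrad code):

import numpy as np
import torch

t = torch.zeros(4)                                  # CPU float32 tensor
mv = np.require(t.numpy(), requirements='C').data   # numpy() shares storage with t, no copy
np.frombuffer(mv, dtype=np.float32)[0] = 5.0        # t[0] is now 5.0 as well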