diff --git a/test/test_zero_copy.py b/test/test_zero_copy.py new file mode 100644 index 0000000000..7b7cd2f69b --- /dev/null +++ b/test/test_zero_copy.py @@ -0,0 +1,28 @@ +import unittest +from tinygrad import Tensor, Device +#from tinygrad.helpers import CI +import time + +def time_tensor_numpy(out:Tensor): + times = [] + for _ in range(5): + st = time.perf_counter() + out.lazydata.realized.toCPU() + et = time.perf_counter() - st + times.append(et) + return min(times) + +N = 4096 +class TestZeroCopy(unittest.TestCase): + @unittest.skipIf(Device.DEFAULT not in {"CLANG", "LLVM", "CPU", "TORCH", "METAL"}, "device isn't zero copy") + def test_zero_copy_from_default_to_cpu(self): + demo = Tensor.rand(1).realize() + t1 = time_tensor_numpy(demo) + out = Tensor.rand(N, N).realize() + t2 = time_tensor_numpy(out) + gbps = out.nbytes()*1e-9/max(t2-t1, 1e-10) + print(f"time(base): {t1*1e3:.2f} ms, time(copy): {t2*1e3:.2f} ms : copy speed {gbps:.2f} GB/s") + self.assertGreater(gbps, 1000) # more than 1000 GB/s = no copy + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tinygrad/device.py b/tinygrad/device.py index f54cff5d9f..d51c87b944 100644 --- a/tinygrad/device.py +++ b/tinygrad/device.py @@ -37,22 +37,26 @@ class Buffer: def __init__(self, device:str, size:int, dtype:DType, opaque:Any=None): assert isinstance(dtype, DType) self.device, self.size, self.dtype = device, size, dtype - self._buf = opaque if opaque is not None else Device[self.device].allocator.alloc(size, dtype) - GlobalCounters.mem_used += self.size * self.dtype.itemsize + self.allocator = Device[self.device].allocator + self._buf = opaque if opaque is not None else self.allocator.alloc(size, dtype) + # TODO: mem_used for all devices + if self.device == Device.DEFAULT: GlobalCounters.mem_used += self.size * self.dtype.itemsize def __del__(self): - GlobalCounters.mem_used -= self.size * self.dtype.itemsize - Device[self.device].allocator.free(self._buf, self.size, self.dtype) + if 
self.device == Device.DEFAULT: GlobalCounters.mem_used -= self.size * self.dtype.itemsize + self.allocator.free(self._buf, self.size, self.dtype) def __repr__(self): return f"<buf device:{self.device} size:{self.size} dtype:{self.dtype}>" def copyin(self, mv:memoryview): mv = mv.cast("B", shape=[self.size*self.dtype.itemsize]) assert len(mv) == self.size*self.dtype.itemsize, f"size mismatch, {len(mv)=} != {self.dtype=} {self.size=}" - Device[self.device].allocator.copyin(self._buf, mv) + self.allocator.copyin(self._buf, mv) return self @staticmethod def fromCPU(device:str, x:np.ndarray): return Buffer(device, x.size, dtypes.from_np(x.dtype)).copyin(x.data) def toCPU(self) -> np.ndarray: + # zero copy with as_buffer + if hasattr(self.allocator, 'as_buffer'): return np.frombuffer(self.allocator.as_buffer(self._buf), dtype=np.dtype(self.dtype.np, metadata={"backing": self._buf})) ret = np.empty(self.size, self.dtype.np) - if self.size > 0: Device[self.device].allocator.copyout(ret.data.cast("B", shape=[self.size*self.dtype.itemsize]), self._buf) + if self.size > 0: self.allocator.copyout(ret.data.cast("B", shape=[self.size*self.dtype.itemsize]), self._buf) return ret # TODO: size, dest, src are the same type. can we enforce this?
@@ -251,6 +255,7 @@ def _get_optimized_linearizer(linearizer_opts:LinearizerOptions, ast:LazyOp) -> import ctypes class _MallocAllocator(LRUAllocator): def _alloc(self, size:int, dtype:DType): return (ctypes.c_uint8 * (size*dtype.itemsize))() + def as_buffer(self, src) -> memoryview: return memoryview(src) def copyin(self, dest, src:memoryview): ctypes.memmove(dest, from_mv(src), len(src)) def copyout(self, dest:memoryview, src): ctypes.memmove(from_mv(dest), src, len(dest)) MallocAllocator = _MallocAllocator() diff --git a/tinygrad/realize.py b/tinygrad/realize.py index 2553a26f17..89e5735bf9 100644 --- a/tinygrad/realize.py +++ b/tinygrad/realize.py @@ -27,7 +27,6 @@ def run_schedule(schedule:List[ScheduleItem], disable_logging=False): # we don't have an output buffer, we have to create it, and create to max size if it has symbolic shape si.out.realized = si.out.output_buffer if si.out.output_buffer is not None else \ Buffer(si.out.device, prod((s if isinstance(s, int) else s.max for s in si.out.shape)), si.out.dtype) - #Device[si.out.device].buffer(prod((s if isinstance(s, int) else s.max for s in si.out.shape)), si.out.dtype, **si.out._device_extra_args()) # TODO: size 0 should be removed from the schedule if si.out.realized.size != 0: if si.ast.op in LoadOps: @@ -56,19 +55,10 @@ def _realize_rand(buffer: Buffer, arg) -> None: # *** one op LoadOps *** -#from tinygrad.runtime.lib import RawBufferMapped, RawBufferTransfer -#from tinygrad.runtime.ops_disk import RawDiskBuffer def _realize_from(buffer: Buffer, src: Buffer) -> None: assert src.size == buffer.size, f"size mismatch on FROM {src.size=} != {buffer.size=}" if DEBUG >= 2: print(f"*** copy {buffer.device} <- {src.device} size {src.size:<16d} shape {buffer.size:5d} dtype {src.dtype}") buffer.copyin(src.toCPU().data) - # TODO: make this generic - #if isinstance(src.realized, RawDiskBuffer) and isinstance(buffer.realized, RawBufferMapped): - # src.realized.readinto(buffer.realized._buffer()) - #elif 
isinstance(src.realized, RawBufferTransfer) and isinstance(buffer.realized, RawBufferTransfer) and getenv("P2P", 0) >= 1: - # buffer.realized._transfer(src.realized) - #else: - #buffer.realized._copyin(src.realized.toCPU()) # *** n op LoadOps *** diff --git a/tinygrad/runtime/ops_cpu.py b/tinygrad/runtime/ops_cpu.py index 8543b3028f..91253d9e63 100644 --- a/tinygrad/runtime/ops_cpu.py +++ b/tinygrad/runtime/ops_cpu.py @@ -41,6 +41,7 @@ numpy_fxn_for_op: Dict[Op, Callable] = { class NumpyAllocator(Allocator): def _alloc(self, size:int, dtype:DType): return np.empty(size, dtype.np) + def as_buffer(self, src:np.ndarray) -> memoryview: return np.require(src, requirements='C').data def copyin(self, dest:np.ndarray, src:memoryview): np.copyto(dest, np.frombuffer(src, dest.dtype).reshape(dest.shape)) def copyout(self, dest:memoryview, src:np.ndarray): np.copyto(np.frombuffer(dest, src.dtype).reshape(src.shape), src) diff --git a/tinygrad/runtime/ops_disk.py b/tinygrad/runtime/ops_disk.py index 6e07579314..7a48ad7cb9 100644 --- a/tinygrad/runtime/ops_disk.py +++ b/tinygrad/runtime/ops_disk.py @@ -44,6 +44,7 @@ class DiskAllocator(Allocator): if os.path.getsize(self.device) < size * dtype.itemsize: os.ftruncate(f.fileno(), size * dtype.itemsize) buf = UnderlyingDiskBuffer(f, mmap.mmap(f.fileno(), size * dtype.itemsize)) return DiskBuffer(buf, size, dtype) + def as_buffer(self, src:DiskBuffer): return src._buf() def copyin(self, dest:DiskBuffer, src:memoryview): dest._buf()[:] = src def copyout(self, dest:memoryview, src:DiskBuffer): if src.ud.fd is not None: diff --git a/tinygrad/runtime/ops_metal.py b/tinygrad/runtime/ops_metal.py index 5e103c9013..724ece905b 100644 --- a/tinygrad/runtime/ops_metal.py +++ b/tinygrad/runtime/ops_metal.py @@ -56,12 +56,27 @@ class MetalAllocator(LRUAllocator): ret = self.device.device.newBufferWithLength_options_(size*dtype.itemsize, Metal.MTLResourceStorageModeShared) if ret is None: raise MemoryError(f"Metal OOM while allocating {size=} 
{dtype=}") return ret + def _async_copy(self, dest, src): + assert src.length() == dest.length(), f"length mismatch {src.length()=} {dest.length()=}" + command_buffer = self.device.mtl_queue.commandBuffer() + encoder = command_buffer.blitCommandEncoder() + encoder.copyFromBuffer_sourceOffset_toBuffer_destinationOffset_size_(src, 0, dest, 0, src.length()) + encoder.endEncoding() + command_buffer.commit() + self.device.mtl_buffers_in_flight.append(command_buffer) + def _from_buffer(self, src:memoryview): return self.device.device.newBufferWithBytesNoCopy_length_options_deallocator_(src, len(src), Metal.MTLResourceStorageModeShared, None) def _free(self, opaque): opaque.release() - def _buffer(self, src): + def as_buffer(self, src) -> memoryview: self.device.synchronize() return src.contents().as_buffer(src.length()) - def copyin(self, dest, src:memoryview): self._buffer(dest)[:] = src - def copyout(self, dest:memoryview, src): dest[:] = self._buffer(src) + def copyin(self, dest, src:memoryview): + src_from_buffer = None if getenv("SLOW_METAL_COPY") else self._from_buffer(src) + if src_from_buffer is None: + self.as_buffer(dest)[:] = src + else: + self.device.copies_in_flight.append(src) + self._async_copy(dest, src_from_buffer) + def copyout(self, dest:memoryview, src): dest[:] = self.as_buffer(src) class MetalDevice(Compiled): compiler_device = None @@ -70,8 +85,10 @@ class MetalDevice(Compiled): if MetalDevice.compiler_device is None: MetalDevice.compiler_device = self.device self.mtl_queue = self.device.newCommandQueueWithMaxCommandBufferCount_(1024) self.mtl_buffers_in_flight: List[Any] = [] + self.copies_in_flight: List[memoryview] = [] from tinygrad.runtime.graph.metal import MetalGraph super().__init__(MetalAllocator(self), LinearizerOptions(device="METAL"), MetalRenderer, compile_metal, functools.partial(MetalProgram, self), functools.partial(MetalGraph, self)) def synchronize(self): for cbuf in self.mtl_buffers_in_flight: cbuf.waitUntilCompleted() + 
self.copies_in_flight.clear() self.mtl_buffers_in_flight.clear() diff --git a/tinygrad/runtime/ops_torch.py b/tinygrad/runtime/ops_torch.py index 8ae758f9c0..c661f6abb5 100644 --- a/tinygrad/runtime/ops_torch.py +++ b/tinygrad/runtime/ops_torch.py @@ -43,6 +43,7 @@ torch_fxn_for_op: Dict[Op, Callable] = { class TorchAllocator(Allocator): def _alloc(self, size:int, dtype:DType): return torch.empty([size], device=device, dtype=inverse_type_map[dtype]) + def as_buffer(self, src:torch.Tensor) -> memoryview: return np.require(src.numpy(), requirements='C').data def copyin(self, dest:torch.Tensor, src:memoryview): dest.copy_(torch.frombuffer(src, dtype=dest.dtype)) def copyout(self, dest:memoryview, src:torch.Tensor): torch.frombuffer(dest, dtype=src.dtype).copy_(src.flatten())