mirror of
https://github.com/tinygrad/tinygrad.git
synced 2026-02-06 04:35:00 -05:00
Ring allreduce try 2 (#3852)
* Ring allreduce v3 * Configurable size, number of gpus and jit in benchmark * ScheduleBarrier v0 * GB/s that make sense * ScheduleBarrier v0.1 * Fallback on 2 GPUs * ScheduleBarrier v0.2 * ScheduleBarrier v0.3 * ScheduleBarrier v0.3.1 * ScheduleBarrier v0.3.2 * Replace ScheduleBarrier with automatic optimization * unused import * fix comment * typing * better fallback * python 3.8 * RING=2 and use ContextVar * DEBUG >= 2 and change name * linter * type --------- Co-authored-by: George Hotz <72895+geohot@users.noreply.github.com> Co-authored-by: chenyu <chenyu@fastmail.com> Co-authored-by: nimlgen <138685161+nimlgen@users.noreply.github.com>
This commit is contained in:
60
test/external/external_benchmark_multitensor_allreduce.py
vendored
Normal file
60
test/external/external_benchmark_multitensor_allreduce.py
vendored
Normal file
@@ -0,0 +1,60 @@
|
||||
import time
|
||||
from tinygrad import Tensor, Device
|
||||
from tinygrad.lazy import LazyBuffer
|
||||
from tinygrad.ops import ReduceOps, GlobalCounters
|
||||
from tinygrad.features.multi import MultiLazyBuffer, all_reduce
|
||||
from tinygrad.features.jit import TinyJit
|
||||
from tinygrad.realize import create_schedule, run_schedule
|
||||
from tinygrad.helpers import getenv, Context, RING
|
||||
from typing import List, Union
|
||||
|
||||
def realize(x: Union[LazyBuffer, List[LazyBuffer]]):
  """Schedule and execute the given lazy buffer(s), then block until every backing device is idle."""
  lbs = [x] if not isinstance(x, list) else x
  run_schedule(create_schedule(lbs))
  # synchronize so timing measurements outside this call see completed work
  for buf in lbs: Device[buf.device].synchronize()
|
||||
|
||||
def test(devs: List[str], N: int, iters:int = 10):
  """Benchmark an all-reduce of N float32 values across `devs`.

  Runs two untimed warmup passes (kernel compilation / JIT capture), then
  `iters` timed passes, printing per-iteration stats and returning the tuple
  (mean GFLOP/s, mean GB/s, mean seconds per iteration).
  """
  def _wrapped(op: ReduceOps, t: Tensor) -> Tensor:
    # all_reduce over the per-device lazy buffers, re-wrapped as one multi-device Tensor
    return Tensor(MultiLazyBuffer(all_reduce(op, t.lazydata.lbs), 0), device=devs)
  _jitted = TinyJit(_wrapped) if getenv("USEJIT", 1) == 1 else _wrapped

  secs, gflops, gbs = 0, 0, 0
  for i in range(-2, iters):
    GlobalCounters.reset()
    # a distinct constant tensor per device, realized before the clock starts
    lbs = [Tensor.full((N,), float(1+j), device=dev).contiguous().lazydata for j, dev in enumerate(devs)]
    realize(lbs)
    start = time.time()
    realize(_jitted(ReduceOps.SUM, Tensor(MultiLazyBuffer(lbs, 0), device=devs)).lazydata.lbs)
    end = time.time()
    if i >= 0:  # i < 0 are warmup passes, slow due to kernel compilation — not counted
      i_secs = end-start
      i_gflops = GlobalCounters.global_ops/i_secs/10**9
      i_gbs = (N*4)/i_secs/10**9
      print(f"{'ring_allreduce' if RING >= 2 else 'naive_allreduce'} iter {i+1}/{iters}: {i_secs:.6f} sec {i_gflops:.2f} GFLOP/s {i_gbs:.2f} GB/s")
      secs += i_secs
      gflops += i_gflops
      gbs += i_gbs

  return (gflops/iters, gbs/iters, secs/iters)
|
||||
|
||||
|
||||
def main():
  """Compare ring vs naive allreduce throughput; env vars GPUS and SZ control the setup."""
  dev, n_gpus = Device.DEFAULT, getenv("GPUS", 6) # number of gpus
  devs = tuple(f"{dev}:{x}" for x in range(n_gpus))

  sz = getenv("SZ", 1000) * 10**6 # size of data on each gpu
  bytes_per_f32 = 4
  N = sz//bytes_per_f32  # element count per device

  print(f"Using {sz/10**9:.2f} GB of numbers on each of {n_gpus} GPUs, {n_gpus*sz/10**9:.2f} GB total.")
  # run the same benchmark under both allreduce strategies
  with Context(RING=2):
    (ring_gflops, ring_gbs, ring_secs) = test(devs, N)
  with Context(RING=0):
    (naive_gflops, naive_gbs, naive_secs) = test(devs, N)
  print(f"Ring:\n {ring_secs:.6f} seconds/iter\n {ring_gflops:.2f} GFLOP/s\n {ring_gbs:.2f} GB/s")
  print(f"Naive:\n {naive_secs:.6f} seconds/iter\n {naive_gflops:.2f} GFLOP/s\n {naive_gbs:.2f} GB/s")
|
||||
|
||||
# script entry point
if __name__ == "__main__":
  main()
|
||||
Reference in New Issue
Block a user