mirror of
https://github.com/tinygrad/tinygrad.git
synced 2026-04-29 03:00:14 -04:00
run_schedule is so simple now (#4130)
This commit is contained in:
@@ -20,6 +20,9 @@ class Buffer:
|
||||
if initial_value is not None:
|
||||
self.allocate()
|
||||
self.copyin(memoryview(initial_value))
|
||||
def ensure_allocated(self) -> Buffer:
|
||||
if not hasattr(self, '_buf'): self.allocate()
|
||||
return self
|
||||
def allocate(self, opaque=None) -> Buffer:
|
||||
assert not hasattr(self, '_buf'), "can't allocate already allocated buffer"
|
||||
from tinygrad.device import Device
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from typing import List, Dict, Optional
|
||||
from tinygrad.helpers import getenv, colored
|
||||
from tinygrad.helpers import colored
|
||||
from tinygrad.ops import ScheduleItem, BufferOps, LoadOps
|
||||
from tinygrad.device import JITRunner, Device, BufferCopy, BufferXfer, update_stats
|
||||
from tinygrad.buffer import Buffer
|
||||
@@ -11,7 +11,11 @@ class CustomOp(JITRunner):
|
||||
super().__init__()
|
||||
def __call__(self, rawbufs:List[Buffer], var_vals:Dict[Variable, int], wait=False, jit=False): self.fxn(*rawbufs)
|
||||
|
||||
def lower_schedule_item(si:ScheduleItem) -> Optional[JITRunner]:
|
||||
class EmptyOp(JITRunner):
|
||||
def __call__(self, rawbufs:List[Buffer], var_vals:Dict[Variable, int], wait=False, jit=False):
|
||||
update_stats(colored(f"empty {rawbufs[0].size:10d} {rawbufs[0].dtype}", "yellow"), 0, 0, {}, jit, 1, device=rawbufs[0].device)
|
||||
|
||||
def lower_schedule_item(si:ScheduleItem) -> JITRunner:
|
||||
assert len(set(x.device for x in si.outputs+si.inputs)) == 1 or si.ast[0].op is LoadOps.COPY
|
||||
if si.ast[0].op is BufferOps.STORE: return Device[si.outputs[0].device].get_runner(*si.ast)
|
||||
assert len(si.ast) == 1 and len(si.outputs) == 1, "only ASTRunner supports multioutput"
|
||||
@@ -20,22 +24,18 @@ def lower_schedule_item(si:ScheduleItem) -> Optional[JITRunner]:
|
||||
if hasattr(Device[out.device].allocator, 'transfer') and out.device.split(":")[0] == si.inputs[0].device.split(":")[0]: return BufferXfer()
|
||||
return BufferCopy()
|
||||
if ast.op is LoadOps.CUSTOM: return CustomOp(ast.arg)
|
||||
return None
|
||||
if ast.op is LoadOps.EMPTY: return EmptyOp()
|
||||
raise RuntimeError(f"don't know how to lower {ast}")
|
||||
|
||||
logops = open(getenv("LOGOPS", ""), "a") if getenv("LOGOPS", "") else None
|
||||
def run_schedule(schedule:List[ScheduleItem], var_vals:Optional[Dict[Variable, int]] = None):
|
||||
while len(schedule):
|
||||
si = schedule.pop(0)
|
||||
if logops and si.ast[0].op not in LoadOps and not any(i.device.startswith("DISK:") for i in si.inputs): logops.write(str(si.ast)+"\n")
|
||||
|
||||
# get the program
|
||||
prg = lower_schedule_item(si)
|
||||
|
||||
for out in si.outputs:
|
||||
# we don't have an output buffer, we have to create it, and create to max size if it has symbolic shape
|
||||
if out.size > 0 and not hasattr(out, "_buf"): out.allocate()
|
||||
# allocate output buffers
|
||||
for out in si.outputs: out.ensure_allocated()
|
||||
|
||||
# run the function (put it in JIT)
|
||||
real_buffers = [x for x in si.outputs+si.inputs if x.size != 0]
|
||||
if prg: prg.exec(real_buffers, var_vals if var_vals is not None else {})
|
||||
elif (out:=si.outputs[0]).size > 0: update_stats(colored(f"empty {out.size:10d} {out.dtype}", "yellow"), 0, 0, {}, None, 1, device=out.device)
|
||||
prg.exec(list(si.outputs+si.inputs), var_vals if var_vals is not None else {})
|
||||
@@ -4,7 +4,7 @@ from dataclasses import dataclass
|
||||
from typing import Tuple, List, Dict, Optional, Set, DefaultDict
|
||||
from tinygrad.ops import LoadOps, ScheduleItem, BufferOps, LazyOp, ReduceOps, ConstBuffer, MemBuffer, BinaryOps, UnaryOps
|
||||
from tinygrad.features.graph import log_lazybuffer, realized_lazybuffer
|
||||
from tinygrad.helpers import GRAPH, DEBUG, GlobalCounters, prod, dedup, all_int, merge_dicts
|
||||
from tinygrad.helpers import GRAPH, DEBUG, GlobalCounters, prod, dedup, all_int, merge_dicts, getenv
|
||||
from tinygrad.shape.symbolic import Variable
|
||||
from tinygrad.dtype import ImageDType, dtypes
|
||||
from tinygrad.lazy import LazyBuffer
|
||||
@@ -13,6 +13,9 @@ from tinygrad.shape.shapetracker import ShapeTracker
|
||||
# creation can recurse a lot
|
||||
sys.setrecursionlimit(10000)
|
||||
|
||||
# optionally log the ops to disk
|
||||
logops = open(getenv("LOGOPS", ""), "a") if getenv("LOGOPS", "") else None
|
||||
|
||||
# TODO: it's unfortunate this needs to exist, but because of ASSIGN, we have to retain the LazyBuffer structure until post toposort
|
||||
@dataclass(frozen=True)
|
||||
class _LBScheduleItem:
|
||||
@@ -203,18 +206,18 @@ def create_schedule_with_vars(outs:List[LazyBuffer], seen:Optional[Set[LazyBuffe
|
||||
# breadth first ordering
|
||||
graph: DefaultDict[LazyBuffer, List[LazyBuffer]] = defaultdict(list)
|
||||
in_degree: DefaultDict[LazyBuffer, int] = defaultdict(int)
|
||||
for key, si in prescheduled.items():
|
||||
for key, lsi in prescheduled.items():
|
||||
# realize outputs after all parents are realized
|
||||
scheduled_parents = set(schedule_targets[x].outputs[0] for x in si.inputs if x in schedule_targets)
|
||||
scheduled_parents = set(schedule_targets[x].outputs[0] for x in lsi.inputs if x in schedule_targets)
|
||||
for x in scheduled_parents:
|
||||
graph[x].append(key)
|
||||
in_degree[key] += 1
|
||||
# realize outputs before a parent is assigned to
|
||||
parents_assigns = set(schedule_targets[assign_targets[x]].outputs[0] for x in si.inputs if x in assign_targets)
|
||||
parents_assigns = set(schedule_targets[assign_targets[x]].outputs[0] for x in lsi.inputs if x in assign_targets)
|
||||
for assign in parents_assigns:
|
||||
graph[key].append(assign)
|
||||
in_degree[assign] += 1
|
||||
for out in si.outputs: del out.srcs # can only schedule once
|
||||
for out in lsi.outputs: del out.srcs # can only schedule once
|
||||
|
||||
queue = deque(si for key, si in prescheduled.items() if in_degree[key] == 0)
|
||||
schedule: List[ScheduleItem] = []
|
||||
@@ -227,7 +230,8 @@ def create_schedule_with_vars(outs:List[LazyBuffer], seen:Optional[Set[LazyBuffe
|
||||
kernel_number += 1
|
||||
for out in ps.outputs: realized_lazybuffer(out, kernel_number)
|
||||
var_vals = merge_dicts([var_vals, ps.var_vals])
|
||||
schedule.append(ScheduleItem(ps.ast, tuple(x.buffer for x in ps.outputs if x.size != 0), tuple(x.buffer for x in ps.inputs if x.size != 0)))
|
||||
schedule.append(si:=ScheduleItem(ps.ast, tuple(x.buffer for x in ps.outputs if x.size != 0), tuple(x.buffer for x in ps.inputs if x.size != 0)))
|
||||
if logops and si.ast[0].op not in LoadOps and not any(i.device.startswith("DISK:") for i in si.inputs): logops.write(str(si.ast)+"\n")
|
||||
for x in graph[ps.outputs[0]]:
|
||||
in_degree[x] -= 1
|
||||
if in_degree[x] == 0: queue.append(prescheduled[x])
|
||||
|
||||
Reference in New Issue
Block a user