Files
tinygrad/extra/export_model.py
nimlgen 4164666c72 programinfo (#15942)
* programinfo

* fix

* m

* x

* x

* changes

* x

* fix

* rm
2026-04-27 23:12:03 +03:00

303 lines
17 KiB
Python

from typing import Tuple, Dict, List, Optional
from tinygrad.dtype import DType, dtypes
from tinygrad.tensor import Tensor
from tinygrad.device import Device, Buffer
from tinygrad.engine.jit import TinyJit
from tinygrad.nn.state import get_state_dict
from tinygrad.helpers import Context, to_mv, prod
from tinygrad.uop.ops import Ops, UOp
from tinygrad.codegen import to_program
import json
from collections import OrderedDict
EXPORT_SUPPORTED_DEVICE = ["WEBGPU", "CPU", "CUDA", "CL"]
_KERNEL_ASTS = {Ops.SINK, Ops.PROGRAM}
def iter_kernel_calls(linear:UOp):
  """Yield kernel CALL UOps from a LINEAR UOp.

  Toposort descends naturally into CUSTOM_FUNCTION graph batches; the gate stops
  descent at kernel ASTs so their interiors are not walked.
  """
  def _stop_at_kernel_ast(u): return u.op not in _KERNEL_ASTS
  for node in linear.toposort(gate=_stop_at_kernel_ast):
    # a kernel call is a CALL whose first source is a kernel AST (SINK or PROGRAM)
    if node.op is Ops.CALL and node.src[0].op in _KERNEL_ASTS:
      yield node
def compile_net(linear:UOp, output_bufs:List[Buffer]) -> Tuple[Dict[str,str], List, Dict[str,Tuple[int,DType,int]], Dict[str,Buffer]]:
  """Lower a captured LINEAR UOp into exportable pieces.

  Returns:
    functions: kernel function name -> rendered kernel source.
    statements: list of (function_name, arg_names, global_size, local_size) call records.
    bufs: buffer name -> (size in bytes, DType, dedup key).
    bufs_to_save: buffer name -> Buffer for non-input/non-output buffers (candidate weights).
  """
  # stable "outputN" names keyed by the identity of each output Buffer
  output_name = {id(b): f"output{i}" for i, b in enumerate(output_bufs)}
  functions, bufs, bufs_to_save, statements, n = {}, {}, {}, [], 0
  def name_of(bu:UOp, is_out:bool) -> str:
    # Assign (or look up) a stable C-identifier name for the buffer behind `bu`.
    nonlocal n
    # PARAM UOps are model inputs: named by their positional arg index
    if bu.op is Ops.PARAM: key, name, size = ("in", bu.arg), f"input{bu.arg}", prod(bu.shape)*bu.dtype.itemsize
    else:
      b = bu.buffer
      # dedup on (base buffer identity, offset, size, dtype) so views of the same storage share a name
      key, size = (id(b.base), b.offset, b.size, b.dtype), b.size*b.dtype.itemsize
      if key in bufs: return bufs[key][0]
      if (name:=output_name.get(id(b))) is None:
        name, n = f"buf_{n}", n+1
        # buffers that are neither inputs nor outputs are recorded as candidate weights to save
        if not is_out: bufs_to_save[name] = b
    bufs[key] = (name, size, bu.dtype, key)
    return name
  for call in iter_kernel_calls(linear):
    # drop BIND sources (bound symbolic variables); remaining sources are the kernel's buffer args
    arg_uops = [b for b in call.src[1:] if b.op is not Ops.BIND]
    prg = to_program(call.src[0], Device[arg_uops[0].device].renderer)
    info = prg.arg
    functions[info.function_name] = prg.src[3].arg  # rendered kernel source
    # arg 0 is the kernel's output; DEFINE_VAR UOps are appended as-is (resolved later by the exporter)
    cargs = [name_of(bu, i == 0) for i, bu in enumerate(arg_uops)] + [v for v in info.vars if v.op is Ops.DEFINE_VAR]
    statements.append((info.function_name, cargs, info.global_size, info.local_size))
  return functions, statements, {name:(size, dtype, key) for name, size, dtype, key in bufs.values()}, bufs_to_save
def jit_model(model, *args) -> Tuple[UOp, List[Buffer]]:
  """Run `model` on `args` under TinyJit and return the captured linear UOp plus the realized output buffers."""
  assert hasattr(model, "forward") or callable(model), "model needs a forward function"
  @TinyJit
  def _jit_run(*xs):
    fwd = model.forward if hasattr(model, "forward") else model
    result = fwd(*xs)
    assert isinstance(result, (tuple, list, Tensor)), "model output must be a Tensor, tuple, or a list of Tensors for export"
    tensors = [result] if isinstance(result, Tensor) else result
    return [t.realize() for t in tensors]
  # first call warms up, second call triggers JIT capture
  outputs = None
  for _ in range(2):
    outputs = _jit_run(*args)
  assert _jit_run.captured is not None
  return _jit_run.captured.linear, [t.uop.base.realized for t in outputs]
def export_model_clang(functions:Dict[str,str], statements:Dict[str,Tuple[str,int,int]], bufs:Dict[str,Tuple[str,int,int]],
  bufs_to_save:Dict[str,Tensor], input_names:List[str], output_names:List[str], weight_names=None, model_name="model", symbolic_vars=None, wasm=False) -> str:
  """Render the compiled net as C source.

  When wasm=False, weights are embedded in the C source as escaped byte strings and a
  single C source string is returned. When wasm=True, weights are supplied at runtime
  through `set_buf` and a tuple (c_source, js_wrapper) is returned for an Emscripten build.

  Args:
    functions: kernel name -> rendered C kernel source.
    statements: (kernel_name, arg_names, global_size, local_size) call records.
    bufs: buffer name -> (size in bytes, DType, dedup key).
    bufs_to_save: buffer name -> Buffer whose contents should be exported (weights).
    input_names/output_names: buffer names passed through `net`'s signature.
    weight_names: dedup key -> state_dict name (wasm path only); None means none.
    symbolic_vars: DEFINE_VAR UOp -> variable name; symbolic vars are passed by value.
  """
  # avoid mutable default arguments; None means "no weights" / "no symbolic vars"
  if weight_names is None: weight_names = {}
  if symbolic_vars is None: symbolic_vars = {}
  headers = ["#include <tgmath.h>"]
  cprog = list(functions.values())
  dtype_map = {dtypes.int: "int", dtypes.float: "float", dtypes.uchar: "unsigned char", dtypes.char: "signed char", dtypes.half: "__fp16", dtypes.uint: "unsigned int"}
  # (name, C type, size in bytes) triples; symbolic vars are passed by value, buffers by pointer
  inputs = [(name, dtype_map[bufs[name][1]], bufs[name][0]) for name in input_names + list(symbolic_vars.values())]
  outputs = [(name, dtype_map[bufs[name][1]], bufs[name][0]) for name in output_names]
  forward_args = ",".join(f"{dtype}{'*' if name not in symbolic_vars.values() else ''} {name}" for name,dtype,_ in (outputs+inputs if wasm else inputs+outputs))
  if not wasm:
    # embed each weight buffer directly in the source as an escaped byte string
    for name,cl in bufs_to_save.items():
      weight = ''.join(["\\x%02X"%x for x in bytes(to_mv(cl._buf.va_addr, cl._buf.size))])
      cprog.append(f"unsigned char {name}_data[] = \"{weight}\";")
    # declare intermediate buffers; bufs stores sizes in BYTES, so divide by itemsize for the
    # element count (previously sized in bytes, over-allocating; the wasm path below already divides)
    cprog += [f"{dtype_map[dtype]} {name}[{n_bytes // dtype.itemsize}];" if name not in bufs_to_save else f"{dtype_map[dtype]} *{name} = ({dtype_map[dtype]} *){name}_data;" for name,(n_bytes,dtype,_key) in bufs.items() if name not in input_names+output_names]
    cprog += [f"void net({forward_args}) {{"] + [f"{name}({', '.join(args)});" for (name, args, _global_size, _local_size) in statements] + ["}"]
    return '\n'.join(headers + cprog)
  else:
    buf_to_name = OrderedDict()
    if bufs_to_save:
      headers += ["#include <stddef.h>"]  # set_buf uses size_t
      # weights are not embedded: the JS side supplies them via set_buf. Only buffers present in
      # weight_names are exported; the rest (e.g. random seeds) become zero-filled arrays below.
      bufs_to_save = {k:v for k,v in bufs.items() if v[2] in weight_names} # causes random seeds to be set as zeroes, not exported as a model weight
      buf_to_name = OrderedDict((buf_name, {"name": weight_names[data[2]], "idx": i}) for i, (buf_name, data) in enumerate(bufs_to_save.items()))
      cprog.append(f"void* bufs[{len(buf_to_name)}];")
      cprog.append(f"""void set_buf(size_t index, void* ptr) {{\n bufs[index] = ptr;\n}}""")
    # statically allocate every non-weight, non-I/O buffer (sizes are bytes -> element counts)
    for name in set(bufs.keys()) - set(bufs_to_save.keys()) - set(input_names + output_names):
      n_bytes, dtype, _ = bufs[name]
      cprog += [f"{dtype_map[dtype]} {name}[{n_bytes // dtype.itemsize}];"]
    cprog += [f"void net({forward_args})"] + ["{"]
    # weight args are fetched from the runtime-supplied bufs[] table; everything else passes through
    get_weight_ptr = lambda x: f"({dtype_map[bufs_to_save[x][1]]} *)bufs[{buf_to_name[x]['idx']}]" if x in bufs_to_save else x
    cprog += [f" {name}({', '.join(map(get_weight_ptr, args))});" for (name, args, _global_size, _local_size) in statements] + ["}"]
    # NOTE(review): when bufs_to_save is empty the export line below still references
    # {model_name}_name_to_id, which is then undefined in JS — confirm callers never hit this.
    weightMapping = "" if not bufs_to_save else f"""\nconst weightNames = [{", ".join([f'"{weight_name}"' for weight_name in [v["name"] for v in buf_to_name.values()]])}];
const {model_name}_name_to_id = Object.fromEntries(weightNames.map((name, index) => [name, index]));\n"""
    top = f"""import {model_name}Module from './{model_name}.js'{weightMapping}"""
    whitespace = "\n "
    js_wrapper = f"""{top}\nvar {model_name} = async function() {{
const wasm = await {model_name}Module();
{whitespace.join(f"const {name}Ptr = wasm._malloc({n_bytes});" for name, _, n_bytes in outputs+inputs if name not in symbolic_vars.values())}
return {{
run: ({",".join(name for name,_,_ in inputs)}) => {{
{(whitespace + " ").join(f"wasm.HEAPU8.set({name}, {name}Ptr);" for name,_,_ in inputs if name not in symbolic_vars.values())}
wasm._net({", ".join(f"{name}{'Ptr' if name not in symbolic_vars.values() else ''}" for name,_,_ in outputs+inputs)});
{(whitespace + " ").join(f"const {name} = wasm.HEAPU8.slice({name}Ptr, {name}Ptr + {n_bytes});" for name,_,n_bytes in outputs)}
return [{", ".join(f"{name}" for name,_,_ in outputs)}];
}},
wasm: wasm
}}
}}\nexport {{ {model_name}, {model_name}_name_to_id }};"""
    return '\n'.join(headers + cprog), js_wrapper
def dtype_to_js_type(dtype: DType) -> str:
  """Map a tinygrad DType to the name of the matching JS TypedArray constructor."""
  if dtype in dtypes.uints:
    prefix = "Uint"
  elif dtype in dtypes.sints or dtype == dtypes.bool:
    prefix = "Int"
  else:
    prefix = "Float"
  return f"{prefix}{8*dtype.itemsize}Array"
def export_model_webgpu(functions, statements, bufs, weight_names, input_names, output_names, model_name, symbolic_vars={}, stream_weights=False) -> Tuple[str,int,int]:
  """Render the compiled net as a self-contained WebGPU JavaScript module string.

  NOTE(review): the return annotation says Tuple[str,int,int] but the body returns a
  single string — annotation looks stale; confirm before relying on it.
  """
  # each kernel's function name is rewritten to 'main' inside its own shader module string
  kernel_code = '\n\n'.join([f"const {key} = `{code.replace(key, 'main')}`;" for key, code in functions.items()])
  kernel_names = ', '.join([name for (name, _, _, _) in statements])
  # NOTE(review): this extends the caller's input_names list in place (visible to the caller)
  input_names += list(symbolic_vars.values())
  input_buffer_types = [dtype_to_js_type(bufs[inp_name][1]) for inp_name in input_names]
  output_buffer_types = [dtype_to_js_type(bufs[out_name][1]) for out_name in output_names]
  # symbolic variables are bound as uniform buffers, everything else as storage buffers
  buf_type = lambda x: "uniform" if x in set(symbolic_vars.values()) else "storage"
  # binding 0 of every bind group layout is the shared infinity uniform; kernel args start at binding 1
  create_bind_group_layouts = ",".join([
    "device.createBindGroupLayout({{entries: [{{binding: 0, visibility: GPUShaderStage.COMPUTE, buffer: {{ type: 'uniform' }}}}, {}]}})".format(
      ",".join([f"{{binding: {argIdx+1}, visibility: GPUShaderStage.COMPUTE, buffer: {{ type: '{buf_type(argName)}' }} }}" for argIdx, argName in enumerate(args)])
    )
    for _, (_, args, _, _) in enumerate(statements)
  ])
  layouts = f"const layouts=[{create_bind_group_layouts}]"
  kernel_calls = '\n '.join([f"addComputePass(device, commandEncoder, pipelines[{i}], layouts[{i}], infinityBuf, [{', '.join(args)}], [{', '.join(str(x) for x in global_size)}]);" for i, (_name, args, global_size, _local_size) in enumerate(statements) ])
  # buf_type is intentionally redefined here for buffer *creation*: symbolic_vars keys are
  # DEFINE_VAR UOps whose arg[0] is the variable name, so this matches the same names as above
  buf_type = lambda x: "createUniformBuf" if x in set(uop.arg[0] for uop in symbolic_vars) else "createEmptyBuf"
  # streamed weights come from a JS state_dict; otherwise they are sliced out of a safetensors blob
  map_to_external_weight = lambda _key: f"state_dict['{weight_names[_key]}']" if stream_weights else f"getTensorBuffer(safetensor, metadata['{weight_names[_key]}'])"
  _bufs = '\n '.join([f"const {name} = " + (f"{buf_type(_key)}(device, {size});" if _key not in weight_names else f"createWeightBuf(device, {size}, {map_to_external_weight(_key)})") + ";" for name,(size,dtype,_key) in bufs.items()])
  # staging buffers: CPU-writable copies for inputs, CPU-readable copies for outputs
  gpu_write_bufs = '\n '.join([f"const gpuWriteBuffer{i} = device.createBuffer({{size:{input_name}.size, usage: GPUBufferUsage.COPY_SRC | GPUBufferUsage.MAP_WRITE }});" for i,input_name in enumerate(input_names)])
  input_writers = '\n '.join([f"await gpuWriteBuffer{i}.mapAsync(GPUMapMode.WRITE);\n new {input_buffer_types[i]}(gpuWriteBuffer{i}.getMappedRange()).set(" + f'_{inp_name});' + f"\n gpuWriteBuffer{i}.unmap();\n commandEncoder.copyBufferToBuffer(gpuWriteBuffer{i}, 0, {inp_name}, 0, gpuWriteBuffer{i}.size);" for i,inp_name in enumerate(input_names)])
  gpu_read_bufs = '\n '.join([f"const gpuReadBuffer{i} = device.createBuffer({{size:{output_name}.size, usage: GPUBufferUsage.COPY_DST | GPUBufferUsage.MAP_READ }});" for i,output_name in enumerate(output_names)])
  outbuf_copies = '\n '.join([f"commandEncoder.copyBufferToBuffer({output_name}, 0, gpuReadBuffer{i}, 0, output{i}.size);" for i,output_name in enumerate(output_names)])
  output_readers = '\n '.join([f"await gpuReadBuffer{i}.mapAsync(GPUMapMode.READ);\n const resultBuffer{i} = new {output_buffer_types[i]}(gpuReadBuffer{i}.size/{bufs[output_names[i]][1].itemsize});\n resultBuffer{i}.set(new {output_buffer_types[i]}(gpuReadBuffer{i}.getMappedRange()));\n gpuReadBuffer{i}.unmap();" for i in range(len(output_names))])
  output_return = '[{}]'.format(",".join([f'resultBuffer{i}' for i in range(len(output_names))]))
  # safetensors layout: 8-byte little-endian header length, then JSON metadata, then tensor data;
  # data_offsets are shifted so they index directly into the full file buffer
  getTensorMetadata = f"""\nconst getTensorMetadata = (safetensorBuffer) => {{
const metadataLength = Number(new DataView(safetensorBuffer.buffer).getBigUint64(0, true));
const metadata = JSON.parse(new TextDecoder("utf8").decode(safetensorBuffer.subarray(8, 8 + metadataLength)));
return Object.fromEntries(Object.entries(metadata).filter(([k, v]) => k !== "__metadata__").map(([k, v]) => [k, {{...v, data_offsets: v.data_offsets.map(x => 8 + metadataLength + x)}}]));
}};\n""" if not stream_weights else ""
  return f"""
const {model_name} = (() => {{
const getTensorBuffer = (safetensorBuffer, tensorMetadata) => {{
return safetensorBuffer.subarray(...tensorMetadata.data_offsets);
}};
{getTensorMetadata}
const createEmptyBuf = (device, size) => {{
return device.createBuffer({{size, usage: GPUBufferUsage.STORAGE | GPUBufferUsage.COPY_SRC | GPUBufferUsage.COPY_DST }});
}};
const createUniformBuf = (device, size) => {{
return device.createBuffer({{size, usage: GPUBufferUsage.UNIFORM | GPUBufferUsage.COPY_DST}})
}}
const createInfinityUniformBuf = (device) => {{
const size = 4;
const buf = device.createBuffer({{
mappedAtCreation: true,
size,
usage: GPUBufferUsage.UNIFORM | GPUBufferUsage.COPY_SRC | GPUBufferUsage.COPY_DST
}});
new Float32Array(buf.getMappedRange())[0] = Infinity;
buf.unmap();
return buf;
}};
const createWeightBuf = (device, size, data) => {{
const buf = device.createBuffer({{ size, usage: GPUBufferUsage.STORAGE{" | GPUBufferUsage.COPY_DST" if stream_weights else ", mappedAtCreation: true"} }});
{"data.bytes = buf;" if stream_weights else "new Uint8Array(buf.getMappedRange()).set(data); buf.unmap();"}
return buf;
}};
const addComputePass = (device, commandEncoder, pipeline, layout, infinityUniformBuf, bufs, workgroup) => {{
const bindGroup = device.createBindGroup({{
layout: layout,
entries: [
{{ binding: 0, resource: {{ buffer: infinityUniformBuf }} }},
...bufs.map((buffer, index) => ({{ binding: index + 1, resource: {{ buffer }} }}))
]
}});
const passEncoder = commandEncoder.beginComputePass();
passEncoder.setPipeline(pipeline);
passEncoder.setBindGroup(0, bindGroup);
passEncoder.dispatchWorkgroups(...workgroup);
passEncoder.end();
}};
{kernel_code}
const setupNet = async (device, {"state_dict" if stream_weights else "safetensor"}) => {{
{"const metadata = getTensorMetadata(safetensor);" if not stream_weights else ""}
const infinityBuf = createInfinityUniformBuf(device);
{layouts}
{_bufs}
{gpu_write_bufs}
{gpu_read_bufs}
const kernels = [{kernel_names}];
const pipelines = await Promise.all(kernels.map(async (name, i) => {{
return await device.createComputePipelineAsync({{
layout: device.createPipelineLayout({{
bindGroupLayouts: [layouts[i]],
}}),
compute: {{
module: device.createShaderModule({{
code: name,
}}),
entryPoint: "main",
}},
}});
}}))
return async ({",".join([f"_{input_name}" for input_name in input_names])}) => {{
const commandEncoder = device.createCommandEncoder();
{input_writers}
{kernel_calls}
{outbuf_copies}
const gpuCommands = commandEncoder.finish();
device.queue.submit([gpuCommands]);
{output_readers}
return {output_return};
}}
}}
const load = async (device, weight_path) => {{ return await fetch(weight_path).then(x => x.arrayBuffer()).then(x => setupNet(device, new Uint8Array(x))); }}
return {{ load, setupNet }};
}})();
export default {model_name};
"""
def export_model(model, target:str, *inputs, model_name: Optional[str] = "model", stream_weights=False):
  """Export `model` for `target` ("clang", "wasm", "webgpu", or anything else for a JSON description).

  Returns (program, {input_name: size}, {output_name: size}, state_dict).
  NOTE(review): the "wasm" target returns early with export_model_clang's
  (c_source, js_wrapper) tuple instead of this 4-tuple.
  """
  assert Device.DEFAULT in EXPORT_SUPPORTED_DEVICE, f"only {', '.join(EXPORT_SUPPORTED_DEVICE)} are supported"
  # NOTE: CPU_COUNT=1, since export does not support threading
  with Context(JIT=2, CPU_COUNT=1): linear, output_bufs = jit_model(model, *inputs)
  functions, statements, bufs, bufs_to_save = compile_net(linear, output_bufs)
  state = get_state_dict(model)
  # map each realized weight buffer (keyed the same way compile_net keys bufs) to its state_dict name
  weight_names = {(id(b), b.offset, b.size, b.dtype): name for name, x in state.items() if (b:=x.uop.base.realized) is not None}
  input_names = [f"input{i}" for i in range(len(inputs))]
  output_names = [f"output{i}" for i in range(len(output_bufs))]
  # handle symbolic variables; TODO: refactor to fix some of this stuff upstream in tinygrad
  symbolic_vars = OrderedDict()
  for i, (_, args, global_size, _) in enumerate(statements):
    for j, var in enumerate(args):
      # compile_net leaves DEFINE_VAR UOps in the arg lists; replace each with its variable name
      # (arg is a tuple whose first element is the name string)
      if getattr(var, "op", None) is Ops.DEFINE_VAR and isinstance(getattr(var, "arg", None), tuple) and isinstance(var.arg[0], str):
        if var not in symbolic_vars:
          symbolic_vars[var] = var.arg[0]
          # give each symbolic var a one-element "buffer" entry so exporters can size/bind it
          bufs[symbolic_vars[var]] = (var.dtype.itemsize, var.dtype, symbolic_vars[var])
        statements[i][1][j] = symbolic_vars[var]  # in-place rewrite: cargs is a list inside the statement tuple
    if global_size:
      for j, dim in enumerate(global_size):
        # a symbolic grid dim of the form (DEFINE_VAR + CONST) becomes a JS expression string
        if getattr(dim, "op", None) is Ops.ADD and len(dim.src) == 2 and {dim.src[0].op, dim.src[1].op} == {Ops.DEFINE_VAR, Ops.CONST}:
          name, val = dim.src if dim.src[1].op is Ops.CONST else reversed(dim.src)
          global_size[j] = f"_{name.arg[0]}[0] + {val.arg}"
  prg = ""
  if target == "clang":
    prg = export_model_clang(functions, statements, bufs, bufs_to_save, input_names, output_names)
  elif target == "wasm":
    # early return: (c_source, js_wrapper) tuple, not the usual 4-tuple below
    return export_model_clang(functions, statements, bufs, bufs_to_save, input_names, output_names, weight_names, model_name, symbolic_vars, wasm=True)
  elif target == "webgpu":
    prg = export_model_webgpu(functions, statements, bufs, weight_names, input_names, output_names, model_name, symbolic_vars, stream_weights)
  else:
    # fallback: a JSON description of the net for custom runtimes
    prg = json.dumps({
      "backend": Device.DEFAULT,
      "inputs": [{
        "size": bufs[name][0],
        "dtype": bufs[name][1].name
      } for name in input_names],
      "outputs": [{
        "size": bufs[name][0],
        "dtype": bufs[name][1].name
      } for name in output_names],
      "functions": functions,
      "statements": [{
        "kernel": kernel,
        "args": args,
        "global_size": global_size,
        "local_size": local_size
      } for (kernel, args, global_size, local_size) in statements],
      "buffers": {
        name: {
          "size": size,
          "dtype": dtype.name,
          "id": weight_names[_key] if _key in weight_names else ""
        # NOTE(review): the literals "input"/"outputs" never match generated names like
        # "input0"/"output0" — this filter looks like it was meant to use input_names/output_names; confirm
        } for name, (size,dtype,_key) in bufs.items() if name not in ["input", "outputs"]
      }
    })
  return prg, {input:bufs[input][0] for input in input_names}, {output:bufs[output][0] for output in output_names}, state