From f228c03f9fc5bc6d91af63ed24de631b441478f0 Mon Sep 17 00:00:00 2001 From: wozeparrot Date: Tue, 14 Oct 2025 07:53:55 -0700 Subject: [PATCH] fetch raid from cloud (#10799) * feat: initial tinyfs device * feat: don't allow compute on tinyfs device * feat: tensor helpers to load and store * feat: bufferview for tinyfs * fix: keep copy sizes correct * fix: recv large * clean: unneeded * feat: comment * clean: unneeded * clean: remove * clean: remove * feat: get request tag * feat: rename to cloud * feat: send request_id * feat: start computing tree * feat: compute store tree on this side * feat: jank chunked load * feat: more debugging * feat: rename to just load and store * feat: correct chunk count * fix: fix load for < 1mb * feat: comments * feat: don't truncate on block devices * feat: better way of testing block device * feat: don't need to pad that much * feat: connect to nodes directly on load * feat: cache connections * feat: don't hard code chunk size * feat: close mmap when closing file handle * feat: don't overwrite stuff on disk if storing from disk * clean: debug print * fix: close mmap * feat: await workers * feat: fast copy from tinyfs to disk * feat: don't copy to device on last * feat: use single socket per device * feat: raid in tinyfs * clean: remove import * clean: type * feat: maintain single event loop * feat: lower worker count * feat: use connection pool * feat: fetch mapping in its own process * fix: release lock * feat: don't fetch if exists * feat: req id only on stores * feat: always fetch * fix: rangeify * feat: allow specifying raid root * fix: dealloc buffer * feat: start support non 0 offset * clean: use cleaner * feat: don't pass to threadpool * clean: typing --- extra/tinyfs/fetch_raid.py | 39 +++++++++++++++++++++++++++++++++++++ extra/tinyfs/upload_raid.py | 31 +++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 extra/tinyfs/fetch_raid.py create mode 100644 extra/tinyfs/upload_raid.py diff --git 
# --- extra/tinyfs/fetch_raid.py ----------------------------------------------
# Fetch every file recorded in the published tinyfs mapping and materialize it
# under RAID_ROOT (default /raid) on local disk.
import json, multiprocessing
from pathlib import Path

from tinygrad.tensor import Tensor
from tinygrad.helpers import tqdm, getenv

# root under which fetched files are written; overridable via RAID_ROOT env var
raid_root = Path(getenv("RAID_ROOT", "/raid"))

def fetch_file(item):
  """Fetch one mapping entry from tinyfs to local disk.

  item: a (relative_path, {"hash": hex_str, "size": int}) pair as produced by
  fetch_mapping(). Loads the remote blob identified by its hash straight into
  a disk-backed tensor at raid_root/relative_path, then frees the buffer.
  """
  path, info = item
  h, size = info["hash"], info["size"]

  path = raid_root / Path(path)
  path.parent.mkdir(parents=True, exist_ok=True)

  try:
    pt = Tensor(bytes.fromhex(h), device="CPU").load(size).to(f"disk:{path.as_posix()}").realize()
  except Exception as e:
    # identify which entry failed before re-raising, so the pool error is debuggable
    print(f"error fetching {path}, {h}, {size}: {e}")
    raise

  # release the backing buffer eagerly; the data now lives on disk
  pt.uop.buffer.deallocate()

def fetch_mapping():
  """Fetch the path -> {"hash", "size"} mapping blob and return it as a list of items."""
  # NOTE(review): the hash and byte size pin a specific published mapping blob;
  # they must be updated whenever upload_raid.py publishes a new mapping.
  mapping_tensor = Tensor(bytes.fromhex("d734f5e3be9f1e9d863bfaa4fc6c1ef2")).load(175866113).realize()
  mapping = mapping_tensor.data().tobytes().decode()
  mapping = json.loads(mapping)
  mapped_files = mapping.items()
  return list(mapped_files)

if __name__ == "__main__":
  # fetch the mapping in a dedicated single-worker pool so its connections and
  # event loop are torn down with that process before the fan-out pool starts
  with multiprocessing.Pool(processes=1) as pool:
    mapped_files = pool.apply(fetch_mapping)

  print(f"fetched mapping for {len(mapped_files)} files")

  # fan the per-file fetches out across all cores; tqdm tracks completion
  with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
    for _ in tqdm(pool.imap_unordered(fetch_file, mapped_files), total=len(mapped_files)):
      pass

# --- extra/tinyfs/upload_raid.py ---------------------------------------------
# Upload every file under RAID_ROOT to tinyfs and publish a JSON mapping blob
# of relative path -> {"hash", "size"} (the blob fetch_raid.py consumes).
from pathlib import Path
import multiprocessing, json

from tinygrad.tensor import Tensor
from tinygrad.helpers import tqdm, getenv

# FIX: honor RAID_ROOT like fetch_raid.py does (default unchanged: /raid);
# previously this was hard-coded and inconsistent with the fetch side
raid_root = Path(getenv("RAID_ROOT", "/raid"))

def upload_file(path: Path):
  """Store one file in tinyfs; return (hex content hash, original path, size in bytes)."""
  pt = Tensor(path).realize()
  h = pt.store().realize()
  # free the local buffer once the store has completed
  pt.uop.realized.deallocate()
  return h.data().hex(), path, pt.nbytes()

if __name__ == "__main__":
  raid_files = sorted([p for p in raid_root.rglob("*") if p.is_file()])
  print(f"found {len(raid_files)} files in {raid_root}")

  mapping = {}
  with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
    for h, p, s in tqdm(pool.imap_unordered(upload_file, raid_files), total=len(raid_files)):
      # keys are paths relative to raid_root so the mapping is root-agnostic
      mapping[p.relative_to(raid_root).as_posix()] = {"hash": h, "size": s}

  # sort the mapping by key so the published blob is deterministic
  mapping = dict(sorted(mapping.items()))

  mapping = json.dumps(mapping).encode()
  mapping_tensor = Tensor(mapping, device="CPU")
  h = mapping_tensor.store().realize()

  # paste this hash/size into fetch_raid.py's fetch_mapping() to publish
  print(f"final hash: {h.data().hex()}, size: {len(mapping)}")