Files
tinygrad/extra/tinyfs/fetch_raid.py
wozeparrot f228c03f9f fetch raid from cloud (#10799)
* feat: initial tinyfs device

* feat: don't allow compute on tinyfs device

* feat: tensor helpers to load and store

* feat: bufferview for tinyfs

* fix: keep copy sizes correct

* fix: recv large

* clean: unneeded

* feat: comment

* clean: unneeded

* clean: remove

* clean: remove

* feat: get request tag

* feat: rename to cloud

* feat: send request_id

* feat: start computing tree

* feat: compute store tree on this side

* feat: jank chunked load

* feat: more debugging

* feat: rename to just load and store

* feat: correct chunk count

* fix: fix load for < 1mb

* feat: comments

* feat: don't truncate on block devices

* feat: better way of testing block device

* feat: don't need to pad that much

* feat: connect to nodes directly on load

* feat: cache connections

* feat: don't hard code chunk size

* feat: close mmap when closing file handle

* feat: don't overwrite stuff on disk if storing from disk

* clean: debug print

* fix: close mmap

* feat: await workers

* feat: fast copy from tinyfs to disk

* feat: don't copy to device on last

* feat: use single socket per device

* feat: raid in tinyfs

* clean: remove import

* clean: type

* feat: maintain single event loop

* feat: lower worker count

* feat: use connection pool

* feat: fetch mapping in its own process

* fix: release lock

* feat: don't fetch if exists

* feat: req id only on stores

* feat: always fetch

* fix: rangeify

* feat: allow specifying raid root

* fix: dealloc buffer

* feat: start support non 0 offset

* clean: use cleaner

* feat: don't pass to threadpool

* clean: typing
2025-10-14 07:53:55 -07:00

40 lines
1.2 KiB
Python

import json, multiprocessing
from pathlib import Path
from tinygrad.tensor import Tensor
from tinygrad.helpers import tqdm, getenv
# Root directory that fetched files are written under (fetch_file joins each mapped path onto it).
# Read through tinygrad's getenv helper with key "RAID_ROOT" and default "/raid" -- presumably the
# RAID_ROOT environment variable; confirm against tinygrad.helpers.getenv.
raid_root = Path(getenv("RAID_ROOT", "/raid"))
def fetch_file(item):
  """Fetch a single mapped file and write it to disk under ``raid_root``.

  ``item`` is a ``(path, info)`` pair; ``info`` must provide the hex string
  ``"hash"`` and the ``"size"`` handed to ``Tensor.load``. Parent directories
  of the destination are created as needed.

  Any exception raised while building, loading or realizing the tensor is
  reported on stdout and then re-raised unchanged.
  """
  rel_path, meta = item
  digest = meta["hash"]
  nbytes = meta["size"]

  dest = raid_root / Path(rel_path)
  dest.parent.mkdir(parents=True, exist_ok=True)

  try:
    disk_device = f"disk:{dest.as_posix()}"
    fetched = Tensor(bytes.fromhex(digest), device="CPU").load(nbytes).to(disk_device).realize()
  except Exception as err:
    print(f"error fetching {dest}, {digest}, {nbytes}: {err}")
    raise
  else:
    # Explicitly release the realized tensor's buffer once the fetch has succeeded.
    fetched.uop.buffer.deallocate()
def fetch_mapping():
  """Load the JSON file mapping and return its entries as a list.

  The mapping is addressed by a fixed hex hash and a fixed size passed to
  ``Tensor.load`` (presumably the mapping's length in bytes -- confirm against
  the tinyfs ``load`` helper). The loaded bytes are decoded as text, parsed as
  JSON, and the top-level object's items are returned as a list of
  ``(key, value)`` pairs, which is the shape ``fetch_file`` unpacks.
  """
  mapping_hash = bytes.fromhex("d734f5e3be9f1e9d863bfaa4fc6c1ef2")
  loaded = Tensor(mapping_hash).load(175866113).realize()
  text = loaded.data().tobytes().decode()
  entries = json.loads(text)
  # Materialize as a list rather than returning the dict view itself.
  return list(entries.items())
if __name__ == "__main__":
  # Load the mapping through a dedicated single-worker pool, so the load runs in a child process.
  with multiprocessing.Pool(processes=1) as mapping_pool:
    mapped_files = mapping_pool.apply(fetch_mapping)

  file_count = len(mapped_files)
  print(f"fetched mapping for {file_count} files")

  # Fan the per-file fetches out over one worker per CPU; completion order does not matter,
  # the loop only drains the iterator so tqdm can show progress (and so worker errors surface here).
  with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as fetch_pool:
    completions = fetch_pool.imap_unordered(fetch_file, mapped_files)
    for _ in tqdm(completions, total=file_count):
      pass