Mirror of https://github.com/tinygrad/tinygrad.git (synced 2026-01-08 22:48:25 -05:00)
fetch raid from cloud (#10799)
* feat: initial tinyfs device
* feat: don't allow compute on tinyfs device
* feat: tensor helpers to load and store
* feat: bufferview for tinyfs
* fix: keep copy sizes correct
* fix: recv large
* clean: unneeded
* feat: comment
* clean: unneeded
* clean: remove
* clean: remove
* feat: get request tag
* feat: rename to cloud
* feat: send request_id
* feat: start computing tree
* feat: compute store tree on this side
* feat: jank chunked load
* feat: more debugging
* feat: rename to just load and store
* feat: correct chunk count
* fix: fix load for < 1mb
* feat: comments
* feat: don't truncate on block devices
* feat: better way of testing block device
* feat: don't need to pad that much
* feat: connect to nodes directly on load
* feat: cache connections
* feat: don't hard code chunk size
* feat: close mmap when closing file handle
* feat: don't overwrite stuff on disk if storing from disk
* clean: debug print
* fix: close mmap
* feat: await workers
* feat: fast copy from tinyfs to disk
* feat: don't copy to device on last
* feat: use single socket per device
* feat: raid in tinyfs
* clean: remove import
* clean: type
* feat: maintain single event loop
* feat: lower worker count
* feat: use connection pool
* feat: fetch mapping in its own process
* fix: release lock
* feat: don't fetch if exists
* feat: req id only on stores
* feat: always fetch
* fix: rangeify
* feat: allow specifying raid root
* fix: dealloc buffer
* feat: start support non 0 offset
* clean: use cleaner
* feat: don't pass to threadpool
* clean: typing
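In rough terms, the two scripts this commit adds imply the following tensor-level round trip. This is only a sketch pieced together from the diffs below; the `store`/`load` helpers and the tinyfs/cloud backend they talk to are introduced by this PR, and the backend setup itself is not shown here:

```python
from tinygrad.tensor import Tensor

# sketch of the round trip implied by upload_raid.py / fetch_raid.py below;
# assumes a tinyfs/cloud backend is configured so store()/load() have somewhere to go
src = Tensor(b"hello tinyfs", device="CPU")
h = src.store().realize()                     # upload; the result tensor carries the content hash
hash_hex, size = h.data().hex(), src.nbytes()

# later: fetch by hash and write the bytes straight to a disk-backed tensor
out = Tensor(bytes.fromhex(hash_hex), device="CPU").load(size).to("disk:/tmp/hello.bin").realize()
```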
extra/tinyfs/fetch_raid.py (new file, 39 lines):

import json, multiprocessing
from pathlib import Path

from tinygrad.tensor import Tensor
from tinygrad.helpers import tqdm, getenv

raid_root = Path(getenv("RAID_ROOT", "/raid"))

def fetch_file(item):
  path, info = item
  h, size = info["hash"], info["size"]

  path = raid_root / Path(path)
  path.parent.mkdir(parents=True, exist_ok=True)

  try:
    pt = Tensor(bytes.fromhex(h), device="CPU").load(size).to(f"disk:{path.as_posix()}").realize()
  except Exception as e:
    print(f"error fetching {path}, {h}, {size}: {e}")
    raise

  pt.uop.buffer.deallocate()

def fetch_mapping():
  mapping_tensor = Tensor(bytes.fromhex("d734f5e3be9f1e9d863bfaa4fc6c1ef2")).load(175866113).realize()
  mapping = mapping_tensor.data().tobytes().decode()
  mapping = json.loads(mapping)
  mapped_files = mapping.items()
  return list(mapped_files)

if __name__ == "__main__":
  with multiprocessing.Pool(processes=1) as pool:
    mapped_files = pool.apply(fetch_mapping)

  print(f"fetched mapping for {len(mapped_files)} files")

  with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
    for _ in tqdm(pool.imap_unordered(fetch_file, mapped_files), total=len(mapped_files)):
      pass
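Note that the destination directory comes from the `RAID_ROOT` environment variable (default `/raid`), and the mapping hash and byte size hardcoded in `fetch_mapping` are presumably the `final hash` and `size` printed by a previous run of `upload_raid.py` (below).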
extra/tinyfs/upload_raid.py (new file, 31 lines):

from pathlib import Path
import multiprocessing, json

from tinygrad.tensor import Tensor
from tinygrad.helpers import tqdm

raid_root = Path("/raid")

def upload_file(path: Path):
  pt = Tensor(path).realize()
  h = pt.store().realize()
  pt.uop.realized.deallocate()
  return h.data().hex(), path, pt.nbytes()

if __name__ == "__main__":
  raid_files = sorted([p for p in raid_root.rglob("*") if p.is_file()])
  print(f"found {len(raid_files)} files in /raid")

  mapping = {}
  with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
    for h, p, s in tqdm(pool.imap_unordered(upload_file, raid_files), total=len(raid_files)):
      mapping[p.relative_to(raid_root).as_posix()] = {"hash": h, "size": s}

  # sort the mapping by key
  mapping = dict(sorted(mapping.items()))

  mapping = json.dumps(mapping).encode()
  mapping_tensor = Tensor(mapping, device="CPU")
  h = mapping_tensor.store().realize()

  print(f"final hash: {h.data().hex()}, size: {len(mapping)}")
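The `final hash` and `size` printed at the end have to be copied by hand into `fetch_mapping` in `fetch_raid.py`. As an illustration only (not part of this commit), a variant that reads them from the environment instead could look like:

```python
# hypothetical, not part of this commit: take the mapping hash/size printed by
# upload_raid.py from the environment instead of hardcoding them in fetch_raid.py
import json
from tinygrad.tensor import Tensor
from tinygrad.helpers import getenv

def fetch_mapping():
  mapping_hash = getenv("MAPPING_HASH", "")  # hex string printed as "final hash"
  mapping_size = getenv("MAPPING_SIZE", 0)   # byte count printed as "size"
  mapping_tensor = Tensor(bytes.fromhex(mapping_hash)).load(mapping_size).realize()
  return list(json.loads(mapping_tensor.data().tobytes().decode()).items())
```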