diff --git a/examples/mlperf/dataloader.py b/examples/mlperf/dataloader.py index 95f5e4beac..b916e22098 100644 --- a/examples/mlperf/dataloader.py +++ b/examples/mlperf/dataloader.py @@ -33,6 +33,9 @@ def shuffled_indices(n): del indices[i] def loader_process(q_in, q_out, X:Tensor): + import signal + signal.signal(signal.SIGINT, lambda _, __: exit(0)) + with Context(DEBUG=0): while (_recv := q_in.get()) is not None: idx, fn = _recv @@ -64,24 +67,7 @@ def batch_load_resnet(batch_size=64, val=False, shuffle=True): files = get_val_files() if val else get_train_files() from extra.datasets.imagenet import get_imagenet_categories cir = get_imagenet_categories() - - BATCH_COUNT = 32 - #q_in, q_out = MyQueue(multiple_writers=False), MyQueue(multiple_readers=False) - q_in, q_out = Queue(), Queue() - - sz = (batch_size*BATCH_COUNT, 224, 224, 3) - shm = shared_memory.SharedMemory(name="resnet_X", create=True, size=prod(sz)) - # disk:shm is slower - #X = Tensor.empty(*sz, dtype=dtypes.uint8, device=f"disk:shm:{shm.name}") - X = Tensor.empty(*sz, dtype=dtypes.uint8, device=f"disk:/dev/shm/resnet_X") - Y = [None] * (batch_size*BATCH_COUNT) - - procs = [] - for _ in range(64): - p = Process(target=loader_process, args=(q_in, q_out, X)) - p.daemon = True - p.start() - procs.append(p) + BATCH_COUNT = min(32, len(files) // batch_size) gen = shuffled_indices(len(files)) if shuffle else iter(range(len(files))) def enqueue_batch(num): @@ -89,7 +75,6 @@ def batch_load_resnet(batch_size=64, val=False, shuffle=True): fn = files[next(gen)] q_in.put((idx, fn)) Y[idx] = cir[fn.split("/")[-2]] - for bn in range(BATCH_COUNT): enqueue_batch(bn) class Cookie: def __init__(self, num): self.num = num @@ -106,14 +91,35 @@ def batch_load_resnet(batch_size=64, val=False, shuffle=True): gotten[num] = 0 return X[num*batch_size:(num+1)*batch_size], Y[num*batch_size:(num+1)*batch_size], Cookie(num) - # NOTE: this is batch aligned, last ones are ignored - for _ in range(0, len(files)//batch_size): yield receive_batch() + #q_in, q_out = MyQueue(multiple_writers=False), MyQueue(multiple_readers=False) + q_in, q_out = Queue(), Queue() - # shutdown processes - for _ in procs: q_in.put(None) - for p in procs: p.join() - shm.close() - shm.unlink() + sz = (batch_size*BATCH_COUNT, 224, 224, 3) + shm = shared_memory.SharedMemory(name="resnet_X", create=True, size=prod(sz)) + procs = [] + + try: + # disk:shm is slower + #X = Tensor.empty(*sz, dtype=dtypes.uint8, device=f"disk:shm:{shm.name}") + X = Tensor.empty(*sz, dtype=dtypes.uint8, device=f"disk:/dev/shm/resnet_X") + Y = [None] * (batch_size*BATCH_COUNT) + + for _ in range(64): + p = Process(target=loader_process, args=(q_in, q_out, X)) + p.daemon = True + p.start() + procs.append(p) + + for bn in range(BATCH_COUNT): enqueue_batch(bn) + + # NOTE: this is batch aligned, last ones are ignored + for _ in range(0, len(files)//batch_size): yield receive_batch() + finally: + # shutdown processes + for _ in procs: q_in.put(None) + for p in procs: p.join() + shm.close() + shm.unlink() if __name__ == "__main__": from extra.datasets.imagenet import get_train_files, get_val_files