Mirror of https://github.com/tinygrad/tinygrad.git (synced 2026-04-07 03:00:26 -04:00).
Commit: "start scheduler process replay" (#5656).
This commit is contained in:
33
test/external/process_replay/process_replay.py
vendored
33
test/external/process_replay/process_replay.py
vendored
@@ -1,19 +1,22 @@
|
||||
#!/usr/bin/env python3
# compare kernels created by HEAD against master
import difflib
import logging
import multiprocessing
import os
import pickle
import sqlite3
from typing import List

from tinygrad.codegen.kernel import Kernel
from tinygrad.helpers import Context, ContextVar, colored, db_connection, VERSION, getenv, tqdm
from tinygrad.ops import LazyOp

# rows fetched from the DB per page; also the unit of work handed to each worker
PAGE_SIZE = 100
# branch name injected by GitHub Actions ("" when running locally)
REF = os.getenv("GITHUB_REF_NAME", "")
# NOTE(review): presumably caps the percentage of changed kernels before bailing out — its use is not in view here
MAX_DIFF_PCT = getenv("PROCESS_REPLAY_MAX_DIFF_PCT", 20)
# per-run capture table for HEAD's kernels; master's reference table is keyed only by schema VERSION
TABLE_NAME = f"process_replay_{getenv('GITHUB_RUN_ID', 'HEAD')}_{VERSION}"
REF_TABLE_NAME = f"process_replay_master_{VERSION}"
# diffs become hard failures when the commit message or PR title opts in with "[run_process_replay]"
ASSERT_DIFF = getenv("ASSERT_PROCESS_REPLAY", int((k:="[run_process_replay]") in os.getenv("COMMIT_MESSAGE", k) or k in os.getenv("PR_TITLE", k)))
# "[skip_process_replay]" in the commit message or PR title (or running on master itself) disables the whole script
SKIP_PROCESS_REPLAY = (k:="[skip_process_replay]") in os.getenv("COMMIT_MESSAGE", "") or k in os.getenv("PR_TITLE", "") or REF == "master"
# cross-process flag: workers return immediately once it is set — the setter is outside this view
early_stop = multiprocessing.Event()
logging.basicConfig(level=logging.INFO, format='%(message)s')
||||
def process_replay(offset:int):
|
||||
def process_replay(offset:int, ref_schedule:List[LazyOp]):
|
||||
if early_stop.is_set(): return
|
||||
conn = db_connection()
|
||||
cur = conn.cursor()
|
||||
@@ -36,6 +39,10 @@ def process_replay(offset:int):
|
||||
if ASSERT_DIFF: raise e
|
||||
continue
|
||||
# try compare
|
||||
if getenv("COMPARE_SCHEDULE") and ast not in ref_schedule:
|
||||
with Context(**{k:v for k,v in ctx.items() if k in ContextVar._cache and k != "DEBUG"}):
|
||||
print(opts.render(name, Kernel(ast, opts=opts).linearize().uops))
|
||||
continue
|
||||
try: assert compare_src == good_src
|
||||
except AssertionError as e:
|
||||
changed += 1
|
||||
@@ -53,20 +60,34 @@ def process_replay(offset:int):
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
def get_ref_schedule(offset:int, ref_schedule) -> None:
  """Load one PAGE_SIZE page of master's captured schedule, starting at row `offset`.

  Each row of REF_TABLE_NAME holds a pickled tuple whose first element is the AST
  (a LazyOp, per process_replay's signature); that element is appended to `ref_schedule`,
  a Manager-backed list shared with the parent process.
  """
  conn = db_connection()
  cur = conn.cursor()
  try:
    cur.execute(f"SELECT val FROM '{REF_TABLE_NAME}' LIMIT ? OFFSET ?", (PAGE_SIZE, offset))
    # row[0] is the pickled blob; [0] picks the AST out of the stored tuple
    for row in cur.fetchall(): ref_schedule.append(pickle.loads(row[0])[0])
  finally:
    # fix: the original leaked the cursor/connection; release them like every other DB user in this file
    cur.close()
    conn.close()
|
||||
|
||||
if __name__ == "__main__":
  # opt-out requested (or we ARE master): nothing to compare
  if SKIP_PROCESS_REPLAY:
    logging.info("skipping process replay.")
    exit(0)

  conn = db_connection()
  cur = conn.cursor()

  # optionally pre-load master's full schedule into a shared list, one Process per page
  ref_schedule = multiprocessing.Manager().list()
  if getenv("COMPARE_SCHEDULE"):
    row_count = cur.execute(f"select count(*) from '{REF_TABLE_NAME}'").fetchone()[0]
    processes = []
    for i in tqdm(range(0, row_count, PAGE_SIZE)):
      processes.append(p:=multiprocessing.Process(target=get_ref_schedule, args=(i, ref_schedule)))
      p.start()
    for p in processes: p.join()

  # size HEAD's capture table; a missing table usually means the DB schema moved under us
  try: row_count = cur.execute(f"select count(*) from '{TABLE_NAME}'").fetchone()[0]
  except sqlite3.OperationalError:
    logging.warning(f"{TABLE_NAME} isn't accessible in master, did DB_VERSION change?")
    exit(0)
  conn.commit()
  cur.close()

  # replay every page of HEAD's kernels against master, one Process per page
  processes = []
  for i in tqdm(range(0, row_count, PAGE_SIZE)):
    processes.append(p:=multiprocessing.Process(target=process_replay, args=(i, ref_schedule)))
    p.start()
  for p in processes: p.join()
|
||||
|
||||
Reference in New Issue
Block a user