Files
home-llm/evaluate.py
2024-02-22 21:12:38 -05:00

193 lines
8.7 KiB
Python

#!/usr/bin/env python3
import argparse, os, re, json
import torch
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
from peft import PeftConfig, PeftModel
from tqdm import tqdm
CTX_SIZE = 2048
"""
python3 evaluate.py stablehome-3b-rev1/checkpoint-400 --batch-size 4 --lora && \
python3 evaluate.py stablehome-3b-rev1/checkpoint-800 --batch-size 4 --lora && \
python3 evaluate.py stablehome-3b-rev1/checkpoint-1200 --batch-size 4 --lora && \
python3 evaluate.py stablehome-3b-rev1/checkpoint-1600 --batch-size 4 --lora && \
python3 evaluate.py stablehome-3b-rev1/checkpoint-2000 --batch-size 4 --lora && \
python3 evaluate.py stablehome-3b-rev1/checkpoint-2400 --batch-size 4 --lora && \
python3 evaluate.py stablehome-3b-rev1/checkpoint-2800 --batch-size 4 --lora && \
python3 evaluate.py stablehome-3b-rev1/checkpoint-3200 --batch-size 4 --lora && \
python3 evaluate.py stablehome-3b-rev1/checkpoint-3600 --batch-size 4 --lora && \
python3 evaluate.py stablehome-3b-rev1/checkpoint-4000 --batch-size 4 --lora && \
python3 evaluate.py stablehome-3b-rev1 --batch-size 4 --lora
"""
def tokenize(tokenizer, prompt):
return tokenizer(prompt, return_tensors="pt", padding=True, truncation=True, max_length=CTX_SIZE)
def generate(model, tokenizer, prompts):
inputs = tokenize(tokenizer, prompts)
with torch.no_grad():
outputs = model.generate(**inputs)
text = tokenizer.batch_decode(outputs)
return text
def main():
parser = argparse.ArgumentParser(description="Evaluate the function calling for a model")
parser.add_argument("model")
parser.add_argument("--dataset_file", default="./data/home_assistant_test.jsonl")
parser.add_argument("--batch-size", default=8)
parser.add_argument("--lora", default=False,action='store_const', const=True)
args = parser.parse_args()
lora_folder = f"./loras/{args.model}"
model_folder = f"./models/{args.model}"
dataset = load_dataset("json", data_files={ "train": args.dataset_file })["train"]
print(f"Got {len(dataset)} examples to test")
# filter out examples that are status requests
if "text" in dataset:
dataset = dataset.filter(lambda example: "```homeassistant" in example["text"])
else:
dataset = dataset.filter(lambda example: "```homeassistant" in example["conversations"][2]["value"])
service_call_regex = re.compile(r"```homeassistant\n([\S \t\n]*?)```")
torch.set_default_device("cuda")
if args.lora:
adapter_config = PeftConfig.from_pretrained(lora_folder)
base_model_name = adapter_config.base_model_name_or_path
print(f"Loading lora from {lora_folder} ({base_model_name})...")
base_model = AutoModelForCausalLM.from_pretrained(
base_model_name,
trust_remote_code=True,
torch_dtype=torch.bfloat16,
)
trained_tokenizer = AutoTokenizer.from_pretrained(base_model_name, trust_remote_code=True, padding_side='left')
trained_model = PeftModel.from_pretrained(base_model, lora_folder, trust_remote_code=True, torch_dtype=torch.bfloat16)
output_folder = lora_folder
else:
print(f"Loading model from {model_folder}...")
trained_model = AutoModelForCausalLM.from_pretrained(
model_folder,
trust_remote_code=True,
torch_dtype=torch.bfloat16,
)
trained_tokenizer = AutoTokenizer.from_pretrained(model_folder, trust_remote_code=True, padding_side='left')
output_folder = model_folder
trained_model.generation_config = GenerationConfig(
max_new_tokens=128,
use_cache=True,
do_sample=True,
temperature=0.1,
top_k=40,
top_p=1.0,
repetition_penalty=1.15,
eos_token_id=trained_model.config.eos_token_id,
pad_token_id=trained_model.config.pad_token_id if trained_model.config.pad_token_id else trained_model.config.eos_token_id,
)
split = trained_tokenizer.apply_chat_template(conversation=[{"role": "assistant", "content": r"%%%%%%%%%%%%%%%%"}], tokenize=False).split( r"%%%%%%%%%%%%%%%%")[0]
print("Evaluating...")
batch_size = int(args.batch_size)
correct_answers = 0
total_answers = 0
color_mismatches = 0
# pre-allocate cuda buffers
inputs = trained_tokenizer([""] * batch_size, return_tensors="pt", max_length=CTX_SIZE, padding="max_length", truncation=True)
inputs = {k: v.to(trained_model.device) for k, v in inputs.items()}
with torch.no_grad():
outputs = trained_model(**inputs)
failed_examples = []
with tqdm(total=len(dataset), desc="Accuracy") as pbar:
for batch_start in range(0, len(dataset), batch_size):
batch = dataset[batch_start:batch_start + batch_size]
if "text" in batch:
prompts = [ example.split(split)[0] + split for example in batch["text"] ]
expected_responses = [ example.split(split)[1] for example in batch["text"] ]
else:
prompts = []
expected_responses = []
for example in batch["conversations"]:
conversation = [ { "role": x["from"], "content": x["value"] } for x in example if x["from"] != "assistant"]
prompts.append(trained_tokenizer.apply_chat_template(
conversation=conversation,
max_length=CTX_SIZE,
truncation=True,
tokenize=False,
add_generation_prompt=True,
))
expected_responses.append([x["value"] for x in example if x["from"] == "assistant"][0])
output = generate(trained_model, trained_tokenizer, prompts)
for model_output, expected_response in zip(output, expected_responses):
response = model_output.replace(trained_tokenizer.pad_token, "").replace(trained_tokenizer.eos_token, "").split(split)[1]
expected_service_calls = []
for block in service_call_regex.findall(expected_response.strip()):
for line in block.split("\n"):
if len(line) == 0:
continue
expected_service_calls.append(json.loads(line))
total_answers = total_answers + 1
for block in service_call_regex.findall(response.strip()):
for line in block.split("\n"):
if len(line) == 0:
continue
try:
json_output = json.loads(line)
except:
failed_examples.append({ "expected": expected_response, "actual": response, "invalid_json": True })
continue
if json_output in expected_service_calls:
expected_service_calls.pop(expected_service_calls.index(json_output))
correct_answers = correct_answers + 1
elif "rgb_color" in json_output:
for sc in expected_service_calls:
sc = { **sc }
json_output_copy = { **json_output }
if not "rgb_color" in sc:
continue
del sc["rgb_color"]
del json_output_copy["rgb_color"]
if sc == json_output_copy:
correct_answers = correct_answers + 1
color_mismatches = color_mismatches + 1
else:
failed_examples.append({ "expected": expected_response, "actual": response })
else:
failed_examples.append({ "expected": expected_response, "actual": response })
pbar.update(batch_size)
pbar.set_description(f"Accuracy: {correct_answers/total_answers*100:.2f}% ({correct_answers}/{total_answers})")
accuracy = correct_answers/total_answers
print(f"Final Accuracy Rating: {accuracy*100:.2f}%")
print(f"Color Mismatches: {color_mismatches}")
with open(os.path.join(output_folder, "eval_results.json"), "w") as f:
json.dump({
"possible_answers": total_answers,
"correct_answers": correct_answers,
"accuracy": accuracy,
"color_mismatches": color_mismatches,
"failed_examples": failed_examples,
}, f, indent=4)
if __name__ == "__main__":
main()