#!/usr/bin/env python3

import argparse, os, re, json, csv, random

import torch
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
from peft import PeftConfig, PeftModel
from tqdm import tqdm

torch.set_default_device("cuda")

CTX_SIZE = 2048
TRUST_REMOTE_CODE = False

"""
|
|
python3 evaluate.py stablehome-1_6b-rev3 --batch-size 8 --all-checkpoints
|
|
python3 evaluate.py tinyhome-rev1 --batch-size 12 --all-checkpoints
|
|
python3 evaluate.py stablehome-3b-rev6 --batch-size 4 --lora --overwrite
|
|
"""
|
|
|
|
# Regexes for pulling service calls, service names, and entity ids out of prompts and model output
service_call_regex = re.compile(r"```homeassistant\n([\S \t\n]*?)```")
json_regex = re.compile(r"({[\S \t]*?})")
service_names_regex = re.compile(r"\b\w+\.\w+\([^)]*\)")
entity_ids_regex = re.compile(r"\b\w+\.\w+(?=\s'|\s=)")

# Load the in-context-learning examples that ship with the integration;
# fall back to an empty list if the file is missing or unreadable
try:
    with open("custom_components/llama_conversation/in_context_examples.csv", encoding="utf-8-sig") as f:
        in_context_examples = list(csv.DictReader(f))
except Exception:
    in_context_examples = []

def icl_example_generator(num_examples, entity_names, service_names):
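    """Build a block of in-context-learning examples, one JSON object per line,
    limited to the services and entity domains exposed in the current prompt."""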
    entity_domains = set([x.split(".")[0] for x in entity_names])
    entity_names = entity_names[:]

    # filter out examples for disabled services
    selected_in_context_examples = []
    for x in in_context_examples:
        if x["service"] in service_names and x["service"].split(".")[0] in entity_domains:
            selected_in_context_examples.append(x)

    # if we filtered everything then just sample randomly
    if len(selected_in_context_examples) == 0:
        selected_in_context_examples = in_context_examples[:]

    random.shuffle(selected_in_context_examples)
    random.shuffle(entity_names)

    num_examples_to_generate = min(num_examples, len(selected_in_context_examples))
    if num_examples_to_generate < num_examples:
        print(f"Attempted to generate {num_examples} ICL examples for conversation, but only {len(selected_in_context_examples)} are available!")

    results = []
    while len(results) < num_examples_to_generate:
        if len(selected_in_context_examples) == 0:
            break

        chosen_example = selected_in_context_examples.pop()
        chosen_service = chosen_example["service"]
        potential_devices = [ x for x in entity_names if x.split(".")[0] == chosen_service.split(".")[0] ]

        if len(potential_devices) == 0:
            continue
        else:
            example = {
                "to_say": chosen_example["response"],
                "service": chosen_service,
                "target_device": potential_devices[0],
            }
            results.insert(0, json.dumps(example))

    return "\n".join(results)

def tokenize(tokenizer, prompt):
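    """Tokenize a prompt (or batch of prompts), padding and truncating to CTX_SIZE."""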
    return tokenizer(prompt, return_tensors="pt", padding=True, truncation=True, max_length=CTX_SIZE)

def generate(model, tokenizer, prompts):
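    """Generate completions for a batch of prompts and return the decoded text
    (prompt plus completion, special tokens included)."""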
    inputs = tokenize(tokenizer, prompts)
    with torch.no_grad():
        outputs = model.generate(**inputs)
    text = tokenizer.batch_decode(outputs)
    return text

def evaluate(output_folder, trained_model, trained_tokenizer, dataset, batch_size, use_icl):
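    """Score the model's generated service calls against the dataset.

    A prediction counts as correct when its JSON service calls match the expected
    ones; calls that differ only in "rgb_color" are counted as correct but tracked
    separately as color mismatches. Results are written to eval_results.json in
    output_folder.
    """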
    # split = trained_tokenizer.apply_chat_template(conversation=[{"role": "assistant", "content": r"%%%%%%%%%%%%%%%%"}], tokenize=False).split( r"%%%%%%%%%%%%%%%%")[0].replace(trained_tokenizer.bos_token, "")
    split = "<|start_header_id|>assistant<|end_header_id|>"

    print("Evaluating...")
    correct_answers = 0
    total_answers = 0
    color_mismatches = 0

    # pre-allocate cuda buffers
    inputs = trained_tokenizer([""] * batch_size, return_tensors="pt", max_length=CTX_SIZE, padding="max_length", truncation=True)
    inputs = {k: v.to(trained_model.device) for k, v in inputs.items()}
    with torch.no_grad():
        outputs = trained_model(**inputs)

    failed_examples = []
    with tqdm(total=len(dataset), desc="Accuracy") as pbar:
        for batch_start in range(0, len(dataset), batch_size):
            batch = dataset[batch_start:batch_start + batch_size]
            if "text" in batch:
                prompts = [ example.split(split)[0] + split for example in batch["text"] ]
                expected_responses = [ example.split(split)[1] for example in batch["text"] ]
            else:
                prompts = []
                expected_responses = []
                for example in batch["conversations"]:
                    conversation = [ { "role": x["from"], "content": x["value"] } for x in example if x["from"] != "assistant"]

                    if use_icl:
                        new_conversation = []
                        for turn in conversation:
                            if turn["role"] == "system":
                                entity_names = entity_ids_regex.findall(turn["content"])
                                service_names = [ x.split("(")[0] for x in service_names_regex.findall(turn["content"]) ]
                                icl_examples = icl_example_generator(5, entity_names, service_names)
                                turn["content"] = turn["content"] + "Respond to the following user instruction by responding in the same format as the following examples:\n" + icl_examples
                            new_conversation.append(turn)
                        conversation = new_conversation

                    prompts.append(trained_tokenizer.apply_chat_template(
                        conversation=conversation,
                        max_length=CTX_SIZE,
                        truncation=True,
                        tokenize=False,
                        add_generation_prompt=True,
                    ))

                    if use_icl:
                        response = [x["value"] for x in example if x["from"] == "assistant"][0]
                        expected_calls = service_call_regex.findall(response)
                        to_say = service_call_regex.sub("", response)
                        expected_responses.append(expected_calls[0])
                    else:
                        expected_responses.append([x["value"] for x in example if x["from"] == "assistant"][0])

            output = generate(trained_model, trained_tokenizer, prompts)

            for model_output, expected_response in zip(output, expected_responses):
                response = model_output.replace(trained_tokenizer.pad_token, "").replace(trained_tokenizer.eos_token, "").split(split)[1].strip()

                expected_service_calls = []

                # ICL responses are bare JSON lines; fine-tuned responses wrap them in a homeassistant code block
                if use_icl:
                    regex_to_use = json_regex
                else:
                    regex_to_use = service_call_regex

                for block in regex_to_use.findall(expected_response.strip()):
                    for line in block.split("\n"):
                        if len(line) == 0:
                            continue
                        expected_service_calls.append(json.loads(line))
                        total_answers = total_answers + 1

                found_responses = regex_to_use.findall(response.strip())

                if len(expected_service_calls) == 0:
                    total_answers = total_answers + 1
                    if len(found_responses) == 0:
                        correct_answers = correct_answers + 1
                        continue
                    else:
                        failed_examples.append({ "expected": expected_response, "actual": response, "extra_response": True })
                        continue

                if len(found_responses) == 0:
                    failed_examples.append({ "expected": expected_response, "actual": response, "no_response_found": True })
                    continue

                for block in found_responses:
                    for line in block.split("\n"):
                        if len(line) == 0:
                            continue
                        try:
                            json_output = json.loads(line)
                        except json.JSONDecodeError:
                            failed_examples.append({ "expected": expected_response, "actual": response, "invalid_json": True })
                            continue

                        if use_icl:
                            # the spoken response is not part of the service call comparison
                            json_output.pop("to_say", None)

                        if json_output in expected_service_calls:
                            expected_service_calls.pop(expected_service_calls.index(json_output))
                            correct_answers = correct_answers + 1
                        elif "rgb_color" in json_output:
                            # accept calls that only disagree on the chosen color, but count them separately
                            for sc in expected_service_calls:
                                sc = { **sc }
                                json_output_copy = { **json_output }
                                if not "rgb_color" in sc:
                                    continue
                                del sc["rgb_color"]
                                del json_output_copy["rgb_color"]
                                if sc == json_output_copy:
                                    correct_answers = correct_answers + 1
                                    color_mismatches = color_mismatches + 1
                                else:
                                    failed_examples.append({ "expected": expected_response, "actual": response })
                        else:
                            failed_examples.append({ "expected": expected_response, "actual": response })

            pbar.update(batch_size)
            pbar.set_description(f"Accuracy: {correct_answers/total_answers*100:.2f}% ({correct_answers}/{total_answers})")

    accuracy = correct_answers/total_answers
    print(f"Final Accuracy Rating: {accuracy*100:.2f}%")
    print(f"Color Mismatches: {color_mismatches}")

    with open(os.path.join(output_folder, "eval_results.json"), "w") as f:
        json.dump({
            "possible_answers": total_answers,
            "correct_answers": correct_answers,
            "accuracy": accuracy,
            "color_mismatches": color_mismatches,
            "failed_examples": failed_examples,
        }, f, indent=4)

def load_model(model_name, is_lora, is_hf, load_in_8bit, checkpoint_name):
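    """Load the model and tokenizer to evaluate.

    Three sources are supported: a Hugging Face Hub model (is_hf), a local LoRA
    adapter under ./loras/ (is_lora), or a fully fine-tuned model under ./models/,
    optionally from a specific checkpoint sub-folder.
    """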
    lora_folder = f"./loras/{model_name}/"
    model_folder = f"./models/{model_name}/"

    # tokenizer isn't saved into checkpoint folders
    tokenizer_folder = model_folder

    if checkpoint_name:
        lora_folder = lora_folder + f"{checkpoint_name}/"
        model_folder = model_folder + f"{checkpoint_name}/"

    if is_hf:
        print(f"Loading model {model_name}...")
        trained_model = AutoModelForCausalLM.from_pretrained(
            model_name,
            trust_remote_code=TRUST_REMOTE_CODE,
            torch_dtype=torch.bfloat16,
            load_in_8bit=load_in_8bit,
        )

        trained_tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=TRUST_REMOTE_CODE,
            padding_side='left',
        )
    elif is_lora:
        adapter_config = PeftConfig.from_pretrained(lora_folder)
        base_model_name = adapter_config.base_model_name_or_path
        print(f"Loading lora from {lora_folder} ({base_model_name})...")

        base_model = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            trust_remote_code=TRUST_REMOTE_CODE,
            torch_dtype=torch.bfloat16,
        )
        trained_model = PeftModel.from_pretrained(
            base_model,
            lora_folder,
            trust_remote_code=TRUST_REMOTE_CODE,
            torch_dtype=torch.bfloat16,
        )

        trained_tokenizer = AutoTokenizer.from_pretrained(
            base_model_name,
            trust_remote_code=TRUST_REMOTE_CODE,
            padding_side='left',
        )
    else:
        print(f"Loading model from {model_folder}...")
        trained_model = AutoModelForCausalLM.from_pretrained(
            model_folder,
            trust_remote_code=TRUST_REMOTE_CODE,
            torch_dtype=torch.bfloat16,
            load_in_8bit=load_in_8bit,
        )

        trained_tokenizer = AutoTokenizer.from_pretrained(
            tokenizer_folder,
            trust_remote_code=TRUST_REMOTE_CODE,
            padding_side='left',
        )

    # config.eos_token_id can be a single id or a list of ids depending on the model
    eos_token_id_to_use = trained_model.config.eos_token_id
    if isinstance(eos_token_id_to_use, list) and len(eos_token_id_to_use) > 0:
        eos_token_id_to_use = eos_token_id_to_use[0]

    pad_token_id_to_use = trained_model.config.pad_token_id
    if not trained_tokenizer.pad_token:
        trained_tokenizer.pad_token = trained_tokenizer.eos_token

        if isinstance(trained_model.config.eos_token_id, list) and len(trained_model.config.eos_token_id) > 0:
            pad_token_id_to_use = trained_model.config.eos_token_id[0]
        else:
            pad_token_id_to_use = trained_model.config.eos_token_id

    trained_model.generation_config = GenerationConfig(
        max_new_tokens=128,
        use_cache=True,
        do_sample=True,
        temperature=0.1,
        top_k=40,
        top_p=1.0,
        repetition_penalty=1.15,
        eos_token_id=trained_model.config.eos_token_id,
        # eos_token_id=128009,
        pad_token_id=pad_token_id_to_use,
    )

    return trained_model, trained_tokenizer

def main():
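    """Parse CLI arguments, load the requested model (and optionally each of its
    checkpoints), and run the evaluation over the test dataset."""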
    global in_context_examples

    parser = argparse.ArgumentParser(description="Evaluate the function calling for a model")
    parser.add_argument("model")
    parser.add_argument("--dataset-file", default="./data/home_assistant_test.jsonl")
    parser.add_argument("--batch-size", default=8)
    parser.add_argument("--lora", default=False, action='store_const', const=True)
    parser.add_argument("--all-checkpoints", default=False, action='store_const', const=True)
    parser.add_argument("--overwrite", default=False, action='store_const', const=True)
    parser.add_argument("--hf", default=False, action='store_const', const=True)
    parser.add_argument("--load-in-8bit", default=False, action='store_const', const=True)

    args = parser.parse_args()
    batch_size = int(args.batch_size)

    dataset = load_dataset("json", data_files={ "train": args.dataset_file })["train"]

    print(f"Got {len(dataset)} examples to test")

    if args.hf:
        output_folder = "./"
        trained_model, trained_tokenizer = load_model(args.model, args.lora, True, args.load_in_8bit, None)
        evaluate(output_folder, trained_model, trained_tokenizer, dataset, batch_size, True)
    else:
        model_folder = f"./loras/{args.model}/" if args.lora else f"./models/{args.model}/"

        if not os.path.isdir(model_folder):
            print(f"Model Not Found: {args.model}")
            return

        if not args.all_checkpoints:
            checkpoints = [None]
        else:
            checkpoints = [x for x in os.listdir(model_folder) if os.path.isdir(os.path.join(model_folder, x)) and "checkpoint" in x]
            checkpoints = sorted(checkpoints, key=lambda x: int(x.split('-')[-1]))
            checkpoints.append(None)

            print(f"Found {len(checkpoints) - 1} checkpoints to test (plus the final model)")

        for ckpt in checkpoints:
            if ckpt:
                output_folder = os.path.join(model_folder, ckpt)
            else:
                output_folder = model_folder

            output_filename = os.path.join(output_folder, "eval_results.json")
            if os.path.exists(output_filename):
                if not args.overwrite:
                    print(f"Evaluation already exists for {output_folder}. Skipping...")
                    continue

            trained_model, trained_tokenizer = load_model(args.model, args.lora, False, False, ckpt)
            evaluate(output_folder, trained_model, trained_tokenizer, dataset, batch_size, False)


if __name__ == "__main__":
    main()