import inspect
import os
from functools import partial
from typing import Dict, Any, Optional, List
from langchain.callbacks.manager import CallbackManagerForLLMRun
from pydantic import root_validator
from langchain.llms import gpt4all
from dotenv import dotenv_values

from utils import FakeTokenizer
def get_model_tokenizer_gpt4all(base_model, **kwargs):
    """Load a CPU GPT4All/llama.cpp model and return (model, tokenizer, device)."""
    # defaults (some of these are generation parameters, so need to be passed in at generation time)
    model_kwargs = dict(
        n_threads=os.cpu_count() // 2,
        temp=kwargs.get("temperature", 0.2),
        top_p=kwargs.get("top_p", 0.75),
        top_k=kwargs.get("top_k", 40),
        n_ctx=2048 - 256,
    )
    env_gpt4all_file = ".env_gpt4all"
    model_kwargs.update(dotenv_values(env_gpt4all_file))
    # convert env strings to int or float where possible to satisfy the expected types
    for k, v in model_kwargs.items():
        try:
            if float(v) == int(v):
                model_kwargs[k] = int(v)
            else:
                model_kwargs[k] = float(v)
        except (TypeError, ValueError):
            pass

    if base_model == "llama":
        if "model_path_llama" not in model_kwargs:
            raise ValueError("No model_path_llama in %s" % env_gpt4all_file)
        model_path = model_kwargs.pop("model_path_llama")
        # FIXME: GPT4All version of llama doesn't handle new quantization, so use llama_cpp_python
        from llama_cpp import Llama

        # llama sets some things at init model time, not generation time
        func_names = list(inspect.signature(Llama.__init__).parameters)
        model_kwargs = {
            k: v for k, v in model_kwargs.items() if k in func_names
        }
        model_kwargs["n_ctx"] = int(model_kwargs["n_ctx"])
        model = Llama(model_path=model_path, **model_kwargs)
    elif base_model == "gpt4all_llama":
        if (
            "model_name_gpt4all_llama" not in model_kwargs
            and "model_path_gpt4all_llama" not in model_kwargs
        ):
            raise ValueError(
                "No model_name_gpt4all_llama or model_path_gpt4all_llama in %s"
                % env_gpt4all_file
            )
        model_name = model_kwargs.pop("model_name_gpt4all_llama")
        model_type = "llama"
        from gpt4all import GPT4All as GPT4AllModel

        model = GPT4AllModel(model_name=model_name, model_type=model_type)
    elif base_model == "gptj":
        if (
            "model_name_gptj" not in model_kwargs
            and "model_path_gptj" not in model_kwargs
        ):
            raise ValueError(
                "No model_name_gptj or model_path_gptj in %s"
                % env_gpt4all_file
            )
        model_name = model_kwargs.pop("model_name_gptj")
        model_type = "gptj"
        from gpt4all import GPT4All as GPT4AllModel

        model = GPT4AllModel(model_name=model_name, model_type=model_type)
    else:
        raise ValueError("No such base_model %s" % base_model)
    return model, FakeTokenizer(), "cpu"
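# Illustrative usage (a sketch, not part of the original file): in CLI mode the caller
# is expected to provide a `.env_gpt4all` file, e.g. with `model_path_llama` pointing at
# a local ggml model file. Under that assumption:
#
#   model, tokenizer, device = get_model_tokenizer_gpt4all("llama", temperature=0.2)
#   # `model` is a llama_cpp.Llama instance, `tokenizer` a FakeTokenizer, `device` == "cpu"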


from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler


class H2OStreamingStdOutCallbackHandler(StreamingStdOutCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        # streaming to stdout already occurs without this
        # sys.stdout.write(token)
        # sys.stdout.flush()
        pass


def get_model_kwargs(env_kwargs, default_kwargs, cls, exclude_list=[]):
    # default from class
    model_kwargs = {
        k: v.default
        for k, v in dict(inspect.signature(cls).parameters).items()
        if k not in exclude_list
    }
    # from our defaults
    model_kwargs.update(default_kwargs)
    # from user defaults
    model_kwargs.update(env_kwargs)
    # ensure only valid keys
    func_names = list(inspect.signature(cls).parameters)
    model_kwargs = {k: v for k, v in model_kwargs.items() if k in func_names}
    return model_kwargs
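# Note (illustrative, not in the original file): precedence is class-signature defaults,
# then `default_kwargs`, then `.env_gpt4all` values, with keys unknown to `cls` dropped.
# For a hypothetical pydantic LLM class `SomeLLM` with a `temp` field:
#
#   get_model_kwargs({"temp": "0.4"}, {"temp": 0.1, "bogus": 1}, SomeLLM)
#   # -> signature defaults, but temp == "0.4"; "bogus" is filtered out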


def get_llm_gpt4all(
    model_name,
    model=None,
    max_new_tokens=256,
    temperature=0.1,
    repetition_penalty=1.0,
    top_k=40,
    top_p=0.7,
    streaming=False,
    callbacks=None,
    prompter=None,
    verbose=False,
):
    assert prompter is not None
    env_gpt4all_file = ".env_gpt4all"
    env_kwargs = dotenv_values(env_gpt4all_file)
    n_ctx = env_kwargs.pop("n_ctx", 2048 - max_new_tokens)
    default_kwargs = dict(
        context_erase=0.5,
        n_batch=1,
        n_ctx=n_ctx,
        n_predict=max_new_tokens,
        repeat_last_n=64 if repetition_penalty != 1.0 else 0,
        repeat_penalty=repetition_penalty,
        temp=temperature,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
        use_mlock=True,
        verbose=verbose,
    )
    if model_name == "llama":
        cls = H2OLlamaCpp
        model_path = (
            env_kwargs.pop("model_path_llama") if model is None else model
        )
        model_kwargs = get_model_kwargs(
            env_kwargs, default_kwargs, cls, exclude_list=["lc_kwargs"]
        )
        model_kwargs.update(
            dict(
                model_path=model_path,
                callbacks=callbacks,
                streaming=streaming,
                prompter=prompter,
            )
        )
        llm = cls(**model_kwargs)
        llm.client.verbose = verbose
    elif model_name == "gpt4all_llama":
        cls = H2OGPT4All
        model_path = (
            env_kwargs.pop("model_path_gpt4all_llama")
            if model is None
            else model
        )
        model_kwargs = get_model_kwargs(
            env_kwargs, default_kwargs, cls, exclude_list=["lc_kwargs"]
        )
        model_kwargs.update(
            dict(
                model=model_path,
                backend="llama",
                callbacks=callbacks,
                streaming=streaming,
                prompter=prompter,
            )
        )
        llm = cls(**model_kwargs)
    elif model_name == "gptj":
        cls = H2OGPT4All
        model_path = (
            env_kwargs.pop("model_path_gptj") if model is None else model
        )
        model_kwargs = get_model_kwargs(
            env_kwargs, default_kwargs, cls, exclude_list=["lc_kwargs"]
        )
        model_kwargs.update(
            dict(
                model=model_path,
                backend="gptj",
                callbacks=callbacks,
                streaming=streaming,
                prompter=prompter,
            )
        )
        llm = cls(**model_kwargs)
    else:
        raise RuntimeError("No such model_name %s" % model_name)
    return llm
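# Illustrative usage (a sketch, not part of the original file): building a streaming
# llama.cpp-backed LangChain LLM, assuming `.env_gpt4all` defines `model_path_llama` and
# `prompter` is a Prompter-like object exposing `generate_prompt(data_point)`:
#
#   llm = get_llm_gpt4all(
#       "llama",
#       max_new_tokens=256,
#       streaming=True,
#       callbacks=[H2OStreamingStdOutCallbackHandler()],
#       prompter=prompter,
#   )
#   answer = llm("Summarize what SHARK Studio does.")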


class H2OGPT4All(gpt4all.GPT4All):
    model: Any
    """Path to the pre-trained GPT4All model file (or an already-loaded model object)."""
    prompter: Any

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the python package exists in the environment."""
        try:
            if isinstance(values["model"], str):
                from gpt4all import GPT4All as GPT4AllModel

                full_path = values["model"]
                model_path, delimiter, model_name = full_path.rpartition("/")
                model_path += delimiter

                values["client"] = GPT4AllModel(
                    model_name=model_name,
                    model_path=model_path or None,
                    model_type=values["backend"],
                    allow_download=False,
                )
                if values["n_threads"] is not None:
                    # set n_threads
                    values["client"].model.set_thread_count(
                        values["n_threads"]
                    )
            else:
                values["client"] = values["model"]
                try:
                    values["backend"] = values["client"].model_type
                except AttributeError:
                    # The below is for compatibility with GPT4All Python bindings <= 0.2.3.
                    values["backend"] = values["client"].model.model_type

        except ImportError:
            raise ValueError(
                "Could not import gpt4all python package. "
                "Please install it with `pip install gpt4all`."
            )
        return values

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs,
    ) -> str:
        # Roughly 4 chars per token if natural language
        prompt = prompt[-self.n_ctx * 4 :]

        # use instruct prompting
        data_point = dict(context="", instruction=prompt, input="")
        prompt = self.prompter.generate_prompt(data_point)

        verbose = False
        if verbose:
            print("_call prompt: %s" % prompt, flush=True)
        # FIXME: GPT4All doesn't support yield during generate, so cannot support streaming except via itself to stdout
        return super()._call(prompt, stop=stop, run_manager=run_manager)


from langchain.llms import LlamaCpp


class H2OLlamaCpp(LlamaCpp):
    model_path: Any
    """Path to the pre-trained llama.cpp model file (or an already-loaded model object)."""
    prompter: Any

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that llama-cpp-python library is installed."""
        if isinstance(values["model_path"], str):
            model_path = values["model_path"]
            model_param_names = [
                "lora_path",
                "lora_base",
                "n_ctx",
                "n_parts",
                "seed",
                "f16_kv",
                "logits_all",
                "vocab_only",
                "use_mlock",
                "n_threads",
                "n_batch",
                "use_mmap",
                "last_n_tokens_size",
            ]
            model_params = {k: values[k] for k in model_param_names}
            # For backwards compatibility, only include if non-null.
            if values["n_gpu_layers"] is not None:
                model_params["n_gpu_layers"] = values["n_gpu_layers"]

            try:
                from llama_cpp import Llama

                values["client"] = Llama(model_path, **model_params)
            except ImportError:
                raise ModuleNotFoundError(
                    "Could not import llama-cpp-python library. "
                    "Please install the llama-cpp-python library to "
                    "use this model: pip install llama-cpp-python"
                )
            except Exception as e:
                raise ValueError(
                    f"Could not load Llama model from path: {model_path}. "
                    f"Received error {e}"
                )
        else:
            values["client"] = values["model_path"]
        return values

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs,
    ) -> str:
        verbose = False
        # tokenize twice, just to count tokens, since the llama-cpp-python wrapper has no way to truncate
        # still have to avoid huge inputs, else hit "llama_tokenize: too many tokens" -- might still hit it, but not fatal
        prompt = prompt[-self.n_ctx * 4 :]
        prompt_tokens = self.client.tokenize(b" " + prompt.encode("utf-8"))
        num_prompt_tokens = len(prompt_tokens)
        if num_prompt_tokens > self.n_ctx:
            # conservative by using int()
            chars_per_token = int(len(prompt) / num_prompt_tokens)
            prompt = prompt[-self.n_ctx * chars_per_token :]
            if verbose:
                print(
                    "reducing tokens, assuming average of %s chars/token"
                    % chars_per_token,
                    flush=True,
                )
                prompt_tokens2 = self.client.tokenize(
                    b" " + prompt.encode("utf-8")
                )
                num_prompt_tokens2 = len(prompt_tokens2)
                print(
                    "reduced tokens from %d -> %d"
                    % (num_prompt_tokens, num_prompt_tokens2),
                    flush=True,
                )

        # use instruct prompting
        data_point = dict(context="", instruction=prompt, input="")
        prompt = self.prompter.generate_prompt(data_point)

        if verbose:
            print("_call prompt: %s" % prompt, flush=True)

        if self.streaming:
            text_callback = None
            if run_manager:
                text_callback = partial(
                    run_manager.on_llm_new_token, verbose=self.verbose
                )
            # the parent streaming handler expects to see the prompt first, else output is "" and gets lost if prompt=None in the prompter
            if text_callback:
                text_callback(prompt)
            text = ""
            for token in self.stream(
                prompt=prompt, stop=stop, run_manager=run_manager
            ):
                text_chunk = token["choices"][0]["text"]
                # self.stream already calls text_callback
                # if text_callback:
                #     text_callback(text_chunk)
                text += text_chunk
            return text
        else:
            params = self._get_parameters(stop)
            params = {**params, **kwargs}
            result = self.client(prompt=prompt, **params)
            return result["choices"][0]["text"]