mirror of
https://github.com/nod-ai/SHARK-Studio.git
synced 2026-01-08 21:38:04 -05:00
* Add H2OGPT * Add UI tab for h2ogpt * Add source files from h2ogpt * Add the rest of the files * Add h2ogpt support * Add SHARK Compilation support for langchain model for cli mode --------- Co-authored-by: George Petterson <gpetters@protonmail.com>
70 lines
2.2 KiB
Python
70 lines
2.2 KiB
Python
from typing import Any, Dict, List, Union, Optional
|
|
import time
|
|
import queue
|
|
|
|
from langchain.callbacks.base import BaseCallbackHandler
|
|
from langchain.schema import LLMResult
|
|
|
|
|
|
class StreamingGradioCallbackHandler(BaseCallbackHandler):
|
|
"""
|
|
Similar to H2OTextIteratorStreamer that is for HF backend, but here LangChain backend
|
|
"""
|
|
|
|
def __init__(self, timeout: Optional[float] = None, block=True):
|
|
super().__init__()
|
|
self.text_queue = queue.SimpleQueue()
|
|
self.stop_signal = None
|
|
self.do_stop = False
|
|
self.timeout = timeout
|
|
self.block = block
|
|
|
|
def on_llm_start(
|
|
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
|
|
) -> None:
|
|
"""Run when LLM starts running. Clean the queue."""
|
|
while not self.text_queue.empty():
|
|
try:
|
|
self.text_queue.get(block=False)
|
|
except queue.Empty:
|
|
continue
|
|
|
|
def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
|
|
"""Run on new LLM token. Only available when streaming is enabled."""
|
|
self.text_queue.put(token)
|
|
|
|
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
|
|
"""Run when LLM ends running."""
|
|
self.text_queue.put(self.stop_signal)
|
|
|
|
def on_llm_error(
|
|
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
|
|
) -> None:
|
|
"""Run when LLM errors."""
|
|
self.text_queue.put(self.stop_signal)
|
|
|
|
def __iter__(self):
|
|
return self
|
|
|
|
def __next__(self):
|
|
while True:
|
|
try:
|
|
value = (
|
|
self.stop_signal
|
|
) # value looks unused in pycharm, not true
|
|
if self.do_stop:
|
|
print("hit stop", flush=True)
|
|
# could raise or break, maybe best to raise and make parent see if any exception in thread
|
|
raise StopIteration()
|
|
# break
|
|
value = self.text_queue.get(
|
|
block=self.block, timeout=self.timeout
|
|
)
|
|
break
|
|
except queue.Empty:
|
|
time.sleep(0.01)
|
|
if value == self.stop_signal:
|
|
raise StopIteration()
|
|
else:
|
|
return value
|