* llm: add created/model fields, non-streaming support, and tests

  - Add `created` timestamp and `model` fields to response (required by OpenAI spec)
  - Add non-streaming mode support for /v1/chat/completions
  - Add `send_data` helper to HTTPRequestHandler for responses with Content-Length
  - Refactor viz/serve.py to use send_data
  - Add integration tests using real OpenAI client

  🤖 Generated with [Claude Code](https://claude.com/claude-code)
  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* add openai to testing
* toml
* Remove 'openai' from dependencies (removed 'openai' from the dependencies list)
* bump cache

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
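For reference, a minimal sketch of the non-streaming `/v1/chat/completions` response shape these changes target, inferred from the assertions in the tests below and the OpenAI chat completions spec; the id suffix, timestamp, and token counts are illustrative, not values the server is guaranteed to produce:

```python
# Illustrative response body for a non-streaming request (shape only).
{
  "id": "chatcmpl-abc123",        # ids are expected to start with "chatcmpl-"
  "object": "chat.completion",    # streaming chunks use "chat.completion.chunk"
  "created": 1700000000,          # unix timestamp, an int
  "model": "test-model",          # echoes the model named in the request
  "choices": [{
    "index": 0,
    "message": {"role": "assistant", "content": "Hello"},
    "finish_reason": "stop"
  }],
  "usage": {"prompt_tokens": 5, "completion_tokens": 3, "total_tokens": 8}
}
```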
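# These tests start the real HTTP server from tinygrad.apps.llm with a mocked
# model and tokenizer, then exercise it through the genuine `openai` client,
# so the `openai` package must be installed (it is a test-only dependency).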
import unittest, threading, time
from unittest.mock import Mock

class TestLLMServer(unittest.TestCase):
  """Integration tests using the real OpenAI client."""

  @classmethod
  def setUpClass(cls):
    # fake tokenizer: role/encode return fixed token ids, decode always returns "Hello"
    cls.mock_tok = Mock()
    cls.mock_tok.role = Mock(return_value=[100, 101])
    cls.mock_tok.encode = Mock(return_value=[200, 201, 202])
    cls.mock_tok.decode = Mock(return_value="Hello")

    # fake model: generate() yields two content tokens followed by the EOS token
    cls.mock_model = Mock()
    cls.mock_model.generate = Mock(side_effect=lambda ids, **kwargs: iter([300, 301, 999]))

    cls.bos_id = 1
    cls.eos_id = 999

    # patch the module-level globals the server handler uses
    import tinygrad.apps.llm as llm_module
    llm_module.model = cls.mock_model
    llm_module.tok = cls.mock_tok
    llm_module.bos_id = cls.bos_id
    llm_module.eos_id = cls.eos_id

    from tinygrad.apps.llm import Handler
    from tinygrad.helpers import TCPServerWithReuse

    # serve on a fixed local port in a daemon thread for the duration of the class
    cls.port = 11435
    cls.server = TCPServerWithReuse(('127.0.0.1', cls.port), Handler)
    cls.server_thread = threading.Thread(target=cls.server.serve_forever, daemon=True)
    cls.server_thread.start()
    time.sleep(0.1)

    from openai import OpenAI
    cls.client = OpenAI(base_url=f"http://127.0.0.1:{cls.port}/v1", api_key="test")

  @classmethod
  def tearDownClass(cls):
    cls.server.shutdown()
    cls.server.server_close()

  def test_chat_completion_stream(self):
    stream = self.client.chat.completions.create(
      model="test",
      messages=[{"role": "user", "content": "Hello"}],
      stream=True
    )

    chunks = list(stream)
    self.assertGreater(len(chunks), 0)
    self.assertEqual(chunks[0].choices[0].delta.role, "assistant")
    self.assertEqual(chunks[-1].choices[0].finish_reason, "stop")

  def test_openai_response_structure(self):
    stream = self.client.chat.completions.create(
      model="test-model",
      messages=[{"role": "user", "content": "Test"}],
      stream=True
    )

    for chunk in stream:
      self.assertTrue(chunk.id.startswith("chatcmpl-"))
      self.assertEqual(chunk.object, "chat.completion.chunk")
      self.assertIsNotNone(chunk.choices)
      self.assertIsNotNone(chunk.created)
      self.assertIsInstance(chunk.created, int)
      self.assertEqual(chunk.model, "test-model")

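  # With stream_options={"include_usage": True}, the OpenAI streaming protocol has the
  # server append one final chunk whose `usage` field carries the token counts.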
  def test_stream_with_usage(self):
    stream = self.client.chat.completions.create(
      model="test",
      messages=[{"role": "user", "content": "Hello"}],
      stream=True,
      stream_options={"include_usage": True}
    )

    chunks = list(stream)
    last_chunk = chunks[-1]

    self.assertIsNotNone(last_chunk.usage)
    self.assertIsNotNone(last_chunk.usage.prompt_tokens)
    self.assertIsNotNone(last_chunk.usage.completion_tokens)
    self.assertIsNotNone(last_chunk.usage.total_tokens)

  def test_multi_turn_conversation(self):
    stream = self.client.chat.completions.create(
      model="test",
      messages=[
        {"role": "system", "content": "You are helpful."},
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi!"},
        {"role": "user", "content": "How are you?"}
      ],
      stream=True
    )

    chunks = list(stream)
    self.assertGreater(len(chunks), 0)
    self.assertEqual(chunks[-1].choices[0].finish_reason, "stop")

  def test_content_is_streamed(self):
    stream = self.client.chat.completions.create(
      model="test",
      messages=[{"role": "user", "content": "Hello"}],
      stream=True
    )

    contents = []
    for chunk in stream:
      if chunk.choices and chunk.choices[0].delta.content:
        contents.append(chunk.choices[0].delta.content)

    self.assertGreater(len(contents), 0)

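  # Non-streaming mode: the server returns the whole completion as a single JSON body
  # (sent with Content-Length via the send_data helper) instead of a chunked stream.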
  def test_non_streaming(self):
    resp = self.client.chat.completions.create(
      model="test-model",
      messages=[{"role": "user", "content": "Hello"}],
      stream=False
    )

    self.assertTrue(resp.id.startswith("chatcmpl-"))
    self.assertEqual(resp.object, "chat.completion")
    self.assertEqual(resp.model, "test-model")
    self.assertIsNotNone(resp.created)
    self.assertEqual(len(resp.choices), 1)
    self.assertEqual(resp.choices[0].message.role, "assistant")
    self.assertIsNotNone(resp.choices[0].message.content)
    self.assertEqual(resp.choices[0].finish_reason, "stop")
    self.assertIsNotNone(resp.usage)
    self.assertIsNotNone(resp.usage.prompt_tokens)
    self.assertIsNotNone(resp.usage.completion_tokens)

if __name__ == '__main__':
  unittest.main()