Compare commits

...

14 Commits

Author SHA1 Message Date
github-actions[bot]
47cf24e19d Update version to v1.4.239 and commit 2025-07-07 18:58:38 +00:00
Kayvan Sylvan
3f07afbef4 Merge pull request #1592 from ksylvan/0707-possible-go-routine-race-condition-fix
Fix Streaming Error Handling in Chatter
2025-07-07 11:57:11 -07:00
Kayvan Sylvan
38d714dccd chore: improve error comparison in TestChatter_Send_StreamingErrorPropagation 2025-07-07 11:20:01 -07:00
Kayvan Sylvan
d0b5c95d61 chore: remove redundant channel closure in Send method
### CHANGES

- Remove redundant `close(responseChan)` in `Send` method
- Update `SendStream` to close `responseChan` properly
- Modify test to reflect channel closure logic
2025-07-07 11:02:04 -07:00
Kayvan Sylvan
f8f80ca206 chore: rename doneChan to done and add streaming aggregation test
## CHANGES

- Rename `doneChan` variable to `done` for consistency
- Add `streamChunks` field to mock vendor struct
- Implement chunk sending logic in mock SendStream method
- Add comprehensive streaming success aggregation test case
- Verify message aggregation from multiple stream chunks
- Test assistant response role and content validation
- Ensure proper session handling in streaming scenarios
2025-07-07 10:49:29 -07:00
Kayvan Sylvan
0af458872f feat: add test for Chatter's Send method error propagation
### CHANGES

- Implement mockVendor for testing ai.Vendor interface
- Add TestChatter_Send_StreamingErrorPropagation test case
- Verify error propagation in Chatter's Send method
- Ensure session returns even on streaming error
- Create temporary database for testing Chatter functionality
2025-07-07 10:36:40 -07:00
Kayvan Sylvan
24e46a6f37 chore: rename channels for clarity in Send method
### CHANGES

- Rename `done` to `doneChan` for clarity
- Adjust channel closure for `doneChan`
- Update channel listening logic to use `doneChan`
2025-07-07 10:28:54 -07:00
Kayvan Sylvan
d6a31e68b0 refactor: rename channel variable to responseChan for better clarity in streaming logic
## CHANGES

- Rename `channel` variable to `responseChan` for clarity
- Update channel references in goroutine defer statements
- Pass renamed channel to `SendStream` method call
- Maintain consistent naming throughout streaming flow
2025-07-07 10:23:42 -07:00
Kayvan Sylvan
b1013ca61b chore: close channel after sending stream in Send
### CHANGES

- Add `channel` closure after sending stream
- Ensure resource cleanup in `Send` method
2025-07-07 10:09:24 -07:00
Kayvan Sylvan
6b4ce946a5 chore: refactor error handling and response aggregation in Send
### CHANGES

- Simplify response aggregation loop in `Send`
- Remove redundant select case for closed channel
- Streamline error checking from `errChan`
- Ensure goroutine completion before returning
2025-07-07 09:39:58 -07:00
Kayvan Sylvan
2d2830e9c8 chore: enhance Chatter.Send method with proper goroutine synchronization
### CHANGES
- Add `done` channel to track goroutine completion.
- Replace `errChan` closure with `done` channel closure.
- Ensure main loop waits for goroutine on channel close.
- Synchronize error handling with `done` channel wait.
2025-07-07 09:09:04 -07:00
Kayvan Sylvan
115327fdab refactor: use select to handle stream and error channels concurrently
### CHANGES

- Replace for-range loop with a non-blocking select statement.
- Process message and error channels concurrently for better handling.
- Improve the robustness of streaming error detection.
- Exit loop cleanly when the message channel closes.
2025-07-07 08:37:31 -07:00
Kayvan Sylvan
e672f9b73f chore: simplify error handling in streaming chat response by removing unnecessary select statement 2025-07-07 08:15:24 -07:00
Kayvan Sylvan
ef4364a1aa fix: improve error handling in streaming chat functionality
## CHANGES

- Add dedicated error channel for stream operations
- Separate error handling from message streaming logic
- Check for streaming errors after channel closure
- Close error channel properly in goroutine cleanup
- Remove error messages from message stream channel
- Add proper error propagation for stream failures
2025-07-07 03:31:58 -07:00
4 changed files with 205 additions and 6 deletions

View File

@@ -66,17 +66,35 @@ func (o *Chatter) Send(request *common.ChatRequest, opts *common.ChatOptions) (s
message := ""
if o.Stream {
channel := make(chan string)
responseChan := make(chan string)
errChan := make(chan error, 1)
done := make(chan struct{})
go func() {
if streamErr := o.vendor.SendStream(session.GetVendorMessages(), opts, channel); streamErr != nil {
channel <- streamErr.Error()
defer close(done)
if streamErr := o.vendor.SendStream(session.GetVendorMessages(), opts, responseChan); streamErr != nil {
errChan <- streamErr
}
}()
for response := range channel {
for response := range responseChan {
message += response
fmt.Print(response)
}
// Wait for goroutine to finish
<-done
// Check for errors in errChan
select {
case streamErr := <-errChan:
if streamErr != nil {
err = streamErr
return
}
default:
// No errors, continue
}
} else {
if message, err = o.vendor.Send(context.Background(), session.GetVendorMessages(), opts); err != nil {
return

181
core/chatter_test.go Normal file
View File

@@ -0,0 +1,181 @@
package core
import (
"bytes"
"context"
"errors"
"testing"
"github.com/danielmiessler/fabric/chat"
"github.com/danielmiessler/fabric/common"
"github.com/danielmiessler/fabric/plugins/db/fsdb"
)
// mockVendor implements the ai.Vendor interface for testing
type mockVendor struct {
sendStreamError error
streamChunks []string
}
func (m *mockVendor) GetName() string {
return "mock"
}
func (m *mockVendor) GetSetupDescription() string {
return "mock vendor"
}
func (m *mockVendor) IsConfigured() bool {
return true
}
func (m *mockVendor) Configure() error {
return nil
}
func (m *mockVendor) Setup() error {
return nil
}
func (m *mockVendor) SetupFillEnvFileContent(*bytes.Buffer) {
}
func (m *mockVendor) ListModels() ([]string, error) {
return []string{"test-model"}, nil
}
func (m *mockVendor) SendStream(messages []*chat.ChatCompletionMessage, opts *common.ChatOptions, responseChan chan string) error {
// Send chunks if provided (for successful streaming test)
if m.streamChunks != nil {
for _, chunk := range m.streamChunks {
responseChan <- chunk
}
}
// Close the channel like real vendors do
close(responseChan)
return m.sendStreamError
}
func (m *mockVendor) Send(ctx context.Context, messages []*chat.ChatCompletionMessage, opts *common.ChatOptions) (string, error) {
return "test response", nil
}
func (m *mockVendor) NeedsRawMode(modelName string) bool {
return false
}
func TestChatter_Send_StreamingErrorPropagation(t *testing.T) {
// Create a temporary database for testing
tempDir := t.TempDir()
db := fsdb.NewDb(tempDir)
// Create a mock vendor that will return an error from SendStream
expectedError := errors.New("streaming error")
mockVendor := &mockVendor{
sendStreamError: expectedError,
}
// Create chatter with streaming enabled
chatter := &Chatter{
db: db,
Stream: true, // Enable streaming to trigger SendStream path
vendor: mockVendor,
model: "test-model",
}
// Create a test request
request := &common.ChatRequest{
Message: &chat.ChatCompletionMessage{
Role: chat.ChatMessageRoleUser,
Content: "test message",
},
}
// Create test options
opts := &common.ChatOptions{
Model: "test-model",
}
// Call Send and expect it to return the streaming error
session, err := chatter.Send(request, opts)
// Verify that the error from SendStream is propagated
if err == nil {
t.Fatal("Expected error to be returned, but got nil")
}
if !errors.Is(err, expectedError) {
t.Errorf("Expected error %q, but got %q", expectedError, err)
}
// Session should still be returned (it was built successfully before the streaming error)
if session == nil {
t.Error("Expected session to be returned even when streaming error occurs")
}
}
func TestChatter_Send_StreamingSuccessfulAggregation(t *testing.T) {
// Create a temporary database for testing
tempDir := t.TempDir()
db := fsdb.NewDb(tempDir)
// Create test chunks that should be aggregated
testChunks := []string{"Hello", " ", "world", "!", " This", " is", " a", " test."}
expectedMessage := "Hello world! This is a test."
// Create a mock vendor that will send chunks successfully
mockVendor := &mockVendor{
sendStreamError: nil, // No error for successful streaming
streamChunks: testChunks,
}
// Create chatter with streaming enabled
chatter := &Chatter{
db: db,
Stream: true, // Enable streaming to trigger SendStream path
vendor: mockVendor,
model: "test-model",
}
// Create a test request
request := &common.ChatRequest{
Message: &chat.ChatCompletionMessage{
Role: chat.ChatMessageRoleUser,
Content: "test message",
},
}
// Create test options
opts := &common.ChatOptions{
Model: "test-model",
}
// Call Send and expect successful aggregation
session, err := chatter.Send(request, opts)
// Verify no error occurred
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
// Verify session was returned
if session == nil {
t.Fatal("Expected session to be returned")
}
// Verify the message was aggregated correctly
messages := session.GetVendorMessages()
if len(messages) != 2 { // user message + assistant response
t.Fatalf("Expected 2 messages, got %d", len(messages))
}
// Check the assistant's response (last message)
assistantMessage := messages[len(messages)-1]
if assistantMessage.Role != chat.ChatMessageRoleAssistant {
t.Errorf("Expected assistant role, got %s", assistantMessage.Role)
}
if assistantMessage.Content != expectedMessage {
t.Errorf("Expected aggregated message %q, got %q", expectedMessage, assistantMessage.Content)
}
}

View File

@@ -1 +1 @@
"1.4.238"
"1.4.239"

View File

@@ -1,3 +1,3 @@
package main
var version = "v1.4.238"
var version = "v1.4.239"