feat: implement REST API support for metadata streaming (Phase 5)

This commit is contained in:
berniegreen
2025-12-31 12:43:48 -06:00
parent 66d3bf786e
commit c539b1edfc
3 changed files with 38 additions and 35 deletions

View File

@@ -77,25 +77,30 @@ func (o *Chatter) Send(request *domain.ChatRequest, opts *domain.ChatOptions) (s
}()
for update := range responseChan {
if opts.UpdateChan != nil {
opts.UpdateChan <- update
}
switch update.Type {
case domain.StreamTypeContent:
message += update.Content
if !opts.SuppressThink {
if !opts.SuppressThink && !opts.Quiet {
fmt.Print(update.Content)
printedStream = true
}
case domain.StreamTypeUsage:
if opts.ShowMetadata && update.Usage != nil {
if opts.ShowMetadata && update.Usage != nil && !opts.Quiet {
fmt.Fprintf(os.Stderr, "\n[Metadata] Input: %d | Output: %d | Total: %d\n",
update.Usage.InputTokens, update.Usage.OutputTokens, update.Usage.TotalTokens)
}
case domain.StreamTypeError:
fmt.Fprintf(os.Stderr, "Error: %s\n", update.Content)
if !opts.Quiet {
fmt.Fprintf(os.Stderr, "Error: %s\n", update.Content)
}
errChan <- errors.New(update.Content)
}
}
if printedStream && !opts.SuppressThink && !strings.HasSuffix(message, "\n") {
if printedStream && !opts.SuppressThink && !strings.HasSuffix(message, "\n") && !opts.Quiet {
fmt.Println()
}

View File

@@ -52,6 +52,8 @@ type ChatOptions struct {
Notification bool
NotificationCommand string
ShowMetadata bool
Quiet bool
UpdateChan chan StreamUpdate
}
// NormalizeMessages remove empty messages and ensure messages order user-assist-user

View File

@@ -40,9 +40,10 @@ type ChatRequest struct {
}
type StreamResponse struct {
Type string `json:"type"` // "content", "error", "complete"
Format string `json:"format"` // "markdown", "mermaid", "plain"
Content string `json:"content"` // The actual content
Type string `json:"type"` // "content", "usage", "error", "complete"
Format string `json:"format,omitempty"` // "markdown", "mermaid", "plain"
Content string `json:"content,omitempty"`
Usage *domain.UsageMetadata `json:"usage,omitempty"`
}
func NewChatHandler(r *gin.Engine, registry *core.PluginRegistry, db *fsdb.Db) *ChatHandler {
@@ -98,7 +99,7 @@ func (h *ChatHandler) HandleChat(c *gin.Context) {
log.Printf("Processing prompt %d: Model=%s Pattern=%s Context=%s",
i+1, prompt.Model, prompt.PatternName, prompt.ContextName)
streamChan := make(chan string)
streamChan := make(chan domain.StreamUpdate)
go func(p PromptRequest) {
defer close(streamChan)
@@ -117,10 +118,10 @@ func (h *ChatHandler) HandleChat(c *gin.Context) {
}
}
chatter, err := h.registry.GetChatter(p.Model, 2048, p.Vendor, "", false, false)
chatter, err := h.registry.GetChatter(p.Model, 2048, p.Vendor, "", true, false)
if err != nil {
log.Printf("Error creating chatter: %v", err)
streamChan <- fmt.Sprintf("Error: %v", err)
streamChan <- domain.StreamUpdate{Type: domain.StreamTypeError, Content: fmt.Sprintf("Error: %v", err)}
return
}
@@ -144,49 +145,44 @@ func (h *ChatHandler) HandleChat(c *gin.Context) {
FrequencyPenalty: request.FrequencyPenalty,
PresencePenalty: request.PresencePenalty,
Thinking: request.Thinking,
UpdateChan: streamChan,
Quiet: true,
}
session, err := chatter.Send(chatReq, opts)
_, err = chatter.Send(chatReq, opts)
if err != nil {
log.Printf("Error from chatter.Send: %v", err)
streamChan <- fmt.Sprintf("Error: %v", err)
// Error already sent to streamChan via domain.StreamTypeError if occurred in Send loop
return
}
if session == nil {
log.Printf("No session returned from chatter.Send")
streamChan <- "Error: No response from model"
return
}
lastMsg := session.GetLastMessage()
if lastMsg != nil {
streamChan <- lastMsg.Content
} else {
log.Printf("No message content in session")
streamChan <- "Error: No response content"
}
}(prompt)
for content := range streamChan {
for update := range streamChan {
select {
case <-clientGone:
return
default:
var response StreamResponse
if strings.HasPrefix(content, "Error:") {
switch update.Type {
case domain.StreamTypeContent:
response = StreamResponse{
Type: "content",
Format: detectFormat(update.Content),
Content: update.Content,
}
case domain.StreamTypeUsage:
response = StreamResponse{
Type: "usage",
Usage: update.Usage,
}
case domain.StreamTypeError:
response = StreamResponse{
Type: "error",
Format: "plain",
Content: content,
}
} else {
response = StreamResponse{
Type: "content",
Format: detectFormat(content),
Content: content,
Content: update.Content,
}
}
if err := writeSSEResponse(c.Writer, response); err != nil {
log.Printf("Error writing response: %v", err)
return