From 6945fbf100607f2e370e56418671ffacc73e0eb5 Mon Sep 17 00:00:00 2001 From: nathandenherder Date: Thu, 5 Feb 2026 18:14:13 -0500 Subject: [PATCH] feat(slack): add native text streaming support Adds support for Slack's Agents & AI Apps text streaming APIs (chat.startStream, chat.appendStream, chat.stopStream) to deliver LLM responses as a single updating message instead of separate messages per block. Changes: - New src/slack/streaming.ts with stream lifecycle helpers using the SDK's ChatStreamer (client.chatStream()) - New 'streaming' config option on SlackAccountConfig - Updated dispatch.ts to route block replies through the stream when enabled, with graceful fallback to normal delivery - Docs in docs/channels/slack.md covering setup and requirements The streaming integration works by intercepting the deliver callback in the reply dispatcher. When streaming is enabled and a thread context exists, the first text delivery starts a stream, subsequent deliveries append to it, and the stream is finalized after dispatch completes. Media payloads and error cases fall back to normal message delivery. Refs: - https://docs.slack.dev/ai/developing-ai-apps#streaming - https://docs.slack.dev/reference/methods/chat.startStream - https://docs.slack.dev/reference/methods/chat.appendStream - https://docs.slack.dev/reference/methods/chat.stopStream --- docs/channels/slack.md | 34 ++++ src/config/types.slack.ts | 11 ++ src/config/zod-schema.providers-core.ts | 1 + src/slack/monitor/message-handler/dispatch.ts | 167 +++++++++++++++++- src/slack/streaming.ts | 136 ++++++++++++++ 5 files changed, 340 insertions(+), 9 deletions(-) create mode 100644 src/slack/streaming.ts diff --git a/docs/channels/slack.md b/docs/channels/slack.md index d692431dad..64a29ed105 100644 --- a/docs/channels/slack.md +++ b/docs/channels/slack.md @@ -563,6 +563,40 @@ Common failures: For triage flow: [/channels/troubleshooting](/channels/troubleshooting). +## Text streaming + +Slack's [Agents & AI Apps](https://docs.slack.dev/ai/developing-ai-apps) feature includes native text streaming APIs that let your app stream responses word-by-word (similar to ChatGPT) instead of waiting for the full response. + +Enable it per-account: + +```yaml +channels: + slack: + streaming: true +``` + +### Requirements + +1. **Agents & AI Apps** must be toggled on in your [Slack app settings](https://api.slack.com/apps). This automatically adds the `assistant:write` scope. +2. Streaming only works **within threads** (DM threads, channel threads). Messages without a thread context fall back to normal delivery automatically. +3. Block streaming (`blockStreaming`) is automatically enabled when `streaming` is active so the LLM's incremental output feeds into the stream. + +### Behavior + +- On the first text block the bot calls `chat.startStream` to create a single updating message. +- Subsequent text blocks are appended via `chat.appendStream`. +- When the reply is complete the stream is finalized with `chat.stopStream`. +- Media attachments (images, files) are delivered as separate messages alongside the stream. +- If a streaming API call fails, the bot gracefully falls back to normal message delivery for the remainder of the response. + +### Relevant Slack API methods + +| Method | Purpose | +| --------------------------------------------------------------------------------- | ------------------------- | +| [`chat.startStream`](https://docs.slack.dev/reference/methods/chat.startStream) | Start a new text stream | +| [`chat.appendStream`](https://docs.slack.dev/reference/methods/chat.appendStream) | Append text to the stream | +| [`chat.stopStream`](https://docs.slack.dev/reference/methods/chat.stopStream) | Finalize the stream | + ## Notes - Mention gating is controlled via `channels.slack.channels` (set `requireMention` to `true`); `agents.list[].groupChat.mentionPatterns` (or `messages.groupChat.mentionPatterns`) also count as mentions. diff --git a/src/config/types.slack.ts b/src/config/types.slack.ts index 4408aeb099..5743651b9e 100644 --- a/src/config/types.slack.ts +++ b/src/config/types.slack.ts @@ -122,6 +122,17 @@ export type SlackAccountConfig = { blockStreaming?: boolean; /** Merge streamed block replies before sending. */ blockStreamingCoalesce?: BlockStreamingCoalesceConfig; + /** + * Enable Slack native text streaming (Agents & AI Apps). + * + * When true, replies are streamed word-by-word into a single updating + * message using `chat.startStream` / `chat.appendStream` / `chat.stopStream`. + * Requires the Agents & AI Apps feature enabled in Slack app settings and + * the `assistant:write` scope. + * + * Falls back to normal delivery on error or when the message is not in a thread. + */ + streaming?: boolean; mediaMaxMb?: number; /** Reaction notification mode (off|own|all|allowlist). Default: own. */ reactionNotifications?: SlackReactionNotificationMode; diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index 8dc2bff6a8..8b7ac18117 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -480,6 +480,7 @@ export const SlackAccountSchema = z chunkMode: z.enum(["length", "newline"]).optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), + streaming: z.boolean().optional(), mediaMaxMb: z.number().positive().optional(), reactionNotifications: z.enum(["off", "own", "all", "allowlist"]).optional(), reactionAllowlist: z.array(z.union([z.string(), z.number()])).optional(), diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 8a988ca351..db034385fa 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -1,3 +1,5 @@ +import type { ReplyPayload } from "../../../auto-reply/types.js"; +import type { SlackStreamSession } from "../../streaming.js"; import type { PreparedSlackMessage } from "./types.js"; import { resolveHumanDelayConfig } from "../../../agents/identity.js"; import { dispatchInboundMessage } from "../../../auto-reply/dispatch.js"; @@ -10,9 +12,39 @@ import { createTypingCallbacks } from "../../../channels/typing.js"; import { resolveStorePath, updateLastRoute } from "../../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js"; import { removeSlackReaction } from "../../actions.js"; +import { appendSlackStream, startSlackStream, stopSlackStream } from "../../streaming.js"; import { resolveSlackThreadTargets } from "../../threading.js"; import { createSlackReplyDeliveryPlan, deliverReplies } from "../replies.js"; +/** + * Check whether a reply payload contains media (images, files, etc.) + * that cannot be delivered through the streaming API. + */ +function hasMedia(payload: ReplyPayload): boolean { + return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; +} + +/** + * Determine if Slack native text streaming should be used for this message. + * + * Streaming requires: + * 1. The `streaming` config option enabled on the account + * 2. A thread timestamp (streaming only works within threads) + */ +function shouldUseStreaming(params: { + streamingEnabled: boolean; + threadTs: string | undefined; +}): boolean { + if (!params.streamingEnabled) { + return false; + } + if (!params.threadTs) { + logVerbose("slack-stream: streaming disabled — no thread_ts available"); + return false; + } + return true; +} + export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessage) { const { ctx, account, message, route } = prepared; const cfg = ctx.cfg; @@ -102,11 +134,30 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag accountId: route.accountId, }); - const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ - ...prefixOptions, - humanDelay: resolveHumanDelayConfig(cfg, route.agentId), - deliver: async (payload) => { - const replyThreadTs = replyPlan.nextThreadTs(); + // ----------------------------------------------------------------------- + // Slack native text streaming state + // ----------------------------------------------------------------------- + const streamingEnabled = account.config.streaming === true; + const replyThreadTs = replyPlan.nextThreadTs(); + + const useStreaming = shouldUseStreaming({ + streamingEnabled, + threadTs: replyThreadTs ?? incomingThreadTs ?? statusThreadTs, + }); + + let streamSession: SlackStreamSession | null = null; + let streamFailed = false; + + /** + * Deliver a payload via Slack native text streaming when possible. + * Falls back to normal delivery for media payloads, errors, or if the + * streaming API call itself fails. + */ + const deliverWithStreaming = async (payload: ReplyPayload): Promise => { + const effectiveThreadTs = replyPlan.nextThreadTs(); + + // Fall back to normal delivery for media, errors, or if streaming already failed + if (streamFailed || hasMedia(payload) || !payload.text?.trim()) { await deliverReplies({ replies: [payload], target: prepared.replyTarget, @@ -114,9 +165,92 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag accountId: account.accountId, runtime, textLimit: ctx.textLimit, - replyThreadTs, + replyThreadTs: effectiveThreadTs, }); replyPlan.markSent(); + return; + } + + const text = payload.text.trim(); + + try { + if (!streamSession) { + // Determine the thread_ts for the stream (required by Slack API) + const streamThreadTs = effectiveThreadTs ?? incomingThreadTs ?? statusThreadTs; + + if (!streamThreadTs) { + // No thread context — can't stream, fall back + logVerbose( + "slack-stream: no thread_ts for stream start, falling back to normal delivery", + ); + streamFailed = true; + await deliverReplies({ + replies: [payload], + target: prepared.replyTarget, + token: ctx.botToken, + accountId: account.accountId, + runtime, + textLimit: ctx.textLimit, + replyThreadTs: effectiveThreadTs, + }); + replyPlan.markSent(); + return; + } + + // Start a new stream + streamSession = await startSlackStream({ + client: ctx.app.client, + channel: message.channel, + threadTs: streamThreadTs, + text, + }); + replyPlan.markSent(); + } else { + // Append to existing stream + await appendSlackStream({ + session: streamSession, + text: "\n" + text, + }); + } + } catch (err) { + runtime.error?.( + danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`), + ); + streamFailed = true; + + // Fall back to normal delivery for this payload + await deliverReplies({ + replies: [payload], + target: prepared.replyTarget, + token: ctx.botToken, + accountId: account.accountId, + runtime, + textLimit: ctx.textLimit, + replyThreadTs: effectiveThreadTs, + }); + replyPlan.markSent(); + } + }; + + const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ + ...prefixOptions, + humanDelay: resolveHumanDelayConfig(cfg, route.agentId), + deliver: async (payload) => { + if (useStreaming) { + await deliverWithStreaming(payload); + } else { + const effectiveThreadTs = replyPlan.nextThreadTs(); + await deliverReplies({ + replies: [payload], + target: prepared.replyTarget, + token: ctx.botToken, + accountId: account.accountId, + runtime, + textLimit: ctx.textLimit, + replyThreadTs: effectiveThreadTs, + }); + replyPlan.markSent(); + } }, onError: (err, info) => { runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`)); @@ -135,14 +269,29 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag skillFilter: prepared.channelConfig?.skills, hasRepliedRef, disableBlockStreaming: - typeof account.config.blockStreaming === "boolean" - ? !account.config.blockStreaming - : undefined, + // When native streaming is active, keep block streaming enabled so we + // get incremental block callbacks that we route through the stream. + useStreaming + ? false + : typeof account.config.blockStreaming === "boolean" + ? !account.config.blockStreaming + : undefined, onModelSelected, }, }); markDispatchIdle(); + // ----------------------------------------------------------------------- + // Finalize the stream if one was started + // ----------------------------------------------------------------------- + if (streamSession && !streamSession.stopped) { + try { + await stopSlackStream({ session: streamSession }); + } catch (err) { + runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`)); + } + } + const anyReplyDelivered = queuedFinal || (counts.block ?? 0) > 0 || (counts.final ?? 0) > 0; if (!anyReplyDelivered) { diff --git a/src/slack/streaming.ts b/src/slack/streaming.ts new file mode 100644 index 0000000000..600c56f7b2 --- /dev/null +++ b/src/slack/streaming.ts @@ -0,0 +1,136 @@ +/** + * Slack native text streaming helpers. + * + * Uses the Slack SDK's `ChatStreamer` (via `client.chatStream()`) to stream + * text responses word-by-word in a single updating message, matching Slack's + * "Agents & AI Apps" streaming UX. + * + * @see https://docs.slack.dev/ai/developing-ai-apps#streaming + * @see https://docs.slack.dev/reference/methods/chat.startStream + * @see https://docs.slack.dev/reference/methods/chat.appendStream + * @see https://docs.slack.dev/reference/methods/chat.stopStream + */ + +import type { ChatStreamer, WebClient } from "@slack/web-api"; +import { logVerbose } from "../globals.js"; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export type SlackStreamSession = { + /** The SDK ChatStreamer instance managing this stream. */ + streamer: ChatStreamer; + /** Channel this stream lives in. */ + channel: string; + /** Thread timestamp (required for streaming). */ + threadTs: string; + /** True once stop() has been called. */ + stopped: boolean; +}; + +export type StartSlackStreamParams = { + client: WebClient; + channel: string; + threadTs: string; + /** Optional initial markdown text to include in the stream start. */ + text?: string; +}; + +export type AppendSlackStreamParams = { + session: SlackStreamSession; + text: string; +}; + +export type StopSlackStreamParams = { + session: SlackStreamSession; + /** Optional final markdown text to append before stopping. */ + text?: string; +}; + +// --------------------------------------------------------------------------- +// Stream lifecycle +// --------------------------------------------------------------------------- + +/** + * Start a new Slack text stream. + * + * Returns a {@link SlackStreamSession} that should be passed to + * {@link appendSlackStream} and {@link stopSlackStream}. + * + * The first chunk of text can optionally be included via `text`. + */ +export async function startSlackStream( + params: StartSlackStreamParams, +): Promise { + const { client, channel, threadTs, text } = params; + + logVerbose(`slack-stream: starting stream in ${channel} thread=${threadTs}`); + + const streamer = client.chatStream({ + channel, + thread_ts: threadTs, + }); + + const session: SlackStreamSession = { + streamer, + channel, + threadTs, + stopped: false, + }; + + // If initial text is provided, send it as the first append which will + // trigger the ChatStreamer to call chat.startStream under the hood. + if (text) { + await streamer.append({ markdown_text: text }); + logVerbose(`slack-stream: appended initial text (${text.length} chars)`); + } + + return session; +} + +/** + * Append markdown text to an active Slack stream. + */ +export async function appendSlackStream(params: AppendSlackStreamParams): Promise { + const { session, text } = params; + + if (session.stopped) { + logVerbose("slack-stream: attempted to append to a stopped stream, ignoring"); + return; + } + + if (!text) { + return; + } + + await session.streamer.append({ markdown_text: text }); + logVerbose(`slack-stream: appended ${text.length} chars`); +} + +/** + * Stop (finalize) a Slack stream. + * + * After calling this the stream message becomes a normal Slack message. + * Optionally include final text to append before stopping. + */ +export async function stopSlackStream(params: StopSlackStreamParams): Promise { + const { session, text } = params; + + if (session.stopped) { + logVerbose("slack-stream: stream already stopped, ignoring duplicate stop"); + return; + } + + session.stopped = true; + + logVerbose( + `slack-stream: stopping stream in ${session.channel} thread=${session.threadTs}${ + text ? ` (final text: ${text.length} chars)` : "" + }`, + ); + + await session.streamer.stop(text ? { markdown_text: text } : undefined); + + logVerbose("slack-stream: stream stopped"); +}