AI Streaming

Real-time AI response streaming with tool calls and extended thinking

The Bot SDK provides AI streaming capabilities for building bots that relay responses from AI models in real time. This includes support for extended thinking visualization, tool call display, and integration with popular AI SDKs.

Overview

AI streaming enables your bot to:

  • Stream text in real time as the AI generates responses
  • Display extended thinking so users can follow the AI's reasoning process
  • Visualize tool calls with inputs and outputs
  • Handle errors gracefully with automatic stream failure handling

Quick Start

import { Effect } from "effect"
import { runHazelBot } from "@hazel/bot-sdk"

runHazelBot({
	setup: (bot) =>
		Effect.gen(function* () {
			yield* bot.onMessage((message) =>
				Effect.gen(function* () {
					// Create an AI stream session
					const session = yield* bot.ai.stream(message.channelId, {
						model: "claude-3.5-sonnet",
						showThinking: true,
						showToolCalls: true,
					})

					// Process AI chunks
					yield* session.processChunk({ type: "text", text: "Hello! " })
					yield* session.processChunk({ type: "text", text: "How can I help?" })

					// Complete the stream
					yield* session.complete()
				}),
			)
		}),
})

API Reference

bot.ai.stream

Creates an AI-aware stream session.

const session = yield* bot.ai.stream(channelId, options?)

Parameters:

Parameter  Type             Description
channelId  ChannelId        Target channel for the stream
options    AIStreamOptions  Optional configuration

AIStreamOptions:

Option         Type     Default  Description
model          string   -        AI model identifier (shown in UI)
showThinking   boolean  false    Display extended thinking steps
showToolCalls  boolean  false    Display tool call steps
loading        object   -        Initial loading state configuration

Loading state options:

{
	loading: {
		title: "Thinking...",      // Required: loading indicator title
		description: "Please wait" // Optional: additional context
	}
}

bot.stream.create

Creates a low-level stream session for full control.

const session = yield* bot.stream.create(channelId, options?)

Use this when you need direct control over the streaming process without AI-specific conveniences.
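A minimal sketch of low-level usage, assuming the low-level session exposes the same setText, appendText, and complete methods as AIStreamSession (check the session type for the exact surface):

// Stream plain text without AI-specific step handling.
const session = yield* bot.stream.create(message.channelId)

yield* session.setText("Crunching the numbers...")
yield* session.appendText(" Done.")
yield* session.complete()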

AIStreamSession Methods

processChunk

Process a single AI content chunk.

yield* session.processChunk(chunk)

Chunk types:

// Text content
{ type: "text", text: "Hello world" }

// Extended thinking
{ type: "thinking", thinking: "Let me analyze this..." }

// Tool call
{ type: "tool_call", name: "search", input: { query: "weather" } }

// Tool result
{ type: "tool_result", toolCallId: "call_123", result: { temp: 72 } }

processStream

Process an entire stream of chunks.

yield* session.processStream(chunkStream)
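
Here chunkStream is any Effect Stream of AIContentChunk values. A minimal sketch of building one by hand:

// A hypothetical fixed chunk stream; in practice this usually comes
// from an AI SDK (see the Vercel AI SDK integration below).
const chunkStream = Stream.fromIterable([
	{ type: "thinking", thinking: "The user greeted me..." },
	{ type: "text", text: "Hello! How can I help?" },
])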

appendText

Append text to the current message.

yield* session.appendText("More text...")

setText

Replace the entire message text.

yield* session.setText("New message content")

startThinking

Start an extended thinking step.

const stepId = yield* session.startThinking("Analyzing...")

startToolCall

Start a tool call step.

const stepId = yield* session.startToolCall("search", { query: "weather" })

updateStepContent

Update a step's content.

yield* session.updateStepContent(stepId, "Updated content...")

completeStep

Mark a step as complete.

yield* session.completeStep(stepId, { result: "success" })
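
Together, the step methods form a lifecycle: start a step, stream partial content into it, then complete it. A minimal sketch with a hypothetical search tool:

// Open a tool-call step, stream interim output into it, then close it
// with the final result. Thinking steps follow the same lifecycle.
const stepId = yield* session.startToolCall("search", { query: "weather" })
yield* session.updateStepContent(stepId, "Querying weather service...")
yield* session.completeStep(stepId, { result: { temp: 72 } })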

complete

Finalize the stream and persist the message.

yield* session.complete()

fail

Mark the stream as failed.

yield* session.fail(new Error("Something went wrong"))
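
complete and fail are the two terminal states, and exactly one of them should run on every code path. A minimal sketch using Effect.matchEffect, where streamingWork stands in for your chunk-processing effect:

// Route failure into session.fail and success into session.complete,
// so the session is always finalized.
yield* streamingWork.pipe(
	Effect.matchEffect({
		onFailure: (error) => session.fail(error),
		onSuccess: () => session.complete(),
	}),
)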

Vercel AI SDK Integration

The AI streaming API integrates seamlessly with the Vercel AI SDK.

import { Effect, Stream, identity } from "effect"
import { streamText, type TextStreamPart } from "ai"
import { anthropic } from "@ai-sdk/anthropic"
import type { AIContentChunk } from "@hazel/bot-sdk"

yield* bot.onMessage((message) =>
	Effect.gen(function* () {
		const session = yield* bot.ai.stream(message.channelId, {
			model: "claude-3.5-sonnet",
			showThinking: true,
			showToolCalls: true,
		})

		const result = streamText({
			model: anthropic("claude-3-5-sonnet-20241022"),
			prompt: message.content,
		})

		// Process the stream, dropping parts that don't map to a chunk.
		// (`ctx` is the handler context expected by withErrorHandler.)
		yield* Stream.fromAsyncIterable(result.fullStream, identity).pipe(
			Stream.map(mapPartToChunk),
			Stream.filter((chunk): chunk is AIContentChunk => chunk !== null),
			Stream.runForEach((chunk) => session.processChunk(chunk)),
			bot.ai.withErrorHandler(ctx, session),
		)

		yield* session.complete()
	}),
)

// Map Vercel AI SDK parts to Bot SDK chunks
function mapPartToChunk(part: TextStreamPart<any>): AIContentChunk | null {
	switch (part.type) {
		case "text-delta":
			return { type: "text", text: part.textDelta }
		case "reasoning":
			return { type: "thinking", thinking: part.textDelta }
		case "tool-call":
			return { type: "tool_call", name: part.toolName, input: part.args }
		case "tool-result":
			return { type: "tool_result", toolCallId: part.toolCallId, result: part.result }
		default:
			return null
	}
}

Error Handling

Using withErrorHandler

Wrap streaming operations to automatically fail the stream on error:

yield* myStreamingOperation.pipe(bot.ai.withErrorHandler(ctx, session))

This will:

  1. Catch any error
  2. Call session.fail(error) to mark the stream as failed
  3. Re-throw a typed CommandHandlerError so callers can catchTag upstream (see the sketch below)
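
A sketch of upstream recovery, where handleMessage is a hypothetical handler whose streaming work is wrapped with withErrorHandler:

// The stream itself has already been marked failed by withErrorHandler,
// so this layer only needs to log and recover.
yield* handleMessage(message).pipe(
	Effect.catchTag("CommandHandlerError", (error) =>
		Effect.logError("Stream failed", { error }),
	),
)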

Manual Error Handling

For more control:

yield* session.processChunk(chunk).pipe(
	Effect.catchTag("StreamProcessingError", (error) =>
		Effect.gen(function* () {
			yield* Effect.logError("Chunk processing failed", { error })
			yield* session.fail(error)
		}),
	),
)

Chunk Type Reference

Type         Required Properties                  Optional Properties  Description
text         text: string                         -                    Text content to append to the message
thinking     thinking: string                     -                    Extended thinking/reasoning content
tool_call    name: string                         input: unknown       Tool invocation with name and arguments
tool_result  toolCallId: string, result: unknown  -                    Result from a tool execution

Loading States

Configure initial loading state when creating a session:

const session = yield* bot.ai.stream(channelId, {
	loading: {
		title: "Generating response...",
		description: "This may take a moment",
	},
})

Update progress during streaming:

yield* session.setProgress({
	current: 50,
	total: 100,
	label: "Processing...",
})

Best Practices

  1. Always call complete() or fail() - Ensure every stream session is properly finalized
  2. Use withErrorHandler - Wrap streaming operations to prevent orphaned streams
  3. Show thinking for complex tasks - Extended thinking helps users understand AI reasoning
  4. Show tool calls for transparency - Let users see what tools the AI is using
  5. Handle partial failures - If a tool call fails, continue streaming if possible (see the sketch below)
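
A sketch of the last point, with a hypothetical runSearchTool effect: close the failed step with a note and keep the response going instead of failing the whole session.

const stepId = yield* session.startToolCall("search", { query: "weather" })
yield* runSearchTool({ query: "weather" }).pipe(
	Effect.flatMap((result) => session.completeStep(stepId, { result })),
	// On tool failure, note it on the step and continue streaming text
	Effect.catchAll(() =>
		session.completeStep(stepId, { result: "search unavailable" }).pipe(
			Effect.andThen(session.appendText("Search is unavailable right now, but here is what I know: ")),
		),
	),
)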
