Claude Code Plugins

Community-maintained marketplace

Feedback
7
0

Master Effect AI streaming response patterns including start/delta/end protocol, accumulation strategies, resource-safe consumption, and history management with SubscriptionRef.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name effect-ai-streaming
description Master Effect AI streaming response patterns including start/delta/end protocol, accumulation strategies, resource-safe consumption, and history management with SubscriptionRef.

Effect AI Streaming

When to Use This Skill

  • Real-time streaming responses from language models
  • Building chat interfaces with incremental updates
  • Managing conversation history with streaming
  • Protecting concurrent stream operations
  • Accumulating stream parts with side effects
  • Converting stream responses to prompt history

Import Patterns

CRITICAL: Always use namespace imports:

import * as Stream from "effect/Stream"
import * as Effect from "effect/Effect"
import * as Channel from "effect/Channel"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as Match from "effect/Match"
import * as Response from "@effect/ai/Response"

StreamPart Protocol

stream := start → delta* → end

StreamPart lifecycle for each content type follows a three-phase protocol:

text      :: text-start → text-delta* → text-end
reasoning :: reasoning-start → reasoning-delta* → reasoning-end
toolParam :: tool-params-start → tool-params-delta* → tool-params-end
finish    :: { type: "finish", reason: FinishReason, usage: Usage }

Each streaming sequence has a unique id field that links start/delta/end parts.

Part Type Matching

Pattern match on stream parts using Match.value:

import * as Match from "effect/Match"
import * as Effect from "effect/Effect"

const processPart = (part: StreamPart) =>
  Match.value(part).pipe(
    Match.tag("text-delta", ({ delta }) =>
      Effect.sync(() => console.log(delta))
    ),
    Match.tag("reasoning-delta", ({ delta }) =>
      Effect.sync(() => logReasoning(delta))
    ),
    Match.tag("finish", ({ usage, reason }) =>
      Effect.sync(() => recordUsage(usage, reason))
    ),
    Match.orElse(() => Effect.void)
  )

Type guards for stream parts (polymorphic over encoded/decoded):

isTextDelta :: ∀ P. HasType P ⇒ P → P is TextDelta
isToolCallPart :: ∀ P. HasType P ⇒ P → P is ToolCall
isFinishPart :: ∀ P. HasType P ⇒ P → P is Finish

Accumulation Pattern

Accumulate stream parts incrementally using mutable state for efficiency:

import * as Stream from "effect/Stream"
import * as Effect from "effect/Effect"
import * as Prompt from "@effect/ai/Prompt"

const accumulated: Array<StreamPart> = []
let combined = Prompt.empty

stream.pipe(
  Stream.mapChunksEffect(Effect.fnUntraced(function* (chunk) {
    const parts = Array.from(chunk)

    // Append to mutable accumulator
    accumulated.push(...parts)

    // Build prompt from accumulated parts
    combined = Prompt.merge(combined, Prompt.fromResponseParts(parts))

    // Update history incrementally
    yield* SubscriptionRef.set(history, Prompt.merge(checkpoint, combined))

    return chunk
  }))
)

Key insight: Stream.mapChunksEffect enables side-effectful accumulation while preserving stream semantics.

Resource-Safe Streaming

Prevent concurrent stream operations using semaphore protection:

import * as Channel from "effect/Channel"
import * as Semaphore from "effect/Semaphore"
import * as Stream from "effect/Stream"

const streamWithProtection = Stream.fromChannel(
  Channel.acquireUseRelease(
    // Acquire: Take semaphore, get checkpoint
    semaphore.take(1).pipe(
      Effect.zipRight(SubscriptionRef.get(history)),
      Effect.map((hist) => Prompt.merge(hist, newPrompt)),
      Effect.tap((checkpoint) =>
        SubscriptionRef.set(history, checkpoint)
      )
    ),

    // Use: Stream with accumulation
    (checkpoint) => LanguageModel.streamText({ prompt: checkpoint }).pipe(
      Stream.mapChunksEffect(accumulateAndUpdate),
      Stream.toChannel
    ),

    // Release: Always release semaphore
    () => semaphore.release(1)
  )
)

Resource acquisition order:

  1. Take semaphore (exclusive access)
  2. Get current history snapshot
  3. Merge with new prompt
  4. Update history with checkpoint
  5. Stream response (with incremental updates)
  6. Release semaphore (guaranteed via acquireUseRelease)

Consumption Patterns

runForEach :: (A → Effect<R, E>) → Stream<A, E, R> → Effect<Unit, E, R> runDrain :: Stream<A, E, R> → Effect<Unit, E, R> runLast :: Stream<A, E, R> → Effect<Option, E, R>

// Process each part with side effects
stream.pipe(
  Stream.runForEach((part) =>
    Match.value(part).pipe(
      Match.tag("text-delta", ({ delta }) => updateUI(delta)),
      Match.tag("finish", ({ usage }) => recordMetrics(usage)),
      Match.orElse(() => Effect.void)
    )
  )
)

// Consume without collecting (memory efficient)
stream.pipe(
  Stream.tap(logPart),
  Stream.runDrain
)

// Get final accumulated value
stream.pipe(
  Stream.runFold(initialState, (acc, part) => merge(acc, part)),
  Effect.map(Option.some)
)

History Update Pattern

Incremental merge strategy for conversation history:

Prompt.merge :: Prompt → Prompt → Prompt
Prompt.fromResponseParts :: Array<StreamPart> → Prompt

// Pattern: checkpoint + incremental merge
let combined = Prompt.empty

Stream.mapChunksEffect(function* (chunk) {
  const parts = Array.from(chunk)

  // Merge new parts into combined prompt
  combined = Prompt.merge(combined, Prompt.fromResponseParts(parts))

  // Update history: base checkpoint + accumulated response
  yield* SubscriptionRef.set(
    history,
    Prompt.merge(filteredCheckpoint, combined)
  )

  return chunk
})

Why checkpoint-based merging:

  • Prevents re-merging entire history on each chunk
  • Separates base state (checkpoint) from streaming accumulation (combined)
  • Enables atomic history updates via SubscriptionRef

Complete Example

import * as AI from "@effect/ai"
import * as Stream from "effect/Stream"
import * as Effect from "effect/Effect"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as Semaphore from "effect/Semaphore"
import * as Match from "effect/Match"

const Chat = Effect.gen(function* () {
  const history = yield* SubscriptionRef.make(AI.Prompt.empty)
  const semaphore = yield* Semaphore.make(1)

  const streamText = (prompt: string) =>
    Stream.fromChannel(
      Channel.acquireUseRelease(
        // Acquire
        semaphore.take(1).pipe(
          Effect.zipRight(SubscriptionRef.get(history)),
          Effect.map((hist) => AI.Prompt.merge(hist, AI.Prompt.make(prompt))),
          Effect.tap((checkpoint) => {
            combined = AI.Prompt.empty
            return SubscriptionRef.set(history, checkpoint)
          })
        ),

        // Use
        (checkpoint) => {
          let combined = AI.Prompt.empty
          const accumulated: Array<AI.Response.StreamPart> = []

          return AI.LanguageModel.streamText({ prompt: checkpoint }).pipe(
            Stream.mapChunksEffect(Effect.fnUntraced(function* (chunk) {
              const parts = Array.from(chunk)
              accumulated.push(...parts)

              combined = AI.Prompt.merge(
                combined,
                AI.Prompt.fromResponseParts(parts)
              )

              yield* SubscriptionRef.set(
                history,
                AI.Prompt.merge(checkpoint, combined)
              )

              return chunk
            })),
            Stream.toChannel
          )
        },

        // Release
        () => semaphore.release(1)
      )
    )

  return { streamText }
})

// Consume stream
chat.streamText("Hello").pipe(
  Stream.runForEach((part) =>
    Match.value(part).pipe(
      Match.tag("text-delta", ({ delta }) => Effect.sync(() => console.log(delta))),
      Match.tag("finish", ({ usage }) => Effect.sync(() => console.log(usage))),
      Match.orElse(() => Effect.void)
    )
  )
)

Anti-Patterns

// ❌ Avoid Effect.either for pattern matching
Effect.either(effect).pipe(
  Effect.map((result) => result._tag === "Left" ? ... : ...)
)

// ✓ Use Match.typeTags or Effect.match
effect.pipe(
  Effect.match({
    onFailure: (error) => ...,
    onSuccess: (value) => ...
  })
)

// ❌ Manual type checking (use Match.tag instead)
if (part.type === "text-delta") { ... }

// ✓ Use Match.tag or type guards
Match.value(part).pipe(Match.tag("text-delta", handler))
isTextDelta(part) ? handler(part.delta) : ...

// ❌ Accumulating in Stream.map (loses effects)
Stream.map((chunk) => {
  accumulated.push(...chunk) // side effect ignored
  return chunk
})

// ✓ Use Stream.mapChunksEffect
Stream.mapChunksEffect(Effect.fnUntraced(function* (chunk) {
  accumulated.push(...chunk)
  yield* updateHistory()
  return chunk
}))

Additional Stream Part Types

File Parts

{ type: "file", mediaType: "image/png", data: Uint8Array }

Source Parts

{ type: "document-source", id: string, title?: string }
{ type: "url-source", url: string, title?: string }

Metadata Parts

{ type: "response-metadata", id: string, modelId: string, timestamp: Date }

Error Parts

{ type: "error", error: AiError }
// Handle with:
Match.tag("error", ({ error }) => Effect.fail(error))

Quality Checklist

  • Use start/delta/end protocol for streaming content
  • Match stream parts with Match.tag (not manual type checks)
  • Accumulate using Stream.mapChunksEffect (not Stream.map)
  • Use SubscriptionRef for reactive history updates
  • Protect concurrent streams with Semaphore
  • Use Channel.acquireUseRelease for resource safety
  • Handle error parts appropriately
  • Checkpoint history before streaming

Related Skills

  • effect-ai-language-model - streamText method that produces these streams
  • effect-ai-prompt - Converting stream responses to history with fromResponseParts
  • effect-ai-tool - Tool call streaming parts
  • effect-ai-provider - Provider-specific streaming behavior

Reference

StreamPart types:

  • text-start, text-delta, text-end - Text content streaming
  • reasoning-start, reasoning-delta, reasoning-end - Chain-of-thought streaming
  • tool-params-start, tool-params-delta, tool-params-end - Tool parameter streaming
  • tool-call - Complete tool invocation (non-streaming)
  • tool-result - Tool execution result
  • finish - Stream completion with usage stats
  • error - Error part

Key modules:

  • @effect/ai/Response - Response part schemas and constructors
  • @effect/ai/Prompt - Prompt construction and merging
  • effect/Stream - Stream combinators (mapChunksEffect, runForEach, runDrain)
  • effect/Channel - Low-level resource management (acquireUseRelease)
  • effect/SubscriptionRef - Reactive shared state
  • effect/Match - Pattern matching on tagged types