The Streams API allows you to stream data from your tasks to the outside world in realtime using the metadata system. This is particularly useful for streaming LLM outputs or any other real-time data.
For frontend applications using React, see our React hooks streams documentation for consuming streams in your UI.

How streams work

Streams use the metadata system to send data chunks in real-time. You register a stream with a specific key using metadata.stream, and then consumers can subscribe to receive those chunks as they’re emitted.

Basic streaming example

Here’s how to stream data from OpenAI in your task:
import { task, metadata } from "@trigger.dev/sdk/v3";
import OpenAI from "openai";

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

export type STREAMS = {
  openai: OpenAI.ChatCompletionChunk; // The type of the chunk is determined by the provider
};

export const myTask = task({
  id: "my-task",
  run: async (payload: { prompt: string }) => {
    const completion = await openai.chat.completions.create({
      messages: [{ role: "user", content: payload.prompt }],
      model: "gpt-3.5-turbo",
      stream: true,
    });

    // Register the stream with the key "openai"
    // This will "tee" the stream and send it to the metadata system
    const stream = await metadata.stream("openai", completion);

    let text = "";

    // You can read the returned stream as an async iterator
    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      // The type of the chunk is determined by the provider
      text += chunk.choices.map((choice) => choice.delta?.content).join("");
    }

    return { text };
  },
});

Subscribing to streams from backend

You can subscribe to the stream using the runs.subscribeToRun method with .withStreams():
import { runs } from "@trigger.dev/sdk/v3";
import type { myTask, STREAMS } from "./trigger/my-task";

// Somewhere in your backend
async function subscribeToStream(runId: string) {
  // Use a for-await loop to subscribe to the stream
  for await (const part of runs.subscribeToRun<typeof myTask>(runId).withStreams<STREAMS>()) {
    switch (part.type) {
      case "run": {
        console.log("Received run", part.run);
        break;
      }
      case "openai": {
        // part.chunk is of type OpenAI.ChatCompletionChunk
        console.log("Received OpenAI chunk", part.chunk);
        break;
      }
    }
  }
}

Multiple streams

You can register and subscribe to multiple streams in the same task:
import { task, metadata } from "@trigger.dev/sdk/v3";
import OpenAI from "openai";

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

export type STREAMS = {
  openai: OpenAI.ChatCompletionChunk;
  fetch: string; // The response body will be an array of strings
};

export const myTask = task({
  id: "my-task",
  run: async (payload: { prompt: string }) => {
    const completion = await openai.chat.completions.create({
      messages: [{ role: "user", content: payload.prompt }],
      model: "gpt-3.5-turbo",
      stream: true,
    });

    // Register the OpenAI stream
    await metadata.stream("openai", completion);

    const response = await fetch("https://jsonplaceholder.typicode.com/posts");

    if (!response.body) {
      return;
    }

    // Register a fetch response stream
    // Pipe the response.body through a TextDecoderStream to convert it to a string
    await metadata.stream("fetch", response.body.pipeThrough(new TextDecoderStream()));
  },
});
You may notice above that we aren’t consuming either of the streams in the task. In the background, we’ll wait until all streams are consumed before the task is considered complete (with a max timeout of 60 seconds). If you have a longer running stream, make sure to consume it in the task.
Then subscribe to both streams:
import { runs } from "@trigger.dev/sdk/v3";
import type { myTask, STREAMS } from "./trigger/my-task";

// Somewhere in your backend
async function subscribeToStream(runId: string) {
  for await (const part of runs.subscribeToRun<typeof myTask>(runId).withStreams<STREAMS>()) {
    switch (part.type) {
      case "run": {
        console.log("Received run", part.run);
        break;
      }
      case "openai": {
        // part.chunk is of type OpenAI.ChatCompletionChunk
        console.log("Received OpenAI chunk", part.chunk);
        break;
      }
      case "fetch": {
        // part.chunk is a string
        console.log("Received fetch chunk", part.chunk);
        break;
      }
    }
  }
}

Using with the AI SDK

The AI SDK provides a higher-level API for working with AI models. You can use it with the Streams API:
import { openai } from "@ai-sdk/openai";
import { logger, metadata, runs, schemaTask } from "@trigger.dev/sdk/v3";
import { streamText } from "ai";
import { z } from "zod";

export type STREAMS = {
  openai: string;
};

export const aiStreaming = schemaTask({
  id: "ai-streaming",
  description: "Stream data from the AI sdk",
  schema: z.object({
    model: z.string().default("o1-preview"),
    prompt: z.string().default("Hello, how are you?"),
  }),
  run: async ({ model, prompt }) => {
    logger.info("Running OpenAI model", { model, prompt });

    const result = streamText({
      model: openai(model),
      prompt,
    });

    // pass the textStream to the metadata system
    const stream = await metadata.stream("openai", result.textStream);

    let text = "";

    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      text += chunk; // chunk is a string
    }

    return { text };
  },
});

Using AI SDK with tools

When using tools with the AI SDK, you can access tool calls and results using the fullStream:
import { openai } from "@ai-sdk/openai";
import { logger, metadata, runs, schemaTask } from "@trigger.dev/sdk/v3";
import { streamText, tool, type TextStreamPart } from "ai";
import { z } from "zod";

const tools = {
  getWeather: tool({
    description: "Get the weather in a location",
    parameters: z.object({
      location: z.string().describe("The location to get the weather for"),
    }),
    execute: async ({ location }) => ({
      location,
      temperature: 72 + Math.floor(Math.random() * 21) - 10,
    }),
  }),
};

export type STREAMS = {
  // Give the stream a type of TextStreamPart along with the tools
  openai: TextStreamPart<{ getWeather: typeof tools.getWeather }>;
};

export const aiStreamingWithTools = schemaTask({
  id: "ai-streaming-with-tools",
  description: "Stream data from the AI SDK and use tools",
  schema: z.object({
    model: z.string().default("gpt-4o-mini"),
    prompt: z
      .string()
      .default(
        "Based on the temperature, will I need to wear extra clothes today in San Francisco? Please be detailed."
      ),
  }),
  run: async ({ model, prompt }) => {
    logger.info("Running OpenAI model", { model, prompt });

    const result = streamText({
      model: openai(model),
      prompt,
      tools, // Pass in the tools to use
      maxSteps: 5, // Allow streamText to repeatedly call the model
    });

    // pass the fullStream to the metadata system
    const stream = await metadata.stream("openai", result.fullStream);

    let text = "";

    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      // chunk is a TextStreamPart
      if (chunk.type === "text-delta") {
        text += chunk.textDelta;
      }
    }

    return { text };
  },
});

Using toolTask

You can define a Trigger.dev task that can be used as a tool, and will automatically be invoked with triggerAndWait when the tool is called:
import { openai } from "@ai-sdk/openai";
import { logger, metadata, runs, schemaTask, toolTask } from "@trigger.dev/sdk/v3";
import { streamText, tool, type TextStreamPart } from "ai";
import { z } from "zod";

export const getWeather = toolTask({
  id: "get-weather",
  description: "Get the weather for a location",
  // Define the parameters for the tool, which becomes the task payload
  parameters: z.object({
    location: z.string(),
  }),
  run: async ({ location }) => {
    // return mock data
    return {
      location,
      temperature: 72 + Math.floor(Math.random() * 21) - 10,
    };
  },
});

export type STREAMS = {
  // Give the stream a type of TextStreamPart along with the tools
  openai: TextStreamPart<{ getWeather: typeof getWeather.tool }>;
};

export const aiStreamingWithTools = schemaTask({
  id: "ai-streaming-with-tools",
  description: "Stream data from the AI SDK and use tools",
  schema: z.object({
    model: z.string().default("gpt-4o-mini"),
    prompt: z
      .string()
      .default(
        "Based on the temperature, will I need to wear extra clothes today in San Francisco? Please be detailed."
      ),
  }),
  run: async ({ model, prompt }) => {
    logger.info("Running OpenAI model", { model, prompt });

    const result = streamText({
      model: openai(model),
      prompt,
      tools: {
        getWeather: getWeather.tool, // pass weatherTask.tool as a tool
      },
      maxSteps: 5, // Allow streamText to repeatedly call the model
    });

    // pass the fullStream to the metadata system
    const stream = await metadata.stream("openai", result.fullStream);

    let text = "";

    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      // chunk is a TextStreamPart
      if (chunk.type === "text-delta") {
        text += chunk.textDelta;
      }
    }

    return { text };
  },
});