
Streaming with Workflows

Learn how to build streaming workflows

LlamaIndex workflows are designed from the ground up to work with streaming data, which makes them well suited for:

  • Building real-time applications
  • Handling large datasets incrementally
  • Creating responsive UIs that update as data becomes available
  • Implementing long-running tasks with partial results

Basic Streaming

Every workflow context provides a stream of events:

import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
 
// Define events
const startEvent = workflowEvent<string>();
const intermediateEvent = workflowEvent<string>();
const resultEvent = workflowEvent<string>();
 
// Create workflow
const workflow = createWorkflow();
 
workflow.handle([startEvent], (event) => {
  const { sendEvent } = getContext();
 
  // Emit multiple intermediate events
  for (let i = 0; i < 5; i++) {
    sendEvent(intermediateEvent.with(`Progress: ${i * 20}%`));
  }
 
  return resultEvent.with("Completed");
});
 
// Run the workflow
const { stream, sendEvent } = workflow.createContext();
sendEvent(startEvent.with("Start processing"));
 
// Process events as they arrive
for await (const event of stream) {
  if (intermediateEvent.include(event)) {
    console.log(event.data); // Show progress updates
  } else if (resultEvent.include(event)) {
    console.log("Final result:", event.data);
    break; // Exit the loop when done
  }
}

Using the Stream Utilities

Workflows provide utility functions to make working with streams easier:

import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
import { collect } from "@llama-flow/core/stream/consumer";
import { until } from "@llama-flow/core/stream/until";
 
const startEvent = workflowEvent<void>();
const progressEvent = workflowEvent<number>();
const resultEvent = workflowEvent<string>();
 
const workflow = createWorkflow();
 
workflow.handle([startEvent], () => {
  const { sendEvent } = getContext();
 
  // Emit progress events
  for (let i = 0; i < 100; i += 10) {
    sendEvent(progressEvent.with(i));
  }
 
  return resultEvent.with("Complete");
});
 
// Run the workflow and collect events until a condition is met
const { stream, sendEvent } = workflow.createContext();
sendEvent(startEvent.with());
 
// Collect all events until resultEvent is encountered
const events = await collect(until(stream, resultEvent));
 
// Filter only progress events
const progressEvents = events.filter((event) => progressEvent.include(event));
console.log(`Received ${progressEvents.length} progress updates`);
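
Depending on the version of the stream utilities, the terminating resultEvent may itself be included in the collected array. If it is, you can read the final result from the same array; a minimal sketch using only the APIs shown above:

// Sketch: look for the final result in the collected events
const finalEvent = events.find((event) => resultEvent.include(event));
if (finalEvent) {
  console.log("Final result:", finalEvent.data);
}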

Conditional Stream Processing

You can conditionally process events and even stop the stream early:

import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
 
const startEvent = workflowEvent<number>();
const dataEvent = workflowEvent<number>();
const thresholdEvent = workflowEvent<void>();
const resultEvent = workflowEvent<number[]>();
 
const workflow = createWorkflow();
 
workflow.handle([startEvent], (event) => {
  const { sendEvent } = getContext();
  const max = event.data;
 
  for (let i = 0; i < max; i++) {
    sendEvent(dataEvent.with(i));
    if (i === 10) {
      // Signal once that we've reached the threshold
      sendEvent(thresholdEvent.with());
    }
  }
 
  return resultEvent.with(Array.from({ length: max }, (_, i) => i));
});
 
// Run the workflow
const { stream, sendEvent } = workflow.createContext();
sendEvent(startEvent.with(100)); // Generate 100 numbers
 
const results: number[] = [];
let hitThreshold = false;
 
// Process the stream
for await (const event of stream) {
  if (dataEvent.include(event)) {
    results.push(event.data);
  } else if (thresholdEvent.include(event)) {
    hitThreshold = true;
    break; // Stop processing early
  }
}
 
console.log(
  `Collected ${results.length} items before ${hitThreshold ? "hitting the threshold" : "completion"}`,
);
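
Note that breaking out of the for await loop only stops consuming the stream; the workflow context itself is not torn down. If a handler keeps producing asynchronously (as in the timer-based examples below), you can also call abort() on the context so that any cleanup registered on its signal runs. A minimal sketch reusing the events defined above:

// Sketch: abort the context after breaking out of the loop so that any
// signal-based cleanup registered by handlers also runs
const ctx = workflow.createContext();
ctx.sendEvent(startEvent.with(100));
 
for await (const event of ctx.stream) {
  if (thresholdEvent.include(event)) {
    ctx.abort(); // stop the workflow run, not just the consumer
    break;
  }
}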

Integration with UI Frameworks

Workflow streams can be easily integrated with UI frameworks like React to create responsive interfaces:

// In a React component
import { useEffect, useState } from "react";
import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
 
function StreamingComponent() {
  const [updates, setUpdates] = useState<string[]>([]);
  const [isComplete, setIsComplete] = useState(false);
 
  useEffect(() => {
    // Set up workflow
    const startEvent = workflowEvent<void>();
    const updateEvent = workflowEvent<string>();
    const completeEvent = workflowEvent<void>();
 
    const workflow = createWorkflow();
 
    workflow.handle([startEvent], () => {
      const { sendEvent, signal } = getContext();
 
      // Simulate async updates
      const timeouts = [
        setTimeout(() => sendEvent(updateEvent.with("First update")), 500),
        setTimeout(() => sendEvent(updateEvent.with("Second update")), 1000),
        setTimeout(() => sendEvent(updateEvent.with("Final update")), 1500),
        setTimeout(() => sendEvent(completeEvent.with()), 2000),
      ];
 
      // Cleanup function using the signal
      signal.onabort = () => {
        console.log("Workflow context aborted, clearing timeouts.");
        timeouts.forEach(clearTimeout);
      };
    });
 
    // Run the workflow
    const { stream, sendEvent, abort } = workflow.createContext();
    sendEvent(startEvent.with());
 
    // Process events
    const processEvents = async () => {
      for await (const event of stream) {
        if (updateEvent.include(event)) {
          setUpdates((prev) => [...prev, event.data]);
        } else if (completeEvent.include(event)) {
          setIsComplete(true);
          break;
        }
      }
    };
 
    processEvents();
 
    // Cleanup
    return () => {
      console.log("Component unmounting, aborting workflow context.");
      abort(); // Call abort on cleanup
    };
  }, []);
 
  return (
    <div>
      <h2>Streaming Updates</h2>
      <ul>
        {updates.map((update, i) => (
          <li key={i}>{update}</li>
        ))}
      </ul>
      {isComplete && <div>Process complete!</div>}
    </div>
  );
}
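
In a real application you would usually define the events and the workflow once at module scope, so handlers are not re-registered on every mount, and move the per-mount context handling into a custom hook. A minimal sketch of that split; the hook name useStreamingUpdates is illustrative, not part of the library:

// Sketch: module-scope workflow definition plus a hypothetical reusable hook
import { useEffect, useState } from "react";
import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
 
const startEvent = workflowEvent<void>();
const updateEvent = workflowEvent<string>();
const completeEvent = workflowEvent<void>();
 
const workflow = createWorkflow();
 
workflow.handle([startEvent], () => {
  const { sendEvent, signal } = getContext();
 
  // Abbreviated handler: same timer-based pattern as the component above
  const timeouts = [
    setTimeout(() => sendEvent(updateEvent.with("First update")), 500),
    setTimeout(() => sendEvent(completeEvent.with()), 1000),
  ];
 
  signal.onabort = () => timeouts.forEach(clearTimeout);
});
 
// Runs the workflow once per mount and returns the collected updates
function useStreamingUpdates() {
  const [updates, setUpdates] = useState<string[]>([]);
  const [isComplete, setIsComplete] = useState(false);
 
  useEffect(() => {
    const { stream, sendEvent, abort } = workflow.createContext();
    sendEvent(startEvent.with());
 
    (async () => {
      for await (const event of stream) {
        if (updateEvent.include(event)) {
          setUpdates((prev) => [...prev, event.data]);
        } else if (completeEvent.include(event)) {
          setIsComplete(true);
          break;
        }
      }
    })();
 
    return () => abort();
  }, []);
 
  return { updates, isComplete };
}

The component from the previous example then reduces to calling const { updates, isComplete } = useStreamingUpdates(); and rendering the same JSX.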

Server-Sent Events (SSE)

Workflows are also suitable for implementing Server-Sent Events:

import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
import express from "express";
 
// Define events
const startEvent = workflowEvent<void>();
const dataEvent = workflowEvent<string>();
 
// Create workflow
const workflow = createWorkflow();
 
workflow.handle([startEvent], () => {
  const { sendEvent, signal } = getContext();
 
  // Send periodic updates
  const intervalId = setInterval(() => {
    sendEvent(dataEvent.with(`Update: ${new Date().toISOString()}`));
  }, 1000);
 
  // Cleanup using the signal
  signal.onabort = () => {
    console.log("Workflow context aborted, clearing interval.");
    clearInterval(intervalId);
  };
});
 
// Set up Express server
const app = express();
 
app.get("/events", (req, res) => {
  // Set headers for SSE
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
 
  // Run workflow
  const { stream, sendEvent, abort } = workflow.createContext();
  sendEvent(startEvent.with());
 
  // Handle client disconnect
  req.on("close", () => {
    console.log("Client disconnected, aborting workflow context.");
    abort(); // Abort the context when client closes connection
  });
 
  // Process and send events
  (async () => {
    for await (const event of stream) {
      if (dataEvent.include(event)) {
        res.write(`data: ${JSON.stringify(event.data)}\n\n`);
      }
    }
  })();
});
 
app.listen(3000, () => {
  console.log("SSE server running on port 3000");
});
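
On the client side, the endpoint above can be consumed with the standard EventSource API. A browser-side sketch, assuming the server is running locally on port 3000:

// Sketch: browser-side consumer for the /events endpoint above
const source = new EventSource("http://localhost:3000/events");
 
source.onmessage = (message) => {
  // The server wrote each update as JSON.stringify(event.data)
  const update: string = JSON.parse(message.data);
  console.log("Received:", update);
};
 
source.onerror = () => {
  // Closing the connection fires "close" on the server request,
  // which aborts the workflow context
  source.close();
};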

Next Steps

Now that you've learned about streaming with workflows, explore the more advanced topics covered in the rest of the documentation.
