Streaming with Workflows
Learn how to build streaming workflows
LlamaIndex workflows are designed from the ground up to work with streaming data, which makes them well suited for:
- Building real-time applications
- Handling large datasets incrementally
- Creating responsive UIs that update as data becomes available
- Implementing long-running tasks with partial results
Basic Streaming
Every workflow context provides a stream of events:
import {
  createWorkflow,
  workflowEvent,
  getContext,
} from "@llamaindex/workflow-core";

// Define events
const startEvent = workflowEvent<string>();
const intermediateEvent = workflowEvent<string>();
const resultEvent = workflowEvent<string>();

// Create workflow
const workflow = createWorkflow();

workflow.handle([startEvent], (event) => {
  const { sendEvent } = getContext();
  // Emit multiple intermediate events
  for (let i = 0; i < 5; i++) {
    sendEvent(intermediateEvent.with(`Progress: ${i * 20}%`));
  }
  return resultEvent.with("Completed");
});

// Run the workflow
const { stream, sendEvent } = workflow.createContext();
sendEvent(startEvent.with("Start processing"));

// Process events as they arrive
for await (const event of stream) {
  if (intermediateEvent.include(event)) {
    console.log(event.data); // Show progress updates
  } else if (resultEvent.include(event)) {
    console.log("Final result:", event.data);
    break; // Exit the loop when done
  }
}
Using the Stream Utilities
The stream returned by createContext() exposes chainable helpers such as until(), filter(), and toArray() that make common patterns more concise:
import {
  createWorkflow,
  workflowEvent,
  getContext,
} from "@llamaindex/workflow-core";

const startEvent = workflowEvent<void>();
const progressEvent = workflowEvent<number>();
const resultEvent = workflowEvent<string>();

const workflow = createWorkflow();

workflow.handle([startEvent], () => {
  const { sendEvent } = getContext();
  // Emit progress events
  for (let i = 0; i < 100; i += 10) {
    sendEvent(progressEvent.with(i));
  }
  return resultEvent.with("Complete");
});

// Run the workflow and collect events until a condition is met
const { stream, sendEvent } = workflow.createContext();
sendEvent(startEvent.with());

// Read the stream until resultEvent is encountered, keeping only progress events
const progressEvents = await stream
  .until(resultEvent)
  .filter(progressEvent)
  .toArray();

console.log(`Received ${progressEvents.length} progress updates`);
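To make the chain above less opaque, here is a library-free sketch of the same three steps over a plain async iterable. The Tagged type, the fakeStream() generator, and the helper names are illustrative stand-ins, not the @llamaindex/workflow-core implementation. In particular, this sketch assumes that until() passes the terminating event through before stopping; the library's exact behavior may differ.

```typescript
// Hypothetical event shape used only for this sketch.
type Tagged = { type: string; data: unknown };

// Yield events up to and including the first one of the given type, then stop.
async function* until(
  source: AsyncIterable<Tagged>,
  type: string,
): AsyncGenerator<Tagged> {
  for await (const event of source) {
    yield event;
    if (event.type === type) return;
  }
}

// Pass through only events of the given type.
async function* filterByType(
  source: AsyncIterable<Tagged>,
  type: string,
): AsyncGenerator<Tagged> {
  for await (const event of source) {
    if (event.type === type) yield event;
  }
}

// Drain an async iterable into an array.
async function toArray<T>(source: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = [];
  for await (const item of source) items.push(item);
  return items;
}

// Stand-in for a workflow stream: ten progress events, a result, then a stray event.
async function* fakeStream(): AsyncGenerator<Tagged> {
  for (let i = 0; i < 100; i += 10) yield { type: "progress", data: i };
  yield { type: "result", data: "Complete" };
  yield { type: "progress", data: 999 }; // never read: until() stops at "result"
}

// Same shape as stream.until(resultEvent).filter(progressEvent).toArray().
async function collectProgress(): Promise<Tagged[]> {
  return toArray(filterByType(until(fakeStream(), "result"), "progress"));
}
```

Each helper wraps the previous iterable lazily, so nothing is read from the source until toArray() starts pulling, and nothing is read after the terminating event.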
Conditional Stream Processing
You can conditionally process events and even stop the stream early:
import {
  createWorkflow,
  workflowEvent,
  getContext,
} from "@llamaindex/workflow-core";

const startEvent = workflowEvent<number>();
const dataEvent = workflowEvent<number>();
const thresholdEvent = workflowEvent<void>();
const resultEvent = workflowEvent<number[]>();

const workflow = createWorkflow();

workflow.handle([startEvent], (event) => {
  const { sendEvent } = getContext();
  const max = event.data;
  for (let i = 0; i < max; i++) {
    sendEvent(dataEvent.with(i));
    if (i === 10) {
      // Signal once, when the threshold is first reached
      sendEvent(thresholdEvent.with());
    }
  }
  return resultEvent.with(Array.from({ length: max }, (_, i) => i));
});

// Run the workflow
const { stream, sendEvent } = workflow.createContext();
sendEvent(startEvent.with(100)); // Generate 100 numbers

const results: number[] = [];
let hitThreshold = false;

// Process the stream
for await (const event of stream) {
  if (dataEvent.include(event)) {
    results.push(event.data);
  } else if (thresholdEvent.include(event)) {
    hitThreshold = true;
    break; // Stop processing early
  }
}

console.log(
  `Collected ${results.length} items before ${hitThreshold ? "hitting threshold" : "completion"}`,
);
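Stopping early with break relies on ordinary JavaScript semantics: leaving a for await loop calls the iterator's return() method, so a producer written as an async generator gets to run its finally block. The sketch below demonstrates this with a plain async generator. The names numbers, collectBelow, and cleanedUp are hypothetical, and whether the workflow stream performs any cleanup of its own on early exit is not something the text above specifies.

```typescript
// Records whether the producer's cleanup ran (for demonstration only).
let cleanedUp = false;

// Hypothetical producer: yields 0..max-1 and notes when it is shut down.
async function* numbers(max: number): AsyncGenerator<number> {
  try {
    for (let i = 0; i < max; i++) yield i;
  } finally {
    // Runs on normal completion and when the consumer breaks out early.
    cleanedUp = true;
  }
}

// Collect values until one reaches the threshold, then stop reading.
async function collectBelow(threshold: number, max: number): Promise<number[]> {
  const results: number[] = [];
  for await (const n of numbers(max)) {
    if (n >= threshold) break; // triggers the generator's finally block
    results.push(n);
  }
  return results;
}
```

If the producer holds timers or other resources, as in the examples that follow, an explicit abort() is still the clearer way to release them.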
Integration with UI Frameworks
Workflow streams can be easily integrated with UI frameworks like React to create responsive interfaces:
// In a React component
import { useEffect, useState } from "react";
import {
  createWorkflow,
  workflowEvent,
  getContext,
} from "@llamaindex/workflow-core";

function StreamingComponent() {
  // Type the state explicitly so string updates can be appended
  const [updates, setUpdates] = useState<string[]>([]);
  const [isComplete, setIsComplete] = useState(false);

  useEffect(() => {
    // Set up workflow
    const startEvent = workflowEvent<void>();
    const updateEvent = workflowEvent<string>();
    const completeEvent = workflowEvent<void>();

    const workflow = createWorkflow();

    workflow.handle([startEvent], () => {
      const { sendEvent, signal } = getContext();
      // Simulate async updates
      const timeouts = [
        setTimeout(() => sendEvent(updateEvent.with("First update")), 500),
        setTimeout(() => sendEvent(updateEvent.with("Second update")), 1000),
        setTimeout(() => sendEvent(updateEvent.with("Final update")), 1500),
        setTimeout(() => sendEvent(completeEvent.with()), 2000),
      ];
      // Cleanup function using the signal
      signal.onabort = () => {
        console.log("Workflow context aborted, clearing timeouts.");
        timeouts.forEach(clearTimeout);
      };
    });

    // Run the workflow
    const { stream, sendEvent, abort } = workflow.createContext();
    sendEvent(startEvent.with());

    // Process events
    const processEvents = async () => {
      for await (const event of stream) {
        if (updateEvent.include(event)) {
          setUpdates((prev) => [...prev, event.data]);
        } else if (completeEvent.include(event)) {
          setIsComplete(true);
          break;
        }
      }
    };
    processEvents();

    // Cleanup
    return () => {
      console.log("Component unmounting, aborting workflow context.");
      abort(); // Call abort on cleanup
    };
  }, []);

  return (
    <div>
      <h2>Streaming Updates</h2>
      <ul>
        {updates.map((update, i) => (
          <li key={i}>{update}</li>
        ))}
      </ul>
      {isComplete && <div>Process complete!</div>}
    </div>
  );
}
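The two useState calls above can also be folded into one pure reducer, which keeps the state transitions testable without rendering anything. This is only a sketch: UiState, UiEvent, initialState, and reduceUi are hypothetical names, and the loop in processEvents would need to translate workflow events into these plain objects before dispatching them (for example through React's useReducer).

```typescript
// Hypothetical UI state mirroring the component's two pieces of state.
type UiState = { updates: string[]; isComplete: boolean };

// Hypothetical plain-object events derived from the workflow stream.
type UiEvent = { type: "update"; data: string } | { type: "complete" };

const initialState: UiState = { updates: [], isComplete: false };

// Pure transition function: returns a new state and never mutates the old one.
function reduceUi(state: UiState, event: UiEvent): UiState {
  // Ignore anything that arrives after completion.
  if (state.isComplete) return state;
  if (event.type === "update") {
    return { ...state, updates: [...state.updates, event.data] };
  }
  return { ...state, isComplete: true };
}
```

Because late events are dropped inside the reducer, an update that arrives after completion cannot change what is rendered.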
Server-Sent Events (SSE)
Workflows are also suitable for implementing Server-Sent Events:
import {
  createWorkflow,
  workflowEvent,
  getContext,
} from "@llamaindex/workflow-core";
import express from "express";

// Define events
const startEvent = workflowEvent<void>();
const dataEvent = workflowEvent<string>();

// Create workflow
const workflow = createWorkflow();

workflow.handle([startEvent], () => {
  const { sendEvent, signal } = getContext();
  // Send periodic updates, keeping the interval ID for cleanup
  const intervalId = setInterval(() => {
    sendEvent(dataEvent.with(`Update: ${new Date().toISOString()}`));
  }, 1000);
  // Cleanup using the signal
  signal.onabort = () => {
    console.log("Workflow context aborted, clearing interval.");
    clearInterval(intervalId);
  };
});

// Set up Express server
const app = express();

app.get("/events", (req, res) => {
  // Set headers for SSE
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  // Run workflow
  const { stream, sendEvent, abort } = workflow.createContext();
  sendEvent(startEvent.with());

  // Handle client disconnect
  req.on("close", () => {
    console.log("Client disconnected, aborting workflow context.");
    abort(); // Abort the context when client closes connection
  });

  // Process and send events
  (async () => {
    for await (const event of stream) {
      if (dataEvent.include(event)) {
        res.write(`data: ${JSON.stringify(event.data)}\n\n`);
      }
    }
  })();
});

app.listen(3000, () => {
  console.log("SSE server running on port 3000");
});
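The route above writes unnamed data: frames. If a client needs to distinguish event kinds or resume after a reconnect, the SSE wire format also allows event: and id: fields ahead of the data. The helper below is a minimal sketch of such a formatter; SseMessage and formatSseMessage are hypothetical names, and like the route above it JSON-encodes the payload, which also guarantees the data fits on a single line.

```typescript
// Fields of one Server-Sent Events message; only `data` is required here.
interface SseMessage {
  data: unknown;
  event?: string; // event name the client can subscribe to
  id?: string; // last-event ID the browser echoes on reconnect
}

// Reject values that would break the line-oriented SSE format.
function assertSingleLine(field: string, value: string): void {
  if (/[\r\n]/.test(value)) {
    throw new Error(`SSE ${field} must not contain line breaks`);
  }
}

// Serialize one message: optional event/id lines, one data line, blank line.
function formatSseMessage({ data, event, id }: SseMessage): string {
  const lines: string[] = [];
  if (event !== undefined) {
    assertSingleLine("event", event);
    lines.push(`event: ${event}`);
  }
  if (id !== undefined) {
    assertSingleLine("id", id);
    lines.push(`id: ${id}`);
  }
  // JSON escapes newlines, so the payload always fits on one data line.
  lines.push(`data: ${JSON.stringify(data ?? null)}`);
  return lines.join("\n") + "\n\n";
}
```

Inside the route, res.write(formatSseMessage({ event: "update", data: event.data })) would replace the hand-built string, and a browser client could then listen with addEventListener("update", ...) on its EventSource.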
Next Steps
Now that you've learned about streaming with workflows, explore more advanced topics: