Streaming

Learn how to use the LlamaIndex workflow with streaming.

The Workflow API is designed for streaming data by default. This guide shows how to consume the events of a running workflow as a stream.

Each call to workflow.run returns a WorkflowContext, which implements the AsyncIterable interface. You can iterate over it with for await to stream the workflow's events.

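import { StartEvent, StopEvent, Workflow, WorkflowEvent } from '@llamaindex/workflow';

// Event carrying one number to compute
class ComputeEvent extends WorkflowEvent<number> {
	constructor(data: number) {
		super(data);
	}
}

// Event carrying the result of one computation
class ComputeResultEvent extends WorkflowEvent<number> {
	constructor(data: number) {
		super(data);
	}
}

// Shared state available to every step through context.data
type ContextData = {
	sum: number;
}

const workflow = new Workflow<ContextData, number, number>();
workflow.addStep({
	inputs: [StartEvent<number>],
	outputs: [StopEvent<number>]
}, async (context, startEvent) => {
	const total = startEvent.data;
	for (let i = 0; i < total; i++) {
		context.sendEvent(new ComputeEvent(i));
	}
	// Workflow API allows you to start events in parallel and wait for all of them to finish
	const results = await Promise.all(Array.from({ length: total }).map(() => context.requireEvent(ComputeResultEvent)));
	context.data.sum = results.reduce((acc, result) => acc + result.data, 0);
	return new StopEvent(context.data.sum);
});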

We define a parallel computation workflow that computes the sum of the numbers from 0 up to, but not including, total.

The workflow sends ComputeEvent events for each number and waits for ComputeResultEvent events. After receiving all ComputeResultEvent events, the workflow returns the sum as a StopEvent.
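
The fan-out and fan-in pattern described above can be sketched without the library. This is only an illustration under stated assumptions: computeOne is a hypothetical stand-in for whatever step handles a ComputeEvent, and plain promises stand in for the ComputeResultEvent events the workflow waits for. Neither name is part of the Workflow API.

```typescript
// Hypothetical stand-in for a step that handles one ComputeEvent.
// Here it simply echoes the number back asynchronously.
async function computeOne(n: number): Promise<number> {
	return n;
}

// Fan out one task per number in [0, total), then fan in by
// awaiting all of them and summing the results.
async function parallelSum(total: number): Promise<number> {
	const pending = Array.from({ length: total }, (_, i) => computeOne(i));
	const results = await Promise.all(pending);
	return results.reduce((acc, value) => acc + value, 0);
}
```

For example, parallelSum(5) resolves to 0 + 1 + 2 + 3 + 4 = 10. Promise.all preserves the input order of the results, although the order does not matter for a sum.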

What if we want to cut the computation off once the sum exceeds a certain value?

Streaming

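// Start the workflow with input 1000 and an initial context state
const context = workflow.run(1000, {
	sum: 0
});

// Iterate over the events as the workflow emits them
for await (const event of context) {
	if (event instanceof ComputeResultEvent) {
		// Abort as soon as the shared sum passes the threshold
		if (context.data.sum > 100) {
			throw new Error('Sum exceeds 100');
		}
	}
	if (event instanceof StopEvent) {
		console.log('result', event.data);
	}
}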
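
The cutoff above relies on standard async iteration semantics: leaving a for await loop early, whether by throw, break, or return, asks the iterator to finish. The sketch below shows this with a plain async generator. It is an illustration only: runningSums is a hypothetical stand-in for the workflow context and says nothing about how WorkflowContext cleans up internally.

```typescript
// Hypothetical producer: yields the running sum of 0..total-1.
// `log` records lifecycle events so the cleanup can be observed.
async function* runningSums(total: number, log: string[]): AsyncGenerator<number> {
	let sum = 0;
	try {
		for (let i = 0; i < total; i++) {
			sum += i;
			yield sum;
		}
	} finally {
		// Runs on normal completion and when the consumer exits early.
		log.push('producer closed');
	}
}

// Consume the stream and abort once the running sum exceeds `limit`.
async function sumWithCutoff(total: number, limit: number, log: string[]): Promise<number> {
	let last = 0;
	for await (const sum of runningSums(total, log)) {
		if (sum > limit) {
			throw new Error(`Sum exceeds ${limit}`);
		}
		last = sum;
	}
	return last;
}
```

With a total of 1000 and a limit of 100, the promise rejects once the running sum reaches 105, and the generator's finally block still runs, so the producer does not keep working after the consumer has given up.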

You can build further custom logic on top of the AsyncIterable interface.

For example, to stop consuming the workflow as soon as the first ComputeResultEvent arrives:

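async function compute() {
	const context = workflow.run(1000, {
		sum: 0
	});
	for await (const event of context) {
		if (event instanceof ComputeResultEvent) {
			// Returning here ends the iteration early
			return event.data;
		}
	}
	// The loop above always returns before the stream ends
	throw new Error('UNREACHABLE');
}

const result = await compute();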
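
The early return above can be generalized into a small helper that works on any AsyncIterable. This is a sketch under stated assumptions: firstMatching, ResultEvent, OtherEvent, and demoEvents are hypothetical names introduced for illustration, and the demo stream stands in for a workflow context.

```typescript
// Returns the first item for which `isMatch` holds, or undefined if the
// stream ends first. Returning inside the loop stops the iteration early.
async function firstMatching<T, U extends T>(
	stream: AsyncIterable<T>,
	isMatch: (item: T) => item is U,
): Promise<U | undefined> {
	for await (const item of stream) {
		if (isMatch(item)) {
			return item;
		}
	}
	return undefined;
}

// Hypothetical event classes used only for this demonstration.
class ResultEvent {
	data: number;
	constructor(data: number) {
		this.data = data;
	}
}
class OtherEvent {
	label: string;
	constructor(label: string) {
		this.label = label;
	}
}

// Hypothetical stream standing in for a workflow context.
async function* demoEvents(): AsyncGenerator<ResultEvent | OtherEvent> {
	yield new OtherEvent('started');
	yield new ResultEvent(42);
	yield new ResultEvent(7); // never consumed by firstMatching
}
```

Calling firstMatching(demoEvents(), (e): e is ResultEvent => e instanceof ResultEvent) resolves to the event carrying 42. Returning undefined instead of throwing leaves the decision about an empty stream to the caller.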

Streaming with UI

You can use the Workflow API with UI libraries like React.

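'use server';
// "use server" is required to enable server-side features in React
import { StopEvent } from '@llamaindex/workflow';
import { createStreamableUI } from 'ai/rsc';
// Assumes ./utils exports the workflow and ComputeResultEvent defined earlier
import { ComputeResultEvent, workflow } from './utils';
export async function compute() {
	'use server';
	const ui = createStreamableUI();
	const context = workflow.run(100, {
		sum: 0
	});
	// Consume events in the background so the streamable value is returned immediately
	(async () => {
		for await (const event of context) {
			if (event instanceof ComputeResultEvent) {
				// Update UI
			} else if (event instanceof StopEvent) {
				// Update UI
			}
			// ...
		}
	})();
	return ui.value;
}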