Logo
Modules

Workflows

A Workflow in LlamaIndexTS is an event-driven abstraction used to chain together several events. Workflows are made up of steps, with each step responsible for handling certain event types and emitting new events.

Workflows in LlamaIndexTS work by defining step functions that handle specific event types and emit new events.

When a step function is added to a workflow, you need to specify the input and optionally the output event types (used for validation). The specification of the input events ensures each step only runs when an accepted event is ready.

You can create a Workflow to do anything! Build an agent, a RAG flow, an extraction flow, or anything else you want.

npm install @llamaindex/workflow

Getting Started

As an illustrative example, let's consider a naive workflow where a joke is generated and then critiqued.

import { OpenAI } from "@llamaindex/openai";
import { StartEvent, StopEvent, Workflow, WorkflowEvent } from "llamaindex";
 
// Create LLM instance
const llm = new OpenAI();
 
// Create a custom event type
export class JokeEvent extends WorkflowEvent<{ joke: string }> {}
 
const generateJoke = async (_: unknown, ev: StartEvent<string>) => {
  const prompt = `Write your best joke about ${ev.data}.`;
  const response = await llm.complete({ prompt });
  return new JokeEvent({ joke: response.text });
};
 
const critiqueJoke = async (_: unknown, ev: JokeEvent) => {
  const prompt = `Give a thorough critique of the following joke: ${ev.data.joke}`;
  const response = await llm.complete({ prompt });
  return new StopEvent(response.text);
};
 
const jokeFlow = new Workflow<unknown, string, string>();
jokeFlow.addStep(
  {
    inputs: [StartEvent<string>],
    outputs: [JokeEvent],
  },
  generateJoke,
);
jokeFlow.addStep(
  {
    inputs: [JokeEvent],
    outputs: [StopEvent<string>],
  },
  critiqueJoke,
);
 
// Usage
async function main() {
  const result = await jokeFlow.run("pirates");
  console.log(result.data);
}
 
main().catch(console.error);
 

There's a few moving pieces here, so let's go through this piece by piece.

Defining Workflow Events

export class JokeEvent extends WorkflowEvent<{ joke: string }> {}

Events are user-defined classes that extend WorkflowEvent and contain arbitrary data provided as template argument. In this case, our workflow relies on a single user-defined event, the JokeEvent with a joke attribute of type string.

Setting up the Workflow Class

const llm = new OpenAI();
...
const jokeFlow = new Workflow<unknown, string, string>();

Our workflow is implemented by initiating the Workflow class with three generic types: the context type (unknown), input type (string), and output type (string). The context type is unknown, as we're not using a shared context in this example.

For simplicity, we created an OpenAI llm instance that we're using for inference in our workflow.

Workflow Entry Points

const generateJoke = async (_: unknown, ev: StartEvent<string>) => {
  const prompt = `Write your best joke about ${ev.data}.`;
  const response = await llm.complete({ prompt });
  return new JokeEvent({ joke: response.text });
};

Here, we come to the entry-point of our workflow. While events are user-defined, there are two special-case events, the StartEvent and the StopEvent. These events are predefined, but we can specify the payload type using generic types. We're using StartEvent<string> to indicate that we're going to send an input of type string.

To add this step to the workflow, we use the addStep method with an object specifying the input and output event types:

jokeFlow.addStep(
  {
    inputs: [StartEvent<string>],
    outputs: [JokeEvent],
  },
  generateJoke
);

Workflow Exit Points

const critiqueJoke = async (_: unknown, ev: JokeEvent) => {
  const prompt = `Give a thorough critique of the following joke: ${ev.data.joke}`;
  const response = await llm.complete({ prompt });
  return new StopEvent(response.text);
};

Here, we have our second and last step in the workflow. We know it's the last step because the special StopEvent is returned. When the workflow encounters a returned StopEvent, it immediately stops the workflow and returns the result. Note that we're using the generic type StopEvent<string> to indicate that we're returning a string.

Add this step to the workflow:

jokeFlow.addStep(
  {
    inputs: [JokeEvent],
    outputs: [StopEvent<string>],
  },
  critiqueJoke
);

Running the Workflow

const result = await jokeFlow.run("pirates");
console.log(result.data.result);

Lastly, we run the workflow. The .run() method is async, so we use await here to wait for the result.

Working with Shared Context/State

Optionally, you can choose to use a shared context between steps by specifying a context type when creating the workflow. Here's an example where multiple steps access a shared state:

import { HandlerContext } from "llamaindex";
 
type MyContextData = {
  query: string;
  intermediateResults: any[];
}
 
const query = async (context: HandlerContext<MyContextData>, ev: MyEvent) => {
  // get the query from the context
  const query = context.data.query;
  // do something with context and event
  const val = ...
  // store in context
  context.data.intermediateResults.push(val);
 
  return new StopEvent({ result });
};

Waiting for Multiple Events

The context does more than just hold data, it also provides utilities to buffer and wait for multiple events.

For example, you might have a step that waits for a query and retrieved nodes before synthesizing a response:

const synthesize = async (context: Context, ev1: QueryEvent, ev2: RetrieveEvent) => {
  const subPrompts = [`Answer this query using the context provided: ${ev1.data.query}`, `Context: ${ev2.data.context}`];
  const prompt = subPrompts.join("\n");
  const response = await llm.complete({ prompt });
  return new StopEvent({ result: response.text });
};

Passing multiple events, we can buffer and wait for ALL expected events to arrive. The receiving step function will only be called once all events have arrived.

Manually Triggering Events

Normally, events are triggered by returning another event during a step. However, events can also be manually dispatched using the ctx.sendEvent(event) method within a workflow.

Examples

You can find many useful examples of using workflows in the examples folder.

Edit on GitHub

Last updated on

On this page