Basic Workflow Patterns
Learn common patterns and techniques for building effective workflows
This guide explores common patterns you can use to build more complex workflows with workflows.
Fan-out (Parallelism)
One of the most powerful features of workflows is the ability to run tasks in parallel:
import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
// Define events
const startEvent = workflowEvent<string>();
const processItemEvent = workflowEvent<number>();
const resultEvent = workflowEvent<string>();
const completeEvent = workflowEvent<string[]>();
// Create workflow
const workflow = createWorkflow();
// Define a variable accessible within the handler scope to signal completion
let itemsToProcess = 10; // Total number of items
let itemsProcessed = 0;
// Process start event: fan out to multiple processItemEvent events
workflow.handle([startEvent], async (start) => {
const { sendEvent, stream } = getContext();
itemsProcessed = 0; // Reset counter for this execution context
// Emit multiple events to be processed in parallel
for (let i = 0; i < itemsToProcess; i++) {
sendEvent(processItemEvent.with(i));
}
// Use an async IIFE to collect results and emit completeEvent
try {
const results = await stream
.filter(resultEvent)
.until(() => itemsProcessed === itemsToProcess)
.toArray();
// Send the final aggregated result
sendEvent(completeEvent.with(results.map((event) => event.data)));
} catch (err) {
console.error("Error processing items:", err);
// Handle error if needed
}
// Note: This handler finishes *before* the collection completes.
// Returning nothing or a specific "processing started" event might be appropriate.
});
// Process each item
workflow.handle([processItemEvent], async (event) => {
// Simulate async work
await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));
const processedValue = `Processed: ${event.data}`;
// Crucially, update the shared counter *after* processing
itemsProcessed++;
return resultEvent.with(processedValue);
});
// Example E2E Run Usage
async function runFanOut() {
console.log("Running fan-out");
const { stream, sendEvent } = workflow.createContext();
sendEvent(startEvent.with("Start fan-out"));
for await (const event of stream) {
if (processItemEvent.include(event)) {
console.log(`Processing item: ${event.data}`);
} else if (resultEvent.include(event)) {
console.log(`Result received: ${event.data}`);
} else if (completeEvent.include(event)) {
console.log("Final aggregated results:", event.data);
break; // Stop processing the stream
}
}
}
runFanOut();
This pattern allows you to:
- Emit multiple events to be processed in parallel
- Collect results as they come in
- Complete once all parallel tasks are finished
Conditional Branching
You can implement conditional logic in your workflows:
import { createWorkflow, workflowEvent } from "@llama-flow/core";
const inputEvent = workflowEvent<number>();
const evenNumberEvent = workflowEvent<string>();
const oddNumberEvent = workflowEvent<string>();
const resultEvent = workflowEvent<string>();
const workflow = createWorkflow();
// Branch based on whether the number is even or odd
workflow.handle([inputEvent], (event) => {
if (event.data % 2 === 0) {
return evenNumberEvent.with(`${event.data} is even`);
} else {
return oddNumberEvent.with(`${event.data} is odd`);
}
});
// Handle even numbers
workflow.handle([evenNumberEvent], (event) => {
return resultEvent.with(`Even result: ${event.data}`);
});
// Handle odd numbers
workflow.handle([oddNumberEvent], (event) => {
return resultEvent.with(`Odd result: ${event.data}`);
});
// Example E2E Run Usage
async function run(input_number: number) {
// Create a workflow context and send the initial event
const { stream, sendEvent } = workflow.createContext();
sendEvent(inputEvent.with(input_number));
// Collect all events until we get a stopEvent
const allEvents = await stream.until(resultEvent).toArray();
// The last event will be the stopEvent that was requested
const finalEvent = allEvents[allEvents.length - 1];
if (resultEvent.include(finalEvent)) {
console.log(`Result: ${finalEvent.data}`);
}
}
run(42);
run(43);
Using Middleware
LlamaIndex workflows provide middleware that can enhance your workflows:
withState
Middleware
The withState
middleware adds a persistent state to your workflow context:
import { createStatefulMiddleware } from "@llama-flow/core/middleware/state";
// Define your state type
type CounterState = {
count: number;
history: number[];
};
// Create a workflow with state middleware
const { withState } = createStatefulMiddleware<CounterState>(() => ({
count: 0,
history: [],
}));
const workflow = withState(createWorkflow());
// Use the state in your handlers
workflow.handle([startEvent], () => {
const { state } = getContext();
state.count += 1;
state.history.push(state.count);
return incrementEvent.with(state.count);
});
workflow.handle([incrementEvent], (increment) => {
const { state } = getContext();
console.log("Current count:", state.count);
console.log("History:", state.history);
return resultEvent.with(state.count);
});
// Run the workflow
async function runWithState() {
const { sendEvent, state } = workflow.createContext();
// Send start event multiple times to see state update
sendEvent(startEvent.with());
sendEvent(startEvent.with());
sendEvent(startEvent.with());
// The state persists across events
console.log("Final count:", state.count);
console.log("Final history:", state.history);
}
runWithState();
You can also create a state with input:
const { withState } = createStatefulMiddleware(
(input: { initialCount: number }) => ({
count: input.initialCount,
history: [input.initialCount],
}),
);
const workflow = withState(createWorkflow());
const { state } = workflow.createContext({ initialCount: 10 });
withValidation
Middleware
The withValidation
middleware adds compile-time and runtime validation to your workflows:
import { createWorkflow, workflowEvent } from "@llama-flow/core";
import { withValidation } from "@llama-flow/core/middleware/validation";
const startEvent = workflowEvent<string, "start">();
const processEvent = workflowEvent<number, "process">();
const resultEvent = workflowEvent<string, "result">();
const disallowedEvent = workflowEvent<void, "disallowed">();
// Create a workflow with validation middleware
// Define allowed event paths
const workflow = withValidation(createWorkflow(), [
[[startEvent], [processEvent]], // startEvent can only lead to processEvent
[[processEvent], [resultEvent]], // processEvent can only lead to resultEvent
]);
// This will pass validation
workflow.strictHandle([startEvent], (sendEvent, start) => {
sendEvent(processEvent.with(123)); // ✅ This is allowed
});
// This would fail at compile time and runtime
workflow.strictHandle([startEvent], (sendEvent, start) => {
// sendEvent(disallowedEvent.with("disallowed")); // ❌ This would cause an error
// sendEvent(resultEvent.with("result")); // ❌ This would also cause an error
});
Error Handling
LlamaIndex workflows provide built-in mechanisms for handling errors:
import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
const startEvent = workflowEvent<string>();
const processEvent = workflowEvent<number>();
const errorEvent = workflowEvent<Error>();
const resultEvent = workflowEvent<string>();
const workflow = createWorkflow();
workflow.handle([startEvent], (start) => {
try {
const num = Number.parseInt(start.data, 10);
if (isNaN(num)) {
throw new Error("Invalid number");
}
return processEvent.with(num);
} catch (err) {
return errorEvent.with(err instanceof Error ? err : new Error(String(err)));
}
});
workflow.handle([processEvent], (event) => {
return resultEvent.with(`Result: ${event.data * 2}`);
});
workflow.handle([errorEvent], (event) => {
return resultEvent.with(`Error: ${event.data.message}`);
});
You can also use the signal in getContext()
to handle errors:
workflow.handle([processEvent], () => {
const { signal } = getContext();
signal.onabort = () => {
console.error("Process aborted:", signal.reason);
// Clean up resources
};
// Your processing logic here
});
Connecting with Server Endpoints
Workflow can be used as middleware in server frameworks like Express, Hono, or Fastify:
import { Hono } from "hono";
import { serve } from "@hono/node-server";
import { createHonoHandler } from "@llama-flow/core/interrupter/hono";
import { createWorkflow, workflowEvent } from "@llama-flow/core";
// Define events
const queryEvent = workflowEvent<string>();
const responseEvent = workflowEvent<string>();
// Create workflow
const workflow = createWorkflow();
workflow.handle([queryEvent], (event) => {
const response = `Processed: ${event.data}`;
return responseEvent.with(response);
});
// Create Hono app
const app = new Hono();
// Set up workflow endpoint
app.post(
"/workflow",
createHonoHandler(
workflow,
async (ctx) => queryEvent.with(await ctx.req.text()),
responseEvent,
),
);
// Start server
serve(app, ({ port }) => {
console.log(`Server started at http://localhost:${port}`);
});
Next Steps
Now that you've learned about basic workflow patterns, explore more advanced topics: