Basic Workflow Patterns
Learn common patterns and techniques for building effective workflows
This guide explores common patterns you can use to build more complex workflows with workflows.
Fan-out (Parallelism)
One of the most powerful features of workflows is the ability to run tasks in parallel:
import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
import { collect } from "@llama-flow/core/stream/consumer";
import { until } from "@llama-flow/core/stream/until";
import { filter } from "@llama-flow/core/stream/filter";
// Define events
const startEvent = workflowEvent<string>();
const processItemEvent = workflowEvent<number>();
const resultEvent = workflowEvent<string>();
const completeEvent = workflowEvent<string[]>();
// Create workflow
const workflow = createWorkflow();
// Define a variable accessible within the handler scope to signal completion
let itemsToProcess = 10; // Total number of items
let itemsProcessed = 0;
// Process start event: fan out to multiple processItemEvent events
workflow.handle([startEvent], (start) => {
const { sendEvent, stream } = getContext();
itemsProcessed = 0; // Reset counter for this execution context
// Emit multiple events to be processed in parallel
for (let i = 0; i < itemsToProcess; i++) {
sendEvent(processItemEvent.with(i));
}
// Use an async IIFE to collect results and emit completeEvent
(async () => {
const results = await collect(
// Filter for resultEvent and stop when all items are processed
until(
filter(stream, (event) => resultEvent.include(event)), // Only consider resultEvent
() => itemsProcessed === itemsToProcess, // Stop condition check
),
);
// Send the final aggregated result
sendEvent(completeEvent.with(results.map((event) => event.data)));
})().catch(console.error); // Handle potential errors during collection
// Note: This handler finishes *before* the collection completes.
// Returning nothing or a specific "processing started" event might be appropriate.
});
// Process each item
workflow.handle([processItemEvent], async (event) => {
// Simulate async work
await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));
const processedValue = `Processed: ${event.data}`;
// Crucially, update the shared counter *after* processing
itemsProcessed++;
return resultEvent.with(processedValue);
});
// Example E2E Run Usage
async function runFanOut() {
console.log("Running fan-out");
const { stream, sendEvent } = workflow.createContext();
sendEvent(startEvent.with("Start fan-out"));
for await (const event of stream) {
if (processItemEvent.include(event)) {
console.log(`Processing item: ${event.data}`);
} else if (resultEvent.include(event)) {
console.log(`Result received: ${event.data}`);
} else if (completeEvent.include(event)) {
console.log("Final aggregated results:", event.data);
break; // Stop processing the stream
}
}
}
runFanOut();
This pattern allows you to:
- Emit multiple events to be processed in parallel
- Collect results as they come in
- Complete once all parallel tasks are finished
Conditional Branching
You can implement conditional logic in your workflows:
import { createWorkflow, workflowEvent } from "@llama-flow/core";
import { collect } from "@llama-flow/core/stream/consumer";
import { until } from "@llama-flow/core/stream/until";
const inputEvent = workflowEvent<number>();
const evenNumberEvent = workflowEvent<string>();
const oddNumberEvent = workflowEvent<string>();
const resultEvent = workflowEvent<string>();
const workflow = createWorkflow();
// Branch based on whether the number is even or odd
workflow.handle([inputEvent], (event) => {
if (event.data % 2 === 0) {
return evenNumberEvent.with(`${event.data} is even`);
} else {
return oddNumberEvent.with(`${event.data} is odd`);
}
});
// Handle even numbers
workflow.handle([evenNumberEvent], (event) => {
return resultEvent.with(`Even result: ${event.data}`);
});
// Handle odd numbers
workflow.handle([oddNumberEvent], (event) => {
return resultEvent.with(`Odd result: ${event.data}`);
});
// Example E2E Run Usage
async function run(input_number: number) {
// Create a workflow context and send the initial event
const { stream, sendEvent } = workflow.createContext();
sendEvent(inputEvent.with(input_number));
// Collect all events until we get a stopEvent
const allEvents = await collect(until(stream, resultEvent));
// The last event will be the stopEvent that was requested
const finalEvent = allEvents[allEvents.length - 1];
if (resultEvent.include(finalEvent)) {
console.log(`Result: ${finalEvent.data}`);
}
}
run(42);
run(43);
Using Middleware
LlamaIndex workflows provide middleware that can enhance your workflows:
withStore
Middleware
The withStore
middleware adds a persistent store to your workflow context:
import { createWorkflow, workflowEvent } from "@llama-flow/core";
import { withStore } from "@llama-flow/core/middleware/store";
const startEvent = workflowEvent<void>();
const incrementEvent = workflowEvent<number>();
const resultEvent = workflowEvent<number>();
// Create a workflow with store middleware
const workflow = withStore(
() => ({
// Initializer function
count: 0,
history: [] as number[],
}),
createWorkflow(),
);
// Increment the counter
workflow.handle([startEvent], () => {
const store = workflow.getStore(); // Use the provided getStore method
store.count += 1;
store.history.push(store.count);
return incrementEvent.with(store.count);
});
// Return the current count
workflow.handle([incrementEvent], (event) => {
const store = workflow.getStore();
console.log("Current count:", store.count);
console.log("History:", store.history);
return resultEvent.with(store.count);
});
// Example E2E Run Usage
async function runWithStore() {
const { stream, sendEvent } = workflow.createContext();
// Send start event multiple times to see store update
sendEvent(startEvent.with());
sendEvent(startEvent.with());
for await (const event of stream) {
if (resultEvent.include(event)) {
console.log("Final count received:", event.data);
// Note: In a real app, you might need logic to stop listening
// if you only expect one result per startEvent.
}
}
}
runWithStore();
withValidation
Middleware
The withValidation
middleware adds compile-time and runtime validation to your workflows:
import { createWorkflow, workflowEvent } from "@llama-flow/core";
import { withValidation } from "@llama-flow/core/middleware/validation";
const startEvent = workflowEvent<string, "start">();
const processEvent = workflowEvent<number, "process">();
const resultEvent = workflowEvent<string, "result">();
const disallowedEvent = workflowEvent<void, "disallowed">();
// Create a workflow with validation middleware
// Define allowed event paths
const workflow = withValidation(createWorkflow(), [
[[startEvent], [processEvent]], // startEvent can only lead to processEvent
[[processEvent], [resultEvent]], // processEvent can only lead to resultEvent
]);
// This will pass validation
workflow.strictHandle([startEvent], (sendEvent, start) => {
sendEvent(processEvent.with(123)); // ✅ This is allowed
});
// This would fail at compile time and runtime
workflow.strictHandle([startEvent], (sendEvent, start) => {
// sendEvent(disallowedEvent.with("disallowed")); // ❌ This would cause an error
// sendEvent(resultEvent.with("result")); // ❌ This would also cause an error
});
Error Handling
LlamaIndex workflows provide built-in mechanisms for handling errors:
import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
const startEvent = workflowEvent<string>();
const processEvent = workflowEvent<number>();
const errorEvent = workflowEvent<Error>();
const resultEvent = workflowEvent<string>();
const workflow = createWorkflow();
workflow.handle([startEvent], (start) => {
try {
const num = Number.parseInt(start.data, 10);
if (isNaN(num)) {
throw new Error("Invalid number");
}
return processEvent.with(num);
} catch (err) {
return errorEvent.with(err instanceof Error ? err : new Error(String(err)));
}
});
workflow.handle([processEvent], (event) => {
return resultEvent.with(`Result: ${event.data * 2}`);
});
workflow.handle([errorEvent], (event) => {
return resultEvent.with(`Error: ${event.data.message}`);
});
You can also use the signal in getContext()
to handle errors:
workflow.handle([processEvent], () => {
const { signal } = getContext();
signal.onabort = () => {
console.error("Process aborted:", signal.reason);
// Clean up resources
};
// Your processing logic here
});
Connecting with Server Endpoints
Workflow can be used as middleware in server frameworks like Express, Hono, or Fastify:
import { Hono } from "hono";
import { serve } from "@hono/node-server";
import { createHonoHandler } from "@llama-flow/core/interrupter/hono";
import { createWorkflow, workflowEvent } from "@llama-flow/core";
// Define events
const queryEvent = workflowEvent<string>();
const responseEvent = workflowEvent<string>();
// Create workflow
const workflow = createWorkflow();
workflow.handle([queryEvent], (event) => {
const response = `Processed: ${event.data}`;
return responseEvent.with(response);
});
// Create Hono app
const app = new Hono();
// Set up workflow endpoint
app.post(
"/workflow",
createHonoHandler(
workflow,
async (ctx) => queryEvent.with(await ctx.req.text()),
responseEvent,
),
);
// Start server
serve(app, ({ port }) => {
console.log(`Server started at http://localhost:${port}`);
});
Next Steps
Now that you've learned about basic workflow patterns, explore more advanced topics: