Basic Workflow Patterns

Learn common patterns and techniques for building effective workflows

This guide explores common patterns you can use to build more complex workflows.

Fan-out (Parallelism)

One of the most powerful features of workflows is the ability to run tasks in parallel:

import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
import { collect } from "@llama-flow/core/stream/consumer";
import { until } from "@llama-flow/core/stream/until";
import { filter } from "@llama-flow/core/stream/filter";
 
// Define events
const startEvent = workflowEvent<string>();
const processItemEvent = workflowEvent<number>();
const resultEvent = workflowEvent<string>();
const completeEvent = workflowEvent<string[]>();
 
// Create workflow
const workflow = createWorkflow();
 
// Counters shared by both handlers; the collector below uses them to detect completion
const itemsToProcess = 10; // Total number of items
let itemsProcessed = 0; // Items finished so far
 
// Process start event: fan out to multiple processItemEvent events
workflow.handle([startEvent], (start) => {
  const { sendEvent, stream } = getContext();
  itemsProcessed = 0; // Reset counter for this execution context
 
  // Emit multiple events to be processed in parallel
  for (let i = 0; i < itemsToProcess; i++) {
    sendEvent(processItemEvent.with(i));
  }
 
  // Use an async IIFE to collect results and emit completeEvent
  (async () => {
    const results = await collect(
      // Filter for resultEvent and stop when all items are processed
      until(
        filter(stream, (event) => resultEvent.include(event)), // Only consider resultEvent
        () => itemsProcessed === itemsToProcess, // Stop condition check
      ),
    );
    // Send the final aggregated result
    sendEvent(completeEvent.with(results.map((event) => event.data)));
  })().catch(console.error); // Handle potential errors during collection
 
  // Note: This handler finishes *before* the collection completes.
  // Returning nothing or a specific "processing started" event might be appropriate.
});
 
// Process each item
workflow.handle([processItemEvent], async (event) => {
  // Simulate async work
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));
  const processedValue = `Processed: ${event.data}`;
 
  // Crucially, update the shared counter *after* processing
  itemsProcessed++;
 
  return resultEvent.with(processedValue);
});
 
// Example E2E Run Usage
async function runFanOut() {
  console.log("Running fan-out");
  const { stream, sendEvent } = workflow.createContext();
  sendEvent(startEvent.with("Start fan-out"));
 
  for await (const event of stream) {
    if (processItemEvent.include(event)) {
      console.log(`Processing item: ${event.data}`);
    } else if (resultEvent.include(event)) {
      console.log(`Result received: ${event.data}`);
    } else if (completeEvent.include(event)) {
      console.log("Final aggregated results:", event.data);
      break; // Stop processing the stream
    }
  }
}
 
runFanOut();

This pattern allows you to:

  1. Emit multiple events to be processed in parallel
  2. Collect results as they come in
  3. Complete once all parallel tasks are finished
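
The example above tracks completion with module-level counters, which concurrent runs of the same workflow would share. As a minimal sketch of the same fan-in bookkeeping kept per run, the helper below collects results and fires a callback once the expected number has arrived. Note that createFanIn is a hypothetical name used for illustration and is not part of @llama-flow/core:

```typescript
// Hypothetical helper (not a @llama-flow/core API): per-run fan-in bookkeeping.
function createFanIn<T>(expected: number, onComplete: (results: T[]) => void) {
  const results: T[] = [];
  return {
    // Record one result; fire onComplete once, when the last expected result arrives.
    add(result: T): void {
      if (results.length >= expected) return; // ignore anything past the target
      results.push(result);
      if (results.length === expected) onComplete([...results]);
    },
    // Number of results still outstanding.
    pending(): number {
      return expected - results.length;
    },
  };
}

// Usage: results may arrive in any order; completion fires on the third one.
const completedRuns: string[][] = [];
const fanIn = createFanIn<string>(3, (all) => {
  completedRuns.push(all);
});
for (const i of [2, 0, 1]) {
  fanIn.add(`Processed: ${i}`);
}
console.log(completedRuns[0]); // the three results, in arrival order
```

Creating one such collector inside the start handler would give each run its own state instead of resetting a shared counter.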

Conditional Branching

You can implement conditional logic in your workflows:

import { createWorkflow, workflowEvent } from "@llama-flow/core";
import { collect } from "@llama-flow/core/stream/consumer";
import { until } from "@llama-flow/core/stream/until";
 
const inputEvent = workflowEvent<number>();
const evenNumberEvent = workflowEvent<string>();
const oddNumberEvent = workflowEvent<string>();
const resultEvent = workflowEvent<string>();
 
const workflow = createWorkflow();
 
// Branch based on whether the number is even or odd
workflow.handle([inputEvent], (event) => {
  if (event.data % 2 === 0) {
    return evenNumberEvent.with(`${event.data} is even`);
  } else {
    return oddNumberEvent.with(`${event.data} is odd`);
  }
});
 
// Handle even numbers
workflow.handle([evenNumberEvent], (event) => {
  return resultEvent.with(`Even result: ${event.data}`);
});
 
// Handle odd numbers
workflow.handle([oddNumberEvent], (event) => {
  return resultEvent.with(`Odd result: ${event.data}`);
});
 
// Example E2E Run Usage
async function run(inputNumber: number) {
  // Create a workflow context and send the initial event
  const { stream, sendEvent } = workflow.createContext();
  sendEvent(inputEvent.with(inputNumber));
 
  // Collect every event up to and including the first resultEvent
  const allEvents = await collect(until(stream, resultEvent));
 
  // The last collected event is the resultEvent that ended the collection
  const finalEvent = allEvents[allEvents.length - 1];
  if (resultEvent.include(finalEvent)) {
    console.log(`Result: ${finalEvent.data}`);
  }
}
 
run(42);
run(43);
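
With more than two branches, it can help to keep the decision in a small pure function and let the handler only map its outcome to an event. The following is a sketch in plain TypeScript; classify, formatters, route, and the branch names are illustrative assumptions, not part of the library:

```typescript
// Illustrative only: the branch names and helpers are assumptions.
type Branch = "negative" | "zero" | "even" | "odd";

// Pure decision function: easy to unit-test without a workflow.
function classify(n: number): Branch {
  if (n < 0) return "negative";
  if (n === 0) return "zero";
  return n % 2 === 0 ? "even" : "odd";
}

// One formatter per branch; the Record type makes the compiler flag a missing branch.
const formatters: Record<Branch, (n: number) => string> = {
  negative: (n) => `${n} is negative`,
  zero: () => "zero",
  even: (n) => `${n} is even`,
  odd: (n) => `${n} is odd`,
};

function route(n: number): string {
  return formatters[classify(n)](n);
}

console.log(route(42)); // "42 is even"
console.log(route(43)); // "43 is odd"
```

In a workflow, each entry of the table would return the event for its branch rather than a string.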

Using Middleware

LlamaIndex workflows provide middleware that can enhance your workflows:

withStore Middleware

The withStore middleware adds a persistent store to your workflow context:

import { createWorkflow, workflowEvent } from "@llama-flow/core";
import { withStore } from "@llama-flow/core/middleware/store";
 
const startEvent = workflowEvent<void>();
const incrementEvent = workflowEvent<number>();
const resultEvent = workflowEvent<number>();
 
// Create a workflow with store middleware
const workflow = withStore(
  () => ({
    // Initializer function
    count: 0,
    history: [] as number[],
  }),
  createWorkflow(),
);
 
// Increment the counter
workflow.handle([startEvent], () => {
  const store = workflow.getStore(); // Use the provided getStore method
 
  store.count += 1;
  store.history.push(store.count);
  return incrementEvent.with(store.count);
});
 
// Return the current count
workflow.handle([incrementEvent], (event) => {
  const store = workflow.getStore();
  console.log("Current count:", store.count);
  console.log("History:", store.history);
  return resultEvent.with(store.count);
});
 
// Example E2E Run Usage
async function runWithStore() {
  const { stream, sendEvent } = workflow.createContext();
 
  // Send start event multiple times to see store update
  sendEvent(startEvent.with());
  sendEvent(startEvent.with());
 
  for await (const event of stream) {
    if (resultEvent.include(event)) {
      console.log("Final count received:", event.data);
      // Note: In a real app, you might need logic to stop listening
      // if you only expect one result per startEvent.
    }
  }
}
 
runWithStore();
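
The store is described by an initializer function rather than a plain object. The sketch below shows, in plain TypeScript, what that buys you: every call produces fresh state. It assumes the middleware calls the initializer once per context, which this page does not confirm; createCounterStore and increment are hypothetical names:

```typescript
type CounterStore = { count: number; history: number[] };

// Factory: every call builds a fresh object, so no state leaks between callers.
// Assumption: withStore would invoke a function like this once per context.
const createCounterStore = (): CounterStore => ({ count: 0, history: [] });

// Same update the startEvent handler performs on its store.
function increment(store: CounterStore): number {
  store.count += 1;
  store.history.push(store.count);
  return store.count;
}

const storeA = createCounterStore();
const storeB = createCounterStore();
increment(storeA);
increment(storeA);
increment(storeB);

console.log(storeA.count, storeA.history); // count is 2, history is [1, 2]
console.log(storeB.count, storeB.history); // count is 1, history is [1]
```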

withValidation Middleware

The withValidation middleware adds compile-time and runtime validation to your workflows:

import { createWorkflow, workflowEvent } from "@llama-flow/core";
import { withValidation } from "@llama-flow/core/middleware/validation";
 
const startEvent = workflowEvent<string, "start">();
const processEvent = workflowEvent<number, "process">();
const resultEvent = workflowEvent<string, "result">();
const disallowedEvent = workflowEvent<void, "disallowed">();
 
// Create a workflow with validation middleware
// Define allowed event paths
const workflow = withValidation(createWorkflow(), [
  [[startEvent], [processEvent]], // startEvent can only lead to processEvent
  [[processEvent], [resultEvent]], // processEvent can only lead to resultEvent
]);
 
// This will pass validation
workflow.strictHandle([startEvent], (sendEvent, start) => {
  sendEvent(processEvent.with(123)); // ✅ This is allowed
});
 
// A handler like this would fail at compile time and at runtime,
// because neither event is an allowed successor of startEvent
workflow.strictHandle([startEvent], (sendEvent, start) => {
  // sendEvent(disallowedEvent.with()); // ❌ This would cause an error
  // sendEvent(resultEvent.with("result")); // ❌ This would also cause an error
});
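
To make the runtime half of that check concrete, here is a plain-TypeScript sketch of a transition table lookup. It is not the withValidation implementation; the names allowedNext, canSend, and checkedSend are assumptions made for illustration, and the table mirrors the two paths declared above:

```typescript
type EventName = "start" | "process" | "result" | "disallowed";

// For each source event, the set of events a handler may send next.
const allowedNext = new Map<EventName, ReadonlySet<EventName>>([
  ["start", new Set<EventName>(["process"])],
  ["process", new Set<EventName>(["result"])],
]);

// True only when the table lists `to` as a successor of `from`.
function canSend(from: EventName, to: EventName): boolean {
  return allowedNext.get(from)?.has(to) ?? false;
}

// Throwing variant, similar in spirit to a runtime validation failure.
function checkedSend(from: EventName, to: EventName): void {
  if (!canSend(from, to)) {
    throw new Error(`${from} may not send ${to}`);
  }
}

console.log(canSend("start", "process")); // true
console.log(canSend("start", "result")); // false
```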

Error Handling

LlamaIndex workflows provide built-in mechanisms for handling errors:

import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
 
const startEvent = workflowEvent<string>();
const processEvent = workflowEvent<number>();
const errorEvent = workflowEvent<Error>();
const resultEvent = workflowEvent<string>();
 
const workflow = createWorkflow();
 
workflow.handle([startEvent], (start) => {
  try {
    const num = Number.parseInt(start.data, 10);
    if (Number.isNaN(num)) {
      throw new Error("Invalid number");
    }
    return processEvent.with(num);
  } catch (err) {
    return errorEvent.with(err instanceof Error ? err : new Error(String(err)));
  }
});
 
workflow.handle([processEvent], (event) => {
  return resultEvent.with(`Result: ${event.data * 2}`);
});
 
workflow.handle([errorEvent], (event) => {
  return resultEvent.with(`Error: ${event.data.message}`);
});
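
One caveat with the parsing step: Number.parseInt stops at the first non-digit, so an input such as "12abc" parses to 12 and never reaches the error branch. If stricter input handling is wanted, a sketch like the following could sit in front of it. ParseResult and parseStrictInt are hypothetical names, not library APIs:

```typescript
// Result type: errors are returned as values, mirroring the errorEvent approach.
type ParseResult =
  | { ok: true; value: number }
  | { ok: false; error: Error };

function parseStrictInt(input: string): ParseResult {
  const trimmed = input.trim();
  // Validate the whole string first, because parseInt accepts a numeric prefix.
  if (!/^[+-]?\d+$/.test(trimmed)) {
    return { ok: false, error: new Error(`Invalid number: "${input}"`) };
  }
  return { ok: true, value: Number.parseInt(trimmed, 10) };
}

console.log(Number.parseInt("12abc", 10)); // 12
console.log(parseStrictInt("12abc").ok); // false
console.log(parseStrictInt(" 42 ").ok); // true
```

A handler could then return processEvent for the ok case and errorEvent otherwise, without needing try/catch.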

You can also use the abort signal from getContext() to run cleanup when the workflow is aborted:

workflow.handle([processEvent], () => {
  const { signal } = getContext();
 
  signal.onabort = () => {
    console.error("Process aborted:", signal.reason);
    // Clean up resources
  };
 
  // Your processing logic here
});
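
The signal here is a standard AbortSignal, so its behavior can be tried outside a workflow. The sketch below uses a locally created AbortController as a stand-in for whatever aborts the workflow (an assumption made for the demo) and registers cleanup with addEventListener, which, unlike assigning onabort, does not overwrite other listeners:

```typescript
// Stand-in for the workflow's signal: a controller we can abort ourselves.
const controller = new AbortController();
const { signal } = controller;
const cleanupLog: string[] = [];

// Runs at most once, when the signal is aborted.
signal.addEventListener(
  "abort",
  () => {
    const reason =
      signal.reason instanceof Error ? signal.reason.message : String(signal.reason);
    cleanupLog.push(`cleanup: ${reason}`);
  },
  { once: true },
);

// Aborting with an Error makes it available as signal.reason.
controller.abort(new Error("upstream failure"));

console.log(signal.aborted); // true
console.log(cleanupLog); // one entry: "cleanup: upstream failure"
```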

Connecting with Server Endpoints

Workflows can be used as middleware in server frameworks like Express, Hono, or Fastify:

import { Hono } from "hono";
import { serve } from "@hono/node-server";
import { createHonoHandler } from "@llama-flow/core/interrupter/hono";
import { createWorkflow, workflowEvent } from "@llama-flow/core";
 
// Define events
const queryEvent = workflowEvent<string>();
const responseEvent = workflowEvent<string>();
 
// Create workflow
const workflow = createWorkflow();
 
workflow.handle([queryEvent], (event) => {
  const response = `Processed: ${event.data}`;
  return responseEvent.with(response);
});
 
// Create Hono app
const app = new Hono();
 
// Set up workflow endpoint
app.post(
  "/workflow",
  createHonoHandler(
    workflow,
    async (ctx) => queryEvent.with(await ctx.req.text()),
    responseEvent,
  ),
);
 
// Start server
serve(app, ({ port }) => {
  console.log(`Server started at http://localhost:${port}`);
});

Next Steps

Now that you've learned about basic workflow patterns, explore more advanced topics:
