Advanced Event Handling
Master complex event patterns and middleware with Workflows
This guide explores advanced event handling techniques and patterns you can use with Workflows to build more sophisticated applications.
Event Composition
Workflows allow you to work with different event types and compose them in powerful ways:
Multiple Event Types
You can define multiple event types for different kinds of data flowing through your workflow:
import { createWorkflow, workflowEvent } from "@llama-flow/core";
// One event type per payload shape that moves through the workflow.
const textEvent = workflowEvent<string>();
const numberEvent = workflowEvent<number>();
const booleanEvent = workflowEvent<boolean>();
const complexEvent = workflowEvent<{ id: string; value: number }>();

// A single workflow hosts the handlers for every event type above.
const workflow = createWorkflow();

// text -> number: log the text and forward its length.
workflow.handle([textEvent], ({ data: text }) => {
  console.log(`Processing text: ${text}`);
  return numberEvent.with(text.length);
});

// number -> boolean: log the parity and forward whether the number is even.
workflow.handle([numberEvent], ({ data: value }) => {
  const even = value % 2 === 0;
  const parity = even ? "even" : "odd";
  console.log(`Number ${value} is ${parity}`);
  return booleanEvent.with(even);
});

// boolean -> complex: wrap the flag in an object with a freshly generated id.
workflow.handle([booleanEvent], ({ data: flag }) =>
  complexEvent.with({
    id: crypto.randomUUID(),
    value: flag ? 100 : 0,
  }),
);

// Start a run, seed it with a text event, and stop at the first complex event.
const { stream, sendEvent } = workflow.createContext();
sendEvent(textEvent.with("Hello, world!"));
for await (const emitted of stream) {
  if (!complexEvent.include(emitted)) {
    continue;
  }
  console.log(emitted.data);
  break;
}
Event Branching and Merging
You can create complex event flows with branching and merging patterns:
import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
import { collect } from "@llama-flow/core/stream/consumer";
import { until } from "@llama-flow/core/stream/until";
import { filter } from "@llama-flow/core/stream/filter";
import { pipeline } from "node:stream/promises";
// Events for a pipeline that fans out, branches on validity, and merges again.
const startEvent = workflowEvent<string[], "start">();
const itemEvent = workflowEvent<string, "item">();
const validEvent = workflowEvent<string, "valid">();
const invalidEvent = workflowEvent<string, "invalid">();
const processedValidEvent = workflowEvent<string, "processedValid">();
const processedInvalidEvent = workflowEvent<string, "processedInvalid">();
const resultEvent = workflowEvent<string[], "result">();

// The workflow that all handlers below are registered on.
const workflow = createWorkflow();

// Fan-out: send one itemEvent per input string. The handler returns nothing;
// its only output is the events it sends through the context.
workflow.handle([startEvent], ({ data: items }) => {
  const { sendEvent } = getContext();
  console.log(`Received ${items.length} items to process`);
  items.forEach((item) => {
    sendEvent(itemEvent.with(item));
  });
});

// Branch: non-empty items of length >= 3 go down the valid path, everything
// else goes down the invalid path (empty strings are reported as "[empty]").
workflow.handle([itemEvent], ({ data: item }) => {
  console.log(`Validating item: ${item}`);
  const isValid = Boolean(item) && item.length >= 3;
  if (!isValid) {
    return invalidEvent.with(item || "[empty]");
  }
  return validEvent.with(item);
});

// Valid path: upper-case the item and prefix it with a check mark.
workflow.handle([validEvent], ({ data: item }) => {
  console.log(`Processing valid item: ${item}`);
  const marked = `✓ ${item.toUpperCase()}`;
  return processedValidEvent.with(marked);
});

// Invalid path: keep the item as-is and prefix it with a cross.
workflow.handle([invalidEvent], ({ data: item }) => {
  console.log(`Processing invalid item: ${item}`);
  const marked = `✗ ${item}`;
  return processedInvalidEvent.with(marked);
});
// Merge handler: collect results from both processing paths.
// It is registered on startEvent (like the fan-out handler) so that it knows
// how many processed results to wait for before emitting resultEvent.
workflow.handle([startEvent], async (event) => {
  const { stream } = getContext();
  const totalItems = event.data.length;
  console.log(`Setting up merger to collect ${totalItems} processed results`);

  // With no input items nothing is fanned out, so no processed event would
  // ever arrive; emit an empty result instead of waiting on the stream forever.
  if (totalItems === 0) {
    return resultEvent.with([]);
  }

  // Count matching events inside the stop predicate. The predicate must not
  // read `results`: that binding is only initialized once `collect` resolves,
  // so touching it while the stream is still being consumed throws a
  // ReferenceError (temporal dead zone).
  let received = 0;
  const results = await collect(
    until(
      filter(
        stream,
        (ev) =>
          processedValidEvent.include(ev) || processedInvalidEvent.include(ev),
      ),
      () => {
        // Only processed events reach this predicate, so each call is one result.
        received++;
        return received >= totalItems;
      },
    ),
  );

  // Extract the payloads from the collected events.
  const processedResults = results.map((result) => result.data);
  // Default string sort: "✓" (U+2713) orders before "✗" (U+2717), so valid
  // items come first, then invalid items.
  processedResults.sort();
  return resultEvent.with(processedResults);
});
// Example usage
/**
 * Runs the branching/merging workflow on a fixed sample list, logging every
 * branched and processed event until the merged result event arrives.
 *
 * @returns whatever `pipeline` resolves with.
 *   NOTE(review): confirm against the Node.js `stream/promises` docs that this
 *   is the yielded result array rather than `undefined`.
 */
async function runBranchingMergingWorkflow() {
  const { stream, sendEvent } = workflow.createContext();
  const items = ["apple", "b", "cherry", "", "elderberry", "fig"];
  console.log("Starting workflow with items:", items);
  sendEvent(startEvent.with(items));

  // Drain the workflow stream through a generator stage that logs progress
  // and stops at the first result event.
  const result = await pipeline(stream, async function* (source) {
    for await (const ev of source) {
      if (validEvent.include(ev) || invalidEvent.include(ev)) {
        console.log(`Branched event with data: ${ev.data}`);
        continue;
      }
      if (processedValidEvent.include(ev) || processedInvalidEvent.include(ev)) {
        console.log(`Processed event with data: ${ev.data}`);
        continue;
      }
      if (resultEvent.include(ev)) {
        console.log("Merged results:", ev.data);
        yield ev.data;
        return; // Stop processing
      }
    }
  });

  return result;
}

runBranchingMergingWorkflow();
In this branching and merging example:
- We start with a list of items and fan out to process each one individually
- Each item branches to either the valid or invalid path based on its length
- Different processing is applied to each branch
- Finally, we merge the results from both branches back into a single resulting array
This pattern is useful for parallel processing with different business logic for different categories of inputs.
Event Filtering and Transformation
You can filter and transform events to build sophisticated data processing pipelines:
import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
import { collect } from "@llama-flow/core/stream/consumer";
import { until } from "@llama-flow/core/stream/until";
import { filter } from "@llama-flow/core/stream/filter";
// Events: initEvent carries how many numbers to generate, dataEvent carries
// each number, and the remaining events carry the routed/transformed values.
const dataEvent = workflowEvent<number>();
const initEvent = workflowEvent<number>();
const evenEvent = workflowEvent<number>();
const oddEvent = workflowEvent<number>();
const transformedEvent = workflowEvent<string>();
const resultEvent = workflowEvent<string[]>();

// The workflow that all handlers below are registered on.
const workflow = createWorkflow();

// Route each number by parity: even numbers to evenEvent, odd to oddEvent.
workflow.handle([dataEvent], ({ data: value }) =>
  value % 2 === 0 ? evenEvent.with(value) : oddEvent.with(value),
);

// Label even numbers.
workflow.handle([evenEvent], ({ data: value }) =>
  transformedEvent.with(`Even: ${value}`),
);

// Label odd numbers.
workflow.handle([oddEvent], ({ data: value }) =>
  transformedEvent.with(`Odd: ${value}`),
);

// Generate 1..N as data events, then gather N transformed events into one result.
workflow.handle([initEvent], async ({ data: count }) => {
  const { stream, sendEvent } = getContext();

  // Emit the sequence 1, 2, ..., count.
  let next = 1;
  while (next <= count) {
    sendEvent(dataEvent.with(next));
    next += 1;
  }

  // The stop predicate counts its own invocations; it only sees transformed
  // events because the stream is filtered first.
  let seen = 0;
  const onlyTransformed = filter(stream, (ev) => transformedEvent.include(ev));
  const collected = await collect(
    until(onlyTransformed, () => {
      seen += 1;
      return seen >= count;
    }),
  );

  const labels = collected.map((item) => item.data);
  return resultEvent.with(labels);
});
// Example usage
/**
 * Starts the filter/transform workflow for the numbers 1..10 and resolves with
 * the payload of the first result event seen on the stream.
 */
async function runFilterTransformWorkflow() {
  const { stream, sendEvent } = workflow.createContext();
  sendEvent(initEvent.with(10));

  // Skip everything until the result event shows up.
  for await (const ev of stream) {
    if (!resultEvent.include(ev)) {
      continue;
    }
    console.log("Results:", ev.data);
    return ev.data;
  }
}

runFilterTransformWorkflow();
Debugging with Substreams
You can use the `substream` feature to debug specific event flows:
import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
import { withTraceEvents } from "@llama-flow/core/middleware/trace-events";
// Events, each tagged with a debug label.
const queryEvent = workflowEvent<string>({ debugLabel: "query" });
const fetchEvent = workflowEvent<string>({ debugLabel: "fetch" });
const resultEvent = workflowEvent<string>({ debugLabel: "result" });

// Wrap the workflow with the trace-events middleware, which provides the
// `substream` method used below.
const workflow = withTraceEvents(createWorkflow());

// Query handler: triggers a fetch and logs what shows up in its substream.
workflow.handle([queryEvent], ({ data: query }) => {
  const { sendEvent, stream } = getContext();

  // Keep a reference to this specific fetch event instance so the substream
  // can be derived from it.
  const fetchInstance = fetchEvent.with(query);
  sendEvent(fetchInstance);

  // Substream derived from this fetch instance and the context stream.
  const substream = workflow.substream(fetchInstance, stream);

  // Log substream events in the background; the handler does not wait for it.
  const logSubstream = async () => {
    for await (const traced of substream) {
      console.log(`Event in substream: ${traced}`);
    }
  };
  void logSubstream();

  return resultEvent.with(`Querying: ${query}`);
});

// Fetch handler: logs the request and returns a placeholder result.
workflow.handle([fetchEvent], ({ data: target }) => {
  console.log(`Fetching data for: ${target}`);
  // Actual fetch logic would go here
  return resultEvent.with(`Results for: ${target}`);
});
/**
 * Sends a single query event and logs the first result event that appears on
 * the workflow stream, then stops listening.
 */
async function run() {
  const { stream, sendEvent } = workflow.createContext();
  sendEvent(queryEvent.with("Hello"));

  for await (const ev of stream) {
    if (!resultEvent.include(ev)) {
      continue;
    }
    console.log(`Result: ${ev.data}`);
    break;
  }
}

run();
Advanced Validation and Type Safety
The `withValidation` middleware ensures your workflow connections are both type-safe and runtime-safe:
import { createWorkflow, workflowEvent } from "@llama-flow/core";
import { withValidation } from "@llama-flow/core/middleware/validation";
// Events with explicit payload types.
const inputEvent = workflowEvent<string>();
const validateEvent = workflowEvent<string>();
const processEvent = workflowEvent<string>();
const resultEvent = workflowEvent<string>();
const errorEvent = workflowEvent<Error>({
  debugLabel: "errorEvent", // Debug labels make error messages easier to read
});

// Allowed transitions, written as [input events] -> [events a handler may send].
const workflow = withValidation(createWorkflow(), [
  [[inputEvent], [validateEvent, errorEvent]], // input -> validate | error
  [[validateEvent], [processEvent, errorEvent]], // validate -> process | error
  [[processEvent], [resultEvent, errorEvent]], // process -> result | error
  [[errorEvent], [resultEvent]], // error -> result
]);

// strictHandle passes a sendEvent that is checked against the rules above at
// compile time.
workflow.strictHandle([inputEvent], (sendEvent, event) => {
  try {
    // Treat missing and whitespace-only input the same way.
    const trimmed = event.data ? event.data.trim() : "";
    if (trimmed.length === 0) {
      throw new Error("Empty input");
    }

    // Allowed: inputEvent may lead to validateEvent.
    sendEvent(validateEvent.with(trimmed));

    // This would cause a compile-time error:
    // sendEvent(resultEvent.with("Result")); // ❌ Not allowed by validation rules
  } catch (err) {
    // Allowed: inputEvent may lead to errorEvent. Non-Error throwables are wrapped.
    const failure = err instanceof Error ? err : new Error(String(err));
    sendEvent(errorEvent.with(failure));
  }
});

// The remaining steps, each sending only what its rule permits.
workflow.strictHandle([validateEvent], (sendEvent, { data }) => {
  // Validation logic here
  sendEvent(processEvent.with(data));
});

workflow.strictHandle([processEvent], (sendEvent, { data }) => {
  // Processing logic here
  sendEvent(resultEvent.with(`Processed: ${data}`));
});

workflow.strictHandle([errorEvent], (sendEvent, { data: failure }) => {
  // Error handling logic here
  sendEvent(resultEvent.with(`Error handled: ${failure.message}`));
});
Integrating with External Systems
You can extend your workflows to integrate with external systems:
import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
// Events: fetchEvent carries the URL, successEvent the parsed JSON body, and
// failureEvent the error describing what went wrong.
const fetchEvent = workflowEvent<string>();
const successEvent = workflowEvent<any>();
const failureEvent = workflowEvent<Error>();

// The workflow that the handlers are registered on.
const workflow = createWorkflow();

// Call an external HTTP API and turn every outcome into an event instead of
// letting the error escape the handler.
workflow.handle([fetchEvent], async ({ data: url }) => {
  const { signal } = getContext();

  try {
    // Pass the context's AbortSignal so the request can be cancelled.
    const response = await fetch(url, { signal });
    if (!response.ok) {
      throw new Error(`HTTP error: ${response.status}`);
    }
    const payload = await response.json();
    return successEvent.with(payload);
  } catch (error) {
    // Aborted requests get a fixed, friendlier message.
    const isAbort = error instanceof Error && error.name === "AbortError";
    if (isAbort) {
      return failureEvent.with(new Error("Request was aborted"));
    }
    // Anything else is passed through, wrapped in an Error when necessary.
    const failure = error instanceof Error ? error : new Error(String(error));
    return failureEvent.with(failure);
  }
});
// Database integration example
const dbQueryEvent = workflowEvent<{ collection: string; query: any }>();
const dbResultEvent = workflowEvent<any[]>();

// Run a query against a collection and report rows or the failure as events.
workflow.handle([dbQueryEvent], async (event) => {
  // Connect to database (pseudo-code: connectToDatabase is not defined here)
  const db = await connectToDatabase();

  try {
    const { collection, query } = event.data;
    const cursor = db.collection(collection).find(query);
    const rows = await cursor.toArray();
    return dbResultEvent.with(rows);
  } catch (error) {
    // Wrap non-Error throwables so failureEvent always carries an Error.
    const failure = error instanceof Error ? error : new Error(String(error));
    return failureEvent.with(failure);
  } finally {
    // Close the connection whether the query succeeded or failed.
    await db.close();
  }
});
Handling Complex Asynchronous Patterns
LlamaIndex workflows excel at managing complex asynchronous patterns:
import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
import { until } from "@llama-flow/core/stream/until";
// Events for an orchestration workflow: a task list goes in, per-task progress
// and completion events flow through, and one aggregate event comes out.
const orchestrateEvent = workflowEvent<string[]>();
const taskEvent = workflowEvent<string>();
const progressEvent = workflowEvent<{ task: string; progress: number }>();
const taskCompleteEvent = workflowEvent<string>();
const aggregateEvent = workflowEvent<any>();

// The workflow that the handlers are registered on.
const workflow = createWorkflow();

// Orchestrator: dispatch every task, then watch the stream for progress and
// completion events until all tasks have reported completion.
workflow.handle([orchestrateEvent], async ({ data: tasks }) => {
  const { sendEvent, stream } = getContext();

  // Kick off all tasks up front.
  for (const task of tasks) {
    sendEvent(taskEvent.with(task));
  }

  // Completion bookkeeping, keyed by task name.
  let completed = 0;
  const results: Record<string, string> = {};
  const allDone = () => completed === tasks.length;

  // NOTE(review): `completed` is only updated inside the loop body, so the stop
  // predicate sees the final count on its next evaluation — confirm when
  // `until` evaluates the predicate relative to yielding an event.
  for await (const ev of until(stream, allDone)) {
    if (progressEvent.include(ev)) {
      console.log(`Task ${ev.data.task}: ${ev.data.progress}%`);
    } else if (taskCompleteEvent.include(ev)) {
      completed += 1;
      results[ev.data] = `Completed ${ev.data}`;
      console.log(`Completed ${completed}/${tasks.length} tasks`);
    }
  }

  return aggregateEvent.with(results);
});
// Task processor: reports progress at 0, 20, ..., 100 percent, pausing 200 ms
// after each report, then signals completion for the task.
workflow.handle([taskEvent], async ({ data: task }) => {
  const { sendEvent } = getContext();
  const pause = (ms: number) =>
    new Promise((resolve) => setTimeout(resolve, ms));

  // Simulated work: six progress updates in 20% steps.
  for (const progress of [0, 20, 40, 60, 80, 100]) {
    sendEvent(progressEvent.with({ task, progress }));
    await pause(200);
  }

  return taskCompleteEvent.with(task);
});
Next Steps
Now that you've explored advanced event handling with workflows, you're ready to build sophisticated applications: