Complex Workflows
Learn how to build sophisticated agent workflows with multiple steps, conditional execution, error handling, and parallel processing patterns.
Workflow Fundamentals
Sequential Workflows
Basic step-by-step execution with dependencies:
from erdo import Agent, Step
from erdo.actions import llm, codeexec, memory
research_agent = Agent(
    name="research_workflow",
    description="Multi-step research and analysis workflow"
)

# Step 1: Research topic
research_step = research_agent.step(
    llm.message(
        model="claude-sonnet-4",
        query="Research {{topic}} and gather key information",
        tools=[web_search_tool, web_parser_tool]  # assumes these tool handles are defined elsewhere
    ),
    key="research"
)

# Step 2: Analyze findings (depends on research)
analysis_step = research_agent.step(
    llm.message(
        model="claude-sonnet-4",
        query="Analyze research findings: {{research.output.content}}",
        system_prompt="You are an expert analyst. Provide structured insights."
    ),
    key="analyze",
    depends_on=research_step
)

# Step 3: Generate report (depends on analysis)
report_step = research_agent.step(
    llm.message(
        model="claude-sonnet-4",
        query="Create comprehensive report from analysis: {{analyze.output.content}}"
    ),
    key="report",
    depends_on=analysis_step
)

# Step 4: Store insights
memory_step = research_agent.step(
    memory.store(
        memory={
            "content": "{{report.output.content}}",
            "description": "Research report on {{topic}}",
            "tags": ["research", "{{topic}}", "report"]
        }
    ),
    key="store",
    depends_on=report_step
)
Parallel Processing
Execute multiple steps simultaneously for efficiency:
from erdo import ExecutionMode
# Create parallel data processing workflow
data_processor = Agent(
    name="parallel_processor",
    description="Processes multiple data sources in parallel"
)

# These steps can run in parallel
process_sales_step = data_processor.step(
    codeexec.execute(
        code="process_sales_data({{sales_data}})"
    ),
    key="process_sales"
)

process_marketing_step = data_processor.step(
    codeexec.execute(
        code="process_marketing_data({{marketing_data}})"
    ),
    key="process_marketing"
)

process_customer_step = data_processor.step(
    codeexec.execute(
        code="process_customer_data({{customer_data}})"
    ),
    key="process_customer"
)

# Aggregate results after parallel processing
aggregate_step = data_processor.step(
    codeexec.execute(
        code="""
# Combine results from parallel processing
sales_results = {{process_sales.output}}
marketing_results = {{process_marketing.output}}
customer_results = {{process_customer.output}}

combined_insights = aggregate_data_insights(
    sales_results, marketing_results, customer_results
)
generate_executive_summary(combined_insights)
"""
    ),
    key="aggregate",
    depends_on=[process_sales_step, process_marketing_step, process_customer_step]
)
Conditional Execution
Condition-Based Workflows
Execute steps based on dynamic conditions:
from erdo._generated.condition import (
    And, Or, Not, IsSuccess, IsError,
    TextContains, GreaterThan, LessThan
)

# Smart processing workflow
smart_processor = Agent(
    name="smart_processor",
    description="Adapts processing based on data characteristics"
)

# Initial data analysis
analyze_data_step = smart_processor.step(
    llm.message(
        model="claude-sonnet-4",
        query="Analyze data characteristics: {{input_data}}",
        response_format={
            "Type": "json_schema",
            "Schema": {
                "schema": {
                    "type": "object",
                    "properties": {
                        "data_size": {"type": "number"},
                        "data_quality": {"type": "string"},
                        "complexity": {"type": "string"},
                        "processing_recommendation": {"type": "string"}
                    }
                }
            }
        }
    ),
    key="analyze_data"
)

# Simple processing for small datasets
analyze_data_step.on(
    IsSuccess() & LessThan("data_size", 1000) & TextContains("complexity", "low"),
    codeexec.execute(
        code="simple_data_processing({{input_data}})"
    )
)

# Advanced processing for large or complex datasets
analyze_data_step.on(
    IsSuccess() & (GreaterThan("data_size", 1000) | TextContains("complexity", "high")),
    codeexec.execute(
        code="advanced_data_processing({{input_data}})"
    )
)
Dynamic Branching
Create workflows that branch based on runtime conditions:
# Content processing workflow with dynamic routing
content_processor = Agent(
    name="content_router",
    description="Routes content based on type and characteristics"
)

# Classify content
classify_step = content_processor.step(
    llm.message(
        model="claude-sonnet-4",
        query="Classify this content: {{content}}",
        response_format={
            "Type": "json_schema",
            "Schema": {
                "schema": {
                    "type": "object",
                    "properties": {
                        "content_type": {"type": "string"},
                        "language": {"type": "string"},
                        "sentiment": {"type": "string"},
                        "priority": {"type": "string"}
                    }
                }
            }
        }
    ),
    key="classify"
)

# Route to different processing based on classification
classify_step.on(
    TextContains("content_type", "text"),
    llm.message(
        model="claude-sonnet-4",
        query="Process text content: {{content}}"
    )
)

classify_step.on(
    TextContains("content_type", "image"),
    codeexec.execute(
        code="process_image_content({{content}})"
    )
)

classify_step.on(
    TextContains("content_type", "document"),
    llm.message(
        model="claude-sonnet-4",
        query="Extract and summarize document: {{content}}"
    )
)
Iterative Processing
Loop Over Collections
Process arrays or collections of data:
# Batch email processing
email_processor = Agent(
    name="email_batch_processor",
    description="Processes multiple emails with iteration"
)

# Process each email in the batch
process_email_step = email_processor.step(
    llm.message(
        model="claude-sonnet-4",
        query="Process this email: {{email}}",
        response_format={
            "Type": "json_schema",
            "Schema": {
                "schema": {
                    "type": "object",
                    "properties": {
                        "category": {"type": "string"},
                        "priority": {"type": "string"},
                        "action_required": {"type": "boolean"},
                        "summary": {"type": "string"}
                    }
                }
            }
        }
    ),
    key="process_email"
)

# Generate summary after processing all emails
summary_step = email_processor.step(
    llm.message(
        model="claude-sonnet-4",
        query="Generate summary of processed emails: {{process_email.output.all_results}}"
    ),
    key="summary",
    depends_on=process_email_step
)
Conditional Iteration
Iterate with filtering conditions:
# Process only high-priority items
priority_processor = Agent(
    name="priority_processor",
    description="Processes only high-priority items from a collection"
)

process_priority_item = priority_processor.step(
    codeexec.execute(
        code="process_priority_item({{item}})"
    ),
    key="process_item"
)
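The step above only defines the per-item processing; the filter itself is not shown. Below is a minimal sketch of one way to attach it, reusing the .on() handler and condition classes demonstrated earlier on this page (the triage step, its prompt, and the {{item}} input are illustrative assumptions, not a fixed erdo API):
# Hypothetical triage step that labels each item before any heavy processing
triage_step = priority_processor.step(
    llm.message(
        model="claude-sonnet-4",
        query="Classify the priority (high/medium/low) of this item: {{item}}"
    ),
    key="triage"
)

# Only run the expensive processing when the item was labeled high priority
triage_step.on(
    IsSuccess() & TextContains("priority", "high"),
    codeexec.execute(
        code="process_priority_item({{item}})"
    )
)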
Error Handling and Recovery
Comprehensive Error Handling
Build robust workflows with multiple error recovery strategies:
from erdo import ResultHandler
from erdo.actions import utils  # assumes utils actions are exposed alongside llm/codeexec

# Resilient data processing workflow
resilient_processor = Agent(
    name="resilient_processor",
    description="Data processor with comprehensive error handling"
)

# Main processing step
main_processing = resilient_processor.step(
    codeexec.execute(
        code="{{generated_code}}",
        parameters={"data": "{{input_data}}"}
    ),
    key="main_process"
)

# Success handler - normal flow continuation
main_processing.on(
    IsSuccess(),
    utils.send_status(
        status="completed",
        message="Processing completed successfully",
        data="{{main_processing.output}}"
    )
)

# Retry handler for transient errors
main_processing.on(
    IsError() & LessThan("retry_count", 3) & TextContains("error", "timeout"),
    utils.echo(
        data={
            "retry_count": "{{increment retry_count}}",
            "backoff_time": "{{multiply retry_count 5}}",
            "last_error": "{{main_processing.error}}"
        }
    )
)

# Fallback handler for code generation errors
main_processing.on(
    IsError() & TextContains("error", "syntax"),
    llm.message(
        model="claude-sonnet-4",
        query="Fix the code error: {{main_processing.error}}\nOriginal code: {{generated_code}}",
        system_prompt="Fix syntax and logical errors in the code."
    )
)

# Critical error handler
main_processing.on(
    IsError() & GreaterThan("retry_count", 2),
    utils.send_status(
        status="critical_failure",
        message="Processing failed after multiple attempts",
        error="{{main_processing.error}}"
    )
)
Advanced Workflow Patterns
Pipeline Orchestration
Coordinate multiple agents in complex pipelines:
from erdo.actions import bot  # assumed import path, following the llm/codeexec pattern

# Multi-agent data pipeline
pipeline_orchestrator = Agent(
    name="data_pipeline",
    description="Orchestrates multi-stage data processing pipeline"
)

# Stage 1: Data ingestion and validation
ingest_data = pipeline_orchestrator.step(
    bot.invoke(
        bot_name="data ingestion bot",
        parameters={"source": "{{data_source}}"}
    ),
    key="ingest"
)

# Stage 2: Data cleaning and preprocessing
clean_data = pipeline_orchestrator.step(
    bot.invoke(
        bot_name="data cleaner bot",
        parameters={"raw_data": "{{ingest.output.processed_data}}"}
    ),
    key="clean",
    depends_on=ingest_data
)

# Stage 3: Feature engineering
engineer_features = pipeline_orchestrator.step(
    bot.invoke(
        bot_name="feature engineering bot",
        parameters={"clean_data": "{{clean.output.cleaned_data}}"}
    ),
    key="features",
    depends_on=clean_data
)

# Stage 4: Model training
train_model = pipeline_orchestrator.step(
    bot.invoke(
        bot_name="model training bot",
        parameters={"features": "{{features.output.feature_data}}"}
    ),
    key="train",
    depends_on=engineer_features
)

# Stage 5: Model validation
validate_model = pipeline_orchestrator.step(
    bot.invoke(
        bot_name="model validation bot",
        parameters={"model": "{{train.output.trained_model}}"}
    ),
    key="validate",
    depends_on=train_model
)
Dynamic Workflow Generation
Create workflows that adapt based on runtime conditions:
# Adaptive workflow generator
adaptive_agent = Agent(
    name="adaptive_workflow",
    description="Generates workflow steps based on data characteristics"
)

# Analyze requirements and generate workflow plan
plan_workflow = adaptive_agent.step(
    llm.message(
        model="claude-sonnet-4",
        query="Create processing plan for: {{requirements}}",
        response_format={
            "Type": "json_schema",
            "Schema": {
                "schema": {
                    "type": "object",
                    "properties": {
                        "steps": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "name": {"type": "string"},
                                    "action": {"type": "string"},
                                    "condition": {"type": "string"}
                                }
                            }
                        }
                    }
                }
            }
        }
    ),
    key="plan"
)

# Execute dynamic steps based on plan
execute_dynamic_step = adaptive_agent.step(
    codeexec.execute(
        code="""
# Execute workflow steps dynamically
plan = {{plan.output.steps}}

for step in plan:
    if evaluate_condition(step['condition']):
        execute_step(step['name'], step['action'])
"""
    ),
    key="execute_dynamic",
    depends_on=plan_workflow
)
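evaluate_condition and execute_step in the snippet above are placeholders rather than erdo functions. A self-contained sketch of what that dispatch logic could look like inside the code-execution step (the condition format and the action registry here are assumptions):
# Illustrative dispatch helpers for the dynamically generated plan
ACTIONS = {
    "clean": lambda: print("cleaning data"),
    "summarize": lambda: print("summarizing data"),
}

def evaluate_condition(condition: str) -> bool:
    # Empty condition means "always run"; otherwise expect a simple
    # "key=value" check against a context dict (purely illustrative).
    context = {"data_quality": "low", "data_size": "large"}
    if not condition:
        return True
    key, _, expected = condition.partition("=")
    return context.get(key.strip()) == expected.strip()

def execute_step(name: str, action: str) -> None:
    handler = ACTIONS.get(action)
    if handler is None:
        print(f"Skipping step '{name}': unknown action '{action}'")
        return
    handler()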
Performance Optimization
Optimized Workflow Patterns
Caching Strategy:
# Implement intelligent caching
cached_processor = agent.step(
    codeexec.execute(
        code="""
# Check cache first
cache_key = generate_cache_key({{input_params}})
cached_result = get_from_cache(cache_key)

if cached_result:
    return cached_result
else:
    result = expensive_processing({{input_data}})
    cache_result(cache_key, result)
    return result
"""
    ),
    key="cached_process"
)
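The caching snippet assumes generate_cache_key, get_from_cache, and cache_result exist in the execution environment; they are not provided by the example. One possible minimal implementation, using an in-process dictionary (swap in Redis or another shared store for production use):
import hashlib
import json

_CACHE = {}  # simple in-process cache; replace with a shared store as needed

def generate_cache_key(params):
    # Stable, order-insensitive key derived from the input parameters
    canonical = json.dumps(params, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def get_from_cache(key):
    return _CACHE.get(key)

def cache_result(key, value):
    _CACHE[key] = value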
Batch Processing:
# Process items in optimized batches
batch_processor = agent.step(
    codeexec.execute(
        code="""
# Process in optimal batch sizes
batch_size = calculate_optimal_batch_size({{data_size}})
results = []

for batch in chunk_data({{input_data}}, batch_size):
    batch_result = process_batch(batch)
    results.extend(batch_result)

return aggregate_results(results)
"""
    ),
    key="batch_process"
)
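calculate_optimal_batch_size and chunk_data are likewise placeholders. A simple heuristic version of both (the sizing bounds are arbitrary assumptions):
def calculate_optimal_batch_size(data_size, target_batches=20, min_size=10, max_size=5000):
    # Aim for roughly target_batches batches, clamped to sensible bounds
    return max(min_size, min(max_size, data_size // target_batches or min_size))

def chunk_data(items, batch_size):
    # Yield successive slices of at most batch_size items
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]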
Resource Management:
# Manage computational resources
resource_aware_step = agent.step(
    codeexec.execute(
        code="""
# Adjust processing based on available resources
available_memory = get_available_memory()
cpu_cores = get_cpu_count()

if available_memory > 8 * GB:
    use_memory_intensive_algorithm()
else:
    use_disk_based_algorithm()

set_parallel_workers(min(cpu_cores, data_chunks))
"""
    ),
    key="resource_aware"
)
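get_available_memory, get_cpu_count, and GB are not defined by the example. A possible implementation using the standard library plus the optional psutil package:
import os

try:
    import psutil  # optional dependency for memory introspection
except ImportError:
    psutil = None

GB = 1024 ** 3

def get_available_memory():
    # Available RAM in bytes; returns 0 if psutil is not installed
    return psutil.virtual_memory().available if psutil else 0

def get_cpu_count():
    return os.cpu_count() or 1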
Monitoring and Observability
Workflow Monitoring
Add comprehensive monitoring to your workflows:
# Monitored workflow with detailed tracking
monitored_agent = Agent(
    name="monitored_workflow",
    description="Workflow with comprehensive monitoring and logging"
)

# Add monitoring step
monitor_start = monitored_agent.step(
    utils.send_status(
        status="started",
        message="Workflow started",
        details={
            "timestamp": "{{current_timestamp}}",
            "input_size": "{{data_size}}",
            "expected_duration": "{{estimated_time}}"
        }
    ),
    key="monitor_start"
)

# Main processing with progress updates
main_process = monitored_agent.step(
    codeexec.execute(
        code="""
# Processing with progress tracking
total_items = len({{input_data}})
processed = 0

for item in {{input_data}}:
    process_item(item)
    processed += 1

    # Update progress every 10% (guard against very small batches)
    if processed % max(total_items // 10, 1) == 0:
        progress = (processed / total_items) * 100
        send_progress_update(f"Progress: {progress:.1f}%")

return {"processed_count": processed, "success_rate": calculate_success_rate()}
"""
    ),
    key="main_process",
    depends_on=monitor_start
)

# Monitor completion
monitor_end = monitored_agent.step(
    utils.send_status(
        status="completed",
        message="Workflow completed successfully",
        details={
            "duration": "{{execution_time}}",
            "processed_items": "{{main_process.output.processed_count}}",
            "success_rate": "{{main_process.output.success_rate}}"
        }
    ),
    key="monitor_end",
    depends_on=main_process
)
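send_progress_update, process_item, and calculate_success_rate in the processing code above are placeholders for your own logic. A trivial stand-in for the progress callback, assuming standard logging rather than any specific erdo status API:
import logging

logger = logging.getLogger("monitored_workflow")

def send_progress_update(message):
    # Emit progress; replace with a status webhook or a utils.send_status step
    logger.info(message)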
Related guides:
- Error Handling: Master error handling patterns and recovery strategies
- Performance Tips: Optimize your workflows for speed and efficiency
- Testing Workflows: Learn how to test complex multi-step workflows
- Real Examples: See complete workflow implementations in action
Best Practices
Workflow Design
- Keep individual steps focused and atomic
- Use clear, descriptive step names and keys
- Plan for failure scenarios from the start
- Document complex conditional logic
- Consider resource usage and optimization
Error Handling
- Implement multiple levels of error recovery
- Use appropriate retry strategies, such as exponential backoff (see the sketch after this list)
- Distinguish between recoverable and fatal errors
- Add comprehensive logging and monitoring
- Test error scenarios thoroughly
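A minimal sketch of the exponential-backoff calculation mentioned above (the base, cap, and jitter values are arbitrary assumptions):
import random

def backoff_seconds(retry_count, base=2.0, cap=60.0):
    # 2, 4, 8, ... seconds, capped, with jitter to avoid retry storms
    delay = min(cap, base * (2 ** retry_count))
    return delay * random.uniform(0.5, 1.0)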
Performance
- Use parallel processing where appropriate
- Implement caching for expensive operations
- Optimize batch sizes for your data
- Monitor resource usage and bottlenecks
- Consider workflow complexity vs. maintainability