Skip to main content

Complex Workflows

Learn how to build sophisticated agent workflows with multiple steps, conditional execution, error handling, and parallel processing patterns.

Workflow Fundamentals

Sequential Workflows

Basic step-by-step execution with dependencies:
from erdo import Agent, Step
from erdo.actions import bot, codeexec, llm, memory, utils

# Agent that owns the sequential research -> analysis -> report steps below.
research_agent = Agent(
    description="Multi-step research and analysis workflow",
    name="research_workflow",
)

# Step 1: Research topic
# The query is parameterised by the {{topic}} template variable.
# NOTE(review): web_search_tool / web_parser_tool are not defined in this
# snippet; they are presumably created elsewhere -- confirm.
gather_information = llm.message(
    model="claude-sonnet-4",
    query="Research {{topic}} and gather key information",
    tools=[web_search_tool, web_parser_tool],
)
research_step = research_agent.step(gather_information, key="research")

# Step 2: Analyze findings (depends on research)
# The template path begins with "research", the key given to step 1.
analyze_findings = llm.message(
    model="claude-sonnet-4",
    query="Analyze research findings: {{research.output.content}}",
    system_prompt="You are an expert analyst. Provide structured insights.",
)
analysis_step = research_agent.step(
    analyze_findings,
    key="analyze",
    depends_on=research_step,
)

# Step 3: Generate report (depends on analysis)
# The template path begins with "analyze", the key given to step 2.
write_report = llm.message(
    model="claude-sonnet-4",
    query="Create comprehensive report from analysis: {{analyze.output.content}}",
)
report_step = research_agent.step(
    write_report,
    key="report",
    depends_on=analysis_step,
)

# Step 4: Store insights
# Passes the report text, a description and tags to memory.store in a single
# `memory` dict. The keyword must appear exactly once: repeating `memory=`
# is a syntax error.
memory_step = research_agent.step(
    memory.store(
        memory={
            "content": "{{report.output.content}}",
            "description": "Research report on {{topic}}",
            "tags": ["research", "{{topic}}", "report"]
        }
    ),
    key="store",
    depends_on=report_step
)

Parallel Processing

Execute multiple steps simultaneously for efficiency:
from erdo import ExecutionMode

# Create parallel data processing workflow
data_processor = Agent(
    description="Processes multiple data sources in parallel",
    name="parallel_processor",
)

# These steps can run in parallel
# None of the three declares depends_on, so no ordering is stated among them.
sales_action = codeexec.execute(code="process_sales_data({{sales_data}})")
process_sales_step = data_processor.step(sales_action, key="process_sales")

marketing_action = codeexec.execute(
    code="process_marketing_data({{marketing_data}})"
)
process_marketing_step = data_processor.step(
    marketing_action, key="process_marketing"
)

customer_action = codeexec.execute(
    code="process_customer_data({{customer_data}})"
)
process_customer_step = data_processor.step(
    customer_action, key="process_customer"
)

# Aggregate results after parallel processing
# The script reads each upstream result through that step's key.
aggregate_code = """
# Combine results from parallel processing
sales_results = {{process_sales.output}}
marketing_results = {{process_marketing.output}}
customer_results = {{process_customer.output}}

combined_insights = aggregate_data_insights(
    sales_results, marketing_results, customer_results
)
generate_executive_summary(combined_insights)
"""

# The aggregate step lists all three parallel steps as dependencies.
parallel_steps = [
    process_sales_step,
    process_marketing_step,
    process_customer_step,
]
aggregate_step = data_processor.step(
    codeexec.execute(code=aggregate_code),
    key="aggregate",
    depends_on=parallel_steps,
)

Conditional Execution

Condition-Based Workflows

Execute steps based on dynamic conditions:
from erdo._generated.condition import (
    And, Or, Not, IsSuccess, IsError,
    TextContains, GreaterThan, LessThan
)

# Smart processing workflow
smart_processor = Agent(
    description="Adapts processing based on data characteristics",
    name="smart_processor",
)

# Initial data analysis
# Structured-output schema requested from the model; the handlers below test
# the "data_size" and "complexity" fields declared here.
data_profile_format = {
    "Type": "json_schema",
    "Schema": {
        "schema": {
            "type": "object",
            "properties": {
                "data_size": {"type": "number"},
                "data_quality": {"type": "string"},
                "complexity": {"type": "string"},
                "processing_recommendation": {"type": "string"},
            },
        }
    },
}
profile_request = llm.message(
    model="claude-sonnet-4",
    query="Analyze data characteristics: {{input_data}}",
    response_format=data_profile_format,
)
analyze_data_step = smart_processor.step(profile_request, key="analyze_data")

# Simple processing for small datasets
# NOTE(review): assuming LessThan/GreaterThan are strict, a data_size of
# exactly 1000 whose complexity does not contain "high" matches neither this
# handler nor the next one -- confirm that gap is intended.
is_small_and_simple = (
    IsSuccess() & LessThan("data_size", 1000) & TextContains("complexity", "low")
)
analyze_data_step.on(
    is_small_and_simple,
    codeexec.execute(code="simple_data_processing({{input_data}})"),
)

# Advanced processing for large or complex datasets
is_large_or_complex = IsSuccess() & (
    GreaterThan("data_size", 1000) | TextContains("complexity", "high")
)
analyze_data_step.on(
    is_large_or_complex,
    codeexec.execute(code="advanced_data_processing({{input_data}})"),
)

Dynamic Branching

Create workflows that branch based on runtime conditions:
# Content processing workflow with dynamic routing
content_processor = Agent(
    description="Routes content based on type and characteristics",
    name="content_router",
)

# Classify content
# Structured-output schema for the classification; only "content_type" is
# consulted by the routing handlers below.
classification_format = {
    "Type": "json_schema",
    "Schema": {
        "schema": {
            "type": "object",
            "properties": {
                "content_type": {"type": "string"},
                "language": {"type": "string"},
                "sentiment": {"type": "string"},
                "priority": {"type": "string"},
            },
        }
    },
}
classify_request = llm.message(
    model="claude-sonnet-4",
    query="Classify this content: {{content}}",
    response_format=classification_format,
)
classify_step = content_processor.step(classify_request, key="classify")

# Route to different processing based on classification
# One handler per content type; each is registered on the classify step.
is_text = TextContains("content_type", "text")
handle_text = llm.message(
    model="claude-sonnet-4",
    query="Process text content: {{content}}",
)
classify_step.on(is_text, handle_text)

is_image = TextContains("content_type", "image")
handle_image = codeexec.execute(code="process_image_content({{content}})")
classify_step.on(is_image, handle_image)

is_document = TextContains("content_type", "document")
handle_document = llm.message(
    model="claude-sonnet-4",
    query="Extract and summarize document: {{content}}",
)
classify_step.on(is_document, handle_document)

Iterative Processing

Loop Over Collections

Process arrays or collections of data:
# Batch email processing
email_processor = Agent(
    description="Processes multiple emails with iteration",
    name="email_batch_processor",
)

# Process each email in the batch
# NOTE(review): nothing in this snippet configures the iteration itself; how
# {{email}} is bound per item is presumably set up elsewhere -- confirm.
email_result_format = {
    "Type": "json_schema",
    "Schema": {
        "schema": {
            "type": "object",
            "properties": {
                "category": {"type": "string"},
                "priority": {"type": "string"},
                "action_required": {"type": "boolean"},
                "summary": {"type": "string"},
            },
        }
    },
}
triage_email = llm.message(
    model="claude-sonnet-4",
    query="Process this email: {{email}}",
    response_format=email_result_format,
)
process_email_step = email_processor.step(triage_email, key="process_email")

# Generate summary after processing all emails
# The template reads `all_results` from the step keyed "process_email".
summarise_batch = llm.message(
    model="claude-sonnet-4",
    query="Generate summary of processed emails: {{process_email.output.all_results}}",
)
summary_step = email_processor.step(
    summarise_batch,
    key="summary",
    depends_on=process_email_step,
)

Conditional Iteration

Iterate with filtering conditions:
# Process only high-priority items
priority_processor = Agent(
    description="Processes only high-priority items from a collection",
    name="priority_processor",
)

# Runs the per-item script with the {{item}} template variable.
# NOTE(review): no filtering condition is expressed in this snippet; the
# high-priority selection is presumably configured elsewhere -- confirm.
item_action = codeexec.execute(code="process_priority_item({{item}})")
process_priority_item = priority_processor.step(item_action, key="process_item")

Error Handling and Recovery

Comprehensive Error Handling

Build robust workflows with multiple error recovery strategies:
from erdo import ResultHandler

# Resilient data processing workflow
resilient_processor = Agent(
    name="resilient_processor",
    description="Data processor with comprehensive error handling"
)

# Main processing step
# Executes the code supplied through the {{generated_code}} template variable.
# The step is registered under the key "main_process". The handler templates
# below address it by that key rather than by the Python variable name
# `main_processing`, consistent with how the other workflows on this page
# reference step output (e.g. key="research" -> {{research.output.content}}).
main_processing = resilient_processor.step(
    codeexec.execute(
        code="{{generated_code}}",
        parameters={"data": "{{input_data}}"}
    ),
    key="main_process"
)

# Success handler - normal flow continuation
main_processing.on(
    IsSuccess(),
    utils.send_status(
        status="completed",
        message="Processing completed successfully",
        data="{{main_process.output}}"
    )
)

# Retry handler for transient errors
# Fires on timeout errors while retry_count is below 3. It only echoes the
# updated retry bookkeeping; NOTE(review): the actual re-execution is
# presumably driven elsewhere -- confirm.
main_processing.on(
    IsError() & LessThan("retry_count", 3) & TextContains("error", "timeout"),
    utils.echo(
        data={
            "retry_count": "{{increment retry_count}}",
            "backoff_time": "{{multiply retry_count 5}}",
            "last_error": "{{main_process.error}}"
        }
    )
)

# Fallback handler for code generation errors
# Asks the model to repair the generated code when the error mentions "syntax".
main_processing.on(
    IsError() & TextContains("error", "syntax"),
    llm.message(
        model="claude-sonnet-4",
        query="Fix the code error: {{main_process.error}}\nOriginal code: {{generated_code}}",
        system_prompt="Fix syntax and logical errors in the code."
    )
)

# Critical error handler
# retry_count > 2 complements the retry handler's retry_count < 3 bound.
main_processing.on(
    IsError() & GreaterThan("retry_count", 2),
    utils.send_status(
        status="critical_failure",
        message="Processing failed after multiple attempts",
        error="{{main_process.error}}"
    )
)

Advanced Workflow Patterns

Pipeline Orchestration

Coordinate multiple agents in complex pipelines:
# Multi-agent data pipeline
pipeline_orchestrator = Agent(
    description="Orchestrates multi-stage data processing pipeline",
    name="data_pipeline",
)

# Each stage invokes a separate bot by name and, from stage 2 onward, feeds
# it a field read from the previous stage's output via that stage's key.

# Stage 1: Data ingestion and validation
ingest_call = bot.invoke(
    bot_name="data ingestion bot",
    parameters={"source": "{{data_source}}"},
)
ingest_data = pipeline_orchestrator.step(ingest_call, key="ingest")

# Stage 2: Data cleaning and preprocessing
clean_call = bot.invoke(
    bot_name="data cleaner bot",
    parameters={"raw_data": "{{ingest.output.processed_data}}"},
)
clean_data = pipeline_orchestrator.step(
    clean_call,
    key="clean",
    depends_on=ingest_data,
)

# Stage 3: Feature engineering
features_call = bot.invoke(
    bot_name="feature engineering bot",
    parameters={"clean_data": "{{clean.output.cleaned_data}}"},
)
engineer_features = pipeline_orchestrator.step(
    features_call,
    key="features",
    depends_on=clean_data,
)

# Stage 4: Model training
training_call = bot.invoke(
    bot_name="model training bot",
    parameters={"features": "{{features.output.feature_data}}"},
)
train_model = pipeline_orchestrator.step(
    training_call,
    key="train",
    depends_on=engineer_features,
)

# Stage 5: Model validation
validation_call = bot.invoke(
    bot_name="model validation bot",
    parameters={"model": "{{train.output.trained_model}}"},
)
validate_model = pipeline_orchestrator.step(
    validation_call,
    key="validate",
    depends_on=train_model,
)

Dynamic Workflow Generation

Create workflows that adapt based on runtime conditions:
# Adaptive workflow generator
adaptive_agent = Agent(
    description="Generates workflow steps based on data characteristics",
    name="adaptive_workflow",
)

# Analyze requirements and generate workflow plan
# Schema for the plan: an object with a "steps" array whose items each carry
# "name", "action" and "condition" strings.
plan_item_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "action": {"type": "string"},
        "condition": {"type": "string"},
    },
}
plan_format = {
    "Type": "json_schema",
    "Schema": {
        "schema": {
            "type": "object",
            "properties": {
                "steps": {
                    "type": "array",
                    "items": plan_item_schema,
                }
            },
        }
    },
}
plan_request = llm.message(
    model="claude-sonnet-4",
    query="Create processing plan for: {{requirements}}",
    response_format=plan_format,
)
plan_workflow = adaptive_agent.step(plan_request, key="plan")

# Execute dynamic steps based on plan
# The script walks the planned steps and runs those whose condition passes.
# NOTE(review): evaluate_condition / execute_step are not defined here; they
# are presumably available in the execution environment -- confirm.
dynamic_runner_code = """
# Execute workflow steps dynamically
plan = {{plan.output.steps}}
for step in plan:
    if evaluate_condition(step['condition']):
        execute_step(step['name'], step['action'])
"""
execute_dynamic_step = adaptive_agent.step(
    codeexec.execute(code=dynamic_runner_code),
    key="execute_dynamic",
    depends_on=plan_workflow,
)

Performance Optimization

Optimized Workflow Patterns

# Implement intelligent caching
# Every other example on this page creates its own Agent; this one called
# `.step` on a bare `agent` name that is never defined, so it gets a
# dedicated agent as well.
caching_agent = Agent(
    name="cached_workflow",
    description="Reuses cached results for expensive processing"
)

# The embedded script looks up a cache entry derived from {{input_params}}
# and only runs expensive_processing on a miss, storing the fresh result.
# NOTE(review): `if cached_result:` treats falsy cached values (0, "", [])
# as misses, and the script uses top-level `return`; both depend on helper
# and runtime semantics not visible here -- confirm.
cached_processor = caching_agent.step(
    codeexec.execute(
        code="""
# Check cache first
cache_key = generate_cache_key({{input_params}})
cached_result = get_from_cache(cache_key)

if cached_result:
    return cached_result
else:
    result = expensive_processing({{input_data}})
    cache_result(cache_key, result)
    return result
"""
    ),
    key="cached_process"
)

Monitoring and Observability

Workflow Monitoring

Add comprehensive monitoring to your workflows:
# Monitored workflow with detailed tracking
monitored_agent = Agent(
    name="monitored_workflow",
    description="Workflow with comprehensive monitoring and logging"
)

# Add monitoring step
# Emits a "started" status carrying template-supplied timing/size details.
monitor_start = monitored_agent.step(
    utils.send_status(
        status="started",
        message="Workflow started",
        details={
            "timestamp": "{{current_timestamp}}",
            "input_size": "{{data_size}}",
            "expected_duration": "{{estimated_time}}"
        }
    ),
    key="monitor_start"
)

# Main processing with progress updates
# The embedded script processes every item and reports progress roughly every
# 10%. The reporting interval is clamped to at least 1: with fewer than 10
# items, `total_items // 10` is 0 and `processed % 0` would raise
# ZeroDivisionError.
main_process = monitored_agent.step(
    codeexec.execute(
        code="""
# Processing with progress tracking
total_items = len({{input_data}})
processed = 0

# Report about every 10%; never less than every single item
progress_interval = max(1, total_items // 10)

for item in {{input_data}}:
    process_item(item)
    processed += 1

    # Update progress every 10%
    if processed % progress_interval == 0:
        progress = (processed / total_items) * 100
        send_progress_update(f"Progress: {progress:.1f}%")

return {"processed_count": processed, "success_rate": calculate_success_rate()}
"""
    ),
    key="main_process",
    depends_on=monitor_start
)

# Monitor completion
# Reads processed_count / success_rate from the step keyed "main_process".
monitor_end = monitored_agent.step(
    utils.send_status(
        status="completed",
        message="Workflow completed successfully",
        details={
            "duration": "{{execution_time}}",
            "processed_items": "{{main_process.output.processed_count}}",
            "success_rate": "{{main_process.output.success_rate}}"
        }
    ),
    key="monitor_end",
    depends_on=main_process
)

Best Practices

  • Keep individual steps focused and atomic; use clear, descriptive step names and keys; plan for failure scenarios from the start; document complex conditional logic; and consider resource usage and optimization.
  • Implement multiple levels of error recovery; use appropriate retry strategies (such as exponential backoff); distinguish between recoverable and fatal errors; add comprehensive logging and monitoring; and test error scenarios thoroughly.
  • Use parallel processing where appropriate; implement caching for expensive operations; optimize batch sizes for your data; monitor resource usage and bottlenecks; and weigh workflow complexity against maintainability.