Background Tasks & Cron Jobs

Erdo provides background task execution and cron-based scheduling for recurring operations, enabling you to build robust, automated systems.

Overview

Background processing in Erdo includes:
  • Scheduled cron jobs for recurring tasks
  • Background step execution modes
  • Asynchronous task processing
  • Event-driven background workflows
  • Cleanup and maintenance operations

Cron Jobs

Basic Cron Job Setup

package integration

import (
    "context"

    "encore.dev/cron"
)

// Refresh available resources for all integration configs every week
var _ = cron.NewJob("refresh-available-resources", cron.JobConfig{
    Title:    "Refresh available resources",
    Schedule: "0 0 * * 1", // Every Monday at midnight
    Endpoint: RefreshAvailableResourcesForAllIntegrationConfigs,
})

//encore:api private
func RefreshAvailableResourcesForAllIntegrationConfigs(ctx context.Context) error {
    // Implementation for refreshing resources
    return nil
}
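
Cron schedules use the standard five-field format: minute, hour, day of month, month, day of week. The schedule above breaks down as:

// "0 0 * * 1"
//  minute 0, hour 0, any day of month, any month, Monday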

Cache Cleanup Job

package cache

import (
    "context"
    "log"
    "encore.dev/cron"
)

// CleanupExpiredCacheEntries removes expired cache fallback entries
var _ = cron.NewJob("cleanup-expired-cache", cron.JobConfig{
    Title:    "Cleanup Expired Cache Entries",
    Schedule: "*/10 * * * *", // Every 10 minutes
    Endpoint: CleanupExpiredCacheEntries,
})

//encore:api private
func CleanupExpiredCacheEntries(ctx context.Context) error {
    err := CleanupExpiredFallback(ctx)
    if err != nil {
        log.Printf("Failed to cleanup expired cache entries: %v", err)
        return err
    }
    log.Println("Successfully cleaned up expired cache entries")
    return nil
}
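
For simple intervals, Encore's cron package also supports an Every field in place of a cron expression; the ten-minute schedule above could equivalently be written as:

var _ = cron.NewJob("cleanup-expired-cache", cron.JobConfig{
    Title:    "Cleanup Expired Cache Entries",
    Every:    10 * cron.Minute, // Equivalent to "*/10 * * * *"
    Endpoint: CleanupExpiredCacheEntries,
})

Note that Encore requires Every intervals to divide 24 hours evenly, which 10 minutes does.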

Background Step Execution

Background Execution Mode

Agent steps can be configured to run in the background:

from erdo import Step
from erdo.actions import llm

# Regular step (synchronous)
analysis_step = Step(
    key="analyze_data",
    action=llm.generate(
        messages=[{"role": "user", "content": "Analyze this data: {{data}}"}]
    )
)

# Background step (asynchronous)
background_step = Step(
    key="heavy_processing",
    action=llm.generate(
        messages=[{"role": "user", "content": "Process large dataset: {{dataset}}"}]
    ),
    execution_mode={
        "mode": "all_background"
    }
)

Background Step Patterns

from erdo import Step
from erdo.actions import code_execution

# Background step for expensive operations
data_analysis = Step(
    key="deep_analysis",
    action=code_execution.execute(
        entrypoint="analyze_large_dataset.py",
        code_files=[
            {"filename": "analyze_large_dataset.py", "content": analysis_code}
        ]
    ),
    execution_mode={"mode": "all_background"},
    user_output_visibility="hidden",
    bot_output_visibility="visible"
)

Event-Driven Background Processing

Pub/Sub Handlers

Background processing triggered by events:

// Handle user creation events in background
var _ = pubsub.NewSubscription(
    events.UserCreatedTopic, "setup-user-resources",
    pubsub.SubscriptionConfig[*events.UserCreatedEvent]{
        Handler: pubsub.MethodHandler((*UserService).setupUserResources),
    },
)

func (s *UserService) setupUserResources(ctx context.Context, event *events.UserCreatedEvent) error {
    // Create default organization
    org, err := s.createDefaultOrganization(ctx, event.UserID)
    if err != nil {
        return fmt.Errorf("failed to create organization: %w", err)
    }

    // Setup sample data in the background; context.Background() keeps the
    // work alive after this handler's context is cancelled
    go s.setupSampleData(context.Background(), event.UserID, org.ID)

    // Send welcome email
    return s.sendWelcomeEmail(ctx, event.Email)
}

Dataset Processing Pipeline

// Background dataset processing
func (svc *DatasetService) handleDatasetUploaded(ctx context.Context, event *events.DatasetUploadedEvent) error {
    // Start background processing pipeline
    go func() {
        // Detach from the request context so the pipeline outlives this handler
        bgCtx := context.Background()

        // Step 1: Validate data format
        if err := svc.validateDataFormat(bgCtx, event.DatasetID); err != nil {
            svc.publishError(bgCtx, event.DatasetID, "validation_failed", err)
            return
        }

        // Step 2: Generate embeddings
        if err := svc.generateEmbeddings(bgCtx, event.DatasetID); err != nil {
            svc.publishError(bgCtx, event.DatasetID, "embedding_failed", err)
            return
        }

        // Step 3: Create analysis
        if err := svc.createAnalysis(bgCtx, event.DatasetID); err != nil {
            svc.publishError(bgCtx, event.DatasetID, "analysis_failed", err)
            return
        }

        // Publish completion event
        svc.publishCompletion(bgCtx, event.DatasetID)
    }()

    return nil
}
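
The publishError and publishCompletion helpers are assumed above. A minimal sketch of publishError, assuming a hypothetical DatasetProcessingFailedTopic and event type in your events package:

func (svc *DatasetService) publishError(ctx context.Context, datasetID, stage string, cause error) {
    // Hypothetical topic and event type; adjust to your actual events package
    _, err := events.DatasetProcessingFailedTopic.Publish(ctx, &events.DatasetProcessingFailedEvent{
        DatasetID: datasetID,
        Stage:     stage,
        Error:     cause.Error(),
    })
    if err != nil {
        log.Printf("failed to publish error event for dataset %s: %v", datasetID, err)
    }
}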

Task Queues & Workers

High-Throughput Processing

type TaskProcessor struct {
    taskQueue   chan Task
    workerCount int
    done        chan struct{}
}

func NewTaskProcessor(workerCount int) *TaskProcessor {
    return &TaskProcessor{
        taskQueue:   make(chan Task, 10000), // Buffer for high throughput
        workerCount: workerCount,
        done:        make(chan struct{}),
    }
}

func (tp *TaskProcessor) Start() {
    for i := 0; i < tp.workerCount; i++ {
        go tp.worker(i)
    }
}

func (tp *TaskProcessor) worker(id int) {
    log.Printf("Starting worker %d", id)
    for {
        select {
        case task := <-tp.taskQueue:
            if err := tp.processTask(task); err != nil {
                log.Printf("Worker %d failed to process task: %v", id, err)
            }
        case <-tp.done:
            log.Printf("Stopping worker %d", id)
            return
        }
    }
}

func (tp *TaskProcessor) AddTask(task Task) error {
    select {
    case tp.taskQueue <- task:
        return nil
    default:
        return fmt.Errorf("task queue is full")
    }
}
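
The done channel above is created but never closed; a minimal Stop method, plus typical usage:

func (tp *TaskProcessor) Stop() {
    close(tp.done) // Every worker sees the closed channel and returns
}

// Usage
processor := NewTaskProcessor(8)
processor.Start()
defer processor.Stop()

var task Task // Populate with your task fields
if err := processor.AddTask(task); err != nil {
    log.Printf("could not enqueue task: %v", err)
}

Note that closing done discards tasks still buffered in the queue; drain taskQueue first if every task must be processed.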

Batch Processing

type BatchProcessor struct {
    items       []BatchItem
    maxBatch    int
    timeout     time.Duration
    processor   func([]BatchItem) error
    timer       *time.Timer
    mu          sync.Mutex
}

func (bp *BatchProcessor) Add(item BatchItem) {
    bp.mu.Lock()
    defer bp.mu.Unlock()

    bp.items = append(bp.items, item)

    // Start timer on first item
    if len(bp.items) == 1 {
        bp.timer = time.AfterFunc(bp.timeout, bp.flush)
    }

    // Process immediately if batch is full
    if len(bp.items) >= bp.maxBatch {
        bp.flushLocked()
    }
}

func (bp *BatchProcessor) flush() {
    bp.mu.Lock()
    defer bp.mu.Unlock()
    bp.flushLocked()
}

func (bp *BatchProcessor) flushLocked() {
    if len(bp.items) == 0 {
        return
    }

    // Stop timer
    if bp.timer != nil {
        bp.timer.Stop()
    }

    // Process batch
    items := make([]BatchItem, len(bp.items))
    copy(items, bp.items)
    bp.items = bp.items[:0] // Clear slice

    // Process in background
    go func() {
        if err := bp.processor(items); err != nil {
            log.Printf("Batch processing failed: %v", err)
        }
    }()
}
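
No constructor is shown above; a minimal one, plus usage that flushes after 100 items or 5 seconds, whichever comes first:

func NewBatchProcessor(maxBatch int, timeout time.Duration, processor func([]BatchItem) error) *BatchProcessor {
    return &BatchProcessor{
        maxBatch:  maxBatch,
        timeout:   timeout,
        processor: processor,
    }
}

// Usage
bp := NewBatchProcessor(100, 5*time.Second, func(items []BatchItem) error {
    log.Printf("processing batch of %d items", len(items))
    return nil
})
bp.Add(BatchItem{})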

Error Handling & Monitoring

Retry Logic

func (s *Service) processWithRetry(ctx context.Context, task Task) error {
    return s.retryWithBackoff(ctx, func() error {
        return s.processTask(ctx, task)
    }, 3, time.Second)
}

func (s *Service) retryWithBackoff(ctx context.Context, fn func() error, maxRetries int, baseDelay time.Duration) error {
    var lastErr error

    for attempt := 0; attempt < maxRetries; attempt++ {
        if attempt > 0 {
            delay := baseDelay * time.Duration(1<<uint(attempt-1)) // Exponential backoff
            select {
            case <-time.After(delay):
            case <-ctx.Done():
                return ctx.Err()
            }
        }

        if err := fn(); err != nil {
            lastErr = err
            log.Printf("Attempt %d failed: %v", attempt+1, err)
            continue
        }

        return nil // Success
    }

    return fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr)
}
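
When many tasks fail at the same time, pure exponential backoff retries them in synchronized waves. Adding random jitter spreads the retries out; a minimal variant of the delay calculation (the helper name is ours, math/rand assumed imported):

func backoffWithJitter(baseDelay time.Duration, attempt int) time.Duration {
    delay := baseDelay * time.Duration(1<<uint(attempt-1)) // Same exponential base as above
    jitter := time.Duration(rand.Int63n(int64(delay)/2 + 1)) // Add up to 50% extra delay
    return delay + jitter
}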

Dead Letter Queues

type TaskHandler struct {
    mainQueue       chan Task
    deadLetterQueue chan Task
    maxRetries      int
}

func (th *TaskHandler) ProcessTask(task Task) {
    if task.Attempts >= th.maxRetries {
        // Send to dead letter queue
        select {
        case th.deadLetterQueue <- task:
            log.Printf("Task sent to dead letter queue after %d attempts", task.Attempts)
        default:
            log.Printf("Dead letter queue is full, dropping task")
        }
        return
    }

    if err := th.executeTask(task); err != nil {
        // Increment attempts and retry
        task.Attempts++
        task.LastError = err.Error()

        // Add delay before retry
        go func() {
            time.Sleep(time.Duration(task.Attempts) * time.Second)
            th.mainQueue <- task
        }()
    }
}
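
Tasks in the dead letter queue still need an owner. A minimal drain loop that logs dead-lettered tasks for inspection (persisting them to a store and alerting an operator is the more common production choice):

func (th *TaskHandler) DrainDeadLetters(ctx context.Context) {
    for {
        select {
        case task := <-th.deadLetterQueue:
            log.Printf("dead-lettered task after %d attempts, last error: %s", task.Attempts, task.LastError)
        case <-ctx.Done():
            return
        }
    }
}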

Best Practices

  • Use appropriate cron expressions for timing
  • Consider timezone implications for global systems
  • Implement job overlap prevention (a minimal sketch follows this list)
  • Monitor job execution times and success rates
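
For overlap prevention, a non-blocking mutex guard is a simple way to keep runs of the same job from overlapping within a single instance; a minimal sketch using sync.Mutex.TryLock (Go 1.18+, doJobWork is a placeholder for your job body):

var jobMu sync.Mutex

func GuardedJob(ctx context.Context) error {
    if !jobMu.TryLock() {
        log.Println("previous run still in progress, skipping this invocation")
        return nil
    }
    defer jobMu.Unlock()
    return doJobWork(ctx)
}

For multi-instance deployments, use a distributed lock (for example, a database advisory lock) instead.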

Common Patterns

// Sync external data on schedule
var _ = cron.NewJob("sync-external-data", cron.JobConfig{
    Title:    "Sync External Data Sources",
    Schedule: "0 2 * * *", // Daily at 2 AM
    Endpoint: SyncExternalDataSources,
})

func SyncExternalDataSources(ctx context.Context) error {
    // Get all active integrations
    // Process each integration in background
    // Update sync status
    return nil
}

// Regular cleanup of temporary data
var _ = cron.NewJob("cleanup-temp-files", cron.JobConfig{
    Title:    "Cleanup Temporary Files",
    Schedule: "0 1 * * *", // Daily at 1 AM
    Endpoint: CleanupTemporaryFiles,
})

func CleanupTemporaryFiles(ctx context.Context) error {
    // Find files older than 24 hours
    // Delete temporary files
    // Clean up database records
    return nil
}
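
A minimal sketch of the age check, assuming temporary files live in a local directory (path and retention are placeholders):

func cleanupOldFiles(dir string, maxAge time.Duration) error {
    entries, err := os.ReadDir(dir)
    if err != nil {
        return err
    }
    cutoff := time.Now().Add(-maxAge)
    for _, entry := range entries {
        info, err := entry.Info()
        if err != nil || info.IsDir() {
            continue
        }
        // Delete files whose last modification is older than the cutoff
        if info.ModTime().Before(cutoff) {
            if err := os.Remove(filepath.Join(dir, entry.Name())); err != nil {
                log.Printf("failed to remove %s: %v", entry.Name(), err)
            }
        }
    }
    return nil
}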

// Generate weekly reports
var _ = cron.NewJob("generate-weekly-reports", cron.JobConfig{
    Title:    "Generate Weekly Reports",
    Schedule: "0 9 * * 1", // Mondays at 9 AM
    Endpoint: GenerateWeeklyReports,
})

func GenerateWeeklyReports(ctx context.Context) error {
    // Aggregate data from previous week
    // Generate reports for each organization
    // Send email notifications
    return nil
}

Background tasks and cron jobs in Erdo provide a robust foundation for building automated, scalable systems that can handle complex workflows and maintenance operations reliably.