Background Tasks & Cron Jobs
Erdo can execute background tasks and schedule recurring operations through cron jobs, letting you build robust, automated systems.

Overview
Background processing in Erdo includes:
- Scheduled cron jobs for recurring tasks
- Background step execution modes
- Asynchronous task processing
- Event-driven background workflows
- Cleanup and maintenance operations
- Cron Jobs: Schedule recurring tasks with cron expressions
- Background Steps: Execute agent steps asynchronously
- Event Processing: Handle events in background workers
- Maintenance Tasks: Automated cleanup and system maintenance
Cron Jobs
Basic Cron Job Setup
package integration

import (
	"context"

	"encore.dev/cron"
)

// Refresh available resources for all integration configs every week.
var _ = cron.NewJob("refresh-available-resources", cron.JobConfig{
	Title:    "Refresh available resources",
	Schedule: "0 0 * * 1", // Every Monday at midnight
	Endpoint: RefreshAvailableResourcesForAllIntegrationConfigs,
})

//encore:api private
func RefreshAvailableResourcesForAllIntegrationConfigs(ctx context.Context) error {
	// Implementation for refreshing resources
	return nil
}
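The Schedule field uses standard five-field cron syntax: minute, hour, day of month, month, and day of week. For reference, the expressions used in this guide:

// "0 0 * * 1"    -> Mondays at midnight
// "*/10 * * * *" -> every 10 minutes
// "0 2 * * *"    -> daily at 2 AM
// "0 9 * * 1"    -> Mondays at 9 AM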
Cache Cleanup Job
package cache

import (
	"context"
	"log"

	"encore.dev/cron"
)

// CleanupExpiredCacheEntries removes expired cache fallback entries.
var _ = cron.NewJob("cleanup-expired-cache", cron.JobConfig{
	Title:    "Cleanup Expired Cache Entries",
	Schedule: "*/10 * * * *", // Every 10 minutes
	Endpoint: CleanupExpiredCacheEntries,
})

//encore:api private
func CleanupExpiredCacheEntries(ctx context.Context) error {
	err := CleanupExpiredFallback(ctx)
	if err != nil {
		log.Printf("Failed to cleanup expired cache entries: %v", err)
		return err
	}
	log.Println("Successfully cleaned up expired cache entries")
	return nil
}
Background Step Execution
Background Execution Mode
Agent steps can be configured to run in the background:
from erdo import Step
from erdo.actions import llm

# Regular step (synchronous)
analysis_step = Step(
    key="analyze_data",
    action=llm.generate(
        messages=[{"role": "user", "content": "Analyze this data: {{data}}"}]
    )
)

# Background step (asynchronous)
background_step = Step(
    key="heavy_processing",
    action=llm.generate(
        messages=[{"role": "user", "content": "Process large dataset: {{dataset}}"}]
    ),
    execution_mode={
        "mode": "all_background"
    }
)
Background Step Patterns
Long-running Analysis
from erdo import Step
from erdo.actions import code_execution

# Background step for expensive operations
# (analysis_code holds the script source, defined elsewhere)
data_analysis = Step(
    key="deep_analysis",
    action=code_execution.execute(
        entrypoint="analyze_large_dataset.py",
        code_files=[
            {"filename": "analyze_large_dataset.py", "content": analysis_code}
        ]
    ),
    execution_mode={"mode": "all_background"},
    user_output_visibility="hidden",
    bot_output_visibility="visible"
)
Batch Processing
from erdo import Step
from erdo.actions import llm, utils

# Process multiple items in background
batch_processor = Step(
    key="batch_process",
    action=utils.iterate_over(
        data="{{items}}",
        action=llm.generate(
            messages=[{"role": "user", "content": "Process: {{item}}"}]
        )
    ),
    execution_mode={"mode": "all_background"}
)
External API Calls
from erdo import Step
from erdo.actions import web_research

# Background API integration
api_sync = Step(
    key="sync_external_data",
    action=web_research.fetch_url(
        url="{{api_endpoint}}",
        headers={"Authorization": "Bearer {{api_token}}"}
    ),
    execution_mode={"mode": "all_background"}
)
Event-Driven Background Processing
Pub/Sub Handlers
Background processing triggered by events:
// Handle user creation events in background
var _ = pubsub.NewSubscription(
	events.UserCreatedTopic, "setup-user-resources",
	pubsub.SubscriptionConfig[*events.UserCreatedEvent]{
		Handler: pubsub.MethodHandler((*UserService).setupUserResources),
	},
)

func (s *UserService) setupUserResources(ctx context.Context, event *events.UserCreatedEvent) error {
	// Create default organization
	org, err := s.createDefaultOrganization(ctx, event.UserID)
	if err != nil {
		return fmt.Errorf("failed to create organization: %w", err)
	}

	// Setup sample data in background; context.Background() lets the work
	// outlive the event handler's request context
	go s.setupSampleData(context.Background(), event.UserID, org.ID)

	// Send welcome email
	return s.sendWelcomeEmail(ctx, event.Email)
}
Dataset Processing Pipeline
// Background dataset processing
func (svc *DatasetService) handleDatasetUploaded(ctx context.Context, event *events.DatasetUploadedEvent) error {
	// Start background processing pipeline
	go func() {
		bgCtx := context.Background()

		// Step 1: Validate data format
		if err := svc.validateDataFormat(bgCtx, event.DatasetID); err != nil {
			svc.publishError(bgCtx, event.DatasetID, "validation_failed", err)
			return
		}

		// Step 2: Generate embeddings
		if err := svc.generateEmbeddings(bgCtx, event.DatasetID); err != nil {
			svc.publishError(bgCtx, event.DatasetID, "embedding_failed", err)
			return
		}

		// Step 3: Create analysis
		if err := svc.createAnalysis(bgCtx, event.DatasetID); err != nil {
			svc.publishError(bgCtx, event.DatasetID, "analysis_failed", err)
			return
		}

		// Publish completion event
		svc.publishCompletion(bgCtx, event.DatasetID)
	}()
	return nil
}
Task Queues & Workers
High-Throughput Processing
type TaskProcessor struct {
	taskQueue   chan Task
	workerCount int
	done        chan struct{}
}

func NewTaskProcessor(workerCount int) *TaskProcessor {
	return &TaskProcessor{
		taskQueue:   make(chan Task, 10000), // Buffer for high throughput
		workerCount: workerCount,
		done:        make(chan struct{}),
	}
}

func (tp *TaskProcessor) Start() {
	for i := 0; i < tp.workerCount; i++ {
		go tp.worker(i)
	}
}

// Stop signals all workers to exit.
func (tp *TaskProcessor) Stop() {
	close(tp.done)
}

func (tp *TaskProcessor) worker(id int) {
	log.Printf("Starting worker %d", id)
	for {
		select {
		case task := <-tp.taskQueue:
			if err := tp.processTask(task); err != nil {
				log.Printf("Worker %d failed to process task: %v", id, err)
			}
		case <-tp.done:
			log.Printf("Stopping worker %d", id)
			return
		}
	}
}

func (tp *TaskProcessor) AddTask(task Task) error {
	select {
	case tp.taskQueue <- task:
		return nil
	default:
		return fmt.Errorf("task queue is full")
	}
}
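A minimal usage sketch, assuming Task and processTask are defined elsewhere as in the snippet above:

func runProcessor() {
	processor := NewTaskProcessor(8) // worker count sized to the workload
	processor.Start()
	defer processor.Stop() // signal workers to exit on the way out

	if err := processor.AddTask(Task{ /* ... */ }); err != nil {
		// Queue is full; the caller decides whether to drop, retry, or back off
		log.Printf("Could not enqueue task: %v", err)
	}
}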
Batch Processing
type BatchProcessor struct {
	items     []BatchItem
	maxBatch  int
	timeout   time.Duration
	processor func([]BatchItem) error
	timer     *time.Timer
	mu        sync.Mutex
}

func (bp *BatchProcessor) Add(item BatchItem) {
	bp.mu.Lock()
	defer bp.mu.Unlock()

	bp.items = append(bp.items, item)

	// Start timer on first item
	if len(bp.items) == 1 {
		bp.timer = time.AfterFunc(bp.timeout, bp.flush)
	}

	// Process immediately if batch is full
	if len(bp.items) >= bp.maxBatch {
		bp.flushLocked()
	}
}

func (bp *BatchProcessor) flush() {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	bp.flushLocked()
}

func (bp *BatchProcessor) flushLocked() {
	if len(bp.items) == 0 {
		return
	}

	// Stop timer
	if bp.timer != nil {
		bp.timer.Stop()
	}

	// Process batch
	items := make([]BatchItem, len(bp.items))
	copy(items, bp.items)
	bp.items = bp.items[:0] // Clear slice

	// Process in background
	go func() {
		if err := bp.processor(items); err != nil {
			log.Printf("Batch processing failed: %v", err)
		}
	}()
}
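The snippet assumes the BatchProcessor is constructed with its batch size, timeout, and callback already set; a minimal constructor and usage sketch (NewBatchProcessor is a name assumed here, not shown in the original):

func NewBatchProcessor(maxBatch int, timeout time.Duration, processor func([]BatchItem) error) *BatchProcessor {
	return &BatchProcessor{
		maxBatch:  maxBatch,
		timeout:   timeout,
		processor: processor,
	}
}

func runBatcher() {
	// Flush every 100 items, or 5 seconds after the first item, whichever comes first
	bp := NewBatchProcessor(100, 5*time.Second, func(items []BatchItem) error {
		log.Printf("Processing batch of %d items", len(items))
		return nil
	})
	bp.Add(BatchItem{ /* ... */ })
}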
Error Handling & Monitoring
Retry Logic
func (s *Service) processWithRetry(ctx context.Context, task Task) error {
	return s.retryWithBackoff(ctx, func() error {
		return s.processTask(ctx, task)
	}, 3, time.Second)
}

func (s *Service) retryWithBackoff(ctx context.Context, fn func() error, maxRetries int, baseDelay time.Duration) error {
	var lastErr error
	for attempt := 0; attempt < maxRetries; attempt++ {
		if attempt > 0 {
			delay := baseDelay * time.Duration(1<<uint(attempt-1)) // Exponential backoff: 1s, 2s, 4s, ...
			select {
			case <-time.After(delay):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		if err := fn(); err != nil {
			lastErr = err
			log.Printf("Attempt %d failed: %v", attempt+1, err)
			continue
		}
		return nil // Success
	}
	return fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr)
}
Dead Letter Queues
type TaskHandler struct {
	mainQueue       chan Task
	deadLetterQueue chan Task
	maxRetries      int
}

func (th *TaskHandler) ProcessTask(task Task) {
	if task.Attempts >= th.maxRetries {
		// Send to dead letter queue
		select {
		case th.deadLetterQueue <- task:
			log.Printf("Task sent to dead letter queue after %d attempts", task.Attempts)
		default:
			log.Printf("Dead letter queue is full, dropping task")
		}
		return
	}

	if err := th.executeTask(task); err != nil {
		// Increment attempts and retry
		task.Attempts++
		task.LastError = err.Error()

		// Add delay before retry
		go func() {
			time.Sleep(time.Duration(task.Attempts) * time.Second)
			th.mainQueue <- task
		}()
	}
}
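Dead-lettered tasks are only useful if something consumes them; a minimal drain loop for inspection (drainDeadLetters is an illustrative name, not part of the original):

// drainDeadLetters logs failed tasks so they can be inspected and replayed manually
func (th *TaskHandler) drainDeadLetters(ctx context.Context) {
	for {
		select {
		case task := <-th.deadLetterQueue:
			log.Printf("Dead-lettered task after %d attempts, last error: %s",
				task.Attempts, task.LastError)
			// Alternatively, persist the task to storage for later replay
		case <-ctx.Done():
			return
		}
	}
}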
Best Practices
Scheduling
- Use appropriate cron expressions for timing
- Consider timezone implications for global systems
- Implement job overlap prevention (see the sketch below)
- Monitor job execution times and success rates
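For in-process overlap prevention, one option is to skip a run while the previous one is still going; a minimal sketch using sync.Mutex.TryLock (Go 1.18+), where doRefresh is a placeholder for the actual job body:

var refreshRunning sync.Mutex

//encore:api private
func RefreshWithOverlapGuard(ctx context.Context) error {
	// Skip this run entirely if the previous run hasn't finished
	if !refreshRunning.TryLock() {
		log.Println("Previous run still in progress; skipping")
		return nil
	}
	defer refreshRunning.Unlock()

	return doRefresh(ctx) // placeholder for the actual job body
}

Note that this guards a single process only; multi-instance deployments need a distributed lock.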
Background Processing
- Use appropriate execution modes for different task types
- Implement proper error handling and recovery
- Monitor resource usage of background tasks
- Set reasonable timeouts for long-running operations (see the sketch below)
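For timeouts, context.WithTimeout bounds how long a background operation may run; a minimal sketch reusing the generateEmbeddings call from the pipeline example above (the string datasetID parameter is assumed):

func processWithTimeout(svc *DatasetService, datasetID string) {
	// Cancel the embedding work if it runs longer than 10 minutes
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	if err := svc.generateEmbeddings(ctx, datasetID); err != nil {
		log.Printf("Background work failed or timed out: %v", err)
	}
}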
Performance
- Use worker pools for concurrent processing (see Task Queues & Workers above)
- Implement batch processing for high-throughput scenarios
- Monitor queue depths and processing latency
- Scale worker count based on load
Reliability
- Implement retry logic with exponential backoff
- Use dead letter queues for failed tasks
- Add health checks for background services
- Implement graceful shutdown procedures (see the sketch below)
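For graceful shutdown, stop accepting new work, signal workers to exit, and give in-flight tasks time to drain; a minimal sketch wiring the TaskProcessor above to OS signals (the fixed drain window is a simplification; a sync.WaitGroup over in-flight tasks would be more precise):

func waitForShutdown(processor *TaskProcessor) {
	// Block until SIGINT/SIGTERM arrives
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
	<-stop

	processor.Stop()            // signal workers to exit (see Task Queues & Workers above)
	time.Sleep(2 * time.Second) // crude drain window for in-flight tasks
}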
Common Patterns
Data Synchronization
// Sync external data on schedule
var _ = cron.NewJob("sync-external-data", cron.JobConfig{
	Title:    "Sync External Data Sources",
	Schedule: "0 2 * * *", // Daily at 2 AM
	Endpoint: SyncExternalDataSources,
})

//encore:api private
func SyncExternalDataSources(ctx context.Context) error {
	// Get all active integrations
	// Process each integration in background
	// Update sync status
	return nil
}
Cleanup Operations
// Regular cleanup of temporary data
var _ = cron.NewJob("cleanup-temp-files", cron.JobConfig{
	Title:    "Cleanup Temporary Files",
	Schedule: "0 1 * * *", // Daily at 1 AM
	Endpoint: CleanupTemporaryFiles,
})

//encore:api private
func CleanupTemporaryFiles(ctx context.Context) error {
	// Find files older than 24 hours
	// Delete temporary files
	// Clean up database records
	return nil
}
Report Generation
// Generate weekly reports
var _ = cron.NewJob("generate-weekly-reports", cron.JobConfig{
	Title:    "Generate Weekly Reports",
	Schedule: "0 9 * * 1", // Mondays at 9 AM
	Endpoint: GenerateWeeklyReports,
})

//encore:api private
func GenerateWeeklyReports(ctx context.Context) error {
	// Aggregate data from previous week
	// Generate reports for each organization
	// Send email notifications
	return nil
}