Skip to main content

Creating Custom Integrations

Learn how to build custom integrations for data sources not yet supported by Erdo. This guide covers authentication, resource discovery, and data access patterns.

Integration Types

Erdo supports several integration patterns:
  • API Integration: For REST/GraphQL APIs
  • Database Integration: For SQL/NoSQL databases
  • File Integration: For file-based data sources
  • Streaming Integration: For real-time data feeds

File Structure

Organize your integration code following Erdo conventions:
backend/integration/providers/
├── your_service/
│   ├── provider.go          # Main provider implementation
│   ├── auth.go             # Authentication logic
│   ├── discovery.go        # Resource discovery
│   └── types.go            # Service-specific types

Basic API Integration

Provider Implementation

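Create the main provider structure. The snippet omits the import lines for the packages it refers to as integrationTypes, db, datasetDB, and sqldb; add them for your project: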
package your_service

import (
    "context"
    "fmt"
    "net/http"
    "time"

    "encore.app/backend/integration/provider"
    providerTypes "encore.app/backend/integration/provider/types"
)

// YourServiceProvider is the provider for the example "your service" API
// integration. It embeds the shared BaseProvider and carries its own HTTP
// client, which the provider methods in this guide use for outbound requests.
type YourServiceProvider struct {
    *providerTypes.BaseProvider
    client *http.Client // HTTP client used for API calls; set by NewYourServiceProvider
}

// NewYourServiceProvider returns a YourServiceProvider whose embedded
// BaseProvider is populated from the given config, integration, dataset, and
// service database, and whose HTTP client times out after 30 seconds.
func NewYourServiceProvider(config *integrationTypes.IntegrationConfig, integration *db.Integration, dataset *datasetDB.IntegrationDataset, serviceDB *sqldb.Database) *YourServiceProvider {
    const requestTimeout = 30 * time.Second

    p := &YourServiceProvider{
        client: &http.Client{Timeout: requestTimeout},
    }
    p.BaseProvider = &providerTypes.BaseProvider{
        Config:      config,
        Integration: integration,
        Dataset:     dataset,
        DB:          serviceDB,
    }
    return p
}

// VerifyConnection checks that the service is reachable and accepts the
// integration's credentials by sending an authenticated GET to the health
// endpoint.
//
// It returns nil only when the request completes with a 2xx status. A failure
// to build or authenticate the request, a transport error, or any non-2xx
// status (for example 401 for rejected credentials) is returned as an error.
// Underlying errors are wrapped with %w, so errors.Is / errors.As still match.
func (p *YourServiceProvider) VerifyConnection(ctx context.Context) error {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://api.yourservice.com/health", nil)
    if err != nil {
        return fmt.Errorf("building health check request: %w", err)
    }

    if err := p.GetAuth().PrepareRequest(ctx, req); err != nil {
        return fmt.Errorf("authenticating health check request: %w", err)
    }

    resp, err := p.client.Do(req)
    if err != nil {
        return fmt.Errorf("calling health endpoint: %w", err)
    }
    defer resp.Body.Close()

    // http.Client.Do reports only transport-level failures through err; an
    // HTTP error status still arrives as a normal response and must be
    // rejected explicitly.
    if resp.StatusCode < 200 || resp.StatusCode >= 300 {
        return fmt.Errorf("health check returned unexpected status %d", resp.StatusCode)
    }

    return nil
}

Authentication

Implement authentication for your service:
// YourServiceAuth provides request authentication for the service. It embeds
// auth.BaseAuth; PrepareRequest calls GetCredentials on the receiver, which is
// presumably promoted from that embedded type (confirm in the auth package).
type YourServiceAuth struct {
    auth.BaseAuth
}

// PrepareRequest authenticates req using the credentials returned by
// GetCredentials. When an "access_token" entry is present and non-empty it is
// sent as a Bearer token in the Authorization header; otherwise the request is
// left unchanged. An error from GetCredentials is returned as-is.
func (a *YourServiceAuth) PrepareRequest(ctx context.Context, req *http.Request) error {
    creds, _, err := a.GetCredentials(ctx)
    if err != nil {
        return err
    }

    // No token stored: nothing to attach.
    accessToken := creds["access_token"]
    if accessToken == "" {
        return nil
    }

    // Apply authentication (example: Bearer token)
    req.Header.Set("Authorization", "Bearer "+accessToken)
    return nil
}

Resource Discovery

Implement resource discovery:
// GetDatasetResources discovers the resources the service exposes by sending
// an authenticated GET to the /resources endpoint.
//
// It returns an error if the request cannot be built or authenticated, if the
// HTTP call fails, or if the service answers with a non-2xx status. Underlying
// errors are wrapped with %w. Response parsing is left as a placeholder in
// this example, so the returned slice is nil until that logic is filled in.
func (p *YourServiceProvider) GetDatasetResources(ctx context.Context) ([]*resourceTypes.ResourceWithRelatedResources, error) {
    // Fetch available resources from the API
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://api.yourservice.com/resources", nil)
    if err != nil {
        return nil, fmt.Errorf("building resources request: %w", err)
    }

    if err := p.GetAuth().PrepareRequest(ctx, req); err != nil {
        return nil, fmt.Errorf("authenticating resources request: %w", err)
    }

    resp, err := p.client.Do(req)
    if err != nil {
        return nil, fmt.Errorf("fetching resources: %w", err)
    }
    defer resp.Body.Close()

    // A non-2xx body is an error payload, not a resource list; reject it
    // before it reaches the parsing step.
    if resp.StatusCode < 200 || resp.StatusCode >= 300 {
        return nil, fmt.Errorf("fetching resources: unexpected status %d", resp.StatusCode)
    }

    // Parse response and create resource objects
    var resources []*resourceTypes.ResourceWithRelatedResources
    // ... parsing logic ...

    return resources, nil
}

Database Integration

SQL Database Provider

For SQL databases, implement the database provider pattern:
// SQLProvider is a provider backed by a SQL database. It embeds the shared
// BaseProvider and holds the database handle its methods ping and query.
type SQLProvider struct {
    *providerTypes.BaseProvider
    db *sql.DB // handle used by VerifyConnection and GetDatasetResources
}

// VerifyConnection pings the database using ctx and returns the ping error,
// or nil when the ping succeeds.
func (p *SQLProvider) VerifyConnection(ctx context.Context) error {
    if pingErr := p.db.PingContext(ctx); pingErr != nil {
        return pingErr
    }
    return nil
}

// GetDatasetResources discovers tables by querying information_schema.tables
// for rows whose table_type is 'BASE TABLE'. Each row becomes a resource of
// type "table" named after the table, with the table's schema stored under
// the "schema" config key.
//
// It returns an error, and no resources, if the query fails, if a row cannot
// be scanned, or if row iteration ends early. This prevents a truncated table
// list from being reported as a successful discovery. Underlying errors are
// wrapped with %w.
func (p *SQLProvider) GetDatasetResources(ctx context.Context) ([]*resourceTypes.ResourceWithRelatedResources, error) {
    // Query information schema to discover tables
    const query = `
        SELECT table_name, table_schema
        FROM information_schema.tables
        WHERE table_type = 'BASE TABLE'
    `

    rows, err := p.db.QueryContext(ctx, query)
    if err != nil {
        return nil, fmt.Errorf("querying information_schema.tables: %w", err)
    }
    defer rows.Close()

    var resources []*resourceTypes.ResourceWithRelatedResources
    for rows.Next() {
        var tableName, schema string
        if err := rows.Scan(&tableName, &schema); err != nil {
            return nil, fmt.Errorf("scanning table row: %w", err)
        }

        resources = append(resources, &resourceTypes.ResourceWithRelatedResources{
            Resource: resourceTypes.Resource{
                Name: tableName,
                Type: "table",
                Config: map[string]interface{}{
                    "schema": schema,
                },
            },
        })
    }

    // rows.Next returns false both at the end of the result set and on
    // failure; rows.Err distinguishes the two.
    if err := rows.Err(); err != nil {
        return nil, fmt.Errorf("iterating table rows: %w", err)
    }

    return resources, nil
}

File Integration

File System Provider

For file-based integrations:
// FileProvider is a provider for file-based data sources. It embeds the
// shared BaseProvider and records the directory that resource discovery walks.
type FileProvider struct {
    *providerTypes.BaseProvider
    basePath string // root path passed to filepath.Walk in GetDatasetResources
}

// GetDatasetResources walks the tree rooted at p.basePath and returns one
// resource of type "file" for every non-directory entry whose extension is
// accepted by isSupportedFile. Each resource is named after the entry's base
// name and records its full path and size under the "path" and "size" config
// keys.
//
// Directories are traversed but never reported, even when their name ends in
// a supported extension. The walk stops as soon as ctx is cancelled. On any
// walk error (including cancellation) it returns the wrapped error and no
// resources.
func (p *FileProvider) GetDatasetResources(ctx context.Context) ([]*resourceTypes.ResourceWithRelatedResources, error) {
    var resources []*resourceTypes.ResourceWithRelatedResources

    err := filepath.Walk(p.basePath, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }

        // Honor cancellation and deadlines during a potentially long walk.
        if err := ctx.Err(); err != nil {
            return err
        }

        // Only include supported file types; a directory such as
        // "exports.csv/" is not a file resource.
        if info.IsDir() || !p.isSupportedFile(path) {
            return nil
        }

        resources = append(resources, &resourceTypes.ResourceWithRelatedResources{
            Resource: resourceTypes.Resource{
                Name: info.Name(),
                Type: "file",
                Config: map[string]interface{}{
                    "path": path,
                    "size": info.Size(),
                },
            },
        })
        return nil
    })
    if err != nil {
        return nil, fmt.Errorf("walking %q: %w", p.basePath, err)
    }

    return resources, nil
}

// isSupportedFile reports whether path ends in one of the supported
// extensions (.csv, .json, .xlsx, .txt). The comparison ignores case.
func (p *FileProvider) isSupportedFile(path string) bool {
    switch strings.ToLower(filepath.Ext(path)) {
    case ".csv", ".json", ".xlsx", ".txt":
        return true
    default:
        return false
    }
}

Configuration

Integration Config

Define your integration configuration:
# config/integrations/your_service.yaml
# NOTE(review): the schema of this file is not shown in this guide. The notes
# below are inferred from the key names and should be confirmed against the
# code that loads this configuration.
name: "Your Service"
# Presumably selects one of the integration patterns listed at the top of the
# guide (API integration here) - confirm the accepted values.
type: "api"
auth_type: "oauth2"
endpoints:
  # Same host the Go examples in this guide send requests to.
  base_url: "https://api.yourservice.com"
  auth_url: "https://auth.yourservice.com/oauth/authorize"
  token_url: "https://auth.yourservice.com/oauth/token"
scopes:
  - "read:data"
  - "read:metadata"
resource_discovery:
  enabled: true
  # NOTE(review): the Go discovery example requests "/resources", not the
  # paths listed here - confirm which endpoints the provider actually uses.
  endpoints:
    - path: "/tables"
      type: "table"
    - path: "/views"
      type: "view"

Provider Registration

Register your provider in the integration system:
// In your provider package
func init() {
    provider.RegisterProvider("your_service", func(config *integrationTypes.IntegrationConfig, integration *db.Integration, dataset *datasetDB.IntegrationDataset, serviceDB *sqldb.Database) provider.Provider {
        return NewYourServiceProvider(config, integration, dataset, serviceDB)
    })
}

Testing Your Integration

Unit Tests

Create comprehensive tests for your integration:
// TestYourServiceProvider_VerifyConnection checks that VerifyConnection
// reports no error when the HTTP layer answers with a canned 200 response.
func TestYourServiceProvider_VerifyConnection(t *testing.T) {
    // Canned response served by the mock transport.
    okResponse := &http.Response{
        StatusCode: 200,
        Body:       ioutil.NopCloser(strings.NewReader(`{"status": "ok"}`)),
    }

    // Provider under test, with its HTTP client swapped for the mock.
    p := NewYourServiceProvider(testConfig, testIntegration, testDataset, testDB)
    p.client = &http.Client{
        Transport: &mockTransport{response: okResponse},
    }

    // Test connection verification
    assert.NoError(t, p.VerifyConnection(context.Background()))
}

Integration Tests

Test against real APIs (with test credentials):
func TestYourServiceProvider_Integration(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping integration test")
    }

    // Use test credentials
    config := &integrationTypes.IntegrationConfig{
        Credentials: map[string]string{
            "client_id":     os.Getenv("TEST_CLIENT_ID"),
            "client_secret": os.Getenv("TEST_CLIENT_SECRET"),
        },
    }

    provider := NewYourServiceProvider(config, nil, nil, nil)

    // Test real API calls
    err := provider.VerifyConnection(context.Background())
    assert.NoError(t, err)
}

Best Practices

Error Handling

Implement robust error handling:
// handleAPIError converts an HTTP error response into a Go error.
//
// It returns nil when resp.StatusCode is below 400. Otherwise it reads the
// response body and returns an error of the form "API error <status>: <body>".
// If the body cannot be read, the returned error reports the status code and
// wraps the read failure instead of presenting a partial body as the message.
// The caller remains responsible for closing resp.Body.
func (p *YourServiceProvider) handleAPIError(resp *http.Response) error {
    if resp.StatusCode < 400 {
        return nil
    }

    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return fmt.Errorf("API error %d: reading response body: %w", resp.StatusCode, err)
    }

    return fmt.Errorf("API error %d: %s", resp.StatusCode, body)
}

Rate Limiting

Respect API rate limits:
// RateLimiter wraps a *rate.Limiter so that callers depend on this type
// rather than on the rate package directly.
type RateLimiter struct {
    limiter *rate.Limiter
}

// Wait blocks until the wrapped limiter permits another event or ctx is done,
// and returns the limiter's error in the latter case. It gives RateLimiter the
// Wait(ctx) method that makeAPICall invokes on the provider's rate limiter.
func (r *RateLimiter) Wait(ctx context.Context) error {
    return r.limiter.Wait(ctx)
}

// makeAPICall sends req through the provider's HTTP client after waiting on
// the provider's rate limiter. If the wait fails, the request is not sent and
// the wait error is returned with a nil response.
func (p *YourServiceProvider) makeAPICall(ctx context.Context, req *http.Request) (*http.Response, error) {
    // Wait for rate limit
    waitErr := p.rateLimiter.Wait(ctx)
    if waitErr != nil {
        return nil, waitErr
    }

    resp, err := p.client.Do(req)
    return resp, err
}

Caching

Implement response caching where appropriate:
// getResourcesWithCache returns the resource list stored in the provider's
// cache under the "resources" key when one is present. Otherwise it fetches
// the list with fetchResources and caches the result for five minutes.
//
// A cached value that is not a resource slice is treated as a cache miss
// instead of panicking on the type assertion; the fresh result then
// overwrites it. Fetch errors are returned unchanged and nothing is cached.
func (p *YourServiceProvider) getResourcesWithCache(ctx context.Context) ([]*resourceTypes.ResourceWithRelatedResources, error) {
    // Check cache first
    if cached := p.cache.Get("resources"); cached != nil {
        if resources, ok := cached.([]*resourceTypes.ResourceWithRelatedResources); ok {
            return resources, nil
        }
    }

    // Fetch from API
    resources, err := p.fetchResources(ctx)
    if err != nil {
        return nil, err
    }

    // Cache results
    p.cache.Set("resources", resources, 5*time.Minute)

    return resources, nil
}

Deployment

Once your integration is complete:
  1. Test thoroughly with real data sources
  2. Document configuration requirements
  3. Submit for code review
  4. Deploy to staging environment and verify the integration there
  5. Promote to production and validate in production
Your integration will then be available for use in Erdo agents and workflows.