What I learned

I have been learning Go for a while and am always amazed by how simple it is to implement concurrency. Today I relearned one of the concurrency patterns: the worker pool.

With this pattern, we can easily create a pool of workers that listen for incoming tasks and execute them concurrently. It's very useful when we have heavy tasks that we want to execute asynchronously, like sending emails, processing documents, and so on.

In this example I added exponential backoff to illustrate how we can retry failed tasks until we give up; with the config used below (a 500 ms base delay and a factor of 2), a task that keeps failing waits 500 ms, then 1 s, before the pool gives up on it. Another important note is that the job queue is a buffered channel, so submitting a task does not block as long as the buffer is not full. In general this helps responsiveness and throughput.

    package main

    import (
        "fmt"
        "log"
        "math"
        "math/rand"
        "sync"
        "time"
    )

    // Task is an interface that represents a task that needs to be executed
    type Task interface {
        Execute() error
        GetID() string
    }

    // EmailTask represents a task to send an email
    type EmailTask struct {
        ID      string
        To      string
        Subject string
        Body    string
    }

    // Execute executes the email task
    func (t *EmailTask) Execute() error {
        // Simulate a transient failure roughly 30% of the time.
        if rand.Float64() < 0.3 {
            return fmt.Errorf("task %s failed to execute", t.ID)
        }

        log.Println("TASK ID:", t.ID, "DONE", "Email sent to", t.To, "with subject", t.Subject, "and body", t.Body)
        return nil
    }

    // GetID returns the task ID
    func (t *EmailTask) GetID() string {
        return t.ID
    }

    // RetryConfig is a struct config for exponential backoff
    type RetryConfig struct {
        MaxRetries int
        BaseDelay  time.Duration
        Factor     float64
    }

    func doTaskWithExponentialBackoff(task Task, config RetryConfig) error {
        for i := 0; i < config.MaxRetries; i++ {
            err := task.Execute()
            if err == nil {
                return nil
            }

            // Only sleep when another attempt is still coming.
            if i < config.MaxRetries-1 {
                backoffDuration := time.Duration(
                    float64(config.BaseDelay) * math.Pow(config.Factor, float64(i)),
                )

                log.Printf("[TASK: %s] Attempt %d failed. Retrying in %s...", task.GetID(), i+1, backoffDuration)
                time.Sleep(backoffDuration)
            }
        }

        return fmt.Errorf("[TASK: %s] failed after maximum retries", task.GetID())
    }

    // WorkerPool is a pool of workers that consume tasks from a buffered job channel
    type WorkerPool struct {
        jobs        chan Task
        wg          sync.WaitGroup
        workers     int
        retryConfig RetryConfig
        bufferSize  int
    }

    // NewWorkerPool creates a new worker pool with a buffered channel
    func NewWorkerPool(numWorkers int, bufferSize int, retryConfig RetryConfig) *WorkerPool {
        return &WorkerPool{
            jobs:        make(chan Task, bufferSize),
            workers:     numWorkers,
            retryConfig: retryConfig,
            wg:          sync.WaitGroup{},
            bufferSize:  bufferSize,
        }
    }

    // Start starts the worker pool
    func (wp *WorkerPool) Start() {
        for i := 0; i < wp.workers; i++ {
            wp.wg.Add(1)

            go func(workerID int) {
                defer wp.wg.Done()
                for task := range wp.jobs {
                    err := doTaskWithExponentialBackoff(task, wp.retryConfig)
                    if err != nil {
                        log.Printf("Worker %d failed to execute task: %v", workerID, err)
                    }
                }
            }(i)
        }
    }

    // Submit submits a task to the worker pool
    func (wp *WorkerPool) Submit(task Task) {
        wp.jobs <- task
    }

    // Stop stops the worker pool
    func (wp *WorkerPool) Stop() {
        close(wp.jobs)
        wp.wg.Wait()
    }

    func main() {
        cfg := RetryConfig{
            MaxRetries: 3,
            BaseDelay:  500 * time.Millisecond,
            Factor:     2.0,
        }

        // create a worker pool consisting of 3 workers and a buffer size of 10
        pool := NewWorkerPool(3, 10, cfg)
        pool.Start()

        // Enqueue some email tasks
        for i := 1; i <= 20; i++ {
            pool.Submit(&EmailTask{
                ID:      fmt.Sprintf("%d", i),
                To:      fmt.Sprintf("user_%d@example.com", i),
                Subject: fmt.Sprintf("Hello from user %d", i),
                Body:    "Hello, this is a test email.",
            })
        }

        pool.Stop()
        fmt.Println("All tasks have been submitted and processed.")
    }
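One caveat about the buffered channel: Submit still blocks once the buffer is full and every worker is busy. If we would rather fail fast instead of blocking, we can wrap the send in a select with a default case. Below is a minimal sketch of such a method; the TrySubmit name and the "report full, don't block" behaviour are my own additions, not part of the pool above.

    // TrySubmit attempts to enqueue a task without blocking.
    // It returns false if the job buffer is currently full.
    func (wp *WorkerPool) TrySubmit(task Task) bool {
        select {
        case wp.jobs <- task:
            return true
        default:
            return false
        }
    }

The caller can then decide what to do when the queue is full: drop the task, retry it later, or fall back to the blocking Submit.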

Some references