Урок 30: Паттерны отказоустойчивости — retry, circuit breaker, rate limiter

Урок 30. Паттерны отказоустойчивости — retry, circuit breaker, rate limiter

🔄 Node.js → Go (ключевые аналоги):

📋 Что изучаем

📦 Инициализация проекта

mkdir go-resilience && cd go-resilience
go mod init go-resilience

go get github.com/sony/gobreaker
go mod tidy

mkdir -p internal/resilience
mkdir -p cmd/demo

💻 Файл: internal/resilience/retry.go

package resilience

import (
    "context"
    "fmt"
    "math"
    "math/rand"
    "time"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  RETRY С EXPONENTIAL BACKOFF + JITTER                  ║
// ╚══════════════════════════════════════════════════════════╝

// RetryConfig — конфигурация повторных попыток
type RetryConfig struct {
    MaxAttempts     int           // Максимальное количество попыток
    InitialInterval time.Duration // Начальная задержка
    MaxInterval     time.Duration // Максимальная задержка
    Multiplier      float64       // Множитель (обычно 2.0)
    Jitter          bool          // Добавлять случайность
}

// DefaultRetryConfig — разумные значения по умолчанию
func DefaultRetryConfig() RetryConfig {
    return RetryConfig{
        MaxAttempts:     3,
        InitialInterval: 100 * time.Millisecond,
        MaxInterval:     10 * time.Second,
        Multiplier:      2.0,
        Jitter:          true,
    }
}

// IsRetryable — функция, определяющая, можно ли ретраить ошибку
type IsRetryable func(err error) bool

// Retry — выполняет функцию с повторными попытками
func Retry(
    ctx context.Context,
    cfg RetryConfig,
    isRetryable IsRetryable,
    operation func(ctx context.Context) error,
) error {
    var lastErr error

    for attempt := 0; attempt < cfg.MaxAttempts; attempt++ {
        // Проверяем контекст перед попыткой
        select {
        case <-ctx.Done():
            return fmt.Errorf("retry cancelled: %w", ctx.Err())
        default:
        }

        // Выполняем операцию
        err := operation(ctx)
        if err == nil {
            return nil // Успех
        }

        lastErr = err

        // Проверяем, можно ли ретраить эту ошибку
        if !isRetryable(err) {
            return fmt.Errorf("non-retryable error: %w", err)
        }

        // Это была последняя попытка?
        if attempt == cfg.MaxAttempts-1 {
            break
        }

        // Вычисляем задержку
        delay := calculateBackoff(cfg, attempt)

        // Ждём с учётом контекста
        select {
        case <-ctx.Done():
            return fmt.Errorf("retry cancelled during backoff: %w", ctx.Err())
        case <-time.After(delay):
        }
    }

    return fmt.Errorf("all %d attempts failed: %w", cfg.MaxAttempts, lastErr)
}

// RetryWithResult — то же самое, но возвращает результат
func RetryWithResult[T any](
    ctx context.Context,
    cfg RetryConfig,
    isRetryable IsRetryable,
    operation func(ctx context.Context) (T, error),
) (T, error) {
    var lastErr error
    var zero T

    for attempt := 0; attempt < cfg.MaxAttempts; attempt++ {
        select {
        case <-ctx.Done():
            return zero, fmt.Errorf("retry cancelled: %w", ctx.Err())
        default:
        }

        result, err := operation(ctx)
        if err == nil {
            return result, nil
        }

        lastErr = err

        if !isRetryable(err) {
            return zero, fmt.Errorf("non-retryable error: %w", err)
        }

        if attempt == cfg.MaxAttempts-1 {
            break
        }

        delay := calculateBackoff(cfg, attempt)

        select {
        case <-ctx.Done():
            return zero, fmt.Errorf("retry cancelled during backoff: %w", ctx.Err())
        case <-time.After(delay):
        }
    }

    return zero, fmt.Errorf("all %d attempts failed: %w", cfg.MaxAttempts, lastErr)
}

// calculateBackoff — exponential backoff с jitter
func calculateBackoff(cfg RetryConfig, attempt int) time.Duration {
    backoff := float64(cfg.InitialInterval) * math.Pow(cfg.Multiplier, float64(attempt))

    if backoff > float64(cfg.MaxInterval) {
        backoff = float64(cfg.MaxInterval)
    }

    if cfg.Jitter {
        // Добавляем случайность ±25%
        jitter := (rand.Float64() - 0.5) * 0.5 * backoff
        backoff += jitter
    }

    if backoff < 0 {
        backoff = float64(cfg.InitialInterval)
    }

    return time.Duration(backoff)
}

💻 Файл: internal/resilience/circuit_breaker.go

package resilience

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  CIRCUIT BREAKER (ручная реализация)                   ║
// ╚══════════════════════════════════════════════════════════╝

// CircuitState — состояние Circuit Breaker
type CircuitState int

const (
    CircuitClosed   CircuitState = iota // Работает нормально
    CircuitOpen                         // Запросы блокируются
    CircuitHalfOpen                     // Пробный запрос
)

func (s CircuitState) String() string {
    switch s {
    case CircuitClosed:   return "CLOSED"
    case CircuitOpen:     return "OPEN"
    case CircuitHalfOpen: return "HALF_OPEN"
    default:              return "UNKNOWN"
    }
}

// CircuitBreakerConfig — конфигурация
type CircuitBreakerConfig struct {
    MaxFailures       int           // Максимум ошибок до размыкания
    FailureThreshold  float64       // Процент ошибок для размыкания (0.0-1.0)
    OpenTimeout       time.Duration // Сколько ждать до Half-Open
    HalfOpenMaxReqs   int           // Максимум запросов в Half-Open
    WindowDuration    time.Duration // Окно для подсчёта ошибок
}

// CircuitBreaker — реализация паттерна Circuit Breaker
type CircuitBreaker struct {
    config      CircuitBreakerConfig
    state       CircuitState
    mu          sync.RWMutex
    failures    int
    successes   int
    lastFailure time.Time
    windowStart time.Time
    halfOpenReqs int
}

// NewCircuitBreaker — создаёт Circuit Breaker
func NewCircuitBreaker(cfg CircuitBreakerConfig) *CircuitBreaker {
    return &CircuitBreaker{
        config:      cfg,
        state:       CircuitClosed,
        windowStart: time.Now(),
    }
}

// Execute — выполняет операцию через Circuit Breaker
func (cb *CircuitBreaker) Execute(ctx context.Context, operation func(ctx context.Context) error) error {
    // Проверяем состояние
    if err := cb.beforeRequest(); err != nil {
        return err
    }

    // Выполняем операцию
    err := operation(ctx)

    // Обновляем состояние
    cb.afterRequest(err)

    return err
}

// ExecuteWithResult — выполняет операцию, возвращающую результат
func (cb *CircuitBreaker) ExecuteWithResult[T any](
    ctx context.Context,
    operation func(ctx context.Context) (T, error),
) (T, error) {
    var zero T

    if err := cb.beforeRequest(); err != nil {
        return zero, err
    }

    result, err := operation(ctx)
    cb.afterRequest(err)

    return result, err
}

// beforeRequest — проверяет, можно ли выполнить запрос
func (cb *CircuitBreaker) beforeRequest() error {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    now := time.Now()

    switch cb.state {
    case CircuitClosed:
        // Сбрасываем окно, если прошло WindowDuration
        if now.Sub(cb.windowStart) > cb.config.WindowDuration {
            cb.failures = 0
            cb.successes = 0
            cb.windowStart = now
        }
        return nil

    case CircuitOpen:
        // Проверяем, не пора ли в Half-Open
        if now.Sub(cb.lastFailure) > cb.config.OpenTimeout {
            cb.state = CircuitHalfOpen
            cb.halfOpenReqs = 0
            return nil
        }
        return fmt.Errorf("circuit breaker is OPEN")

    case CircuitHalfOpen:
        // Разрешаем ограниченное количество запросов
        if cb.halfOpenReqs < cb.config.HalfOpenMaxReqs {
            cb.halfOpenReqs++
            return nil
        }
        return fmt.Errorf("circuit breaker is HALF_OPEN (max requests reached)")
    }

    return nil
}

// afterRequest — обновляет состояние после запроса
func (cb *CircuitBreaker) afterRequest(err error) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()

        // Проверяем, не пора ли разомкнуть
        total := cb.failures + cb.successes
        if total > 0 {
            failureRate := float64(cb.failures) / float64(total)
            if cb.failures >= cb.config.MaxFailures || failureRate >= cb.config.FailureThreshold {
                cb.state = CircuitOpen
            }
        }

        if cb.state == CircuitHalfOpen {
            // Ошибка в Half-Open → снова Open
            cb.state = CircuitOpen
        }
    } else {
        cb.successes++

        if cb.state == CircuitHalfOpen {
            // Успех в Half-Open → закрываем
            cb.state = CircuitClosed
            cb.failures = 0
            cb.successes = 0
            cb.windowStart = time.Now()
        }
    }
}

// State — возвращает текущее состояние
func (cb *CircuitBreaker) State() CircuitState {
    cb.mu.RLock()
    defer cb.mu.RUnlock()
    return cb.state
}

// Stats — возвращает статистику
func (cb *CircuitBreaker) Stats() (state CircuitState, failures, successes int) {
    cb.mu.RLock()
    defer cb.mu.RUnlock()
    return cb.state, cb.failures, cb.successes
}

💻 Файл: internal/resilience/rate_limiter.go

package resilience

import (
    "sync"
    "time"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  SLIDING WINDOW RATE LIMITER                           ║
// ╚══════════════════════════════════════════════════════════╝

// SlidingWindowLimiter — rate limiter на основе скользящего окна
type SlidingWindowLimiter struct {
    mu           sync.Mutex
    windowSize   time.Duration
    maxRequests  int
    timestamps   []time.Time
}

// NewSlidingWindowLimiter — создаёт limiter
// maxRequests — максимум запросов за windowSize
func NewSlidingWindowLimiter(maxRequests int, windowSize time.Duration) *SlidingWindowLimiter {
    return &SlidingWindowLimiter{
        windowSize:  windowSize,
        maxRequests: maxRequests,
        timestamps:  make([]time.Time, 0, maxRequests),
    }
}

// Allow — проверяет, разрешён ли запрос
func (l *SlidingWindowLimiter) Allow() bool {
    l.mu.Lock()
    defer l.mu.Unlock()

    now := time.Now()
    windowStart := now.Add(-l.windowSize)

    // Удаляем старые записи за пределами окна
    cutOff := 0
    for i, ts := range l.timestamps {
        if ts.After(windowStart) {
            cutOff = i
            break
        }
    }
    l.timestamps = l.timestamps[cutOff:]

    // Проверяем лимит
    if len(l.timestamps) >= l.maxRequests {
        return false
    }

    // Добавляем текущий запрос
    l.timestamps = append(l.timestamps, now)
    return true
}

// ╔══════════════════════════════════════════════════════════╗
// ║  ADAPTIVE RATE LIMITER (реагирует на ошибки)           ║
// ╚══════════════════════════════════════════════════════════╝

// AdaptiveLimiter — снижает лимит при ошибках
type AdaptiveLimiter struct {
    mu            sync.Mutex
    maxRequests   int
    currentLimit  int
    windowSize    time.Duration
    timestamps    []time.Time
    consecutiveErrors int
}

func NewAdaptiveLimiter(maxRequests int, windowSize time.Duration) *AdaptiveLimiter {
    return &AdaptiveLimiter{
        maxRequests:  maxRequests,
        currentLimit: maxRequests,
        windowSize:   windowSize,
        timestamps:   make([]time.Time, 0, maxRequests),
    }
}

func (l *AdaptiveLimiter) Allow() bool {
    l.mu.Lock()
    defer l.mu.Unlock()

    now := time.Now()
    windowStart := now.Add(-l.windowSize)

    cutOff := 0
    for i, ts := range l.timestamps {
        if ts.After(windowStart) {
            cutOff = i
            break
        }
    }
    l.timestamps = l.timestamps[cutOff:]

    if len(l.timestamps) >= l.currentLimit {
        return false
    }

    l.timestamps = append(l.timestamps, now)
    return true
}

// RecordSuccess — увеличивает лимит при успехе
func (l *AdaptiveLimiter) RecordSuccess() {
    l.mu.Lock()
    defer l.mu.Unlock()
    l.consecutiveErrors = 0
    // Медленно восстанавливаем лимит
    if l.currentLimit < l.maxRequests {
        l.currentLimit++
    }
}

// RecordError — уменьшает лимит при ошибке
func (l *AdaptiveLimiter) RecordError() {
    l.mu.Lock()
    defer l.mu.Unlock()
    l.consecutiveErrors++
    // Быстро снижаем лимит
    l.currentLimit = l.currentLimit / 2
    if l.currentLimit < 1 {
        l.currentLimit = 1
    }
}

💻 Файл: cmd/demo/main.go

package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "math/rand"
    "time"

    "go-resilience/internal/resilience"
)

func main() {
    fmt.Println("╔══════════════════════════════════════════╗")
    fmt.Println("║   ПАТТЕРНЫ ОТКАЗОУСТОЙЧИВОСТИ          ║")
    fmt.Println("╚══════════════════════════════════════════╝")

    // ==========================================
    // 1. RETRY
    // ==========================================
    fmt.Println("\n── 1. RETRY С BACKOFF ──")
    ctx := context.Background()
    cfg := resilience.DefaultRetryConfig()

    attempts := 0
    err := resilience.Retry(ctx, cfg, func(err error) bool {
        return true // Все ошибки ретраимые
    }, func(ctx context.Context) error {
        attempts++
        if attempts < 3 {
            return errors.New("temporary error")
        }
        return nil // Успех на 3-й попытке
    })

    if err != nil {
        fmt.Printf("  ❌ Ошибка: %v\n", err)
    } else {
        fmt.Printf("  ✅ Успех после %d попыток\n", attempts)
    }

    // ==========================================
    // 2. CIRCUIT BREAKER
    // ==========================================
    fmt.Println("\n── 2. CIRCUIT BREAKER ──")

    cb := resilience.NewCircuitBreaker(resilience.CircuitBreakerConfig{
        MaxFailures:      3,
        FailureThreshold: 0.5, // 50% ошибок
        OpenTimeout:      2 * time.Second,
        HalfOpenMaxReqs:  2,
        WindowDuration:   10 * time.Second,
    })

    // Имитация падающего сервиса
    for i := 1; i <= 10; i++ {
        state, failures, successes := cb.Stats()
        fmt.Printf("  Запрос %2d: состояние=%s ошибки=%d успехи=%d",
            i, state, failures, successes)

        err := cb.Execute(ctx, func(ctx context.Context) error {
            if i <= 4 {
                return errors.New("service unavailable")
            }
            return nil
        })

        if err != nil {
            fmt.Printf("%v\n", err)
        } else {
            fmt.Println("✅ OK")
        }
        time.Sleep(300 * time.Millisecond)
    }

    // ==========================================
    // 3. RATE LIMITER
    // ==========================================
    fmt.Println("\n── 3. SLIDING WINDOW RATE LIMITER ──")

    limiter := resilience.NewSlidingWindowLimiter(5, 2*time.Second) // 5 запросов за 2 секунды

    for i := 1; i <= 10; i++ {
        allowed := limiter.Allow()
        if allowed {
            fmt.Printf("  Запрос %2d: ✅ РАЗРЕШЁН\n", i)
        } else {
            fmt.Printf("  Запрос %2d: ❌ ОТКЛОНЁН (rate limit)\n", i)
        }
        time.Sleep(100 * time.Millisecond)
    }

    // ==========================================
    // 4. КОМБИНАЦИЯ ПАТТЕРНОВ
    // ==========================================
    fmt.Println("\n── 4. КОМБИНАЦИЯ (Circuit Breaker + Retry) ──")

    reliableCall := func(ctx context.Context) error {
        // Circuit Breaker
        return cb.Execute(ctx, func(ctx context.Context) error {
            // Retry внутри
            return resilience.Retry(ctx, cfg, func(err error) bool {
				return errors.Is(err, context.DeadlineExceeded)
			}, func(ctx context.Context) error {
                if rand.Float64() < 0.5 {
                    return errors.New("random failure")
                }
                return nil
            })
        })
    }

    for i := 1; i <= 5; i++ {
        err := reliableCall(ctx)
        if err != nil {
            fmt.Printf("  Вызов %d: ❌ %v\n", i, err)
        } else {
            fmt.Printf("  Вызов %d: ✅ OK\n", i)
        }
    }

    fmt.Println("\n✅ Демонстрация завершена!")
}

🚀 Запуск

go run ./cmd/demo/main.go

📊 Паттерны отказоустойчивости

ПаттернПроблемаРешениеКогда применять
RetryВременные сбоиПовтор с backoffСетевые ошибки, таймауты БД
Circuit BreakerКаскадные отказыОтключение проблемного сервисаВнешние API, микросервисы
Rate LimiterПерегрузкаОграничение скоростиПубличные API, защита от DDoS
BulkheadИстощение ресурсовИзоляция пулов соединенийРазные БД, разные сервисы
TimeoutЗависаниеcontext.WithTimeoutЛюбой внешний вызов

📊 Состояния Circuit Breaker

СостояниеЗапросыПереход
CLOSEDПроходят нормально→ OPEN при превышении порога ошибок
OPENНемедленно отклоняются→ HALF_OPEN после OpenTimeout
HALF_OPENПропускаются ограниченно→ CLOSED при успехе / → OPEN при ошибке
⚠️ Типичные ошибки:
💡 Практический совет:

💡 Best practices от сеньоров:

🔑 Ключевые концепции

Что нужно запомнить из этого урока:

💡 Для Node.js разработчика:

← Предыдущий урок Следующий урок →